线程并发与通信
线程并发与通信
[参考文章][1]
- 多线程与cpu是单核还是多核无关,只是合理的利用cpu资源的一种方法
- 多线程更多与分布式系统的并发性一起被提及
- 线程间写入与访问共享资源问题,因共享资源存在不同步,产生线程安全问题
- 分布式系统共享资源多线程问题
线程并发问题
• 多线程:指的是这个程序(一个进程)运行时产生了不止一个线程
• 并行与并发:
◦ 并行:多个cpu实例或者多台机器同时执行一段处理逻辑,是真正的同时。
◦ 并发:通过cpu调度算法,让用户看上去同时执行,实际上从cpu操作层面不是真正的同时。并发往往在场景中有公用的资源,那么针对这个公用的资源往往产生瓶颈,我们会用TPS或者QPS来反应这个系统的处理能力。
[多线程参考文章][2]
并发场景与解决问题
共享实例变量、共享连接资源时。
并发问题举例:
银行转账问题
投票问题
卖票问题
订单消库存问题
连接池问题
什么是IPC
进程间通信(IPC,Inter-Process Communication),指至少两个进程或线程间传送数据或信号的一些技术或方法。
RPC、RMI、EJB、WebService都是IPC的一种方法。
线程通信
[参考文章][3]
线程通信的目标是使线程间能够互相发送信号。另一方面,线程通信使线程能够等待其他线程的信号。
例如,线程 B 可以等待线程 A 的一个信号,这个信号会通知线程 B 数据已经准备好了。本文将讲解以下几个 JAVA 线程间通信的主题:
- 通过共享对象通信
- 忙等待
- wait(),notify()和 notifyAll()
- 丢失的信号
- 假唤醒
- 多线程等待相同信号
- 不要对常量字符串或全局对象调用 wait()
通过案例一步步解决并发问题
卖票案例:
假如库存有100张票,在高并发的环境下售卖,会有什么问题,怎么解决?
package com.wolf.thread.model;
import org.junit.Test;
class Ticket {
private int num;
public Ticket(){
super();
}
public Ticket(int num) {
this.num = num;
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
}
public class TicketTest{
@Test
public void testSall(){
final Ticket ticket = new Ticket(10);
//分四个线程去卖票,直到票卖完
new Thread(new TicketRunnable(ticket)).start();
new Thread(new TicketRunnable(ticket)).start();
new Thread(new TicketRunnable(ticket)).start();
new Thread(new TicketRunnable(ticket)).start();
new Thread(new TicketRunnable(ticket)).start();
}
}
class TicketRunnable implements Runnable{
private Ticket ticket;
public TicketRunnable(Ticket ticket) {
this.ticket = ticket;
}
@Override
public void run() {
while (true){
if (ticket.getNum() > 0) {
ticket.setNum(ticket.getNum() - 1);
System.out.println(Thread.currentThread().getName() + ".....sale...." + ticket.getNum());
}
}
}
}
运行了几次,发现总有重复卖票的情况:
![paste image][image-1]
这很明显是并发操作共享资源导致的问题。
解决方案
- 在操作共享源的代码块或方法加上synchronized关键字,虽然可以简单的解决,但是会很影响性能。
- 设置对象共享锁
设置对象共享锁
class Ticket {
private int num;
public Ticket(){
super();
}
public Ticket(int num) {
this.num = num;
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
}
public class TicketTest{
@Test
public void testSall(){
final Ticket ticket = new Ticket(10);
//分四个线程去卖票,直到票卖完
new Thread(new TicketRunnable(ticket)).start();
new Thread(new TicketRunnable(ticket)).start();
new Thread(new TicketRunnable(ticket)).start();
new Thread(new TicketRunnable(ticket)).start();
new Thread(new TicketRunnable(ticket)).start();
}
}
class TicketRunnable implements Runnable{
private Ticket ticket;
//线程是否被通知
private boolean hasSall;
public TicketRunnable(Ticket ticket) {
this.ticket = ticket;
}
@Override
public void run() {
while (true){
synchronized (ticket){
if (hasSall) {
try {
ticket.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
//等待完后才会执行这句,说明上个卖票线程已经执行完,hasSall要重置成已卖
hasSall = false;
}
}
if (ticket.getNum() > 0) {
ticket.setNum(ticket.getNum() - 1);
System.out.println(Thread.currentThread().getName() + ".....sale...." + ticket.getNum());
//notify,只要线程卖了,就设置共享状态为true已通知
synchronized (ticket){
//设置卖票状态为正在卖
hasSall = true;
ticket.notify();
}
}
}
}
}
这种方法也是加锁,但是经测试,这个场景无法达到和使用synchronized同步代码块时的效果。
- 使用并发包提供的线程锁进行加锁
** Lock接口的ReentrantLock是乐观锁,是非阻塞式的,线程可中断的**
** synchronized是悲观锁,是阻塞式的,线程不可中断的**
class Ticket2 {
private int num;
Lock lock = new ReentrantLock();
public Ticket2(){
super();
}
public Ticket2(int num) {
this.num = num;
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
}
TicketRunnable2类:
class TicketRunnable2 implements Runnable{
private Ticket2 ticket;
public TicketRunnable2(Ticket2 ticket) {
this.ticket = ticket;
}
@Override
public void run() {
while (true){
try {
ticket.lock.lock();
if (ticket.getNum() > 0) {
ticket.setNum(ticket.getNum() - 1);
System.out.println(Thread.currentThread().getName() + ".....sale...." + ticket.getNum());
}
}finally {
ticket.lock.unlock();
}
}
}
}
单元测试:
@Test
public void testSall2() throws InterruptedException {
final Ticket2 ticket = new Ticket2(100);
//分四个线程去卖票,直到票卖完
new Thread(new TicketRunnable2(ticket)).start();
new Thread(new TicketRunnable2(ticket)).start();
new Thread(new TicketRunnable2(ticket)).start();
new Thread(new TicketRunnable2(ticket)).start();
//让主线程睡眠10毫秒,等待子线程执行完毕
TimeUnit.SECONDS.sleep(10);
}
- BlockingQueue,BlockingDeque
*
线程通信使用组合
- synchronized语句+wait组合
- lock+condition组合
- 多线程读写问题
假如分布式系统上需要写入和读取缓存,但是写入和读取又不是同时执行的,现在要保证每次读取缓存时,都必须在写入之后,我们用这两种组合分别来解决这个问题。
synchronized语句+wait组合
class FileStream{
private byte[] files;
public byte[] getFiles() {
return files;
}
public void setFiles(byte[] files) {
this.files = files;
}
public void readFile(){
for (byte fileb : files){
System.out.printf("files content:"+fileb);
}
}
//假设读写操作都是异步操作
public void writeFile(){
files = new byte[12];
for (int i=0;i<files.length;i++){
files[i] = (byte) ('a' + i);
}
}
}
线程实现类:
class RunableObj1 implements Runnable{
private FileStream obj;
public RunableObj1(FileStream obj) {
this.obj = obj;
}
@Override
public void run() {
//这是obj的一个对象锁
synchronized (obj){
try {
//将线程1的对象锁让出,让线程1等待被通知唤醒
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
//因为直接读会发现还没有写入文件,需要等待写入后再读
obj.readFile();
System.out.println("线程"+Thread.currentThread().getName()+"获取到了锁。。。。");
}
}
}
class RunableObj2 implements Runnable{
private FileStream obj;
public RunableObj2(FileStream obj) {
this.obj = obj;
}
@Override
public void run() {
synchronized (obj){
//写完后调用notyfy释放锁,通知其它在等待的线程继续操作
obj.writeFile();
obj.notify();
System.out.println("线程"+Thread.currentThread().getName()+"调用了notify");
}
System.out.println("线程"+Thread.currentThread().getName()+"释放了锁");
}
}
运行:
@Test
public void testObjThread(){
FileStream obj = new FileStream();
Thread t1 = new Thread(new RunableObj1(obj));
Thread t2 = new Thread(new RunableObj2(obj));
t1.start();
t2.start();
}
结果如下:
![paste image][image-2]
这个结果正是我要的执行顺序。
lock+condition组合
class RunableObjLock1 implements Runnable{
private FileStream obj;
private Lock lock;
private Condition condition;
public RunableObjLock1(FileStream obj,Lock lock,Condition condition) {
this.obj = obj;
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
lock.lock();
try {
// 将线程1的对象锁让出,让线程1等待被通知唤醒
condition.await();
//因为直接读会发现还没有写入文件,需要等待写入后再读
obj.readFile();
System.out.println("线程"+Thread.currentThread().getName()+"获取到了锁。。。。");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
class RunableObjLock2 implements Runnable{
private FileStream obj;
private Lock lock;
private Condition condition;
public RunableObjLock2(FileStream obj,Lock lock,Condition condition) {
this.obj = obj;
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
lock.lock();
try {
//写完后调用notyfy释放锁,通知其它在等待的线程继续操作
obj.writeFile();
//obj.notify();
condition.signal();
System.out.println("线程"+Thread.currentThread().getName()+"调用了notify");
}finally {
lock.unlock();
}
}
}
运行代码:
@Test
public void testObjThreadForLock(){
FileStream obj = new FileStream();
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Thread t1 = new Thread(new RunableObjLock1(obj,lock,condition));
Thread t2 = new Thread(new RunableObjLock2(obj,lock,condition));
t1.start();
t2.start();
}
结果如下:
![paste image][image-3]
由于锁和condition都是用在FileStream对象上的,所以我把代码重构下:
class RunableObjLock1 implements Runnable{
private FileStream obj;
public RunableObjLock1(FileStream obj) {
this.obj = obj;
}
@Override
public void run() {
obj.readFile();
}
}
class RunableObjLock2 implements Runnable{
private FileStream obj;
public RunableObjLock2(FileStream obj) {
this.obj = obj;
}
@Override
public void run() {
obj.writeFile();
}
}
class FileStream{
private byte[] files;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
public byte[] getFiles() {
return files;
}
public void setFiles(byte[] files) {
this.files = files;
}
public void readFile(){
if (lock.tryLock()){
try {
condition.await();
System.out.println("线程"+Thread.currentThread().getName()+"获取到了锁。。。。");
for (byte fileb : files){
System.out.println("files content:"+fileb);
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
//假设读写操作都是异步操作
public void writeFile(){
lock.lock();
try {
files = new byte[12];
for (int i=0;i<files.length;i++){
files[i] = (byte) ('a' + i);
}
condition.signal();
System.out.println("线程"+Thread.currentThread().getName()+"调用了notify");
}finally {
lock.unlock();
}
}
}
最后结果还是一样的。
生产者消费者问题
我们先来看看等待/通知机制下的生产者消费者模式:我们假设这样一个场景,我们是卖北京烤鸭店铺,我们现在只有一条生产线也只有一条消费线,也就是说只能生产线程生产完了,再通知消费线程才能去卖,如果消费线程没烤鸭了,就必须通知生产线程去生产,此时消费线程进入等待状态。在这样的场景下,我们不仅要保证共享数据(烤鸭数量)的线程安全,而且还要保证烤鸭数量在消费之前必须有烤鸭。
volatile与synchronized区别
- 它们的区别其实只是对于volatile来说,没有使用锁,不保证成员变量的原子性,只保证成员变量的可见性。
- volatile只能保证本身的成员变量的可见性,对于引用成员变量(就是对象中的属性是从外面传入)设置成volatile,如果引用变量不是volatile,就没有效果了。
- 给数组等非线程安全的集合框架设置volatile也是无效的。
volatile的作用
使线程中成员变量对其它线程的值可见。
对于这句话的理解,需要介绍下java对象的存储模型原理。
java对象存储模型
java给每个线程运行时都提供了工作存储空间,且每个线程的工作空间都相互独立,对共享数据读写都是在本线程的工作空间进行的,所有的线程内的变量和对象都是对其它线程不可见的,包括外面传入的long和double类型的共享对象。
(在深入jvm虚拟机中提到,int等不大于32位的基本类型的操作都是原子操作,但对long和double不一定是原子操作,是因为它们有些数的位数已经大于32位了,所以不具原子性)
如果给一个成员变量设置了volatile关键字,则线程在操作完成之后会将工作空间的修改对象(数据值)同步写回java主线程的存储器,也称主存储器。使其在其它线程中可见,其它线程在读取共享数据时,会先从主存储器更新数据,然后再进行操作,实现了数据的同步。
![paste image][image-4]
每个线程的对象都有一套这样的存储模型。
主存储器与每个线程之间传送数据的方式(术语叫协调),引申出来三个特性:
原子性
可见性
在什么情况下,一个线程的写入成员变量的值对另一个线程对该成员变量的读出操作的值是可见的。顺序化
什么情况下可以使用volatile关键字
- 不需要和其它成员变量之间遵循不变约束
- 不需要根据当前值重写成员变量
- 没有线程会使用正常语法写入非法值
- 读线程的行为不依赖任何非volatile成员变量的值
以上使用场景,用在将成员变量作标志去停止线程是最合适了。
分布式系统间通信(进程通信,远程调用)
RPC
RPC=Remote Produce Call 是一种技术的概念名词. HTTP是一种协议,RPC可以通过HTTP来实现,也可以通过Socket自己实现一套协议来实现。
用来解决分布式系统间通信的一种协议。
RMI、Dubbo、Zookeeper都是RPC协议的实现RMI
远程方法调用(Remote Method Invocation ,RMI),实现在不同的Java虚拟机(JVM)间对象间的通信,客户端(一个JVM)可以同步调用服务器(其它JVM)的对象的方法。
使用RMI的利弊:
* 优势:面向对象的远程服务模型;基于TCP协议上的服务,执行速度快。
* 劣势:不能跨语言;每个远程对象都要绑定端口,不易维护;不支持分布式事务JTAWebService
Web Service是一组分布式应用模型的规范, 它定义了网络传输协议和访问服务的数据格式。该模型隐藏了技术的实现细节,旨在提供松散耦合、跨平台、跨语言的服务组件。
有第三库,如:CXF
- EJB
ejb是java EE 中的一个规范,该规范描述了分布式应用程序需要解决的问题,例如事务处理、安全、日志、分布式等,而同时呢,sun公司也实现了自己定义的这一个标准,相当于自己颁布一个标准然后,又给出了实现供别人使用,实现以很多API的方式提供给用的人。
ejb是按照java服务器接口定义的java类,可以理解为一个特殊的java类,放在容器里容器可以帮助该类管理事务、分布式、安全等,一般小的程序不会用到,只有大型分布式系统才会用到ejb,既然ejb是一个java类或是一个组件,颗粒较小,这也是与Webservice的区别之一,下面会说到,它就可以被其它一个或多个模块调用。
包含了三种类型的Bean,可以通过注释JPA一个规范来标记,其中有一种Bean,叫MDB消息驱动bean,它的通信机制涉及到了JMS协议。
ejb可以进行远程调用,但是不能够跨语言,ejb是同步调用,而平时我们说的的ejb异步调用指的是ejb的MDB异步通信。
使用EJB的利弊:
• 优势:可扩展性好,安全性强,支持分布式事务处理。
• 劣势:不能跨语言;配置相对复杂,不同J2EE容器之间很难做无缝迁移。
网络通信
- TCP
- HTTP
- Socket
- UDP
sokect也可以实现UDP协议。
分布式中间件消息通信
- JMS
- MQ
分布式事务
[1]: http://wiki.jikexueyuan.com/project/java-concurrent/introduction.html
[2]: http://www.importnew.com/21089.html
[3]: http://wiki.jikexueyuan.com/project/java-concurrent/thread-communication.html
[image-1]: http://clockcoder.com/images/1499649740468a09sykrv.png?imageslim
[image-2]: http://clockcoder.com/images/1499741801797yxf28x97.png?imageslim
[image-3]: http://clockcoder.com/images/1499741801797yxf28x97.png?imageslim
[image-4]: http://clockcoder.com/images/1499915269361g7o97cu5.png?imageslim