二、线程间的共享和协作

    科技2022-08-19  119

    1 线程间的共享

    1.1 synchronized内置锁

         线程开始运行,拥有自己的栈空间,就如同一个脚本一样,按照既定的代码一步一步地执行,直到终止。但是,每个运行中的线程,如果仅仅是孤立地运行,那么没有一点儿价值,或者说价值很少,如果多个线程能够相互配合完成工作,包括数据之间的共享,协同处理事情。这将会带来巨大的价值。      Java 支持多个线程同时访问一个对象或者对象的成员变量,关键字synchronized可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性,又称为内置锁机制(synchronized锁的是对象)。

    /** * 类说明:线程不安全 */ public class NotSafe { private long count =0; public long getCount() { return count; } public void setCount(long count) { this.count = count; } //count进行累加 public void incCount(){ count++; } //线程 private static class Count extends Thread{ private NotSafe simplOper; public Count(NotSafe simplOper) { this.simplOper = simplOper; } @Override public void run() { for(int i=0;i<10000;i++){ simplOper.incCount(); } } } public static void main(String[] args) throws InterruptedException { NotSafe simplOper = new NotSafe(); //启动两个线程 Count count1 = new Count(simplOper); Count count2 = new Count(simplOper); count1.start(); count2.start(); Thread.sleep(50); System.out.println(simplOper.count); //结果始终小于20000 (线程不安全) } } /** *类说明:synchronized关键字的使用方法,保证线程安全 */ public class SynTest { private long count =0; private Object obj = new Object();//作为一个锁 public long getCount() { return count; } public void setCount(long count) { this.count = count; } /*用在同步块上*/ public void incCount(){ synchronized (obj){ count++; } } /*用在方法上*/ public synchronized void incCount2(){ count++; } /*用在同步块上,但是锁的是当前类的对象实例*/ public void incCount3(){ synchronized (this){ count++; } } //线程 private static class Count extends Thread { private SynTest simplOper; public Count(SynTest simplOper) { this.simplOper = simplOper; } @Override public void run() { for(int i=0;i<10000;i++){ //下方的三种方式任意一种都可以 simplOper.incCount();//count = count+10000 //simplOper.incCount2();//count = count+10000 //simplOper.incCount3();//count = count+10000 } } } public static void main(String[] args) throws InterruptedException { SynTest simplOper = new SynTest(); //启动两个线程 Count count1 = new Count(simplOper); Count count2 = new Count(simplOper); count1.start(); count2.start(); Thread.sleep(50); System.out.println(simplOper.count);//20000 } }

    1.2 对象锁和类锁:

         对象锁是用于对象实例方法,或者一个对象实例上的,类锁是用于类的静态方法或者一个类的 class 对象上的。我们知道,类的对象实例可以有很多个,但是每个类只有一个 class 对象,所以不同对象实例的对象锁是互不干扰的,但是每个类只有一个类锁。      但是有一点必须注意的是,其实类锁只是一个概念上的东西,并不是真实存在的,类锁其实锁的是每个类的对应的class对象。类锁和对象锁之间也是互不干扰的。

    /** * *类说明:线程休眠辅助工具类 */ public class SleepTools { /** * 按秒休眠 * @param seconds 秒数 */ public static final void second(int seconds) { try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { } } /** * 按毫秒数休眠 * @param seconds 毫秒数 */ public static final void ms(int seconds) { try { TimeUnit.MILLISECONDS.sleep(seconds); } catch (InterruptedException e) { } } } /** *类说明:类锁和对象锁 */ public class StaticAndClass { private static class SynClass extends Thread{ @Override public void run() { System.out.println(currentThread().getName() +":SynClass is running..."); synClass(); } } private static class SynStatic extends Thread{ @Override public void run() { System.out.println(currentThread().getName() +"SynStatic is running..."); synStatic(); } } //类在进行类加载的时候在虚拟机上都会有一个class类,所以static修饰的方法上加锁,意味着锁的是当前类的class对象 private static synchronized void synClass() { System.out.println(Thread.currentThread().getName() +"synClass going..."); SleepTools.second(1); System.out.println(Thread.currentThread().getName() +"synClass end"); } //锁的是对象 private static Object obj = new Object(); private static void synStatic() { synchronized (obj){ System.out.println(Thread.currentThread().getName() +"synStatic going..."); SleepTools.second(1); System.out.println(Thread.currentThread().getName() +"synStatic end"); } } public static void main(String[] args) { Thread t1 = new SynClass(); // Thread t2 = new SynStatic(); //相对于t1,由于锁的不是同一个对象,故两个线程是并行的 Thread t2 = new SynClass(); //相对于t1,由于锁的是同一个对象,所以需要等上一个线程执行结束才执行 t2.start(); SleepTools.second(1); //按秒休眠 t1.start(); } }

    注意:错误的加锁和原因分析

    /** * 类说明:错误的加锁和原因分析 */ public class TestIntegerSyn { public static void main(String[] args) throws InterruptedException { Worker worker=new Worker(1); //Thread.sleep(50); for(int i=0;i<5;i++) { new Thread(worker).start(); } } private static class Worker implements Runnable{ private Integer i; public Worker(Integer i) { this.i=i; } @Override public void run() { synchronized (i) { Thread thread=Thread.currentThread(); System.out.println(thread.getName()+"--@"+System.identityHashCode(i)); i++; System.out.println(thread.getName()+"-------"+i+"-@"+System.identityHashCode(i)); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(thread.getName()+"-------"+i+"--@"+System.identityHashCode(i)); } } } }

         虽然对 i 进行了加锁,但是反编译这个类的 class 文件后,可以看到 i++实际是返回了一个新的 Integer 对象。也就是每个线程实际加锁的是不同的 Integer 对象。

    1.3 volatile,最轻量的同步机制

         volatile 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。

    /** * 类说明:演示Volatile的提供的可见性 */ public class VolatileCase { //不加volatile时,子线程无法感知主线程修改了ready的值,而加了volatile后,子线程可以感知主线程修改了ready的值 private static volatile boolean ready; //reday默认false private static int number; private static class PrintThread extends Thread{ @Override public void run() { System.out.println("PrintThread is running......."); while(!ready) { ; } System.out.println("number = "+number); } } public static void main(String[] args) { new PrintThread().start(); SleepTools.second(1); number = 51; ready = true; SleepTools.second(5); //按秒休眠 System.out.println("main is ended!"); } }

         不加volatile时,子线程无法感知主线程修改了ready的值,从而不会退出循环,而加了volatile后,子线程可以感知主线程修改了ready的值,迅速退出循环。但是volatile不能保证数据在多个线程下同时写时的线程安全,如下代码:

    /** * 类说明:volatile 不能保证数据在多个线程下同时写时的线程安全 */ public class NotSafe { private volatile long count =0; public long getCount() { return count; } public void setCount(long count) { this.count = count; } //count进行累加 public void incCount(){ count++; } //线程 private static class Count extends Thread{ private NotSafe simplOper; public Count(NotSafe simplOper) { this.simplOper = simplOper; } @Override public void run() { for(int i=0;i<10000;i++){ simplOper.incCount(); } } } public static void main(String[] args) throws InterruptedException { NotSafe simplOper = new NotSafe(); //启动两个线程 Count count1 = new Count(simplOper); Count count2 = new Count(simplOper); count1.start(); count2.start(); Thread.sleep(50); System.out.println(simplOper.count); //结果始终小于20000 (线程不安全) } }

    原因:线程执行count++实际上分为3步,从内存中取到count原始值放到自己的缓存中,然后执行+1操作,在同步到主内存中。此过程会出现其中一个线程只执行了前两步时另一个线程就执行了第一步,此时count的原始值并没有被第一个线程改变,最后结果就是两个线程写到主内存中的是同一个值。 注意:volatile 最适用的场景:一个线程写,多个线程读。

    2 ThreadLocal辨析

    2.1 ThreadLocal与Synchonized的比较

         ThreadLocal和Synchonized都用于解决多线程并发访问。可是 ThreadLocal与synchronized有本质的差别。synchronized 是利用锁的机制,使变量或代码块在某一时该仅仅能被一个线程访问。而ThreadLocal为每个线程都提供了变量的副本,使得每个线程在某一时间访问到的并非同一个对象,这样就隔离了多个线程对数据的数据共享。      Spring 的事务就借助了ThreadLocal 类。Spring会从数据库连接池中获得一个connection,然会把connection 放进ThreadLocal 中,也就和线程绑定了,事务需要提交或者回滚,只要从ThreadLocal中拿到connection进行操作。为何 Spring的事务要借助ThreadLocal 类? 以 JDBC 为例,正常的事务代码可能如下: dbc = new DataBaseConnection();//第 1 行 Connection con = dbc.getConnection();//第 2 行 con.setAutoCommit(false);// //第 3 行 con.executeUpdate(…);//第 4 行 con.executeUpdate(…);//第 5 行 con.executeUpdate(…);//第 6 行 con.commit();第 7 行 上述代码,可以分成三个部分: 事务准备阶段:第 1~3 行 业务处理阶段:第 4~6 行 事务提交阶段:第 7 行 可以很明显的看到,不管我们开启事务还是执行具体的 sql 都需要一个具体的数据库连接。      现在我们开发应用一般都采用三层结构,如果我们控制事务的代码都放在DAO(DataAccessObject)对象中,在 DAO 对象的每个方法当中去打开事务和关闭事务,当 Service 对象在调用 DAO 时,如果只调用一个 DAO,那我们这样实现则效果不错,但往往我们的 Service 会调用一系列的 DAO 对数据库进行多次操作,那么,这个时候我们就无法控制事务的边界了,因为实际应用当中,我们的Service调用的 DAO 的个数是不确定的,可根据需求而变化,而且还可能出现 Service 调用 Service 的情况。      如果不使用 ThreadLocal,代码大概就会是这个样子:

         但是需要注意一个问题,如何让三个 DAO 使用同一个数据源连接呢?我们就必须为每个 DAO 传递同一个数据库连接,要么就是在 DAO 实例化的时候作为构造方法的参数传递,要么在每个 DAO 的实例方法中作为方法的参数传递。这两种方式无疑对我们的 Spring 框架或者开发人员来说都不合适。为了让这个数据库连接可以跨阶段传递,又不显示的进行参数传递,就必须使用别的办法。       Web 容器中,每个完整的请求周期会由一个线程来处理。因此,如果我们能将一些参数绑定到线程的话,就可以实现在软件架构中跨层次的参数共享(是隐式的共享)。而 JAVA 中恰好提供了绑定的方法–使用 ThreadLocal。      结合使用 Spring 里的 IOC 和 AOP,就可以很好的解决这一点。      只要将一个数据库连接放入 ThreadLocal 中,当前线程执行时只要有使用数据库连接的地方就从 ThreadLocal 获得就行了。

    2.2 ThreadLocal 的使用

    ThreadLocal 类接口很简单,只有 4 个方法,了解一下:

    void set(Object value) 设置当前线程的线程局部变量的值。public Object get() 该方法返回当前线程所对应的线程局部变量。public void remove() 将当前线程局部变量的值删除,目的是为了减少内存的占用,该方法是 JDK5.0新增的方法。需要指出的是,当线程结束后,对应该线程的局部变量将自动被垃圾回收,所以显式调用该方法清除线程的局部变量并不是必须的操作,但它可以加快内存回收的速度。protected Object initialValue() 返回该线程局部变量的初始值,该方法是一个 protected 的方法,显然是为了让子类覆盖而设计的。这个方法是一个延迟调用方法,在线程第 1 次调用get()或set(Object)时才执行,并且仅执行 1 次。ThreadLocal 中的缺省实现直接返回一个null。

    2.3 实现解析

    2.4 引发的内存泄漏分析

    2.5 错误使用ThreadLocal导致线程不安全

    3 线程间的协作

         线程之间相互配合,完成某项工作,比如:一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者,这种模式隔离了“做什么”(what)和“怎么做”(How),简单的办法是让消费者线程不断地循环检查变量是否符合预期在 while 循环中设置不满足的条件,如果条件满足则退出 while 循环,从而完成消费者的工作。却存在如下问题:      1) 难以确保及时性。      2)难以降低开销。如果降低睡眠的时间,比如休眠 1 毫秒,这样消费者能更加迅速地发现条件变化,但是却可能消耗更多的处理器资源,造成了无端的浪费。

    3.1 等待/通知机制

         是指一个线程 A 调用了对象O的wait()方法进入等待状态,而另一个线程 B调用了对象O的notify()或者notifyAll()方法,线程A 收到通知后从对象O的wait()方法返回,进而执行后续操作。上述两个线程通过对象O来完成交互,而对象上的 wait()和 notify/notifyAll()的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。

    先解释两个概念: 等待池:假设一个线程A调用了某个对象的wait()方法,线程A就会释放该对象的锁后(因为wait()方法必须出现在synchronized中,这样自然在执行wait()方法之前线程A就已经拥有了该对象的锁),进入到了该对象的等待池,等待池中的线程不会去竞争该对象的锁。 锁池:只有获取了对象的锁,线程才能执行对象的 synchronized 代码,对象的锁每次只有一个线程可以获得,其他线程只能在锁池中等待。 下方关键字需要跟synchronized配合使用

    notify(): 通知一个在对象上等待的线程,使其从wait方法返回,而返回的前提是该线程获取到了对象的锁,没有获得锁的线程重新进入WAITING状态。notifyAll(): 通知所有等待在该对象上的线程wait() 调用该方法的线程进入WAITING状态,只有等待另外线程的通知或被中断才会返回.需要注意,调用 wait()方法后,会释放对象的锁wait(long) 超时等待一段时间,这里的参数时间是毫秒,也就是等待长达n毫秒,如果没有通知就超时返回wait (long,int) 对于超时时间更细粒度的控制,可以达到纳秒

    为什么 wait(), notify() 需要搭配synchronized关键字使用?      wait(), notify() 操作的目的是基于某种条件, 协调多个线程间的运行状态,由于涉及到多个线程间基于共享变量的相互通信,必然需要引入某种同步机制, 以确保wait(), notify() 操作在线程层面的原子性。

    3.2 等待和通知的标准范式

    等待方遵循如下原则:      1)获取对象的锁。      2)如果条件不满足,那么调用对象的 wait()方法,被通知后仍要检查条件。      3)条件满足则执行对应的逻辑。

    synchronized(对象) { while(条件不满足) { 对象.wait(); } 对应的处理逻辑 }

    通知方遵循如下原则:      1)获得对象的锁。      2)改变条件。      3)通知所有等待在对象上的线程。

    synchronized(对象) { 改变条件 对象.notifyAll(); }

         在调用wait() 、notify() 系列方法 之前,线程必须要获得该对象的对象级别锁,即只能在同步方法或同步块中调用 wait()方法 、notify() 系列方法,进入 wait()方法后,当前线程释放锁,在从 wait()返回前,线程与其他线程竞争重新获得锁,执行 notify()系列方法的线程退出调用了notifyAll 的synchronized代码块的时候后,他们就会去竞争。如果其中一个线程获得了该对象锁,它就会继续往下执行,在它退出synchronized代码块,释放锁后,其他的已经被唤醒的线程将会继续竞争获取该锁,一直进行下去,直到所有被唤醒的线程都执行完毕。

    3.3 notify和notifyAll该用谁

         尽可能用 notifyall(),谨慎使用 notify(),因为 notify()只会唤醒一个线程,我们无法确保被唤醒的这个线程一定就是我们需要唤醒的线程。

    /** *类说明:快递实体类 */ public class Express { public final static String CITY = "ShangHai"; private int km;/*快递运输里程数*/ private String site;/*快递到达地点*/ public Express() { } public Express(int km, String site) { this.km = km; this.site = site; } /* 变化地点,然后通知处于wait状态并需要处理地点的线程进行业务处理*/ public synchronized void changeSite(){ this.site = "BeiJing"; notifyAll(); } public synchronized void waitSite(){ while(this.site.equals(CITY)){//快递到达目的地 try { wait(); System.out.println("Check Site thread["+Thread.currentThread().getId() +"] is be notified"); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("the site is "+this.site+",I will call user"); } } /** *类说明:测试wait/notify/notifyAll */ public class TestWN { private static Express express = new Express(0,Express.CITY); /*检查地点变化的线程,不满足条件,线程一直等待*/ private static class CheckSite extends Thread{ @Override public void run() { express.waitSite(); } } public static void main(String[] args) throws InterruptedException { for(int i=0;i<3;i++){ new CheckSite().start(); } Thread.sleep(1000); express.changeSite();//快递地点变化 } }

    3.4 等待超时模式实现一个连接池

         调用场景:调用一个方法时等待一段时间(一般来说是给定一个时间段),如果该方法能够在给定的时间段之内得到结果,那么将结果立刻返回,反之,超时返回默认结果。 实现机制:

    /** *类说明:连接池的实现(模拟数据库连接池获取连接) */ public class DBPool { //容器,存放连接 private static LinkedList<Connection> pool = new LinkedList<Connection>(); //限制池的大小 public DBPool(int initialSize) { if (initialSize > 0) { for (int i = 0; i < initialSize; i++) { //此处数据库连接池SqlConnectImpl.fetchConnection() pool.addLast(SqlConnectImpl.fetchConnection()); } } } //释放连接,通知其它的等待连接的线程 public void releaseConnection(Connection connection) { if (connection != null) { synchronized (pool) { pool.addLast(connection); //通知其它等待连接的线程 pool.notifyAll(); } } } //获取连接 //在mills内无法获取到连接,将会返回null public Connection fetchConnection(long mills) throws InterruptedException { synchronized (pool) { //永不超时 if (mills < 0) { while (pool.isEmpty()) { pool.wait(); } return pool.removeFirst(); } else { //超时时刻 long future = System.currentTimeMillis() + mills; //等待时长 long remaining = mills; while (pool.isEmpty() && remaining > 0) { pool.wait(remaining); //唤醒一次,重新计算等待时长 remaining = future - System.currentTimeMillis(); } Connection connection = null; if (!pool.isEmpty()) { connection = pool.removeFirst(); } return connection; } } } } /** *类说明: */ public class SqlConnectImpl implements Connection{ /*拿一个数据库连接*/ public static final Connection fetchConnection(){ return new SqlConnectImpl(); } //重写所有方法即可 }

    3.5 调用 yield() 、sleep()、wait()、notify()等方法对锁有何影响?

         yield() 、sleep()被调用后,都不会释放当前线程所持有的锁。      调用 wait()方法后,会释放当前线程持有的锁,而且当前被唤醒后,会重新去竞争锁,锁竞争到后才会执行 wait 方法后面的代码。      调用 notify()系列方法后,对锁无影响,线程只有在synchronized同步代码执行完后才会自然而然的释放锁,所以 notify()系列方法一般都是synchronized同步代码的最后一行。

    /** *类说明:测试Sleep对锁的影响 */ public class SleepLock { private Object lock = new Object(); public static void main(String[] args) { SleepLock sleepTest = new SleepLock(); Thread threadA = sleepTest.new ThreadSleep(); threadA.setName("ThreadSleep"); Thread threadB = sleepTest.new ThreadNotSleep(); threadB.setName("ThreadNotSleep"); threadA.start(); try { Thread.sleep(1000); System.out.println(" Main slept!"); } catch (InterruptedException e) { e.printStackTrace(); } threadB.start(); } private class ThreadSleep extends Thread{ @Override public void run() { String threadName = Thread.currentThread().getName(); System.out.println(threadName+" will take the lock"); try { synchronized(lock) { System.out.println(threadName+" taking the lock"); Thread.sleep(5000); System.out.println("Finish the work: "+threadName); } } catch (InterruptedException e) { //e.printStackTrace(); } } } private class ThreadNotSleep extends Thread{ @Override public void run() { String threadName = Thread.currentThread().getName(); System.out.println(threadName+" will take the lock time="+System.currentTimeMillis()); synchronized(lock) { System.out.println(threadName+" taking the lock time="+System.currentTimeMillis()); System.out.println("Finish the work: "+threadName); } } } }
    Processed: 0.008, SQL: 9