JUC之CountDownLatch、CyclicBarrier-多线程与高并发

    科技2025-10-13  4

    CountDownLatch实现了通过一个计数,来控制一个或多个线程是否需要一直阻塞等待。

    CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(Number);

    初始化时指定一个数字,通过await()方法控制线程阻塞等待。

    startSignal.await();

    然后通过countDown()方法实现对初始化时指定的数字减1,每调用一次就减1一次,直到指定的数字变量减为0,这个时候await()实现的阻塞状态被改变,由此程序得以继续向下执行。

    doneSignal.countDown();

    我们可以看一个JDK中官方给的例子:

    例子中,用到了两个CountDownlatch对象,一个用于控制线程开始工作,一个控制等待所有线程工作完成。

    package basic.aqs; import java.util.concurrent.CountDownLatch; class Driver { // ... void main() throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(15); for (int i = 0; i < 15; ++i) // create and start threads new Thread(new Worker(startSignal, doneSignal)).start(); //doSomethingElse(); // don't let run yet开始前的准备工作 startSignal.countDown(); // let all threads proceed 让线程正常工作取消阻塞 //doSomethingElse(); doneSignal.await(); // wait for all to finish等待所有线程执行完成 } } package basic.aqs; import java.util.concurrent.CountDownLatch; class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { startSignal.await(); doWork(); doneSignal.countDown(); } catch (InterruptedException ex) {} // return; } void doWork() { } }  

    另外,await方法可以实现控制等待的时间,超过这个时间的就不等待阻塞了,直接执行后面的操作。

    countDownLatch.await(10,TimeUnit.MILLISECONDS); CountDownLatch底层应用了AQS(AbstractQueuedSynchronizer)同步框架功能。

    从上面的例子我们可以看到,CountDownLatch中指定的数字不断减1,直到为0。

    CyclicBarrier比CountDownLatch升级了一下,初始化指定的数字可以进行重置,然后重复这个countdown过程。由此实现了分批等待操作,比如下面这个例子,对于100个线程,每满20个,就将这20个线程的状态改为正常运行的状态不再阻塞。

    package basic.aqs.CyclicBarrier; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class TestCyclicBarrier { public static void main(String[] args) { //CyclicBarrier barrier = new CyclicBarrier(20); CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("满人")); /*CyclicBarrier barrier = new CyclicBarrier(20, new Runnable() { @Override public void run() { System.out.println("满人,发车"); } });*/ for(int i=0; i<100; i++) { new Thread(()->{ try { barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } }

     

    Processed: 0.010, SQL: 8