Phaser API的使用

    科技2022-07-10  101

    Phaser 方法

    int arriveAndAwaitAdvance(); 当前线程到达屏障parties值加1,在此等候条件满足后再向下一个屏障运行,条件不满足时线程呈现阻塞状态。 int arriveAndDeregister(); parties值减一; int getPhase();获取已经到达第几个屏障; boolean onAdvance(int phase, int registeredParties);通过新屏障的时候调用。返回true则屏障不等待,Phaser呈现无效/销毁状态,返回false则Phaser继续工作。

    Phaser phaser = new Phaser(3) { @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("onAdvance方法被调用"); return true; } };

    int getRegisteredParties();获得注册的parties值; int register();parties值 动态添加1; int bulkRegister(int parties); 批量增加parties值; int getArrivedParties();获得已经被使用了的parties个数。 getUnarrivedParties(); 获得没有被使用的parties个数。 int arrive(); parties值加1,并且不在屏障处等待(调用线程),直接执行下面代码。经过屏障后parties计数会被重置为0;其他线程不满足计数条件是还是呈现等待状态。 int awaitAdvance(int phase) ; 传入参数phase值和当前getPhase()方法返回值一样,则在屏障处等待,否则继续向下运行。注意:方法不参与parties值得计数操作,只具有判断功能。还是线程不可中断的。 int awaitAdvanceInterruptibly(int phase); 传入参数phase值和当前getPhase()方法返回值一样就等待,否则就继续执行下面的代码。线程时可中断的。 int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit);在指定的屏障数最大等待指定的时间,指定时间内屏障数未变则抛出异常。 void forceTermination(); 使Phaser屏障功能失效。 boolean isTerminated(); 判断Phaser对象是否已经呈现销毁状态。

    awaitAdvance()

    public class ThreadA extends Thread{ private Phaser phaser; public ThreadA(Phaser phaser) { super(); this.phaser = phaser; } @Override public void run() { System.out.println(Thread.currentThread().getName()+" 开始 "+System.currentTimeMillis()); phaser.arriveAndAwaitAdvance(); System.out.println(Thread.currentThread().getName()+" 结束 "+System.currentTimeMillis()); } } public class ThreadC extends Thread{ private Phaser phaser; public ThreadC(Phaser phaser) { super(); this.phaser = phaser; } @Override public void run() { System.out.println(Thread.currentThread().getName()+" 开始 "+System.currentTimeMillis()); try { Thread.sleep(3000); System.out.println(Thread.currentThread().getName() + " phaser " + phaser.getPhase() + " " + System.currentTimeMillis()); phaser.awaitAdvance(0); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+" 结束 "+System.currentTimeMillis()); } } public class ThreadD extends Thread { private Phaser phaser; public ThreadD(Phaser phaser) { super(); this.phaser = phaser; } @Override public void run() { System.out.println(Thread.currentThread().getName() + " 开始 " + System.currentTimeMillis()); try { Thread.sleep(5000); System.out.println(Thread.currentThread().getName() + " phaser " + phaser.getPhase() + " " + System.currentTimeMillis()); phaser.arriveAndAwaitAdvance(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 结束 " + System.currentTimeMillis()); } } public static void main(String[] args) { Phaser phaser = new Phaser(2); ThreadA a = new ThreadA(phaser); a.setName("线程A"); a.start(); ThreadC c = new ThreadC(phaser); c.setName("线程C"); c.start(); ThreadD d = new ThreadD(phaser); d.setName("线程D"); d.start(); }

    相等的测试结果 线程A 开始 1601714914797 线程C 开始 1601714914799 线程D 开始 1601714914799 线程C phaser 0 1601714917799 线程D phaser 0 1601714919799 线程C 结束 1601714919799 线程A 结束 1601714919799 线程D 结束 1601714919799

    不相等的测试结果 线程A 开始 1601715071795 线程C 开始 1601715071798 线程D 开始 1601715071799 线程C phaser 0 1601715074798 线程C 结束 1601715074798 线程D phaser 0 1601715076800 线程A 结束 1601715076800 线程D 结束 1601715076800

    Phaser执行时机的控制 先将parties值加1,在做逻辑处理满足条件后再减1,达到控制Phaser对象的执行时机效果。

    public class ThreadA extends Thread{ private Phaser phaser; public ThreadA(Phaser phaser) { super(); this.phaser = phaser; } @Override public void run() { System.out.println(Thread.currentThread().getName()+" 开始 "+System.currentTimeMillis()); phaser.arriveAndAwaitAdvance(); System.out.println(Thread.currentThread().getName()+" 结束 "+System.currentTimeMillis()); } } Phaser phaser = new Phaser(2); phaser.register(); ThreadA a = new ThreadA(phaser); a.setName("线程A"); a.start(); ThreadA b = new ThreadA(phaser); b.setName("线程B"); b.start(); try { Thread.sleep(5000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } phaser.arriveAndDeregister();

    测试结果 线程B 开始 1601715266962 线程A 开始 1601715266962 线程B 结束 1601715271962 线程A 结束 1601715271962

    Processed: 0.011, SQL: 8