Java高并发编程基础三大利器之CyclicBarrier
小职 2021-03-29 来源 :java金融 阅读 490 评论 0

摘要:本文主要介绍了Java高并发编程基础三大利器之CyclicBarrier,通过具体的内容向大家展现,希望对大家Java开发的学习有所帮助。

本文主要介绍了Java高并发编程基础三大利器之CyclicBarrier,通过具体的内容向大家展现,希望对大家Java开发的学习有所帮助。

Java高并发编程基础三大利器之CyclicBarrier

什么是CyclicBarrier

CyclicBarrier是什么?把它拆开来翻译就是循环(Cycle)和屏障(Barrier)

 

它的主要作用其实和CountDownLanch差不多,都是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障会被打开,所有被屏障阻塞的线程才会继续执行,不过它是可以循环执行的,这是它与CountDownLanch最大的不同。CountDownLanch是只有当最后一个线程把计数器置为0的时候,其他阻塞的线程才会继续执行。

 

如何使用

我们首先先来看下关于使用CyclicBarrier的一个demo:比如游戏中有个关卡的时候,每次进入下一关的时候都需要进行加载一些地图、特效背景音乐什么的只有全部加载完了才能够进行游戏:

 

/**demo 来源https://blog.csdn.net/lstcui/article/details/107389371

 * 公众号【java金融】

 */

public class CyclicBarrierExample {

    static class PreTaskThread implements Runnable {

        private String task;

        private CyclicBarrier cyclicBarrier;

 

        public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {

            this.task = task;

            this.cyclicBarrier = cyclicBarrier;

        }

 

        @Override

        public void run() {

            for (int i = 0; i < 4; i++) {

                Random random = new Random();

                try {

                    Thread.sleep(random.nextInt(1000));

                    System.out.println(String.format("关卡 %d 的任务 %s 完成", i, task));

                    cyclicBarrier.await();

                } catch (InterruptedException | BrokenBarrierException e) {

                    e.printStackTrace();

                }

            }

        }

 

        public static void main(String[] args) {

            CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {

                System.out.println("本关卡所有的前置任务完成,开始游戏... ...");

            });

            new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start();

            new Thread(new PreTaskThread("加载人物模型", cyclicBarrier)).start();

            new Thread(new PreTaskThread("加载背景音乐", cyclicBarrier)).start();

        }

    }

}

输出结果如下:

 Java高并发编程基础三大利器之CyclicBarrier

 

 

我们可以看到每次游戏开始都会等当前关卡把游戏的人物模型,地图数据、背景音乐加载完成后才会开始进行游戏。并且还是可以循环控制的。

 

源码分析

结构组成

/** The lock for guarding barrier entry */

private final ReentrantLock lock = new ReentrantLock();

/** Condition to wait on until tripped */

private final Condition trip = lock.newCondition();

/** The number of parties */

private final int parties;

/* The command to run when tripped */

private final Runnable barrierCommand;

/** The current generation */

private Generation generation = new Generation();

lock:用于保护屏障入口的锁

trip :达到屏障并且不能放行的线程在trip条件变量上等待

parties :栅栏开启需要的到达线程总数

barrierCommand:最后一个线程到达屏障后执行的回调任务

generation:这是一个内部类,通过它实现CyclicBarrier重复利用,每当await达到最大次数的时候,就会重新new 一个,表示进入了下一个轮回。里面只有一个boolean型属性,用来表示当前轮回是否有线程中断。

主要方法

public int await() throws InterruptedException, BrokenBarrierException {

    try {

        return dowait(false, 0L);

    } catch (TimeoutException toe) {

        throw new Error(toe); // cannot happen

    }

}

 

 * Main barrier code, covering the various policies.

 */

private int dowait(boolean timed, long nanos)

    throws InterruptedException, BrokenBarrierException,

           TimeoutException {

    final ReentrantLock lock = this.lock;

    lock.lock();

     try {

           //获取barrier当前的 “代”也就是当前循环

         final Generation g = generation;

        if (g.broken)

            throw new BrokenBarrierException();

 

        if (Thread.interrupted()) {

            breakBarrier();

            throw new InterruptedException();

        }

        // 每来一个线程调用await方法都会进行减1

        int index = --count;

        if (index == 0) {  // tripped

            boolean ranAction = false;

            try {

                final Runnable command = barrierCommand;

                // new CyclicBarrier 传入 的barrierCommand, command.run()这个方法是同步的,如果耗时比较多的话,是否执行的时候需要考虑下是否异步来执行。

                if (command != null)

                    command.run();

                ranAction = true;

                // 这个方法1. 唤醒所有阻塞的线程,2. 重置下count(count 每来一个线程都会进行减1)和generation,以便于下次循环。

                nextGeneration();

                return 0;

            } finally {

                if (!ranAction)

                    breakBarrier();

            }

        }

 

        // loop until tripped, broken, interrupted, or timed out

        for (;;) {

            try {

                 // 进入if条件,说明是不带超时的await

                if (!timed)

                     // 当前线程会释放掉lock,然后进入到trip条件队列的尾部,然后挂起自己,等待被唤醒。

                    trip.await();

                else if (nanos > 0L)

                     //说明当前线程调用await方法时 是指定了 超时时间的!

                    nanos = trip.awaitNanos(nanos);

            } catch (InterruptedException ie) {

                 //Node节点在 条件队列内 时 收到中断信号时 会抛出中断异常!

                //g == generation 成立,说明当前代并没有变化。

                //! g.broken 当前代如果没有被打破,那么当前线程就去打破,并且抛出异常..

                if (g == generation && ! g.broken) {

                    breakBarrier();

                    throw ie;

                } else {

                    // We're about to finish waiting even if we had not

                    // been interrupted, so this interrupt is deemed to

                    // "belong" to subsequent execution.

                //执行到else有几种情况?

                //1.代发生了变化,这个时候就不需要抛出中断异常了,因为 代已经更新了,这里唤醒后就走正常逻辑了..只不过设置下 中断标记。

                //2.代没有发生变化,但是代被打破了,此时也不用返回中断异常,执行到下面的时候会抛出  brokenBarrier异常。也记录下中断标记位。

                    Thread.currentThread().interrupt();

                }

            }

           //唤醒后,执行到这里,有几种情况?

          //1.正常情况,当前barrier开启了新的一代(trip.signalAll())

          //2.当前Generation被打破,此时也会唤醒所有在trip上挂起的线程

          //3.当前线程trip中等待超时,然后主动转移到 阻塞队列 然后获取到锁 唤醒。

            if (g.broken)

                throw new BrokenBarrierException();

           //唤醒后,执行到这里,有几种情况?

        //1.正常情况,当前barrier开启了新的一代(trip.signalAll())

        //2.当前线程trip中等待超时,然后主动转移到 阻塞队列 然后获取到锁 唤醒。

            if (g != generation)

                return index;

           //唤醒后,执行到这里,有几种情况?

        //.当前线程trip中等待超时,然后主动转移到 阻塞队列 然后获取到锁 唤醒。

            if (timed && nanos <= 0L) {

                breakBarrier();

                throw new TimeoutException();

            }

        }

    } finally {

         lock.unlock();

    }

}

小结

到了这里我们是不是可以知道为啥CyclicBarrier可以进行循环计数?

 

CyclicBarrier采用一个内部类Generation来维护当前循环,每一个await方法都会存储当前的generation,获取到相同generation对象的属于同一组,每当count的次数耗尽就会重新new一个Generation并且重新设置count的值为parties,表示进入下一次新的循环。

从这个await方法我们是不是可以知道只要有一个线程被中断了,当代的 generation的broken 就会被设置为true,所以会导致其他的线程也会被抛出BrokenBarrierException。相当于一个失败其他也必须失败,感觉有“强一致性“的味道。

 

总结

CountDownLanch是为计数器是设置一个值,当多次执行countdown后,计数器减为0的时候所有线程被唤醒,然后CountDownLanch失效,只能够使用一次。

CyclicBarrier是当count为0时同样唤醒全部线程,同时会重新设置count为parties,重新new一个generation来实现重复利用。



我是小职,记得找我

✅ 解锁高薪工作

✅ 免费获取学习教程,开发工具,代码大全,参考书籍

Java高并发编程基础三大利器之CyclicBarrier

本文由 @小职 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程