JAVA语言之CountDownLatch + Callbale+FutureTask 实现异步变同步调用
小标 2018-07-17 来源 : 阅读 1473 评论 0

摘要:本文主要向大家介绍了JAVA语言之CountDownLatch + Callbale+FutureTask 实现异步变同步调用,通过具体的内容向大家展示,希望对大家学习JAVA语言有所帮助。

本文主要向大家介绍了JAVA语言之CountDownLatch + Callbale+FutureTask 实现异步变同步调用,通过具体的内容向大家展示,希望对大家学习JAVA语言有所帮助。

通过HTTP接口实现调用MQTT Client发送数据,HTTP接口返回值为MQTT Client发送数据的对应结果。 HTTP接口为同步阻塞,MQTT Client 为异步回调方式。
如何实现在HTTP接口中调用MQTT Client发送数据后,能够阻塞等待MQTT返回结果,然后将结果返回?

解决方法

CountDownLatch + Callbale+FutureTask

1.CountDownLatch作用

CountDownLatch实现在MQTT Client 发送数据后 到接收数据后这段时间的阻塞。

HTTP每次请求,新建一个CountDownLatch,然后将CountDownLatch作为值和deviceId作为KEY保存到Map中,调用MQTT Client 发送数据后,countDownLatch.await(),进行同步等待在MQTT Client接收数据的回调方法中更加deviceId取出CountDwonLatch然后计数减一

 

2.Callbale+FutureTask作用

将调用MQTT Client发送数据的过程,封装成Callable,投递发送任务时,通过返回的FutureTask的get()方法,

同步阻塞,直到结果返回。

 

关键代码

1.Map保存CountDownObj用于同步阻塞等待MQTT Client返回结果,以及将返回结果传递个FutureTask

    private final static ConcurrentMap<String, CountDownObj> countDownLatchMap = new ConcurrentHashMap<>();

    //线程池

    private final ThreadPoolExecutor threadPoolExecutor =

            new ThreadPoolExecutor(3, 5, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), runnable -> {

        Thread thread = new Thread(runnable, "mqtt thread");

        return thread;

    });

2.HTTP API 调用的发送MQTT 消息数据的接口

    /**

     * HTTP API 调用的发送MQTT 消息数据的接口

     * 同步阻塞

     */

    public Integer send(Long packageId, String deviceId) throws Exception {

        ......

       FutureTask<Integer> futureTask = sendTask(publishDto));

       return futureTask.get()

    }

3.投递发送MQTT指令的task方法

   /**

     * 投递MQTT发送指令任务

     * 同步阻塞

     */ 

   private FutureTask<Integer> sendTask(PublishDto publishDto) throws Exception {

        FutureTask<Integer> futureTask = new FutureTask<>(new GetDatapointValueCallable(publishDto));

        threadPoolExecutor.execute(futureTask);

        //阻塞线程

        return futureTask;

    }

4.封装CountDownLatch 和 Integer的对象,用于CountDownLatch阻塞控制和返回结果

    /**

     * 封装CountDownLatch 和 Integer

     * 用于CountDownLatch阻塞控制和返回结果

     */

    private class CountDownObj {

        private final CountDownLatch countDownLatch;

        private volatile Integer value;

 

        private CountDownObj(CountDownLatch countDownLatch) {

            this.countDownLatch = countDownLatch;

        }

 

        public CountDownLatch getCountDownLatch() {

            return countDownLatch;

        }

 

        public Integer getValue() {

            return value;

        }

 

        public void setValue(Integer value) {

            this.value = value;

        }

    }

5.具体发送MQTT数据的Callbale线程Task,会新建CountDownLatch,并通过CountDownLatch.await()方法阻塞,直到MQTT回调接收到数据或者超时。

    /**

     * 发送MQTT消息的任务Callable

     */

    private class GetDatapointValueCallable implements Callable<Integer> {

        private final PublishDto publishDto;

 

        GetDatapointValueCallable(PublishDto publishDto) {

            this.publishDto = publishDto;

        }

 

        @Override

        public Integer call() throws Exception {

            //mqtt client 发送数据,此处具体代码省略

            ......

            

            CountDownLatch countDownLatch = new CountDownLatch(1);

            countDownLatchMap.putIfAbsent(publishDto.getDeviceId(), new CountDownObj(countDownLatch));

            //阻塞,超时时间3s

            countDownLatch.await(3, TimeUnit.SECONDS);

            //返回mqtt指令对应的结果或者null

            return countDownLatchMap.remove(publishDto.getDeviceId()).getValue();

        }

 

    }

6.MQTT接收数据回调,这里通过deviceId从MAP里面取到CountDownObj,释放闭锁(结束callable线程的等待)和设置MQTT返回的结果(即callable中call()返回的结果,也就是FutureTask的get()方法返回的结果)。

    /**

     * MQTT 接收数据回调

     */

    void mqttReceiveCallback(String deviceId, String datapointId, String value) {

        ......

        

        //接收到数据后,通过闭锁释放阻塞的线程,同时设置结果返回给调用者

        CountDownObj countDownObj=countDownLatchMap.get(deviceId);

        if(countDownObj!=null) {

            countDownObj.setValue(Integer.parseInt(value));

            countDownObj.getCountDownLatch().countDown();

        }

        

        .......

    }

希望对JAVA有兴趣的朋友有所帮助。了解更多内容,请关注职坐标编程语言JAVA频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 1 不喜欢 | 0
看完这篇文章有何感觉?已经有1人表态,100%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程