博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java并发—— CountDownLatch与CyclicBarrier
阅读量:6449 次
发布时间:2019-06-23

本文共 8189 字,大约阅读时间需要 27 分钟。

CountDownLatch

CountDownLatch闭锁相当于一扇门,在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,门永远保持打开状态

CountDownLatch实现原理

CountDownLatch通过内部类Sync实现方法,sync继承AQS重写模板中的方法。sync内部定义:

private static final class Sync extends AbstractQueuedSynchronizer {        private static final long serialVersionUID = 4982264981922014374L;                 Sync(int count) {            setState(count);        }        /**         * 获取同步状态         */        int getCount() {            return getState();        }        /**         * 获取同步状态         */        protected int tryAcquireShared(int acquires) {            return (getState() == 0) ? 1 : -1;        }        /**         * 释放同步状态         */        protected boolean tryReleaseShared(int releases) {            // Decrement count; signal when transition to zero            for (;;) {                int c = getState();                if (c == 0)                    return false;                int nextc = c-1;                if (compareAndSetState(c, nextc))                    return nextc == 0;            }        }    }复制代码

从源码中重写的方法可以得知,CountDownLatch中的sync采用共享模式。CountDownLatch示例:

public class TestHarness {         public static long timeTasks(int nThreads, final Runnable task) throws InterruptedException {        final CountDownLatch startGate = new CountDownLatch(1);        final CountDownLatch endGate = new CountDownLatch(nThreads);                 for (int i = 0; i < nThreads; i++) {            Thread t = new Thread() {                @Override                public void run() {                    try {                        startGate.await();                        try {                            System.out.println(Thread.currentThread().getName() + "开始执行");                            task.run();                        } finally {                            endGate.countDown();                            System.out.println(Thread.currentThread().getName() + "执行结束");                        }                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }            };            t.start();        }        long start = System.nanoTime();        startGate.countDown();        endGate.await();        long end = System.nanoTime();        System.out.println("所有线程执行完毕,耗时:" + (end-start));        return end - start;    }             public static void main(String[] args) throws InterruptedException {        System.out.println(timeTasks(10, new Runnable() {            @Override            public void run() {                System.out.println(Thread.currentThread().getName() + "————————work");            }        }));    }}复制代码

运行结果:

Thread-0开始执行Thread-3开始执行Thread-0————————workThread-0执行结束Thread-1开始执行Thread-2开始执行Thread-7开始执行Thread-7————————workThread-7执行结束Thread-9开始执行Thread-9————————workThread-9执行结束Thread-8开始执行Thread-8————————workThread-8执行结束Thread-2————————workThread-2执行结束Thread-6开始执行Thread-1————————workThread-6————————workThread-6执行结束Thread-5开始执行Thread-5————————workThread-5执行结束Thread-3————————workThread-3执行结束Thread-4开始执行Thread-1执行结束Thread-4————————workThread-4执行结束所有线程执行完毕,耗时:27949762794976复制代码

CyclicBarrier

相对于CountDownLatch是一次性对象,一旦进入终止状态,就不能被重置,CyclicBarrier可以反复使用。CyclicBarrier类似于闭锁,与闭锁的关键区别在于,闭锁用于等待事件,栅栏用于等待其他线程,其作用是让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

CyclicBarrier实现原理

CyclicBarrier构造方法

public CyclicBarrier(int parties) {        this(parties, null);    }        public CyclicBarrier(int parties, Runnable barrierAction) {        if (parties <= 0) throw new IllegalArgumentException();        this.parties = parties;        this.count = parties;        this.barrierCommand = barrierAction;    }复制代码

参数parties指栅栏拦截的线程数量

参数barrierAction指当这些线程都到达栅栏时优先会执行的线程

await()方法

public int await() throws InterruptedException, BrokenBarrierException {        try {            return dowait(false, 0L);        } catch (TimeoutException toe) {            throw new Error(toe); // cannot happen        }    }        private int dowait(boolean timed, long nanos)        throws InterruptedException, BrokenBarrierException,               TimeoutException {        final ReentrantLock lock = this.lock;        // 获取锁        lock.lock();        try {            final Generation g = generation;            // 若栅栏处于断开状态,抛出异常            if (g.broken)                throw new BrokenBarrierException();            // 若线程中断,断开CyclicBarrier            if (Thread.interrupted()) {                breakBarrier();                throw new InterruptedException();            }            int index = --count;            // count为0表明所有线程到达栅栏位置            if (index == 0) {  // tripped                boolean ranAction = false;                try {                    // 若初始化时指定了所有线程到达栅栏时的任务,执行它                     final Runnable command = barrierCommand;                    if (command != null)                        command.run();                    ranAction = true;                    // 唤醒所有等待线程,开始新的generation                    nextGeneration();                    return 0;                } finally {                    // 若任务执行异常,断开CyclicBarrier                    if (!ranAction)                        breakBarrier();                }            }            // 循环所有线程到达栅栏或栅栏断开或线程中断或超时            for (;;) {                try {                    // 一直等待                    if (!timed)                        trip.await();                    // 限时等待                        else if (nanos > 0L)                        nanos = trip.awaitNanos(nanos);                } catch (InterruptedException ie) {                    // 若线程中断且栅栏没有断开,断开CyclicBarrier                    if (g == generation && ! g.broken) {                        breakBarrier();                        throw ie;                    } else {                        // We're about to finish waiting even if we had not                        // been interrupted, so this interrupt is deemed to                        // "belong" to subsequent execution.                        Thread.currentThread().interrupt();                    }                }                                 if (g.broken)                    throw new BrokenBarrierException();                                if (g != generation)                    return index;                // 若等待超时,断开CyclicBarrier                if (timed && nanos <= 0L) {                    breakBarrier();                    throw new TimeoutException();                }            }        } finally {            // 释放锁            lock.unlock();        }    }复制代码

其主要逻辑:若有线程未到达栅栏位置,到达栅栏位置的线程一直等待状态,直至发生以下场景:

①. 所有线程都到达栅栏位置
②. 有线程被中断
③. 线程等待超时
④. 有线程调用reset()方法,断开当前栅栏,将栅栏重置为初始状态
reset方法:

public void reset() {        final ReentrantLock lock = this.lock;        lock.lock();        try {            // 断开当前栅栏            breakBarrier();   // break the current generation            // 开始新的generation            nextGeneration(); // start a new generation        } finally {            lock.unlock();        }    }复制代码

CyclicBarrier示例

public class CyclicBarrierTest {         private static CyclicBarrier cyclicBarrier;         static class CyclicBarrierThread extends Thread{        public void run() {            System.out.println("运动员:" + Thread.currentThread().getName() + "到场");            try {                cyclicBarrier.await();            } catch (Exception e) {                e.printStackTrace();            }        }    }         public static void main(String[] args){        cyclicBarrier = new CyclicBarrier(5, new Runnable() {            @Override            public void run() {                System.out.println("运动员全部到齐,比赛开始");            }        });                 for(int i = 0 ; i < 5 ; i++){            new CyclicBarrierThread().start();        }    }}复制代码

运行结果:

运动员:Thread-0到场运动员:Thread-1到场运动员:Thread-2到场运动员:Thread-3到场运动员:Thread-4到场运动员全部到齐,比赛开始复制代码

CountDownLatch与CyclicBarrier区别

①.CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次

②.CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得Cyclic-Barrier阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断

③.CountDownLatch倾向于一个线程等多个线程,CyclicBarrier倾向于多个线程互相等待

转载地址:http://zmlwo.baihongyu.com/

你可能感兴趣的文章
Redis集群中删除/修改节点(master、slave)(实验)
查看>>
memcache数据库和redis数据库的区别(理论)
查看>>
我的友情链接
查看>>
MyBatis+Spring结合
查看>>
shell实例-判断apache是否正常启动
查看>>
SharedPreferences存储复杂对象解决方案
查看>>
Office 365之SkyDrive Pro
查看>>
脑残式网络编程入门(二):我们在读写Socket时,究竟在读写什么?
查看>>
无缝滚动实现原理分析【公告栏】
查看>>
Java Web 高性能开发
查看>>
redis-cli 命令总结
查看>>
CentOS 4.4双网卡绑定,实现负载均衡
查看>>
GitHub页面使用方法
查看>>
Python爬虫综述(笔记)
查看>>
Scala之柯里化和隐式转换
查看>>
wmic命令
查看>>
Merge and BottomUpSort
查看>>
reids 安装记录
查看>>
获取androdmanifest里面的meta-data
查看>>
Centos 6.3编译安装nagios
查看>>