一、Semaphore
1. 简介
Semaphore实现信号量。
Semaphore可以维护当前访问自身的线程个数,并提供了同步机制。使用Semaphore可以控制同时访问资源的线程个数。例如:实现一个文件允许的并发访问数。
单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了锁,再由另一个线程释放锁。这可应用于死锁恢复的一些场合。
2. 示例
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;public class SemaphoreTest { public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); final Semaphore sp = new Semaphore(3); for (int i = 0; i < 10; i++) { Runnable r = new Runnable() { @Override public void run() { try { sp.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程"+Thread.currentThread().getName()+"进入,当前已有并发"+(3-sp.availablePermits())); try { Thread.sleep(new Random().nextInt(10000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程"+Thread.currentThread().getName()+"即将离开"); sp.release(); System.out.println("线程"+Thread.currentThread().getName()+"已离开,当前已有并发"+(3-sp.availablePermits())); } }; es.execute(r); } es.shutdown(); }}
二、CyclicBarrier
1. 简介
障碍器。表示大家彼此等待,大家集合好后才开始出发,分散活动后又在指定地点集合碰面。
2. 示例
import java.util.Date;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class CyclicBarrierTest { public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); final CyclicBarrier cb = new CyclicBarrier(5); for (int i = 0; i < 5; i++) { Runnable r = new Runnable() { @Override public void run() { try { Thread.sleep(new Random().nextInt(10000)); System.out.println(new Date()+":线程"+Thread.currentThread().getName()+"到达目的地1,当前已到达"+(cb.getNumberWaiting()+1)+",正在等候"); cb.await(); System.out.println(new Date()+":所有线程已到达,向目的地2出发"); Thread.sleep(new Random().nextInt(10000)); System.out.println(new Date()+":线程"+Thread.currentThread().getName()+"到达目的地2,当前已到达"+(cb.getNumberWaiting()+1)+",正在等候"); cb.await(); System.out.println(new Date()+":所有线程已到达,向目的地3出发"); Thread.sleep(new Random().nextInt(10000)); System.out.println(new Date()+":线程"+Thread.currentThread().getName()+"到达目的地3,当前已到达"+(cb.getNumberWaiting()+1)+",正在等候"); cb.await(); System.out.println(new Date()+":所有线程已到达,任务结束"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }; es.execute(r); } es.shutdown(); }}
三、CountDownLatch
1. 简介
犹如倒计时计数器,调用CountDownLatch对象的countDown()方法就将计数器-1,当计数器到达0时,则所有等待者或者单个等待着开始执行。
2. 示例
import java.util.Date;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class CountDownLatchTest { public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); final CountDownLatch cdOrder = new CountDownLatch(1); final CountDownLatch cdAnswer = new CountDownLatch(5); for (int i = 0; i < 5; i++) { Runnable r = new Runnable() { @Override public void run() { try { //Thread.sleep(new Random().nextInt(10000)); System.out.println(new Date()+":线程"+Thread.currentThread().getName()+"准备接受命令"); cdOrder.await(); System.out.println(new Date()+":线程"+Thread.currentThread().getName()+"已接受命令"); Thread.sleep(new Random().nextInt(10000)); System.out.println(new Date()+":线程"+Thread.currentThread().getName()+"处理命令"); cdAnswer.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }; es.execute(r); } try { Thread.sleep(new Random().nextInt(10000)); System.out.println(new Date()+":线程"+Thread.currentThread().getName()+"即将发布命令"); cdOrder.countDown(); System.out.println(new Date()+":线程"+Thread.currentThread().getName()+"已发布命令,正等待结果"); cdAnswer.await(); System.out.println(new Date()+":线程"+Thread.currentThread().getName()+"已收到所有响应结果"); } catch (Exception e) { e.printStackTrace(); } es.shutdown(); }}
四、Exchanger
1. 简介
Exchanger用于两个线程之间的数据交换。每个线程在完成一定任务后想与对方交换数据,第一个先拿出数据的线程将一直等待第二个线程拿着数据的到来,才能彼此交换数据。
2. 示例
import java.util.Date;import java.util.Random;import java.util.concurrent.Exchanger;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * Exchanger交换机 两个线程之间用户交换数据 */public class ExchangerTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); // 线程之间交换数据 final Exchanger exchanger = new Exchanger(); service.execute(new Runnable() { public void run() { try { String data = "【数据1】"; System.out.println(new Date()+":"+Thread.currentThread().getName() + "准备把" + data + "换出去"); Thread.sleep(new Random().nextInt(10000)); String data2 = (String) exchanger.exchange(data); System.out.println(new Date()+":"+Thread.currentThread().getName() + "换回的数据是" + data2); } catch (InterruptedException e) { e.printStackTrace(); } } }); service.execute(new Runnable() { public void run() { try { String data = "【数据2】"; System.out.println(new Date()+":"+Thread.currentThread().getName() + "准备把" + data + "换出去"); Thread.sleep(new Random().nextInt(10000)); String data2 = (String) exchanger.exchange(data); System.out.println(new Date()+":"+Thread.currentThread().getName() + "换回的数据是" + data2); } catch (Exception e) { e.printStackTrace(); } } }); }}