Java并发工具学习(八)——Semaphore和Condition

从这一篇博客开始,开始总结线程间协作,并发流程控制的工具类,这一篇主要介绍Semaphore和Condition

Semaphore中文译文为信号量,操作系统中也有同样的概念。类似于生活中常见的许可证的概念。在执行指定业务逻辑之前,需要先获取相关的许可证。在限流的使用场景中也有Semaphore的影子。

在Java中,信号量的作用是维护一个"许可证"的计数,线程可以获取许可证,获取成功信号量的个数就减一,同时线程也可以释放一个许可证,释放一个信号量就加一,当信号量所拥有的许可证数量为0,那么其他想要获取许可证的线程就需要等待,直到有另外的线程释放了许可证。

在初始化Semaphore的时候,需要初始化信号量个数,同时也可以设置对应的公平策略,如果指定为公平,那么Semaphore会把之前的等待线程放入到自己维护的一个FIFO队列中,信号量的分发会根据在FIFO队列中等待的时长来进行。

通过Semaphore获取信号量的方法有多个,大体上分为以下三类

方法 作用
acquire()
acquire(int permits)
获取信号量的时候阻塞,同时响应中断。这里的permits参数指的是想要获取信号量的个数
acquireUninterruptibly()
acquireUninterruptibly(int permits)
在获取信号量的时候不响应中断
tryAcquire()
tryAcquire(int permits)
tryAcquire(int permits, long timeout, TimeUnit unit)
tryAcquire(long timeout, TimeUnit unit)
尝试获取信号量,如果没有则不阻塞,可以指定获取信号量的时间

如果执行完逻辑,可以调用release来释放信号量

简单实例

/**  * autor:liman  * createtime:2021/11/28  * comment:多个线程等待许可证  */ public class SemaphoreDemo01 {      static Semaphore semaphore = new Semaphore(3, true);//3个许可证,公平模式      public static void main(String[] args) {         ExecutorService executorService = Executors.newFixedThreadPool(50);         for (int i = 0; i < 100; i++) {             executorService.submit(new Task());         }         //判断线程池是否停止         executorService.shutdown();         while(!executorService.isTerminated()){             //线程池没有执行完成任务,主线程就空转,直到线程池运行结束         }     }      static class Task implements Runnable {          @Override         public void run() {             try {                 semaphore.acquire();                 //获取多个信号量                 //semaphore.acquire(3);             } catch (InterruptedException e) {                 e.printStackTrace();             }             System.out.println(Thread.currentThread().getName() + "拿到了许可证");             try {                 Thread.sleep(2000);             } catch (InterruptedException e) {                 e.printStackTrace();             }             semaphore.release();             //释放多个信号量             //semaphore.release(3);             System.out.println(Thread.currentThread().getName() + "释放了许可证");         }     } } 

信号量的获取个数可以灵活指定,这在一定程度上可以根据线程耗费资源的程度进行侧重,比如有两个任务,一个是TaskA ,一个是TaskB,TaskA的执行需要耗费较多的资源,TaskB需要耗费较少的资源,我们可以定义3个信号量。执行TaskA需要获取全部信号量才能执行,而执行TaskB只需要获取一个信号量,这样TaskA和TaskB就永远不可能同时执行。

信号量需要注意的是,在获取和释放的时候,数量必须要保证一致,每次获取多少信号量就要释放多少信号量,如果某个线程获取了2个信号量,但是释放的时候,只释放了1个,随着时间的推移,总会有一个时间点,其他线程的信号量不够用了,这样会导致程序卡死。

信号量的获取和释放,可以不在同一个线程中执行,也许是线程A获取了3个信号量,线程B释放了3个信号量,这也是可行的,但要保证逻辑合理。

Condition是一个接口,其通过ReentrantLock创建,其作用非常类似于Object中的wait和notify的组合,在Conditon中变成了await和signal。关于wait和notify这两者可以参考之前的博客Thread和Object中的方法。

Java并发工具学习(八)——Semaphore和Condition

简单基本实例

/**  * autor:liman  * createtime:2021/11/28  * comment:普通的condition用法实例  */ @Slf4j public class ConditionDemo {      private ReentrantLock lock = new ReentrantLock();     //condition由锁来实例化     private Condition condition = lock.newCondition();      public void method01(){         lock.lock();         try{             System.out.println("条件不满足,进入await");             //这里             condition.await();             System.out.println("条件满足,开始继续执行");         } catch (InterruptedException e) {             e.printStackTrace();         } finally {             lock.unlock();         }     }      public void method02(){         lock.lock();         try{             System.out.println("准备工作完成,唤醒其他线程");             condition.signal();         }finally {             lock.unlock();         }     }      public static void main(String[] args) {         ConditionDemo conditionDemo = new ConditionDemo();         new Thread(()->{             try {                 Thread.sleep(1000);                 conditionDemo.method02();//唤醒在condition上等待的线程             } catch (InterruptedException e) {                 e.printStackTrace();             }         }).start();         //主线程调用method01先行阻塞,在1秒之后,被上面新建的线程唤醒         conditionDemo.method01();     } } 

与notify相对应的,Condition也存在signal和signalAll两个方法,其中signalAll会唤醒Condition上等待的所有线程,而signal是公平的,只会唤醒等待时间最长的线程。

利用Condition实现的生产者消费者模式

/**  * autor:liman  * createtime:2021/11/28  * comment:condition实现生产者和消费者  */ @Slf4j public class ConditionProducerAndCustomer {      private int queueSize = 10;     private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);     private Lock lock = new ReentrantLock();     private Condition notFull = lock.newCondition();     private Condition notEmpty = lock.newCondition();      class Customer extends Thread{         @Override         public void run() {             consume();         }          private void consume(){             while(true){//消费者一直消费                 lock.lock();                 try{                     while(queue.size() == 0){                         System.out.println("队列为空,消费者进入等待");                         try {                             notEmpty.await();//用notEmpty进行等待                         } catch (InterruptedException e) {                             e.printStackTrace();                         }                     }                     queue.poll();                     notFull.signal();//取出数据,同志爱生产者继续生产                     System.out.println("从队列取走了一个数据,队列剩余"+queue.size()+"个数据");                 }finally {                     lock.unlock();                 }             }         }     }      class Producer extends Thread{         @Override         public void run() {             produce();         }          private void produce(){             while(true){//生产者一直生产                 lock.lock();                 try{                     while(queue.size() == queueSize){                         System.out.println("队列已满,生产者进入等待");                         try {                             notFull.await();//用 notFull 进行等待                         } catch (InterruptedException e) {                             e.printStackTrace();                         }                     }                     queue.offer(new Random().nextInt(100));                     notEmpty.signalAll();//取出数据,同志爱生产者继续生产                     System.out.println("生产者向消费者插入了一个数据,队列剩余"+queue.size()+"个数据");                 }finally {                     lock.unlock();                 }             }         }     }      public static void main(String[] args) {         ConditionProducerAndCustomer conditionProducerAndCustomer = new ConditionProducerAndCustomer();         Producer producer = conditionProducerAndCustomer.new Producer();         Customer customer = conditionProducerAndCustomer.new Customer();         customer.start();         producer.start();     } } 

如果说Lock是用来替代synchronized关键字的,那么Condition其实可以理解为用来替代Object.wati/notify的,在用法和性质上,二者几乎没有区别。

await方法会释放lock的锁,和Object.wait一样,不需要自己手动先行释放

调用await的时候,也必须要持有lock锁,否则会出现异常(虽然编译的时候不会出现异常)

简单总结了一下Semaphore和Condition,下一篇总结一下CountDownLatch和CyclicBarrier