注册

交替打印最容易理解的实现——同步队列

前言


原创不易,禁止转载!


本文旨在实现最简形式的交替打印。理解了同步队列,你可以轻松解决60%以上的多线程面试题。同步队列作为JUC提供的并发原语之一,使用了无锁算法,性能更好,但是却常常被忽略。


交替打印是一类常见的面试题,也是很多人第一次学习并发编程面对的问题,如:



  • 三个线程T1、T2、T3轮流打印ABC,打印n次,如ABCABCABCABC.......
  • 两个线程交替打印1-100的奇偶数
  • N个线程循环打印1-100

很多文章(如: zhuanlan.zhihu.com/p/370130458 )总结了实现交替打印的多种做法:



  1. synchronized + wait/notify: 使用synchronized关键字和wait/notify方法来实现线程间的通信和同步。
  2. join() : 利用线程的join()方法来确保线程按顺序执行。
  3. Lock: 使用ReentrantLock来实现线程同步,通过锁的机制来控制线程的执行顺序。
  4. Lock + Condition: 在Lock的基础上,使用Condition对象来实现更精确的线程唤醒,避免不必要的线程竞争。
  5. Semaphore: 使用信号量来控制线程的执行顺序,通过acquire()和release()方法来管理线程的访问。
  6. 此外还有LockSupport、CountDownLatch、AtomicInteger 等实现方式。

笔者认为,在面试时能够选择一种无bug实现即可。


缺点


这些实现使用的都是原语,也就是并发编程中的基本组件,偏向于底层,同时要求开发者深入理解这些原语的工作原理,掌握很多技巧。


问题在于:如果真正的实践中实现,容易出现 bug,一般也不推荐在生产中使用;


这也是八股文的弊端之一:过于关注所谓的底层实现,忽略了真正的实践。


我们分析这些组件的特点,不外乎临界区锁定、线程同步、共享状态等。以下分析一个实现,看看使用到了哪些技巧:


class Wait_Notify_ACB {

   private int num;
   private static final Object LOCK = new Object();

   private void printABC(int targetNum) {
       for (int i = 0; i < 10; i++) {
           synchronized (LOCK) {
               while (num % 3 != targetNum) {
                   try {
                       LOCK.wait();
                  } catch (InterruptedException e) {
                       e.printStackTrace();
                  }
              }
               num++;
               System.out.print(Thread.currentThread().getName());
               LOCK.notifyAll();
          }
      }
  }
   public static void main(String[] args) {
       Wait_Notify_ACB  wait_notify_acb = new Wait_Notify_ACB ();
       new Thread(() -> {
           wait_notify_acb.printABC(0);
      }, "A").start();
       new Thread(() -> {
           wait_notify_acb.printABC(1);
      }, "B").start();
       new Thread(() -> {
           wait_notify_acb.printABC(2);
      }, "C").start();
  }    
}

整体观之,使用的是 synchronized 隐式锁。使用等待队列实现线程同步,while 循环避免虚假唤醒,维护了多线程共享的 num 状态,此外需要注意多个任务的启动和正确终止。


InterruptedException 的处理是错误的,由于我们没有使用到中断机制,可以包装后抛出 IllegalStateException 表示未预料的异常。实践中,也可以设置当前线程为中断状态,待其他代码进行处理。


Lock不应该是静态的,可以改成非静态或者方法改成静态也行。


总之,经过分析可以看出并发原语的复杂性,那么有没有更高一层的抽象来简化问题呢?


更好的实现


笔者在项目的生产环境中遇到过类似的问题,多个线程需要协作,某些线程需要其他线程的结果,这种结果的交接是即时的,也就是说,A线程的结果直接交给B线程进行处理。


更好的实现要求我们实现线程之间的同步,同时应该避免并发修改。我们很自然地想到 SynchronousQueue,使用 CSP 实现 + CompletableFuture,可以减少我们考虑底层的心智负担,方便写出正确的代码。SynchronousQueue 适用于需要在生产者和消费者之间进行直接移交的场景,通常用于线程之间的切换或传递任务。


看一个具体例子:


以下是两个线程交替打印 1 - 100 的实现,由于没有在代码中使用锁,也没有状态维护的烦恼,这也是函数式的思想(减少状态)。


实现思路为:任务1从队列1中取结果,计算,提交给队列2。任务2同理。使用SynchronousQueue 实现直接交接。


private static Stopwatch betterImpl() {
   Stopwatch sw = Stopwatch.createStarted();
   BlockingQueue<Integer> q1 = new SynchronousQueue<>();
   BlockingQueue<Integer> q2 = new SynchronousQueue<>();
   int limit = 100;
   CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
       while (true) {
           Integer i = Uninterruptibles.takeUninterruptibly(q1);
           if (i <= limit) {
               System.out.println("thread1: i = " + i);
          }
           Uninterruptibles.putUninterruptibly(q2, i + 1);
           if (i == limit - 1) {
               break;
          }
      }
  });
   CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> {
       while (true) {
           Integer i = Uninterruptibles.takeUninterruptibly(q2);
           if (i <= limit) {
               System.out.println("thread2: i = " + i);
          }
           if (i == limit) {
               break;
          }
           Uninterruptibles.putUninterruptibly(q1, i + 1);
      }
  });
   Uninterruptibles.putUninterruptibly(q1, 1);
   CompletableFuture.allOf(cf1, cf2).join();
   return sw.stop();
}

Uninterruptibles 是 Guava 中的并发工具,很实用,可以避免 try-catch 中断异常这样的样板代码。


线程池配置与本文讨论内容关系不大,故忽略。


一般实践中,阻塞方法都要设置超时时间,这里也忽略了。


这个实现简单明了,性能也不错。如果不需要即时交接,可以替换成缓冲队列(如 ArrayBlockingQueue)。


笔者简单比较了两种实现,结果如下:


private static Stopwatch bufferImpl() {
   Stopwatch sw = Stopwatch.createStarted();
   BlockingQueue<Integer> q1 = new ArrayBlockingQueue<>(2);
   BlockingQueue<Integer> q2 = new ArrayBlockingQueue<>(2);
   // ...
}

public static void main(String[] args) {
   for (int i = 0; i < 100; i++) {
       betterImpl();
       bufferImpl();
    // 预热
  }
   Stopwatch result1 = bufferImpl();
   Stopwatch result2 = betterImpl();
   System.out.println("result1 = " + result1);
   System.out.println("result2 = " + result2);
}

// ...
thread2: i = 92
thread1: i = 93
thread2: i = 94
thread1: i = 95
thread2: i = 96
thread1: i = 97
thread2: i = 98
thread1: i = 99
thread2: i = 100
result1 = 490.3 μs
result2 = 469.1 μs

结论:使用 SynchronousQueue 性能更好,感兴趣的读者可以自己写 JMH 比对。


如果你觉得本文对你有帮助的话,欢迎给个点赞加收藏,也欢迎进一步的讨论。
后续我将继续分享并发编程、性能优化等有趣内容,力求做到全网独一份、深入浅出,一周两更,欢迎关注支持。


作者:桦说编程
来源:juejin.cn/post/7532925096828026899

0 个评论

要回复文章请先登录注册