提升 IO 密集型服务性能的异步处理模型


电子商务平台的订单处理服务是典型的 I/O 密集型(I/O Bound)服务。

IO 密集型任务:一般就是说需要大量请求数据库和网络请求这些,如果请求比较多,系统大部分时间都花在等待 IO 操作完成。也就导致系统的性能瓶颈是在 IO 的延迟上。因此需要采用并发加载的方式,减少系统串行等待 IO 操作导致的系统性能下降。

并发加载的方式

主要讲三种方式,同步模型,线程池异步,NIO 异步模型,CompletableFuture 的异步模型。

同步模型

很经典,也就是一个服务调用完返回结果,再去调另一个服务,之后逐个返回结果拼成 VO(响应的数据结果)。

这种其实不用多说,肯定是最不可取的,我们至少需要引入异步操作。

线程池异步

线程池的异步实际上是线程的并发操作,不会阻塞主线程

我们可以通过引入线程池,也就是利用线程池能够管理线程的特点:

  • 线程可复用,避免线程重复创建
  • 异步操作,直接分配线程去处理请求,不会阻塞主线程
  • 任务执行完成后线程被释放,并且能够被分配给其他任务

这样,我们可以在一个请求来到时,把向各个服务的请求交给线程池处理,我们再拿到响应的数据进行封装。这样能避免串行场景下的等待时间。但是,这样还有问题:

CPU 仍然资源利用率低

虽然等待时间减少了,但是这些子线程仍然需要等待 IO 操作,CPU 仍然处于等待 IO 操作的空闲状态。这样也会导致 CPU 资源利用率低。

所以,一般通过回调去防止阻塞,也就是在执行 IO 操作时做异步处理,当 IO 操作完成之后,通知线程 IO 操作完成,需要继续执行数据的处理和响应。进而减少 CPU 阻塞时间。然而这种场景只适合少量请求的回调,如果回调函数过多,会造成”回调地狱”问题,使得代码可读性和可维护性降低。

为增加并发度,需要增大线程池容量或数量

CPU 可以调度的线程数量更多了,但是竞争更激烈了,可能会发生频繁的上下文切换,导致资源白白消耗。并且线程池的参数也不好设置,大量空闲线程也会占用系统资源。

所以线程池异步实际上也不是完全合适的。

补充:线程池是可以通过 Future 类去接受子线程执行的结果的,可以通过它的 get 方法等待任务执行完成之后获取结果,如果任务还没有结束,则会阻塞在获取操作那里,等待任务完成。

NIO 异步

和线程池的异步不同,NIO 异步基于事件驱动模型和 selector 机制完成异步加载。

  • Selector:负责监控所有注册的 Channel(Channel 通道实际上就是 I/O 连接,比如文件传输通道,Socket 套接字)。
  • Channel 的非阻塞模式:当需要进行 I/O 操作时,Channel 不会进行等待,而是立即返回继续处理其他请求。Selector 会监控这些 Channel,当 I/O 操作可以进行时(例如数据准备好读取或可以写入),Selector 会通知应用程序处理这些 I/O 操作。
  • 事件驱动模型:当有 I/O 事件(如 read、write、accept 等操作)发生时,Selector 会检测到并返回一个包含所有准备好进行 I/O 操作的 Channel 的 SelectionKey 集合。应用程序迭代这个集合,并处理每个 Channel 上的 I/O 事件。通过这种方式,I/O 操作是由事件驱动的,而不是由线程阻塞等待。

那它是怎么和处理请求的线程联系起来的呢

这点,我的理解是:NIO 的选择器能够监听所有的 IO 操作,然后通过 Channel 的方式将这些请求提交给线程池处理,也就不会阻塞主线程了

所以和线程池的根本区别是:线程池实际上没有把 IO 操作完全的抛离出去,而是用子线程去处理请求,如果需要使用回调函数则可能会引发回调地狱。而 NIO 呢,是把所有会阻塞线程的 IO 操作异步的通过 selector 去将 IO 操作传递给线程池的线程去做。

NIO 的不足

NIO 主要适用于高并发的网络通信和 I/O 操作,不适用于所有异步任务。也就是那种需要大量用到 CPU 的请求,NIO 的性能还是有限的。而且 NIO 模型的代码复杂度很高,涉及到缓存,字节,缓冲区等概念模型,编码难度大,不好维护。

而 CompletableFuture ,更适合 CPU 密集型以及需要大量处理业务逻辑的异步处理任务,同时也能够兼容这种 IO 密集型的任务(因为回调地狱)。能够结合线程池实现任务的异步执行,并且它是基于 Future 的进化版,能够获取异步任务执行结果,总之很全面,下面我们开始继续介绍。

当然,NIO 和 CompletableFuture 也是可以联用的。

CompletableFuture 异步模型

CompletableFuture 是基于 Future 的进化版,能够解决传统情况下 ListenableFuture 引发的回调地狱问题。因为传统场景下,Future 实现异步和回调是这样的:

  • 每个异步操作都需要嵌套多个回调函数(比如成功的结果回调,失败的结果回调,或者特殊情况下的回调结果),这种嵌套很容易导致所谓的“回调地狱”,使得代码难以阅读和维护。

ListenableFuture 的实现方式

比如下面这段代码是 ListenableFuture 的异步实现:

  • 定义了一个线程池,两个异步任务
  • 两个异步任务 AB 执行结束后,通过 AllAsList 方法组合两个异步任务执行的结果,之后再给这个聚合的结果加上一个回调函数。
  • 这两个异步任务结束后,会调用回调函数,其中首先打印之前的两个异步任务的结果,然后再新增一个异步任务 C 提交给线程池,再给 C 加上回调函数,依次类推,最后打印出所有异步任务的结果
  • 如果异步任务没有执行完成,则获取异步结果的方法会被阻塞(而 CompletableFuture 虽然通过 get 方法也会阻塞,但是可以选择通过 thenAccept 异步的获取执行结果,所以不会被阻塞)。
    ExecutorService executor = Executors.newFixedThreadPool(5);
    ListeningExecutorService guavaExecutor = MoreExecutors.listeningDecorator(executor);
    ListenableFuture<String> future1 = guavaExecutor.submit(() -> {
        // step 1
        System.out.println("执行step 1");
        return "step1 result";
    });
    ListenableFuture<String> future2 = guavaExecutor.submit(() -> {
        // step 2
        System.out.println("执行step 2");
        return "step2 result";
    });
    ListenableFuture<List<String>> future1And2 = Futures.allAsList(future1, future2);
    Futures.addCallback(future1And2, new FutureCallback<List<String>>() {
        @Override
        public void onSuccess(List<String> result) {
            System.out.println(result);
            ListenableFuture<String> future3 = guavaExecutor.submit(() -> {
                System.out.println("执行step 3");
                return "step3 result";
            });
            Futures.addCallback(future3, new FutureCallback<String>() {
                @Override
                public void onSuccess(String result) {
                    System.out.println(result);
                }        
                @Override
                public void onFailure(Throwable t) {
                }
            }, guavaExecutor);
        }
    
        @Override
        public void onFailure(Throwable t) {
        }
    }, guavaExecutor);

流程图如下:

CompletableFuture 的实现方式

CompletableFuture 提供了很多组合操作,比如 thenCombinethenAcceptthenApply,下面这个就是用到了 thenCombine ,能够将异步任务直接传递到参数,等待两个异步任务执行完成后,可以直接进行调用,其内部就是回调函数的内容。还可以继续拼接其他方法,比如 thenAccept ,可以处理上一个异步操作(thenCombine )得到的结果。

可以看到,这里是可以采用链式调用的方式完成组合,以及进一步处理。而不是像 ListenableFuture 去嵌套的完成。

ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("执行step 1");
    return "step1 result";
}, executor);
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("执行step 2");
    return "step2 result";
}, executor);
cf1.thenCombine(cf2, (result1, result2) -> {
    System.out.println(result1 + " , " + result2);
    System.out.println("执行step 3");
    return "step3 result";
}).thenAccept(result3 -> System.out.println(result3));

流程图如下:

ThenApply,是对上一个操作返回的结果进行封装,再次生成一个异步任务。

因此呢,我们选择用 CompletableFuture 来处理这种异步任务,能够简化异步调用流程以及回调函数的处理。不会遇到回调地狱的问题,代码看起来也更简洁。

补充 :线程池与 CompletableFuture 结合使用的注意事项

  1. 如果在调用方法时,不指定线程池对象,会默认使用公共线程池 ForkJoinPool,导致频繁竞争,并且有性能瓶颈。因此建议都要指定线程池对象。
  2. 线程池对象最好进行业务之间的隔离,否则容易导致业务间干扰。
  3. 循环依赖导致死锁:

比如下面这个场景:

public Object doGet() {
  ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
  CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  //do sth
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("child");
        return "child";
      }, threadPool1).join();//子任务
    }, threadPool1);
  return cf1.join();
}

父任务和子任务共用一个线程池,如果请求量过大,父任务将线程池线程用完了,会导致子任务获取不到线程执行,而父任务需要依赖子任务的执行结果,进而导致死锁。

所以:需要将父任务与子任务做线程池隔离,两个任务请求不同的线程池,避免循环依赖导致的阻塞

写到最后

本文讨论了外卖商家端 API 服务的并发加载方式,主要包括同步模型、线程池异步、NIO 异步模型和 CompletableFuture 异步模型。

  1. 同步模型:逐个调用服务,性能低下,不推荐。
  2. 线程池异步:通过线程池处理并发请求,减少主线程阻塞,但可能导致 CPU 利用率低和回调地狱问题。
  3. NIO 异步:基于事件驱动模型和 selector 机制,适用于高并发的网络通信,但代码复杂度高。
  4. CompletableFuture 异步模型:解决回调地狱问题,适合 CPU 密集型任务,代码简洁易读。

总结:对于电子商务平台的订单处理服务,推荐使用 CompletableFuture 结合线程池实现异步处理,以提高系统性能和可维护性。


文章作者: KTpro
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 KTpro !
  目录