首页 热点资讯 义务教育 高等教育 出国留学 考研考公
您的当前位置:首页正文

响应式编程总览

2024-12-20 来源:化拓教育网

前情概要:

1 响应式编程总览

在上述响应式编程(后面简称RP)的定义中,除了异步编程,还包含两个重要的关键词:

  • Data streams: 即数据流,分为静态数据流(比如数组,文件)和动态数据流(比如事件流,日志流)两种。基于数据流模型,RP得以提供一套统一的Stream风格的数据处理接口。和Java 8中的Stream API相比,RP API除了支持静态数据流,还支持动态数据流,并且允许复用和同时接入多个订阅者。
  • The propagation of change: 变化传播,简单来说就是以一个数据流为输入,经过一连串操作转化为另一个数据流,然后分发给各个订阅者的过程。这就有点像函数式编程中的组合函数,将多个函数串联起来,把一组输入数据转化为格式迥异的输出数据。

一个容易混淆的概念是响应式设计,虽然它的名字中也包含了“响应式”三个字,但其实和RP完全是两码事。响应式设计是指网页能够自动调整布局和样式以适配不同尺寸的屏幕,属于网站设计的范畴,而RP是一种关注系统可响应性,面向数据流的编程思想或者说编程框架。

特性

从本质上说,RP是一种异步编程框架,和其他框架相比,RP至少包含了以下三个特性:

  • 描述而非执行:在你最终调用subscribe()方法之前,从发布端到订阅端,没有任何事会发生。就好比无论多长的水管,只要水龙头不打开,水管里的水就不会流动。为了提高描述能力,RP提供了比Stream丰富的多的多的API,比如buffer(), merge(), onErrorMap()等。
  • 提高吞吐量: 类似于HTTP/2中的连接复用,RP通过线程复用来提高吞吐量。在传统的Servlet容器中,每来一个请求就会发起一个线程进行处理。受限于机器硬件资源,单台服务器所能支撑的线程数是存在一个上限的,假设为T,那么应用同时能处理的请求数(吞吐量)必然也不会超过T。但对于一个使用开发的RP应用,如果运行在像Netty这样的异步容器中,无论有多少个请求,用于处理请求的线程数是相对固定的,因此最大吞吐量就有可能超过T。
  • 背压(Backpressure)支持:简单来说,背压就是一种反馈机制。在一般的Push模型中,发布者既不知道也不关心订阅者的处理速度,当数据的发布速度超过处理速度时,需要订阅者自己决定是缓存还是丢弃。如果使用RP,决定权就交回给发布者,订阅者只需要根据自己的处理能力问发布者请求相应数量的数据。你可能会问这不就是Pull模型吗?其实是不同的。在Pull模型中,订阅者每次处理完数据,都要重新发起一次请求拉取新的数据,而使用背压,订阅者只需要发起一次请求,就能连续不断的重复请求数据。

适用场景

了解了RP的这些特性,你可能已经猜想到RP有哪些适用场景了。一般来说,RP适用于高并发、带延迟操作的场景,比如以下这些情况(的组合):

  • 一次请求涉及多次外部服务调用
  • 非可靠的网络传输
  • 高并发下的消息处理
  • 弹性计算网络

代价

Every coin has two sides.

和任何框架一样,有优势必然就有劣势。RP的两个比较大的问题是:

  • 虽然复用线程有助于提高吞吐量,但一旦在某个回调函数中线程被卡住,那么这个线程上所有的请求都会被阻塞,最严重的情况,整个应用会被拖垮。
  • 难以调试。由于RP强大的描述能力,在一个典型的RP应用中,大部分代码都是以链式表达式的形式出现,比如flux.map(String::toUpperCase).doOnNext(s -> LOG.info("UC String {}", s)).next().subscribe(),一旦出错,你将很难定位到具体是哪个环节出了问题。所幸的是,RP框架一般都会提供一些工具方法来辅助进行调试。

2 Reactor实战

提高吞吐量

    @Test
    public void testImperative() throws InterruptedException {
        _runInParallel(CONCURRENT_SIZE, () -> {
            ImperativeRestaurantRepository.INSTANCE.insert(load);
        });
    }

    private void _runInParallel(int nThreads, Runnable task) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
        for (int i = 0; i < nThreads; i++) {
            executorService.submit(task);
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
    }

    @Test
    public void testReactive() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(CONCURRENT_SIZE);
        for (int i = 0; i < CONCURRENT_SIZE; i++) {
            ReactiveRestaurantRepository.INSTANCE.insert(load).subscribe(s -> {
            }, e -> latch.countDown(), latch::countDown);
        }
        latch.await();
    }

用例解读:

  • 第一个测试用例使用的是多线程+MongoDB Driver,同时起100个线程,每个线程往MongoDB中插入10000条数据,总共100万条数据,平均用时15秒左右。
  • 第二个测试用例使用的是Reactor+MongoDB Reactive Streams Driver,同样是插入100万条数据,平均用时不到10秒,吞吐量提高了50%!

背压

在演示测试用例之前,先看两张图,帮助你更形象的理解什么是背压。

两张图乍一看没啥区别,但其实是完全两种不同的背压策略。第一张图,发布速度(100/s)远大于订阅速度(1/s),但由于背压的关系,发布者严格按照订阅者的请求数量发送数据。第二张图,发布速度(1/s)小于订阅速度(100/s),当订阅者请求100个数据时,发布者会积满所需个数的数据再开始发送。可以看到,通过背压机制,发布者可以根据各个订阅者的能力动态调整发布速度。

    @BeforeEach
    public void beforeEach() {
        // initialize publisher
        AtomicInteger count = new AtomicInteger();
        timerPublisher = Flux.create(s ->
                new Timer().schedule(new TimerTask() {
                    @Override
                    public void run() {
                        s.next(count.getAndIncrement());
                        if (count.get() == 10) {
                            
                        }
                    }
                }, 100, 100)
        );
    }

    @Test
    public void testNormal() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        timerPublisher
                .subscribe(r -> System.out.println("Continuous consuming " + r),
                        e -> latch.countDown(),
                        latch::countDown);
        latch.await();
    }

    @Test
    public void testBackpressure() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Subscription> timerSubscription = new AtomicReference<>();
        Subscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                timerSubscription.set(subscription);
            }

            @Override
            protected void hookOnNext(Integer value) {
                System.out.println("consuming " + value);
            }

            @Override
            protected void hookOnComplete() {
                latch.countDown();
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                latch.countDown();
            }
        };
        timerPublisher.onBackpressureDrop().subscribe(subscriber);
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                timerSubscription.get().request(1);
            }
        }, 100, 200);
        latch.await();
    }

用例解读:

  • 第一个测试用例演示了在理想情况下,即订阅者的处理速度能够跟上发布者的发布速度(以100ms为间隔产生10个数字),控制台从0打印到9,一共10个数字,和发布端一致。
  • 第二个测试用例故意调慢了订阅者的处理速度(每200ms处理一个数字),同时发布者采用了Drop的背压策略,结果控制台只打印了一半的数字(0,2,4,6,8),另外一半的数字由于背压的原因被发布者Drop掉了,并没有发给订阅者。

3 小结

通过上面的介绍,不难看出RP实际上是一种内置了发布者订阅者模型的异步编程框架,包含了线程复用,背压等高级特性,特别适用于高并发、有延迟的场景。

4 参考

显示全文