本博客日IP超过2000,PV 3000 左右,急需赞助商。
极客时间所有课程通过我的二维码购买后返现24元微信红包,请加博主新的微信号:xttblog2,之前的微信号好友位已满,备注:返现
受密码保护的文章请关注“业余草”公众号,回复关键字“0”获得密码
所有面试题(java、前端、数据库、springboot等)一网打尽,请关注文末小程序
腾讯云】1核2G5M轻量应用服务器50元首年,高性价比,助您轻松上云
前面写了一些 WebFlux 方面的教程,发现很多人搜索到了我的文章,希望我能继续写一些教程。于是便有了本文。
关于 WebFlux 前面我已经反复强调过,它是一种新的编程趋势,未来会越来越流行。所以,很有必要认真的学习学习!
Reactive Streams 是一个标准或者说是规范,由 Netflix、TypeSafe、Pivotal 等公司发起的。Reactor 是 Spring 中的一个子项目,一个基于 java 的响应式编程框架,该框架实现了 Reactive Programming(反应式编程即响应式编程) 思想,符合 Reactive Streams 规范。Mono 和 Flux 是 Reactor 中非常重要的两个类。
Reactive Streams 是规范,Reactor 实现了 Reactive Streams。Web Flux 以 Reactor 为基础,实现 Web 领域的反应式编程框架。
Mono和Flux都是Publisher(发布者)。
其实,对于大部分业务开发人员来说,当编写反应式代码时,我们通常只会接触到 Publisher 这个接口,对应到 Reactor 便是 Mono 和 Flux。对于 Subscriber 和 Subcription 这两个接口,Reactor 必然也有相应的实现。但是,这些都是 Web Flux 和 Spring Data Reactive 这样的框架用到的。如果不开发中间件,通常开发人员是不会接触到的。
比如,在 Web Flux,你的方法只需返回 Mono 或 Flux 即可。你的代码基本也只和 Mono 或 Flux 打交道。而 Web Flux 则会实现 Subscriber ,onNext 时将业务开发人员编写的 Mono 或 Flux 转换为 HTTP Response 返回给客户端。
Mono 实现了 org.reactivestreams.Publisher 接口,代表0到1个元素的发布者(Publisher)。
根据上面的介绍,可以得出 Mono 相当于 Optional,或和 Optional 的作用类似。
@RestController public class MonoController { @RequestMapping("/hello") public Optional<String> hello(@RequestParam(required = false) String str){ if(StringUtils.isEmpty(str)){ return Optional.empty(); } return Optional.of("业余草:www.xttblog.com"); } @RequestMapping("/mono") public Mono<String> mono(@RequestParam(required = false) String str){ if(StringUtils.isEmpty(str)){ return Mono.empty(); } return Mono.just("业余草:www.xttblog.com"); } }
反应式编程框架主要采用了观察者模式,而 Spring Reactor的核心则是对观察者模式的一种衍伸。关于观察者模式的架构中被观察者(Observable)和观察者(Subscriber)处在不同的线程环境中时,由于者各自的工作量不一样,导致它们产生事件和处理事件的速度不一样,这时就出现了两种情况:
- 被观察者产生事件慢一些,观察者处理事件很快。那么观察者就会等着被观察者发送事件好比观察者在等米下锅,程序等待)。
- 被观察者产生事件的速度很快,而观察者处理很慢。那就出问题了,如果不作处理的话,事件会堆积起来,最终挤爆你的内存,导致程序崩溃。(好比被观察者生产的大米没人吃,堆积最后就会烂掉)。为了方便下面理解Mono和Flux,也可以理解为Publisher(发布者也可以理解为被观察者)主动推送数据给Subscriber(订阅者也可以叫观察者),如果Publisher发布消息太快,超过了Subscriber的处理速度,如何处理。这时就出现了Backpressure(背压—–指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略)
Mono 实现了 Publisher 接口,但是通过查看源码,发现它是一个抽象类。Mono 里面有很多 API,关于这些 API 的解释如下:
- empty():创建一个不包含任何元素,只发布结束消息的序列。
- just():可以指定序列中包含的全部元素。创建出来的 Mono序列在发布这些元素之后会自动结束。
- justOrEmpty():从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
- error(Throwable error):创建一个只包含错误消息的序列。
- never():创建一个不包含任何消息通知的序列。
- fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。
- delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
- create():通过 create()方法来使用 MonoSink 来创建 Mono。
API 使用案例如下所示。
//empty():创建一个不包含任何元素,只发布结束消息的序列 Mono.empty().subscribe(System.out::println); //just():可以指定序列中包含的全部元素。创建出来的 Mono序列在发布这些元素之后会自动结束。 Mono.just("www.xttblog.com").subscribe(System.out::println); //ustOrEmpty():从一个 Optional 对象或可能为 null 的对象中创建 Mono。 //只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。 Mono.justOrEmpty(null).subscribe(System.out::println); Mono.justOrEmpty("业余草").subscribe(System.out::println); Mono.justOrEmpty(Optional.of("业余草")).subscribe(System.out::println); //error(Throwable error):创建一个只包含错误消息的序列。 Mono.error(new RuntimeException("error")).subscribe(System.out::println, System.err::println); //never():创建一个不包含任何消息通知的序列。 Mono.never().subscribe(System.out::println); //通过 create()方法来使用 MonoSink 来创建 Mono。 Mono.create(sink -> sink.success("业余草")).subscribe(System.out::println); //通过fromRunnable创建,并实现异常处理 Mono.fromRunnable(() -> { System.out.println("thread run"); throw new RuntimeException("thread run error"); }).subscribe(System.out::println, System.err::println); //通过fromCallable创建 Mono.fromCallable(() -> "callable run ").subscribe(System.out::println); //通过fromSupplier创建 Mono.fromSupplier(() -> "create from supplier").subscribe(System.out::println); //delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。 long start = System.currentTimeMillis(); Disposable disposable = Mono.delay(Duration.ofSeconds(2)).subscribe(n -> { System.out.println("生产数据源:"+ n); System.out.println("当前线程ID:"+ Thread.currentThread().getId() + ",生产到消费耗时:"+ (System.currentTimeMillis() - start)); }); System.out.println("主线程"+ Thread.currentThread().getId() + "耗时:"+ (System.currentTimeMillis() - start)); while(!disposable.isDisposed()) { }
本文主要是介绍了一下 Mono 类中相关 API 的用法,关于 Flux 类,我们下章继续。
最后,欢迎关注我的个人微信公众号:业余草(yyucao)!可加作者微信号:xttblog2。备注:“1”,添加博主微信拉你进微信群。备注错误不会同意好友申请。再次感谢您的关注!后续有精彩内容会第一时间发给您!原创文章投稿请发送至532009913@qq.com邮箱。商务合作也可添加作者微信进行联系!
本文原文出处:业余草: » WebFlux 之 Mono API 教程