南宁住房和城乡建设部网站,网站开发工程师薪资待遇,新手做网站看什么书,软件开发公司网络安全管理制度注意#xff1a; 本文内容于 2024-12-28 21:22:12 创建#xff0c;可能不会在此平台上进行更新。如果您希望查看最新版本或更多相关内容#xff0c;请访问原文地址#xff1a;ReactiveStreams、Reactor、SpringWebFlux。感谢您的关注与支持#xff01;
ReactiveStreams是…注意 本文内容于 2024-12-28 21:22:12 创建可能不会在此平台上进行更新。如果您希望查看最新版本或更多相关内容请访问原文地址ReactiveStreams、Reactor、SpringWebFlux。感谢您的关注与支持
ReactiveStreams是一个处理异步流的规范定义了Publisher、Subscriber、Subscription、Processor接口。
Reactor是ReactiveStreams的实现对于Publisher提供了两个核心实现——Mono与Flux。
SpringWebFlux是构建在Reactor之上的响应式Web框架。
本文源码
一、Reactive Streams
Reactive Streams 是一个用于处理异步流数据的标准规范特别适合处理非阻塞、背压控制的场景。 所谓的背压控制是指在异步数据流中消费者根据自身的能力向生产者获取数据进行消费以避免数据积压导致系统过载或者崩溃。 TCP中的拥塞控制也可以看作是背压控制的一种实现。 1.1 API规范
Reactive Streams 的四大API接口如下
org.reactivestreams.Publisher: 发布者接口提供数据流。 void subscribe(Subscriber? super T subscriber) org.reactivestreams.Subscriber: 订阅者接口接收数据流。 void onSubscribe(Subscription subscription)void onNext(T item)void onError(Throwable throwable)void onComplete() org.reactivestreams.Subscription: 订阅关系接口提供控制机制。 void request(long n)void cancel() org.reactivestreams.Processor: 继承Publisher和Subscriber的接口。
简单绘制一个时序图加深对整个链路的理解。 使用Publisher、Subscriber、Subscription实现一个简单的订阅功能示例如下 以下代码并没有异步相关的内容。只是为了学习整个API流转链路。 import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;public class Example01 {private static final Logger log LoggerFactory.getLogger(Example01.class);/*** 订阅关系*/public static Subscription getSubscription(Subscriber? super String subscriber, String... items) {return new Subscription() {private final AtomicBoolean canceled new AtomicBoolean(false);private final AtomicInteger sendItems new AtomicInteger(0);/*** request数据* 内部onNext会request后面的数据而onComplete应该要等所有的数据消费完毕后才会执行。* 故需要加锁保证线程安全此处采取CAS。源码参考reactor.core.publisher.Operators.ScalarSubscription#request(long)*/Overridepublic void request(long n) {if (n 0) {if (canceled.get()) {return;}if (sendItems.get() items.length) {subscriber.onComplete();} else {subscriber.onNext(items[sendItems.getAndIncrement()]);}}}Overridepublic void cancel() {canceled.compareAndSet(true, true);}};}/*** 发布者*/private static PublisherString getPublisher(String... items) {return new PublisherString() {Overridepublic void subscribe(Subscriber? super String subscriber) {subscriber.onSubscribe(getSubscription(subscriber, items));}};}/*** 订阅者*/private static SubscriberString getSubscriber() {return new SubscriberString() {private Subscription subscription;Overridepublic void onSubscribe(Subscription s) {this.subscription s;log.info(Subscribed to {}, s);// 请求第一个元素subscription.request(1);}Overridepublic void onNext(String s) {log.info(Received {}, s);// 请求下一个元素subscription.request(1);}Overridepublic void onError(Throwable t) {log.error(Error occurred, t);}Overridepublic void onComplete() {log.info(All items received);}};}public static void main(String[] args) {// 订阅Flux// Flux.just(first, second, third).delayElements(Duration.ofSeconds(2))// .subscribe(getSubscriber());/*** org.reactivestreams.Publisher: 发布者* org.reactivestreams.Subscriber: 订阅者* org.reactivestreams.Subscription: 发布者和订阅者之间的桥梁数据流控制的核心机制。*/// 订阅自定义PublishergetPublisher(first, second, third, fourth, fifth).subscribe(getSubscriber());while (true) {}}
}运行结果 1.2 API实现库
Reactive Streams实现如下
Java9 java.util.concurrent.FlowRxJava: Reactive Extension JavaReactor: Reactor Framework
Java9中提供了java.util.concurrent.Flow在标准库中提供ReactiveStreams规范的接口。 ReactiveStreams内部也提供了适配JDK中Flow的适配器org.reactivestreams.FlowAdapters。
RxJava以及Reactor分别用于Java开发中不同领域。RxJava一般用于Android开发Reactor一般用于Spring开发。
二、Reactor
Reactor提供了两个核心类
reactor.core.publisher.Flux发布0或N个元素的异步数据流reactor.core.publisher.Mono发布0或1个元素的异步数据流
这两者都是Publisher主要区别在于发送数据的数量。因此在使用上相关的API都是差不多的。
2.1 Mono
Mono中的静态方法用于创建Mono实例。
Mono实例中的成员方法如下
方法名说明and合并多个 Mono 实例等所有 Mono 完成后返回一个新的 Mono。as用指定的类型包裹当前 Mono通常用于类型转换。block阻塞并获取 Mono 的结果直到执行完成。blockOptional类似于 block但返回 Optional 包裹的结果。cache缓存当前 Mono 的值使得未来的订阅者可以共享相同的结果。cacheInvalidateIf缓存失效条件满足时重新缓存适用于动态失效策略。cacheInvalidateWhen在指定条件下使缓存失效。cancelOn当给定的 Publisher 发出信号时取消当前 Mono。cast强制类型转换为指定的类型。checkpoint在流的执行过程中插入检查点用于调试。concatWith与另一个 Mono 或 Flux 连接按顺序执行。contextWrite修改 Mono 的上下文。defaultIfEmpty如果 Mono 为空返回默认值。delayElement延迟发出元素的时间。delaySubscription延迟订阅等到指定的时间或事件发生才开始订阅。delayUntil延迟直到指定的 Publisher 发出信号时才开始执行。dematerialize将一个包含 Signal 的 Mono 转换为原始值的 Mono。doAfterSuccessOrError在执行成功或出错后执行的操作。doAfterTerminate在 Mono 结束时执行的操作不论成功或失败。doFinally在 Mono 完成时执行的最终操作。doFirst在 Mono 执行前执行的操作。doOnCancel当订阅者取消时执行的操作。doOnDiscard当元素被丢弃时执行的操作。doOnEach对每个发出的信号执行操作。doOnError当发生错误时执行的操作。doOnNext每次元素发出时执行的操作。doOnRequest在请求信号到达时执行的操作。doOnSubscribe在订阅时执行的操作。doOnSuccess当成功完成时执行的操作。doOnSuccessOrError无论成功还是失败都执行的操作。doOnTerminate在终止时执行的操作。elapsed返回每个信号的时间戳。expand展开 Mono生成新的 Mono直到满足某个条件。expandDeep深度展开 Mono通常递归调用直到满足条件。filter过滤元素只有符合条件的元素才会发出。filterWhen使用 Publisher 的元素条件来过滤当前 Mono。flatMap转换元素返回新的 Mono 或 Flux。flatMapIterable将每个元素转换为一个可迭代的元素。flatMapMany将元素转换为 Flux。flux将 Mono 转换为 Flux。handle基于元素的条件来决定如何处理流。hasElement判断是否包含元素。hide隐藏 Mono 的实现细节返回一个不可观察的 Mono。ignoreElement忽略元素只关心是否完成。log记录 Mono 中的信号便于调试。map将元素映射为另一个元素。mapNotNull映射并排除空值。materialize将信号转化为一个 Signal 对象。mergeWith合并当前 Mono 和另一个 Mono。metrics获取流的度量信息。name为 Mono 设置名称用于调试和监控。ofType根据类型过滤信号。onErrorContinue在发生错误时继续执行。onErrorMap将错误映射为其他类型。onErrorResume在发生错误时恢复操作。onErrorReturn在发生错误时返回默认值。onErrorStop在发生错误时终止流。onTerminateDetach在终止时解除与订阅者的连接。or连接另一个 Mono如果当前 Mono 没有值或为空时执行。publish启动 Mono 并返回一个共享的流。publishOn指定在哪个线程调度上下文中执行 Mono。repeat重复执行 Mono直到满足某个条件。repeatWhen基于另一个 Publisher 的信号来控制重复。repeatWhenEmpty当 Mono 为空时重复执行。retry在发生错误时重试操作。retryWhen基于另一个 Publisher 来控制重试。share共享执行的结果避免重复执行。single获取 Mono 中唯一的元素。subscribe启动流的执行并订阅。subscribeOn指定在哪个线程调度上下文中订阅 Mono。subscribeWith通过指定的 Subscriber 订阅 Mono。subscriberContext获取或修改订阅时的上下文。switchIfEmpty如果 Mono 为空则切换到另一个 Mono。tag为 Mono 打上标签用于调试和日志。take限制只获取前 N 个元素。takeUntilOther当另一个 Publisher 发出信号时停止当前 Mono。then在当前 Mono 执行完后执行另一个操作。thenEmpty在当前 Mono 执行完后返回一个空的 Mono。thenMany在当前 Mono 执行完后返回一个 Flux。thenReturn在当前 Mono 执行完后返回指定的值。timed返回元素和其时间戳。timeout如果 Mono 在指定时间内没有发出信号则触发超时。timestamp返回元素及其时间戳。toFuture将 Mono 转换为 Future。toProcessor将 Mono 转换为 Processor适用于与 Flux 的结合。toString返回 Mono 的字符串表示。transform使用转换函数修改 Mono。transformDeferred延迟转换直到订阅发生。transformDeferredContextual延迟转换并访问上下文。zipWhen与另一个 Mono 的信号配对形成 Mono 的组合。zipWith与另一个 Mono 的信号进行合并形成 Mono 的组合。
2.2 Flux
Flux中的静态方法用于创建Flux实例。
Flux实例中的成员方法如下
方法名说明all判断 Flux 中的所有元素是否满足给定条件。any判断 Flux 中是否有任何一个元素满足给定条件。as将 Flux 转换为指定类型的 Publisher。blockFirst阻塞并返回 Flux 中的第一个元素。blockLast阻塞并返回 Flux 中的最后一个元素。buffer将 Flux 中的元素分成固定大小的缓冲区。bufferTimeout按照指定的时间或缓冲区大小将元素分块。bufferUntil在满足某个条件时开始一个新的缓冲区。bufferUntilChanged将相邻相同的元素合并到同一个缓冲区。bufferWhen根据外部 Publisher 切换缓冲区。bufferWhile按照指定条件将元素分组为缓冲区。cache缓存 Flux 的值使得未来的订阅者可以共享相同的结果。cancelOn当另一个 Publisher 发出信号时取消当前的 Flux。cast将 Flux 强制转换为指定的类型。checkpoint在执行流中插入检查点用于调试和分析。collect收集流中的元素按给定规则生成结果。collectList收集 Flux 中的所有元素并返回一个 List。collectMap将 Flux 中的元素收集为一个 Map。collectMultimap将 Flux 中的元素收集为一个多值 Map。collectSortedList将 Flux 中的元素收集为排序的 List。concatMap将元素转换为 Mono按顺序处理。concatMapDelayError与 concatMap 类似但在错误发生时延迟处理。concatMapIterable将每个元素转换为可迭代的元素并按顺序合并。concatWith与另一个 Flux 连接按顺序执行。concatWithValues连接多个值作为新的 Flux。contextWrite修改 Flux 的上下文。count统计 Flux 中元素的数量。defaultIfEmpty如果 Flux 为空则返回默认值。delayElements延迟元素的发出。delaySequence延迟整个序列的发出。delaySubscription延迟订阅直到指定的时间或事件发生。delayUntil延迟直到另一个 Publisher 发出信号。dematerialize将一个包含 Signal 的 Flux 转换为原始元素的 Flux。distinct过滤掉重复的元素保持唯一性。distinctUntilChanged过滤掉相邻重复的元素。doAfterTerminate在 Flux 完成后执行的操作。doFinally在 Flux 终止时执行的操作。doFirst在 Flux 执行前执行的操作。doOnCancel在 Flux 被取消时执行的操作。doOnComplete在 Flux 完成时执行的操作。doOnDiscard在元素被丢弃时执行的操作。doOnEach对 Flux 发出的每个元素执行操作。doOnError在发生错误时执行的操作。doOnNext每次 Flux 发出元素时执行的操作。doOnRequest在请求信号到达时执行的操作。doOnSubscribe在订阅时执行的操作。doOnTerminate在 Flux 终止时执行的操作。elapsed获取每个元素的时间戳和持续时间。elementAt获取指定索引处的元素。expand对每个元素进行展开生成新的元素流。expandDeep深度展开 Flux通常递归展开元素。filter过滤出符合条件的元素。filterWhen使用外部 Publisher 的信号过滤 Flux 中的元素。flatMap将元素转换为 Flux并合并其发出的所有元素。flatMapDelayError在发生错误时延迟元素的转换。flatMapIterable将元素转换为可迭代的 Flux。flatMapSequential顺序地将元素转换为 Flux。flatMapSequentialDelayError顺序转换并在发生错误时延迟。getPrefetch获取 Flux 的预取量。groupBy将元素按指定的键分组。groupJoin类似 groupBy但用于联接多个流。handle根据元素的条件进行流的处理。hasElement判断 Flux 中是否包含某个元素。hasElements判断 Flux 中是否包含多个元素。hide隐藏 Flux 的实现细节返回不可观察的流。ignoreElements忽略 Flux 中的所有元素只关心终止信号。index返回元素在流中的索引。join将多个 Flux 中的元素合并为一个字符串。last获取 Flux 中的最后一个元素。limitRate限制从流中请求的元素数量。limitRequest限制从流中请求的最大元素数量。log记录流中的元素用于调试。map将元素映射为新的类型。mapNotNull映射并排除空值。materialize将信号转换为 Signal 对象。mergeComparingWith将两个 Flux 合并并根据比较条件排序。mergeOrderedWith将两个有序的 Flux 合并。mergeWith合并当前 Flux 和另一个 Flux。metrics获取流的度量信息。name为 Flux 设置名称便于调试。next获取 Flux 中的下一个元素。ofType根据类型过滤信号。onBackpressureBuffer在背压时缓存元素。onBackpressureDrop在背压时丢弃元素。onBackpressureError在背压时触发错误。onBackpressureLatest在背压时保留最新的元素。onErrorContinue在发生错误时继续执行。onErrorMap在错误时将其映射为其他类型。onErrorResume在错误时恢复操作。onErrorReturn在错误时返回默认值。onErrorStop在错误时终止流。onTerminateDetach在终止时分离与订阅者的连接。or连接另一个 Flux如果当前 Flux 为空时执行。parallel将 Flux 分发到多个线程进行并行处理。publish启动 Flux 并返回一个共享流。publishNext在流的每个元素发出时开始新的发布。publishOn指定在哪个线程调度上下文中执行流。reduce将流中的所有元素合并为单一值。reduceWith使用指定初始值对元素进行合并。repeat重复执行 Flux 直到满足某个条件。repeatWhen基于另一个 Publisher 的信号来控制重复。replay缓存并重播流中的元素。retry在发生错误时重试操作。retryWhen基于另一个 Publisher 来控制重试。sample每隔指定时间间隔取一个元素。sampleFirst获取流中的第一个元素。sampleTimeout超过指定时间间隔时触发超时操作。scan对流中的元素执行累加操作。scanWith使用给定的初始值对元素执行累加操作。share共享流的执行避免重复执行。shareNext将下一个发出的元素共享给多个订阅者。single获取 Flux 中唯一的元素。singleOrEmpty获取 Flux 中唯一的元素如果为空返回空。skip跳过流中的前 N 个元素。skipLast跳过流中的最后 N 个元素。skipUntil跳过直到满足某个条件的元素。skipUntilOther跳过直到另一个 Flux 发出信号时的元素。skipWhile跳过直到满足条件的元素。sort对流中的元素进行排序。startWith在流的开始处添加额外元素。subscribe订阅并启动 Flux。subscribeOn指定在哪个线程调度上下文中订阅流。subscribeWith通过指定的 Subscriber 订阅流。subscriberContext获取或修改订阅时的上下文。switchIfEmpty如果 Flux 为空则切换到另一个 Flux。switchMap将元素转换为另一个 Flux 并切换执行。switchOnFirst在流开始时选择一个 Flux 进行切换。tag为 Flux 打标签便于调试和日志。take限制只获取前 N 个元素。takeLast获取流中的最后 N 个元素。takeUntil获取直到满足条件为止的元素。takeUntilOther获取直到另一个 Flux 发出信号时的元素。takeWhile获取满足条件的元素直到条件不满足为止。then在当前流完成后执行另一个操作。thenEmpty在当前流完成后返回一个空流。thenMany在当前流完成后返回另一个 Flux。timed返回每个元素的时间戳和持续时间。timeout如果 Flux 在指定时间
三、SpringWebFlux
3.1 WebHandler与WebFilter
在SpringMVC中有Servlet、Filter。
在SpringWebFlux中有WebHandler、WebFilter对标的其实就是Servlet API中的Servlet、Filter。甚至执行链也是相似的设计。 Servlet相关知识阅读Servlet - 言成言成啊 Filter相关知识阅读Filter和Listener - 言成言成啊 WebFilter的注册如下
Bean
Order(0) // 值越小优先级越高
ConditionalOnProperty(name allowAllCors.learnFilter, havingValue true)
public WebFilter aFilter() {/*** 在servlet中。请求的扭转是 aFilter--bFilter--servlet--bFilter--aFilter* 在webflux中同理。Filter对应WebFilterServlet对应WebHandler*/return (exchange, chain) - {log.info(aFilter start);return chain.filter(exchange).doOnSuccess(t - log.info(aFilter end));};
}3.2 实际案例
跨域配置
Bean
Order(Integer.MIN_VALUE)
ConditionalOnProperty(name allowAllCors.personal, havingValue true)
public WebFilter personalCorsFilter(WebSocketHandlerAdapter webFluxWebSocketHandlerAdapter) {WebFilter webFilter (exchange, chain) - {ServerHttpRequest request exchange.getRequest();ServerHttpResponse response exchange.getResponse();HttpHeaders headers response.getHeaders();//用*会导致范围过大浏览器出于安全考虑在allowCredentials为true时会不认*这个操作因此可以使用如下代码间接实现允许跨域headers.set(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN, request.getHeaders().getFirst(origin));headers.set(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS, *);headers.set(HttpHeaders.ACCESS_CONTROL_ALLOW_METHODS, *);//允许跨域发送cookieheaders.set(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS, true);if (OPTIONS.equalsIgnoreCase(request.getMethod().name())) {response.setStatusCode(HttpStatus.OK);return Mono.empty();} else {return chain.filter(exchange);}};log.info(allowAllCors.personal is set to true);return webFilter;}全局异常拦截/定义响应格式
首先定义通用响应格式
import lombok.Data;
import reactor.core.publisher.Mono;Data
public class RespT {private int code;private String msg;private T data;public static T RespT ok(T t) {RespT resp new Resp();resp.setCode(0);resp.setMsg(成功);resp.setData(t);return resp;}public static RespVoid failure(String msg) {RespVoid resp new Resp();resp.setCode(1);resp.setMsg(失败: msg);return resp;}public static RespVoid error() {RespVoid resp new Resp();resp.setCode(500);resp.setMsg(服务器内部错误);return resp;}public static T MonoRespT getSuccessResp(MonoT mono) {return mono.map(Resp::ok);}public static MonoRespVoid getFailureResp(String msg) {return Mono.just(failure(msg));}public static MonoRespVoid getErrorResp() {return Mono.just(error());}
}其次定义自定义异常DIYException。
最后配置全局异常拦截。
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import reactor.core.publisher.Mono;
import top.meethigher.utils.Resp;RestControllerAdvice
Slf4j
public class GlobalExceptionHandler {ExceptionHandler(Exception.class)public MonoRespVoid handleException(Exception e) {log.error(api occurred exception, e);return Resp.getErrorResp();}ExceptionHandler(DIYException.class)public MonoRespVoid handleDiyException(DIYException e) {log.error(api occurred exception, e);return Resp.getFailureResp(e.getMessage());}
}