引子

在读 Hystrix 源码时,发现一些奇特的写法。稍作搜索,知道使用了最新流行的响应式编程库RxJava。那么响应式编程究竟是怎样的呢? 本文对响应式编程及 RxJava 库作一个初步的探索。

在学习新的编程模型时,我喜欢将其与原来的编程模型联系起来。因为新的编程模型往往是对原来编程模型的承袭和组合。响应式编程的两个基本要素是:

  • 基于观察者模式的事件驱动机制。
  • 函数式编程:通过装饰与组合,让响应式编程的处理更流畅灵活;

函数式编程,在之前的文章 “完全”函数式编程”“Java8函数式编程探秘”“精练代码:一次Java函数式编程的重构之旅” 等有较多探索,观察者模式在 “设计模式之观察者模式:实现配置更新实时推送” 有讲述过。我们将在这两者的基础上探索响应式编程。

基础

初次接触 RxJava ,很容易被一连串的 Observer, Observable, Disposable, subscribeOn, onSubscribe, onNext, onError, onComplete 等绕晕。不过软件里面无新鲜事。大多无非是用一种新的方式来组织逻辑罢了。基于观察者模式的事件驱动也不例外。我们只要梳理清楚脉络,就可以容易地理解。观察者有三个基本参与者:

  • 被观察者:Observable ;
  • 发射装置:Emitter;
  • 观察者: Observer。

基本流程是:被观察者 Observable 装备发射装置 Emitter,发射消息,创建事件;观察者 Observer 监听到事件,接收观察者发射的消息,调用对应的函数 onNext, onError 和 onComplete 进行处理。onError 和 OnComplete 只能有一个被触发。

不妨写个基本 Demo 来模拟下基本流程。为了更好滴理解,我把三者都区分开了。

Demo

首先定义观察者 MyObserver,继承抽象类 DefaultObserver ,这样实现成本最小。


package zzz.study.reactor; import com.alibaba.fastjson.JSON;
import io.reactivex.observers.DefaultObserver; /**
* @Description 观察者定义
* @Date 2021/1/23 4:13 下午
* @Created by qinshu
*/
public class MyObserver extends DefaultObserver { @Override
public void onStart() {
System.out.println("MyObserver: Start");
} @Override
public void onNext(Object o) {
System.out.println("Observed: " + JSON.toJSONString(o));
} @Override
public void onError(Throwable e) {
System.out.println("Observed: " + e.getMessage());
} @Override
public void onComplete() {
System.out.println("MyObserver: Complete");
}
}

接着,定义发射装置(发射消息) MyEmitter:

package zzz.study.reactor;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe; import java.util.Random;
import java.util.concurrent.TimeUnit; /**
* @Description 发射装置
* @Date 2021/1/24 7:04 上午
* @Created by qinshu
*/
public class MyEmitter implements ObservableOnSubscribe { Random random = new Random(System.currentTimeMillis()); @Override
public void subscribe(ObservableEmitter emitter) throws Exception {
TimeUnit.SECONDS.sleep(1);
emitter.onNext("next");
if (random.nextInt(3) == 0) {
emitter.onError(new RuntimeException("A RuntimeException"));
}
else {
emitter.onComplete();
}
}
}

最后,创建被观察者,并串起流程:

package zzz.study.reactor;
import io.reactivex.Observable;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer; /**
* @Description RxJava基本Demo
* @Date 2021/1/23 12:28 下午
* @Created by qinshu
*/
public class RxJavaBasic { public static void main(String[] args) {
for (int i=0; i<5; i++) {
ObservableOnSubscribe observableOnSubscribe = new MyEmitter();
Observable observable = Observable.create(observableOnSubscribe);
Observer observer = new MyObserver();
observable.subscribe(observer);
}
}
}

运行,可得结果:

MyObserver: Start
Observed: "next"
MyObserver: Complete
MyObserver: Start
Observed: "next"
MyObserver: Complete
MyObserver: Start
Observed: "next"
Observed: A RuntimeException
MyObserver: Start
Observed: "next"
MyObserver: Complete
MyObserver: Start
Observed: "next"
MyObserver: Complete

讲解

如何理解上述流程及结果呢?最好的办法就是单步调试。经过单步调试,可以知道整个过程如下:

步骤1: 整个过程由这一行触发 observable.subscribe(observer); ,会去调用 Observable.subscribeActual 方法,分派给具体实现类 ObservableCreate.subscribeActual ;单步调试的好处就是能确定具体实现者;

步骤2: ObservableCreate.subscribeActual 所做的事情,调用 observer.onSubscribe ( MyObserver.onStart 方法 ),然后转发给 MyEmitter.subscribe 来发射消息。

 @Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent); try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

步骤3:MyEmitter 执行 onNext ,分派给具体实现类 CreateEmitter.onNext ,进而调用 observer.onNext 方法;

步骤4:MyEmitter 执行 onError ,分派给具体实现类 CreateEmitter.onError ,进而 调用 observer.onError 方法;如果 MyEmitter 发射 onComplete ,那么就会分派给具体实现类 CreateEmitter.onComplete ,进而调用 observer.onComplete 方法。注意,onError 和 onComplete 两者只可能执行一个。

基本流程就是这样。

引申

Disposable

除了订阅自定义 Emitter 来发射消息,类 Observable 还提供了各种工具方法,更便捷滴做订阅和推送。比如:

public static void testDirectSubscribe() {
Observable.fromArray("I", "Have", "a", "dream").subscribe(new MyObserver());
}

会输出:

MyObserver: Start
Observed: "I"
Observed: "Have"
Observed: "a"
Observed: "dream"
MyObserver: Complete

具体实现是: fromArray 方法会创建一个 Observable 的具体类 ObservableFromArray,而这个类的 subscribeActual 方法会创建一个 FromArrayDisposable 来处理。FromArrayDisposable 的 run 方法被调用,依次遍历所指定列表,调用 observer.onNext ,最后调用 observer.onComplete。具体源码如下:

public final class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
} @Override
public void subscribeActual(Observer<? super T> observer) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
observer.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
} static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
final Observer<? super T> downstream;
final T[] array;
int index;
boolean fusionMode;
volatile boolean disposed; FromArrayDisposable(Observer<? super T> actual, T[] array) {
this.downstream = actual;
this.array = array;
} // other methods @Override
public void dispose() {
disposed = true;
} @Override
public boolean isDisposed() {
return disposed;
} void run() {
T[] a = array;
int n = a.length; for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
downstream.onError(new NullPointerException("The element at index " + i + " is null"));
return;
}
downstream.onNext(value);
}
if (!isDisposed()) {
downstream.onComplete();
}
}
}
}

那么 Disposable 的意义何在呢 ? 我的理解是:它作为订阅完成的一个流程闭环。比如重复订阅同一个观察者,如下代码:

 public static void testDirectSubscribe() {
Observer observer = new MyObserver();
Observable.fromArray("I", "Have", "a", "dream").subscribe(observer);
Observable.fromArray("changed").subscribe(observer);
}

会抛出异常:

io.reactivex.exceptions.ProtocolViolationException: It is not allowed to subscribe with a(n) zzz.study.reactor.MyObserver multiple times. Please create a fresh instance of zzz.study.reactor.MyObserver and subscribe that to the target source instead.
at io.reactivex.internal.util.EndConsumerHelper.reportDoubleSubscription(EndConsumerHelper.java:148)
at io.reactivex.internal.util.EndConsumerHelper.validate(EndConsumerHelper.java:57)
at io.reactivex.observers.DefaultObserver.onSubscribe(DefaultObserver.java:70)
at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:34)
at io.reactivex.Observable.subscribe(Observable.java:12284)
at zzz.study.reactor.RxJavaBasic.testDirectSubscribe(RxJavaBasic.java:34)
at zzz.study.reactor.RxJavaBasic.main(RxJavaBasic.java:17)

这个异常是在调用 DefaultObserver.onSubscribe 抛出的:

 @Override
public final void onSubscribe(@NonNull Disposable d) {
if (EndConsumerHelper.validate(this.upstream, d, getClass())) {
this.upstream = d;
onStart();
}
} public static boolean validate(Disposable upstream, Disposable next, Class<?> observer) {
ObjectHelper.requireNonNull(next, "next is null");
if (upstream != null) {
next.dispose();
if (upstream != DisposableHelper.DISPOSED) {
reportDoubleSubscription(observer);
}
return false;
}
return true;
}

这就是说,如果同一个观察者,它的上一个 Disposable 订阅没有结束,那么再次订阅 Disposable 就会出错。怎么解决呢?可以在 MyObserver 的 onError 和 onComplete 添加 super.cancel 调用,可以结束上一次的订阅,再次订阅就不抛出异常了:

 @Override
public void onError(Throwable e) {
System.out.println("Observed: " + e.getMessage());
super.cancel();
} @Override
public void onComplete() {
System.out.println("MyObserver: Complete");
super.cancel();
} /**
* Cancels the upstream's disposable.
*/
protected final void cancel() {
Disposable upstream = this.upstream;
this.upstream = DisposableHelper.DISPOSED;
upstream.dispose();
}

但是,即便这样,也无法发射我们新的订阅消息。这是因为上一次的 upstream 不为 null,本次的订阅就无法发射。

我们没法覆写 DefaultObserver.onSubscribe 方法,因为该方法声明为 final 的,且 upstream 声明为 private ,也没有公共方法可以设置 upstream。这明确表明了设计者的意图:这是 Observer 订阅 Disposable 的前置检测约定,不可被破坏,否则后果自负。

我们可以绕过 DefaultObserver , 不继承它,而是直接实现 Observer 接口:


public static void testDirectSubscribe() {
Observer observer = new RepeatedSubscribeMyObserver();
Observable.fromArray("I", "Have", "a", "dream").subscribe(observer);
Observable.fromArray("changed").subscribe(observer);
} /**
* @Description 可重复订阅的观察者
* @Date 2021/1/24 10:11 上午
* @Created by qinshu
*/
public class RepeatedSubscribeMyObserver<T> implements Observer<T> { public Disposable upstream; @Override
public void onSubscribe(@NonNull Disposable d){
System.out.println(getName() + ": Start");
this.upstream = d;
} @Override
public void onNext(T o) {
System.out.println(getName() + ": " + JSON.toJSONString(o));
} @Override
public void onError(Throwable e) {
System.out.println(getName() + "RepeatedSubscribeMyObserver: " + e.getMessage());
cancel();
} @Override
public void onComplete() {
System.out.println(getName() + "RepeatedSubscribeMyObserver: Complete");
cancel();
} public String getName() {
return this.getClass().getSimpleName();
} /**
* Cancels the upstream's disposable.
*/
protected final void cancel() {
Disposable upstream = this.upstream;
this.upstream = DisposableHelper.DISPOSED;
upstream.dispose();
}
}

这样就可以实现多次订阅同一个 Observer 了。运行结果:

RepeatedSubscribeMyObserver: Start
RepeatedSubscribeMyObserver: "I"
RepeatedSubscribeMyObserver: "Have"
RepeatedSubscribeMyObserver: "a"
RepeatedSubscribeMyObserver: "dream"
RepeatedSubscribeMyObserver: Complete
RepeatedSubscribeMyObserver: Start
RepeatedSubscribeMyObserver: "changed"
RepeatedSubscribeMyObserver: Complete

弄懂了 Observable.fromArray 的实现原理,就弄清楚了 Observable 中很多基本方法的基本套路。比如 just 方法有两个及以上参数时,其实是 fromArray 的包装,而 range 方法则是创建一个 RangeDisposable 来处理。

Observable.just(1,2,3).subscribe(observer);
Observable.range(1,4).subscribe(observer);

组合

上文谈到了响应式编程的一大基本元素是函数式编程。函数式的优势是可以无限叠加组合,构建出灵活多变的函数和行为。这使得观察者的行为也可以定制得更加灵活。可以组合多个 Observable 的发射行为。

合并

简单的组合使用 merge 方法,构造一个 Observable 的列表,依次遍历合并后的每个 Observable 的发射信息:

Iterable<? extends ObservableSource<? extends Integer>> observableSourceSet = Sets.newHashSet(
Observable.fromArray(3,4,5),
Observable.range(10,3)
);
Observable.merge(observableSourceSet).subscribe(observer);

流式

Observable 可以通过 Stream 进行组合,这里就是函数式编程的用武之地了。如下代码所示:

Observable.range(1,10).filter(x -> x%2 ==0).subscribe(observer);

注意到,这里使用到了装饰器模式。filter 方法会创建一个 ObservableFilter 对象,而在这个对象里,subscribeActual 方法会创建一个 FilterObserver 将传入的 observer 装饰起来。downstream 即是传入的 observer。


@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> filter(Predicate<? super T> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
} public final class ObservableFilter<T> extends AbstractObservableWithUpstream<T, T> {
final Predicate<? super T> predicate;
public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) {
super(source);
this.predicate = predicate;
} @Override
public void subscribeActual(Observer<? super T> observer) {
source.subscribe(new FilterObserver<T>(observer, predicate)); // FilterObserver 装饰了传入的自定义的 observer
} static final class FilterObserver<T> extends BasicFuseableObserver<T, T> {
final Predicate<? super T> filter; FilterObserver(Observer<? super T> actual, Predicate<? super T> filter) {
super(actual);
this.filter = filter;
} @Override
public void onNext(T t) { // 这里对传入的 Observer.onNext 做了个装饰,仅当条件成立时才调用
if (sourceMode == NONE) {
boolean b;
try {
b = filter.test(t);
} catch (Throwable e) {
fail(e);
return;
}
if (b) {
downstream.onNext(t); // downstream 即是我们传入的自定义的 Observer
}
} else {
downstream.onNext(null);
}
}
}

正如 filter 对发射数据流进行过滤,map 或 flatMap 则对发射数据流进行映射变换,与 stream.map 或 stream.flatMap 的功能类似:

Observable.range(1,10).map(x -> x*x).subscribe(observer);
Observable.range(1,10).flatMap(x -> Observable.just(x*x)).subscribe(observer);

map 方法将创建一个 ObservableMap 对象,在 subscribeActual 中用 MapObserver 将所传入的 observer 装饰起来;flatMap 将创建一个 ObservableFlatMap 对象,在 subscribeActual 中 MergeObserver 将传入的 observer 装饰起来。

还可以使用 scan:对于生成的每个值,使用累加器 (x,y) -> x*y 生成新的值并发射。

Observable.range(1, 10).scan(1, (x,y) -> x*y).subscribe(observer);

最后再给个分组的示例:

Observable.just(28,520,25,999).groupBy( i -> ( i > 100 ? "old": "new")).subscribe(new GroupedRepeatedSubscribeMyObserver());
/**
* @Description 可重复订阅的分组观察者
* @Date 2021/1/24 10:11 上午
* @Created by qinshu
*/
public class GroupedRepeatedSubscribeMyObserver extends RepeatedSubscribeMyObserver<GroupedObservable> {
@Override
public void onNext(GroupedObservable o) {
o.subscribe(new RepeatedSubscribeMyObserver() {
@Override
public void onNext(Object v) {
String info = String.format("GroupedRepeatedSubscribeMyObserver: [group=%s][value=%s]", o.getKey(), JSON.toJSONString(v));
System.out.println(info);
}
}); } }

groupBy 方法生成的是一个 GroupedObservable ,因此要订阅一个 Observer 的观察者实现。

本文先写到这里。

项目代码见工程: “ALLIN” 的包 zzz.study.reactor 下。需要引入 Maven 依赖:

<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.20</version>
</dependency>

小结

本文讲解了响应式编程及 RxJava 库的最基本概念:Observable , Observer 及 Emitter, Disposable ,也讲到了如何组合 Observable 来构建更灵活的消息发射机制。这些基本构成了响应式编程的基本骨架流程。

响应式编程的强大能力构建在事件驱动机制和函数式编程上,里面大量应用了装饰器模式。因此,熟悉这些基本编程思想,对掌握响应式编程模型亦大有裨益。

响应式编程库RxJava初探的更多相关文章

  1. ReactiveCocoa,最受欢迎的iOS函数响应式编程库(2.5版),没有之一!

    简介 项目主页: ReactiveCocoa 实例下载: https://github.com/ios122/ios122 简评: 最受欢迎,最有价值的iOS响应式编程库,没有之一!iOS MVVM模 ...

  2. MobX响应式编程库

    MobX https://mobx.js.org/ https://github.com/mobxjs/mobx MobX is a battle tested library that makes ...

  3. (转)Spring Boot 2 (十):Spring Boot 中的响应式编程和 WebFlux 入门

    http://www.ityouknow.com/springboot/2019/02/12/spring-boot-webflux.html Spring 5.0 中发布了重量级组件 Webflux ...

  4. Spring Boot 2 (十):Spring Boot 中的响应式编程和 WebFlux 入门

    Spring 5.0 中发布了重量级组件 Webflux,拉起了响应式编程的规模使用序幕. WebFlux 使用的场景是异步非阻塞的,使用 Webflux 作为系统解决方案,在大多数场景下可以提高系统 ...

  5. 响应式编程(Reactive Programming)(Rx)介绍

    很明显你是有兴趣学习这种被称作响应式编程的新技术才来看这篇文章的. 学习响应式编程是很困难的一个过程,特别是在缺乏优秀资料的前提下.刚开始学习时,我试过去找一些教程,并找到了为数不多的实用教程,但是它 ...

  6. RxJava——响应式编程

    自从06年开始,Rxandroid公司项目中陆续就开始使用它了,而它的基础是由Rxjava演变过来的,如今它也是越来越被广泛使用在商业项目中了,而做为"专业"的自己还是一直对它一知 ...

  7. RxJava(一):响应式编程与Rx

    一,响应式编程 响应式编程是一种关注于数据流(data streams)和变化传递(propagation of change)的异步编程方式. 1.1 异步编程 传统的编程方式是顺序执行的,必须在完 ...

  8. iOS开发--Swift RAC响应式编程初探

    时间不是很充足, 先少说点, RAC的好处是响应式编程, 不需要自己去设置代理委托, target, 而是主要以信息流(signal), block为主, 看到这里激动吧, 它可以帮你监听你的事件, ...

  9. RxJava响应式编程,入门的HelloWorld;

    RxJava核心就是异步,它也被称之为响应式编程:最大的优势就是随着程序逻辑变得越来越复杂,它依然能够保持简洁. Rxjava真的是让人又爱又恨,因为它的线程切换和链式调用真的很好用,但是入门却有点难 ...

  10. iOS响应式编程:ReactiveCocoa vs RxSwift 选谁好

    转载: iOS响应式编程:ReactiveCocoa vs RxSwift 选谁好 内容来自stack overflow的一个回答:ReactiveCocoa vs RxSwift – pros an ...

随机推荐

  1. Java学习手记1——集合

    一.什么是集合 集合是对象的集合,就像数组是数的集合.集合是一种容器,可以存放对象(可以是不同类型的对象). 二.集合的优点(为什么要用集合) 当然,在java里,可以使用数组来存放一组类型相同的对象 ...

  2. RedHat下安装OPENCV

    1.解压 unzip opencv-2.4.9.zip 2.进入目录,cmake CMakeLists.txt  生成build文件 3.使用命令 make 编译 4.使用命令 make instal ...

  3. TWaver家族新成员 — Legolas工业自动化设计平台

    对于TWaver可视化家族的成员,大家比较熟悉的是我们的网络拓扑图组件和MONO Design三维建模工具.作为开发工具,这两款产品面向广大的程序猿同志,在界面可视化上为大家省时省力.但是,当项目交付 ...

  4. 深入理解javascript中的立即执行函数(function(){…})()

    投稿:junjie 字体:[增加 减小] 类型:转载 时间:2014-06-12 我要评论 这篇文章主要介绍了深入理解javascript中的立即执行函数,立即执行函数也叫立即调用函数,通常它的写法是 ...

  5. LintCode &quot;4 Sum&quot;

    4 Pointer solution. Key: when moving pointers, we skip duplicated ones. Ref: https://github.com/xbz/ ...

  6. SQL Server压缩日志及数据库文件大小

    请按步骤进行,未进行前面的步骤时,请不要做后面的步骤,以免损坏你的数据库. 一般不建议做第4,6两步,第4步不安全,有可能损坏数据库或丢失数据.第6步如果日志达到上限,则以后的数据库处理会失败,在清理 ...

  7. github创建tag

    最近在使用github给新的jQuery插件:滚动高亮 添加版本库的时候,看到很多github上的项目都有这个标签,可以清晰快速的找到每个不同的版本,非常方便以后查找以及使用.于是我就在继上一次的:将 ...

  8. 简单竖向Tab选项卡

    <!DOCTYPE html> <html> <head lang="en"> <meta charset="UTF-8&quo ...

  9. visual studio 2015离线版msdn下载和安装

    2014年11月13日,微软发布了Visual Studio 2015 Preview,但是Visual Studio 2015 的msdn该如何安装呢?下面脚本之家就为大家分享一篇visual st ...

  10. Codeforces 474 E. Pillars

    水太...... E. Pillars time limit per test 1 second memory limit per test 256 megabytes input standard ...