A First Look at the Reactive Programming Library RxJava

Qinshuiyu 2021-01-24 14:53:44


Introduction

While reading the Hystrix source code, I came across some unfamiliar idioms. A quick search showed that it uses RxJava, a currently popular reactive programming library. So what is reactive programming like? This article takes a first look at reactive programming and the RxJava library.

When learning a new programming model, I like to connect it to the models I already know, because a new model is often built by inheriting from and combining existing ones. The two basic elements of reactive programming are:

  • An event-driven mechanism based on the observer pattern;
  • Functional programming: decoration and composition make reactive processing more fluent and flexible.

Functional programming was covered in earlier articles such as "Functional Programming, Thoroughly", "The Secrets of Java 8 Functional Programming" and "Refining Code: A Journey of Refactoring Java in a Functional Style". The observer pattern was covered in "The Observer Pattern: Real-Time Push of Configuration Updates". We will explore reactive programming on the basis of both.

Basics

On first contact with RxJava, it is easy to get dizzy from the string of names: Observer, Observable, Disposable, subscribeOn, onSubscribe, onNext, onError, onComplete and so on. But there is rarely anything truly new in software; most of it is existing logic organized in a new way, and observer-based event-driven programming is no exception. Once the thread is sorted out, it is easy to understand. The observer pattern has three basic participants:

  • The observed subject: Observable;
  • The emitter: Emitter;
  • The observer: Observer.

The basic flow is: the Observable is equipped with an Emitter, which emits messages and thereby produces events; the Observer listens for those events, receives the messages from the Observable, and handles them in the corresponding callbacks onNext, onError and onComplete. Only one of onError and onComplete can be triggered.

Let's write a basic demo to simulate this flow. To make it easier to follow, I have separated the three participants.

Demo

First, define the observer MyObserver. Extending the abstract class DefaultObserver is the cheapest way to do it.


package zzz.study.reactor;

import com.alibaba.fastjson.JSON;
import io.reactivex.observers.DefaultObserver;

/**
 * @Description Observer definition
 * @Date 2021/1/23 4:13 PM
 * @Created by qinshu
 */
public class MyObserver extends DefaultObserver<Object> {

    // Called by DefaultObserver.onSubscribe once the subscription is accepted.
    @Override
    public void onStart() {
        System.out.println("MyObserver: Start");
    }

    // Called for every emitted item.
    @Override
    public void onNext(Object o) {
        System.out.println("Observed: " + JSON.toJSONString(o));
    }

    // Terminal event: the source failed.
    @Override
    public void onError(Throwable e) {
        System.out.println("Observed: " + e.getMessage());
    }

    // Terminal event: the source finished normally.
    @Override
    public void onComplete() {
        System.out.println("MyObserver: Complete");
    }
}

Next, define the emitter MyEmitter, which emits the messages:

package zzz.study.reactor;

import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @Description Emitter
 * @Date 2021/1/24 7:04 AM
 * @Created by qinshu
 */
public class MyEmitter implements ObservableOnSubscribe<Object> {

    private final Random random = new Random(System.currentTimeMillis());

    @Override
    public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
        TimeUnit.SECONDS.sleep(1);
        emitter.onNext("next");
        // Roughly one run in three ends with an error instead of completing.
        if (random.nextInt(3) == 0) {
            emitter.onError(new RuntimeException("A RuntimeException"));
        } else {
            emitter.onComplete();
        }
    }
}

Finally, create the Observable and wire the pieces together:

package zzz.study.reactor;

import io.reactivex.Observable;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;

/**
 * @Description RxJava basic demo
 * @Date 2021/1/23 12:28 PM
 * @Created by qinshu
 */
public class RxJavaBasic {

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            ObservableOnSubscribe<Object> observableOnSubscribe = new MyEmitter();
            Observable<Object> observable = Observable.create(observableOnSubscribe);
            // A fresh observer is created for every subscription.
            Observer<Object> observer = new MyObserver();
            observable.subscribe(observer);
        }
    }
}

Running it produces output like the following (which runs end with an error is random):

MyObserver: Start
Observed: "next"
MyObserver: Complete
MyObserver: Start
Observed: "next"
MyObserver: Complete
MyObserver: Start
Observed: "next"
Observed: A RuntimeException
MyObserver: Start
Observed: "next"
MyObserver: Complete
MyObserver: Start
Observed: "next"
MyObserver: Complete

Explanation

How should we understand this flow and its output? The best way is to step through it in a debugger. Doing so shows that the whole process goes as follows:

step 1: Everything is triggered by the line observable.subscribe(observer); which calls Observable.subscribeActual and dispatches to the concrete implementation ObservableCreate.subscribeActual. An advantage of single-step debugging is that it identifies the concrete implementer.

step 2: ObservableCreate.subscribeActual calls observer.onSubscribe (which in DefaultObserver invokes MyObserver.onStart), and then hands over to MyEmitter.subscribe to emit the messages.

// ObservableCreate.subscribeActual
@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

step 3: MyEmitter calls onNext, which dispatches to the concrete implementation CreateEmitter.onNext, which in turn calls observer.onNext.

step 4: MyEmitter calls onError, which dispatches to the concrete implementation CreateEmitter.onError, which in turn calls observer.onError. If MyEmitter calls onComplete instead, it dispatches to CreateEmitter.onComplete, which in turn calls observer.onComplete. Note that only one of onError and onComplete can be executed.

This is the basic process .

Extension

Disposable

Besides subscribing with a custom Emitter to emit messages, the Observable class provides a variety of utility methods that make subscribing and pushing more convenient. For example:

public static void testDirectSubscribe() {
    Observable.fromArray("I", "Have", "a", "dream").subscribe(new MyObserver());
}

This prints:

MyObserver: Start
Observed: "I"
Observed: "Have"
Observed: "a"
Observed: "dream"
MyObserver: Complete

The implementation works like this: fromArray creates a concrete Observable, ObservableFromArray, whose subscribeActual method creates a FromArrayDisposable to do the work. When FromArrayDisposable.run is called, it walks through the given array in order, calling observer.onNext for each element and finally observer.onComplete. The relevant source code is:

public final class ObservableFromArray<T> extends Observable<T> {
    final T[] array;

    public ObservableFromArray(T[] array) {
        this.array = array;
    }

    @Override
    public void subscribeActual(Observer<? super T> observer) {
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
        observer.onSubscribe(d);
        if (d.fusionMode) {
            return;
        }
        d.run();
    }

    static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
        final Observer<? super T> downstream;
        final T[] array;
        int index;
        boolean fusionMode;
        volatile boolean disposed;

        FromArrayDisposable(Observer<? super T> actual, T[] array) {
            this.downstream = actual;
            this.array = array;
        }

        // other methods

        @Override
        public void dispose() {
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }

        void run() {
            T[] a = array;
            int n = a.length;
            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                if (value == null) {
                    downstream.onError(new NullPointerException("The element at index " + i + " is null"));
                    return;
                }
                downstream.onNext(value);
            }
            if (!isDisposed()) {
                downstream.onComplete();
            }
        }
    }
}

So what is the point of Disposable? My understanding is that it closes the loop of a subscription. Consider subscribing the same observer more than once, as in the following code:

public static void testDirectSubscribe() {
    Observer observer = new MyObserver();
    Observable.fromArray("I", "Have", "a", "dream").subscribe(observer);
    Observable.fromArray("changed").subscribe(observer);
}

It throws an exception :

io.reactivex.exceptions.ProtocolViolationException: It is not allowed to subscribe with a(n) zzz.study.reactor.MyObserver multiple times. Please create a fresh instance of zzz.study.reactor.MyObserver and subscribe that to the target source instead.
at io.reactivex.internal.util.EndConsumerHelper.reportDoubleSubscription(EndConsumerHelper.java:148)
at io.reactivex.internal.util.EndConsumerHelper.validate(EndConsumerHelper.java:57)
at io.reactivex.observers.DefaultObserver.onSubscribe(DefaultObserver.java:70)
at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:34)
at io.reactivex.Observable.subscribe(Observable.java:12284)
at zzz.study.reactor.RxJavaBasic.testDirectSubscribe(RxJavaBasic.java:34)
at zzz.study.reactor.RxJavaBasic.main(RxJavaBasic.java:17)

The exception is raised in the call to DefaultObserver.onSubscribe:

// DefaultObserver
@Override
public final void onSubscribe(@NonNull Disposable d) {
    if (EndConsumerHelper.validate(this.upstream, d, getClass())) {
        this.upstream = d;
        onStart();
    }
}

// EndConsumerHelper
public static boolean validate(Disposable upstream, Disposable next, Class<?> observer) {
    ObjectHelper.requireNonNull(next, "next is null");
    if (upstream != null) {
        next.dispose();
        if (upstream != DisposableHelper.DISPOSED) {
            reportDoubleSubscription(observer);
        }
        return false;
    }
    return true;
}

In other words, if the previous subscription of the same observer has not ended (its Disposable is still active), subscribing it again is an error. How can this be fixed? Calling cancel() in MyObserver's onError and onComplete ends the previous subscription, so subscribing again no longer throws:

// MyObserver
@Override
public void onError(Throwable e) {
    System.out.println("Observed: " + e.getMessage());
    super.cancel();
}

@Override
public void onComplete() {
    System.out.println("MyObserver: Complete");
    super.cancel();
}

// DefaultObserver
/**
 * Cancels the upstream's disposable.
 */
protected final void cancel() {
    Disposable upstream = this.upstream;
    this.upstream = DisposableHelper.DISPOSED;
    upstream.dispose();
}

Even so, the messages of the new subscription are still not delivered. After cancel(), upstream is DISPOSED rather than null, so validate disposes the new Disposable and returns false, and the new subscription never emits anything.

We cannot override DefaultObserver.onSubscribe because it is declared final, upstream is private, and there is no public method to set upstream. The designer's intent is clear: this is a pre-check contract for an Observer subscribing to a Disposable, and it is not meant to be broken; break it at your own risk.
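The three outcomes of this check (first subscription accepted, second subscription rejected with an error, second subscription after cancel rejected silently) can be reproduced with a small plain-Java model. This is a sketch, not RxJava code: Token and GuardedObserver are hypothetical names, and signalling the violation by throwing IllegalStateException is a simplification chosen for the sketch.

```java
// Simplified model of the DefaultObserver.onSubscribe / EndConsumerHelper.validate check.
// Token and GuardedObserver are hypothetical names, not RxJava classes.
class SubscribeGuardSketch {

    // Stands in for Disposable.
    static final class Token {
        boolean disposed;

        void dispose() {
            disposed = true;
        }
    }

    // Sentinel marking "cancelled", in the spirit of DisposableHelper.DISPOSED.
    static final Token DISPOSED = new Token();

    static final class GuardedObserver {
        private Token upstream;

        // Returns true only if the subscription was accepted.
        boolean onSubscribe(Token next) {
            if (upstream != null) {
                next.dispose(); // a second subscription is always disposed
                if (upstream != DISPOSED) {
                    // Simplification: the sketch throws to signal the double subscription.
                    throw new IllegalStateException("subscribed multiple times");
                }
                return false; // cancelled earlier: rejected without an error
            }
            upstream = next;
            return true;
        }

        void cancel() {
            Token current = upstream;
            upstream = DISPOSED;
            current.dispose();
        }
    }

    // Subscribes twice, optionally cancelling in between, and describes the second attempt.
    static String secondSubscription(boolean cancelFirst) {
        GuardedObserver observer = new GuardedObserver();
        observer.onSubscribe(new Token());
        if (cancelFirst) {
            observer.cancel();
        }
        Token second = new Token();
        try {
            boolean accepted = observer.onSubscribe(second);
            return "accepted=" + accepted + ", disposed=" + second.disposed;
        } catch (IllegalStateException e) {
            return "rejected with exception, disposed=" + second.disposed;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondSubscription(false)); // rejected with exception, disposed=true
        System.out.println(secondSubscription(true));  // accepted=false, disposed=true
    }
}
```

In both cases the second Token ends up disposed, which is why cancelling alone removes the exception but still delivers nothing.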

We can bypass DefaultObserver: instead of extending it, implement the Observer interface directly:


// In RxJavaBasic
public static void testDirectSubscribe() {
    Observer observer = new RepeatedSubscribeMyObserver();
    Observable.fromArray("I", "Have", "a", "dream").subscribe(observer);
    Observable.fromArray("changed").subscribe(observer);
}

// RepeatedSubscribeMyObserver.java
import com.alibaba.fastjson.JSON;
import io.reactivex.Observer;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;

/**
 * @Description An observer that can be subscribed repeatedly
 * @Date 2021/1/24 10:11 AM
 * @Created by qinshu
 */
public class RepeatedSubscribeMyObserver<T> implements Observer<T> {

    public Disposable upstream;

    // No double-subscription check here: every new Disposable is accepted.
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        System.out.println(getName() + ": Start");
        this.upstream = d;
    }

    @Override
    public void onNext(T o) {
        System.out.println(getName() + ": " + JSON.toJSONString(o));
    }

    @Override
    public void onError(Throwable e) {
        System.out.println(getName() + ": " + e.getMessage());
        cancel();
    }

    @Override
    public void onComplete() {
        System.out.println(getName() + ": Complete");
        cancel();
    }

    public String getName() {
        return this.getClass().getSimpleName();
    }

    /**
     * Cancels the upstream's disposable.
     */
    protected final void cancel() {
        Disposable upstream = this.upstream;
        this.upstream = DisposableHelper.DISPOSED;
        upstream.dispose();
    }
}

This way the same Observer can be subscribed repeatedly. Output:

RepeatedSubscribeMyObserver: Start
RepeatedSubscribeMyObserver: "I"
RepeatedSubscribeMyObserver: "Have"
RepeatedSubscribeMyObserver: "a"
RepeatedSubscribeMyObserver: "dream"
RepeatedSubscribeMyObserver: Complete
RepeatedSubscribeMyObserver: Start
RepeatedSubscribeMyObserver: "changed"
RepeatedSubscribeMyObserver: Complete

Once the implementation of Observable.fromArray is understood, the basic pattern behind many of Observable's basic methods becomes clear. For example, just with two or more arguments is actually a wrapper around fromArray, and range creates a RangeDisposable to do the work.

Observable.just(1,2,3).subscribe(observer);
Observable.range(1,4).subscribe(observer);

Combine

As mentioned above, one basic element of reactive programming is functional programming. The advantage of the functional style is that functions can be stacked and combined without limit to build flexible behavior. This makes what an observer sees more flexible: the emissions of multiple Observables can be combined.

Merge

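The simplest combination is merge: build a collection of Observables, and the merged Observable relays the items emitted by each of them. Because the sources are held in a HashSet here, the order in which they are visited is not guaranteed.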

// Sets is com.google.common.collect.Sets from Guava
Iterable<? extends ObservableSource<? extends Integer>> observableSourceSet = Sets.newHashSet(
    Observable.fromArray(3, 4, 5),
    Observable.range(10, 3)
);
Observable.merge(observableSourceSet).subscribe(observer);

Streaming

Observables can also be combined in a stream-like chain, which is where functional programming comes in, as the following code shows:

Observable.range(1, 10).filter(x -> x % 2 == 0).subscribe(observer);

Note that the decorator pattern is used here. filter creates an ObservableFilter object, whose subscribeActual method creates a FilterObserver that decorates the incoming observer. downstream is that incoming observer.


// Observable.filter
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> filter(Predicate<? super T> predicate) {
    ObjectHelper.requireNonNull(predicate, "predicate is null");
    return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}

public final class ObservableFilter<T> extends AbstractObservableWithUpstream<T, T> {
    final Predicate<? super T> predicate;

    public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) {
        super(source);
        this.predicate = predicate;
    }

    @Override
    public void subscribeActual(Observer<? super T> observer) {
        // FilterObserver decorates the custom observer that was passed in
        source.subscribe(new FilterObserver<T>(observer, predicate));
    }

    static final class FilterObserver<T> extends BasicFuseableObserver<T, T> {
        final Predicate<? super T> filter;

        FilterObserver(Observer<? super T> actual, Predicate<? super T> filter) {
            super(actual);
            this.filter = filter;
        }

        // Decorates the incoming Observer.onNext: it is called only if the predicate holds
        @Override
        public void onNext(T t) {
            if (sourceMode == NONE) {
                boolean b;
                try {
                    b = filter.test(t);
                } catch (Throwable e) {
                    fail(e);
                    return;
                }
                if (b) {
                    downstream.onNext(t); // downstream is the custom Observer we passed in
                }
            } else {
                downstream.onNext(null);
            }
        }

        // other methods omitted
    }
}
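Stripped of the RxJava plumbing, the decoration step is small. The following sketch uses only JDK types and is an illustration, not the library's implementation: java.util.function.Consumer stands in for the observer's onNext, and filtering is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Decorator sketch using only JDK types; Consumer stands in for Observer.onNext.
class FilterDecoratorSketch {

    // Wraps the downstream consumer and forwards a value only when the predicate holds.
    static <T> Consumer<T> filtering(Consumer<? super T> downstream, Predicate<? super T> predicate) {
        return value -> {
            if (predicate.test(value)) {
                downstream.accept(value);
            }
        };
    }

    // Pushes 1..10 through the decorated consumer, like range(1, 10).filter(x -> x % 2 == 0).
    static List<Integer> evenNumbers() {
        List<Integer> received = new ArrayList<>();
        Consumer<Integer> sink = received::add;
        Consumer<Integer> decorated = filtering(sink, x -> x % 2 == 0);
        for (int i = 1; i <= 10; i++) {
            decorated.accept(i);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(evenNumbers()); // [2, 4, 6, 8, 10]
    }
}
```

The source keeps calling the same method on what it believes is the observer; only the wrapper decides whether the call reaches the real one.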

Just as filter filters the emitted data stream, map and flatMap map and transform it, much like stream.map and stream.flatMap:

Observable.range(1,10).map(x -> x*x).subscribe(observer);
Observable.range(1,10).flatMap(x -> Observable.just(x*x)).subscribe(observer);

map creates an ObservableMap object, whose subscribeActual decorates the incoming observer with a MapObserver; flatMap creates an ObservableFlatMap object, whose subscribeActual decorates the incoming observer with a MergeObserver.

You can also use scan: for each value emitted, it applies the accumulator (x,y) -> x*y to the previous result and the new value, and emits the result.

Observable.range(1, 10).scan(1, (x,y) -> x*y).subscribe(observer);
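To see what values such a scan produces, here is a plain-Java sketch of the same accumulation over a list. It is not RxJava code, and the scan helper is a hypothetical name; it follows the documented behavior of scan with an initial value, which emits the seed first and then one accumulated value per item. The input is shortened to 1..5 to keep the result readable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model of scan(seed, accumulator) over a finite list.
class ScanSketch {

    // Emits the seed first, then one accumulated value per input item.
    static <T> List<T> scan(List<T> items, T seed, BinaryOperator<T> accumulator) {
        List<T> out = new ArrayList<>();
        T acc = seed;
        out.add(acc);
        for (T item : items) {
            acc = accumulator.apply(acc, item);
            out.add(acc);
        }
        return out;
    }

    // Running products of 1..5 with seed 1.
    static List<Integer> runningProducts() {
        List<Integer> input = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            input.add(i);
        }
        return scan(input, 1, (x, y) -> x * y);
    }

    public static void main(String[] args) {
        System.out.println(runningProducts()); // [1, 1, 2, 6, 24, 120]
    }
}
```

With seed 1 and multiplication, the emitted values after the seed are the factorials.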

Finally, here is a grouping example:

Observable.just(28, 520, 25, 999)
    .groupBy(i -> (i > 100 ? "old" : "new"))
    .subscribe(new GroupedRepeatedSubscribeMyObserver());

/**
 * @Description A repeatable observer for grouped observables
 * @Date 2021/1/24 10:11 AM
 * @Created by qinshu
 */
public class GroupedRepeatedSubscribeMyObserver extends RepeatedSubscribeMyObserver<GroupedObservable> {

    // Each item received here is a whole group; subscribe an inner observer to its values.
    @Override
    public void onNext(GroupedObservable o) {
        o.subscribe(new RepeatedSubscribeMyObserver() {
            @Override
            public void onNext(Object v) {
                String info = String.format("GroupedRepeatedSubscribeMyObserver: [group=%s][value=%s]", o.getKey(), JSON.toJSONString(v));
                System.out.println(info);
            }
        });
    }
}

groupBy emits GroupedObservable items, so the outer observer must be an Observer of GroupedObservable, and it subscribes an inner observer to each group.

That is all for this article.

The project code is in the package zzz.study.reactor of the "ALLIN" project. The following Maven dependency is required:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.20</version>
</dependency>

Summary

This article covered the basic concepts of reactive programming and the RxJava library (Observable, Observer, Emitter and Disposable) and how to combine Observables to build more flexible message emission. Together these form the basic skeleton of reactive programming.

The power of reactive programming rests on the event-driven mechanism and functional programming, with heavy use of the decorator pattern. Being familiar with these basic programming ideas therefore helps in mastering the reactive programming model.

Copyright notice
This article was written by [Qinshuiyu]. Please include a link to the original when reposting. Thanks.
https://javamana.com/2021/01/20210124145033429J.html
