Introduction

While reading the Hystrix source code, I came across some unfamiliar constructs. A quick search showed that it uses RxJava, a popular reactive programming library. So what is reactive programming like? This article takes a first look at reactive programming and the RxJava library.

When learning a new programming model, I like to connect it to the models I already know, because a new model is often an inheritance and combination of existing ones. Reactive programming has two basic elements:

  • An event-driven mechanism based on the observer pattern;
  • Functional programming: decoration and composition make reactive processing more fluent and flexible.

Functional programming was covered in earlier articles such as “Completely "Functional" Programming”, “The Secret of Java 8 Functional Programming” and “Refining Code: A Journey of Refactoring Java with Functional Programming”. The observer pattern was covered in “The Observer Pattern: Implementing Real-Time Push of Configuration Updates”. This article explores reactive programming on the basis of both.

Basics

On first contact with RxJava, it is easy to get dizzy from the flood of names: Observer, Observable, Disposable, subscribeOn, onSubscribe, onNext, onError, onComplete and so on. But there is little in software that is truly new; most of it is organizing logic in a new way, and observer-based event-driven programming is no exception. Once the overall flow is sorted out, it is easy to understand. There are three basic participants:

  • The observed: Observable;
  • The emitter: Emitter;
  • The observer: Observer.

The basic process is: the Observable is equipped with an Emitter, which emits messages and creates events; the Observer listens for events, receives the messages from the observed, and handles them in the corresponding callbacks onNext, onError and onComplete. Only one of onError and onComplete can be triggered.

Let's write a basic demo to simulate this process. To make it easier to follow, I have separated the three participants.

Demo

First, define the observer MyObserver. It extends the abstract class DefaultObserver, which is the lowest-cost option.


package zzz.study.reactor;

import com.alibaba.fastjson.JSON;
import io.reactivex.observers.DefaultObserver;

/**
 * @Description Observer definition
 * @Date 2021/1/23 4:13 PM
 * @Created by qinshu
 */
public class MyObserver extends DefaultObserver<Object> {

    @Override
    public void onStart() {
        System.out.println("MyObserver: Start");
    }

    @Override
    public void onNext(Object o) {
        System.out.println("Observed: " + JSON.toJSONString(o));
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Observed: " + e.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("MyObserver: Complete");
    }
}

Next, define the emitter, which emits the messages, as MyEmitter:

package zzz.study.reactor;

import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @Description Emitter
 * @Date 2021/1/24 7:04 AM
 * @Created by qinshu
 */
public class MyEmitter implements ObservableOnSubscribe<String> {

    Random random = new Random(System.currentTimeMillis());

    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        TimeUnit.SECONDS.sleep(1);
        emitter.onNext("next");
        // Roughly one run in three ends with an error instead of completing
        if (random.nextInt(3) == 0) {
            emitter.onError(new RuntimeException("A RuntimeException"));
        } else {
            emitter.onComplete();
        }
    }
}

Finally, create the observed and wire the pieces together:

package zzz.study.reactor;

import io.reactivex.Observable;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;

/**
 * @Description RxJava basic demo
 * @Date 2021/1/23 12:28 PM
 * @Created by qinshu
 */
public class RxJavaBasic {

    public static void main(String[] args) {
        // Each iteration builds a fresh emitter, observable and observer
        for (int i = 0; i < 5; i++) {
            ObservableOnSubscribe<String> observableOnSubscribe = new MyEmitter();
            Observable<String> observable = Observable.create(observableOnSubscribe);
            Observer<Object> observer = new MyObserver();
            observable.subscribe(observer);
        }
    }
}

Running it produces output like this:

MyObserver: Start
Observed: "next"
MyObserver: Complete
MyObserver: Start
Observed: "next"
MyObserver: Complete
MyObserver: Start
Observed: "next"
Observed: A RuntimeException
MyObserver: Start
Observed: "next"
MyObserver: Complete
MyObserver: Start
Observed: "next"
MyObserver: Complete

Explanation

How should we understand this process and its results? The best way is single-step debugging. Stepping through the code shows that the whole process is as follows:

Step 1: The whole process is triggered by the line observable.subscribe(observer);, which calls Observable.subscribeActual, dispatched to the concrete implementation ObservableCreate.subscribeActual. The advantage of single-step debugging is that it identifies the concrete implementer.

Step 2: ObservableCreate.subscribeActual calls observer.onSubscribe (which invokes MyObserver.onStart), then hands over to MyEmitter.subscribe to emit the messages.

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

Step 3: MyEmitter calls onNext, which is dispatched to the concrete implementation CreateEmitter.onNext, which in turn calls observer.onNext.

Step 4: If MyEmitter calls onError, the call is dispatched to the concrete implementation CreateEmitter.onError, which in turn calls observer.onError. If MyEmitter calls onComplete, the call is dispatched to CreateEmitter.onComplete, which in turn calls observer.onComplete. Note that only one of onError and onComplete is executed.

This is the basic process .
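The four steps can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions: MiniObserver, MiniEmitter and CreateFlowSketch are hypothetical names rather than RxJava classes, the onSubscribe handshake is left out, and a simple boolean flag stands in for whatever RxJava does internally to make onError and onComplete mutually exclusive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified, hypothetical model of the create/subscribe flow; not RxJava's real classes.
final class CreateFlowSketch {

    /** Hypothetical stand-in for Observer, reduced to the three callbacks used here. */
    interface MiniObserver<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onComplete();
    }

    /** Plays the role of CreateEmitter: forwards to the observer until a terminal event. */
    static final class MiniEmitter<T> {
        private final MiniObserver<T> observer;
        private boolean terminated;

        MiniEmitter(MiniObserver<T> observer) {
            this.observer = observer;
        }

        void onNext(T value) {
            if (!terminated) {
                observer.onNext(value);
            }
        }

        void onError(Throwable error) {
            if (!terminated) {
                terminated = true;
                observer.onError(error);
            }
        }

        void onComplete() {
            if (!terminated) {
                terminated = true;
                observer.onComplete();
            }
        }
    }

    /** Plays the role of subscribeActual: wrap the observer, then run the source. */
    static <T> void subscribe(Consumer<MiniEmitter<T>> source, MiniObserver<T> observer) {
        MiniEmitter<T> emitter = new MiniEmitter<>(observer);
        try {
            source.accept(emitter);
        } catch (RuntimeException ex) {
            emitter.onError(ex);
        }
    }

    /** Runs a source that misbehaves on purpose and records what the observer sees. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        CreateFlowSketch.<String>subscribe(emitter -> {
            emitter.onNext("next");
            emitter.onError(new RuntimeException("boom"));
            emitter.onComplete();   // ignored: a terminal event was already delivered
            emitter.onNext("late"); // ignored as well
        }, new MiniObserver<String>() {
            @Override public void onNext(String value) { log.add("next:" + value); }
            @Override public void onError(Throwable error) { log.add("error:" + error.getMessage()); }
            @Override public void onComplete() { log.add("complete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [next:next, error:boom]
    }
}
```

The observer never talks to the source directly; everything goes through the emitter wrapper, which is why the wrapper is the natural place to enforce the "only one terminal event" rule.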

Extension

Disposable

Besides subscribing with a custom Emitter to send messages, the Observable class provides a variety of utility methods that make subscribing and pushing more convenient. For example:

public static void testDirectSubscribe() {
    Observable.fromArray("I", "Have", "a", "dream").subscribe(new MyObserver());
}

The output is:

MyObserver: Start
Observed: "I"
Observed: "Have"
Observed: "a"
Observed: "dream"
MyObserver: Complete

The implementation works as follows: fromArray creates a concrete Observable subclass, ObservableFromArray, whose subscribeActual method creates a FromArrayDisposable to do the work. When the run method of FromArrayDisposable is called, it iterates over the given array in order, calling observer.onNext for each element, and finally calls observer.onComplete. The source code is as follows:

public final class ObservableFromArray<T> extends Observable<T> {
    final T[] array;

    public ObservableFromArray(T[] array) {
        this.array = array;
    }

    @Override
    public void subscribeActual(Observer<? super T> observer) {
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
        observer.onSubscribe(d);
        if (d.fusionMode) {
            return;
        }
        d.run();
    }

    static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
        final Observer<? super T> downstream;
        final T[] array;
        int index;
        boolean fusionMode;
        volatile boolean disposed;

        FromArrayDisposable(Observer<? super T> actual, T[] array) {
            this.downstream = actual;
            this.array = array;
        }

        // other methods

        @Override
        public void dispose() {
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }

        void run() {
            T[] a = array;
            int n = a.length;

            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                if (value == null) {
                    downstream.onError(new NullPointerException("The element at index " + i + " is null"));
                    return;
                }
                downstream.onNext(value);
            }
            if (!isDisposed()) {
                downstream.onComplete();
            }
        }
    }
}
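The run loop above re-checks isDisposed() before every element and again before onComplete, so disposing in the middle of the stream stops everything that follows. A minimal plain-Java sketch of that behaviour is shown here; ArrayRun, ArrayEmitSketch and the stopWord parameter are hypothetical names introduced for illustration, and the null-element check of the original is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java model of the run() loop above; not RxJava's real classes.
final class ArrayEmitSketch {

    /** Minimal stand-in for FromArrayDisposable: holds the array and a disposed flag. */
    static final class ArrayRun<T> {
        private final T[] array;
        private volatile boolean disposed;

        ArrayRun(T[] array) {
            this.array = array;
        }

        void dispose() {
            disposed = true;
        }

        /** Emits elements in order, re-checking the flag before each one. */
        void run(Consumer<T> onNext, Runnable onComplete) {
            for (int i = 0; i < array.length && !disposed; i++) {
                onNext.accept(array[i]);
            }
            if (!disposed) {
                onComplete.run();
            }
        }
    }

    /** Emits the words, disposing as soon as stopWord is seen (null means never dispose). */
    static List<String> emit(String[] words, String stopWord) {
        List<String> log = new ArrayList<>();
        ArrayRun<String> run = new ArrayRun<>(words);
        run.run(word -> {
            log.add(word);
            if (word.equals(stopWord)) {
                run.dispose();
            }
        }, () -> log.add("complete"));
        return log;
    }

    public static void main(String[] args) {
        String[] words = {"I", "Have", "a", "dream"};
        System.out.println(emit(words, null));   // [I, Have, a, dream, complete]
        System.out.println(emit(words, "Have")); // [I, Have]
    }
}
```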

So what is a Disposable for? My understanding is that it closes the loop on a subscription. Consider subscribing the same observer twice, as in the following code:

public static void testDirectSubscribe() {
    Observer observer = new MyObserver();
    Observable.fromArray("I", "Have", "a", "dream").subscribe(observer);
    Observable.fromArray("changed").subscribe(observer);
}

It throws an exception:

io.reactivex.exceptions.ProtocolViolationException: It is not allowed to subscribe with a(n) zzz.study.reactor.MyObserver multiple times. Please create a fresh instance of zzz.study.reactor.MyObserver and subscribe that to the target source instead.
at io.reactivex.internal.util.EndConsumerHelper.reportDoubleSubscription(EndConsumerHelper.java:148)
at io.reactivex.internal.util.EndConsumerHelper.validate(EndConsumerHelper.java:57)
at io.reactivex.observers.DefaultObserver.onSubscribe(DefaultObserver.java:70)
at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:34)
at io.reactivex.Observable.subscribe(Observable.java:12284)
at zzz.study.reactor.RxJavaBasic.testDirectSubscribe(RxJavaBasic.java:34)
at zzz.study.reactor.RxJavaBasic.main(RxJavaBasic.java:17)

This exception is thrown from the call to DefaultObserver.onSubscribe:

// DefaultObserver
@Override
public final void onSubscribe(@NonNull Disposable d) {
    if (EndConsumerHelper.validate(this.upstream, d, getClass())) {
        this.upstream = d;
        onStart();
    }
}

// EndConsumerHelper
public static boolean validate(Disposable upstream, Disposable next, Class<?> observer) {
    ObjectHelper.requireNonNull(next, "next is null");
    if (upstream != null) {
        next.dispose();
        if (upstream != DisposableHelper.DISPOSED) {
            reportDoubleSubscription(observer);
        }
        return false;
    }
    return true;
}

In other words, if an observer's previous Disposable subscription has not ended, subscribing it to another Disposable fails. How can this be fixed? Calling super.cancel() in the onError and onComplete methods of MyObserver ends the previous subscription, so subscribing again no longer throws:

// MyObserver
@Override
public void onError(Throwable e) {
    System.out.println("Observed: " + e.getMessage());
    super.cancel();
}

@Override
public void onComplete() {
    System.out.println("MyObserver: Complete");
    super.cancel();
}

// DefaultObserver
/**
 * Cancels the upstream's disposable.
 */
protected final void cancel() {
    Disposable upstream = this.upstream;
    this.upstream = DisposableHelper.DISPOSED;
    upstream.dispose();
}

However, even so, the messages of the new subscription are still not delivered. After cancel, upstream holds DisposableHelper.DISPOSED rather than null, so validate disposes the new Disposable and returns false, and the new subscription never starts.
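The three possible outcomes of the validate check (accepted, rejected with an exception, rejected silently) can be reproduced with a small plain-Java model. Handle and ValidateSketch are hypothetical names, and IllegalStateException stands in for the ProtocolViolationException reported by RxJava; the branching mirrors the validate and cancel code quoted earlier.

```java
// Hypothetical plain-Java model of the validate check quoted above; not RxJava's real classes.
final class ValidateSketch {

    /** Minimal stand-in for Disposable. */
    static final class Handle {
        boolean disposed;

        void dispose() {
            disposed = true;
        }
    }

    /** Sentinel playing the role of DisposableHelper.DISPOSED. */
    static final Handle DISPOSED = new Handle();

    /** Stand-in for the observer's private upstream field. */
    private Handle upstream;

    /** Mirrors the quoted logic: only a null upstream lets a new subscription in. */
    boolean onSubscribe(Handle next) {
        if (upstream != null) {
            next.dispose(); // the new subscription is disposed either way
            if (upstream != DISPOSED) {
                throw new IllegalStateException("subscribed multiple times");
            }
            return false; // previous subscription was cancelled: rejected silently
        }
        upstream = next;
        return true;
    }

    /** Mirrors cancel(): swaps in the sentinel, so upstream never becomes null again. */
    void cancel() {
        Handle current = upstream;
        upstream = DISPOSED;
        current.dispose();
    }

    public static void main(String[] args) {
        ValidateSketch observer = new ValidateSketch();
        System.out.println(observer.onSubscribe(new Handle())); // true
        observer.cancel();
        Handle second = new Handle();
        System.out.println(observer.onSubscribe(second)); // false
        System.out.println(second.disposed);              // true
    }
}
```

Because the second handle is already disposed when the source starts its emission loop, nothing is emitted and no callback fires, which matches the silence observed after adding cancel.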

We cannot override DefaultObserver.onSubscribe, because the method is declared final, and upstream is declared private with no public way to set it. This clearly signals the designer's intent: this is a pre-check convention for an Observer subscribing to a Disposable and it is not meant to be broken. Break it, and the consequences are yours.

We can bypass DefaultObserver: instead of extending it, implement the Observer interface directly:


public static void testDirectSubscribe() {
    Observer observer = new RepeatedSubscribeMyObserver();
    Observable.fromArray("I", "Have", "a", "dream").subscribe(observer);
    Observable.fromArray("changed").subscribe(observer);
}

import com.alibaba.fastjson.JSON;
import io.reactivex.Observer;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;

/**
 * @Description An observer that can be subscribed repeatedly
 * @Date 2021/1/24 10:11 AM
 * @Created by qinshu
 */
public class RepeatedSubscribeMyObserver<T> implements Observer<T> {

    public Disposable upstream;

    @Override
    public void onSubscribe(@NonNull Disposable d) {
        System.out.println(getName() + ": Start");
        // No validation here: each new subscription simply replaces the old one
        this.upstream = d;
    }

    @Override
    public void onNext(T o) {
        System.out.println(getName() + ": " + JSON.toJSONString(o));
    }

    @Override
    public void onError(Throwable e) {
        System.out.println(getName() + ": " + e.getMessage());
        cancel();
    }

    @Override
    public void onComplete() {
        System.out.println(getName() + ": Complete");
        cancel();
    }

    public String getName() {
        return this.getClass().getSimpleName();
    }

    /**
     * Cancels the upstream's disposable.
     */
    protected final void cancel() {
        Disposable upstream = this.upstream;
        this.upstream = DisposableHelper.DISPOSED;
        upstream.dispose();
    }
}

In this way, the same Observer can be subscribed repeatedly. The output:

RepeatedSubscribeMyObserver: Start
RepeatedSubscribeMyObserver: "I"
RepeatedSubscribeMyObserver: "Have"
RepeatedSubscribeMyObserver: "a"
RepeatedSubscribeMyObserver: "dream"
RepeatedSubscribeMyObserver: Complete
RepeatedSubscribeMyObserver: Start
RepeatedSubscribeMyObserver: "changed"
RepeatedSubscribeMyObserver: Complete

Once the implementation of Observable.fromArray is understood, the pattern behind many of the basic methods of Observable becomes clear. For example, just with two or more arguments is really a wrapper around fromArray, while range creates a RangeDisposable to do the work.

Observable.just(1,2,3).subscribe(observer);
Observable.range(1,4).subscribe(observer);

Combine

As mentioned above, one of the basic elements of reactive programming is functional programming. The advantage of the functional style is that functions can be stacked and composed without limit to build flexible functions and behavior. This makes the behavior of the observed more flexible: the emission behavior of multiple Observables can be combined.

Merge

The simple combinator merge takes a collection of Observables and relays the emissions of each merged Observable:

Iterable<? extends ObservableSource<? extends Integer>> observableSourceSet = Sets.newHashSet(
        Observable.fromArray(3, 4, 5),
        Observable.range(10, 3)
);
Observable.merge(observableSourceSet).subscribe(observer);

Streaming

Observables can be composed in a stream style, and this is where functional programming comes in, as the following code shows:

Observable.range(1,10).filter(x -> x%2 ==0).subscribe(observer);

Note that the decorator pattern is used here. The filter method creates an ObservableFilter object, whose subscribeActual method creates a FilterObserver that decorates the incoming observer. downstream is the incoming observer.


// Observable.filter
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> filter(Predicate<? super T> predicate) {
    ObjectHelper.requireNonNull(predicate, "predicate is null");
    return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}

public final class ObservableFilter<T> extends AbstractObservableWithUpstream<T, T> {
    final Predicate<? super T> predicate;

    public ObservableFilter(ObservableSource<T> source, Predicate<? super T> predicate) {
        super(source);
        this.predicate = predicate;
    }

    @Override
    public void subscribeActual(Observer<? super T> observer) {
        // FilterObserver decorates the custom observer that was passed in
        source.subscribe(new FilterObserver<T>(observer, predicate));
    }

    static final class FilterObserver<T> extends BasicFuseableObserver<T, T> {
        final Predicate<? super T> filter;

        FilterObserver(Observer<? super T> actual, Predicate<? super T> filter) {
            super(actual);
            this.filter = filter;
        }

        // Decorates the incoming Observer.onNext: it is called only if the condition holds
        @Override
        public void onNext(T t) {
            if (sourceMode == NONE) {
                boolean b;
                try {
                    b = filter.test(t);
                } catch (Throwable e) {
                    fail(e);
                    return;
                }
                if (b) {
                    downstream.onNext(t); // downstream is the custom Observer we passed in
                }
            } else {
                downstream.onNext(null);
            }
        }

        // other methods omitted
    }
}
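Stripped of the RxJava machinery, the decoration performed by FilterObserver comes down to wrapping one callback in another. The following plain-Java sketch shows only that idea; FilterDecoratorSketch and filtering are hypothetical names, and java.util.function.Consumer stands in for the observer's onNext.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical plain-Java model of the decoration done by filter; not RxJava's real classes.
final class FilterDecoratorSketch {

    /** Wraps a downstream consumer so that it only sees values passing the predicate. */
    static <T> Consumer<T> filtering(Predicate<? super T> predicate, Consumer<? super T> downstream) {
        return value -> {
            if (predicate.test(value)) {
                downstream.accept(value);
            }
        };
    }

    /** Pushes 1..10 through an "even numbers only" decorator and collects what arrives. */
    static List<Integer> evens() {
        List<Integer> received = new ArrayList<>();
        Consumer<Integer> decorated =
                FilterDecoratorSketch.<Integer>filtering(x -> x % 2 == 0, received::add);
        for (int i = 1; i <= 10; i++) {
            decorated.accept(i);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(evens()); // [2, 4, 6, 8, 10]
    }
}
```

The source keeps pushing every value; only the wrapper decides which ones reach the original consumer, so decorators of this kind can be stacked in any order.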

Just as filter filters the emitted data stream, map and flatMap transform it, much like stream.map and stream.flatMap:

Observable.range(1,10).map(x -> x*x).subscribe(observer);
Observable.range(1,10).flatMap(x -> Observable.just(x*x)).subscribe(observer);

The map method creates an ObservableMap object, whose subscribeActual uses a MapObserver to decorate the incoming observer; flatMap creates an ObservableFlatMap object, whose subscribeActual uses a MergeObserver to decorate the incoming observer.

You can also use scan: for each value emitted, it applies the accumulator (x,y) -> x*y to produce a new value and emits it.

Observable.range(1, 10).scan(1, (x,y) -> x*y).subscribe(observer);
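To see what a seeded scan produces, here is a plain-Java sketch. It assumes, as I understand the behaviour of scan with an initial value, that the seed is emitted first and every intermediate result follows. ScanSketch is a hypothetical name and the list-based signature is only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical plain-Java model of a seeded scan; not RxJava's real implementation.
final class ScanSketch {

    /** Returns the seed followed by every intermediate accumulated value. */
    static <T> List<T> scan(T seed, List<T> values, BinaryOperator<T> accumulator) {
        List<T> emitted = new ArrayList<>();
        T current = seed;
        emitted.add(current);
        for (T value : values) {
            current = accumulator.apply(current, value);
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(scan(1, input, (x, y) -> x * y)); // [1, 1, 2, 6, 24, 120]
    }
}
```

Under the same assumption, the range(1, 10) example with the multiplying accumulator would emit the running products 1, 1, 2, 6, 24 and so on up to 10 factorial.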

Finally, here is an example of grouping:

Observable.just(28, 520, 25, 999)
        .groupBy(i -> (i > 100 ? "old" : "new"))
        .subscribe(new GroupedRepeatedSubscribeMyObserver());

/**
 * @Description A repeatable observer for groups
 * @Date 2021/1/24 10:11 AM
 * @Created by qinshu
 */
public class GroupedRepeatedSubscribeMyObserver extends RepeatedSubscribeMyObserver<GroupedObservable> {
    @Override
    public void onNext(GroupedObservable o) {
        // Each emitted item is itself an Observable carrying the values of one group
        o.subscribe(new RepeatedSubscribeMyObserver() {
            @Override
            public void onNext(Object v) {
                String info = String.format("GroupedRepeatedSubscribeMyObserver: [group=%s][value=%s]", o.getKey(), JSON.toJSONString(v));
                System.out.println(info);
            }
        });
    }
}

The groupBy method emits GroupedObservable items, so the subscriber must be an Observer that handles GroupedObservable; here it subscribes a further observer to each group.
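To check which values end up under which key, the same key function can be applied in plain Java. This is only a batch analogy: groupBy hands out groups as values arrive, while the sketch collects everything into a map first. GroupingSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Batch analogy in plain Java, not RxJava: collects all groups up front.
final class GroupingSketch {

    /** Groups values with the same key function as the groupBy example. */
    static Map<String, List<Integer>> group(List<Integer> values) {
        // LinkedHashMap keeps the keys in first-seen order
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        for (Integer i : values) {
            String key = i > 100 ? "old" : "new";
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(i);
        }
        return groups;
    }

    public static void main(String[] args) {
        System.out.println(group(Arrays.asList(28, 520, 25, 999))); // {new=[28, 25], old=[520, 999]}
    }
}
```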

That is all for this article.

The project code is in the “ALLIN” project, under the package zzz.study.reactor. The following Maven dependency is required:

<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.20</version>
</dependency>

Summary

This article covered the basic concepts of reactive programming and the RxJava library: Observable, Observer, Emitter and Disposable, as well as how to combine Observables to build a more flexible message emission mechanism. Together these form the basic skeleton of reactive programming.

The power of reactive programming rests on the event-driven mechanism and functional programming, with heavy use of the decorator pattern. Being familiar with these basic programming ideas therefore helps in mastering the reactive programming model.
