laitimes

Reactive Programming RxJava Design Principles Explained

author:Flash Gene

一、ReactiveX 与 RxJava

The full name of ReactiveX is Reactive Extension, which is generally abbreviated as Rx, which is what we commonly call reactive programming. The design principle mainly uses the observer pattern to distinguish between data producers and consumers, and asynchronous processing of data through event streaming.

RxJava is an implementation of the ReactiveX Java language, and its programming experience is very similar to that of functional programming and Stream in Java 8.

This article focuses on the understanding of several major design patterns in RxJava, and provides a clearer understanding of how event-driven works in RxJava by combing through the class diagrams related to Observables and explaining the relationships between these classes.

2. Concepts in RxJava

First, we write a simple RxJava program that sends the elements in the array as events, which are finally printed by the consumer in the console:

Reactive Programming RxJava Design Principles Explained

Based on this simple code, we'll explain the four concepts that run through the entire ReactiveX design: observer, observed, event, and subscription.

  • Observer: The object that responds to the event, which can also be called a consumer, in the above code, the argument of the subscirbe method is a Consumer object, which is then wrapped into a LambdaObserver object, which is the observer (consumer) in this code.
  • Observed: The object that generated the event, which can also be called a producer, in the above code, Observable.fromArray(...) What is returned is an Observable object, which is the observer (producer) of the program.
  • Events: There are four types of event streams in RxJava: onSubscribe, onNext, onError, and onComplete. In the above code, the elements in the array are sent as data in the onNext event.
  • Subscribe: Creates an observation relationship between the observer and the observed, corresponding to the subscribe() method in the above code. RxJava's event-driven model is a "pull model" in which no events are generated until the observer subscribes to the event, and only after the observer subscribes will the observed produce an event be triggered.
Reactive Programming RxJava Design Principles Explained

Perform time series analysis on the above code, you can clearly see the running process of this piece of code, and finally FromArrayDisposable produces onNext and onComplete events, and notifies the Observer to consume.

At the same time, we also see that a simple line of code involves so many classes of interaction, if we add some other operators, we will not be so easy to control the whole program, the following we will analyze some of the main design patterns in RxJava, dissect the relationship between classes, to understand the working principle of RxJava more clearly.

三、 集大成者Observable

Observables can be said to be the most important object in the whole data processing process. As you can see from the sequence diagram above, the client (the producer or consumer of the message) only interacts with the observable, and the relationship between the observer and the observed is also implemented by the observable, instead of the coding we show, which greatly reduces the cost of using the observer pattern.

So what are the main functions of Observable, let's first take a look at the class diagram related to Observable:

Reactive Programming RxJava Design Principles Explained

From the diagram we can see:

  • Observable implements the ObservableSource interface, which can be understood literally, this is an interface that provides observation capabilities, so one of the major capabilities of the Observable is for the observer to subscribe to events, and the implementation of the method for event subscription is to call the subscribe() method of the Observable
  • Observable is an abstract class that provides a subscribeActual template method for subclasses to implement, and as you can see from the source code, the observable's subscribe() method will eventually delegate the subclass's subscribeActual() method implementation, which will establish the relationship between the producer and the consumer.
  • In addition, Observable is also a factory class, which provides static methods such as fromArray() and create() to create specific observables, as well as flatMap(), concatMap() and other operation methods to wrap observables.

The existence of Observables completely decouples the producer and the consumer, the producer only needs to pay attention to what kind of Observable object he generates, and the consumer only needs to pay attention to what kind of Observable he observes.

In practice, Rxjava already provides a variety of operators for us to use, and the producer only needs to call the corresponding method in the Observable to generate the required observables for consumers to subscribe to. It is extremely convenient for consumers to call the subscribe() method of the observable object to establish an observation relationship with the producer.

4. Authentic observations

The observer mode is the core idea of RxJava design, in the observer mode there is always an object to observe and an object to be observed, and it can be seen from the above analysis that the observable is more of a controller than the actual source of events. So what is the real producer and what is the real consumer in RxJava?

Let's take a look at the following three common types of observables:

Reactive Programming RxJava Design Principles Explained

The function of fromArray is to send the elements in the array as onNext events, the role of create is to send custom events, and the role of just is to send a single event.

In the previous section, we talked about how the actual subscription behavior is implemented by the subscribeActual() method in each Observable class, so let's take a look at the subscribeActual() method of these three classes.

Reactive Programming RxJava Design Principles Explained

Aside from the minutiae, all three methods can be broken down into the following three steps

  1. Create an observer object and pass it to the observer to establish the relationship between the two.
  2. The onSubscribe event is triggered, and the observer responds to the event.
  3. 进行事件的拉取,我们可以进入到d.run(),source.subscribe(parent),sd.run()这些方法的内部看一些,可以看到这些方法就是在发送onNext(),onError(),onComplete()等事件。

The following diagram is a diagram of the related classes throughout the process. The actual sender of the event is an object such as FromArrayDisposable, while the actual observer is an entity class that implements the Observer interface. If we pass in a lambda expression when subscribe, it will be wrapped as a default LambdaObserver object for event consumption.

Reactive Programming RxJava Design Principles Explained

5. The necessity of packaging

RxJava provides a wealth of operators, such as flatMap, concatMap, etc., which can be used to transform events, and subscribeOn, observableOn, etc., which can control the production and consumption threads. These operators actually call the wrapper method in the Observable to wrap the original observable, returning an enhanced observable.

There are many types of operators, so let's take flatMap as an example and analyze how these operators work.

First, the flatMap operation returns an ObservableFlatMap object, which is created by passing the original Observable object as a parameter to the constructor.

查看其核心方法subscribeActual,

Reactive Programming RxJava Design Principles Explained

As you can see, the subscribeActual method for this type of object is different from the method in the previous section, and instead of actually creating an observation relationship, it does two things:

  1. The observer is enhanced to wrap it as a MergeObserver object, which responds to the resulting time.
  2. Then call the subscribe method of source, where source is the Observable object passed in the previous constructor, and then establish the observation relationship.

    The following diagram shows the relevant class diagram for the decorator pattern in RxJava: all wrapper classes inherit from the AbstractObservableWithUpstream class, which has a member function of type ObservableSource that holds the decorated object.

Reactive Programming RxJava Design Principles Explained

Observables are chained, just like Streams in Java 8, so let's consider such a line of code.

Reactive Programming RxJava Design Principles Explained

When we analyze the above string of code, it will be very messy, and when we look at the source code, we will see the front and forget the back, but if we know enough about the packaging process of RxJava, we can easily analyze the above code.

Reactive Programming RxJava Design Principles Explained

6. Summary

The encapsulation of RxJava is powerful enough to make it easy for us to use and extend, but it also makes it difficult for us to understand how it really works, and if we are in a state of ignorance of the whole event process, then we will not be able to calmly orchestrate the service asynchronously, and it will be difficult to find the root cause of the problem in the actual development process.

This article mainly analyzes the main design patterns in RxJava, including template pattern, factory pattern, observer pattern, and decorator pattern, and understands these design patterns and the relationship between classes in RxJava, so that we can understand the entire event processing process and analyze the code to get twice the result with half the effort.

作者:Yunjie Ma

Source-WeChat public account: vivo Internet Technology

Source: https://mp.weixin.qq.com/s/duO1pAfaKUI2_x_GVvZHMg

Read on