RxJava原理

Publish by https://www.leachchen.com/

Observable.create(new ObservableOnSubscribe<Object>() {
    @Override
    public void call(@NonNull Observer<Object> e) throws Exception {
        //事件源业务代码
        e.onNext(o);
    }
}).subscribe(new Observer<Object>() {

    @Override
    public void onNext(@NonNull Object o) {
        //订阅者业务代码
    }
});

public class Observable<T> {
  final ObservableOnSubscribe<T> source;

  ...

  public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
      return new Observable<T>(source);
}

public class Observable<T> {
  final ObservableOnSubscribe<T> source;



  protected void subscribe(Observer<? super T> observer) {
      source.call(observer);
  }        

背压问题

当上下游在不同的线程中,通过Observable发射,处理,响应数据流时,如果上游发射数据的速度快于下游接收处理数据的速度,这样对于那些没来得及处理的数据就会造成积压,这些数据既不会丢失,也不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压(backpressure)问题.RxJava1未能处理这个问题,RxJava2 新增Flowable来处理该问题。

由于基于Flowable发射的数据流,以及对数据加工处理的各操作符都添加了背压支持,附加了额外的逻辑,其运行效率要比Observable慢得多,只有在需要处理背压问题时,才需要使用Flowable。

Flowable有几种策略:

缓存:当事件消费者不能及时处理事件时,产生的事件可以缓存起来,等待消费者消费

丢弃:当事件消费者处理速度慢无法处理更多事件时,它可以丢弃所有的事件,当消费者可以继续工作时,处理最新生成的事件

报错 事件消费者可以直接抛出MissingBackpressureException

Leach Chen

Leach Chen

I am an Android developer.I will add description latter.