扣丁书屋

使用RxJava 2.0

1年以前  |  阅读数:554 次  |    

首先在 gradle 文件中添加依赖

implementation 'io.reactivex.rxjava2:rxjava:2.1.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'

创建被观察者

Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
        e.onComplete();
    }
});

创建观察者

Observer observer = new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "======================onSubscribe");
    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "======================onNext " + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "======================onError");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "======================onComplete");
    }
};

订阅

observable.subscribe(observer);

总结

被被观察者就是数据的产生者,观察者是数据的使用者,而订阅,则是把产生者与使用者连接在一起,以便处理数据。

一个常见的场景是,网络层源源不断的接受数据,然后向上层发射数据包。而上层则依次处理一个一个的数据包。

相关文章: