打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
RxJava基本使用

一、自己构造事件

 
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Overridepublic void subscribe(ObservableEmitter emitter) {int i = getNumber();if (i < 0) {
                    emitter.onComplete();return;
                } else {
                    Log.d(TAG, Thread.currentThread().getName());
                    emitter.onNext(i);
                    emitter.onComplete();
                }
            }
        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Overridepublic void accept(Integer integer) throws Exception {
                        Log.d(TAG, Thread.currentThread().getName());
                        Log.d(TAG, integer + "");
                    }
                }, new Consumer<Throwable>() {
                    @Overridepublic void accept(Throwable throwable) throws Exception {
 
                    }
                });

RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer

onNext():方法用来发送事件。

下面看看其他两个方法:

  • onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。

  • onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。

  • 在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

讲一下我们上面的例子,上面这个例子是采用简洁的链式调用来写的:

首先使用 create() 方法来创建一个 Observable ,并为它定义事件触发规则,然后通过emitter.onNext(i)传递出来,.subscribeOn(Schedulers.io())便是指定该事件产生的所在的线程为子线程,.observeOn(AndroidSchedulers.mainThread())指定观察者执行的线程为主线程。这时候为止返回的对象为Observable对象。

然后该Observable对象subscribe绑定观察者(也就是观察者进行订阅),里面有接收被观察者发出来的事件,有一个成功的方法,和一个失败的方法,这样就实现了由被观察者向观察传递事件。

二、对集合里的数据进行变换

        List<Integer> list = new ArrayList<Integer>() {
            {
                add(0);
                add(1);
                add(2);
            }
        };
        Observable.fromIterable(list).map(new Function() {
            @Overridepublic Object apply(Object o) throws Exception {int i = (int) o + 1;return String.valueOf(i);
            }
        })
                .toList()
                .toObservable().subscribeOn(Schedulers.io())
                .subscribeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer() {
                    @Overridepublic void accept(Object o) throws Exception {
                        Log.d(TAG, o.toString());
                    }
                });

且看,我们需要对某个集合里面的数据一一进行变换,然后发送出来执行其他操作。

上面便是对集合里面的每一项进行加一操作,然后再转换为String类型,然后toList(),组合成集合发送出来,最后在观察者方法中打印出每一项。

三、合并执行

定义两个被观察者,各自产生事件,然后合并在一起,发送给一个观察者。

首先定义我们上面第一个例子的被观察者,用于发送一个数字:

        Observable observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Overridepublic void subscribe(ObservableEmitter emitter) {int i = getNumber();if (i < 0) {
                    emitter.onComplete();return;
                } else {
                    Log.d(TAG, Thread.currentThread().getName());
                    emitter.onNext(i);
                    emitter.onComplete();
                }
            }
        })
                .subscribeOn(Schedulers.io());

其次再定义我们上面第二个例子的被观察者:

        List<Integer> list = new ArrayList<Integer>() {
            {
                add(0);
                add(1);
                add(2);
            }
        };
        Observable observable2 = Observable.fromIterable(list).map(new Function() {
            @Overridepublic Object apply(Object o) {int i = (int) o + 1;return String.valueOf(i);
            }
        })
                .toList()
                .toObservable().subscribeOn(Schedulers.io());

最后将这两个被观察者的事件合并起来发送给一个观察者:

        Disposable disposable = Observable.zip(observable1, observable2, new BiFunction() {
            @Overridepublic Object apply(Object o, Object o2) throws Exception {int i = (int) o;
                String k = (String) ((List) o2).get(0);return k + i;
            }
        })
                .subscribe(new Consumer() {
                    @Overridepublic void accept(Object o) {
                        Log.d(TAG, (String) o);
                    }
                }, new Consumer<Throwable>() {
                    @Overridepublic void accept(Throwable throwable) {
                        Log.d(TAG, throwable.getMessage());
                    }
                });

zip方法,顾名思义,有点类似与于打包的意思。

o为被观察者1返回的结果,o2为被观察2返回的结果,将这两个结果一起处理后发送给观察者。打印出来。

现在先介绍这几个,找个时间再整理一些其他的用法以及原理实现。

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Rxjava3文档级教程二: 操作符全解
一篇博客让你了解RxJava
RxJava2.0
自己动手来实现一个RxJava
给 Android 开发者的 RxJava 详解
深入浅出RxJava07——多线程&辅助操作
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服