博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
在Android中运用RxJava
阅读量:5788 次
发布时间:2019-06-18

本文共 12732 字,大约阅读时间需要 42 分钟。

1、RxJava 2.0

1.1、什么是RxJava和响应式编程(reactive programming)

        在响应编程中,消费者对数据进行反应,这就是为什么异步编程也被称为响应式编程的原因。 响应式编程允许将事件更改传播到已注册的观察者。

        RxJava是从Netflix的反向扩展(Rx)到Java的端口。 RxJava是2014年开源的,托管于http://reactivex.io/。

         “观察者模式做的正确。 ReactiveX是来自Observer模式,Iterator模式和功能编程的最佳创意的组合。“ 

                                                                                                                                                    --activex.io                                     

       这个概念的Java版本叫做RxJava,它托管在https://github.com/ReactiveX/RxJava下。 RxJava根据Apache 2.0许可证发布。

       RxJava将自己描述为用于具有可观察流的异步编程的API。

1.2、定义与RxJava 2.0的依赖关系

      在撰写本文时,2.0.4版本目前是发布版本。 将g.a.v替换为2.0.6或更高版本。

      对于Gradle构建,您可以通过以下依赖关系语句添加RxJava。

      compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: 'g.a.v'

      对于Maven,您可以添加以下代码段的依赖关系

io.reactivex.rxjava2
rxjava
g.a.v
     对于OSGi环境,例如Eclipse RCP开发,https://dl.bintray.com/simon-scholz/RxJava-OSGi/可用作p2更新站点。

           

1.3、 异步编程

       现在的编程以一种迫切的单向线程的方式进行编程通常会导致奇怪的行为,阻塞不响应的用户界面,从而导致糟糕的用户体验。
       例如,如果网络不响应,则主动等待数据库查询或Web服务调用可能导致应用程序冻结。
       这可以通过异步处理不可预测的事情来避免。

       一个例子是:

public List
getTodos() { List
todosFromWeb = // query a webservice (with bad network latency) return todosFromDb;}
      从主线程或UI线程调用getTodos()方法将导致一个非响应的应用程序,直到todosFromWeb到达。
      为了改进这个查询,这需要不可预测的时间量,这个代码应该运行在不同的线程中,并在结果进入时通知主线程。

public void getTodos(Consumer
> todosCallback) { Thread thread = new Thread(()-> { List
todosFromWeb = // query a webservice todosCallback.accept(todosFromWeb); }); thread.start();}
       现在调用 getTodos(Consumer <List <Todo >> todosConsumer)后,主线程可以继续工作,一旦调用了给定的Consumer的accept方法,就不会被阻塞,并且可以做出反应。
       现在的代码真正是异步的。
       但是如果发生Web服务查询中的错误怎么办?

public void getTodos(FailableCallback
> todosCallback) { Thread thread = new Thread(()-> { try { List
todosFromWeb = // query a web service todosCallback.accept(todosFromWeb); } catch(Exception ex) { todosCallback.error(ex); } }); thread.start();}
       使用自定义
FailableCallback界面可以工作,但也增加了复杂性。
      还有更多的问题可以发生:

  • 与UI同步(SWT和Android中的小部件必须从UI线程更新)
  • 如果FailableCallback的消费者不再存在,该怎么办?
  • 如果这样的FailableCallback取决于另一个FailableCallback怎么办?
public void getUserPermission(FailableCallback
permissionCallback) { Thread thread = new Thread(()-> { try { UserPermission permission = // query a web service permissionCallback.accept(permission); } catch(Exception ex) { permission.error(ex); } }); thread.start();}public void getTodos(FailableCallback
> todosCallback) { Thread thread = new Thread(()-> { getUserPermission(new FailableCallback() { public void accept(UserPermission permission) { if(permission.isValid()) { try { List
todosFromWeb = // query a web service if(!todosCallbackInstance.isDisposed()) { if(syncWithUIThread()) { todosCallback.accept(todosFromWeb); } } } catch(Exception ex) { if(!todosCallbackInstance.isDisposed()) { if(syncWithUIThread()) { todosCallback.error(ex); } } } } } public void error(Exception ex) { // Oh no! } }); }); thread.start();}
      这是非常糟糕的编码,它可能会变得更糟,应该只显示一个可以用ReactiveX解决的例子。 这些问题通常被认为是回调地狱/大坑。

2、RxJava可观察类型

      为了存档这个RxJava,它附带了作为数据源的可观察类型,用于订阅这些可观察类型的类以及用于修改,组合和转换正在观察者和订阅者之间交换的数据的许多方法。
      其中一些方法与Java 8提供的Stream API非常相似,例如filter(),map()等等。
   
 表1.表可观察类型 
     
     可以重复或甚至无限发射数据的类型
Flowable<T>
Obervable<T>.
     
Observable
todoObservable = Observable.create(emitter -> { try { List
todos = getTodos(); for (Todo todo : todos) { emitter.onNext(todo); } emitter.onComplete(); } catch (Exception e) { emitter.onError(e); }});
      典型的Observable可能会发出无限数据,就像一个点击监听器一样,UI监听器是不可预测的,通常用户可能会点击按钮或其他UI小部件。通常终止成功或失败的类型
Maybe<T>
Single<T>和 
Completable
      Maybe<T>
对象是一种异步java.util.Optional从Java 8。
Maybe
> todoMaybe = Maybe.create(emitter -> { try { List
todos = getTodos(); if(todos != null && !todos.isEmpty()) { emitter.onSuccess(todos); (1) }else { emitter.onComplete(); (2) } } catch (Exception e) { emitter.onError(e); (3) }});
 
 
(1) java.util.Optional 与一个值
(2) java.util.Optional 不包含值→null
(3) 发生错误
Single<T>对象也可以被认为是承诺,在异步框架中也很受欢迎,并且类似于
Maybe<T>对象,但只有没有
onComplete()方法。
Completable对象与
Single<T>对象非常相似,但没有返回值,因此也不具有类似其他类型的泛型。
Completable对象也可以看作是反应式
java.lang.Runnable对象。
      除了这些可观察类型的最流行的
create()方法之外,还有更多的方便方法来创建这些类型之一。
  • Observable.just() - 允许在其他数据类型周围创建一个可观察的包装
  • Observable.fromIterable() - 接受一个java.lang.Iterable <T>,并在数据结构中按顺序排列它们的值
  • Observable.fromArray() - 获取一个数组,并在数据结构中按顺序排列它们的值
  • Observable.fromCallable() - 允许为java.util.concurrent.Callable <V>创建一个observable
  • Observable.fromFuture() - 允许为java.util.concurrent.Future创建一个observable
  • Observable.interval() - 在给定间隔内发出长对象的可观察值
  • ...

3、在RxJava中订阅

    一个可观察的实例是可用的监听器/用户可以附加。
    所有可观察类型都提供了各种各样的订阅方法。
Observable
todoObservable = Observable.create(emitter -> { ... });// Simply subscribe with a io.reactivex.functions.Consumer
, which will be informed onNext()Disposable disposable = todoObservable.subscribe(t -> System.out.print(t));// Dispose the subscription when not interested in the emitted data any moredisposable.dispose();// Also handle the error case with a second io.reactivex.functions.Consumer
Disposable subscribe = todoObservable.subscribe(t -> System.out.print(t), e -> e.printStackTrace());// ...
  •    有更多的io.reactivex.functions.Consumer <T> onNext,onSuccess,onFailure,onComplete等符合可观察类型。
  •    io.reactivex.functions.Consumer <T>几乎等于java 8中的java.util.function.Consumer,除了它的accept方法可以抛出异常。 此外,RxJava也不依赖Java 8,但与Java 6兼容。
     在可观察的实例上还有一个subscribeWith方法,可以像这样使用:

DisposableObserver
disposableObserver = todoObservable.subscribeWith(new DisposableObserver
() { @Override public void onNext(Todo t) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { }});

 4、处理订阅并使用CompositeDisposable

       当注册者或订阅者被附加时,他们通常不应该永久地聆听。所以可能会发生这样的情况:由于某些状态的改变,一个可观察的发射的事件可能不再是有趣的。
import io.reactivex.Single;import io.reactivex.disposables.Disposable;import io.reactivex.observers.DisposableSingleObserver;Single
> todosSingle = getTodos();Disposable disposable = todosSingle.subscribeWith(new DisposableSingleObserver
>() { @Override public void onSuccess(List
todos) { // work with the resulting todos } @Override public void onError(Throwable e) { // handle the error case }});// continue working and dispose when value of the Single is not interesting any moredisposable.dispose();
  •        单一类和其他可观察类提供不同的订阅方法,返回一个Disposable对象。
       当使用多个订阅(由于使用CompositeDisposable进行相同状态更改可能会过时)可能会很方便地处理订阅集合。

import io.reactivex.Single;import io.reactivex.disposables.Disposable;import io.reactivex.observers.DisposableSingleObserver;import io.reactivex.disposables.CompositeDisposable;CompositeDisposable compositeDisposable = new CompositeDisposable();Single
> todosSingle = getTodos();Single
happiness = getHappiness();compositeDisposable.add(todosSingle.subscribeWith(new DisposableSingleObserver
>() { @Override public void onSuccess(List
todos) { // work with the resulting todos } @Override public void onError(Throwable e) { // handle the error case }}));compositeDisposable.add(happiness.subscribeWith(new DisposableSingleObserver
() { @Override public void onSuccess(Happiness happiness) { // celebrate the happiness :-D } @Override public void onError(Throwable e) { System.err.println("Don't worry, be happy! :-P"); }}));// continue working and dispose all subscriptions when the values from the Single objects are not interesting any morecompositeDisposable.dispose();

5.缓存已完成的可观察值的值

       当使用可观察器对可观察器上的每个订阅进行异步调用时,通常不是必需的。可能会发生在应用程序中传递观察器,而不需要在添加订阅的同时进行这样一个昂贵的调用。以下代码执行昂贵的网页查询4次,即使这样做一次会很好,因为应该显示相同的Todo对象,而只能以不同的方式显示。
Single
> todosSingle = Single.create(emitter -> { Thread thread = new Thread(() -> { try { List
todosFromWeb = // query a webservice System.out.println("Called 4 times!"); emitter.onSuccess(todosFromWeb); } catch (Exception e) { emitter.onError(e); } }); thread.start();});todosSingle.subscribe(... " Show todos times in a bar chart " ...);showTodosInATable(todosSingle);todosSingle.subscribe(... " Show todos in gant diagram " ...);anotherMethodThatsSupposedToSubscribeTheSameSingle(todosSingle);
下一个代码片段使用缓存方法,以便在第一次成功之后,
Single实例保持其结果。
Single
> todosSingle = Single.create(emitter -> { Thread thread = new Thread(() -> { try { List
todosFromWeb = // query a webservice System.out.println("I am only called once!"); emitter.onSuccess(todosFromWeb); } catch (Exception e) { emitter.onError(e); } }); thread.start();});// cache the result of the single, so that the web query is only done onceSingle
> cachedSingle = todosSingle.cache();cachedSingle.subscribe(... " Show todos times in a bar chart " ...);showTodosInATable(cachedSingle);cachedSingle.subscribe(... " Show todos in gant diagram " ...);anotherMethodThatsSupposedToSubscribeTheSameSingle(cachedSingle);

6. Flowable<T> 和 Backpressure

      RxJava 2.0引入了一种新型的
Flowable <T>,它与API相当于
Observable <T>,但
Flowable <T>支持
Backpressure,而
Observable <T>则不支持。回到RxJava 1.0
Backpressure的概念太晚了,被添加到了
Observable的类型,但有些则抛出了一个
MissingBackpressureException,所以区分
Flowable <T>
Observable <T>是一件好事。除了
Observable <T>也可能
<T>,Single <T>
Completable没有
Backpressure

7.类型之间的转换

      很容易在不同的RxJava类型之间进行转换。
From / To Flowable Observable Maybe Single Completable

Flowable

 

toObservable()

reduce()

elementAt()
firstElement()
lastElement()
singleElement()

scan()

elementAt()
first()/firstOrError()
last()/lastOrError()
single()/singleOrError()
all()/any()/count()
(and more…​)

ignoreElements()

Observable

toFlowable()

 

reduce()

elementAt()
firstElement()
lastElement()
singleElement()

scan()

elementAt()
first()/firstOrError()
last()/lastOrError()
single()/singleOrError()
all()/any()/count()
(and more…​)

ignoreElements()

Maybe

toFlowable()

toObservable()

 

toSingle()

sequenceEqual()

toCompletable()

Single

toFlowable()

toObservable()

toMaybe()

 

toCompletable()

Completable

toFlowable()

toObservable()

toMaybe()

toSingle()

toSingleDefault()

 

8、测试RxJava可观察和订阅

8.1、 测试可观察量

      您可以通过RxJava库提供的TestSubscriber类来测试可观察值。
Observable
obs = ...// assume creation code hereTestSubscriber
testSubscriber = new TestSubscriber<>();obs.subscribe(testSubscriber);testSubscriber.assertNoErrors();List
chickens = testSubscriber.getOnNextEvents();// TODO assert your string integrity...

8.2、 测试可观察量

       RxJava提供了一种覆盖显示的调度程序的方法,以便可观察器被同步调用。 请参阅http://fedepaol.github.io/blog/2015/09/13/testing-rxjava-observables-subscriptions/作为示例。

9、RxJava的相关资源

10、原文链接

                                          

你可能感兴趣的文章
MongoDB工具MagicMongoDBTool使用介绍(一) -- 简单MongoDB入门
查看>>
javascript的事件
查看>>
android 打开SD卡文件夹,并获得选中文件的路径怎么实现?
查看>>
android 如何实现连接蓝牙打印机来实现打印功能
查看>>
CSS3 高级技术
查看>>
原型模式(Prototype )
查看>>
201521123009 《Java程序设计》第1周学习总结
查看>>
年终述职--常见问题分析解答
查看>>
C#_细说Cookie_Json Helper_Cookies封装
查看>>
对事件循环的一点理解
查看>>
在mui中创建aJax来请求数据..并展示在页面上
查看>>
spring 之AOP
查看>>
总结 15/4/23
查看>>
守护进程
查看>>
Windows 7环境下网站性能测试小工具 Apache Bench 和 Webbench使用和下载
查看>>
C#常见错误解决方法
查看>>
安装cnpm (npm淘宝镜像)
查看>>
js 利用事件委托解决mousedown中的click
查看>>
游戏设计艺术 第2版 (Jesse Schell 著)
查看>>
Java 面向对象(基础) 知识点总结I
查看>>