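In reactive programming, the consumer reacts to the data as it comes in, which is why asynchronous programming is also called reactive programming. Reactive programming allows event changes to be propagated to registered observers.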
RxJava is a port of the Reactive Extensions (Rx) to Java, originating at Netflix. RxJava was open sourced in 2014 and is hosted at http://reactivex.io/.
"The Observer pattern done right. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming."
-- reactivex.io
The Java version of this concept is called RxJava and is hosted at https://github.com/ReactiveX/RxJava. RxJava is released under the Apache 2.0 license.
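RxJava describes itself as an API for asynchronous programming with observable streams. At the time of writing, version 2.0.4 is the current release. In the dependency statements that follow, replace the placeholder g.a.v with 2.0.6 or a later version.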
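For a Gradle build, you can add RxJava with the following dependency statement.

    compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: 'g.a.v'

For Maven, you can add the dependency with the following snippet.

    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>g.a.v</version>
    </dependency>

For OSGi environments, e.g. Eclipse RCP development, https://dl.bintray.com/simon-scholz/RxJava-OSGi/ can be used as a p2 update site.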
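Consider the following example:

    public List<Todo> getTodos() {
        List<Todo> todosFromWeb = // query a webservice (with bad network latency)
        return todosFromWeb;
    }

Calling the getTodos() method from the main thread or the UI thread leaves the application unresponsive until todosFromWeb arrives. Because this query takes an unpredictable amount of time, the code should run in a different thread and notify the main thread when the result comes in.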
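    public void getTodos(Consumer<List<Todo>> todosCallback) {
        Thread thread = new Thread(() -> {
            List<Todo> todosFromWeb = // query a webservice
            todosCallback.accept(todosFromWeb);
        });
        thread.start();
    }

After calling getTodos(Consumer<List<Todo>> todosCallback), the main thread is not blocked and can continue working. It reacts once the accept method of the given Consumer is called. The code is now truly asynchronous. But what happens if an error occurs during the web service query?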
    public void getTodos(FailableCallback<List<Todo>> todosCallback) {
        Thread thread = new Thread(() -> {
            try {
                List<Todo> todosFromWeb = // query a web service
                todosCallback.accept(todosFromWeb);
            } catch (Exception ex) {
                todosCallback.error(ex);
            }
        });
        thread.start();
    }

A custom FailableCallback interface works, but it also adds complexity. And more problems can occur:
    public void getUserPermission(FailableCallback<UserPermission> permissionCallback) {
        Thread thread = new Thread(() -> {
            try {
                UserPermission permission = // query a web service
                permissionCallback.accept(permission);
            } catch (Exception ex) {
                permissionCallback.error(ex);
            }
        });
        thread.start();
    }

    public void getTodos(FailableCallback<List<Todo>> todosCallback) {
        Thread thread = new Thread(() -> {
            getUserPermission(new FailableCallback<UserPermission>() {
                public void accept(UserPermission permission) {
                    if (permission.isValid()) {
                        try {
                            List<Todo> todosFromWeb = // query a web service
                            // todosCallbackInstance and syncWithUIThread() are not defined in this example
                            if (!todosCallbackInstance.isDisposed()) {
                                if (syncWithUIThread()) {
                                    todosCallback.accept(todosFromWeb);
                                }
                            }
                        } catch (Exception ex) {
                            if (!todosCallbackInstance.isDisposed()) {
                                if (syncWithUIThread()) {
                                    todosCallback.error(ex);
                                }
                            }
                        }
                    }
                }

                public void error(Exception ex) {
                    // Oh no!
                }
            });
        });
        thread.start();
    }

This is very bad code, and it can get even worse. It is only meant to show the kind of problem that ReactiveX can solve. Such problems are commonly referred to as callback hell.
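The callback examples above use a FailableCallback type without showing its definition. The following is a minimal, JDK-only sketch of what such an interface might look like and how the success and error paths behave. The interface shape, the `queryWebService` helper, the `fail` flag and the `describe` helper are assumptions made for illustration, and `String` stands in for the `Todo` type.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class FailableCallbackDemo {

    // Assumed shape of the callback type; the article does not show its definition.
    interface FailableCallback<T> {
        void accept(T value);
        void error(Exception ex);
    }

    // Hypothetical stand-in for the web service query; fails on demand.
    static List<String> queryWebService(boolean fail) {
        if (fail) {
            throw new IllegalStateException("web service unavailable");
        }
        return Arrays.asList("Buy milk", "Write article");
    }

    // Runs the query on a background thread and reports the result
    // or the exception through the callback.
    static void getTodos(boolean fail, FailableCallback<List<String>> todosCallback) {
        Thread thread = new Thread(() -> {
            try {
                todosCallback.accept(queryWebService(fail));
            } catch (Exception ex) {
                todosCallback.error(ex);
            }
        });
        thread.start();
    }

    // Demo helper: waits (with a timeout) until one callback method has run
    // and returns a short description of what happened.
    static String describe(boolean fail) {
        AtomicReference<String> outcome = new AtomicReference<>("timeout");
        CountDownLatch done = new CountDownLatch(1);
        getTodos(fail, new FailableCallback<List<String>>() {
            @Override
            public void accept(List<String> todos) {
                outcome.set("ok:" + todos.size());
                done.countDown();
            }

            @Override
            public void error(Exception ex) {
                outcome.set("error:" + ex.getMessage());
                done.countDown();
            }
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(describe(false));
        System.out.println(describe(true));
    }
}
```

Even in this small sketch, the caller needs its own latch to learn when the work has finished. That bookkeeping is part of what the reactive types take over.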
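    Observable<Todo> todoObservable = Observable.create(emitter -> {
        try {
            List<Todo> todos = getTodos();
            for (Todo todo : todos) {
                emitter.onNext(todo);
            }
            emitter.onComplete();
        } catch (Exception e) {
            emitter.onError(e);
        }
    });

A typical Observable may emit an unbounded amount of data. A click listener is an example: UI events are unpredictable, because you cannot know how often the user will click a button or another UI widget. The types that usually terminate with either success or failure are Maybe<T>, Single<T> and Completable. A Maybe<T> object is essentially an asynchronous version of java.util.Optional from Java 8.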
    Maybe<List<Todo>> todoMaybe = Maybe.create(emitter -> {
        try {
            List<Todo> todos = getTodos();
            if (todos != null && !todos.isEmpty()) {
                emitter.onSuccess(todos);   // (1)
            } else {
                emitter.onComplete();       // (2)
            }
        } catch (Exception e) {
            emitter.onError(e);             // (3)
        }
    });

(1) | corresponds to a java.util.Optional with a value |
(2) | corresponds to a java.util.Optional without a value (→ null) |
(3) | an error occurred |
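To make the three outcomes concrete, here is a small sketch that subscribes to a Maybe and records which callback fired. It assumes RxJava 2.x is on the classpath. The `todoMaybe` and `outcome` helpers are hypothetical, `String` stands in for the `Todo` type, and a `null` list is used only as a simple way to trigger the error path.

```java
import io.reactivex.Maybe;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MaybeDemo {

    // Builds a Maybe along the lines of the example above.
    // Assumption for the demo: a null list is treated as an error.
    static Maybe<List<String>> todoMaybe(List<String> todos) {
        return Maybe.create(emitter -> {
            if (todos == null) {
                emitter.onError(new IllegalStateException("no data source"));
            } else if (!todos.isEmpty()) {
                emitter.onSuccess(todos);   // a value is present
            } else {
                emitter.onComplete();       // no value, but no error either
            }
        });
    }

    // Subscribes with one handler per outcome and reports which one ran.
    // Maybe.create emits synchronously here, so the result is available
    // as soon as subscribe() returns.
    static String outcome(List<String> todos) {
        StringBuilder out = new StringBuilder();
        todoMaybe(todos).subscribe(
                value -> out.append("success:").append(value.size()),
                error -> out.append("error"),
                () -> out.append("empty"));
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(outcome(Arrays.asList("Buy milk", "Write article")));
        System.out.println(outcome(Collections.emptyList()));
        System.out.println(outcome(null));
    }
}
```

Exactly one of the three handlers runs per subscription, which is the main difference from an Observable that may call onNext many times.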
    Observable<Todo> todoObservable = Observable.create(emitter -> { ... });

    // Simply subscribe with a io.reactivex.functions.Consumer<T>, which will be informed onNext()
    Disposable disposable = todoObservable.subscribe(t -> System.out.print(t));

    // Dispose the subscription when not interested in the emitted data any more
    disposable.dispose();

    // Also handle the error case with a second io.reactivex.functions.Consumer<T>
    Disposable subscribe = todoObservable.subscribe(t -> System.out.print(t), e -> e.printStackTrace());
    // ...
    DisposableObserver<Todo> disposableObserver = todoObservable.subscribeWith(new DisposableObserver<Todo>() {

        @Override
        public void onNext(Todo t) {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    });
    import io.reactivex.Single;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.observers.DisposableSingleObserver;

    Single<List<Todo>> todosSingle = getTodos();

    Disposable disposable = todosSingle.subscribeWith(new DisposableSingleObserver<List<Todo>>() {

        @Override
        public void onSuccess(List<Todo> todos) {
            // work with the resulting todos
        }

        @Override
        public void onError(Throwable e) {
            // handle the error case
        }
    });

    // continue working and dispose when value of the Single is not interesting any more
    disposable.dispose();
    import io.reactivex.Single;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.observers.DisposableSingleObserver;
    import io.reactivex.disposables.CompositeDisposable;

    CompositeDisposable compositeDisposable = new CompositeDisposable();

    Single<List<Todo>> todosSingle = getTodos();
    Single<Happiness> happiness = getHappiness();

    compositeDisposable.add(todosSingle.subscribeWith(new DisposableSingleObserver<List<Todo>>() {

        @Override
        public void onSuccess(List<Todo> todos) {
            // work with the resulting todos
        }

        @Override
        public void onError(Throwable e) {
            // handle the error case
        }
    }));

    compositeDisposable.add(happiness.subscribeWith(new DisposableSingleObserver<Happiness>() {

        @Override
        public void onSuccess(Happiness happiness) {
            // celebrate the happiness :-D
        }

        @Override
        public void onError(Throwable e) {
            System.err.println("Don't worry, be happy! :-P");
        }
    }));

    // continue working and dispose all subscriptions when the values from the Single objects are not interesting any more
    compositeDisposable.dispose();
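The effect of disposing a CompositeDisposable can be observed directly. The sketch below assumes RxJava 2.x on the classpath and uses sources that never emit, so the subscriptions stay active until they are disposed. The class name and the `run` helper are made up for this illustration.

```java
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;

public class CompositeDisposableDemo {

    // Returns {size before dispose, size after dispose, 1 if everything is disposed else 0}.
    static int[] run() {
        CompositeDisposable composite = new CompositeDisposable();

        // never() sources do not terminate, so these subscriptions remain active.
        Disposable first = Observable.<String>never().subscribe();
        Disposable second = Single.<String>never().subscribe();
        composite.add(first);
        composite.add(second);

        int before = composite.size();

        // One call disposes every subscription held by the container.
        composite.dispose();

        int after = composite.size();
        boolean allDisposed = first.isDisposed() && second.isDisposed() && composite.isDisposed();
        return new int[] {before, after, allDisposed ? 1 : 0};
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println(result[0] + " " + result[1] + " " + result[2]);
    }
}
```

Collecting subscriptions in one container is convenient when several of them share a lifetime, for example that of a UI screen.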
    Single<List<Todo>> todosSingle = Single.create(emitter -> {
        Thread thread = new Thread(() -> {
            try {
                List<Todo> todosFromWeb = // query a webservice
                System.out.println("Called 4 times!");
                emitter.onSuccess(todosFromWeb);
            } catch (Exception e) {
                emitter.onError(e);
            }
        });
        thread.start();
    });

    todosSingle.subscribe(... " Show todos times in a bar chart " ...);

    showTodosInATable(todosSingle);

    todosSingle.subscribe(... " Show todos in gant diagram " ...);

    anotherMethodThatsSupposedToSubscribeTheSameSingle(todosSingle);
    Single<List<Todo>> todosSingle = Single.create(emitter -> {
        Thread thread = new Thread(() -> {
            try {
                List<Todo> todosFromWeb = // query a webservice
                System.out.println("I am only called once!");
                emitter.onSuccess(todosFromWeb);
            } catch (Exception e) {
                emitter.onError(e);
            }
        });
        thread.start();
    });

    // cache the result of the single, so that the web query is only done once
    Single<List<Todo>> cachedSingle = todosSingle.cache();

    cachedSingle.subscribe(... " Show todos times in a bar chart " ...);

    showTodosInATable(cachedSingle);

    cachedSingle.subscribe(... " Show todos in gant diagram " ...);

    anotherMethodThatsSupposedToSubscribeTheSameSingle(cachedSingle);
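The difference between the two variants can be measured by counting how often the source work runs. This is a minimal sketch assuming RxJava 2.x on the classpath. The counter, the `countQueries` helper and the `"todos"` value are placeholders for the web service query, and `blockingGet()` is used only to keep the demo synchronous.

```java
import io.reactivex.Single;

import java.util.concurrent.atomic.AtomicInteger;

public class CacheDemo {

    // Subscribes the given number of times and returns how often
    // the underlying (simulated) query was executed.
    static int countQueries(boolean useCache, int subscribers) {
        AtomicInteger queries = new AtomicInteger();

        // Stand-in for the web service query; it runs once per subscription.
        Single<String> source = Single.fromCallable(() -> {
            queries.incrementAndGet();
            return "todos";
        });

        // cache() replays the first result to all later subscribers.
        Single<String> shared = useCache ? source.cache() : source;

        for (int i = 0; i < subscribers; i++) {
            shared.blockingGet();
        }
        return queries.get();
    }

    public static void main(String[] args) {
        System.out.println("without cache: " + countQueries(false, 4));
        System.out.println("with cache: " + countQueries(true, 4));
    }
}
```

Note that a cached Single also replays an error to every subscriber, so it does not retry a failed query on its own.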
From / To | Flowable | Observable | Maybe | Single | Completable |
---|---|---|---|---|---|
Flowable | | toObservable() | reduce(), elementAt(), firstElement(), lastElement(), singleElement() | scan(), elementAt(), first()/firstOrError(), last()/lastOrError(), single()/singleOrError(), all()/any()/count() (and more…) | ignoreElements() |
Observable | toFlowable() | | reduce(), elementAt(), firstElement(), lastElement(), singleElement() | scan(), elementAt(), first()/firstOrError(), last()/lastOrError(), single()/singleOrError(), all()/any()/count() (and more…) | ignoreElements() |
Maybe | toFlowable() | toObservable() | | toSingle(), sequenceEqual() | toCompletable() |
Single | toFlowable() | toObservable() | toMaybe() | | toCompletable() |
Completable | toFlowable() | toObservable() | toMaybe() | toSingle(), toSingleDefault() | |
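A few of the conversions from the table can be exercised as follows. This sketch assumes RxJava 2.x on the classpath. The class and helper names are invented for the illustration, and the blocking calls are there only to pull the results out synchronously.

```java
import io.reactivex.Completable;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Single;

import java.util.List;

public class ConversionDemo {

    static Observable<Integer> numbers() {
        return Observable.just(1, 2, 3);
    }

    // Observable -> Maybe: the first element, or empty if there is none.
    static Integer firstAsMaybe() {
        Maybe<Integer> first = numbers().firstElement();
        return first.blockingGet();
    }

    // Observable -> Single: count() always produces exactly one value.
    static Long countAsSingle() {
        Single<Long> count = numbers().count();
        return count.blockingGet();
    }

    // Observable -> Completable: only the termination signal is kept.
    static boolean completesAfterIgnoringElements() {
        Completable done = numbers().ignoreElements();
        done.blockingAwait();
        return true;
    }

    // Single -> Observable: the single value becomes a one-element stream.
    static List<Integer> singleAsObservable() {
        return Single.just(42).toObservable().toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(firstAsMaybe());
        System.out.println(countAsSingle());
        System.out.println(completesAfterIgnoringElements());
        System.out.println(singleAsObservable());
    }
}
```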
    Observable<String> obs = ... // assume creation code here

    // In RxJava 2, an Observable is tested with a TestObserver; TestSubscriber is for Flowable
    TestObserver<String> testObserver = obs.test();
    testObserver.assertNoErrors();

    List<String> chickens = testObserver.values();

    // TODO assert your string integrity...