RXJava – наблюдаемый буфер 1 до наблюдаемого 2 испускает один элемент

Мне нужно следующее поведение:

observableMain должен буферизировать все элементы до тех пор, пока observableResumed испустит значение. Затем observableMain должен испускать все буферизированные и все значения признаков …

Что я делаю в своей activity's onCreate :

  PublishSubject<T> subject = ...; // I create a subject to emit items to and to subscribe to // 1) I create a main observable from my subject final Observable<T> observableMain = subject .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); // 2) I use a custom base class in which I can register listeners // for the onResume event and which I can query the isResumed state! // I call the object the pauseResumeProvider! // 2.1) I create an observable, it emits a value ONLY if my activity is resumed final Observable<Boolean> obsIsResumed = Observable .defer(() -> Observable.just(pauseResumeProvider.isActivityResumed())) .skipWhile(aBoolean -> aBoolean != true); // 2.2) I create a second observable, it emits a value as soon as my activity is resumed final Observable<Boolean> obsOnResumed = Observable.create(new Observable.OnSubscribe<Boolean>() { @Override public void call(final Subscriber<? super Boolean> subscriber) { pauseResumeProvider.addPauseResumeListener(new IPauseResumeListener() { @Override public void onResume() { pauseResumeProvider.removePauseResumeListener(this); subscriber.onNext(true); subscriber.onCompleted(); } @Override public void onPause() { } }); } }); // 2.3) I combine the resumed observables and only emit the FIRST value I can get final Observable<Boolean> observableResumed = Observable .concat(obsIsResumed, obsOnResumed) .first(); // 3) here I'm stuck // 3 - Variant 1: Observable<T> observable = observableMain .buffer(observableResumed) .concatMap(values -> Observable.from(values)); // 3 - Variant 2: // Observable<T> observable = observableMain.delay(t -> observableResumed); // 4) I emit events to my my subject... // this event is LOST! subject.onNext("Test in onCreate"); 

проблема

Все элементы, которые отправляются subject после возобновления работы, работают, все элементы до этого теряются (по крайней мере, с решением о delay ). Я не могу заставить свое желаемое поведение работать. Как я могу правильно это решить?

Попросите источник воспроизвести и используйте задержку подписки для активации реальной подписки.

 PublishSubject<Integer> emitNow = PublishSubject.create(); ConnectableObservable<T> co = source.replay(); co.subscribe(); co.connect(); co.delaySubscription(emitNow).subscribe(...); emitNow.onNext(1); 

Редактировать:

Вот суть с оператором, который вы можете lift в последовательности, которая может приостанавливать и возобновлять выбросы от восходящего потока.

Другой способ сделать это – задержать выбросы источника (горячего) наблюдаемого до тех пор, пока не откроется «клапан»

 source.delay(new Func1<Integer, Observable<Boolean>>() { @Override public Observable<Boolean> call(Integer integer) { return valve.filter(new Func1<Boolean, Boolean>() { @Override public Boolean call(Boolean isOpen) { //emits only when isOpen is true return isOpen; } }); } }) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println("out: " + integer); } }); 

Gist: Rx Valve