Schedulers.io () не возвращается в основной поток

Я использую RxParse для анализа асинхронной загрузки запроса, но когда я подписываю свои наблюдаемые с помощью subscribeOn (Schedulers.io ()), мой метод onCompleted никогда не вызывается в основном потоке. Вместо этого мой метод onCompleted вызывается внутри пула рабочих потоков. Если я использую функцию наблюдения (AndroidSchedulers.mainThread), все будет работать, но мой onNextMethod также будет вызван в основной поток, и я не хочу этого.

Что-то не так в моем коде?

Что-то не так в моем коде?

ParseObservable.find(myQuery) .map(myMapFunc()) .subscribeOn(AndroidSchedulers.handlerThread(new Handler())) .subscribe( new Subscriber<MyObj>() { @Override public void onError(Throwable e) { Log.e("error","error",e); } @Override public void onNext(T t) { // ... worker thread (but here is ok) } public void onCompleted() { // ... worker thread again instead of mainThread } } ) ); 

К сожалению, подписка находится в одном потоке для всех методов ( onNext , onError и onCompleted

Но вы можете наблюдать в Schedulers.io() и внутри onNext(T t) , создать новый Observable для прослушивания в MainThread следующим образом:

 ParseObservable.find(myQuery) .map(myMapFunc()) .subscribeOn(Schedulers.io()) .subscribe( new Subscriber<MyObj>() { @Override public void onError(Throwable e) { Log.e("error","error",e); } @Override public void onNext(T t) { Observable.just(t) .observeOn(AndroidSchedulers.mainThread()) .subscribe((t) -> { // do something in MainThread }) } public void onCompleted() { // ... worker thread again instead of mainThread } } ) ); 

Надеюсь, это поможет!

Сначала вам нужно понять разницу между subscribeOn() и observeOn() . Это два совершенно разных оператора, которые влияют на разные части Rx-цепи.

subscribeOn() указывает, где ваша Observable будет выполнять свою работу. Это не повлияет на выполнение onNext() , onError() и onComplete() .

observeOn() указывает, где onNext() обратные вызовы (например, onNext() ). Это не повлияет на то, где ваша Observable выполняет свою работу.

Все обратные вызовы будут выполняться в одном потоке. Вы не можете указать, что некоторые обратные вызовы происходят в одном потоке, а некоторые происходят по другому через любые API RxJava. Если это поведение, которое вы желаете, вам придется реализовать его самостоятельно в своих обратных вызовах.

В этом случае я бы рекомендовал использовать операторы «бокового действия». Мне кажется, что это немного более элегантное решение, чем использование вложенных наблюдаемых:

 ParseObservable.find(myQuery) .map(myMapFunc()) .subscribeOn(AndroidSchedulers.handlerThread(new Handler())) .doOnCompleted(() -> onCompleteAction()) .observeOn(AndroidSchedulers.mainThread()) .doOnNext(value -> onNext(value)) .subscribe(); 

Не рекомендуется подписываться в рамках подписки.

subscribeOn определяет, где цепочка Observable будет запускаться, когда наблюдатель подписывается на нее.

observeOn может использоваться в разных точках (и несколько раз, если потребуется) во всей наблюдаемой цепочке, чтобы передавать управление между потоками . (Вы можете проверить это, проверив, находитесь ли вы в основном потоке или нет в каждом из этих блоков).

 ParseObservable.find(myQuery) .map(myMapFunc()) // Added this: .doOnNext(obj -> { // NOTE: This will happen on your `subscribeOn` scheduler // Do something with `obj` here while on worker thread } .subscribeOn(AndroidSchedulers.handlerThread(new Handler())) // Added this: .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<>() { next -> { // NOTE: This will happen on the main thread }, error -> { Log.e("error","error",e); // NOTE: This will happen on the main thread }, () -> { // NOTE: This will happen on the main thread } }); 
Intereting Posts
Приложение Launcher и домашний ключ несовместимы Разработка приложения для Android как для игры Google, так и для магазина Amazon Store Удаление значка приложения и текста из ActionBar Результат результата findViewById android Краткий способ написания новых классов DialogPreference? Android Родственный сбой в /system/lib/libskia.so (обработка Bitmap) Как я могу прочитать json-файл с SD-карты Получение уникального идентификатора для маршрутизатора WiFi Использование AsyncTask для отправки электронной почты Android Локальное обслуживание или удаленный сервис? Отправка SMS без использования Android SmsManager .. Собственная библиотека? Как создать импульсный эффект ImageView с использованием девяти старых анимаций андроидов Как выбрать и скопировать текст в представлении, которое не связано с деятельностью? Как установить макет на 7 "двух разных планшета? Проверьте, можно ли безопасно отклонить диалог