Я использую RxParse для анализа асинхронной загрузки запроса, но когда я подписываю свои наблюдаемые с помощью subscribeOn (Schedulers.io ()), мой метод onCompleted никогда не вызывается в основном потоке. Вместо этого мой метод onCompleted вызывается внутри пула рабочих потоков. Если я использую функцию наблюдения (AndroidSchedulers.mainThread), все будет работать, но мой onNextMethod также будет вызван в основной поток, и я не хочу этого.
Что-то не так в моем коде?
Что-то не так в моем коде?
ParseObservable.find(myQuery) .map(myMapFunc()) .subscribeOn(AndroidSchedulers.handlerThread(new Handler())) .subscribe( new Subscriber<MyObj>() { @Override public void onError(Throwable e) { Log.e("error","error",e); } @Override public void onNext(T t) { // ... worker thread (but here is ok) } public void onCompleted() { // ... worker thread again instead of mainThread } } ) );
К сожалению, подписка находится в одном потоке для всех методов ( onNext
, onError
и onCompleted
Но вы можете наблюдать в Schedulers.io()
и внутри onNext(T t)
, создать новый Observable
для прослушивания в MainThread
следующим образом:
ParseObservable.find(myQuery) .map(myMapFunc()) .subscribeOn(Schedulers.io()) .subscribe( new Subscriber<MyObj>() { @Override public void onError(Throwable e) { Log.e("error","error",e); } @Override public void onNext(T t) { Observable.just(t) .observeOn(AndroidSchedulers.mainThread()) .subscribe((t) -> { // do something in MainThread }) } public void onCompleted() { // ... worker thread again instead of mainThread } } ) );
Надеюсь, это поможет!
Сначала вам нужно понять разницу между subscribeOn()
и observeOn()
. Это два совершенно разных оператора, которые влияют на разные части Rx-цепи.
subscribeOn()
указывает, где ваша Observable будет выполнять свою работу. Это не повлияет на выполнение onNext()
, onError()
и onComplete()
.
observeOn()
указывает, где onNext()
обратные вызовы (например, onNext()
). Это не повлияет на то, где ваша Observable выполняет свою работу.
Все обратные вызовы будут выполняться в одном потоке. Вы не можете указать, что некоторые обратные вызовы происходят в одном потоке, а некоторые происходят по другому через любые API RxJava. Если это поведение, которое вы желаете, вам придется реализовать его самостоятельно в своих обратных вызовах.
В этом случае я бы рекомендовал использовать операторы «бокового действия». Мне кажется, что это немного более элегантное решение, чем использование вложенных наблюдаемых:
ParseObservable.find(myQuery) .map(myMapFunc()) .subscribeOn(AndroidSchedulers.handlerThread(new Handler())) .doOnCompleted(() -> onCompleteAction()) .observeOn(AndroidSchedulers.mainThread()) .doOnNext(value -> onNext(value)) .subscribe();
Не рекомендуется подписываться в рамках подписки.
subscribeOn
определяет, где цепочка Observable будет запускаться, когда наблюдатель подписывается на нее.
observeOn
может использоваться в разных точках (и несколько раз, если потребуется) во всей наблюдаемой цепочке, чтобы передавать управление между потоками . (Вы можете проверить это, проверив, находитесь ли вы в основном потоке или нет в каждом из этих блоков).
ParseObservable.find(myQuery) .map(myMapFunc()) // Added this: .doOnNext(obj -> { // NOTE: This will happen on your `subscribeOn` scheduler // Do something with `obj` here while on worker thread } .subscribeOn(AndroidSchedulers.handlerThread(new Handler())) // Added this: .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<>() { next -> { // NOTE: This will happen on the main thread }, error -> { Log.e("error","error",e); // NOTE: This will happen on the main thread }, () -> { // NOTE: This will happen on the main thread } });