Как игнорировать ошибку и продолжать бесконечный поток?

Я хотел бы знать, как игнорировать исключения и продолжать бесконечный поток (в моем случае поток местоположений)?

Я загружаю текущую позицию пользователя (используя Android-ReactiveLocation ), а затем отправляю их в свой API (с помощью Retrofit ).

В моем случае, когда исключение возникает во время сетевого вызова (например, тайм-аут), onError метод onError и поток останавливается сам. Как этого избежать?

Мероприятия:

 private RestService mRestService; private Subscription mSubscription; private LocationRequest mLocationRequest = LocationRequest.create() .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY) .setInterval(100); ... private void start() { mRestService = ...; ReactiveLocationProvider reactiveLocationProvider = new ReactiveLocationProvider(this); mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest) .buffer(50) .flatMap(locations -> mRestService.postLocations(locations)) // can throw exception .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(); } 

RestService:

 public interface RestService { @POST("/.../") Observable<Response> postLocations(@Body List<Location> locations); } 

Solutions Collecting From Web of "Как игнорировать ошибку и продолжать бесконечный поток?"

mRestService.postLocations(locations) испускают один элемент, затем завершают. Если ошибка возникает, то она испускает ошибку, которая заполняет поток.

Когда вы вызываете этот метод в flatMap , ошибка продолжается до вашего «основного» потока, а затем ваш поток останавливается.

Что вы можете сделать, так это преобразовать вашу ошибку в другой элемент (как описано здесь: https://stackoverflow.com/a/28971140/476690 ), но не в вашем основном потоке (как я полагаю, вы уже пробовали), а в mRestService.postLocations(locations) .

Таким образом, этот вызов выдает ошибку, которая будет преобразована в элемент / другой наблюдаемый, а затем завершена. (Без вызова onError ).

В представлении потребителя mRestService.postLocations(locations) будет выделять один элемент, а затем завершить, например, если все будет успешным.

 mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest) .buffer(50) .flatMap(locations -> mRestService.postLocations(locations).onErrorReturn((e) -> Collections.emptyList()) // can throw exception .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(); 

Вы можете использовать один из операторов обработки ошибок .

  • onErrorResumeNext( ) – инструктирует Observable испускать последовательность элементов, если встречает ошибку
  • onErrorReturn( ) – инструктирует Observable испускать определенный элемент, когда он встречает ошибку
  • onExceptionResumeNext( ) – инструктирует Observable продолжать выдавать элементы после того, как он встречает исключение (но не другое разнообразие бросаемых)
  • retry( ) – если источник Observable испускает ошибку, повторно подпишитесь на нее в надежде, что она завершится без ошибок
  • retryWhen( ) – если источник Observable испускает ошибку, передайте эту ошибку другому наблюдаемому, чтобы определить, следует ли повторно подписываться на источник

Especialy onExceptionResumeNext и onExceptionResumeNext выглядят многообещающими в вашем случае.

Просто вставляя информацию о ссылке из ответа @ MikeN, она теряется:

 import rx.Observable.Operator; import rx.functions.Action1; public final class OperatorSuppressError<T> implements Operator<T, T> { final Action1<Throwable> onError; public OperatorSuppressError(Action1<Throwable> onError) { this.onError = onError; } @Override public Subscriber<? super T> call(final Subscriber<? super T> t1) { return new Subscriber<T>(t1) { @Override public void onNext(T t) { t1.onNext(t); } @Override public void onError(Throwable e) { onError.call(e); } @Override public void onCompleted() { t1.onCompleted(); } }; } } 

И использовать его близко к наблюдаемому источнику, потому что другие операторы могут с нетерпением отказаться от подписки до этого.

 Observerable.create(connectToUnboundedStream()).lift(new OperatorSuppressError(log()).doOnNext(someStuff()).subscribe(); 

Обратите внимание, однако, что это подавляет доставку ошибок из источника. Если какой-либо из onNext в цепочке после того, как он выдает исключение, все же вероятно, что источник будет отписано.

Добавьте решение этой проблемы:

 privider .compose(ignoreErrorsTransformer) .subscribe() private final Observable.Transformer<ResultType, ResultType> ignoreErrorsTransformer = new Observable.Transformer<ResultType, ResultType>() { @Override public Observable<ResultType> call(Observable<ResultType> resultTypeObservable) { return resultTypeObservable .materialize() .filter(new Func1<Notification<ResultType>, Boolean>() { @Override public Boolean call(Notification<ResultType> resultTypeNotification) { return !resultTypeNotification.isOnError(); } }) .dematerialize(); } }; 

Попробуйте вызвать службу останова в вызове Observable.defer. Таким образом, для каждого вызова у вас будет возможность использовать свой собственный «onErrorResumeNext», и ошибки не приведут к завершению вашего основного потока.

 reactiveLocationProvider.getUpdatedLocation(mLocationRequest) .buffer(50) .flatMap(locations -> Observable.defer(() -> mRestService.postLocations(locations)) .onErrorResumeNext(<SOME_DEFAULT_TO_REACT_TO>) ) ........ 

Это решение из этого потока -> RxJava Observable и Subscriber для исключения исключений? , Но я думаю, что это будет работать и в вашем случае.

Небольшая модификация решения (@MikeN), позволяющая завершить конечные потоки:

 import rx.Observable.Operator; import rx.functions.Action1; public final class OperatorSuppressError<T> implements Operator<T, T> { final Action1<Throwable> onError; public OperatorSuppressError(Action1<Throwable> onError) { this.onError = onError; } @Override public Subscriber<? super T> call(final Subscriber<? super T> t1) { return new Subscriber<T>(t1) { @Override public void onNext(T t) { t1.onNext(t); } @Override public void onError(Throwable e) { onError.call(e); //this will allow finite streams to complete t1.onCompleted(); } @Override public void onCompleted() { t1.onCompleted(); } }; } } 

Если вы просто хотите игнорировать ошибку внутри flatMap не возвращая элемент, выполните следующее:

 flatMap(item -> restService.getSomething(item).onErrorResumeNext(Observable.empty()) );