Каков правильный способ обработки подписки в RxJava / RxAndroid для жизненного цикла активности?

Я просто начинаю работу с RxJava / RxAndroid. Я хочу избежать утечек контекста, поэтому я создал BaseFragment следующим образом:

public abstract class BaseFragment extends Fragment { protected CompositeSubscription compositeSubscription = new CompositeSubscription(); @Override public void onDestroy() { super.onDestroy(); compositeSubscription.unsubscribe(); } } 

И внутри моего фрагмента, который расширяет BaseFragment, я делаю это:

 protected void fetchNewerObjects(){ if(!areNewerObjectsFetching()){ //if it is not already fetching newer objects Runtime.getRuntime().gc();//clean out memory if possible fetchNewObjectsSubscription = Observable .just(new Object1()) .map(new Func1<Object1, Object2>() { @Override public Object2 call(Object1 obj1) { //do bg stuff return obj2; } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<Object2>() { @Override public void onCompleted() { compositeSubscription.remove(fetchNewObjectsSubscription); fetchNewObjectsSubscription = null; } @Override public void onError(Throwable e) { } @Override public void onNext(ArrayList<NewsFeedObject> newsFeedObjects) { //do stuff } }); //add subscription to composite subscription so it can be unsubscribed onDestroy() compositeSubscription.add(fetchNewObjectsSubscription); } } protected boolean areNewerObjectsFetching(){ if(fetchNewObjectsSubscription == null || fetchNewObjectsSubscription.isUnsubscribed()){ //if its either null or is in a finished status return false; } return true; } 

Поэтому, я думаю, мой вопрос в два раза:

  1. Остановит ли этот контекст утечку, потому что я отписываю onDestroy ()?

  2. И правильно ли я отслеживаю, насколько наблюдаемый «работает», установив подписку на нуль после завершения и проверки недействительности?

Solutions Collecting From Web of "Каков правильный способ обработки подписки в RxJava / RxAndroid для жизненного цикла активности?"

  1. Да, он остановится, но вы также должны установить подписку на null в onError (или после ошибки, вы не будете загружать элементы снова).

    Также не забывайте, что фрагмент можно остановить, но не уничтожить (например, в фоновом стеке), и вы можете не заметить что-либо в этом случае. Если вы переходите от unsubscribe onDestroy на onStop , не забудьте инициализировать compositeSubscription onCreateView в onCreateView каждый раз, когда создается представление (потому что после того, как CompositeSubscription отменена, вы больше не можете добавлять туда подписки).

  2. Да исправить. Но я думаю, что compositeSubscription.remove может быть опущен, потому что вы уже проверяете значение null.

Для управления жизненным циклом деятельности вам не нужна сторонняя библиотека. Попробуйте следующие коды:

 public class LifecycleBinder { public static <R> Observable.Transformer<R, R> subscribeUtilEvent(final Activity target, LifecycleEvent event) { final Application app = target.getApplication(); final PublishSubject<LifecycleEvent> publishSubject = PublishSubject.create(); final Application.ActivityLifecycleCallbacks callbacks = new Application.ActivityLifecycleCallbacks() { @Override public void onActivityCreated(Activity activity, Bundle savedInstanceState) { } @Override public void onActivityStarted(Activity activity) { } @Override public void onActivityResumed(Activity activity) { } @Override public void onActivityPaused(Activity activity) { if (activity == target) publishSubject.onNext(LifecycleEvent.ON_PAUSED); } @Override public void onActivityStopped(Activity activity) { if (activity == target) publishSubject.onNext(LifecycleEvent.ON_STOPPED); } @Override public void onActivitySaveInstanceState(Activity activity, Bundle outState) { if (activity == target) publishSubject.onNext(LifecycleEvent.ON_SAVE_INSTANCE_STATE); } @Override public void onActivityDestroyed(Activity activity) { if (activity == target) publishSubject.onNext(LifecycleEvent.ON_DESTROYED); } }; app.registerActivityLifecycleCallbacks(callbacks); return subscribeUtilEvent(publishSubject, event, new Action0() { @Override public void call() { app.unregisterActivityLifecycleCallbacks(callbacks); } }); } public static <R> Observable.Transformer<R, R> subscribeUtilEvent(final Fragment target, LifecycleEvent event) { final FragmentManager manager = target.getFragmentManager(); if (manager == null) { throw new NullPointerException("fragment manager is null!"); } final PublishSubject<LifecycleEvent> publishSubject = PublishSubject.create(); final FragmentManager.FragmentLifecycleCallbacks callbacks = manager.new FragmentLifecycleCallbacks() { @Override public void onFragmentPreAttached(FragmentManager fm, Fragment f, Context context) { } @Override public void onFragmentAttached(FragmentManager fm, Fragment f, Context context) { } @Override public void onFragmentCreated(FragmentManager fm, Fragment f, Bundle savedInstanceState) { } @Override public void onFragmentActivityCreated(FragmentManager fm, Fragment f, Bundle savedInstanceState) { } @Override public void onFragmentViewCreated(FragmentManager fm, Fragment f, View v, Bundle savedInstanceState) { } @Override public void onFragmentStarted(FragmentManager fm, Fragment f) { } @Override public void onFragmentResumed(FragmentManager fm, Fragment f) { } @Override public void onFragmentPaused(FragmentManager fm, Fragment f) { if (f == target) publishSubject.onNext(LifecycleEvent.ON_PAUSED); } @Override public void onFragmentStopped(FragmentManager fm, Fragment f) { if (f == target) publishSubject.onNext(LifecycleEvent.ON_STOPPED); } @Override public void onFragmentSaveInstanceState(FragmentManager fm, Fragment f, Bundle outState) { if (f == target) publishSubject.onNext(LifecycleEvent.ON_SAVE_INSTANCE_STATE); } @Override public void onFragmentViewDestroyed(FragmentManager fm, Fragment f) { if (f == target) publishSubject.onNext(LifecycleEvent.ON_VIEW_DESTORYED); } @Override public void onFragmentDestroyed(FragmentManager fm, Fragment f) { if (f == target) publishSubject.onNext(LifecycleEvent.ON_DESTROYED); } @Override public void onFragmentDetached(FragmentManager fm, Fragment f) { if (f == target) publishSubject.onNext(LifecycleEvent.ON_DESTROYED); } }; manager.registerFragmentLifecycleCallbacks(callbacks, true); return subscribeUtilEvent(publishSubject, event, new Action0() { @Override public void call() { manager.unregisterFragmentLifecycleCallbacks(callbacks); } }); } private static <R, T> Observable.Transformer<R, R> subscribeUtilEvent(final Observable<T> source, final T event, final Action0 doOnComplete) { return new Observable.Transformer<R, R>() { @Override public Observable<R> call(Observable<R> rObservable) { return rObservable.takeUntil(takeUntilEvent(source, event)).doOnCompleted(doOnComplete); } }; } private static <T> Observable<T> takeUntilEvent(final Observable<T> src, final T event) { return src.takeFirst(new Func1<T, Boolean>() { @Override public Boolean call(T lifecycleEvent) { return lifecycleEvent.equals(event); } }); } } 

События жизненного цикла:

 public enum LifecycleEvent { ON_PAUSED, ON_STOPPED, ON_SAVE_INSTANCE_STATE, ON_DESTROYED, ON_VIEW_DESTORYED, ON_DETACHED, } 

Применение:

 myObservable .compose(LifecycleBinder.subscribeUtilEvent(this, LifecycleEvent.ON_DESTROYED)) .subscribe();