Я пытаюсь использовать Realm с RxJava и Retrofit таким образом, чтобы DanLew описал здесь конкатенирование ввода из области и модификации, но он застревает, если я добавляю сферу в цепочку
Observable.concat(countryStorage.restoreAsObservable(), networkService.api() .getCountries() .doOnNext(countryStorage::save)) .first() .observeOn(AndroidSchedulers.mainThread()) .subscribe(//never reaching here)
место хранения
@Override public Observable<List<Country>> restoreAsObservable() { Realm realm = realmProvider.get(); return realm.where(Country.class) .findAll() .asObservable() .map(countries -> return realm.copyFromRealm(countries)) .first(countries -> return !countries.isEmpty()) .doOnCompleted(realm::close()); }
Похоже, что это может случиться, что наблюдаемые горячие из Царства, но ничего об этом в документах и о том, как я могу составить Царство с другими наблюдаемыми?
ОБНОВЛЕНИЕ: Похоже, что он работает отлично по-старому. Остается вопрос о новых апи.
return Observable.just( realm.copyFromRealm(realm.where(Country.class).findAll())) .filter(countries -> !countries.isEmpty()) .doOnCompleted(realm::close);
Это происходит потому, что countryStorage.restoreAsObservable()
никогда не завершается, и если вы читаете concat doc, он явно заявляет, что:
Конкат ждет, чтобы подписаться на каждый дополнительный Наблюдаемый, который вы передадите ему, до тех пор, пока предыдущий Наблюдаемый не завершится.
Вместо этого вы можете просто сделать что-то вроде:
countryStorage.restoreAsObservable() .doOnSubscribe(() -> { networkService.api() .getCountries() .subscribe(countryStorage::save) }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(//do smth)