Шаблон RxJava для запроса удаленного наблюдения с временным кешем

Вариант использования: я хочу временно кэшировать последний испускаемый дорогой отзыв Observable, но после его истечения вернитесь к дорогому источнику Observable и снова его кешируйте и т. Д.

Достаточно простой сетевой сценарий кэширования, но я действительно пытаюсь заставить его работать.

private Observable<String> getContentObservable() { // expensive upstream source (API, etc.) Observable<String> sourceObservable = getSourceObservable(); // cache 1 result for 30 seconds, then return to the source return sourceObservable .replay(1, 30, TimeUnit.SECONDS) .autoConnect() .switchIfEmpty(sourceObservable); } 

Исходный запрос: переходит к исходному второму запросу в течение 30 секунд с момента испускания источника: доставлено из кэша Третий запрос за пределами окна истечения срока кеширования: ничего. Я подписываюсь на него, и у меня нет данных, но он не переключается на исходный источник Observable.

Похоже, я просто подключаюсь к ConnectableObservable из autoConnect() и он никогда не заканчивается пустым, поэтому он никогда не запускает мой switchIfEmpty() .

Как использовать эту комбинацию replay(1,x,x) и switchIfEmpty() ?

Или я просто подхожу к этому неправильно с самого начала?

Solutions Collecting From Web of "Шаблон RxJava для запроса удаленного наблюдения с временным кешем"

Таким образом, получается, что вы можете использовать общий ресурс Jake Wharton для кэширования последнего значения даже после удаления. https://github.com/JakeWharton/RxReplayingShare

 return sourceObservable .replay(1, 30, TimeUnit.SECONDS) .autoConnect() .switchIfEmpty(sourceObservable); 

Исходный запрос: переходит к исходному второму запросу в течение 30 секунд с момента испускания источника: доставлено из кэша Третий запрос за пределами окна истечения срока кеширования: ничего. Я подписываюсь на него, и у меня нет данных, но он не переключается на исходный источник Observable.

Проблема заключается в том, что повторение повторяет ту же последовательность, что и sourceObservable за последние 30 секунд, но когда вы подписываетесь через 30 секунд, в последовательности нет событий, даже нет onCompleted() , поэтому вы не можете switchIfEmpty() , Он не будет работать, поскольку он зависит от сигнала 'onCompleted()' и без каких-либо выбросов, чтобы знать, что он «пуст».

В общем случае использование повтора недостаточно для сценария кэширования, так как вам нужен способ повторной подписки в случае, если срок действия кеша истек, а также сделать это по требованию, что означает, что только когда какой-либо клиент подписывается на него. (Вы можете делать кеш, который обновляется каждый раз в 30 секунд, но я не думаю, что это не желаемое поведение)


Итак, как предложил @Yurly Kulikov, вам нужно поддерживать состояние и контролировать операцию подписки для поддержания состояния. Но я думаю, что в решении есть большой поток, поскольку он на самом деле не является exatcly потокобезопасным, то есть если 2 подписывается на него 1 за другим, скажите A и B, а A выполняет запрос и ждет, чтобы сохранить новый В результате кэш, B также может подписаться, и другой запрос будет выполнен, так как кешированное значение еще не установлено A (он еще не завершил первый сетевой запрос.

Я предлагаю использовать аналогичный подход с другой реализацией, которую я предложил здесь :

 public class CachedRequest<T> { private final AtomicBoolean expired = new AtomicBoolean(true); private final Observable<T> source; private final long cacheExpirationInterval; private final TimeUnit cacheExpirationUnit; private Observable<T> current; public CachedRequest(Observable<T> o, long cacheExpirationInterval, TimeUnit cacheExpirationUnit) { source = o; current = o; this.cacheExpirationInterval = cacheExpirationInterval; this.cacheExpirationUnit = cacheExpirationUnit; } private Observable<T> getCachedObservable() { return Observable.defer(() -> { if (expired.compareAndSet(true, false)) { current = source.cache(); Observable.timer(cacheExpirationInterval, cacheExpirationUnit) .subscribe(aLong -> expired.set(true)); } return current; }); } } 

С отсрочкой вы можете вернуть право Наблюдаемый в соответствии с статусом истечения срока действия кеша, поэтому каждая подписка, произошедшая в течение срока действия кеша, будет кэшироваться Наблюдаемая (с использованием cache ()) – значение запроса будет выполняться только один раз. После истечения срока действия кеша дополнительная подписка вызовет новый запрос и установит новый таймер для сброса срока действия кеша.

Вам нужно будет поддерживать состояние, разделяемое между многими абонентами. Вот почему вы не можете создавать Observable каждый раз, когда вызывается getContentObservable ().

Один из способов сделать это – создать Наблюдаемый снаружи, удерживать внутреннее состояние в Observable (например, используя буфер), но выполнение поведения с состоянием часто проще без Observables.

Вот пример с общим состоянием в поле:

 private Optional<String> cached = Optional.empty(); private Observable<String> getContentObservable() { //use defer to delay cache evaluation to the point when someone subscribes return Observable.defer( () -> cached.isPresent() ? Observable.just(cached) : fetchAndCache() ) //use the same scheduler for every cached field access .subscribeOn(scheduler); } private Observable<String> fetchAndCache() { Observable<String> cachedSource = getSourceObservable() //I assume you only need one, make sure it is 1 .take(1) .cache(); cachedSource .observeOn(scheduler) //side-effect stores the state .doOnNext(str -> cached = Optional.of(str)) .flatMap(str -> Observable.timer(30, TimeUnit.SECONDS, scheduler)) //another side-effect clears the cache .subscribe(l -> cached = Optional.empty()); return cachedSource; }