RxJava – загрузка файлов последовательно – испускает следующий элемент, когда onNext вызывает

У меня есть метод, который одновременно загружает несколько файлов в облачное хранилище. Это выглядит примерно так:

List<String> files = Arrays.asList("file0", "file1", "file2"); Observable.from(files) .flatMap(file -> uploadFile(file) .flatMap(done -> notifyFinished(file))) .subscribe(this::onNext, this::onError, this::onCompleted); private Observable<Boolean> uploadFile(String file) { Timber.d("Uploading: " + file); return Observable.just(true).delay(6, TimeUnit.SECONDS); } private Observable<Boolean> notifyFinished(String file) { Timber.d("Notify finished: " + file); return Observable.just(true).delay(3, TimeUnit.SECONDS); } 

Результатом этого является:

 06-09 02:10:04.779 D: Uploading: file0 06-09 02:10:04.780 D: Uploading: file1 06-09 02:10:04.781 D: Uploading: file2 06-09 02:10:10.782 D: Notify finished: file1 06-09 02:10:10.782 D: Notify finished: file0 06-09 02:10:10.783 D: Notify finished: file2 06-09 02:10:13.784 D: onNext 06-09 02:10:13.786 D: onNext 06-09 02:10:13.786 D: onNext 06-09 02:10:13.787 D: onCompleted 

Я хочу, чтобы он работал последовательно, например:

 1) Uploading: file0 2) Notify finished: file0 3) onNext 4) Uploading: file1 5) Notify finished: file1 6) onNext ... 

Можно ли сделать что-то подобное с помощью Rx?

РЕДАКТИРОВАТЬ

Замена первой flatMap с помощью concatMap выполнила задание. Я думал, что знаю разницу между этими операторами, но этот пример просто показал, что я ничего не знаю … Теперь вывод:

 06-09 02:15:00.581 D: Uploading: file0 06-09 02:15:06.584 D: Notify finished: file0 06-09 02:15:09.586 D: onNext 06-09 02:15:09.587 D: Uploading: file1 06-09 02:15:15.590 D: Notify finished: file1 06-09 02:15:18.593 D: onNext 06-09 02:15:18.595 D: Uploading: file2 06-09 02:15:24.598 D: Notify finished: file2 06-09 02:15:27.599 D: onNext 06-09 02:15:27.601 D: onCompleted 

Если вы хотите «упорядочить» последовательные последовательные, просто используйте concatMap() вместо flatMap()

Создайте наблюдаемый файл и сравните три наблюдаемых

 @Test public void testContact() { Observable.concat(Observable.just(uploadFile(file1)), Observable.just(uploadFile(file2)), Observable.just(uploadFile(file3))) .flatMap(file -> notifyFinished(file))) .subscribe(this::onNext, this::onError, this::onCompleted); } 

Вам нужно будет сделать метод notifyFinished, чтобы вернуть наблюдаемый файл вместо boolean.

Вы также можете использовать слияние или zip, у вас есть больше примеров объединения наблюдаемых здесь https://github.com/politrons/reactive/tree/master/src/test/java/rx/observables/combining