RxAndroidBle поддерживает постоянное соединение + запись / уведомление

Я создаю приложение для Android, которое имеет особые требования к Bluetooth Low Energy.

Мне нужно написать характеристику только для записи и получить ответы на отдельный атрибут уведомления, и мне нужно сделать это во многих и многих действиях. Есть ли способ Rx для отправки запроса на 1-й признак, дождитесь ответа на втором, а затем перейдите к другому запросу?

Кроме того, чтобы поделиться своим экземпляром RxAndroidBle, я подумал о том, чтобы сделать какой-то BleManager Singleton, где я бы разоблачил Observables, поэтому я могу легко подписаться на них в моем Ведущем. Я просто хочу избежать необходимости копировать логику соединения для каждого вида деятельности и иметь (в идеале) постоянное соединение. Таким образом, я мог только разоблачить connectionObservable и подписаться на него, поэтому я могу легко отправлять запросы на запись и получать уведомления, но я уверен, что есть лучший способ сделать это.

Это то, что у меня есть сейчас:

@Singleton public class BleManager { private PublishSubject<Void> disconnectTriggerSubject = PublishSubject.create(); private Observable<RxBleConnection> connectionObservable; private boolean isConnected; private final UUID CTRL_FROM_BRIDGE_UUID = UUID.fromString("someUUID"); private final UUID BLE_WRITE_CHARACTERISTIC_UUID = UUID.fromString("someOtherUUID"); private final RxBleClient bleClient; private String mMacAddress; private final Context context; private RxBleDevice bleDevice; @Inject public BleManager(Context context, RxBleClient client) { Timber.d("Constructing BleManager and injecting members"); this.context = context; this.bleClient = client; } public void setMacAddress(String mMacAddress) { this.mMacAddress = mMacAddress; // Set the associated device on MacAddress change bleDevice = bleClient.getBleDevice(this.mMacAddress); } public String getMacAddress() { return mMacAddress; } public RxBleDevice getBleDevice() { Preconditions.checkNotNull(mMacAddress); return bleClient.getBleDevice(mMacAddress); } public Observable<RxBleScanResult> getScanSubscription() { Preconditions.checkNotNull(context); Preconditions.checkNotNull(bleClient); return bleClient.scanBleDevices().distinct(); } public Observable<RxBleConnection> getConnectionSubscription() { Preconditions.checkNotNull(context); Preconditions.checkNotNull(bleDevice); if (connectionObservable == null) { connectionObservable = bleDevice.establishConnection(context, false) .takeUntil(disconnectTriggerSubject) .observeOn(AndroidSchedulers.mainThread()) .doOnUnsubscribe(this::clearSubscription) .compose(new ConnectionSharingAdapter()); } return connectionObservable; } public Observable<byte[]> setupListeners() { return connectionObservable.flatMap(rxBleConnection -> rxBleConnection.setupNotification(CTRL_FROM_BRIDGE_UUID)) .doOnNext(notificationObservable -> Timber.d("Notification Setup")) .flatMap(notificationObservable -> notificationObservable) .observeOn(AndroidSchedulers.mainThread()); } private void triggerDisconnect() { disconnectTriggerSubject.onNext(null); } public Observable<byte[]> writeBytes(byte[] bytes) { return connectionObservable.flatMap(rxBleConnection -> rxBleConnection.writeCharacteristic( BLE_WRITE_CHARACTERISTIC_UUID, bytes)).observeOn(AndroidSchedulers.mainThread()); } private boolean isConnected() { return bleDevice.getConnectionState() == RxBleConnection.RxBleConnectionState.CONNECTED; } /** * Will update the UI with the current state of the Ble Connection */ private void registerConnectionStateChange() { bleDevice.observeConnectionStateChanges().observeOn(AndroidSchedulers.mainThread()).subscribe(connectionState -> { isConnected = connectionState.equals(RxBleConnection.RxBleConnectionState.CONNECTED); }); } private void clearSubscription() { connectionObservable = null; } } 

    Я немного подумал о вашем прецеденте. При совместном использовании того же соединения вы вводите состояния в свое приложение, которое требует некоторой обработки состояния, и поэтому невозможно (или, по крайней мере, я не знаю, как) быть чисто реактивным.

    Я сосредоточился на установлении соединения и выполнении передачи уведомлений с записью на устройство BLE, которое сериализовано.

     private PublishSubject<Pair<byte[], Integer>> inputSubject = PublishSubject.create(); private PublishSubject<Pair<byte[], Integer>> outputSubject = PublishSubject.create(); private Subscription connectionSubscription; private volatile int uniqueId = 0; // used to identify the transmission we're interested in in case more than one will be started at the same time public void connect() { Observable<RxBleConnection> connectionObservable = // your establishing of the connection wether it will be through scan or RxBleDevice.establishConnection() final UUID notificationUuid = // your notification characteristic UUID final UUID writeUuid = // your write-only characteristic UUID connectionSubscription = connectionObservable .flatMap( rxBleConnection -> rxBleConnection.setupNotification(notificationUuid), // subscribing for notifications (rxBleConnection, notificationObservable) -> // connection is established and notification prepared inputSubject // waiting for the data-packet to transmit .onBackpressureBuffer() .flatMap(bytesAndFilter -> { return Observable.combineLatest( // subscribe at the same time to notificationObservable.take(1), // getting the next notification bytes rxBleConnection.writeCharacteristic(writeUuid, bytesAndFilter.first), // transmitting the data bytes to the BLE device (responseBytes, writtenBytes) -> responseBytes // interested only in the response bytes ) .doOnNext(responseBytes -> outputSubject.onNext(new Pair<>(responseBytes, bytesAndFilter.second))); // pass the bytes to the receiver with the identifier }, 1 // serializing communication as only one Observable will be processed at the same time ) ) .flatMap(observable -> observable) .subscribe( response -> { /* ignored here - used only as side effect with outputSubject */ }, throwable -> outputSubject.onError(throwable) ); } public void disconnect() { if (connectionSubscription != null && !connectionSubscription.isUnsubscribed()) { connectionSubscription.unsubscribe(); connectionSubscription = null; } } public Observable<byte[]> writeData(byte[] data) { return Observable.defer(() -> { final int uniqueId = this.uniqueId++; // creating new uniqueId for identifying the response inputSubject.onNext(new Pair<>(data, uniqueId)); // passing the data with the id to be processed by the connection flow in connect() return outputSubject .filter(responseIdPair -> responseIdPair.second == uniqueId) .first() .map(responseIdPair -> responseIdPair.first); } ); } 

    Это подход, который я считаю хорошим, поскольку весь поток описывается в одном месте и поэтому его легче понять. Часть сообщения, которая является stateful (запрос на запись и ожидание ответа), сериализуется и имеет возможность продолжать соединение до вызова disconnect() .

    Недостатком является то, что передача зависит от побочных эффектов различного потока и вызова функции writeData() до того, как соединение будет установлено, и установка уведомлений никогда не завершит возвращенный Observable, хотя не должно быть проблемой добавить обработку для этого сценария с помощью Государственная проверка.

    С наилучшими пожеланиями

    Intereting Posts
    Я продолжаю получать сообщения: К сожалению, «некоторые китайцы» прекратили В чем разница между кучей далвика и родной кучей в андроиде? Который фиксирован. Voice Matcher для сопоставления голосового пароля в Android (сравнить голос) Как отменить диалоговое окно, например «Активность» при касании за пределами окна? Почему Google Images Service конвертирует мои изображения WEBP в JPG при их обслуживании? Преобразование PSD в формат XML (макет) для Android? Как обнаружить, когда клавиатура программного обеспечения Android скрыта? Как установить цвет rgb в android? Android google map blank Android Nested Fragments Issue "java.lang.IllegalStateException: активность была уничтожена" Google Play просит принять пользовательские разрешения, как в встроенных разрешениях в случае автоматического обновления О финише () в андроиде Error: Ошибка: недопустимые типы целых чисел (в 'layout_height' со значением '10') Как настроить код, создаваемый при использовании «Surround with try / catch»? Как подсчитать количество файлов в папке во внешнем хранилище?