Firebase Cloud Firestore の購読処理を RxJava / RxKotlin に変換する Extension
はじめに
Firebase の Cloud Firestore の購読処理を DocumentSnapshot または QuerySnapshot をRxに変換する方法を紹介します。
使いやすいように改善
購読処理には、 addSnapshotListener()
のリスナー使用します。
ポイントとしては以下の2つです。
- エラーの場合は
onError
, 変更された場合はonNext
で流がす - Rxが解放された場合はリスナーも
remove()
を呼び出しておく
/** * QuerySnapshotをRxで購読 * * @param metadataChanges メタデータの変更フラグ(追加削除だけでなく内容が変更された時にも通知するかどうか) * @return QuerySnapshot */ @CheckResult fun Query.listen(metadataChanges: MetadataChanges = MetadataChanges.INCLUDE): Flowable<QuerySnapshot> { return Observable.create<QuerySnapshot> { emitter -> val listener = addSnapshotListener(metadataChanges) { snapshot, exception -> if (exception != null) { emitter.onError(exception) } if (snapshot != null && snapshot.documentChanges.isNotEmpty()) { emitter.onNext(snapshot) } } emitter.setCancellable { listener.remove() } }.toFlowable(BackpressureStrategy.LATEST) } /** * DocumentSnapshotをRxで購読 * * @param metadataChanges メタデータの変更フラグ(追加削除だけでなく内容が変更された時にも通知するかどうか) * @return DocumentSnapshot */ @CheckResult fun DocumentReference.listen(metadataChanges: MetadataChanges = MetadataChanges.INCLUDE): Flowable<DocumentSnapshot> { return Observable.create<DocumentSnapshot> { emitter -> val listener = addSnapshotListener(metadataChanges) { snapshot, exception -> if (exception != null) { emitter.onError(exception) } if (snapshot != null && snapshot.exists()) { emitter.onNext(snapshot) } } emitter.setCancellable { listener.remove() } }.toFlowable(BackpressureStrategy.LATEST) }
さらに使いやすく改善
こちらの記事で紹介している toObject / toObjects の関数も用意してみてください。
以下のようにスマートにかけるようになります!
FirebaseFirestore.getInstance() .collection("messages") .listen() .toObjects<Message>() .subscribe { // 購読処理 }
補足
今回は購読処理について紹介しましたが、取得処理についてもこちらで紹介していますので、よかったら参考にしてください。