DEV Community

Cover image for Room + RxJava - simple database observer
Róbert Ďuriančík
Róbert Ďuriančík

Posted on

Room + RxJava - simple database observer

Recently I worked on a project where I needed to update android app geofences based on the data in the local database in real time. That means, if an item was added to the database I’d have to create a geofence for it or remove the geofence if the item was removed.

To set up the database I decided to use the Room persistence library and utilize RxJava as it was already a big part of the project. Room provides a pretty good abstraction layer over SQLite, it’s easy to set it up and with the help of RxJava it allows to asynchronously observe the data. You can find out more about how to do queries with RxJava in this excellent post.

So, the goal was to observe the database in a service. I found out that the only way how Room let me observe all data in real time is to create a query that returns a list of all items in the database every time a change is made. Something like this 👇

@Query(SELECT * FROM Table)
fun observeItems(): Flowable<List<Item>>

But with this approach, I’d have to store a copy of the list in the service and compare it with the received list to find out which items were removed, added our updated.


To avoid this situation here is a quick tip on how to build a simple database observer which informs you about the ongoing events. For the demonstration purposes, I created a demo To-Do app (even though the logic could be way simpler). You can find it here.

1. Step

Firstly, we start with creating our Task entity which holds all information we need to know:

@Parcelize
@Entity
data class Task(
    var description: String,
    @PrimaryKey(autoGenerate = true) val id: Int = 0,
    val created: Long = System.currentTimeMillis(),
    var isDone: Boolean = false
) : Parcelable

2. Step

In the second step, we design the TaskDao interface where we include all data manipulating/retrieving methods and set up the TaskDatabase:

@Dao
interface TaskDao {
    @Insert
    fun insertTask(task: Task): Completable

    @Update
    fun updateTask(task: Task): Completable

    @Query("SELECT * FROM Task")
    fun getAll(): Single<List<Task>>

    @Delete
    fun deleteTask(task: Task): Completable
 }
@Database(entities = [Task::class], version = 1, exportSchema = false)
abstract class TaskDatabase : RoomDatabase() {
    abstract fun taskDao(): TaskDao
}

The most important method for our observer initialization is getAll(). It’ll allow us to retrieve all saved data before real-time observing. As you can see, for basic operations such as insert, delete and update I chose Completable as a return type because it fits me the best. But you can use whatever type you want provided it informs you if the operation was successful or not.

3. Step

As the first thing of this step, we create the DatabaseEventType enum class that stores all possible events we want to observe. In my case, the events are INSERTED, UPDATED, REMOVED.

enum class DatabaseEventType {
    INSERTED,
    UPDATED,
    REMOVED
}

Then we make a simple generic class that will wrap the event and the value that caused it.

data class DatabaseEvent<T>(val eventType: DatabaseEventType, val value: T)

4. Step

Lastly, when we have all the necessary components implemented, we can finally build our observer. Basically, we create a layer over the task database which will serve as a single point of control.

class TaskRepository private constructor(context: Context) {

    private val mTaskDao = Room.databaseBuilder(context, TaskDatabase::class.java, "tasks-database")
        .build()
        .taskDao()

    private val mObserverSubject = PublishSubject.create<DatabaseEvent<Task>>()

    fun addTask(task: Task): Completable {
        val insertEvent = DatabaseEvent(DatabaseEventType.INSERTED, task)
        return mTaskDao.insertTask(task)
            .doOnComplete { mObserverSubject.onNext(insertEvent) }
    }

    fun deleteTask(task: Task): Completable {
        val deleteEvent = DatabaseEvent(DatabaseEventType.REMOVED, task)
        return mTaskDao.deleteTask(task)
            .doOnComplete { mObserverSubject.onNext(deleteEvent) }
    }

    fun updateTask(task: Task): Completable {
        val updateEvent = DatabaseEvent(DatabaseEventType.UPDATED, task)
        return mTaskDao.updateTask(task)
            .doOnComplete { mObserverSubject.onNext(updateEvent) }
    }

    fun observeTasks(): Observable<DatabaseEvent<Task>> {
        return mTaskDao.getAll()
            .flatMapObservable { Observable.fromIterable(it) }
            .map { DatabaseEvent(DatabaseEventType.INSERTED, it) }
            .concatWith(mObserverSubject)
    }

    companion object {
        @Volatile
        private var instance: TaskRepository? = null

        fun getInstance(context: Context): TaskRepository {
            return instance ?: synchronized(this) {
                instance ?: TaskRepository(context.applicationContext).also { instance = it }
            }
        }
    }
}

mObserverSubject is a RxJava PublishSubject responsible for real-time observing of database events. All three methods addTask(), deleteTask() and updateTask() utilize TaskDao to make the given operation. When we know the operation is successful (in our case when the Completable completes) we post a DatabaseEvent object to this subject. PublishSubject doesn't retain/cache items so a new subscriber won't receive any past items. As we want to know the content of our database when we start to observe it, we must retrieve items via TaskDao and map them to DatabaseEvent. This is the purpose of the observeTasks() method.


And that's it, we built our database observer. Any time we need to observe the database we can call the observeTask() method and subscribe to the Observable. I hope this post will help you simplify your project or better understand how you can utilize RxJava with Room. If you have any questions feel free to ask in the comment section. In my next post, I'll explain how you can replace RxJava with Coroutines.

Top comments (0)