I am using observable-fns (which can also work with threads.js). Not sure how powerful RxJS and RxJava can be...
Still missing one important feature though, cancellability.
I am working on SQLite-SQLite and SQLite-MongoDB syncing.
/**
 * DbSqlite needs three basic columns
 *
 * [uid] TEXT PRIMARY KEY,
 * date_created DATETIME DEFAULT CURRENT_TIMESTAMP,
 * date_sync DATETIME,
 */
function replicate (from: DbSqlite, to: DbSqlite, uids?: string[]) {
  return new Observable<{
    message: string
    percent?: number
  }>((obs) => {
    (async () => {
      // Effective timestamp of a row: the later of its creation and its last sync
      const getDateSync = (r: any) => Math.max(r.date_created, r.date_sync || 0)

      await to.transaction(() => {
        to.sql.db.parallelize(() => {
          const syncTable = (tableName: string) => {
            from.sql.each(/*sql*/`
            SELECT * FROM ${safeColumnName(tableName)}
            `, (_: any, r1: any) => {
              const uid = r1.uid
              // When a uid filter is given, skip rows outside it
              if (uids && !uids.includes(uid)) {
                return
              }

              to.sql.db.get(/*sql*/`
              SELECT date_created, date_sync FROM ${safeColumnName(tableName)} WHERE [uid] = @uid
              `, { uid }, (_: any, r2: any) => {
                const updateSync = () => {
                  r1.date_sync = +new Date()
                  to.sql.db.run(/*sql*/`
                  REPLACE INTO ${safeColumnName(tableName)} (${Object.keys(r1).map(safeColumnName)})
                  VALUES (${Object.keys(r1).map((c) => `@${c}`)})
                  `, r1)
                }

                // Copy the row if the target lacks it or holds an older version
                if (r2) {
                  if (getDateSync(r1) > getDateSync(r2)) {
                    updateSync()
                  }
                } else {
                  updateSync()
                }
              })
            })
          }

          for (const tableName of ['user', 'card', 'quiz', 'lesson', 'deck']) {
            obs.next({
              message: `Uploading table: ${tableName}`
            })
            syncTable(tableName)
          }
        })
      })

      obs.complete()
      // Below: call obs.error through an arrow function so it keeps its `this`
    })().catch((err) => obs.error(err))
  })
}
If you are curious about what counts as a safe column name / table name in SQLite, see:
What SQLite CAN actually do
Pacharapol Withayasakpunt ・ May 3 ・ 3 min read
Observables can be joined.
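// Note: `import` is a reserved word, so this name only works as a method (db.import)
function import (filename: string) {
  return new Observable<{
    message: string
    percent?: number
  }>((obs) => {
    (async () => {
      obs.next({
        message: `Opening: ${filename}`
      })
      const srcDb = await DbSqlite.open(filename)
      // Forward the inner observable's events; arrow functions keep obs's methods bound
      DbSqlite.replicate(srcDb, this)
        .subscribe(
          (status) => obs.next(status),
          (err) => obs.error(err),
          () => obs.complete()
        )
    })().catch((err) => obs.error(err))
  })
}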
Observables can be polled via WebSockets.
conn.socket.on('message', async (msg: string) => {
  const { id, filename } = JSON.parse(msg)

  // Always point the id at the latest sender, so a reconnecting client keeps receiving updates
  const isNew = !socketMap.has(id)
  socketMap.set(id, (json: any) => {
    conn.socket.send(JSON.stringify(json))
  })

  // Start the import only once per id
  if (isNew) {
    const observable = db.import(filename)
    logger.info(`Start processing: ${filename}`)
    observable
      .subscribe(
        (status) => {
          socketMap.get(id)!({ id, status })
          // status is an object, so log its message rather than "[object Object]"
          logger.info(`Processing status: ${filename}: ${status.message}`)
        },
        (err) => {
          socketMap.get(id)!({ id, error: err.message })
          logger.error(`Processing error: ${filename}: ${err.message}`)
        },
        () => {
          socketMap.get(id)!({ id, status: 'done' })
          logger.info(`Finished processing: ${filename}`)
        }
      )
  }
})
What remains is the question of whether Observables can be cancelled via WebSockets.
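One possible direction, sketched under assumptions rather than taken from the code above: keep the object returned by .subscribe() in a map keyed by id, and call its unsubscribe() when the client sends a cancel request. The `cancel` field, `handleMessage`, `startImport` and `fakeImport` are hypothetical names for illustration; a fake import stands in for db.import(filename).subscribe(...) so the sketch runs without a database or a socket.

```typescript
// Anything with unsubscribe(), such as the object returned by observable.subscribe()
interface Subscription { unsubscribe(): void }

// Hypothetical message shape: the `cancel` field is an assumption, not part of the post's protocol
interface ClientMessage { id: string, filename?: string, cancel?: boolean }

// Running jobs, keyed by the client-supplied id
const subscriptions = new Map<string, Subscription>()

function handleMessage (
  raw: string,
  startImport: (filename: string) => Subscription
): 'started' | 'cancelled' | 'ignored' {
  const msg = JSON.parse(raw) as ClientMessage
  const running = subscriptions.get(msg.id)

  if (msg.cancel) {
    if (!running) return 'ignored'
    running.unsubscribe() // standard cancellation: stop listening, let the source clean up
    subscriptions.delete(msg.id)
    return 'cancelled'
  }

  // Start at most one job per id (a real handler would also delete the entry on complete/error)
  if (running || !msg.filename) return 'ignored'
  subscriptions.set(msg.id, startImport(msg.filename))
  return 'started'
}

// Usage with a fake import that only records which file was cancelled
const unsubscribed: string[] = []
const fakeImport = (filename: string): Subscription => ({
  unsubscribe: () => { unsubscribed.push(filename) }
})

const results = [
  handleMessage(JSON.stringify({ id: 'a', filename: 'deck.db' }), fakeImport),
  handleMessage(JSON.stringify({ id: 'a', filename: 'deck.db' }), fakeImport),
  handleMessage(JSON.stringify({ id: 'a', cancel: true }), fakeImport),
  handleMessage(JSON.stringify({ id: 'a', cancel: true }), fakeImport)
]
console.log(results, unsubscribed)
```

Whether unsubscribing actually stops the SQLite work depends on the observable returning a cleanup function that interrupts it; the sketch only covers the bookkeeping on the WebSocket side.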
Top comments (9)
Hi! Andy here, the author of observable-fns and threads.js.
Just stumbled upon this blog post. Nice job!
About the cancellation: Observables are kind-of cancellable – you can unsubscribe from them.
Observables are "cold" by default which means every new subscriber will get their own fresh instance of this observable, which also means that the code in new Observable(/* ... */) will be run on every .subscribe() call (!). If a subscriber unsubscribes, the upstream subscription is terminated (cancellation) – just that there may be numerous more subscriptions of the same sort.
You can make observables "hot" by using the multicast() function. Then there will only ever be one upstream subscription that all downstream subscribers share. You can unsubscribe, too, just that the single shared upstream subscription will live on until the last subscriber has unsubscribed.
What does that mean for your use case?
multicast() 😉
.unsubscribe() acting as cancellation
Hope that helps 🙂
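To make the "cold" behaviour and unsubscribe-as-cancellation concrete, here is a minimal, dependency-free sketch. `MiniObservable`, `replicateTables`, `runs` and `cancelledRuns` are hypothetical names made up for illustration; this is not the observable-fns implementation, only the same idea in miniature: the function passed to the constructor runs once per subscribe() call and returns a cleanup function that unsubscribe() invokes.

```typescript
// Hypothetical stand-in for an Observable; NOT the observable-fns implementation
type Observer<T> = {
  next: (value: T) => void
  error: (err: Error) => void
  complete: () => void
}
type Cleanup = () => void
type Status = { message: string }

class MiniObservable<T> {
  private readonly subscriber: (obs: Observer<T>) => Cleanup

  constructor (subscriber: (obs: Observer<T>) => Cleanup) {
    this.subscriber = subscriber
  }

  subscribe (observer: Observer<T>): { unsubscribe: () => void } {
    let closed = false
    // Cold: the subscriber function runs again for every subscribe() call
    const cleanup = this.subscriber({
      next: (value) => { if (!closed) observer.next(value) },
      error: (err) => { if (!closed) { closed = true; observer.error(err) } },
      complete: () => { if (!closed) { closed = true; observer.complete() } }
    })
    return {
      // Simplification: cleanup runs only if unsubscribe() is called before completion
      unsubscribe: () => {
        if (closed) return
        closed = true
        cleanup()
      }
    }
  }
}

let runs = 0
const cancelledRuns: number[] = []

function replicateTables (tables: string[]) {
  return new MiniObservable<Status>((obs) => {
    const run = ++runs
    obs.next({ message: `Starting run ${run}` })
    let i = 0
    // One table per timer tick, so the work can be interrupted between tables
    const timer = setInterval(() => {
      if (i < tables.length) {
        obs.next({ message: `Uploading table: ${tables[i++]}` })
      } else {
        clearInterval(timer)
        obs.complete()
      }
    }, 10)
    // Cleanup: this is where the pending work is actually cancelled
    return () => {
      clearInterval(timer)
      cancelledRuns.push(run)
    }
  })
}

const received: string[] = []
const observer: Observer<Status> = {
  next: (status) => { received.push(status.message) },
  error: () => {},
  complete: () => {}
}

const job = replicateTables(['user', 'card', 'quiz'])
const first = job.subscribe(observer)
const second = job.subscribe(observer) // cold: starts a second, independent run
first.unsubscribe() // cancels run 1 only
second.unsubscribe() // cancels run 2
console.log(received, cancelledRuns)
```

Because each subscription owns its own run, cancelling one does not touch the other; sharing a single run between subscribers is what multicast() is for.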
How do I listen to a .once('all-unsubscribed') event? Like in this package, p-cancelable.
Otherwise, is it possible to use Subject, as in threads.js? (I wouldn't want to use Worker, as I write to SQLite as well, and it is not supposed to be written from multiple threads.)
Sure, who would stop you? ;)
Currently, I use the signature,
Will it work?
That doesn't make much sense. Why would you want to emit a new cancellation function for every new value?
You shouldn't have this non-standard custom cancellation approach when there is already a standardized one (observable.subscribe(), subscription.unsubscribe()). Keep in mind that emitting functions is generally considered to be an anti-pattern.
Maybe the observable-fns readme lacks a link to some documentation about observables in general… I guess this is one of your first times using observables? (No offense – I'm seriously interested :) )
Yes. So far I have only used Promises and EventEmitters.
Should I start with RxJS?
Yeah, maybe that's the best to get started. You should definitely start with RxJS's documentation – it's pretty good!
Which library you then use doesn't really matter too much as their APIs are very similar to almost identical.
Do you actually mean
Yes, my bad! I updated my code sample accordingly.