How would you build a solution to the following problem? You receive a stream containing several hundred to many thousand rows of records, each potentially corresponding to an entity in your system(s). Each row needs to be processed individually to determine what combination of zero to many successive operations to perform for that record, both internally and against external systems/services, in a generic fashion that's scalable, extensible, performant, auditable, interruptible, idempotent, testable, respects rate limits, and is relatively simple to understand. Oh, and this solution needs to describe every single operation it'll perform to the operator for green-lighting (👍🏾 / 👎🏾) before kicking off the planned steps, and also return a post-operation report which, in most cases, should match the pre-run report. Stick with me as I describe how I built this out for some of my clients in the past.
The TL;DR in my case was a combination of (small) state machines, data loaders, and a lot of plumbing. The state machines handle logic determination and coordination while keeping things small and simple, and the data loaders batch operations so the whole thing isn't awfully slow from performing each operation one by one.
More concretely, a row could describe an entity from a file (in one client's domain) that needs to conditionally be created in a DB, updated if it exists, have an eligibility status updated, or be soft-deleted to deactivate a user; created in a CRM, or updated in the CRM if the record exists; created, read, updated, or deleted (all conditionally) in an LMS, entitlement system, authentication service, search cluster, or any other service/system that should have a representation of the row. Other use cases include a chat message that needs to be scrubbed for PII, deleted from a store, indexed in a search DB, and have its stats updated.
The problem boils down to these first principles: for each record, perform some checks, decide on the action to take given the state of the entity, then perform the operation or operations; rinse and repeat while handling failures (transient and non-transient) and staying interruptible, cancellable, fast, introspectable, and more.
Actions against each service can be modeled with the TypeScript types below, using a database (DB), Salesforce, ActiveCampaign, Auth0, and Elasticsearch as example services for this post.
type DBAction = "NOTHING" | "CREATE" | "UPDATE" | "DELETE";
type SalesforceAction =
| "NOTHING"
| "CREATE_CONTACT"
| "UPDATE_CONTACT"
| "DELETE_CONTACT"
| "ACTIVATE_CONTACT"
| "DEACTIVATE_CONTACT";
type ActiveCampaignAction =
| "NOTHING"
| "CREATE_CONTACT"
| "UPDATE_CONTACT"
| "DELETE_CONTACT"
| "ACTIVATE_CONTACT"
| "DEACTIVATE_CONTACT";
type Auth0Action =
| "NOTHING"
| "CREATE_USER"
| "UPDATE_USER"
| "DELETE_USER"
| "ACTIVATE_USER"
| "DEACTIVATE_USER";
type ElasticSearchAction =
| "NOTHING"
| "CREATE_USER"
| "UPDATE_USER"
| "DELETE_USER"
| "ACTIVATE_USER"
| "DEACTIVATE_USER";
type ServiceAction =
| DBAction
| SalesforceAction
| ActiveCampaignAction
| Auth0Action
| ElasticSearchAction;
As I mentioned above, the solution I implemented uses state machines and data loaders. For libraries, I use the popular XState and GraphQL DataLoader libraries. Quotes from the library documentation do a much better job of explaining what they do than I will in this post, and there are lots of excellent resources out there to gain a deeper understanding of these tools.
XState is a library for creating, interpreting, and executing finite state machines and state charts, as well as managing invocations of those machines as actors.
A finite state machine (FSM) is a mathematical model of computation that describes the behavior of a system that can be in only one state at any given time. State charts are a formalism for modeling stateful, reactive systems.
On the other hand,
DataLoader is a generic utility to be used as part of your application's data fetching layer to provide a simplified and consistent API over various remote data sources such as databases or web services via batching and caching.
The key idea behind this post is that every record can be processed "independently". The state machine is the brain (🧠) while the data loaders are the brawn (💪🏾).
In my implementation, every row has a state machine instantiated for it. The machine begins by interrogating the initial state of the record and figuring out what to do next based on simple conditions, using invoked callback actors (a concept in XState). Transitioning into a state can create and start an actor whose logic is specified by its src property value. The onDone property can be an array of guarded transitions, each pairing a target state node with a predicate (cond); the first transition whose predicate evaluates to true is taken.
type Mode = "DryRun" | "Live";
type GetDBActionDoneInvokeEventData = {
module: "db";
index: number;
action: DBAction;
};
type GetDBActionDoneInvokeEvent =
DoneInvokeEvent<GetDBActionDoneInvokeEventData>;
createMachine({
// other machine configuration
initial: "init",
schema: {} as {
context: {
index: number;
mode: Mode;
user: AuthUser;
input: BatchRecord;
record?: AppRecord;
dataLoaders: typeof loaders;
};
},
states: {
init: {
invoke: {
src: async (context) => {
// load the record from the DB if it exists
const record = await context.dataLoaders.db.user.find.load(
context.input.email,
);
const params = {
module: "db",
index: context.index,
} as const;
switch (true) {
// the record is not in the DB
case !record:
return Promise.resolve<GetDBActionDoneInvokeEventData>({
...params,
action: "CREATE",
});
// the record is in the DB but the active flags don't match
case context.input.active !== record?.active:
return Promise.resolve<GetDBActionDoneInvokeEventData>({
...params,
action: "UPDATE",
});
// TODO: handle other cases according to your business logic and return an action
default:
return Promise.resolve<GetDBActionDoneInvokeEventData>({
...params,
action: "NOTHING",
});
}
},
onDone: [
{
cond: (c) => c.mode === "DryRun",
target: "provisioning",
},
{
cond: (_, e: GetDBActionDoneInvokeEvent) =>
e.data.action === "CREATE",
target: "creatingDBRecord",
},
{
cond: (_, e: GetDBActionDoneInvokeEvent) =>
e.data.action === "UPDATE",
target: "updatingDBRecord",
},
{
cond: (_, e: GetDBActionDoneInvokeEvent) =>
e.data.action === "DELETE",
target: "deletingDBRecord",
},
{
// NOTHING (or any action not handled above): skip straight to provisioning
target: "provisioning",
},
],
},
},
// other states
complete: {
type: "final",
},
},
});
When the machine is interpreted with the row from the file as context, it starts in the init state. Entering the state begins the invocation of the src callback. In the callback, the current record for the row is loaded from the database using a data loader. If the record does not exist, the CREATE action is returned from the callback. If, say, some expected flags or fields don't match, other actions (UPDATE, DELETE, or DEACTIVATE) are returned. If there's nothing to be done, the NOTHING action is returned.
Next, onDone defines the transition to be taken based on the action returned from the src callback. Remember one of the requirements I described above was that the system needs to describe the sequence of actions to be taken, for auditing and approval by an operator, before any mutation? The first element in the onDone array handles the case where the machine is being interpreted in Dry Run mode: the machine bails early, never mutating any records, and goes straight to the next state, called the provisioning state. If not in Dry Run mode and an action was returned, one of the conditions in onDone matches and the corresponding state is transitioned to; for example, creatingDBRecord if the action from src was CREATE.
creatingDBRecord: {
invoke: {
src: (context) => {
// prepare create payload
const payload = {}
// create the record in the DB
return context.dataLoaders.db.user.create.load(payload);
},
onDone: {
target: "provisioning",
},
},
},
// updatingDBRecord
// deletingDBRecord
The provisioning state is transitioned to in all cases in the machine definition. It is a parallel state node containing child states that run at the same time. This is where CRUD-ing and other actions against external systems happen: Auth, CRM, Search, etc. The child states are similar to the init state described above, in the sense that a check is performed using a data loader to determine what state the record is in within Auth0, Salesforce, or Elasticsearch, an action is returned, and based on that action the correct state is transitioned to for performing the right logic against that service/system. Using a parallel state lets the CRUD operations against the third-party services happen concurrently, advancing the synchronization as fast as possible.
provisioning: {
type: "parallel",
onDone: "complete",
states: {
activeCampaign: {
initial: "checking",
states: {
checking: {
invoke: {
src: async (context) => {
// check whether we already hold an ActiveCampaign contact id for this user (see the loaders section below)
const record =
await context.dataLoaders.db.activeCampaign.contact.find.load(
context.input.email,
);
const params = {
module: "activeCampaign",
index: context.index,
} as const;
switch (true) {
// the record is not in the DB
case !record:
return Promise.resolve<GetActiveCampaignActionDoneInvokeEventData>(
{
...params,
action: "CREATE_CONTACT",
},
);
// the record is in the DB but the active flags don't match
case context.input.active !== context.record?.active:
return Promise.resolve<GetActiveCampaignActionDoneInvokeEventData>(
{
...params,
action: "UPDATE_CONTACT",
},
);
default:
return Promise.resolve<GetActiveCampaignActionDoneInvokeEventData>(
{
...params,
action: "NOTHING",
},
);
}
},
onDone: [
{
cond: (c) => c.mode === "DryRun",
target: "success",
},
{
cond: (_, e: GetActiveCampaignActionDoneInvokeEvent) =>
e.data.action === "CREATE_CONTACT",
target: "creatingContact",
},
{
cond: (_, e: GetActiveCampaignActionDoneInvokeEvent) =>
e.data.action === "UPDATE_CONTACT",
target: "updatingContact",
},
{
cond: (_, e: GetActiveCampaignActionDoneInvokeEvent) =>
e.data.action === "DELETE_CONTACT",
target: "deletingContact",
},
{
cond: (_, e: GetActiveCampaignActionDoneInvokeEvent) =>
e.data.action === "ACTIVATE_CONTACT",
target: "activatingContact",
},
{
cond: (_, e: GetActiveCampaignActionDoneInvokeEvent) =>
e.data.action === "DEACTIVATE_CONTACT",
target: "deactivatingContact",
},
{
// NOTHING (or any action not handled above): nothing to do in this region
target: "success",
},
],
},
},
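// each of these would invoke the matching ActiveCampaign data loader (like creatingDBRecord above) and target "success" when done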
creatingContact: {},
updatingContact: {},
activatingContact: {},
deactivatingContact: {},
deletingContact: {},
success: {
type: "final"
},
}
},
auth0: {
// similar to activeCampaign but specialized in biz logic
},
elasticsearch: {
// similar to activeCampaign but specialized in biz logic
},
salesforce: {
// similar to activeCampaign but specialized in biz logic
}
}
}
The skeleton for the data loaders object looks like the following:
type BatchRecord = { email: string; active: boolean /* ... */ };
type AppRecord = { id: string; email: string; active: boolean /* ... */ };
const loaders = {
db: {
user: {
find: new DataLoader((keys) => Promise.resolve(keys)),
create: new DataLoader((keys) => Promise.resolve(keys)),
update: new DataLoader((keys) => Promise.resolve(keys)),
delete: new DataLoader((keys) => Promise.resolve(keys)),
},
activeCampaign: {
contact: {
find: new DataLoader((keys) => Promise.resolve(keys)),
create: new DataLoader((keys) => Promise.resolve(keys)),
update: new DataLoader((keys) => Promise.resolve(keys)),
delete: new DataLoader((keys) => Promise.resolve(keys)),
},
},
auth0: {
user: {
// similar to above
},
},
salesforce: {
contact: {
// similar to above
},
},
},
activeCampaign: {
contact: {
find: new DataLoader((keys) => Promise.resolve(keys)),
create: new DataLoader((keys) => Promise.resolve(keys)),
update: new DataLoader((keys) => Promise.resolve(keys)),
delete: new DataLoader((keys) => Promise.resolve(keys)),
activate: new DataLoader((keys) => Promise.resolve(keys)),
deactivate: new DataLoader((keys) => Promise.resolve(keys)),
},
},
auth0: {
// similar to above
},
salesforce: {
// similar to above
},
};
The ActiveCampaign, Auth0, and Salesforce keys may look duplicated: they appear under the db path and also as top-level keys. For one implementation, I stored the identifiers that our DB records point to in ActiveCampaign, Auth0, and Salesforce as rows in our database. Aside from the user table, there's also a user_active_campaign_contact table with a foreign key to the user record that holds the identifier of that user in ActiveCampaign, and similar tables exist for every other external service. The data loader at loaders.db.activeCampaign.contact CRUDs only that identifier, while the loader at loaders.activeCampaign.contact talks to the actual service over an SDK, HTTP, GraphQL, or whatever communication protocol exists for CRUD-ing records in it.
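To make that split concrete, here's a minimal sketch of the pointer row the db-side loader works with; the type and field names are illustrative rather than the exact schema from those projects:
// Hypothetical shape of a row in the user_active_campaign_contact table:
// loaders.db.activeCampaign.contact CRUDs these pointer rows, while
// loaders.activeCampaign.contact calls the ActiveCampaign API itself.
type UserActiveCampaignContact = {
  userId: string; // foreign key to the user record in our DB
  activeCampaignContactId: string; // identifier of the contact in ActiveCampaign
};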
At this point, if you haven't realized it yet, I want to spell out what data loaders help us achieve here. The state machines described above deal with individual records and know nothing about batches; IMO that keeps the logic much simpler. However, if we tried to read and write database entries or talk to third-party services directly in the state machines, we'd run into the famed N+1 select problem, where N state machine callbacks each perform a select or update by themselves. That's highly suboptimal and slow. By performing the CRUD through the data loaders, we get batching: N state machine callbacks call .load({…}), those calls get coalesced into the data loader's batch function, which can use a batch-optimized API such as SELECT … IN (…), Salesforce batch calls, Auth0 batch calls, or your third-party service's bulk CRUD API. The data loader is also where I include code to handle failures, retries, logging, and any other resilience concerns. In my experience, this separation has significantly improved the maintainability of the approach.
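To illustrate the batching, here's a minimal sketch of what the batch function behind loaders.db.user.find could look like. It assumes a hypothetical db.query(sql, params) helper and is a sketch, not the exact code from those projects:
import DataLoader from "dataloader";

// Batched "find user by email": one round trip for the whole batch instead of
// N individual SELECTs. Retries, rate limiting, and logging can wrap this batch
// function without the state machines knowing anything about it.
const userFindLoader = new DataLoader<string, AppRecord | null>(async (emails) => {
  const rows: AppRecord[] = await db.query(
    "SELECT id, email, active FROM users WHERE email = ANY($1)",
    [[...emails]],
  );
  const byEmail = new Map(rows.map((row) => [row.email, row] as const));
  // DataLoader expects results in the same order as the input keys
  return emails.map((email) => byEmail.get(email) ?? null);
});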
Running
To bring this all together, a simple runner script is shown here:
type ServiceActionDoneInvokeEventData =
| GetDBActionDoneInvokeEventData
| GetSalesforceActionDoneInvokeEventData
| GetActiveCampaignActionDoneInvokeEventData
| GetAuth0ActionDoneInvokeEventData;
async function run() {
const batchSize = 50;
const records: BatchRecord[] = [
{ email: "1@test.com", active: true },
{ email: "2@test.com", active: true },
];
const actions: ServiceActionDoneInvokeEventData[][] = [];
const pooledMachines = new Array(Math.min(batchSize, records.length))
.fill(0)
.map(() => {
const interpreter = interpret(rootMachine, {
deferEvents: false,
devTools: false,
});
interpreter.onTransition((state, event) => {
if ("data" in event) {
const actionEvent = event as
| GetDBActionDoneInvokeEvent
| GetActiveCampaignActionDoneInvokeEvent
| GetAuth0ActionDoneInvokeEvent
| GetSalesforceActionDoneInvokeEvent;
if (!actions[actionEvent.data.index]) {
actions[actionEvent.data.index] = [];
}
console.log(actionEvent.data);
actions[actionEvent.data.index].push(actionEvent.data);
}
});
return interpreter;
});
let offset = 0;
for await (const batch of chunk(records, batchSize)) {
const runPromises = batch.map((input, j) => {
const interpreter = pooledMachines[j];
return new Promise<void>((resolve) => {
// resolve once this record's machine reaches its final state
interpreter.onDone(() => resolve());
interpreter.start({
...interpreter.initialState,
context: {
index: offset + j, // global record index, used to key the actions report
dataLoaders: loaders,
mode: "DryRun", // flip to "Live" once the operator green-lights the plan
user: {
id: "123",
},
input,
},
});
});
});
await Promise.all(runPromises);
offset += batch.length;
}
}
run();
I create a set of pooled machines whose size is determined by various factors, including your system's resources, the batch limits of your third-party services, etc. The pooled machines have an onTransition callback defined that collects the action returned by every state node into an actions array. This can be parsed into a report that's displayed to the operator: in Dry Run mode it describes every action that would be taken, and after a live run it becomes the post-operation report. The records from the stream are batched and used to start each machine.
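As a rough sketch of what that parsing could look like (the formatPlan name is mine, and it assumes every *ActionDoneInvokeEventData variant carries the module and action fields shown in the snippets above), the plan handed to the operator could be built like this:
// Build a human-readable plan/report from the actions collected in onTransition.
// The same function can produce the Dry Run plan and the post-run report, which
// makes comparing the two straightforward.
function formatPlan(
  records: BatchRecord[],
  actions: ServiceActionDoneInvokeEventData[][],
): string {
  return records
    .map((record, index) => {
      const planned =
        (actions[index] ?? [])
          .filter((a) => a.action !== "NOTHING")
          .map((a) => `${a.module}: ${a.action}`)
          .join(", ") || "no changes";
      return `${record.email} -> ${planned}`;
    })
    .join("\n");
}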
A lot more can be done in the runner.
- Initializing the state machine with a custom state for start/pause/resume and debugging purposes.
- Persisting state to durable storage on every transition for auditing purposes (a sketch follows this list).
- Logging and instrumentation.
- Communicating status information.
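For the persistence bullet, a minimal sketch of the idea, assuming a hypothetical saveSnapshot(key, json) helper backed by durable storage, could look like this:
import { interpret, State } from "xstate";

const interpreter = interpret(rootMachine);

// XState state snapshots are JSON-serializable, which makes an audit trail
// (and pause/resume) straightforward to build.
interpreter.onTransition((state) => {
  void saveSnapshot(`record-${state.context.index}`, JSON.stringify(state));
});

// Later, a run can be inspected or resumed from a stored snapshot:
// const restored = rootMachine.resolveState(State.create(JSON.parse(storedJson)));
// interpret(rootMachine).start(restored);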
One more point worth making: all of this can be visualized using XState's visual editor (in VS Code or in the browser) or the Stately editor. Even better, the machines can be simulated in the editors to catch bugs early.
Summary
In this article, I describe a robust solution for efficiently handling streams of records using state machines and data loaders. The goal is to achieve scalability, performance, and auditability in a generic and understandable fashion.
Key Components:
- State Machines and Data Loaders:
  - We leverage XState for managing state machines, handling logic determination and coordination.
  - GraphQL's DataLoader optimizes data fetching by providing a simplified, consistent API over various data sources, avoiding performance bottlenecks.
- Clear Action Modeling:
  - TypeScript types define the actions for each service, giving a comprehensive picture of the permissible operations.
- Batching for Performance:
  - Data loaders are crucial for performance: they batch operations and mitigate the N+1 select problem.
- Handling Different Services:
  - Parallel state nodes process actions against the various services concurrently.
- Operational Transparency:
  - The sequence of actions is described for auditing and approval by an operator before any mutation, enhancing transparency.
Areas of Improvement:
- Considerations for error handling, state persistence for auditing, scalability, testing strategies, and security are vital for a comprehensive solution.
Comment if you'd like to see a (yet-to-be-built) playground. Complete code is available here: https://gist.github.com/kadeer/5f35fd4048249b967e13bbbd4085d031
Hire me: I'm a full-stack engineer with over 10 years of experience in React, JavaScript/TypeScript, C#, PHP, Python, REST/GraphQL, XState, SQL (Postgres/MySQL), AWS, and more.
LinkedIn: https://www.linkedin.com/in/abdulkadirna/
Twitter: https://twitter.com/adulkadirna