Hi 👋🏽, I'm Taylor, founder and solo-builder of Finta. Finta helps users sync their financial data to their favorite apps, so that they can manage their finances how they want, where they want. The platform integrates with data providers like Plaid and Stripe to sync data to destination integrations like Notion, Airtable, Coda, and Google Sheets. These syncs can be triggered by webhooks when new data is available or by manual triggers initiated by users.
Inngest is the orchestration tool used to power this entire workflow. Essentially, Finta leverages Inngest to streamline and automate the backend processes, ensuring data synchronization is both accurate and efficient.
Problem Overview
There are two main problems that Inngest solves:
DRY Functions
Throughout the codebase, there are shared functions between multiple types of syncs. For example, if Plaid sends a “New Transactions” webhook, we need to use some sort of syncAccounts function before triggering a syncTransactions function. Also, if a user triggers a manual refresh, we'll need to use the same syncAccounts and syncTransactions functions. Inngest functions should be portable so that each sync trigger can call any mix of the appropriate sync functions in the correct order.
Function Orchestration
Certain sync steps need to happen in a specific order and need to resolve before the main syncing function can continue. For example, before syncing a user's holdings to their destination, we need to first sync all of the available securities data to their destination. Also, it's possible for several syncs to be triggered for the same destination at once. Imagine that a bank refreshes its transaction data at the same time that a user initiates a manual refresh. Even though syncs to different destinations can happen concurrently, each destination should only have one sync function actively running at a time to avoid race conditions and duplicated data.
Key Inngest Features That We Use
There are two key Inngest features used for all syncing workflows at Finta:
concurrency with a key
To make sure a destination has at most one actively running sync function, each sync function has the following concurrency property:
const syncHoldingsFunction = inngest.createFunction(
  {
    id: 'sync-holdings',
    name: 'Sync Holdings',
    // At most one active run per destinationId
    concurrency: { limit: 1, key: 'event.data.destinationId' },
  },
  // ...trigger and handler omitted
)
Because destinationId is a parameter for all sync functions, this guarantees that at most one run of a sync function is active for a given destination at any time. When that run completes, the next sync queued for that destination starts automatically.
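To make the queueing behavior concrete, here is a minimal in-memory sketch of "limit 1 per key". It is not how Inngest implements concurrency; KeyedSerialQueue, sleep, and demo are hypothetical names used only for illustration.

```typescript
// In-memory illustration of "limit: 1 per key". This only mimics the
// observable behavior; it says nothing about Inngest's internals.
type Task<T> = () => Promise<T>;

class KeyedSerialQueue {
  // Tail of the promise chain for each key (for example, each destinationId).
  private tails = new Map<string, Promise<unknown>>();

  run<T>(key: string, task: Task<T>): Promise<T> {
    const previous = this.tails.get(key) ?? Promise.resolve();
    // Start this task only after the previous task for the same key has settled.
    const next = previous.then(() => task());
    // Store a tail that never rejects, so one failed sync does not block the next.
    this.tails.set(key, next.catch(() => undefined));
    return next;
  }
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function demo(): Promise<string[]> {
  const queue = new KeyedSerialQueue();
  const log: string[] = [];
  // Builds a fake sync that records when it starts and ends.
  const sync = (label: string, ms: number) => async () => {
    log.push(`start ${label}`);
    await sleep(ms);
    log.push(`end ${label}`);
  };
  await Promise.all([
    queue.run('dest_1', sync('A', 20)), // runs first for dest_1
    queue.run('dest_1', sync('B', 5)),  // same key: waits for A
    queue.run('dest_2', sync('C', 5)),  // different key: runs alongside A
  ]);
  return log;
}

demo().then((log) => console.log(log.join(', ')));
```

In this sketch, B does not start until A has ended because both share the key dest_1, while C runs alongside A because it uses a different key.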
invoke
This feature is the star of the show. Previously, we paired sendEvent with waitForEvent to trigger another function and wait for it to finish:
const { destinationId, syncId } = event.data;
const transactions = await step.run(
  { id: 'fetch-transaction' },
  async () => ...
)
// Start listening for the "finished" event before sending the trigger event
const syncTransactionsPromise = step.waitForEvent('app/destination.sync.task.finished', {
  timeout: '10m',
  if: `async.data.destinationId == '${destinationId}' && async.data.syncId == '${syncId}' && async.data.action == 'Sync' && async.data.table == 'Transactions'`
})
await step.sendEvent({
  name: 'app/destination.sync.transactions',
  data: {
    destinationId,
    transactions,
    syncId
  }
});
const syncTransactionsResponse = await syncTransactionsPromise;
However, thanks to invoke, most of this logic can be replaced by a single step.invoke() call, as the function orchestration section further down shows.
Implementation Feature 1: Composability (DRY Programming)
Before introducing Inngest, a single API route handled the entire end-to-end syncing flow in Finta. This would often fail for various reasons. As you can imagine, if a person has 10 bank connections each with over 1,000 transactions, it might take longer than the timeout limit to sync all of the data. Additionally, each banking integration and each destination integration has rate limits that need to be handled. Lastly, many sync triggers share the same steps. Thus, at one point in time, Finta's codebase looked like the following:
if (['stripe_balance_update', 'plaid_holdings_update', 'plaid_transactions_update', 'plaid_investments_update'].includes(syncTrigger)) {
  await syncAccounts(...)
}
if (['stripe_transactions_update', 'plaid_transactions_update'].includes(syncTrigger)) {
  await syncTransactions(...)
}
if (['plaid_holdings_update', 'plaid_investments_update'].includes(syncTrigger)) {
  await syncSecurities(...)
}
if (['plaid_holdings_update'].includes(syncTrigger)) {
  await syncHoldings(...)
}
if (['plaid_investments_update'].includes(syncTrigger)) {
  await syncInvestments(...)
}
...
This was fine in the early days. However, now that there are more than 20 sync triggers, this would be hard to maintain.
Now, there are distinct sync events and functions for each data type. Each event requires a destinationId, a syncId, and the data in the event payload.
{
  name: 'app/destination.sync.accounts',
  data: {
    destinationId: string;
    syncId: string;
    accounts: Account[];
  }
}
Each event corresponds to a sync function:
const table = 'Accounts';
const syncAccounts = inngest.createFunction(
  {
    ...
  },
  { event: 'app/destination.sync.accounts' }, // the event shown above
  async ({ event, step, utils }) => { // utils comes from middleware
    const { destinationId, accounts } = event.data;
    const destination = await utils.fetchDestination(
      { id: 'fetch-destination' },
      destinationId
    );
    const { records, destinationError } = await utils.loadRecords(
      { id: 'load-account-records' },
      { table, destinationId, destination }
    );
    if (destinationError) {
      return { hasError: true };
    }
    // Split incoming accounts into ones the destination lacks and ones it already has
    const currentAccounts = records.map(record => record.properties.id);
    const newAccounts = accounts.filter(account => !currentAccounts.includes(account.id));
    const updatedAccounts = accounts.filter(account => currentAccounts.includes(account.id));
    const createResults = await step.run(
      { id: 'sync-new-accounts-to-destination' },
      async () => {
        ...
      }
    );
    const updateResults = await step.run(
      { id: 'sync-updated-accounts-to-destination' },
      async () => {
        ...
      }
    );
    await utils.logResultsToDB({ id: 'log-results-to-db' }, { createResults, updateResults });
    return { hasError: false };
  }
)
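The new-versus-updated split in that function can also be pulled out into a small standalone helper. The sketch below is one possible way to do it, not Finta's actual code: partitionById is a hypothetical name, and it uses a Set so each lookup avoids rescanning the list of existing IDs.

```typescript
// Hypothetical helper: splits incoming items into ones whose id is not yet
// present at the destination ("created") and ones that already exist ("updated").
function partitionById<T extends { id: string }>(
  existingIds: string[],
  incoming: T[]
): { created: T[]; updated: T[] } {
  const known = new Set<string>(existingIds);
  const created: T[] = [];
  const updated: T[] = [];
  incoming.forEach((item) => {
    // Set.has is a constant-time lookup, unlike Array.includes.
    (known.has(item.id) ? updated : created).push(item);
  });
  return { created, updated };
}

const split = partitionById(['acc_1', 'acc_2'], [{ id: 'acc_2' }, { id: 'acc_3' }]);
console.log(split.created.map((a) => a.id), split.updated.map((a) => a.id));
```

For a handful of accounts the difference is negligible, but the same helper could be reused for larger tables such as transactions.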
Let's say that Finta decides to use a new data provider and needs to set up a new sync trigger event (for example, new_provider/accounts.updated ) to handle account updates. The corresponding Inngest function doesn't have to have any logic around syncing accounts. Instead, it can focus on fetching and formatting data from the new data provider and invoke the syncAccounts function as needed. The best part about all of this is that if any logic around how accounts sync to destinations needs to be changed, this logic only needs to be updated in one place.
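As a rough sketch of what such a trigger function could look like, the handler below only translates the provider's payload and delegates the sync. Everything provider-specific here is an assumption made for illustration: the NewProviderAccount shape, the Account fields beyond id, and the names toAccount and handleNewProviderAccountsUpdated. InvokeAccountsSync is a hand-written stand-in so the example runs without the Inngest SDK.

```typescript
// Fields beyond `id` are assumed for this sketch.
type Account = { id: string; name: string; balance: number };
type SyncResult = { hasError: boolean };

// Hypothetical payload shape from the new data provider.
type NewProviderAccount = { account_id: string; display_name: string; balance_cents: number };

// Stand-in for invoking the shared syncAccounts function.
type InvokeAccountsSync = (
  id: string,
  data: { destinationId: string; syncId: string; accounts: Account[] }
) => Promise<SyncResult>;

// Provider-specific work: translate the provider's format into the shared one.
function toAccount(raw: NewProviderAccount): Account {
  return { id: raw.account_id, name: raw.display_name, balance: raw.balance_cents / 100 };
}

// The trigger handler formats data and delegates; it holds no destination logic.
async function handleNewProviderAccountsUpdated(
  invokeSyncAccounts: InvokeAccountsSync,
  input: { destinationId: string; syncId: string; rawAccounts: NewProviderAccount[] }
): Promise<SyncResult> {
  const accounts = input.rawAccounts.map(toAccount);
  return invokeSyncAccounts(`sync-accounts-to-${input.destinationId}`, {
    destinationId: input.destinationId,
    syncId: input.syncId,
    accounts,
  });
}
```

Inside a real Inngest function, the first argument could be a thin wrapper such as (id, data) => step.invoke(id, { function: syncAccounts, data }), so the handler stays testable with a fake.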
Implementation Feature 2: Function Orchestration
There are multiple events that can trigger a sync. Each trigger event has its own Inngest function, which gathers and formats the relevant data and then calls the appropriate sync functions in the necessary order.
Event: plaid/transactions.default_update
Function:
import { syncAccounts, syncTransactions } from './functions/...';
const plaidDefaultUpdate = inngest.createFunction(
  {
    ...
  },
  { event: 'plaid/transactions.default_update' },
  async ({ event, step, utils }) => {
    const connectedDestinationIds = await utils.getConnectedDestinationIds(
      { id: 'get-connected-destination-ids' },
      { data: event.data }
    );
    const sync = await utils.startSync({ id: 'start-sync' }, ...);
    const accountsData = await step.run({ id: 'fetch-accounts-from-plaid' }, ...);
    // Invoke syncAccounts once per destination and wait for all of them to finish
    const syncAccountsResponse = await Promise.all(connectedDestinationIds.map(destinationId =>
      step.invoke({ id: `sync-accounts-to-${destinationId}` }, {
        function: syncAccounts,
        data: {
          destinationId,
          syncId: sync.id,
          accounts: accountsData
        }
      })
    )).then(responses => {
      const hasError = responses.some(response => response.hasError);
      return { responses, hasError };
    });
    if (syncAccountsResponse.hasError) {
      // Send notification
      // Update logs
      // etc.
      return;
    }
    const transactionsData = await step.run({ id: 'fetch-transactions-from-plaid' }, ...);
    const syncTransactionsResponse = ...
  }
)
Because invoke returns a promise that resolves with the invoked function's result, multiple syncs to different destinations can be triggered at once. Once all of them are done, the parent trigger function can handle the results and proceed accordingly.
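The fan-out and aggregation pattern can be isolated into a small helper. This is a minimal sketch under stated assumptions: fanOut is a hypothetical name, and InvokeSync is a stand-in for a step.invoke call bound to one sync function, so the example runs without the Inngest SDK.

```typescript
type SyncResult = { hasError: boolean };

// Stand-in for invoking one sync function for one destination.
type InvokeSync = (destinationId: string) => Promise<SyncResult>;

async function fanOut(
  destinationIds: string[],
  invokeSync: InvokeSync
): Promise<{ responses: SyncResult[]; hasError: boolean }> {
  // Every invocation starts right away; Promise.all waits for all of them.
  const responses = await Promise.all(destinationIds.map((id) => invokeSync(id)));
  // The parent only needs to know whether any child reported a failure.
  const hasError = responses.some((response) => response.hasError);
  return { responses, hasError };
}

// Usage with a fake invoke in which one destination reports an error.
fanOut(['dest_1', 'dest_2'], async (id) => ({ hasError: id === 'dest_2' }))
  .then((summary) => console.log(summary.hasError));
```

Note that Promise.all rejects as soon as any one promise rejects, so this shape relies on the sync functions reporting failure through hasError rather than throwing. Promise.allSettled is an alternative when child functions may throw.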
Takeaways for you to use in your Inngest Project
Having used Inngest for over a year now, I've learned a few things about building scalable and reliable architecture:
If you notice that many of your functions share the same logic, split that logic into a separate function and use invoke to trigger it where needed.
Although invoke solves most problems, waitForEvent still has its place. If you are triggering a single function and need it to finish before proceeding, use invoke. If the event comes from a completely separate workflow and may not arrive at that exact moment, use waitForEvent.
With invoke and middleware functions, you can build complex workflows that follow DRY programming.
If you're building something similar or if you just want to chat about all things Inngest, feel free to reach out on LinkedIn or X.