I'm working on an app that we need to send personalized notifications to every user on schedule (9 AM, 3 PM, 8 PM) in their timezone.
Each user will receive a unique notification related to them
Simple architecture:
As the number of users grows, we run into a bottleneck because only one cloud function is too slow to process 10k users.
Solution
The hint here is to split the workload into smaller chunks (2k users task). Utilize Cloud Task as a queue to slowly distribute tasks and it will trigger multiple cloud functions to send notifications.
Main Cloud Function:
The main function will run at the scheduled time, it will get the total number of users (we store it in a separate document) and create multiple tasks
const pages = []
const batchSize = 2000;
let currentIndex = 0;
while (currentIndex < totalUsers) {
pages.push({ fromIndex: currentIndex, toIndex: currentIndex + batchSize });
currentIndex += batchSize;
}
await PromisePool.for(pages)
.withConcurrency(10)
.process(async (page) => {
await createHttpTaskWithToken({
projectId,
queue: 'YOUR-CLOUD-TASK-QUEUE-NAME',
url: `https://us-central1-${projectId}.cloudfunctions.net/sendNotifications`,
email: `YOUR-SERVICE-EMAIL@${projectId}.iam.gserviceaccount.com`,
payload: {
fromIndex: page.fromIndex,
toIndex: page.toIndex,
},
});
});
Send Notifications Function:
This function only processes 2k users (maximum) at a time.
- Get 2k users by using pagination cursor (fromIndex, toIndex)
- Filter out only users at the correct timezone
- Prepare user notification message (title, content, image)
- Call firebase messaging API to send notifications.
export const sendNotifications = functions
.runWith({
timeoutSeconds: 540,
memory: '1GB',
})
.https.onRequest(async (req, res): Promise<void> => {
const { fromIndex, toIndex } = req.body;
const usersDataResponse = await db.users.orderBy('userIndex').startAt(fromIndex).endAt(toIndex).get();
const userToSendNotifications = usersDataResponse.docs.filter(onlyUserAtTimezone)
try {
await PromisePool.for(userToSendNotifications)
.withConcurrency(100)
.process(async (user) => {
try {
const userData = user.data();
const tokens = userData.tokens || [];
const notificationData = prepareNotification(userData);
if (notificationData && tokens.length > 0) {
const response = await messaging().sendMulticast({
tokens: tokens,
notification: notificationData,
});
if (response.failureCount > 0) {
const failedTokens: string[] = [];
response.responses.forEach((resp, idx) => {
// Cleanup the tokens who are not registered anymore.
if (
resp.error?.code === 'messaging/invalid-registration-token' ||
resp.error?.code === 'messaging/registration-token-not-registered'
) {
failedTokens.push(tokens[idx]);
}
});
// remove failed tokens
if (failedTokens.length > 0) {
await db.users.doc(user.id).update({
tokens: firestore.FieldValue.arrayRemove(...failedTokens),
});
}
console.log('List of tokens that caused failures: ' + failedTokens);
}
}
} catch (error) {
functions.logger.error('error', error);
}
});
res.send('OK');
} catch (error) {
res.sendStatus(500);
}
});
Top comments (0)