DEV Community

Vu Nguyen
Vu Nguyen

Posted on

How I Manage a Ten Thousand Push Notifications a Minute (CloudTask)

I'm working on an app that we need to send personalized notifications to every user on schedule (9 AM, 3 PM, 8 PM) in their timezone.

Each user will receive a unique notification related to them

Simple architecture:

image

As the number of users grows, we run into a bottleneck because only one cloud function is too slow to process 10k users.

Solution

image

The hint here is to split the workload into smaller chunks (2k users task). Utilize Cloud Task as a queue to slowly distribute tasks and it will trigger multiple cloud functions to send notifications.

Main Cloud Function:

The main function will run at the scheduled time, it will get the total number of users (we store it in a separate document) and create multiple tasks

const pages = []
const batchSize = 2000;
let currentIndex = 0;
while (currentIndex < totalUsers) {
  pages.push({ fromIndex: currentIndex, toIndex: currentIndex + batchSize });

  currentIndex += batchSize;
}

await PromisePool.for(pages)
      .withConcurrency(10)
      .process(async (page) => {
        await createHttpTaskWithToken({
          projectId,
          queue: 'YOUR-CLOUD-TASK-QUEUE-NAME',
          url: `https://us-central1-${projectId}.cloudfunctions.net/sendNotifications`,
          email: `YOUR-SERVICE-EMAIL@${projectId}.iam.gserviceaccount.com`,
          payload: {
            fromIndex: page.fromIndex,
            toIndex: page.toIndex,
          },
        });
      });
Enter fullscreen mode Exit fullscreen mode

Send Notifications Function:

This function only processes 2k users (maximum) at a time.

  1. Get 2k users by using pagination cursor (fromIndex, toIndex)
  2. Filter out only users at the correct timezone
  3. Prepare user notification message (title, content, image)
  4. Call firebase messaging API to send notifications.
export const sendNotifications = functions
  .runWith({
    timeoutSeconds: 540,
    memory: '1GB',
  })
  .https.onRequest(async (req, res): Promise<void> => {
    const { fromIndex, toIndex } = req.body;

    const usersDataResponse = await db.users.orderBy('userIndex').startAt(fromIndex).endAt(toIndex).get();
    const userToSendNotifications = usersDataResponse.docs.filter(onlyUserAtTimezone)

    try {
      await PromisePool.for(userToSendNotifications)
        .withConcurrency(100)
        .process(async (user) => {
          try {
            const userData = user.data();
            const tokens = userData.tokens || [];
            const notificationData = prepareNotification(userData);

            if (notificationData && tokens.length > 0) {
              const response = await messaging().sendMulticast({
                tokens: tokens,
                notification: notificationData,
              });

              if (response.failureCount > 0) {
                const failedTokens: string[] = [];
                response.responses.forEach((resp, idx) => {
                  // Cleanup the tokens who are not registered anymore.
                  if (
                    resp.error?.code === 'messaging/invalid-registration-token' ||
                    resp.error?.code === 'messaging/registration-token-not-registered'
                  ) {
                    failedTokens.push(tokens[idx]);
                  }
                });

                // remove failed tokens
                if (failedTokens.length > 0) {
                  await db.users.doc(user.id).update({
                    tokens: firestore.FieldValue.arrayRemove(...failedTokens),
                  });
                }

                console.log('List of tokens that caused failures: ' + failedTokens);
              }
            }
          } catch (error) {
            functions.logger.error('error', error);
          }
        });
      res.send('OK');
    } catch (error) {
      res.sendStatus(500);
    }
  });

Enter fullscreen mode Exit fullscreen mode

Read more

Top comments (0)