Sateesh Madagoni

Orchestrate AWS Lambdas using MongoDB - Part 2

This is a continuation of the first part. This post assumes you are a developer with working knowledge of AWS, Lambda, EventBridge, MongoDB and Node.js.

Technical Implementation:

1. Design State-Machine
In this step you need to figure out how you want to orchestrate your jobs and create an object like the one below.

const stateMachine = {
  id: '1',
  purposeId: 'catch_the_fish',
  currentPhase: 'thinking',
  phases: {
    thinking: {
      steps: {
        thinking: {
          jobs: ['buy_fishing_gear'],
        },
        buy_fishing_gear: {
          jobs: ['rent_a_boat'],
        },
        rent_a_boat: {
          jobs: ['go_to_fishing_ground'],
        },
        go_to_fishing_ground: {
          jobs: ['cast_fishing_pole', 'drink_beer', 'catch_fish'],
        },
        catch_fish: {
          jobs: ['go_home'],
        },
      },
      finished: [],
      next: 'end',
      waitFor: [
        'buy_fishing_gear',
        'rent_a_boat',
        'go_to_fishing_ground',
        'cast_fishing_pole',
        'drink_beer',
        'catch_fish',
        'go_home',
      ],
    },
  },
};


In the above example, buy_fishing_gear runs after the thinking job finishes, and the following jobs run one after another in sequence. Once go_to_fishing_ground succeeds, cast_fishing_pole, drink_beer and catch_fish run in parallel. The phase is complete when every job listed in waitFor has finished.
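
To make the sequential and parallel cases concrete, here is a minimal sketch of how the next jobs can be looked up from the configuration. The `nextJobs` helper is a hypothetical name that does not appear in the original code, and the state machine is trimmed to the fields the lookup needs.

```javascript
// Trimmed copy of the state machine above (only the fields used here).
const stateMachine = {
  currentPhase: 'thinking',
  phases: {
    thinking: {
      steps: {
        thinking: { jobs: ['buy_fishing_gear'] },
        buy_fishing_gear: { jobs: ['rent_a_boat'] },
        rent_a_boat: { jobs: ['go_to_fishing_ground'] },
        go_to_fishing_ground: {
          jobs: ['cast_fishing_pole', 'drink_beer', 'catch_fish'],
        },
        catch_fish: { jobs: ['go_home'] },
      },
    },
  },
};

// Hypothetical helper: returns the jobs to start once `finishedJob` succeeds.
function nextJobs(machine, finishedJob) {
  const phase = machine.phases[machine.currentPhase];
  const step = phase && phase.steps[finishedJob];
  return step && step.jobs ? step.jobs : [];
}

console.log(nextJobs(stateMachine, 'thinking')); // one job: sequential
console.log(nextJobs(stateMachine, 'go_to_fishing_ground')); // three jobs: parallel
console.log(nextJobs(stateMachine, 'drink_beer')); // no entry in steps: nothing follows
```

A job without an entry in steps, such as drink_beer, starts nothing, but it still counts towards waitFor.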

2. Start the Process:
The state machine can be created by anything, from an API to a cron job. Let's take an API as an example: POST /process/catch-the-fish.

router.post('/process/catch-the-fish', async (req, res, next) => {
  try {
    const state = {}; // the state machine object from step 1
    // Insert into state-machines
    const createdState = await db.collection('state-machines').insertOne(state);
    // Start the process by sending the first event. Marking thinking as a
    // success kicks off the process of catching fish.
    await db.collection('statuses').insertOne({
      stateId: createdState.insertedId, // insertOne returns the new _id as insertedId
      status: 'success',
      job: 'thinking',
      date: new Date(),
      // additional parameters as needed
    });
    res.status(201).json({ stateId: createdState.insertedId });
  } catch (err) {
    next(err);
  }
});


3. State Machine Job:

exports.handler = async (event) => {
  // MongoDB document from the trigger
  const document = event.detail.fullDocument;
  // Find the state machine
  const state = await db
    .collection('state-machines')
    .findOne({ _id: document.stateId });
  // If it is missing or has already ended, do nothing
  if (!state || state.currentPhase === 'end') {
    return;
  }
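  // Mark the job as finished. $addToSet is atomic, so jobs that finish in
  // parallel cannot overwrite each other's updates.
  await db
    .collection('state-machines')
    .updateOne(
      { _id: document.stateId },
      { $addToSet: { [`phases.${state.currentPhase}.finished`]: document.job } }
    );
  // Re-read the phase so jobs finished by concurrent invocations are counted
  const latest = await db
    .collection('state-machines')
    .findOne({ _id: document.stateId });
  const currentPhase = latest.phases[state.currentPhase];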
  const jobsRemaining = currentPhase.waitFor.filter(
    (s) => !currentPhase.finished.includes(s)
  );
  // If there are no remaining jobs in the current phase
  if (jobsRemaining.length === 0) {
    // Update the currentPhase to be the next Phase.
    await db.collection('state-machines').findOneAndUpdate(
      { _id: document.stateId },
      {
        $set: {
          currentPhase: currentPhase.next,
        },
      }
    );
    // If the next phase is not end trigger the phase.
    if (currentPhase.next !== 'end') {
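      // Drop the original _id so MongoDB generates a new one for this status
      const { _id, ...rest } = document;
      await db.collection('statuses').insertOne({
        ...rest,
        job: `${currentPhase.next}Start`,
        status: 'success',
      });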
    }
  }
};


The job above runs whenever a job succeeds: it adds the job to the finished list, checks whether the phase is complete and, if so, moves on to the next phase, until it reaches the end phase.
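
The phase bookkeeping can be tried without a database. The sketch below is a simplified, in-memory version of the same logic: `recordSuccess` is a hypothetical helper, it mutates a plain object instead of a MongoDB document, and the two shortened phases are made up for the example.

```javascript
// Hypothetical in-memory version of the state machine job.
// Returns the job name to emit to start the next phase, or null.
function recordSuccess(state, job) {
  if (state.currentPhase === 'end') return null; // already finished
  const phase = state.phases[state.currentPhase];
  if (!phase.finished.includes(job)) phase.finished.push(job);
  const remaining = phase.waitFor.filter((j) => !phase.finished.includes(j));
  if (remaining.length > 0) return null; // phase still in progress
  state.currentPhase = phase.next;
  return phase.next === 'end' ? null : `${phase.next}Start`;
}

// Shortened example configuration (assumption: two small phases).
const state = {
  currentPhase: 'thinking',
  phases: {
    thinking: { waitFor: ['catch_fish', 'go_home'], finished: [], next: 'cooking' },
    cooking: { waitFor: ['clean', 'cook'], finished: [], next: 'end' },
  },
};

console.log(recordSuccess(state, 'catch_fish')); // null: go_home is still pending
console.log(recordSuccess(state, 'go_home')); // 'cookingStart': thinking is complete
console.log(recordSuccess(state, 'clean')); // null: cook is still pending
console.log(recordSuccess(state, 'cook')); // null: the next phase is 'end'
console.log(state.currentPhase); // 'end'
```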

4. Orchestrator Job:

exports.handler = async (event, ctx) => {
  ctx.callbackWaitsForEmptyEventLoop = false;
  // MongoDB document from the trigger
  const document = event.detail.fullDocument;
  if (!document.stateId) {
    console.log('Cannot execute the pipeline without the state id:', document);
    return { success: false };
  }
  const state = await db
    .collection('state-machines')
    .findOne({ _id: document.stateId });
  if (!state) {
    console.log('No state machine found for state id:', document.stateId);
    return { success: false };
  }
  if (state.currentPhase === 'end') {
    console.log('The state machine has already ended:', state);
    return { success: false };
  }
  const currentPhase = state.phases[state.currentPhase];
  if (!currentPhase) {
    console.log('Unknown phase in the state machine:', state.currentPhase);
    return { success: false };
  }
  // Jobs configured to run after the job that just succeeded
  const step = currentPhase.steps[document.job];
  if (step && step.jobs && step.jobs.length > 0) {
    // Invoke all of the next jobs in parallel
    await Promise.all(
      step.jobs.map((job) => {
        const payload = {}; // whatever input the job needs
        const params = {
          FunctionName: job,
          InvocationType: 'Event', // asynchronous invocation
          Payload: JSON.stringify(payload),
        };
        // With AWS SDK v2 this must be lambda.invoke(params).promise();
        // the v3 Lambda client returns a promise directly.
        return lambda.invoke(params);
      })
    );
  }
  return { success: true };
};

The job above receives the success event, finds the next jobs to trigger and invokes them.
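
The orchestrator only starts jobs. The flow presumably continues when each job writes its own result into the statuses collection, which fires the trigger again. A minimal sketch of that reporting step follows. It assumes the orchestrator passes the state id in the payload as `stateId` (the original leaves the payload empty); `runJob`, the 'failed' status and the fake collection are hypothetical and used only for illustration.

```javascript
// Hypothetical wrapper: runs a job's work and records the outcome as a status
// document shaped like the one inserted by the API in step 2.
async function runJob(statuses, jobName, payload, work) {
  let status = 'success';
  try {
    await work(payload);
  } catch (err) {
    status = 'failed'; // assumption: the original only shows 'success'
  }
  const doc = { stateId: payload.stateId, status, job: jobName, date: new Date() };
  await statuses.insertOne(doc);
  return doc;
}

// Stand-in for db.collection('statuses') so the sketch runs without MongoDB.
const fakeStatuses = {
  docs: [],
  async insertOne(doc) {
    this.docs.push(doc);
    return { acknowledged: true, insertedId: this.docs.length };
  },
};

runJob(fakeStatuses, 'buy_fishing_gear', { stateId: '1' }, async () => {
  // the job's real work goes here
}).then((doc) => console.log(doc.job, doc.status)); // buy_fishing_gear success
```

Writing the status inside the job keeps the orchestrator stateless: it never waits for a job, it only reacts to the next status document.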

5. Notifications Job:
Like the jobs above, this job receives all the events, so we can use their data to structure the message to send.
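
The post does not show the notifications job, so the following is only a sketch of what it could look like. It assumes the status document has the fields inserted in step 2 (`stateId`, `status`, `job`, `date`). `formatNotification`, `notificationsHandler` and the message wording are hypothetical, and the actual delivery channel (email, Slack, SNS or similar) is left out.

```javascript
// Hypothetical formatter for a status document.
function formatNotification(document) {
  const when = new Date(document.date).toISOString();
  const label = document.status === 'success' ? 'OK' : 'FAILED';
  return `[${label}] job "${document.job}" for state ${document.stateId} at ${when}`;
}

// Handler sketch: same event shape as the other jobs in this post.
async function notificationsHandler(event) {
  const message = formatNotification(event.detail.fullDocument);
  console.log(message); // replace with a real delivery channel
  return { message };
}

notificationsHandler({
  detail: {
    fullDocument: {
      stateId: '1',
      status: 'success',
      job: 'rent_a_boat',
      date: '2024-01-01T00:00:00.000Z',
    },
  },
});
// logs: [OK] job "rent_a_boat" for state 1 at 2024-01-01T00:00:00.000Z
```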

What if there are multiple phases:
Then we just have to add another phase to the existing state machine configuration and point the previous phase's next at it.

const stateMachine = {
  id: '1',
  purposeId: 'catch_the_fish',
  currentPhase: 'thinking',
  phases: {
    thinking: {
      // Same as before, except next: 'cooking' instead of 'end'
    },
    cooking: {
      steps: {
        cookingStart: {
          jobs: ['clean'],
        },
        clean: {
          jobs: ['cook'],
        },
      },
      waitFor: ['cook', 'clean'],
      finished: [],
      next: 'end',
    },
  },
};
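
When phases are chained, the names have to line up: the state machine job emits `${next}Start`, so each phase's next must name an existing phase (or 'end'), and that phase needs a matching Start step. A small sanity check, sketched here under those assumptions, can catch mismatches before the configuration is stored. `validate` is a hypothetical helper, and the thinking phase is shortened for the example.

```javascript
// Hypothetical check: returns a list of problems found in the configuration.
function validate(machine) {
  const problems = [];
  for (const [name, phase] of Object.entries(machine.phases)) {
    if (phase.next !== 'end') {
      const target = machine.phases[phase.next];
      if (!target) {
        problems.push(`${name}: next phase "${phase.next}" does not exist`);
      } else if (!target.steps[`${phase.next}Start`]) {
        problems.push(`${phase.next}: missing step "${phase.next}Start"`);
      }
    }
    // Every awaited job must be started by some step, or the phase never ends.
    const started = Object.values(phase.steps).flatMap((s) => s.jobs);
    for (const job of phase.waitFor) {
      if (!started.includes(job)) {
        problems.push(`${name}: "${job}" is awaited but never started`);
      }
    }
  }
  return problems;
}

const machine = {
  currentPhase: 'thinking',
  phases: {
    thinking: {
      steps: { thinking: { jobs: ['catch_fish'] } }, // shortened for the example
      waitFor: ['catch_fish'],
      finished: [],
      next: 'cooking',
    },
    cooking: {
      steps: { cookingStart: { jobs: ['clean'] }, clean: { jobs: ['cook'] } },
      waitFor: ['cook', 'clean'],
      finished: [],
      next: 'end',
    },
  },
};

console.log(validate(machine)); // []
```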


I hope the above code and explanation give you a way to implement your own solutions. If you need any more details, please comment and I will be happy to help. Thanks.
