loading...
Cover image for Drupal "Advanced Queue" Rudimentary Tutorial

Drupal "Advanced Queue" Rudimentary Tutorial

ericpugh profile image ericpugh Originally published at ericpugh.dev on ・7 min read

It's been a stressful couple weeks/months for the 7.5 billion people on Earth. This post isn't going to help with that in any way, at all. What it hopefully will do is introduce you to the the Drupal Advanced Queue module, and provide some straightforward code examples to getting you thinking about some of the advanced (or in the case of this tutorial, silly) things you can do with it.

Drupal Queues

Drupal Core has a Queue API which can be used for basic enqueueing of data to be processed later. Advanced Queue is a contributed Drupal module which expands upon this basic functionality by allowing different queues to be associated with different backend plugins, creates statuses on jobs, adds additional information to jobs like process time and messages, adds configurable retry settings for failed jobs and the ability to delay processing jobs, and creates a Views powered UI for job listings. Pretty neat, huh? Let's do something with it.

The Tutorial

I'm sure I'm not the only person who's been spending a lot of time lately compulsively reading the news or the twitter. Worse yet, most of what I've been reading is the same or similar information. 💡 This can be automated! In this tutorial we're going to create a Drupal Console command to retrieve news articles from The Guardian and queue each to be processed later by Advanced Queue. We'll then create a "JobType" to process these items, creating an Node for each article depending on whether or not the article contains certain keywords I'm interested in. Finally, we'll set up an Event Subscriber to send a notification when we get the "all clear". 🙏

Note: I'm using Drupal 8. I'm not going to cover setting up Drupal or installing Advanced Queue.

First let's create a custom module for this tutorial called "news_diet_queue". The quickest way to set up our custom module is the run the following Drupal Console command:

drupal generate:module --module="news_diet_queue"

I'm gonna to enter all the defaults when prompted by the command, except add "drupal:advancedqueue" as as dependency, which gives me the new module modules/custom/news_diet_queue with a news_diet_queue.info.yml that looks like this:

name: 'news_diet_queue'
type: module
description: 'My Awesome Module'
core: 8.x
package: 'Custom'
dependencies:
  - drupal:advancedqueue

The import Command

Now we've got a module. (Install it!) Next, I'm going to also generate the boilerplate for a Drupal Console command to load our news articles.

drupal generate:command --extension="news_diet_queue" --extension-type="module" --class="ReadNewsCommand" --name="read:news"

This generated a couple new files for our custom Drupal Console command. First, we've registered our command as a service in modules/custom/news_diet_queue/console.services.yml:

services:
  news_diet_queue.read_news:
    class: Drupal\news_diet_queue\Command\ReadNewsCommand
    arguments: []
    tags:
      - { name: drupal.command }

I've added some code (and comments) to the command class at modules/custom/news_diet_queue/src/Command/ReadNewsCommand.php to retrieve our news articles and queue each one as an individual import job. I'm getting articles from The Guardian Open Platform (docs).

The Command:

<?php

namespace Drupal\news_diet_queue\Command;

use Drupal\advancedqueue\Entity\Queue;
use Drupal\advancedqueue\Job;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Drupal\Console\Core\Command\Command;
use Drupal\Component\Serialization\Json;

/**
 * Class ReadNewsCommand.
 *
 * Drupal\Console\Annotations\DrupalCommand (
 *     extension="news_diet_queue",
 *     extensionType="module"
 * )
 */
class ReadNewsCommand extends Command {

  /**
   * {@inheritdoc}
   */
  protected function configure() {
    $this
      ->setName('read:news')
      ->setDescription('Queue an import of coronavirus articles from The Guardian');
  }

  /**
   * {@inheritdoc}
   */
  protected function execute(InputInterface $input, OutputInterface $output) {
    $this->getIo()->info('execute');
    $this->getIo()->info($this->trans('commands.ReadNewsCommand.messages.success'));
    // Set the API endpoint to get articles. The API KEY is in my .env file (get your own!!).
    $uri = sprintf(
      'https://content.guardianapis.com/search?q=covid-19&page-size=50&section=us-news&show-fields=body&order-by=newest&api-key=%s',
      getenv('NEWS_API_KEY')
    );
    // TODO: dependency injection.
    /** @var \GuzzleHttp\Client $client */
    $client = \Drupal::service('http_client');
    // Get articles from the API.
    $contents = Json::decode($client->request('GET', $uri)->getBody()->getContents());
    // Loop through each article in the response.
    $count = 0;
    if (isset($contents['response']) && $contents['response']['status'] == 'ok') {
      foreach ($contents['response']['results'] as $result) {
        // Create the Job payload with the article Title and Body.
        $payload = [
          'title' => $result['webTitle'],
          'body' => $result['fields']['body'],
        ];
        // Create import Job and add to the "default" queue.
        $job = Job::create('news_import_job', $payload);
        if ($job instanceof Job) {
          $q = Queue::load('default');
          $q->enqueueJob($job);
          $count++;
        }
      }
      if ($count) {
        $this->getIo()->info(sprintf('Imported %s articles.', $count));
      }
    }
    else {
      $this->getIo()->warning('No results.');
    }
  }

}

Run the newly created command to queue some jobs:

drupal read:news

Looks good! If you look at your job listing table in the Drupal administration dashboard (/admin/config/system/queues) you should
now see 50 new jobs have been added with the status "queued".

The Event Subscriber

Before we process our queue let's add an Event Subscriber to add some functionality which will get executed after each job has been processed. I'm adding some code to look for a couple phrases in the title of a successfully imported article, and if there's a match to send myself a Slack notification. In order to send the Slack notification I'm first going to add a dependency to the Monolog package. Note: The correct way to do this would be to add a composer.json file to my custom module, and then update my main Drupal project composer.json with a path repository
pointing to the module, and finally requiring the custom module itself in that main project composer.json. That gets a little in the weeds, and will be slightly different depending on how you have composer set up, so for this tutorial I'll just add Monolog in the project's root composer.json, like so:

composer require monolog/monolog

With that out of the way, let's create the Event Subscriber at modules/custom/news_diet_queue/src/EventSubscriber/JobProcessedEventSubscriber.php. This is going to be an incredibly simple Class with two methods

  1. The getSubscribedEvents method which specifies which event to listen for, in this case the "advancedqueue.post_process" event, and which method to execute.
  2. The onRespond method which executes after a Job was processed.

Of course, in order to send notifications, we'll new to create a Slack App. Create an app in Slack, then after naming your app activate "Incoming Webhooks", and add a Webhook URL to workspace. (specifying the channel the Bot should post to). Finally install the app to a Slack workspace. When that's done make sure to copy your Webhook URL. (I've added mine to the project's .env file to be used in the Event Subscriber class).

<?php

namespace Drupal\news_diet_queue\EventSubscriber;

use Drupal\advancedqueue\Event\AdvancedQueueEvents;
use Drupal\advancedqueue\Event\JobEvent;
use Monolog\Handler\SlackWebhookHandler;
use Monolog\Logger;
use Drupal\advancedqueue\Job;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;

/**
 * Event Subscriber for Advanced queue Jobs "post process" events.
 */
class JobProcessedEventSubscriber implements EventSubscriberInterface {

  /**
   * Method that is triggered on the response event.
   *
   * @param \Drupal\advancedqueue\Event\JobEvent $event
   *
   * @return bool
   *   Successful response to event.
   *
   * @throws \Exception
   */
  public function onRespond(JobEvent $event) {
    try {
      $job = $event->getJob();
      $state = $job->getState();
      $payload = $job->getPayload();

      // Don't do anything on a failed or requeued import Job.
      if ($state !== Job::STATE_SUCCESS) {
        return FALSE;
      }
      // Decide whether or not to send a notification.
      $notify = FALSE;
      foreach (['pandemic is over', 'found a vaccine', 'all clear'] as $needle) {
        if (stripos($payload['body'], $needle)) {
          $notify = TRUE;
          break;
        }
      }
      if ($notify) {
        // Send a notification to my private channel on Slack using Monolog.
        $slack = new SlackWebhookHandler(
          getenv('SLACK_BOT_WEBHOOK_URL'), // The webhook url
          '#my_notifications', // The channel
          'eric_notifications', // The bot username
          TRUE,
          '',
          FALSE,
          TRUE,
          Logger::INFO
        );
        $logger = new Logger('news_diet_queue_logger');
        $logger
          ->pushHandler($slack)
          ->info(sprintf('We\'ve got the all clear! _(probably not)_ Title: %s', $payload['title']));
      }
    }
    catch (\Exception $e) {
      /** @var \Drupal\Core\Logger\LoggerChannelFactory $logger */
      $logger = \Drupal::service('logger.factory');
      $logger->get('news_diet_queue')->error($e->getMessage());
    }
  }

  /**
   * {@inheritdoc}
   */
  public static function getSubscribedEvents() {
    $events[AdvancedQueueEvents::POST_PROCESS][] = ['onRespond'];
    return $events;
  }

}

Our articles are queued, and we have notifications in place, the last thing we need is the JobType to actually process our jobs.

The JobType

Create the following file modules/custom/news_diet_queue/src/Plugin/AdvancedQueue/JobType/NewsImportJob.php

<?php

namespace Drupal\news_diet_queue\Plugin\AdvancedQueue\JobType;

use Drupal\advancedqueue\Job;
use Drupal\advancedqueue\JobResult;
use Drupal\advancedqueue\Plugin\AdvancedQueue\JobType\JobTypeBase;

/**
 * @AdvancedQueueJobType(
 *   id = "news_import_job",
 *   label = @Translation("News Import Job"),
 *   max_retries = 1,
 *   retry_delay = 79200,
 * )
 */
class NewsImportJob extends JobTypeBase {

  /**
   * {@inheritdoc}
   */
  public function process(Job $job) {
    try {
      $status = 0;
      // Get the Job data.
      $payload = $job->getPayload();
      /** @var \Drupal\Core\Entity\EntityTypeManager $entityTypeManager */
      // Use the Entity Type Manager service to create/update Drupal content.
      $entityTypeManager = \Drupal::service('entity_type.manager');
      if (isset($payload['body'])) {
        // Check if any of the desired phrases exist in the article body. If so, import these articles as Nodes.
        $import = FALSE;
        foreach (['death toll', 'singing italians', 'toilet paper'] as $needle) {
          if (stripos($payload['body'], $needle)) {
            $import = TRUE;
            break;
          }
        }
        if ($import) {
          // Set the content field data.
          $title = $payload['title'] ?? 'Untitled';
          $fields = [
            'langcode' => 'en',
            'changed' => \Drupal::time()->getRequestTime(),
            'uid' => 1,
            'status' => 1,
            'type' => 'article',
            'title' => $title,
            'body' => [
              'summary' => '',
              'value' => $payload['body'],
              'format' => 'full_html',
            ],
          ];
          // If an article with this title already exists, don't create a new one.
          $existing = $entityTypeManager->getStorage('node')->loadByProperties(['title' => $title]);
          if (!$existing) {
            // Save a new "Article" Node.
            $status = $entityTypeManager
              ->getStorage('node')
              ->create($fields)
              ->enforceIsNew()
              ->save();
            // Return a success message if the "saved" constant was returned for the Entity operation.
            if ($status == SAVED_NEW) {
              return JobResult::success('Node was saved.');
            }
          }
        }
      }
      // By default mark the Job as failed.
      return JobResult::failure('Womp.');
    }
    catch (\Exception $e) {
      return JobResult::failure($e->getMessage());
    }
  }

}

Now you can process your queued jobs using the Advanced Queue drupal console command:

drupal advancedqueue:queue:process default

If you go back to you job listings table now, you should see the some jobs that have successfully been imported, and that others have been requeued to be tried again tomorrow (If the job failed, it will fail again next time, I just wanted to demonstrate the settings in the plugin annotation). For me, it worked. From the original 50 articles 9 were imported into Drupal for reading at my leisure. 🍷

That's about it. Now all I have to do is add some cron jobs to run the "read:news" and "advancedqueue:queue:process" commands on a regular schedule, and I can sit back and wait for the notification... waiting ... still waiting. Oh well, since we have some time on your hands maybe you came come up with some useful (or silly) things to do with Drupal queues.

Discussion

pic
Editor guide