DEV Community

Shalvah
Shalvah

Posted on • Originally published at blog.shalvah.me

Building a PHP client for Faktory, Part 6: Higher-level usage

We have the low-level tools for connecting to the Faktory server, executing commands, fetching data, logging, and so on. Now, I'll work on the high-level API this Faktory library will expose. I plan to make it flexible enough for a user to use directly, or maybe plug it into their framework.

We need to provide a producer API (enqueuing jobs) and a consumer API (fetching and executing jobs).

Producer API

For the producer, we can expose a base Job class and a Dispatcher which enqueues jobs.

My proposed usage looks like this:

// Create a dispatcher
$dispatcher = Dispatcher::make(
    logLevel: \Monolog\Level::Debug,
    hostname: 'tcp://localhost',
);
// Dispatch a job now
$dispatcher->dispatch(SendWelcomeEmail::class, [$userId]);
// Schedule a job for later
$dispatcher->dispatch(SendWelcomeEmail::class, [$userId], delaySeconds: 60);
// Enqueue multiple instances in bulk
$dispatcher->dispatchMany(
  SendWelcomeEmail::class, [[$user1, $user2]], 
  delaySeconds: 60
);
Enter fullscreen mode Exit fullscreen mode

I believe in convenience, so I'll also provide a global dispatcher:

Dispatcher::configure(
    logLevel: \Monolog\Level::Debug,
    hostname: 'tcp://dreamatorium',
);
SendWelcomeEmail::dispatch($userId);
SendWelcomeEmail::dispatchIn(seconds: 60, $userId);
SendWelcomeEmail::dispatchMany(
  inSeconds: 60, [[$user1], [$user2]]
);
Enter fullscreen mode Exit fullscreen mode

Okay, implementing. The job class allows a user to specify options for the the job payload sent to Faktory:

abstract class Job
{
  public static ?string $queue = null;

  public static int $retry = 25;

  public static int $reserveFor = 1800;

  public static array $custom = []; // From the Faktory spec; allows users to add custom data

  public static function dispatch(...$args)
  {
    Dispatcher::instance()->dispatch(static::class, $args);
  }

  public static function dispatchIn(?int $seconds, array $args = [])
  {
    Dispatcher::instance()->dispatch(static::class, $args, delaySeconds: $seconds);
  }

  public static function dispatchMany(array $args, ?int $inSeconds = null)
  {
    Dispatcher::instance()->dispatchMany(static::class, $args, delaySeconds: $inSeconds);
  }
}
Enter fullscreen mode Exit fullscreen mode

The Dispatcher creates and passes configuration to the Faktory client, and transforms job operations (enqueueing/scheduling) to the appropriate Faktory operation:

class Dispatcher
{
  public function dispatch(string $jobClass, array $args = [], int $delaySeconds = null)
  {
    $jobPayload = static::toJobPayload($jobClass, $args, $delaySeconds);
    $this->client->push($jobPayload);
  }

  public function dispatchMany(string $jobClass, array $argumentsListing, int $delaySeconds = null)
  {
    $basicPayload = static::toJobPayload($jobClass, [], $delaySeconds);

    $jobPayloads = [];
    foreach ($argumentsListing as $index => $arguments) {
      $jobPayloads[] = array_merge($basicPayload, [
        "jid" => "{$basicPayload['jid']}_{$index}",
        "args" => $arguments,
      ]);
    }

    return $this->client->pushBulk($jobPayloads);
  }

  protected static function toJobPayload(string $jobClass, array $args, int $delaySeconds = null)
  {
    return PayloadBuilder::build(
      jobType: $jobClass,
      args: $args,
      queue: $jobClass::$queue,
      retry: $jobClass::$retry,
      reserveFor: $jobClass::$reserveFor,
      delaySeconds: $delaySeconds
    );
  }
}
Enter fullscreen mode Exit fullscreen mode

In the above snippet, I've omitted some code from the dispatcher, mainly boilerplate like constructors and managing the underlying client. You can view the full code on GitHub.

The PayloadBuilder class is a helper class to generate the needed payload shape for the Faktory server:

class PayloadBuilder
{
  public static function build(
    string $jobType,
    array $args,
    ?string $queue,
    ?int $retry,
    ?int $reserveFor,
    ?int $delaySeconds,
  ): array
  {
    $payload = [
      'jid' => 'job_' . bin2hex(random_bytes(12)),
      'jobtype' => $jobType,
      'args' => $args,
    ];

    if ($queue) {
      $payload['queue'] = $queue;
    }
    if (is_null($retry)) { // 0 is a possible value, meaning no retries
      $payload['retry'] = $retry;
    }
    if ($reserveFor) {
      $payload['reserve_for'] = $reserveFor;
    }

    if ($delaySeconds) {
      $executionTime = (new \DateTimeImmutable('@'.(time() + $delaySeconds)));
      $payload['at'] = $executionTime->format(\DateTimeInterface::ATOM);
    }

    return $payload;
  }
}
Enter fullscreen mode Exit fullscreen mode

That's it for the dispatcher. Next up: retrieving and executing jobs. For this, I'm creating an Executor class. It's quite similar to the Dispatcher in terms of managing the client and having a global instance, so I'll omit those parts. Here's the key logic::

class Executor
{
  public function start(array $queues = []): void
  {
    $this->client->connect();

    $queuesListening = empty($queues)
      ? "all queues" : ("queues" . implode(",", $queues));
    $this->logger->info(sprintf("Listening on %s", $queuesListening));

    while (true) {
      $jobPayload = $this->getNextJob($queues);
      $this->processAndReport($jobPayload);
    }
  }

  /**
   * Process a retrieved job, and ACK or FAIL it to Faktory.
   */
  public function processAndReport(array $jobPayload): bool
  {
    try {
      $this->process($jobPayload);
      $this->reportSuccess($jobPayload);
      return true;
    } catch (Throwable $e) {
      $this->reportFailure($jobPayload, $e);
      return false;
    }
  }

  /**
   * Process a retrieved job. This merely instantiates the job and executes it.
   * Any exceptions thrown by the job are not handled.
   */
  public function process(array $jobPayload): void
  {
    $jobInstance = $this->instantiateJob($jobPayload);
    $jobInstance->process(...$jobPayload['args']);
  }

  /**
   * Manually fetch the next job to be executed from the specified queues.
   * Faktory will block for a few seconds if no job available, then return null.
   * The $retryUntilAvailable parameter forces the executor to try again when this happens.
   */
  public function getNextJob(array $queues = ["default"], bool $retryUntilAvailable = true): array|null
  {
    while (true) {
      $job = $this->client->fetch(...$queues);
      if ($job || !$retryUntilAvailable) return $job;
    }
  }

  protected function reportSuccess(array $jobPayload)
  {
    $this->logger->info(sprintf("Processed job result=success class=%s id=%s", $jobPayload['jobtype'], $jobPayload['jid']));
    $this->client->ack(["jid" => $originalJobPayload["jid"]]);
  }

  protected function reportFailure(array $jobPayload, Throwable $exception)
  {
    $this->logger->info(sprintf("Processed job result=failure class=%s id=%s", $jobPayload['jobtype'], $jobPayload['jid']));
    $this->client->fail([
      "jid" => $originalJobPayload["jid"],
      "errtype" => $exception::class,
      "message" => $exception->getMessage(),
      "backtrace" => explode("\n", $exception->getTraceAsString()),
    ]));
  }

  protected function instantiateJob(array $jobPayload): Job
  {
    $class = $jobPayload['jobtype'];
    return new $class;
  }
}
Enter fullscreen mode Exit fullscreen mode

I'll bring this article to a close here, because I got interrupted multiple times and ended up spending several months on it. 😅 I'm off to tackle the next stages; here's the code thus far.

Thus far, we have a basic working API. I'm not yet sure how it would fit into a production app, but it's good for a start. In the next phase, I'd like to address some limitations of the current setup.

Top comments (0)