DEV Community

Shalvah
Shalvah

Posted on • Edited on • Originally published at blog.shalvah.me

Building a PHP client for Faktory, Part 4: Refactoring

Separating responsibilities

Following on from Part 3, it's time for some refactoring. Thus far, all of the code has been in the TcpClient, but it makes sense to create a higher layer of abstraction on that, and keep TcpClient focused on the low-level concerns.

One reason for this is tests, especially in a CI environment (GitHub Actions). Thus far, my tests have all been talking to the Faktory server, so if I exit Faktory on my computer, tests fail.

To move to CI, I've got two options: mock the Faktory server, or run the actual server in CI. I think both are useful:

  • talking to the actual server allows us verify that our code works with the real thing. We could mock the server and all its responses but then a new release of Faktory changes something, and we wouldn't know. The downside is the extra overhead in the tests. Also, since all tests will use the same server, its state has to be shared between tests.
  • using mocks is helpful for higher-level operations: we can move faster and focus on our own internal code consistency.

So I'll go with both. I'll do a big refactor: splitting this TcpClient into two: a lower-level TcpClient, which is responsible for connecting to the server and sending and receiving messages, and a higher-level Client, which manages and uses the TcpClient to enqueue and fetch jobs.

Here's the leaner TcpClient. It no longer has to handle managing the logger, but rather accepts one from the outside. Same with the worker info. All it does is connect and send and receive messages:

class TcpClient
{
  /** @var resource|null */
  protected $connection = null;
  protected boolean $connected = false;
  protected ParserInterface $responseParser;

  public function __construct(
    protected array $workerInfo,
    protected LoggerInterface $logger,
    protected string $hostname = 'tcp://localhost',
    protected int $port = 7419,
  ) {
    $this->responseParser = (new ProtocolFactory())->createResponseParser();
  }

  public function connect(): bool
  {
    // ...
  }

  protected function createTcpConnection()
  {
    // ...
  }

  protected function handshake()
  {
    // ...
  }

  protected function readHi()
  {
    // ...
  }

  protected function sendHello()
  {
    // ...
  }
Enter fullscreen mode Exit fullscreen mode

I've reworked message sending. send() method still writes to the Faktory server, but I've added sendAndRead(), which will write and then read the next line:

  public function send($command, string ...$args): void
  {
    $message = $command . " " . join(' ', $args) . "\r\n";
    $this->logger->debug("Sending: " . $message);
    fwrite($this->connection, $message);
  }

  public function sendAndRead($command, string ...$args): mixed
  {
    $this->send($command, ...$args);
    return $this->readLine(operation: $command);
  }
Enter fullscreen mode Exit fullscreen mode

For message receiving, you can now tell readLine() to skip some lines (useful for FETCH, where the first line isn't helpful). Also, it will throw an error if the response was an ERR:

  public function readLine(?int $skipLines = 0): mixed
  {
    do {
      $line = fgets($this->connection);
      $this->logger->debug("Received: " . $line);
    } while ($skipLines--);

    if (str_starts_with($line, "{")) // JSON response
      return json_decode($line, true);

    $messages = $this->responseParser->pushIncoming($line);
    if (empty($messages)) return null;

    $response = $messages[0]?->getValueNative();
    if (is_string($response) && str_starts_with($response, "ERR"))
      throw UnexpectedResponse::from($operation, $response);

    return $response;
  }
}
Enter fullscreen mode Exit fullscreen mode

And here's the higher-levelClient class:

namespace Knuckles\Faktory;

class Client
{
  protected array $workerInfo;
  protected LoggerInterface $logger;
  protected TcpClient $tcpClient;

  public function __construct(
    Level $logLevel = Level::Info,
    string $logDestination = 'php://stderr',
    ?LoggerInterface $logger = null)
  {
    $this->logger = $logger ?: self::makeLogger($logLevel, $logDestination);
    $this->workerInfo = [
      "hostname" => gethostname(),
      "wid" => "test-worker-1",
      "pid" => getmypid(),
      "labels" => [],
    ];
    $this->tcpClient = self::makeTcpClient(
      $this->workerInfo, $this->logger, hostname: 'tcp://dreamatorium.local'
    );
  }

  public static function makeTcpClient($workerInfo, $logger, $hostname): TcpClient
  {
    return new TcpClient($workerInfo, $logger, $hostname);
  }

  public static function makeLogger(
    Level $logLevel = Level::Info, string $logDestination = 'php://stderr'): LoggerInterface
  {
    return new Logger(
      name: 'faktory-php',
      handlers: [(new StreamHandler($logDestination, $logLevel))]
    );
  }

  // Empty the Faktory database
  public function flush()
  {
    $this->tcpClient->sendAndRead("FLUSH");
  }

  // Enqueue a job
  public function push(array $job)
  {
    $this->tcpClient->sendAndRead("PUSH", json_encode($job));
  }

  // Fetch a job from the specified queues
  public function fetch(string ...$queues)
  {
    $this->tcpClient->send("FETCH", ...$queues);
    // The first line of the response just contains the length of the next line; skip it
    return $this->tcpClient->readLine(skipLines: 1);
  }
}
Enter fullscreen mode Exit fullscreen mode

The Client class instantiates the TCP client, determines the worker details, and configures things like logging. Later on, I'll update it to accept configuration and use that to customise things.

Tests

At this point, our tests for the TcpClient look like this:

// Helper method for creating a new TCP client
function tcpClient($port = 7419, $level = Level::Error) {
  return new TcpClient(
    logger: Knuckles\Faktory\Faktory::makeLogger(logLevel: $level),
    port: $port,
  );
}

it('raises an error if Faktory server is unreachable', function () {
  expect(fn() => tcpClient()->connect(port: 7400))->toThrow(CouldntConnect::class);
});

it('connects to the Faktory server', function () {
  $tcpClient = tcpClient();
  expect($tcpClient->connect())->toBeTrue();
});

it('can send commands to and read from the Faktory server', function () {
  $tcpClient = tcpClient();

  $job = [
    "jid" => "123861239abnadsa",
    "jobtype" => "SomeJobClass",
    "args" => [1, 2, true, "hello"],
  ];
  $tcpClient->send("PUSH", json_encode($job));
  expect($tcpClient->readLine())->toStartWith("OK");

  $tcpClient->send("FETCH", "default");
  $fetched = $tcpClient->readLine(skipLines: 1);
  expect($fetched['created_at'])->not->toBeEmpty();
  unset($fetched['created_at']);

  expect($fetched['enqueued_at'])->not->toBeEmpty();
  unset($fetched['enqueued_at']);

  expect($fetched)->toEqual(array_merge($job, ['queue' => 'default', 'retry' => 25]));
});

test('->readLine() raises an error when the response is an ERR', function () {
  $tcpClient = tcpClient();
  $tcpClient->send("PUSH", "Invalid payload");
  expect(fn() => $tcpClient->readLine())->toThrow(UnexpectedResponse::class);
});

test('->sendAndRead() raises an error when the response is not OK', function () {
  expect(
    fn() => tcpClient()->sendAndRead("PUSH", "Invalid data")
  )->toThrow(UnexpectedResponse::class);
});
Enter fullscreen mode Exit fullscreen mode

This test still depends on Faktory to be running, but luckily, with GitHub Actions, this is as simple as listing the Docker container as a service:

services:
  faktory:
    image: contribsys/faktory:1.6.1
      ports:
      - 7419:7419
Enter fullscreen mode Exit fullscreen mode

We can add some more tests to verify the implicit behaviours of this client. For example, the Faktory docs say:\

If the server protocol version is larger than a client expects, the client should print a message recommending a client upgrade.

I currently have this implemented:

class TcpClient
{
  const SUPPORTED_FAKTORY_PROTOCOL_VERSION = 2;

  protected function readHi()
  {
    $hi = $this->readLine();
    if (empty($hi)) throw UnexpectedResponse::from("Handshake (HI)", $hi);

    $version = Json::parse(str_replace("HI ", "", $hi))['v'];
    if (floatval($version) > static::SUPPORTED_FAKTORY_PROTOCOL_VERSION) {
      $this->logger->warning(
        sprintf(
          "Expected Faktory protocol v%s or lower; received v%s from the server",
          static::SUPPORTED_FAKTORY_PROTOCOL_VERSION, $version
        )
      );;
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Now to add tests:

it('logs a warning when version is higher than expected', function () {
  $logger = Mockery::mock(Logger::class, ['info' => null, 'debug' => null]);
  $tcpClient = new class($logger) extends TcpClient {
    const SUPPORTED_FAKTORY_PROTOCOL_VERSION = 1;
  };

  $logger->shouldReceive('warning')->withArgs(function ($arg) {
    return str_contains($arg, "Expected Faktory protocol v1 or lower");
  });
  $tcpClient->connect();
});

it('does not log a warning when version is lower than expected', function () {
  $logger = Mockery::mock(Logger::class, ['info' => null, 'debug' => null]);
  $tcpClient = new class($logger) extends TcpClient {
    const SUPPORTED_FAKTORY_PROTOCOL_VERSION = 3;
  };

  $logger->shouldNotReceive('warning');
  $tcpClient->connect();
});
Enter fullscreen mode Exit fullscreen mode

I won't add tests for the higher-level Client for now. That's because you can't easily mock the network in PHP. I can work around this by switching from fsockopen() and fwrite() to a higher-level library which I can mock more easily, but that will be later.

Connection and state management

I'll do one more refactor, this time to improve the UX by eliminating the need to manually call ->connect. The TCP Client should be able to automatically detect when it needs to connect and do so. This will also help deal with disconnection problems (for instance, due to a network failure).

To do this, we update the ->send() method to automatically connect if we aren't:

public function send($command, string ...$args): void
{
  if (!$this->connected) {
    $this->connect();
  }

  $message = $command . " " . join(' ', $args) . "\r\n";
  $this->logger->debug("Sending: " . $message);
  fwrite($this->connection, $message);
}
Enter fullscreen mode Exit fullscreen mode

There's one problem here,though. The ->connect() method itself calls ->send() (in order to establish the handshake), so this will lead us into an infinite loop. At this point, it's obvious we need a more fine-grained state description, so let's move from the boolean connected to the enum state:

enum State
{
  case Connecting;
  case Connected;
  case Disconnected;
}

class TcpClient
{
  protected State $state = State::Disconnected;

  public function connect(): bool
  {
    $this->state = State::Connecting;

    $this->logger->info("Connecting to Faktory server on $this->hostname:$this->port");
    $this->createTcpConnection();
    $this->handshake();

    $this->state = State::Connected;
    return true;
  }

  public function send($command, string ...$args): void
  {
    if ($this->state == State::Disconnected) {
      $this->connect();
    }

    $message = $command . " " . join(' ', $args) . "\r\n";
    $this->logger->debug("Sending: " . $message);
    fwrite($this->connection, $message);
  }
}
Enter fullscreen mode Exit fullscreen mode

This is much clearer. When send() calls connect(), the state is changed from Disconnected to Connecting, so it avoids the infinite loop.

At this point, since we're dealing with connection states, we might as well implement Faktory's END command, which signals the end of a session. We'll add a disconnect() method that will send this and then disconnect:

public function disconnect()
{
  $this->send('END');
  fclose($this->connection);
  $this->connection = null;
  $this->state = State::Disconnected;
}
Enter fullscreen mode Exit fullscreen mode

All done for Part 4. In Part 5, we'll get back to implementing the rest of the Faktory protocol. Code on GitHub.

Top comments (0)