Separating responsibilities
Following on from Part 3, it's time for some refactoring. Thus far, all of the code has been in the TcpClient
, but it makes sense to create a higher layer of abstraction on that, and keep TcpClient
focused on the low-level concerns.
One reason for this is tests, especially in a CI environment (GitHub Actions). Thus far, my tests have all been talking to the Faktory server, so if I exit Faktory on my computer, tests fail.
To move to CI, I've got two options: mock the Faktory server, or run the actual server in CI. I think both are useful:
- talking to the actual server allows us verify that our code works with the real thing. We could mock the server and all its responses but then a new release of Faktory changes something, and we wouldn't know. The downside is the extra overhead in the tests. Also, since all tests will use the same server, its state has to be shared between tests.
- using mocks is helpful for higher-level operations: we can move faster and focus on our own internal code consistency.
So I'll go with both. I'll do a big refactor: splitting this TcpClient
into two: a lower-level TcpClient
, which is responsible for connecting to the server and sending and receiving messages, and a higher-level Client
, which manages and uses the TcpClient
to enqueue and fetch jobs.
Here's the leaner TcpClient
. It no longer has to handle managing the logger, but rather accepts one from the outside. Same with the worker info. All it does is connect and send and receive messages:
class TcpClient
{
/** @var resource|null */
protected $connection = null;
protected boolean $connected = false;
protected ParserInterface $responseParser;
public function __construct(
protected array $workerInfo,
protected LoggerInterface $logger,
protected string $hostname = 'tcp://localhost',
protected int $port = 7419,
) {
$this->responseParser = (new ProtocolFactory())->createResponseParser();
}
public function connect(): bool
{
// ...
}
protected function createTcpConnection()
{
// ...
}
protected function handshake()
{
// ...
}
protected function readHi()
{
// ...
}
protected function sendHello()
{
// ...
}
I've reworked message sending. send()
method still writes to the Faktory server, but I've added sendAndRead()
, which will write and then read the next line:
public function send($command, string ...$args): void
{
$message = $command . " " . join(' ', $args) . "\r\n";
$this->logger->debug("Sending: " . $message);
fwrite($this->connection, $message);
}
public function sendAndRead($command, string ...$args): mixed
{
$this->send($command, ...$args);
return $this->readLine(operation: $command);
}
For message receiving, you can now tell readLine()
to skip some lines (useful for FETCH, where the first line isn't helpful). Also, it will throw an error if the response was an ERR:
public function readLine(?int $skipLines = 0): mixed
{
do {
$line = fgets($this->connection);
$this->logger->debug("Received: " . $line);
} while ($skipLines--);
if (str_starts_with($line, "{")) // JSON response
return json_decode($line, true);
$messages = $this->responseParser->pushIncoming($line);
if (empty($messages)) return null;
$response = $messages[0]?->getValueNative();
if (is_string($response) && str_starts_with($response, "ERR"))
throw UnexpectedResponse::from($operation, $response);
return $response;
}
}
And here's the higher-levelClient
class:
namespace Knuckles\Faktory;
class Client
{
protected array $workerInfo;
protected LoggerInterface $logger;
protected TcpClient $tcpClient;
public function __construct(
Level $logLevel = Level::Info,
string $logDestination = 'php://stderr',
?LoggerInterface $logger = null)
{
$this->logger = $logger ?: self::makeLogger($logLevel, $logDestination);
$this->workerInfo = [
"hostname" => gethostname(),
"wid" => "test-worker-1",
"pid" => getmypid(),
"labels" => [],
];
$this->tcpClient = self::makeTcpClient(
$this->workerInfo, $this->logger, hostname: 'tcp://dreamatorium.local'
);
}
public static function makeTcpClient($workerInfo, $logger, $hostname): TcpClient
{
return new TcpClient($workerInfo, $logger, $hostname);
}
public static function makeLogger(
Level $logLevel = Level::Info, string $logDestination = 'php://stderr'): LoggerInterface
{
return new Logger(
name: 'faktory-php',
handlers: [(new StreamHandler($logDestination, $logLevel))]
);
}
// Empty the Faktory database
public function flush()
{
$this->tcpClient->sendAndRead("FLUSH");
}
// Enqueue a job
public function push(array $job)
{
$this->tcpClient->sendAndRead("PUSH", json_encode($job));
}
// Fetch a job from the specified queues
public function fetch(string ...$queues)
{
$this->tcpClient->send("FETCH", ...$queues);
// The first line of the response just contains the length of the next line; skip it
return $this->tcpClient->readLine(skipLines: 1);
}
}
The Client
class instantiates the TCP client, determines the worker details, and configures things like logging. Later on, I'll update it to accept configuration and use that to customise things.
Tests
At this point, our tests for the TcpClient
look like this:
// Helper method for creating a new TCP client
function tcpClient($port = 7419, $level = Level::Error) {
return new TcpClient(
logger: Knuckles\Faktory\Faktory::makeLogger(logLevel: $level),
port: $port,
);
}
it('raises an error if Faktory server is unreachable', function () {
expect(fn() => tcpClient()->connect(port: 7400))->toThrow(CouldntConnect::class);
});
it('connects to the Faktory server', function () {
$tcpClient = tcpClient();
expect($tcpClient->connect())->toBeTrue();
});
it('can send commands to and read from the Faktory server', function () {
$tcpClient = tcpClient();
$job = [
"jid" => "123861239abnadsa",
"jobtype" => "SomeJobClass",
"args" => [1, 2, true, "hello"],
];
$tcpClient->send("PUSH", json_encode($job));
expect($tcpClient->readLine())->toStartWith("OK");
$tcpClient->send("FETCH", "default");
$fetched = $tcpClient->readLine(skipLines: 1);
expect($fetched['created_at'])->not->toBeEmpty();
unset($fetched['created_at']);
expect($fetched['enqueued_at'])->not->toBeEmpty();
unset($fetched['enqueued_at']);
expect($fetched)->toEqual(array_merge($job, ['queue' => 'default', 'retry' => 25]));
});
test('->readLine() raises an error when the response is an ERR', function () {
$tcpClient = tcpClient();
$tcpClient->send("PUSH", "Invalid payload");
expect(fn() => $tcpClient->readLine())->toThrow(UnexpectedResponse::class);
});
test('->sendAndRead() raises an error when the response is not OK', function () {
expect(
fn() => tcpClient()->sendAndRead("PUSH", "Invalid data")
)->toThrow(UnexpectedResponse::class);
});
This test still depends on Faktory to be running, but luckily, with GitHub Actions, this is as simple as listing the Docker container as a service:
services:
faktory:
image: contribsys/faktory:1.6.1
ports:
- 7419:7419
We can add some more tests to verify the implicit behaviours of this client. For example, the Faktory docs say:\
If the server protocol version is larger than a client expects, the client should print a message recommending a client upgrade.
I currently have this implemented:
class TcpClient
{
const SUPPORTED_FAKTORY_PROTOCOL_VERSION = 2;
protected function readHi()
{
$hi = $this->readLine();
if (empty($hi)) throw UnexpectedResponse::from("Handshake (HI)", $hi);
$version = Json::parse(str_replace("HI ", "", $hi))['v'];
if (floatval($version) > static::SUPPORTED_FAKTORY_PROTOCOL_VERSION) {
$this->logger->warning(
sprintf(
"Expected Faktory protocol v%s or lower; received v%s from the server",
static::SUPPORTED_FAKTORY_PROTOCOL_VERSION, $version
)
);;
}
}
}
Now to add tests:
it('logs a warning when version is higher than expected', function () {
$logger = Mockery::mock(Logger::class, ['info' => null, 'debug' => null]);
$tcpClient = new class($logger) extends TcpClient {
const SUPPORTED_FAKTORY_PROTOCOL_VERSION = 1;
};
$logger->shouldReceive('warning')->withArgs(function ($arg) {
return str_contains($arg, "Expected Faktory protocol v1 or lower");
});
$tcpClient->connect();
});
it('does not log a warning when version is lower than expected', function () {
$logger = Mockery::mock(Logger::class, ['info' => null, 'debug' => null]);
$tcpClient = new class($logger) extends TcpClient {
const SUPPORTED_FAKTORY_PROTOCOL_VERSION = 3;
};
$logger->shouldNotReceive('warning');
$tcpClient->connect();
});
I won't add tests for the higher-level Client
for now. That's because you can't easily mock the network in PHP. I can work around this by switching from fsockopen()
and fwrite()
to a higher-level library which I can mock more easily, but that will be later.
Connection and state management
I'll do one more refactor, this time to improve the UX by eliminating the need to manually call ->connect
. The TCP Client should be able to automatically detect when it needs to connect and do so. This will also help deal with disconnection problems (for instance, due to a network failure).
To do this, we update the ->send()
method to automatically connect if we aren't:
public function send($command, string ...$args): void
{
if (!$this->connected) {
$this->connect();
}
$message = $command . " " . join(' ', $args) . "\r\n";
$this->logger->debug("Sending: " . $message);
fwrite($this->connection, $message);
}
There's one problem here,though. The ->connect()
method itself calls ->send()
(in order to establish the handshake), so this will lead us into an infinite loop. At this point, it's obvious we need a more fine-grained state description, so let's move from the boolean connected
to the enum state
:
enum State
{
case Connecting;
case Connected;
case Disconnected;
}
class TcpClient
{
protected State $state = State::Disconnected;
public function connect(): bool
{
$this->state = State::Connecting;
$this->logger->info("Connecting to Faktory server on $this->hostname:$this->port");
$this->createTcpConnection();
$this->handshake();
$this->state = State::Connected;
return true;
}
public function send($command, string ...$args): void
{
if ($this->state == State::Disconnected) {
$this->connect();
}
$message = $command . " " . join(' ', $args) . "\r\n";
$this->logger->debug("Sending: " . $message);
fwrite($this->connection, $message);
}
}
This is much clearer. When send()
calls connect()
, the state is changed from Disconnected
to Connecting
, so it avoids the infinite loop.
At this point, since we're dealing with connection states, we might as well implement Faktory's END
command, which signals the end of a session. We'll add a disconnect()
method that will send this and then disconnect:
public function disconnect()
{
$this->send('END');
fclose($this->connection);
$this->connection = null;
$this->state = State::Disconnected;
}
All done for Part 4. In Part 5, we'll get back to implementing the rest of the Faktory protocol. Code on GitHub.
Top comments (0)