When I had to use RabbitMQ in a Symfony project, I had a hard time choosing which bundle or component to use, so here I am building a fully functional example using Enqueue and Symfony 3.4.
As for choosing Enqueue, I find it (enqueue.forma-pro.com) the best-documented solution for PHP projects, and it comes with a wide range of transports to message brokers (AMQP, Redis, Kafka, ...).
This example connects to different endpoints of the r/SpaceX API, chosen at random. All the code examples and configuration apply specifically to version 0.9.12 of the enqueue-bundle.
The full code can be found at https://github.com/Haamida/RestRabbit.
1. Configuration
The first step is to run:
composer require enqueue/enqueue-bundle enqueue/amqp-lib
enqueue/amqp-lib is my library of choice for connecting to RabbitMQ. Note that amqp-lib depends on the bcmath extension, so make sure it is installed and enabled.
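One quick way to verify the bcmath requirement is to ask PHP itself. The snippet below is a minimal sketch using the built-in `extension_loaded()` function; keep in mind that the CLI and the web server may load different php.ini files, so run it in the environment that will actually talk to RabbitMQ.

```php
<?php
// Check whether the bcmath extension is loaded in the PHP runtime
// executing this script.
$hasBcmath = extension_loaded('bcmath');

echo $hasBcmath
    ? "bcmath is loaded\n"
    : "bcmath is missing: install or enable it before using enqueue/amqp-lib\n";
```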
Now that the bundle is added to the project (composer.json and AppKernel.php), let's proceed with a minimal config in enqueue.yml:
enqueue:
    default:
        transport: 'amqp+lib:'
        client: ~
Now add the connection parameters to parameters.yml:
queue_rb: spacex_q
topic_rb: spacex_t
host_rb: 127.0.0.1
port_rb: 5672
user_rb: guest
pass_rb: guest
vhost_rb: /
For simplicity, I kept the default port, user, vhost and so on (these should of course be changed for production).
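Enqueue transports are often configured with a single DSN string rather than separate values. As a sketch only, the parameters above could be folded into one DSN like this. `buildAmqpDsn` is a hypothetical helper, not part of Enqueue, and the exact DSN form your transport accepts should be checked against the Enqueue documentation for your version. The one detail worth noticing is that the vhost has to be URL-encoded, so the default `/` becomes `%2F`.

```php
<?php
/**
 * Build an AMQP DSN from individual connection settings.
 * Hypothetical helper for illustration; not part of Enqueue.
 */
function buildAmqpDsn(array $p): string
{
    return sprintf(
        'amqp+lib://%s:%s@%s:%d/%s',
        rawurlencode($p['user']),
        rawurlencode($p['pass']),
        $p['host'],
        $p['port'],
        rawurlencode($p['vhost']) // "/" is encoded as "%2F"
    );
}

// Same values as in parameters.yml above
$dsn = buildAmqpDsn([
    'host'  => '127.0.0.1',
    'port'  => 5672,
    'user'  => 'guest',
    'pass'  => 'guest',
    'vhost' => '/',
]);

echo $dsn, "\n"; // amqp+lib://guest:guest@127.0.0.1:5672/%2F
```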
It is useful to have the RabbitMQ management plugin enabled to monitor the interactions with the queue:
rabbitmq-plugins enable rabbitmq_management
Now head to http://localhost:15672/ to get an overview of the queues and messages.
2. Connection, Topic and Queue
To send our produced messages to the queue, we need to set up our connection.
The first step is to get an instance of the connection factory:
use Enqueue\AmqpLib\AmqpConnectionFactory;

// Connection settings are read from parameters.yml
$factory = new AmqpConnectionFactory([
    'host' => $this->getParameter('host_rb'),
    'port' => $this->getParameter('port_rb'),
    'vhost' => $this->getParameter('vhost_rb'),
    'user' => $this->getParameter('user_rb'),
    'pass' => $this->getParameter('pass_rb'),
    'persisted' => true,
]);
The factory lets us create a context, and from it a topic and a queue (if they do not already exist):
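use Interop\Amqp\AmqpTopic;
use Interop\Amqp\Impl\AmqpBind;

// $topicName and $queueName hold the names to use, e.g. the topic_rb and queue_rb parameters
$context = $factory->createContext();
// Declare a direct exchange (a "topic" in Enqueue terms)
$topic = $context->createTopic($topicName);
$topic->setType(AmqpTopic::TYPE_DIRECT);
$context->declareTopic($topic);
// Declare the queue and bind it to the exchange
$queue = $this->createOrUseQueue($context, $queueName);
$context->bind(new AmqpBind($topic, $queue));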
In the code above, I created a topic of the direct exchange type, so that a message is routed straight to the queue whose binding matches its routing key. That is because, in the RabbitMQ message flow, a message is first delivered to an exchange and then, depending on the exchange type, routed as follows:
- Direct: sent to the queue whose binding key matches the routing key
- Fanout: sent to all the queues bound to the exchange
- Headers: sent based on the message headers
- Topic: sent to one or more queues based on a match between the routing key and the routing pattern
These are the principal exchange types; a more in-depth explanation can be found here: Exchanges-routing-keys-bindings. After configuring the topic, we proceed to create the queue that we will later bind to it.
use Interop\Amqp\AmqpQueue;

public function createOrUseQueue($context, $queueName)
{
    $queue = $context->createQueue($queueName);
    // Durable: the queue definition survives a broker restart
    $queue->addFlag(AmqpQueue::FLAG_DURABLE);
    $context->declareQueue($queue);

    return $queue;
}
This is practically the same logic as for creating a topic. I added the flag FLAG_DURABLE so that the queue survives a broker restart or technical problems. That said, a durable queue does not guarantee that its messages survive too; they must be marked persistent for that.
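To make the exchange types listed earlier in this section more concrete, here is a small plain-PHP sketch that mimics how a broker picks the target queues for direct, fanout and topic exchanges (headers exchanges are left out). It does not talk to RabbitMQ at all; `routeMessage`, `topicMatches` and the queue names are hypothetical and exist only for illustration. It assumes the usual AMQP topic wildcards, where `*` stands for exactly one dot-separated word and `#` for zero or more words.

```php
<?php
/**
 * Does a topic binding pattern match a routing key?
 * "*" matches exactly one word, "#" matches zero or more words.
 */
function topicMatches(string $pattern, string $key): bool
{
    $match = function (array $p, array $k) use (&$match): bool {
        if ($p === []) {
            return $k === [];
        }
        if ($p[0] === '#') {
            // Let "#" consume 0..n words and try to match the rest
            for ($i = 0; $i <= count($k); $i++) {
                if ($match(array_slice($p, 1), array_slice($k, $i))) {
                    return true;
                }
            }
            return false;
        }
        if ($k === []) {
            return false;
        }
        if ($p[0] === '*' || $p[0] === $k[0]) {
            return $match(array_slice($p, 1), array_slice($k, 1));
        }
        return false;
    };

    return $match(explode('.', $pattern), explode('.', $key));
}

/**
 * Return the queues that would receive a message.
 * $bindings maps queue name => binding key (one binding per queue here).
 */
function routeMessage(string $type, string $routingKey, array $bindings): array
{
    $matched = [];
    foreach ($bindings as $queue => $bindingKey) {
        switch ($type) {
            case 'fanout':
                $ok = true; // every bound queue gets a copy
                break;
            case 'direct':
                $ok = ($bindingKey === $routingKey); // exact match only
                break;
            case 'topic':
                $ok = topicMatches($bindingKey, $routingKey);
                break;
            default:
                throw new InvalidArgumentException("Unsupported type: $type");
        }
        if ($ok) {
            $matched[] = $queue;
        }
    }

    return $matched;
}

// Hypothetical bindings, for illustration only
$bindings = [
    'rockets_q' => 'spacex.rockets',
    'all_q'     => 'spacex.#',
    'one_q'     => 'spacex.*',
];

print_r(routeMessage('direct', 'spacex.rockets', $bindings));
print_r(routeMessage('topic', 'spacex.rockets.falcon9', $bindings));
```

With these bindings, the direct exchange delivers only to `rockets_q`, while the topic exchange delivers the three-word key only to `all_q`, because `spacex.*` allows just one word after `spacex`.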
3. Send Message to queue
Now that everything is set, we can produce and send messages:
// Pick one of the endpoints at random
switch (rand(1, 3)) {
    case 1:
        $message = $context->createMessage('roadster');
        break;
    case 2:
        $message = $context->createMessage('rockets');
        break;
    case 3:
        $message = $context->createMessage('missions');
        break;
    default:
        // Never reached with rand(1, 3); kept as a fallback
        $message = $context->createMessage('info');
}

$context->createProducer()->send(
    $this->createOrUseQueue($context, $this->getParameter('queue_rb')),
    $message
);
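As a side note, the random choice can be written more compactly with an array. This is only an alternative sketch, not the code from the repository: it picks among the three endpoints the switch can actually reach, and `$body` would then be passed to `$context->createMessage($body)` exactly as before.

```php
<?php
// Endpoint names taken from the switch above
$endpoints = ['roadster', 'rockets', 'missions'];

// array_rand() returns a random key of the array
$body = $endpoints[array_rand($endpoints)];

echo $body, "\n";
```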
That is all for the producing part. In the next part we will make our messages persistent and create the consumer that will let us fetch information from the SpaceX API.