Hamida

Using RabbitMQ in a Symfony 3.4 project -- Part1: Producer --

When I had to use RabbitMQ in a Symfony project, I had a hard time choosing which bundle or component to use, so here I am building a fully functional example using Enqueue and Symfony 3.4.
As for why Enqueue: I find it (enqueue.forma-pro.com) the best-documented solution for PHP projects, and it supports a wide range of transports to message brokers (amqp, redis, kafka, ...).

This example connects, at random, to different endpoints of the r/SpaceX API. All the code examples and configurations were written for version 0.9.12 of the enqueue-bundle and may not apply to other versions.
The full code can be found at https://github.com/Haamida/RestRabbit.

1. Configuration

The first step is to run:

composer require enqueue/enqueue-bundle enqueue/amqp-lib

enqueue/amqp-lib is my library of choice for connecting to RabbitMQ. Note that amqp-lib depends on the bcmath extension, so make sure it is installed and enabled.

Now that the bundle is added to the project (composer.json and AppKernel.php), let's proceed with a minimal config in enqueue.yml:

enqueue:
  default:
    transport: 'amqp+lib:'
    client: ~

Next, the connection parameters in parameters.yml:

    queue_rb: spacex_q
    topic_rb: spacex_t
    host_rb: 127.0.0.1
    port_rb: 5672
    user_rb: guest
    pass_rb: guest
    vhost_rb: /

For simplicity, I kept the default port, user, vhost, and so on (these should of course be changed for production).
It is useful to have the RabbitMQ Management plugin enabled to monitor the interactions with the queue:
rabbitmq-plugins enable rabbitmq_management
Then head to http://localhost:15672/ to get an overview of the queues and messages.

2. Connection, Topic and Queue

To send the messages we produce to the queue, we need to set up our connection.
The first step is to get an instance of the connection factory:

// Requires at the top of the file: use Enqueue\AmqpLib\AmqpConnectionFactory;
$factory = new AmqpConnectionFactory([
    'host' => $this->getParameter('host_rb'),
    'port' => $this->getParameter('port_rb'),
    'vhost' => $this->getParameter('vhost_rb'),
    'user' => $this->getParameter('user_rb'),
    'pass' => $this->getParameter('pass_rb'),
    'persisted' => true,
]);

With the factory in place, we can create a context, a topic and a queue (the topic and queue are only created if they do not already exist):

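// Requires: use Interop\Amqp\AmqpTopic; use Interop\Amqp\Impl\AmqpBind;
$context = $factory->createContext();

// Declare the topic (a RabbitMQ exchange) as a direct exchange.
$topic = $context->createTopic($topicName);
$topic->setType(AmqpTopic::TYPE_DIRECT);
$context->declareTopic($topic);

// Declare the queue with createOrUseQueue() and bind it to the topic.
$queue = $this->createOrUseQueue($context, $queueName);
$context->bind(new AmqpBind($topic, $queue));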

In the code above, I declared the topic as a direct exchange so that each message is routed to the queue whose binding key matches its routing key. In RabbitMQ's message flow, a message is first delivered to an exchange and then, depending on the exchange type:

  • Direct: sent to the queue whose binding key equals the message's routing key
  • Fanout: sent to all the queues bound to the exchange
  • Headers: sent based on the message headers
  • Topic: sent to one or more queues based on a match between the routing key and the routing pattern

These are the principal exchange types; a more in-depth explanation can be found in Exchanges-routing-keys-bindings.

After configuring the topic, we proceed to create the queue that we will later bind to it:

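// Requires: use Interop\Amqp\AmqpQueue;
public function createOrUseQueue($context, $queueName)
{
    $queue = $context->createQueue($queueName);
    // Durable: the queue definition survives a broker restart.
    $queue->addFlag(AmqpQueue::FLAG_DURABLE);
    // Declaring a queue that already exists with the same settings is a no-op.
    $context->declareQueue($queue);

    return $queue;
}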

In practice, this is the same logic as for creating a topic. I added the FLAG_DURABLE flag so that the queue survives a broker restart or a technical problem. That said, a durable queue does not guarantee that its messages survive too: they must themselves be marked as persistent for that.
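
As a rough sketch of what marking a message as persistent could look like with the amqp-interop API that enqueue/amqp-lib implements: the snippet assumes $context is the context created above and $queue is the durable queue returned by createOrUseQueue(), and the message body 'rockets' is only a placeholder. It is an illustration under those assumptions, not the setup used in the repository.

```php
<?php
use Interop\Amqp\AmqpMessage;

// Assumption: $context and $queue come from the snippets above.
$message = $context->createMessage('rockets');

// Ask the broker to store the message on disk, so it can be recovered
// after a restart as long as it sits in a durable queue.
$message->setDeliveryMode(AmqpMessage::DELIVERY_MODE_PERSISTENT);

$context->createProducer()->send($queue, $message);
```

Both conditions are needed: a persistent message in a non-durable queue is lost together with the queue when the broker restarts.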

3. Send Message to Queue

Now that everything is set, we can produce and send messages:

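// Pick one of the SpaceX endpoints at random and use its name as the message body.
switch (rand(1, 3)) {
    case 1:
        $message = $context->createMessage('roadster');
        break;
    case 2:
        $message = $context->createMessage('rockets');
        break;
    case 3:
        $message = $context->createMessage('missions');
        break;
    default:
        // Not reachable with rand(1, 3); kept as a fallback.
        $message = $context->createMessage('info');
}

// Send the message straight to the queue.
$queue = $this->createOrUseQueue($context, $this->getParameter('queue_rb'));
$context->createProducer()->send($queue, $message);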
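
When a message is published to an exchange rather than straight to a queue, the exchange type described in section 2 decides which bound queues receive it. The following broker-free sketch in plain PHP (7.1 or later) mimics that decision for direct, fanout and topic exchanges; headers exchanges are left out. The function names, queue names and binding keys are all hypothetical and exist only for this illustration, they are not part of Enqueue or RabbitMQ.

```php
<?php
/**
 * Match an AMQP-style topic pattern against a routing key, both split on ".".
 * "*" stands for exactly one word, "#" for zero or more words.
 */
function topicMatches(array $pattern, array $key): bool
{
    if ($pattern === []) {
        return $key === [];
    }
    $head = array_shift($pattern);
    if ($head === '#') {
        // Let "#" absorb 0, 1, 2, ... leading words of the key.
        for ($i = 0; $i <= count($key); $i++) {
            if (topicMatches($pattern, array_slice($key, $i))) {
                return true;
            }
        }
        return false;
    }
    if ($key === []) {
        return false;
    }
    $word = array_shift($key);

    return ($head === '*' || $head === $word) && topicMatches($pattern, $key);
}

/** Return the names of the queues that would receive the message. */
function route(string $type, array $bindings, string $routingKey): array
{
    $queues = [];
    foreach ($bindings as [$queue, $bindingKey]) {
        switch ($type) {
            case 'fanout':
                $matched = true; // the routing key is ignored
                break;
            case 'direct':
                $matched = ($bindingKey === $routingKey);
                break;
            case 'topic':
                $matched = topicMatches(explode('.', $bindingKey), explode('.', $routingKey));
                break;
            default:
                throw new InvalidArgumentException("Unsupported exchange type: $type");
        }
        if ($matched) {
            $queues[] = $queue;
        }
    }

    return array_values(array_unique($queues));
}

// Hypothetical bindings: [queue name, binding key].
$bindings = [['rockets_q', 'spacex.rockets'], ['all_q', 'spacex.#'], ['audit_q', 'audit']];

print_r(route('direct', $bindings, 'spacex.rockets')); // rockets_q
print_r(route('topic', $bindings, 'spacex.rockets'));  // rockets_q, all_q
print_r(route('fanout', $bindings, 'spacex.rockets')); // rockets_q, all_q, audit_q
```

With a direct exchange only the exact binding key matches, which is why it suits the one-queue setup used in this example.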

That is all for the producing part. In the next part, we will configure our messages to be persistent and create the consumer that will let us get information from the SpaceX API.
