Using RabbitMQ in Symfony2 Projects
Azoft Blog Using RabbitMQ in Symfony2 Projects

Using RabbitMQ in Symfony2 Projects

By Alex Anisimov on February 13, 2014

Using RabbitMQ in Symfony2 ProjectsFoto by weblog244

Today’s post focuses on how to use RabbitMQ in Symfony2-based projects. RabbitMQ is an open-source messaging broker: it accepts, stores, and forwards messages — in binary blocks of data.

What does that mean as far as practical implementations? There are tons of practical uses but let’s take here the simple case of creating Deferred Invites from our one of the latest Symfony2 projects in order to illustrate the process:

Task overview

On a recent project we implemented a feature which allowed administrators of a system to send invites to an unlimited number of recipients via email with an offer to participate in a particular event. Here was the situation:

  • At the time of the announcement of the event, we wouldn't know how many invites would be sent.
  • We wanted to eliminate the need to have administrators continuing to work on the system until all email was sent.
  • As a result we created some Deferred Events.

The scheme is simple:

  • Admin receive a confirmation message that all email has been sent and then continues working with the system;
  • The deferred events start being processed — in our case the system starts to send the emails.

To make this possible, one part of a program needs to communicate with another part — here passing a message via RabbitMQ. Let’s see how it can be implemented:

Installing RabbitMQ

First of all we need to install RabbitMQ itself. Here is the installation guide:

It’s pretty simple to use but there are a few little details that are good to know in advance, which I describe below:

Customizing the RabbitMQBundle

So we have installed the RabbitMQBundle. Ok, the deal is almost half complete. Now we have to customize RabbitMQ. Below is something we need to put in app/config/config.yml.

old_sound_rabbit_mq:
    connections:
        default:
            host:     'localhost'
            port:     5672
            user:     'guest'
            password: 'guest'
            vhost:    '/'
            lazy:     false
    producers:
        add_mail_task:
            connection:       default
            exchange_options: {name: 'mail-task', type: direct}
    consumers:
        get_mail_task:
            connection:       default
            exchange_options: {name: 'mail-task', type: direct}
            queue_options:    {name: 'mail-task'}
            callback:         mail_task_consumer
            idle_timeout:     60

A lot of lines, right? All in all they are pretty simple, actually:

“old_sound_rabbit_mq” is the name of a related section. Keep the “connections” section as it is without any changes if you don’t want to customize the connection to the RabbitMQ daemon, of course.

“producers” and “consumers” are the most important sections.

“add_mail_task” is the name of a service that you have to call if you want to send a message (or create a new queue) via RabbitMQ. It looks like this:

$msg = array(‘someKey’ => ‘someValue’);
$this->container->get('old_sound_rabbit_mq.add_mail_task_producer')->publish(serialize($msg));

Yes, I know that the “add_mail_task” does not equal to “add_mail_task_producer”. But it works this way.

“get_mail_task” contains the name of a service that you have to create and describe in your service (.yml|.xml) file: see “callback” option. This service will receive the message you have sent before by calling the “add_mail_task_producer” service.

This is the description of our consumer service:

(src/Azoft/APIBundle/Resources/config/services.yml):

mail_task_consumer:
        class: Azoft\<SomeBundle>\Rabbit\Callback\MailTaskConsumer
        arguments: ['@service_container']

As you can see the name of the service equals the value of the “get_mail_task:callback” option from config.yml. We also pass through the service container to have access to any other services we have implemented or loaded into our application.

Implementing reaction to a new Invite

Ok, now let’s write some real code at least. Our issue was to send a message from one place of a program to another one, catch it there and do something more or less useful with it.

So we have configured RabbitMQ (chosen service names we prefer to use) and now we need to add a reaction to a new Invite. To do that we need to create a listener which will listen to a Doctrine ORM (if you use doctrine, of course): postPersist event. It’s very easy.

At first we have to register a listener as a service. To do that you have to put something like this into your services (.yml|.xml) file:

services:
    post_persist_invite:
          class: Azoft\<SomeBundle>\Listener\PostPersistInvite
          arguments: ['@service_container']
          tags:
              - { name: doctrine.event_listener, event: postPersist, connection: default }

Next we have to implement this listener: create file src/Azoft/<SomeBundle>/Listener/PostPersistInvite.php (where “<SomeBundle>” is the name of your bundle).

And then put the following code into it:

<?php

namespace Azoft\<SomeBundle>\Listener;

use Azoft\<SomeBundle>\Entity\Invite;

class PostPersistInvite
{
    private $container;
    
    public function __construct($container)
    {
        $this->container = $container;
    }

    public function postPersist(LifecycleEventArgs $args){
        $entity = $args->getEntity();
        if($entity instanceof Invite) {
            $message = serialize(array(‘invite_id’ => $entity->getId()));
            $this->container->get(‘old_sound_rabbit_mq.add_mail_task_producer’)
                            ->publish($message);
        }
    }
}

Here with the postPersist method we check whether the persisting entity is an Instance of Invite or not. If it is, we create a new message and send it by RabbitMQ.

Then we need to catch this message in a “consumer” (we‘ve named it MailTaskConsumer and placed into the src/Azoft/<SomeBundle>/Rabbit/Callback/ directory).

The MailTaskConsumer class will look like this:

<?php

namespace Azoft\AdminBundle\Rabbit\Callback;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Azoft\APIBundle\Entity\MailTask;
use Monolog\Logger;
use Monolog\Handler\StreamHandler;

class MailTaskConsumer implements ConsumerInterface
{
    private $container;
    private $em;
    private $logger;

    public function __construct($container)
    {
        $this->container = $container;
        $this->em = $this->container->get('doctrine')->getManager();
        $this->logger = new Logger('mails_tasks');
        $this->logger->pushHandler(new StreamHandler($container->parameters['mails_tasks_log']), Logger::WARNING);
    }

    public function execute(AMQPMessage $msg)
    {
        try {
            $this->logger->addInfo('Start executing');
            $message = unserialize($msg->body);
            $inviteId = $message['inviteId'];
	    /* put your code here */
            $pathInvite = $this->em->getRepository('APIBundle:PathInvite')->find($inviteId);
            //$this->container->get('api_mailer')->sendPrivatePathInvites($pathInvite);
            /* end your code */
            $this->logger->addInfo('End executing');
        }
        catch(\Exception $e){
            $this->logger->addError($e->getMessage());
        }
    }
}

You can put your code right after the “put your code here” comment. For example, we wanted to send emails to people who were chosen when creating the Invite. We use a service to send emails: “api_mailer”. Before sending emails we have to retrieve the Invite from the database.

We want all activity to be logged therefore we initialize the logger here. You can omit this code otherwise.

After everything is configured lets create a “cron” task. There is one little but very important detail: a cron job should be created on behalf of either a “root” or “www-data” user.

To create a cronjob on behalf of a different user you need to be designated as the root. Execute this command to create cron job task on behalf of a www-data user:

sudo -u www-data -e

Then put this string into your schedule editor of preference:

* * * * * /<path_to_your_project_root_directory>/app/console rabbitmq:consumer get_mail_task

You may possibly need to create a cronjob task as a root to make rabitmq work:

sudo crontab -e

One thing you will find very useful is setting up a RabbitMQ web panel. To do that you need to enable the appropriate rabbitmq plugin:

/usr/lib/rabbitmq/lib/rabbitmq_server-2.7.1/sbin/rabbitmq-plugins enable rabbitmq_management

You may need to replace “rabbitmq_server-2.7.1” with the name of the rabbitmq_server you have installed (check out your rabbitmq server version).

If everything looks ok, restart rabbitmq_server:

sudo service rabbitmq-server restart

And then open the RabbitMQ web UI:

http://localhost:15672/

VN:F [1.9.22_1171]
Rating: 4.6/5 (14 votes cast)
VN:F [1.9.22_1171]
Rating: +5 (from 7 votes)
Using RabbitMQ in Symfony2 Projects, 4.6 out of 5 based on 14 ratings



Request a Free Quote
 
 
 

Please enter the result and submit the form

Content created by Alex Anisimov