Using message queues like RabbitMQ is a nice and easy way to bring PHP processes into the background. Instead of running a time consuming task inside a web request, just push a message to a queue and grab the result later. Sounds easy in the first step. But when it comes to keep PHP processes running in the background to consume those messages on a queue it can get dirty.

Running on a dedicated Linux machine will make life easy. But what would you do, if it is not like this. On Linux you can simply use things like supervisord to control it. But what would you do if you have to run in shared hosting (for sure the rabbit must then be available somewhere else), not able to install any additional software. Even harder: what do you do on Windows. I would never suggest to run supervisor daemon on a windows machine, even if others do. The Linux cron daemon (mostly also available on shared hosting) or Windows scheduled tasks can be a solution. Write a small PHP script that is triggered every few minutes and connects to RabbitMQ to wait for messages.

(Examples are based on php-amqplib)

// connect to your RabbitMQ server
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// declare a channel
$channel->queue_declare('myQueue', false, false, false, false);

// define consuming
$channel->basic_consume('hello', '', false, true, false, false,
  function($msg) {
    // a callback to consume incoming messages
    echo "Message received:n";
    print_r($msg->body);
  }
);

// wait for incoming messages
while(count($channel->callbacks)) {
  $channel->wait();
}

This snippet shows, how easy it could be to consume messages from a queue. If you put code like that into a PHP script and execute the script every minute, using the Linux cron daemon or  a windows scheduled task, it would create a new consumer every minute. That’s not exactly what we want. How do I now control the amount of spawned consumers?

To make this happen we just have to check the return values form queue_declare(). One of the return values is the current consumer count listening on the queue.

// It return an array with different values.
// The third one is the consumer count.
list(,,$consumerCount) = $channel->queue_declare('myQueue', false, false, false, false);

if ($consumerCount > $maxConsumers) {
  exit;
}

Just stop the consumer script, if the maximum amount of consumers is reached. The big benefit from PHP is, that it fails gracefully when it comes to errors or exceptions. That means, we will not have PHP consumers running into a dead-end on issues. When an error occurs in consuming a message (or an exception, depends on you), the php process will stop and this means it also stops consuming. You do not have to do any additional cleanup neither in PHP nor RabbitMQ. Just wait for the next cron/scheduled task and a new and fresh consumer is fired up.

For sure you have to do a bit more on exception/error handling and all this stuff should be nicely wrapped up in classes and so on. Whatever fits your needs.

Autorefresh consumers

To avoid consumers to run too long, you can shut them down after a while. Withing your consuming callback, you can check how long the consumer is already running and close it after a defined time:

// snippet of a possible consumer class

/**
 * Consume messages from queue
 *
 * @param PhpAmqpLibMessageAMQPMessage $message The message
 */
 public function consume(PhpAmqpLibMessageAMQPMessage $message) {
   // do whatever you have to do with your message
   $result = $this->executeWorker($message->body);

   if ($result === true) {
     // tell rabbitmq that message is completed
     $channel = $message->delivery_info['channel'];
     $channel->basic_ack($message->delivery_info['delivery_tag']);
   }

   // stop consuming when ttl is reached
   if ($this->start + $this->ttl) < time()) {
     $channel->basic_cancel($message->delivery_info['consumer_tag']);
   }
 }

Avoid blocking resources

To not step into bad blocking issues, always try to avoid binding resources not required in a consumer. E.g. you may need a database connection during consuming a message. If you connect to a database, the connection will stay with the lifetime of your consumer. If you have many consumers you may open up too many connections and block others from using the database.

That’s a reason why we keep the consumer really really lightweighted. To consume a message, we call another PHP script using proc_open() to avoid unecessary bindings within the consumer itself. The bindings sticks with the worker script:

/**
 * Execute a worker in an external process
 *
 * @param string $body The job message body
 * @return boolean True on success
 */
protected function executeWorker($body) {
  // open a php process and call the worker.php script
  $pipes = array();
  $process = proc_open(
    '/usr/bin/php -d display_errors=stderr /path/to/worker.php',
    array(
      0 => array("pipe", "r"),
      1 => array("pipe", "w"),
      2 => array("pipe", "w")
    ),
    $pipes,
    sys_get_temp_dir(),
    null
  );

  if (is_resource($process)) {
    // write the message into worker.php stdin
    fwrite($pipes[0], $body);
    fclose($pipes[0]);

    // read errors from worker.php
    $stdErr = stream_get_contents($pipes[2]);
    fclose($pipes[2]);

    // if worker.php ends without errors, execution was successful
    if (proc_close($process) === 0 && empty($stdErr)) {
      return true;
    }
  }

  return false;
}

Finally due to some PHP behaviour, running PHP consumer to handle messages from queueing  systems aren’t so complex. The same may also work for other systems like Gearman for example. Have fun playing around yourself.

For the windows guys: How do you solve it? Let me know in the comments.