php - How to delete messages from an AMQP (RabbitMQ) queue?
Here is the situation.
I am reading a results queue from RabbitMQ using PHP AMQP in order to process vital information on every email sent. After this is done, I need to delete each message or mark it as written, so that the next time I read the queue I don't get messages that were already processed.
As the RabbitMQ server is sending over 10,000 emails per hour, every time I read the queue to process the send results, the script can run for at least 5 minutes in order to process all the messages in the queue. By the time it is done, several hundred new messages have been placed in the queue during those 5 minutes. This makes it impossible for me to purge the queue after the script finishes, because that would also delete the messages that arrived while the script was running and have not been processed.
That leaves me with one choice: mark or delete each message right after it has been processed or read by my AMQP script.
Is there a way to do that? (Here is my script.)
    <?php
    /**
     * result.php
     * Script that connects to RabbitMQ and takes the result messages
     * from the result message queue.
     */

    // Include settings
    require_once('settings.php');

    // Try to set up the connection to the RabbitMQ server
    try {
        // Construct the connection to the RabbitMQ server
        $connection = new AMQPConnection(array(
            'host'     => $hostname,
            'login'    => $username,
            'password' => $password,
            'vhost'    => $vhost
        ));

        // Connect to the RabbitMQ server
        $connection->connect();
    } catch (AMQPException $exception) {
        echo "Could not establish a connection to the RabbitMQ server.\n";
    }

    // Try to create the channel
    try {
        // Open the channel
        $channel = new AMQPChannel($connection);
    } catch (AMQPConnectionException $exception) {
        echo "Connection to the broker was lost (creating channel).\n";
    }

    // Try to create the queue
    try {
        // Create the queue and bind it to the exchange
        $queue = new AMQPQueue($channel);
        $queue->setName($resultbox);
        $queue->setFlags(AMQP_DURABLE);
        // Declare before binding; older releases of the extension
        // named this method declare()
        $queue->declareQueue();
        $queue->bind('exchange1', 'key1');
    } catch (AMQPQueueException $exception) {
        echo "Channel is not connected to a broker (creating queue).\n";
    } catch (AMQPConnectionException $exception) {
        echo "Connection to the broker was lost (creating queue).\n";
    }

    // Get the messages from the queue
    while ($envelope = $queue->get()) {
        // Function that processes the message
        process_message($envelope->getBody());
    }
    $queue->purge();

    // Done, close the connection to RabbitMQ
    $connection->disconnect();
    ?>
Acknowledge the message(s) with $queue->ack() after successful processing, or consume/get them with the AMQP_AUTOACK flag.
UPD:
Based on your code:
1. Ack'ing each message after it has been processed:

    while ($envelope = $queue->get()) {
        // Function that processes the message
        process_message($envelope->getBody());
        $queue->ack($envelope->getDeliveryTag());
    }

2. Getting messages with the AMQP_AUTOACK flag, so the broker treats each message as acknowledged as soon as it is delivered:

    while ($envelope = $queue->get(AMQP_AUTOACK)) {
        // Function that processes the message
        process_message($envelope->getBody());
    }
P.S.:
Check the AMQPQueue::consume documentation; it looks more suitable here.
3. You can consume and ack each message after it has been processed:

    $queue->consume(function ($envelope, $queue) {
        process_message($envelope->getBody());
        $queue->ack($envelope->getDeliveryTag());
    });

4. Or consume with the AMQP_AUTOACK flag, but then when processing fails you won't be able to process the message again. No explicit ack() call is made here, because the message is already acknowledged on delivery:

    $queue->consume(function ($envelope, $queue) {
        process_message($envelope->getBody());
    }, AMQP_AUTOACK);
Conclusion: I recommend solution #3, but it's up to you.
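To make the "ack only after successful processing" idea concrete, here is a minimal sketch built on the get()-based loop from solution #1. The helper name drain_queue and its return value are hypothetical, not part of the AMQP extension. It assumes $queue behaves like an AMQPQueue (get() returns an envelope or a falsy value, ack() takes a delivery tag) and that the handler throws when processing fails. A message whose handler throws is simply left unacknowledged, so the broker should make it available again once the channel or connection is closed.

```php
<?php
/**
 * Hypothetical helper: reads messages until the queue is empty and
 * acknowledges only those that the handler processed without throwing.
 *
 * @param object   $queue   Assumed to expose get() and ack() like AMQPQueue.
 * @param callable $handler Receives the message body; throws on failure.
 * @return array            Counts of acknowledged and failed messages.
 */
function drain_queue($queue, callable $handler): array
{
    $stats = ['acked' => 0, 'failed' => 0];

    // get() without AMQP_AUTOACK leaves each message unacknowledged
    // until ack() is called explicitly.
    while ($envelope = $queue->get()) {
        try {
            $handler($envelope->getBody());
            // Only now is the message removed from the queue.
            $queue->ack($envelope->getDeliveryTag());
            $stats['acked']++;
        } catch (Throwable $e) {
            // No ack here: the message stays unacknowledged and is
            // expected to be requeued when the connection is closed.
            $stats['failed']++;
        }
    }

    return $stats;
}
```

With the script from the question, this would be called as drain_queue($queue, 'process_message') in place of the while loop, and the $queue->purge() call would be dropped, since acknowledged messages are already gone and messages that arrived in the meantime must stay.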