# NAME Net::AMQP::RabbitMQ::Batch - simple batch processing of messages for RabbitMQ. # SYNOPSIS my $rb = Net::AMQP::RabbitMQ::Batch->new('localhost', { user => 'guest', password => 'guest' }) or croak; $rb->process({ from_queue => 'test_in', routing_key => 'test_out', handler => \&msg_handler, batch => { size => 10, # batch size timeout => 2, # ignore_size => 0 # ignore in/out batches size mismatch }, ignore_errors => 0, # ignore handler errors publish_options => { exchange => 'exchange_out', # exchange name, default is 'amq.direct' }, }); sub msg_handler { my $messages = shift; # work with 10 messages return $messages; } # DESCRIPTION Assume you read messages from a queue, process them and publish. But you would like to do it in batches, processing many messages at once. This module: - gets messages from in queue and publish them by routing key - uses your handler to batch process messages - keeps persistency - if processing fails, nothing lost from input queue, nothing published # USAGE Define a messages handler: sub msg_handler { my $messages = shift; # works with hashref of messages return $messages; } `$messages` is an arrayref of message objects: { body => 'Magic Transient Payload', # the reconstructed body routing_key => 'nr_test_q', # route the message took delivery_tag => 1, # (used for acks) .... # Not all of these will be present. Consult the RabbitMQ reference for more details. props => { ... } } Handler should return arrayref of message objects (only `body` is required): [ { body => 'Processed message' }, ... ] Connect to RabbitMQ: my $rb = Net::AMQP::RabbitMQ::Batch->new('localhost', { user => 'guest', password => 'guest' }) or croak; And process a batch: $rb->process({ from_queue => 'test_in', routing_key => 'test_out', handler => \&msg_handler, batch => { size => 10 } }); You might like to wrap it with `while(1) {...}` loop. See `process_in_batches.pl` or `process_in_forked_batches.pl` for example. # METHODS ## process() # Known Issues - Can not set infinity timeout (use very long int) - No individual messages processing possible - No tests yet which is very sad :( # AUTHORS Alex Svetkin # LICENSE MIT