rabbitmq_poison_counting_input_device

messageflux.iodevices.rabbitmq.rabbitmq_poison_counting_input_device

PoisonCounterBase

abstract class for a persistent counter store for message ids

delete_counter abstractmethod

delete_counter(message_id)

deletes the counter for message_id

Parameters:

Name Type Description Default
message_id str

the message id to delete the counter for

required

increment_and_return_counter abstractmethod

increment_and_return_counter(message_id)

this method increments and returns the counter for message_id. If there is no counter for message_id yet, it should return 1

Parameters:

Name Type Description Default
message_id str

the message id to check the counter for

required

Returns:

Type Description
int

the incremented counter for message_id
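The increment contract above can be illustrated with a minimal in-memory sketch. The class name `InMemoryPoisonCounter` is hypothetical and is not part of the library. It only mirrors the two documented abstract methods, and it is not persistent, so it is suitable for tests rather than production. In real use it would subclass PoisonCounterBase.

```python
from collections import defaultdict
from typing import DefaultDict


class InMemoryPoisonCounter:
    """Hypothetical, non-persistent counter mirroring the documented interface."""

    def __init__(self) -> None:
        # Missing message ids start at 0, so the first increment yields 1.
        self._counts: DefaultDict[str, int] = defaultdict(int)

    def increment_and_return_counter(self, message_id: str) -> int:
        self._counts[message_id] += 1
        return self._counts[message_id]

    def delete_counter(self, message_id: str) -> None:
        # Deleting an unknown id is treated as a no-op (an assumption).
        self._counts.pop(message_id, None)


counter = InMemoryPoisonCounter()
first = counter.increment_and_return_counter("msg-1")
second = counter.increment_and_return_counter("msg-1")
counter.delete_counter("msg-1")
after_delete = counter.increment_and_return_counter("msg-1")
```

After `delete_counter`, the next increment starts again from 1, which matches the "no counter yet" case described above.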

start

start()

gets called before the first invocation

stop

stop()

gets called after the last invocation
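Because the class is described as a persistent store, start() and stop() are natural places to open and close a backing resource. The sketch below shows one possible approach using SQLite from the standard library. The class name `SqlitePoisonCounter`, the table name, and the choice of SQLite are all assumptions made for illustration. In real use the class would subclass PoisonCounterBase.

```python
import os
import sqlite3
import tempfile
from typing import Optional


class SqlitePoisonCounter:
    """Hypothetical persistent counter; mirrors the documented method names."""

    def __init__(self, db_path: str) -> None:
        self._db_path = db_path
        self._conn: Optional[sqlite3.Connection] = None

    def start(self) -> None:
        # Open the store before the first invocation.
        self._conn = sqlite3.connect(self._db_path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS poison_counters ("
            "message_id TEXT PRIMARY KEY, counter INTEGER NOT NULL)"
        )
        self._conn.commit()

    def stop(self) -> None:
        # Close the store after the last invocation.
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    def _connection(self) -> sqlite3.Connection:
        if self._conn is None:
            raise RuntimeError("start() must be called before using the counter")
        return self._conn

    def increment_and_return_counter(self, message_id: str) -> int:
        conn = self._connection()
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT OR IGNORE INTO poison_counters (message_id, counter) "
                "VALUES (?, 0)",
                (message_id,),
            )
            conn.execute(
                "UPDATE poison_counters SET counter = counter + 1 "
                "WHERE message_id = ?",
                (message_id,),
            )
            row = conn.execute(
                "SELECT counter FROM poison_counters WHERE message_id = ?",
                (message_id,),
            ).fetchone()
        return row[0]

    def delete_counter(self, message_id: str) -> None:
        conn = self._connection()
        with conn:
            conn.execute(
                "DELETE FROM poison_counters WHERE message_id = ?", (message_id,)
            )


with tempfile.TemporaryDirectory() as tmp:
    counter = SqlitePoisonCounter(os.path.join(tmp, "counters.db"))
    counter.start()
    counter.increment_and_return_counter("msg-1")
    counter.stop()

    counter.start()  # the count survives a stop/start cycle
    persisted = counter.increment_and_return_counter("msg-1")
    counter.delete_counter("msg-1")
    fresh = counter.increment_and_return_counter("msg-1")
    counter.stop()
```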

RabbitMQNoPoisonInputTransactionWrapper

RabbitMQNoPoisonInputTransactionWrapper(
    inner_transaction, poison_counter, message_id
)

Bases: InputTransaction

represents a wrapper for an InputTransaction for RabbitMQ with a poison counter

Parameters:

Name Type Description Default
inner_transaction InputTransaction

the inner transaction

required
poison_counter PoisonCounterBase

the poison counter

required
message_id str

the message id in this transaction

required
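This page does not say what the wrapper does when the transaction finishes. One plausible use of the three constructor arguments is to clear the counter once the message is committed, so that a successfully handled message leaves nothing behind in the store. The sketch below illustrates that idea only. It uses stand-in classes, and it assumes that a transaction exposes commit() and rollback(), which is not confirmed here. All class names are hypothetical.

```python
from typing import Dict


class FakeTransaction:
    """Stand-in for an inner transaction (commit/rollback are assumed names)."""

    def __init__(self) -> None:
        self.state = "open"

    def commit(self) -> None:
        self.state = "committed"

    def rollback(self) -> None:
        self.state = "rolled back"


class DictCounter:
    """Stand-in counter exposing the documented delete_counter method."""

    def __init__(self) -> None:
        self.counts: Dict[str, int] = {}

    def increment_and_return_counter(self, message_id: str) -> int:
        self.counts[message_id] = self.counts.get(message_id, 0) + 1
        return self.counts[message_id]

    def delete_counter(self, message_id: str) -> None:
        self.counts.pop(message_id, None)


class CounterClearingTransaction:
    """Illustrative wrapper: same constructor arguments as documented above."""

    def __init__(self, inner_transaction, poison_counter, message_id: str) -> None:
        self._inner = inner_transaction
        self._poison_counter = poison_counter
        self._message_id = message_id

    def commit(self) -> None:
        # Assumed behavior: a committed message no longer needs a counter.
        self._inner.commit()
        self._poison_counter.delete_counter(self._message_id)

    def rollback(self) -> None:
        # Assumed behavior: keep the counter so the next delivery is counted.
        self._inner.rollback()


poison_counter = DictCounter()
poison_counter.increment_and_return_counter("msg-1")
inner = FakeTransaction()
CounterClearingTransaction(inner, poison_counter, "msg-1").commit()
```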

RabbitMQPoisonCountingInputDevice

RabbitMQPoisonCountingInputDevice(
    device_manager,
    queue_name,
    poison_counter,
    max_poison_count=3,
    consumer_args=None,
    prefetch_count=1,
    use_consumer=True,
)

Bases: RabbitMQInputDevice

represents a RabbitMQ input device that adds handling of poison messages (for classic queues)

constructs a new input RabbitMQ device

Parameters:

Name Type Description Default
device_manager RabbitMQPoisonCountingInputDeviceManager

the RabbitMQ device Manager that holds this device

required
queue_name str

the name for the queue

required
poison_counter PoisonCounterBase

the poison counter to use

required
max_poison_count int

the number of times to requeue a message before backout (-1 means not to handle poison messages at all, 0 means reject all redelivered messages right away)

3
consumer_args Optional[Dict[str, str]]

the arguments to create the consumer with

None
prefetch_count int

the number of un-acked messages that can be consumed

1
use_consumer bool

True to use the 'consume' method, False to use 'basic_get'

True
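The three cases described for max_poison_count (-1, 0, and a positive limit) can be read as a single decision rule. The function below is a sketch of one consistent reading, not the device's actual code. It assumes the counter is incremented only for redelivered messages and that backout happens once the count exceeds max_poison_count. The function name `should_back_out` is hypothetical.

```python
def should_back_out(redelivered: bool, poison_count: int, max_poison_count: int) -> bool:
    """Return True if a message should be backed out instead of handled again."""
    if max_poison_count < 0:
        # -1 (treated here as any negative value): poison handling is disabled.
        return False
    if not redelivered:
        # A first delivery is never considered poison in this sketch.
        return False
    # With max_poison_count == 0, any redelivered message (count >= 1) is rejected.
    return poison_count > max_poison_count


decisions = {
    "disabled": should_back_out(True, 100, -1),
    "reject_redelivered": should_back_out(True, 1, 0),
    "within_limit": should_back_out(True, 3, 3),
    "over_limit": should_back_out(True, 4, 3),
    "first_delivery": should_back_out(False, 0, 0),
}
```

Whether the real device compares with "greater than" or "greater than or equal" is not stated on this page, so the boundary case should be checked against the library before relying on it.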

RabbitMQPoisonCountingInputDeviceManager

RabbitMQPoisonCountingInputDeviceManager(
    hosts,
    user,
    password,
    poison_counter,
    max_poison_count=3,
    port=None,
    ssl_context=None,
    virtual_host=None,
    client_args=None,
    heartbeat=300,
    connection_attempts=5,
    prefetch_count=1,
    use_consumer=True,
    blocked_connection_timeout=None,
    default_direct_exchange=None,
    **kwargs
)

Bases: RabbitMQInputDeviceManager

RabbitMQ input device manager that adds handling of poison messages (for classic queues)

This manager is used to create RabbitMQ devices (direct queues)

Parameters:

Name Type Description Default
hosts List[str]

the list of hostnames of the manager

required
user str

the username for the rabbitMQ manager

required
password str

the password for the rabbitMQ manager

required
poison_counter PoisonCounterBase

the poison counter to use

required
max_poison_count int

the number of times to try to handle a message before backout

3
port Optional[int]

the port to connect to on the hosts

None
ssl_context Optional[SSLContext]

the ssl context to use. None means don't use ssl at all

None
virtual_host Optional[str]

the virtual host to connect to

None
client_args Optional[Dict[str, str]]

the arguments to create the client with

None
heartbeat int

heartbeat interval for the connection (between 0 and 65536)

300
connection_attempts int

Maximum number of connection retry attempts

5
prefetch_count int

the number of unacked messages that can be consumed

1
use_consumer bool

True to use the 'consume' method, False to use 'basic_get'

True
blocked_connection_timeout Optional[float]

If not None, the value is a non-negative timeout, in seconds, for the connection to remain blocked (triggered by Connection.Blocked from broker); if the timeout expires before connection becomes unblocked, the connection will be torn down, triggering the adapter-specific mechanism for informing client app about the closed connection: passing ConnectionBlockedTimeout exception to on_close_callback in asynchronous adapters or raising it in BlockingConnection.

None
default_direct_exchange Optional[str]

optional direct exchange to bind all the queues to (None means no bind)

None

connect

connect()

connects to the device manager

disconnect

disconnect()

disconnects from the device manager
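A typical lifecycle, based on the constructor parameters and the connect() and disconnect() methods listed above, might look like the following sketch. The host names and credentials are placeholders, and `run_with_manager` is a hypothetical helper. The import is done inside the function so that the snippet can be loaded without the library installed. How devices are obtained from the manager is not covered in this section, so that step is left as a placeholder.

```python
from typing import Any, Dict

# Keyword arguments matching the documented constructor parameters.
# Host names and credentials are placeholders, not real defaults.
MANAGER_KWARGS: Dict[str, Any] = {
    "hosts": ["rabbit-1.example.internal", "rabbit-2.example.internal"],
    "user": "guest",
    "password": "guest",
    "max_poison_count": 3,
    "heartbeat": 300,
    "connection_attempts": 5,
    "prefetch_count": 1,
    "use_consumer": True,
}


def run_with_manager(poison_counter: Any) -> None:
    """Connect, do some work, and always disconnect (illustrative only)."""
    # Imported lazily; requires the messageflux package and a reachable broker.
    from messageflux.iodevices.rabbitmq.rabbitmq_poison_counting_input_device import (
        RabbitMQPoisonCountingInputDeviceManager,
    )

    manager = RabbitMQPoisonCountingInputDeviceManager(
        poison_counter=poison_counter, **MANAGER_KWARGS
    )
    manager.connect()
    try:
        ...  # obtain input devices and read messages here
    finally:
        manager.disconnect()
```

Wrapping the work in try/finally ensures disconnect() runs even if message handling raises.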