rabbitmq_poison_counting_input_device
messageflux.iodevices.rabbitmq.rabbitmq_poison_counting_input_device
PoisonCounterBase
abstract class for a persistent counter store for message ids
delete_counter
abstractmethod
deletes the counter for message_id
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message_id | str | the message id to delete the counter for | required |
increment_and_return_counter
abstractmethod
this method increments and returns the counter for message_id. If there is no counter for message_id yet, it should return 1
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message_id | str | the message id to check the counter for | required |
Returns:
| Type | Description |
|---|---|
| int | the incremented counter for message_id |
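
Any store that survives a restart can back the two abstract methods above. The following is a minimal sketch, assuming SQLite is an acceptable store; `SqlitePoisonCounter`, the `poison_counters` table, and the fallback stand-in class (used only when messageflux cannot be imported) are illustrative names, not part of the library.

```python
import sqlite3
from abc import ABC, abstractmethod

try:
    from messageflux.iodevices.rabbitmq.rabbitmq_poison_counting_input_device import (
        PoisonCounterBase,
    )
except ImportError:
    # Stand-in (assumption): mirrors only the two documented abstract methods
    # so the sketch can run without messageflux installed.
    class PoisonCounterBase(ABC):
        @abstractmethod
        def increment_and_return_counter(self, message_id: str) -> int:
            ...

        @abstractmethod
        def delete_counter(self, message_id: str):
            ...


class SqlitePoisonCounter(PoisonCounterBase):
    """Hypothetical counter store backed by a SQLite table."""

    def __init__(self, db_path: str = ":memory:"):
        # Pass a file path for real persistence; ":memory:" is for demos only.
        # The connection is used from the thread that created it.
        self._conn = sqlite3.connect(db_path)
        with self._conn:
            self._conn.execute(
                "CREATE TABLE IF NOT EXISTS poison_counters ("
                "message_id TEXT PRIMARY KEY, counter INTEGER NOT NULL)"
            )

    def increment_and_return_counter(self, message_id: str) -> int:
        # One transaction: create the row at 0 if missing, then add 1,
        # so the first call for a message id returns 1.
        with self._conn:
            self._conn.execute(
                "INSERT OR IGNORE INTO poison_counters (message_id, counter) "
                "VALUES (?, 0)",
                (message_id,),
            )
            self._conn.execute(
                "UPDATE poison_counters SET counter = counter + 1 "
                "WHERE message_id = ?",
                (message_id,),
            )
            row = self._conn.execute(
                "SELECT counter FROM poison_counters WHERE message_id = ?",
                (message_id,),
            ).fetchone()
        return row[0]

    def delete_counter(self, message_id: str) -> None:
        # Deleting a missing id is a no-op.
        with self._conn:
            self._conn.execute(
                "DELETE FROM poison_counters WHERE message_id = ?",
                (message_id,),
            )
```

After `delete_counter` the next `increment_and_return_counter` call for the same id starts again at 1, which matches the contract stated for a missing counter.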
RabbitMQNoPoisonInputTransactionWrapper
Bases: InputTransaction
represents a wrapper around InputTransaction for RabbitMQ, with a poison counter
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| inner_transaction | InputTransaction | the inner transaction | required |
| poison_counter | PoisonCounterBase | the poison counter | required |
| message_id | str | the message id in this transaction | required |
RabbitMQPoisonCountingInputDevice
RabbitMQPoisonCountingInputDevice(
device_manager,
queue_name,
poison_counter,
max_poison_count=3,
consumer_args=None,
prefetch_count=1,
use_consumer=True,
)
Bases: RabbitMQInputDevice
represents a RabbitMQ input device that adds handling of poison messages (for classic queues)
constructs a new RabbitMQ input device
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| device_manager | RabbitMQPoisonCountingInputDeviceManager | the RabbitMQ device Manager that holds this device | required |
| queue_name | str | the name for the queue | required |
| poison_counter | PoisonCounterBase | the poison counter to use | required |
| max_poison_count | int | the number of times to requeue a message before backout (-1 means not to handle poison messages at all, 0 means reject all redelivered messages right away) | 3 |
| consumer_args | Optional[Dict[str, str]] | the arguments to create the consumer with | None |
| prefetch_count | int | the number of un-acked messages that can be consumed | 1 |
| use_consumer | bool | True to use the 'consume' method, False to use 'basic_get' | True |
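
The three regimes of `max_poison_count` described above (-1, 0, and a positive limit) can be pictured with a small decision function. This is only a sketch of the documented semantics, not the device's actual code: `PoisonAction` and `decide_action` are hypothetical names, and three details are assumptions: only redelivered messages are counted, any negative value disables handling, and a message is rejected once its counter exceeds the limit.

```python
from enum import Enum
from typing import Callable


class PoisonAction(Enum):
    DELIVER = "deliver"  # hand the message to the application
    REJECT = "reject"    # back the message out instead of requeueing it


def decide_action(
    message_id: str,
    redelivered: bool,
    max_poison_count: int,
    increment_counter: Callable[[str], int],
) -> PoisonAction:
    """Hypothetical illustration of the max_poison_count semantics."""
    if max_poison_count < 0:
        # -1: poison handling is switched off entirely.
        return PoisonAction.DELIVER
    if not redelivered:
        # Assumption: first deliveries are never counted or rejected.
        return PoisonAction.DELIVER
    if max_poison_count == 0:
        # 0: every redelivered message is rejected right away.
        return PoisonAction.REJECT
    count = increment_counter(message_id)
    # Assumption: the boundary is "counter greater than the limit".
    if count > max_poison_count:
        return PoisonAction.REJECT
    return PoisonAction.DELIVER
```

In this sketch `increment_counter` would typically be the `increment_and_return_counter` method of a `PoisonCounterBase` implementation.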
RabbitMQPoisonCountingInputDeviceManager
RabbitMQPoisonCountingInputDeviceManager(
hosts,
user,
password,
poison_counter,
max_poison_count=3,
port=None,
ssl_context=None,
virtual_host=None,
client_args=None,
heartbeat=300,
connection_attempts=5,
prefetch_count=1,
use_consumer=True,
blocked_connection_timeout=None,
default_direct_exchange=None,
**kwargs
)
Bases: RabbitMQInputDeviceManager
RabbitMQ input device manager that adds handling of poison messages (for classic queues)
This manager is used to create RabbitMQ devices (direct queues)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| hosts | List[str] | the list of hostnames of the manager | required |
| user | str | the username for the rabbitMQ manager | required |
| password | str | the password for the rabbitMQ manager | required |
| poison_counter | PoisonCounterBase | the poison counter to use | required |
| max_poison_count | int | the number of times to try to handle a message before backout | 3 |
| port | Optional[int] | the port to connect the hosts to | None |
| ssl_context | Optional[SSLContext] | the ssl context to use. None means don't use ssl at all | None |
| virtual_host | Optional[str] | the virtual host to connect to | None |
| client_args | Optional[Dict[str, str]] | the arguments to create the client with | None |
| heartbeat | int | heartbeat interval for the connection (between 0 and 65536) | 300 |
| connection_attempts | int | maximum number of connection retry attempts | 5 |
| prefetch_count | int | the number of unacked messages that can be consumed | 1 |
| use_consumer | bool | True to use the 'consume' method, False to use 'basic_get' | True |
| blocked_connection_timeout | Optional[float] | If not None, the value is a non-negative timeout, in seconds, for the connection to remain blocked (triggered by Connection.Blocked from broker); if the timeout expires before connection becomes unblocked, the connection will be torn down, triggering the adapter-specific mechanism for informing client app about the closed connection: passing | None |
| default_direct_exchange | Optional[str] | optional direct exchange to bind all the queues to (None means no bind) | None |
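
As a rough usage sketch, the constructor arguments listed above can be collected in one place and passed to the manager. The host names, credentials, and the helper names `build_manager_kwargs` and `create_manager` are placeholders invented for illustration; only the keyword names and defaults come from the signature above.

```python
from typing import Any, Dict


def build_manager_kwargs(poison_counter: Any) -> Dict[str, Any]:
    """Collects keyword arguments for the manager (placeholder values)."""
    return {
        "hosts": ["rabbit-1.example.com", "rabbit-2.example.com"],  # placeholder
        "user": "guest",        # placeholder credentials
        "password": "guest",
        "poison_counter": poison_counter,  # a PoisonCounterBase implementation
        "max_poison_count": 3,
        "virtual_host": "/",
        "heartbeat": 300,
        "connection_attempts": 5,
        "prefetch_count": 1,
        "use_consumer": True,
    }


def create_manager(poison_counter: Any):
    """Builds the manager; needs messageflux and its RabbitMQ dependencies."""
    # Imported lazily so the kwargs helper works without messageflux installed.
    from messageflux.iodevices.rabbitmq.rabbitmq_poison_counting_input_device import (
        RabbitMQPoisonCountingInputDeviceManager,
    )

    return RabbitMQPoisonCountingInputDeviceManager(
        **build_manager_kwargs(poison_counter)
    )
```

Keeping the arguments in a plain dictionary makes it easy to load them from configuration and to check them in tests without a running broker.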