Skip to content

rabbitmq_input_device

messageflux.iodevices.rabbitmq.rabbitmq_input_device

RabbitMQInputDevice

RabbitMQInputDevice(
    device_manager,
    queue_name,
    consumer_args=None,
    prefetch_count=1,
    use_consumer=True,
)

Bases: InputDevice['RabbitMQInputDeviceManager']

represents an RabbitMQ input device

constructs a new input RabbitMQ device

Parameters:

Name Type Description Default
device_manager RabbitMQInputDeviceManager

the RabbitMQ device Manager that holds this device

required
queue_name str

the name for the queue

required
consumer_args Optional[Dict[str, str]]

the arguments to create the consumer with only relevent if "use_consumer" is True

None
prefetch_count int

the number of unacked messages that can be consumed only relevent if "use_consumer" is True

1
use_consumer bool

True to use the 'consume' method, False to use 'basic_get'

True

close

close()

closes the connection to device

RabbitMQInputDeviceManager

RabbitMQInputDeviceManager(
    hosts,
    user,
    password,
    port=None,
    ssl_context=None,
    virtual_host=None,
    client_args=None,
    heartbeat=300,
    connection_attempts=5,
    prefetch_count=1,
    use_consumer=True,
    blocked_connection_timeout=None,
    default_direct_exchange=None,
    **kwargs
)

Bases: RabbitMQDeviceManagerBase, InputDeviceManager[RabbitMQInputDevice]

rabbitmq input device manager

This manager used to create RabbitMQ devices (direct queues)

Parameters:

Name Type Description Default
hosts Union[List[str], str]

the hostname or a list of hostnames of the manager

required
user str

the username for the rabbitMQ manager

required
password str

the password for the rabbitMQ manager

required
port Optional[int]

the port to connect the hosts to

None
ssl_context Optional[SSLContext]

the ssl context to use. None means don't use ssl at all

None
virtual_host Optional[str]

the virtual host to connect to

None
client_args Optional[Dict[str, str]]

the arguments to create the client with

None
heartbeat int

heartbeat interval for the connection (between 0 and 65536

300
connection_attempts int

Maximum number of retry attempts (-1 means not to handle poison messages at all, 0 means reject all redelivered messages right away)

5
prefetch_count int

the number of unacked messages that can be consumed

1
use_consumer bool

True to use the 'consume' method, False to use 'basic_get'

True
blocked_connection_timeout Optional[float]

If not None, the value is a non-negative timeout, in seconds, for the connection to remain blocked (triggered by Connection.Blocked from broker); if the timeout expires before connection becomes unblocked, the connection will be torn down, triggering the adapter-specific mechanism for informing client app about the closed connection: passing ConnectionBlockedTimeout exception to on_close_callback in asynchronous adapters or raising it in BlockingConnection.

None
default_direct_exchange Optional[str]

optional direct exchange to bind all the queues to (None means no bind)

None

connect

connect()

connects to the device manager

disconnect

disconnect()

disconnects from the device manager

RabbitMQInputTransaction

RabbitMQInputTransaction(
    cancellation_token, device, channel, delivery_tag
)

Bases: InputTransaction

represents a InputTransaction for RabbitMQ

Parameters:

Name Type Description Default
cancellation_token Event

the cancellation token, to check before acking

required
device RabbitMQInputDevice

the device that returned this transaction

required
channel BlockingChannel

the BlockingChannel that the item was read from

required
delivery_tag int

the delivery tag for this item

required

channel property

channel

the channel that the item was read from

delivery_tag property

delivery_tag

the delivery tag for this item