
rabbitmq_output_device

messageflux.iodevices.rabbitmq.rabbitmq_output_device

LengthValidationException

LengthValidationException(
    *args, inner_exceptions=None, **kwargs
)

Bases: OutputDeviceException

Raised when a length validation error occurs in an IODevice.
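The manager documented on this page takes three length limits (`max_message_length`, `max_header_value_length`, `max_header_name_length`), each treating 0 and below as "no limit". The following is a minimal sketch of that rule only, not the library's implementation. `LengthLimitExceeded`, `check_limit` and `validate_outgoing` are hypothetical names, and measuring header lengths with `len()` on the string is an assumption. Presumably the library raises `LengthValidationException` in the equivalent situations.

```python
from typing import Dict, Optional


class LengthLimitExceeded(ValueError):
    """Hypothetical stand-in for LengthValidationException."""


def check_limit(what: str, actual: int, limit: int) -> None:
    # A limit of 0 or below means "no limit", as the parameter docs state.
    if limit > 0 and actual > limit:
        raise LengthLimitExceeded(f"{what} has length {actual}, limit is {limit}")


def validate_outgoing(
    body: bytes,
    headers: Optional[Dict[str, str]] = None,
    max_message_length: int = -1,
    max_header_value_length: int = 2048,
    max_header_name_length: int = 1024,
) -> None:
    # Defaults mirror the documented defaults of the manager's constructor.
    check_limit("message body", len(body), max_message_length)
    for name, value in (headers or {}).items():
        check_limit("header name", len(name), max_header_name_length)
        check_limit("header value", len(str(value)), max_header_value_length)
```

With the default `max_message_length=-1` a body of any size passes, while a header value longer than 2048 characters is rejected.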

RabbitMQOutputDevice

RabbitMQOutputDevice(
    device_manager, routing_key, exchange=""
)

Bases: OutputDevice['RabbitMQOutputDeviceManager']

Represents a RabbitMQ output device.

Constructs a new RabbitMQ output device.

Parameters:

Name Type Description Default
device_manager RabbitMQOutputDeviceManager

the RabbitMQ device Manager that holds this device

required
routing_key str

the routing key for this queue

required
exchange str

the exchange name in RabbitMQ for this output device

''

RabbitMQOutputDeviceManager

RabbitMQOutputDeviceManager(
    hosts,
    user,
    password,
    port=None,
    ssl_context=None,
    virtual_host=None,
    default_output_exchange="",
    publish_confirm=True,
    client_args=None,
    heartbeat=300,
    connection_attempts=5,
    max_message_length=-1,
    max_header_value_length=2048,
    max_header_name_length=1024,
    default_rabbit_headers=None,
    blocked_connection_timeout=None,
    **kwargs
)

Bases: RabbitMQDeviceManagerBase, OutputDeviceManager[RabbitMQOutputDevice]

This manager is used to create RabbitMQ output devices (direct queues).

Parameters:

Name Type Description Default
hosts Union[List[str], str]

the hostname or a list of hostnames of the manager

required
user str

the username for the RabbitMQ manager

required
password str

the password for the RabbitMQ manager

required
port Optional[int]

the port to connect the hosts to

None
ssl_context Optional[SSLContext]

the ssl context to use. None means don't use ssl at all

None
virtual_host Optional[str]

the virtual host to connect to

None
default_output_exchange str

the default exchange used for output devices

''
publish_confirm bool

should the send fail if the message is unroutable?

True
client_args Optional[Dict[str, str]]

the arguments to create the client with

None
heartbeat int

heartbeat interval for the connection (between 0 and 65536)

300
connection_attempts int

Maximum number of retry attempts

5
max_message_length int

the maximum size (in bytes) of a message to send (0 and below = no limit)

-1
max_header_value_length int

the maximum length of a header's value (0 and below = no limit)

2048
max_header_name_length int

the maximum length of a header's name (0 and below = no limit)

1024
default_rabbit_headers dict

The default headers to use when sending

None
blocked_connection_timeout Optional[float]

If not None, the value is a non-negative timeout, in seconds, for the connection to remain blocked (triggered by Connection.Blocked from broker); if the timeout expires before connection becomes unblocked, the connection will be torn down, triggering the adapter-specific mechanism for informing client app about the closed connection: passing ConnectionBlockedTimeout exception to on_close_callback in asynchronous adapters or raising it in BlockingConnection.

None
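As a rough illustration of how these parameters fit together, the sketch below collects a set of constructor arguments and passes them to the manager. The helper names `build_manager_kwargs` and `create_manager`, the host names, the 1 MiB body cap and the 30 second blocked-connection timeout are all assumptions chosen for the example. The import is deferred so that the argument-building part runs without the package installed.

```python
import ssl
from typing import Any, Dict, List, Optional, Union


def build_manager_kwargs(
    hosts: Union[List[str], str],
    user: str,
    password: str,
    use_tls: bool = False,
) -> Dict[str, Any]:
    # ssl_context=None means "don't use ssl at all", per the parameter docs.
    ssl_context: Optional[ssl.SSLContext] = (
        ssl.create_default_context() if use_tls else None
    )
    return {
        "hosts": hosts,
        "user": user,
        "password": password,
        "ssl_context": ssl_context,
        "publish_confirm": True,
        "heartbeat": 300,
        "connection_attempts": 5,
        "max_message_length": 1024 * 1024,  # illustrative 1 MiB cap; default -1 = no limit
        "blocked_connection_timeout": 30.0,  # illustrative value, in seconds
    }


def create_manager(**kwargs: Any):
    # Imported lazily so the helper above works without messageflux installed.
    from messageflux.iodevices.rabbitmq.rabbitmq_output_device import (
        RabbitMQOutputDeviceManager,
    )

    return RabbitMQOutputDeviceManager(**kwargs)
```

A call might look like `create_manager(**build_manager_kwargs(["rabbit-1", "rabbit-2"], "user", "secret"))`. Parameters left out of the dictionary keep their documented defaults.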

default_rabbit_headers property

default_rabbit_headers

returns the default rabbit headers

publish_confirm property

publish_confirm

returns the publish_confirm flag

connect

connect()

connects to the device manager

disconnect

disconnect()

disconnects from the device manager
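Since `connect()` and `disconnect()` come as a pair, one way to make sure the connection is always released is a small context manager. This is a sketch under assumptions: `connected` is a hypothetical helper, and this page does not say whether the manager already supports the `with` statement on its own. The helper relies only on the two methods documented above.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def connected(manager: Any) -> Iterator[Any]:
    # Works with any object exposing connect() and disconnect().
    manager.connect()
    try:
        yield manager
    finally:
        # Runs even if the body raises, so the connection is not leaked.
        manager.disconnect()
```

Usage would then be `with connected(manager) as m: ...`, with publishing done inside the block.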

get_outgoing_channel

get_outgoing_channel()

returns a rabbitmq channel for publishing messages

Returns:

Type Description
BlockingChannel

a RabbitMQ channel

publish_message

publish_message(
    routing_key,
    data,
    exchange="",
    headers=None,
    app_id=None,
    message_id=None,
    persistent=True,
    mandatory=False,
    priority=None,
    expiration=None,
)

publishes a message, and reconnects if there's an error

Parameters:

Name Type Description Default
routing_key str

The routing key to publish the message with

required
data BinaryIO

the stream to send

required
exchange str

The exchange to publish to

''
headers Optional[Dict[str, Any]]

the headers to send to queue

None
app_id str

the app_id to send with the message

None
message_id str

the id to give the message

None
persistent bool

should the message be persisted to disk

True
mandatory bool

The mandatory flag

False
priority int

the message priority (0-9)

None
expiration Optional[int]

ttl (in ms) of message

None
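Two details of the signature above are easy to get wrong: `data` is a binary stream rather than raw bytes, and `expiration` is a TTL in milliseconds. The sketch below wraps both conversions and checks the documented priority range before calling `publish_message`. `publish_bytes` is a hypothetical helper, not part of the library, and rejecting out-of-range priorities locally is a choice made for this example.

```python
import io
from datetime import timedelta
from typing import Any, Dict, Optional


def publish_bytes(
    manager: Any,
    routing_key: str,
    payload: bytes,
    *,
    exchange: str = "",
    headers: Optional[Dict[str, Any]] = None,
    ttl: Optional[timedelta] = None,
    priority: Optional[int] = None,
) -> None:
    # The docs give the priority range as 0-9.
    if priority is not None and not 0 <= priority <= 9:
        raise ValueError("priority must be between 0 and 9")
    # expiration is documented as a TTL in milliseconds.
    expiration = None if ttl is None else int(ttl.total_seconds() * 1000)
    manager.publish_message(
        routing_key,
        io.BytesIO(payload),  # data is documented as a BinaryIO stream
        exchange=exchange,
        headers=headers,
        priority=priority,
        expiration=expiration,
    )
```

For instance, `publish_bytes(manager, "orders", b"{}", ttl=timedelta(seconds=30))` passes `expiration=30000`; the routing key `"orders"` is illustrative.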