rabbitmq_device_manager_base

messageflux.iodevices.rabbitmq.rabbitmq_device_manager_base

RabbitMQDeviceManagerBase

RabbitMQDeviceManagerBase(
    hosts,
    user,
    password,
    port=None,
    ssl_context=None,
    virtual_host=None,
    client_args=None,
    connection_type="None",
    heartbeat=300,
    connection_attempts=5,
    blocked_connection_timeout=None,
    **kwargs
)

Base class for RabbitMQ device managers.

Note that pika is imported inside the methods rather than at module level, because importing it at module level causes trouble when this device is used with multiprocessing.

This manager is used to create RabbitMQ devices (direct queues).
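The deferred-import pattern mentioned above can be sketched roughly as follows. This illustrates the general technique only and is not the library's code: `LazyClientHolder` and its `module_name` parameter are hypothetical, and the demo loads the standard-library `json` module in place of pika so that it runs without extra dependencies.

```python
import importlib


class LazyClientHolder:
    """Hypothetical helper that defers importing a client library until first use."""

    def __init__(self, module_name: str = "pika"):
        self._module_name = module_name
        self._module = None  # nothing is imported when the object is constructed

    @property
    def client_module(self):
        # Import on first access rather than when the enclosing module is loaded.
        if self._module is None:
            self._module = importlib.import_module(self._module_name)
        return self._module


# Demo with a standard-library module standing in for pika.
holder = LazyClientHolder("json")
encoded = holder.client_module.dumps({"queue": "demo"})
```

Constructing the holder imports nothing, so the import only happens in the code path that actually uses the client.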

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| hosts | Union[List[str], str] | The hostname, or a list of hostnames, of the manager | required |
| user | str | The username for the RabbitMQ manager | required |
| password | str | The password for the RabbitMQ manager | required |
| port | Optional[int] | The port to connect to on the hosts | None |
| ssl_context | Optional[SSLContext] | The SSL context to use. None means don't use SSL at all | None |
| virtual_host | Optional[str] | The virtual host to connect to | None |
| client_args | Optional[Dict[str, str]] | The arguments to create the client with | None |
| heartbeat | int | Heartbeat interval for the connection (between 0 and 65536) | 300 |
| connection_attempts | int | Maximum number of retry attempts | 5 |
| blocked_connection_timeout | Optional[float] | If not None, a non-negative timeout, in seconds, for the connection to remain blocked (triggered by Connection.Blocked from the broker). If the timeout expires before the connection becomes unblocked, the connection is torn down and the client app is informed through the adapter-specific mechanism: asynchronous adapters pass a ConnectionBlockedTimeout exception to on_close_callback, and BlockingConnection raises it. | None |
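As a rough sketch of how these parameters fit together, the helper below collects constructor arguments into a dictionary. `build_manager_kwargs`, its `use_tls` flag, and the host names are hypothetical. The port 5671 is the conventional AMQP-over-TLS port and is an assumption, since this page does not state a default. The heartbeat range check simply mirrors the parameter description above.

```python
import ssl
from typing import Any, Dict, List, Optional, Union


def build_manager_kwargs(
    hosts: Union[List[str], str],
    user: str,
    password: str,
    use_tls: bool = False,
    virtual_host: Optional[str] = None,
    heartbeat: int = 300,
) -> Dict[str, Any]:
    """Hypothetical helper: collect the constructor arguments in one place."""
    # Range taken from the heartbeat parameter description above.
    if not 0 <= heartbeat <= 65536:
        raise ValueError("heartbeat must be between 0 and 65536")
    return {
        "hosts": hosts,
        "user": user,
        "password": password,
        # Assumption: 5671 is the conventional AMQP-over-TLS port; None is the documented default.
        "port": 5671 if use_tls else None,
        # Per the parameter description, None means SSL is not used at all.
        "ssl_context": ssl.create_default_context() if use_tls else None,
        "virtual_host": virtual_host,
        "heartbeat": heartbeat,
    }


kwargs = build_manager_kwargs(
    ["mq-1.example.internal", "mq-2.example.internal"], "guest", "guest", use_tls=True
)
# A concrete manager subclass could then be created with SomeManager(**kwargs).
```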

connection property

connection

Returns an open connection that is ready for use.

maintenance_channel property

maintenance_channel

Returns a channel used for maintenance operations (create, bind, etc.).

bind_queue

bind_queue(
    queue_name, exchange, routing_key=None, arguments=None
)

Bind the queue to the specified exchange

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| queue_name | str | The queue to bind to the exchange | required |
| exchange | str | The source exchange to bind to | required |
| routing_key | Optional[str] | The routing key to bind on | None |
| arguments | Optional[Dict[str, Any]] | Custom key/value pair arguments for the binding | None |

Returns:

| Type | Description |
|---|---|
| Method | Method frame from the Queue.Bind-ok response |
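A minimal sketch of calling `bind_queue` once per routing key. `bind_topics` and `RecordingManager` are hypothetical: the stand-in only records calls, so the example runs without a broker. A real manager object with the `bind_queue` signature above would be passed in its place.

```python
from typing import Iterable, List, Optional, Tuple


class RecordingManager:
    """Hypothetical stand-in that records calls instead of talking to a broker."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str, Optional[str]]] = []

    def bind_queue(self, queue_name, exchange, routing_key=None, arguments=None):
        self.calls.append((queue_name, exchange, routing_key))


def bind_topics(manager, queue_name: str, exchange: str, routing_keys: Iterable[str]) -> int:
    """Bind one queue to an exchange once per routing key; return the number of bindings."""
    count = 0
    for key in routing_keys:
        manager.bind_queue(queue_name, exchange, routing_key=key)
        count += 1
    return count


manager = RecordingManager()
bound = bind_topics(manager, "orders", "events", ["order.created", "order.cancelled"])
```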

create_queue

create_queue(
    queue_name,
    passive=False,
    durable=True,
    exclusive=False,
    auto_delete=False,
    arguments=None,
    direct_bind_to_exchange=None,
)

Declare a queue, creating it if needed. This method creates or checks a queue. When creating a new queue, the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue.

Leave the queue name empty to get an auto-named queue in RabbitMQ.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| queue_name | str | The queue name | required |
| passive | bool | Only check to see if the queue exists | False |
| durable | bool | Survive reboots of the broker | True |
| exclusive | bool | Only allow access by the current connection | False |
| auto_delete | bool | Delete after consumer cancels or disconnects | False |
| arguments | Optional[Dict[str, Any]] | Custom key/value arguments for the queue | None |
| direct_bind_to_exchange | Optional[str] | If not None, the name of a 'direct' exchange to bind to with the queue name | None |

Returns:

| Type | Description |
|---|---|
| Method | Method frame from the Queue.Declare-ok response |
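The sketch below shows one way the `arguments` and `direct_bind_to_exchange` parameters might be used together. `queue_arguments`, `create_work_queue`, and `RecordingManager` are hypothetical. `x-message-ttl` and `x-max-length` are standard RabbitMQ queue arguments, but nothing on this page says which arguments the author intended, so treat the choice as an assumption.

```python
from typing import Any, Dict, Optional


class RecordingManager:
    """Hypothetical stand-in that stores declarations instead of contacting a broker."""

    def __init__(self) -> None:
        self.declared: Dict[str, Dict[str, Any]] = {}

    def create_queue(self, queue_name, passive=False, durable=True, exclusive=False,
                     auto_delete=False, arguments=None, direct_bind_to_exchange=None):
        self.declared[queue_name] = {
            "durable": durable,
            "arguments": arguments,
            "direct_bind_to_exchange": direct_bind_to_exchange,
        }


def queue_arguments(message_ttl_ms: Optional[int] = None,
                    max_length: Optional[int] = None) -> Dict[str, Any]:
    """Build the optional `arguments` mapping for a queue declaration."""
    arguments: Dict[str, Any] = {}
    if message_ttl_ms is not None:
        arguments["x-message-ttl"] = message_ttl_ms  # per-queue message TTL, in milliseconds
    if max_length is not None:
        arguments["x-max-length"] = max_length  # cap on the number of queued messages
    return arguments


def create_work_queue(manager, queue_name: str, exchange: Optional[str] = None) -> None:
    """Declare a durable queue with a TTL and a length cap, optionally bound to an exchange."""
    manager.create_queue(
        queue_name,
        durable=True,
        arguments=queue_arguments(message_ttl_ms=60_000, max_length=10_000),
        direct_bind_to_exchange=exchange,
    )


manager = RecordingManager()
create_work_queue(manager, "jobs", exchange="work")
```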

delete_queue

delete_queue(queue_name, only_if_empty=True)

Deletes a queue from the mq broker.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| queue_name | str | The queue name | required |
| only_if_empty | bool | If True, delete the queue only if it is empty (no messages inside) | True |

Returns:

| Type | Description |
|---|---|
| Method | Method frame from the Queue.Delete-ok response |

get_queue_message_count

get_queue_message_count(queue_name)

Returns the current number of messages in the queue.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| queue_name | str | The name of the queue to check | required |

Returns:

| Type | Description |
|---|---|
| int | Current number of messages in the queue |
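`get_queue_message_count` and `delete_queue` can be combined to remove a queue only once it has drained. The following is a sketch under stated assumptions: `delete_when_drained`, its `timeout` and `poll_interval` parameters, and the `FakeManager` stand-in are hypothetical. The fake simply reports one message fewer on each poll so that the example runs without a broker.

```python
import time
from typing import List, Tuple


class FakeManager:
    """Hypothetical stand-in: reports a queue that shrinks by one message per poll."""

    def __init__(self, initial_count: int) -> None:
        self._count = initial_count
        self.deleted: List[Tuple[str, bool]] = []

    def get_queue_message_count(self, queue_name: str) -> int:
        current = self._count
        self._count = max(0, self._count - 1)
        return current

    def delete_queue(self, queue_name: str, only_if_empty: bool = True) -> None:
        self.deleted.append((queue_name, only_if_empty))


def delete_when_drained(manager, queue_name: str,
                        timeout: float = 5.0, poll_interval: float = 0.01) -> bool:
    """Poll the message count and delete the queue once it reaches zero.

    Returns True if the queue was deleted, False if the timeout expired first.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if manager.get_queue_message_count(queue_name) == 0:
            # only_if_empty=True keeps the broker-side guard against late arrivals.
            manager.delete_queue(queue_name, only_if_empty=True)
            return True
        time.sleep(poll_interval)
    return False


manager = FakeManager(initial_count=3)
drained = delete_when_drained(manager, "jobs")
```

Passing `only_if_empty=True` matters even after the count reads zero, because a message can arrive between the check and the delete.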

unbind_queue

unbind_queue(
    queue_name, exchange, routing_key=None, arguments=None
)

Unbind the queue from the specified exchange

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| queue_name | str | The queue to unbind from the exchange | required |
| exchange | str | The source exchange to unbind from | required |
| routing_key | Optional[str] | The routing key to unbind | None |
| arguments | Optional[Dict[str, Any]] | Custom key/value pair arguments for the unbinding | None |

Returns:

| Type | Description |
|---|---|
| Method | Method frame from the Queue.Unbind-ok response |
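Since `unbind_queue` mirrors `bind_queue`, the two can be paired to move a queue from one set of routing keys to another. In this sketch, `sync_bindings` and `RecordingManager` are hypothetical, and the caller is assumed to track the current keys itself, because no method for listing existing bindings is documented on this page.

```python
from typing import Iterable, List, Optional, Set, Tuple


class RecordingManager:
    """Hypothetical stand-in that records bind/unbind calls instead of contacting a broker."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Optional[str]]] = []

    def bind_queue(self, queue_name, exchange, routing_key=None, arguments=None):
        self.calls.append(("bind", routing_key))

    def unbind_queue(self, queue_name, exchange, routing_key=None, arguments=None):
        self.calls.append(("unbind", routing_key))


def sync_bindings(manager, queue_name: str, exchange: str,
                  current_keys: Iterable[str], wanted_keys: Iterable[str]) -> Set[str]:
    """Add missing bindings first, then remove stale ones; return the resulting key set."""
    current, wanted = set(current_keys), set(wanted_keys)
    for key in sorted(wanted - current):
        manager.bind_queue(queue_name, exchange, routing_key=key)
    for key in sorted(current - wanted):
        manager.unbind_queue(queue_name, exchange, routing_key=key)
    return wanted


manager = RecordingManager()
result = sync_bindings(
    manager, "orders", "events",
    current_keys=["order.created", "order.paid"],
    wanted_keys=["order.created", "order.shipped"],
)
```

Binding before unbinding means the queue is never left without the keys that both sets share or that are newly wanted.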