Skip to content

PipelineService

The PipelineService Service is used to Process incoming messages, and transfer the results to an output device.

It gets an instance of PipelineHandlerBase class, which handles the messages. It also gets the input and output device managers that will be used to create the devices to receive and send messages

The list of device names to listen on, is also given in the constructor.

Examples

Custom Handler

from typing import Optional

from messageflux.iodevices.base import InputDevice, Message, MessageBundle
from messageflux.iodevices.file_system import FileSystemOutputDeviceManager
from messageflux.iodevices.rabbitmq.rabbitmq_input_device import RabbitMQInputDeviceManager
from messageflux.pipeline_service import PipelineService, PipelineHandlerBase, PipelineResult


class MyPipelineHandler(PipelineHandlerBase):  # This is the handler class
    def handle_message(self,
                       input_device: InputDevice,
                       message_bundle: MessageBundle) -> Optional[PipelineResult]:
        """
        Handles a message from an input device. and returns a tuple of the output device name, message and headers to
        send to.

        :param input_device: The input device that sent the message.
        :param message_bundle: The message that was received.

        :return: None if the message should not be sent to any output device.
        PipelineResult if a message should be sent to the output device with the given name.
        """
        message_bytes = message_bundle.message.bytes
        # do something with message
        return PipelineResult(output_device_name='OUTPUT',  # the output device to send the message to
                              message_bundle=Message(b'test'))  # the message to send


input_device_manager = RabbitMQInputDeviceManager(hosts='my.rabbit.host',
                                                  user='username',
                                                  password='password')  # can be some other type of manager

output_device_manager = FileSystemOutputDeviceManager(
    root_folder='/mnt/fsdevices/')  # can be some other type of manager

service = PipelineService(input_device_manager=input_device_manager,
                          input_device_names=['INPUT_QUEUE_1', 'INPUT_QUEUE_2'],  # these are the devices to read from
                          output_device_manager=output_device_manager,
                          pipeline_handler=MyPipelineHandler())

service.start()  # starts the service

Fixed Router

Sometimes, we just need to read from one device (i.e, a RabbitMQ queue) and forward the messages to another device (i.e, A File-System folder).

In that case, we can use the FixedRouterPipelineHandler instead of writing a custom handler

from typing import Optional

from messageflux.iodevices.rabbitmq.rabbitmq_input_device import RabbitMQInputDeviceManager
from messageflux.iodevices.file_system import FileSystemOutputDeviceManager
from messageflux.pipeline_service import PipelineService, FixedRouterPipelineHandler



input_device_manager = RabbitMQInputDeviceManager(hosts='my.rabbit.host',
                                                  user='username',
                                                  password='password') # can be some other type of manager

output_device_manager = FileSystemOutputDeviceManager(root_folder='/mnt/fsdevices/') # can be some other type of manager

# forward all the messages to 'OUTPUT' device on the output manager
fixed_router = FixedRouterPipelineHandler(output_device_name='OUTPUT') 

service = PipelineService(input_device_manager=input_device_manager,
                          input_device_names=['INPUT_QUEUE_1', 'INPUT_QUEUE_2'], # these are the devices to read from
                          output_device_manager=output_device_manager,
                          pipeline_handler=fixed_router)

service.start() # starts the service

In this example, the service will read from 'INPUT_QUEUE_1' and 'INPUT_QUEUE_2' queues on the RabbitMQ server, and forward all the messages to the 'OUTPUT' queue on the File-System device. this is a useful usecase for backup for example.