Skip to content

file_system_input_device

messageflux.iodevices.file_system.file_system_input_device

FileSystemInputDevice

FileSystemInputDevice(
    manager,
    name,
    tmp_folder,
    queues_folder,
    fifo=True,
    min_file_age=0,
    serializer=None,
)

Bases: InputDevice['FileSystemInputDeviceManager']

An InputDevice, that reads from folder

ctor

Parameters:

Name Type Description Default
manager FileSystemInputDeviceManager

the device manager which created this input device

required
name str

the name of the queue to read from

required
tmp_folder str

the folder to create temporary files in

required
queues_folder str

the base folder for all queues

required
fifo bool

should we read the files sorted by time (Notice! when fifo=false, you might get file starvation)

True
min_file_age int

the minimum time in seconds since last modification to file, before we to try to read it...

0
serializer Optional[FileSystemSerializerBase]

the serializer to use for reading messages from files

None

FileSystemInputDeviceManager

FileSystemInputDeviceManager(
    root_folder,
    queue_dir_name=FileSystemDeviceManagerBase.DEFAULT_QUEUES_SUB_DIR,
    tmp_dir_name=FileSystemDeviceManagerBase.DEFAULT_TMPDIR_SUB_DIR,
    bookkeeping_dir_name=FileSystemDeviceManagerBase.DEFAULT_BOOKKEEPING_SUB_DIR,
    serializer=None,
    fifo=True,
    min_input_file_age=0,
    transaction_log_save_interval=10,
    **kwargs
)

Bases: FileSystemDeviceManagerBase, InputDeviceManager[FileSystemInputDevice]

this is an input manager that creates file system input devices

Parameters:

Name Type Description Default
root_folder str

the root folder to use for the manager

required
queue_dir_name str

the name of the subdirectory under root_folder that holds the queues

DEFAULT_QUEUES_SUB_DIR
tmp_dir_name str

the name of the subdirectory under root_folder to use for temp files

DEFAULT_TMPDIR_SUB_DIR
bookkeeping_dir_name str

the name of the subdirectory under root_folder that holds the book-keeping data

DEFAULT_BOOKKEEPING_SUB_DIR
serializer Optional[FileSystemSerializerBase]

the serializer to use to write messages to files. None will use the default serializer

None
fifo bool

should we read the files sorted by time (Notice! when fifo=false, you might get file starvation)

True
min_input_file_age int

the minimum time in seconds since last modification, before we to try to read it...

0
transaction_log_save_interval int

the interval in seconds to save the transaction log

10

transaction_log property

transaction_log

the transaction log

connect

connect()

connects to the device manager

disconnect

disconnect()

disconnects from the device manager

FileSystemInputTransaction

FileSystemInputTransaction(device, org_path, tmp_path)

Bases: InputTransaction

represents an InputTransaction for filesystem

Parameters:

Name Type Description Default
device FileSystemInputDevice

the device that returned this transaction

required
org_path str

the original path of the file we read

required
tmp_path str

the temp path of the file we read

required

org_path property

org_path

the original path of the file in this transaction

tmp_path property

tmp_path

the temp (current) path of the file in this transaction

read_file staticmethod

read_file(
    device,
    org_path,
    tmp_folder,
    with_transaction,
    serializer,
)

reads the file from org_path, and adds the file to transaction if it exists, or delete if it doesn't

Parameters:

Name Type Description Default
device FileSystemInputDevice

the input device for this transaction

required
org_path str

the original path to this file

required
tmp_folder str

the temporary folder to use while holding files in transaction

required
with_transaction bool

if set to 'False' no transaction is made, and the message is acked on read

required
serializer FileSystemSerializerBase

the serializer to use to deserialize the file

required

rollback_path staticmethod

rollback_path(tmp_path, org_path, max_poison_count)

rolls back the transaction

Parameters:

Name Type Description Default
tmp_path str

the temp (current) path of the file

required
org_path str

the original path of the file

required
max_poison_count int

the maximum time we allow the file to be rolled back

required

TransactionLog

TransactionLog(filepath)

this class is used to persist transaction log into filesystem, so it can be rolled back on process termination

Parameters:

Name Type Description Default
filepath str

the full path of the file used to persist this log

required

filepath property

filepath

the path to the file holding this transaction log

add_transaction

add_transaction(transaction)

adds a transaction to the log

Parameters:

Name Type Description Default
transaction FileSystemInputTransaction

the transaction to add

required

remove_transaction

remove_transaction(transaction)

removes a transaction from the log

Parameters:

Name Type Description Default
transaction FileSystemInputTransaction

the transaction to remove

required

rollback_all

rollback_all()

rolls back all the transaction in the log

write_log

write_log()

writes the transaction log to the file