Multiprocessing

Multiprocessing goodies.

class ktz.multiprocessing.Actor(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Process abstraction used with a Relay.

Actor instances are wired together using a Relay and then send messages to each other. Each actor is spawned in a separate process. Please see Relay to understand their usage.

  • Actors who are senders may invoke send()

  • Actors who are receivers must implement recv()

  • Actors who are not receivers must implement loop()

There are callback functions invoked at various stages of an Actor’s lifecycle (implemented in run()):

  • startup(): Before loop()

  • loop(): Must be overridden if the Actor is not a receiver

  • shutdown(): After loop()

Actors MUST NOT be iterable! A Relay is required to properly initialize an Actor.
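The lifecycle order described above can be sketched in plain Python. This is a minimal illustration, not ktz code: LifecycleSketch and Counter are hypothetical names, the class is not a process, and the sketch assumes that run() simply calls the three hooks in sequence.

```python
class LifecycleSketch:
    """Hypothetical stand-in for an Actor; it is not a process and not ktz code."""

    def __init__(self):
        self.calls = []  # records the order in which the hooks fire

    def startup(self):
        self.calls.append("startup")

    def loop(self):
        self.calls.append("loop")

    def shutdown(self):
        self.calls.append("shutdown")

    def run(self):
        # Assumed order, taken from the lifecycle list: startup, loop, shutdown.
        self.startup()
        self.loop()
        self.shutdown()


class Counter(LifecycleSketch):
    """Overrides loop() the way a non-receiving Actor would."""

    def loop(self):
        super().loop()
        self.total = sum(range(5))


actor = Counter()
actor.run()
print(actor.calls)
```

In a real Actor the hooks run inside the child process, so any state they set is not visible in the parent the way actor.calls is here.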

Attributes:
authkey
daemon

Return whether process is a daemon

exitcode

Return exit code of process or None if it has yet to stop

ident

Return identifier (PID) of process or None if it has yet to start

name
pid

Return identifier (PID) of process or None if it has yet to start

receiver

Whether the Actor can receive messages.

sender

Whether the actor can send messages.

sentinel

Return a file descriptor (Unix) or handle (Windows) suitable for waiting for process termination.

Methods

close()

Close the Process object.

is_alive()

Return whether process is alive

join([timeout])

Wait until child process terminates

kill()

Terminate process; sends SIGKILL signal or uses TerminateProcess()

log(msg, *args[, level])

Log in main process.

loop()

Keep the Actor alive.

run()

Control an Actor's lifecycle.

send(*args, **kwargs)

Send a message to consuming actors.

shutdown()

Implement as callback after loop().

start()

Start child process

startup()

Implement as callback before loop().

terminate()

Terminate process; sends SIGTERM signal or uses TerminateProcess()

recv

log(msg, *args, level=20)

Log in main process.

Logging is not process-safe. A thread in the parent process therefore handles the actual logging, and this function forwards the message to that thread.
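The standard library offers the same pattern through logging.handlers.QueueHandler and QueueListener. The following sketch shows the idea under stated assumptions: it is not ktz's implementation, ListHandler and the module-level log() helper are hypothetical, and a queue.Queue is used so that the example runs in a single process (across processes a multiprocessing.Queue would take its place).

```python
import logging
import logging.handlers
import queue


class ListHandler(logging.Handler):
    """Collects messages in memory; stands in for a console or file handler."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append((record.levelno, record.getMessage()))


# Across processes this would be a multiprocessing.Queue; a plain queue.Queue
# keeps the sketch runnable in a single process.
records = queue.Queue()

# Parent side: a listener thread pulls records off the queue and logs them.
sink = ListHandler()
listener = logging.handlers.QueueListener(records, sink)
listener.start()

# Worker side: the logger only enqueues records, it never writes output itself.
worker_logger = logging.getLogger("worker_sketch")
worker_logger.setLevel(logging.DEBUG)
worker_logger.propagate = False
worker_logger.addHandler(logging.handlers.QueueHandler(records))


def log(msg, *args, level=logging.INFO):
    """Hypothetical helper mirroring the signature log(msg, *args, level=20)."""
    worker_logger.log(level, msg, *args)


log("processed %d items", 3)
log("queue is nearly full", level=logging.WARNING)

listener.stop()  # processes the queued records, then joins the listener thread
print(sink.lines)
```

Because only the listener thread touches the real handler, output from several workers cannot interleave mid-line.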

Parameters:
msg : str

Log message

level : int

Log level (the default, 20, corresponds to logging.INFO)

loop()

Keep the Actor alive.

This function must be overridden if the Actor is not a receiver. If the Actor is a receiver, the default implementation handles message passing.

property receiver

Whether the Actor can receive messages.

If the Actor is first in a Relay, no senders are attached that could send it messages. Such Actors must implement a loop() that determines their lifetime and must not implement recv().

run()

Control an Actor’s lifecycle.

Implementation of mp.Process.run. It is not invoked directly but called through Process.start() in a Relay.

send(*args, **kwargs)

Send a message to consuming actors.

The recv() handler of any one of the receiving Actors is called with the provided args and kwargs.
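Delivery to "any one" receiver is what a shared work queue provides: each message is taken by whichever consumer is free, and by no other. The sketch below illustrates this with threads and a queue.Queue. It assumes, without confirmation from ktz, that receivers of the same sender share one queue; the names consumer, inbox, and STOP are hypothetical.

```python
import queue
import threading

STOP = object()  # sentinel that tells a consumer to exit


def consumer(name, inbox, handled):
    """Take messages off the shared inbox until the sentinel arrives."""
    while True:
        message = inbox.get()
        if message is STOP:
            break
        handled.append((name, message))  # list.append is thread-safe in CPython


inbox = queue.Queue()
handled = []
workers = [
    threading.Thread(target=consumer, args=(f"consumer-{i}", inbox, handled))
    for i in range(3)
]
for worker in workers:
    worker.start()

for message in range(10):
    inbox.put(message)  # plays the role of send(message)
for _ in workers:
    inbox.put(STOP)  # one sentinel per consumer
for worker in workers:
    worker.join()

# Every message was handled exactly once, by whichever consumer was free.
print(sorted(message for _, message in handled))
```

Which consumer handles which message varies between runs, so code built on this pattern should not depend on a particular assignment.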

property sender

Whether the actor can send messages.

If the Actor is last in a Relay, it has no recipient and thus cannot send messages. If Actor.sender is True, the send() method can be invoked.

shutdown()

Implement as callback after loop().

startup()

Implement as callback before loop().

class ktz.multiprocessing.Control(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)

Internally used control messages.

class ktz.multiprocessing.Handler

Execute code in the main process.

The handler maintains a queue. Once control has passed to the relay, which manages the Actors’ life cycles, a handler instance can be used to regain control in the main process. The user is then responsible for returning from run() so that control goes back to the relay, which joins the actor processes.

It is usually a good idea to send and react to a poison pill (see demo/multiprocessing_relay.py for an example).
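The poison-pill idea can be sketched independently of ktz. In the example below, a thread stands in for an actor process, handler_run() plays the role of a Handler's run() method, and POISON_PILL is a hypothetical sentinel; none of these names come from the library, and the referenced demo may look different.

```python
import queue
import threading

POISON_PILL = None  # hypothetical sentinel; any agreed-upon value works


def worker(outbox):
    """Stands in for an actor process reporting results to the main process."""
    for n in range(3):
        outbox.put(n * n)
    outbox.put(POISON_PILL)  # signal that no more results will follow


def handler_run(inbox):
    """Plays the role of Handler.run(): consume until the poison pill, then return."""
    results = []
    while True:
        item = inbox.get()
        if item is POISON_PILL:
            break  # returning hands control back to the caller
        results.append(item)
    return results


results_queue = queue.Queue()
thread = threading.Thread(target=worker, args=(results_queue,))
thread.start()
results = handler_run(results_queue)
thread.join()
print(results)
```

Without the sentinel, handler_run() would block forever on the final get(), which is why the main-process code has to react to it and return.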

Methods

run()

Execute code in the main process.

abstract run()

Execute code in the main process.

class ktz.multiprocessing.Relay(maxsize=None, log=None)

Wire Actors together.

Control backpressure with maxsize.
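Backpressure through a size limit can be illustrated with a bounded queue from the standard library. This sketch assumes that maxsize bounds the queues between Actors, as the parameter name suggests; it does not use ktz, and the variable names are illustrative.

```python
import queue

inbox = queue.Queue(maxsize=2)  # at most two messages may wait at once

inbox.put("a")
inbox.put("b")

try:
    inbox.put("c", block=False)  # a blocking put would wait here instead
    accepted = True
except queue.Full:
    accepted = False  # the producer is held back until a consumer catches up

inbox.get()  # a consumer takes one message...
inbox.put("c", block=False)  # ...and the producer can continue
print(accepted, inbox.qsize())
```

A small limit keeps memory use flat when producers are faster than consumers, at the cost of producers idling while they wait.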

Methods

connect(*args, **kwargs)

Connect Actors to form a processing pipeline.

start([handler])

Start the relay.

connect(*args, **kwargs)

Connect Actors to form a processing pipeline.

Provide a sequence of Actors or sets of Actors to define message flow. All topologies are possible: 1-1, 1-n, n-1 and n-m.
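To make the topologies concrete, the following sketch lists the sender/receiver pairs implied by a sequence of stages. The helpers as_group() and edges() are hypothetical and not part of ktz, and strings stand in for Actor instances. Since a stage may be either a single Actor or a collection of Actors, the two have to be told apart, which is presumably the reason Actors must not be iterable; because the placeholder strings are themselves iterable, the sketch checks for container types instead.

```python
from itertools import product


def as_group(stage):
    """Wrap a single actor in a tuple; pass groups through as tuples."""
    if isinstance(stage, (list, tuple, set, frozenset)):
        return tuple(stage)
    return (stage,)


def edges(*stages):
    """Hypothetical helper: list the sender/receiver pairs implied by a pipeline."""
    groups = [as_group(stage) for stage in stages]
    pairs = []
    for senders, receivers in zip(groups, groups[1:]):
        pairs.extend(product(senders, receivers))
    return pairs


# Strings stand in for Actor instances.
print(edges("reader", ["worker-1", "worker-2"], "writer"))
```

Here the first hop is 1-n and the second is n-1; passing two lists in a row would yield an n-m hop.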

Parameters:
*args : Actor | Iterable[Actor]

Actors or sets of Actors.

start(handler=None)

Start the relay.

This spawns all Actor processes and blocks until all of them have terminated. If a handler is provided, control is given back to the user in the main process.