Multiprocessing
Multiprocessing goodies.
- class ktz.multiprocessing.Actor(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
Process abstraction used with a Relay.
Actor instances are wired together using a Relay and then send messages to each other. Each actor is spawned in a separate process. Please see Relay to understand their usage.
Actors that are senders may invoke send()
Actors that are receivers must implement recv()
Actors that are not receivers must implement loop()
There are callback functions invoked at various stages of an Actor’s lifecycle (implemented in run()):
startup(): Before loop()
loop(): Must be overridden if the Actor is not a receiver
shutdown(): After loop()
Actors MUST NOT be iterable! A Relay is required to properly initialize an Actor.
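The call order described above can be sketched with a plain `multiprocessing.Process` subclass. This is only an illustration of the lifecycle, not ktz's implementation: the class name `LifecycleSketch`, the `ticks` parameter, and the `events` list are hypothetical, and no Relay is involved.

```python
import multiprocessing as mp


class LifecycleSketch(mp.Process):
    """Hypothetical stand-in for an Actor; not ktz's actual code."""

    def __init__(self, ticks: int = 3):
        super().__init__()
        self.ticks = ticks
        self.events = []

    def startup(self) -> None:
        # Callback invoked before loop(), e.g. to open resources.
        self.events.append("startup")

    def loop(self) -> None:
        # Decides the lifetime of an actor that does not receive messages.
        for i in range(self.ticks):
            self.events.append(f"loop {i}")

    def shutdown(self) -> None:
        # Callback invoked after loop(), e.g. to release resources.
        self.events.append("shutdown")

    def run(self) -> None:
        # Process.start() would call run() in the child process.
        self.startup()
        self.loop()
        self.shutdown()


# Calling run() directly executes the lifecycle in the current process,
# which makes the call order easy to inspect.
sketch = LifecycleSketch(ticks=2)
sketch.run()
print(sketch.events)
```

Invoking `run()` directly is done here only to keep the sketch observable; in normal use the method is reached through `start()`.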
- Attributes:
- authkey
daemon: Return whether process is a daemon
exitcode: Return exit code of process or None if it has yet to stop
ident: Return identifier (PID) of process or None if it has yet to start
- name
pid: Return identifier (PID) of process or None if it has yet to start
receiver: Whether the Actor can receive messages.
sender: Whether the Actor can send messages.
sentinel: Return a file descriptor (Unix) or handle (Windows) suitable for waiting for process termination.
Methods
close(): Close the Process object.
is_alive(): Return whether process is alive
join([timeout]): Wait until child process terminates
kill(): Terminate process; sends SIGKILL signal or uses TerminateProcess()
log(msg, *args[, level]): Log in main process.
loop(): Keep the Actor alive.
run(): Control an Actor's lifecycle.
send(*args, **kwargs): Send a message to consuming actors.
shutdown(): Implement as callback after loop().
start(): Start child process
startup(): Implement as callback before loop().
terminate(): Terminate process; sends SIGTERM signal or uses TerminateProcess()
recv
- log(msg, *args, level=20)
Log in main process.
Logging is not process-safe. As such, a thread in the parent process handles logging, and this function forwards messages to that thread.
- Parameters:
- msg: str
Log message
- level: int
Log level
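The general pattern of routing log records through a queue to a thread in the parent can be sketched with the standard library's `QueueHandler` and `QueueListener`. Whether ktz uses these classes internally is not stated here, so treat this as an assumption; `Collector` and `make_worker_logger` are hypothetical names, and `queue.Queue` stands in for a cross-process queue to keep the sketch in one process.

```python
import logging
import logging.handlers
import queue


class Collector(logging.Handler):
    """Hypothetical handler that stores formatted messages in a list."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


def make_worker_logger(log_queue, name="worker"):
    # Worker side: records are only put on the queue, never written directly.
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.handlers = [logging.handlers.QueueHandler(log_queue)]
    return logger


# queue.Queue keeps the sketch in one process; a multiprocessing.Queue
# would be used when the worker lives in a child process.
log_queue = queue.Queue()
collector = Collector()

# Parent side: a background thread drains the queue into the real handlers.
listener = logging.handlers.QueueListener(log_queue, collector)
listener.start()

worker_log = make_worker_logger(log_queue)
worker_log.log(logging.INFO, "processed %d items", 3)  # level 20 == INFO

listener.stop()  # processes pending records, then joins the thread
print(collector.messages)
```

The default `level=20` in the signature above corresponds to `logging.INFO`.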
- loop()
Keep the Actor alive.
This function must be overridden if the Actor is not a receiver. Otherwise, it handles message passing.
- property receiver
Whether the Actor can receive messages.
If the Actor is first in a Relay, no senders are attached to it, so it cannot receive messages. Such Actors must implement a loop() that determines their lifetime and must not implement recv().
- run()
Control an Actor’s lifecycle.
Implementation of mp.Process.run. It is not invoked directly but called through Process.start() in a Relay.
- send(*args, **kwargs)
Send a message to consuming actors.
The recv() handler of any one of the receiving Actors is called with the provided args and kwargs.
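The "any one of the receiving Actors" semantics resemble several consumers pulling from one shared queue. The sketch below models that with threads and `queue.Queue` from the standard library; it is not ktz code, the `consume` function and the `(args, kwargs)` message layout are assumptions, and threads stand in for processes only to keep the example short.

```python
import queue
import threading


def consume(name, inbox, handled):
    # Every consumer pulls from the same inbox, so each message is taken
    # by whichever consumer gets to it first.
    while True:
        args, kwargs = inbox.get()
        handled.append((name, args, kwargs))
        inbox.task_done()


inbox = queue.Queue()
handled = []  # list.append is thread-safe in CPython

for i in range(2):
    threading.Thread(
        target=consume,
        args=(f"consumer-{i}", inbox, handled),
        daemon=True,  # daemon threads do not keep the interpreter alive
    ).start()

# A send(*args, **kwargs) call is modelled as one (args, kwargs) pair.
for n in range(4):
    inbox.put(((n,), {"tag": "demo"}))

inbox.join()  # wait until every message has been handled

delivered = sorted(args[0] for _name, args, _kwargs in handled)
print(delivered)
```

Which consumer handles which message is not deterministic; only the fact that each message is handled exactly once is.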
- property sender
Whether the Actor can send messages.
If the Actor is last in a Relay, it has no recipient and thus cannot send messages. If Actor.sender is True, the send() method can be invoked.
- shutdown()
Implement as callback after loop().
- startup()
Implement as callback before loop().
- class ktz.multiprocessing.Control(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)
Internally used control messages.
- class ktz.multiprocessing.Handler
Execute code in the main process.
The handler maintains a queue. Once control has been given to the relay to manage the Actors' life cycles, a handler instance may be used to regain control in the main process. Note that the user is then responsible for leaving the run() method so that control returns to the relay, which joins the actor processes.
It is usually a good idea to send and react to a poison pill (see demo/multiprocessing_relay.py for an example).
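The poison-pill idea can be sketched independently of ktz as follows. Everything named here is hypothetical: `HandlerSketch`, its `q` attribute, `CollectResults`, and the `POISON_PILL` value are illustrative, and a thread stands in for an actor process. The referenced demo file remains the authoritative example.

```python
import queue
import threading
from abc import ABC, abstractmethod

POISON_PILL = None  # hypothetical sentinel value


class HandlerSketch(ABC):
    """Hypothetical stand-in: owns a queue and runs in the main process."""

    def __init__(self):
        self.q = queue.Queue()

    @abstractmethod
    def run(self):
        ...


class CollectResults(HandlerSketch):
    def __init__(self):
        super().__init__()
        self.results = []

    def run(self):
        # Block on the queue until the pill arrives, then return so the
        # caller can go on to join the workers.
        while True:
            item = self.q.get()
            if item is POISON_PILL:
                break
            self.results.append(item)


def worker(out):
    for n in range(3):
        out.put(n * n)
    out.put(POISON_PILL)  # signal that no more items will follow


handler = CollectResults()
producer = threading.Thread(target=worker, args=(handler.q,))
producer.start()
handler.run()    # returns once the pill has been seen
producer.join()  # control is back; wait for the worker to finish
print(handler.results)
```

Without the pill, `run()` would block on the queue indefinitely and the workers would never be joined.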
Methods
run(): Execute code in the main process.
- abstract run()
Execute code in the main process.
- class ktz.multiprocessing.Relay(maxsize=None, log=None)
Wire Actors together.
Control backpressure with maxsize.
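Assuming `maxsize` bounds the queues between Actors in the same way as the `maxsize` argument of the standard library queues (an assumption, not confirmed here), backpressure works as sketched below: a producer cannot enqueue more items until a consumer has taken one.

```python
import queue

inbox = queue.Queue(maxsize=2)
inbox.put("a")
inbox.put("b")

try:
    # The queue is full; a put without timeout would block the producer.
    inbox.put("c", timeout=0.1)
    blocked = False
except queue.Full:
    blocked = True

inbox.get()                   # the consumer frees one slot
inbox.put("c", timeout=0.1)   # now the producer can continue
print(blocked, inbox.qsize())
```

A small bound keeps fast producers from filling memory while slow consumers catch up, at the cost of producers waiting.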
Methods
connect(*args, **kwargs): Connect Actors to form a processing pipeline.
start([handler]): Start the relay.
- connect(*args, **kwargs)
Connect Actors to form a processing pipeline.
Provide a sequence of Actors or sets of Actors to define message flow. All topologies are possible: 1-1, 1-n, n-1 and n-m.
- Parameters:
- *args: Actor | Iterable[Actor]
Actors or sets of Actors.
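How a sequence of single Actors and sets of Actors could translate into message flow is sketched below. The helper `expand_edges` is hypothetical and not part of ktz; strings stand in for Actors, and the sketch assumes every producer in one stage is linked to every consumer in the next. It also hints at why an Actor that is itself iterable would be ambiguous.

```python
from collections.abc import Iterable
from itertools import product


def expand_edges(*stages):
    """Return (producer, consumer) pairs for consecutive stages.

    Hypothetical helper: strings stand in for Actors, and any other
    iterable counts as a set of Actors.
    """
    groups = []
    for stage in stages:
        if isinstance(stage, Iterable) and not isinstance(stage, str):
            groups.append(list(stage))
        else:
            groups.append([stage])

    edges = []
    for producers, consumers in zip(groups, groups[1:]):
        edges.extend(product(producers, consumers))
    return edges


# 1-n followed by n-1: one reader fans out to two workers, which fan in
# to a single writer.
edges = expand_edges("reader", ["worker-a", "worker-b"], "writer")
print(edges)
```

Passing two multi-element stages in a row would give an n-m layout under the same assumption.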
- start(handler=None)
Start the relay.
This spawns all Actor processes and blocks until all of them have terminated. If a handler is provided, control is given back to the user in the main process.