Problem: Data processing and machine learning computation can become a pipeline bottleneck
One solution: Break heavy computations out into a separate Python process
There are a few reasons you might want to use more than one process in your pipeline.
1. You are using all your CPU cycles. This is the most common one. Running a Python script usually starts up a single process, which means that it can at most make use of one core. Your computer probably has two or more, so you have room to expand if you need it. Splitting your job up into two processes gives you twice as many cycles to work with.
2. One part runs more often than another. I wrote some code that collects audio data from a microphone, then calculates its frequency components. The collection has to happen often (audio is typically sampled 44,100 times per second), but I only needed to recalculate the frequency components a few times per second. The timing mismatch meant that the fast part would sometimes have to wait around for the slow part to run. Splitting this into separate processes decoupled them so that they could both operate comfortably at their own cadence.
3. It has to run in real time. If you are working with sensors that sample from the physical world, like cameras, take actions in the physical world, like climate controls, or systems that do both, like robots, then your code needs to be reliably synced to a wall clock. This doesn't always mesh well with demanding computational modeling or machine learning algorithms, which can take long, uncertain periods of time to reach a result. Breaking these out into separate processes ensures that sensors can be read and actions can be taken regularly using the most current information available at the time, while large computations whirr away happily in the background.
Here's a working example. Commentary below.
import multiprocessing as mp
import numpy as np
import time

shared_values = mp.Array('d', range(2))

def heavy_pipeline_task(shared_values):
    counter = 0
    while True:
        shared_values[0] = counter
        shared_values[1] = np.sum(np.random.normal(size=(1000, 1000)))
        counter += 1

pipeline_job = mp.Process(
    target=heavy_pipeline_task,
    args=(shared_values,),
    daemon=True)
pipeline_job.start()

for i in range(1000):
    time.sleep(.4)
    print(f"pass {i}: {int(shared_values[0])}, {shared_values[1]}")
This code creates a separate process to run a heavy array computation, then polls it periodically to pull the most recent result.
shared_values = mp.Array('d', range(2))
Create a special object for sharing values between the two processes. This is the dead drop where one process can write a set of values and the other can read from it. All of the trickiness of making sure both processes know where in memory to find this object and preventing them both from changing it at the same time is taken care of for you. This is a wonderful convenience and makes for clean code. This particular object is a two-element array of doubles ('d'). There are also objects available for single values and queues, and if those aren't enough for you, you can use a Manager to share any other object type you want.
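To give a sense of those other options, here's a minimal sketch (the names latest_reading, results, and shared_state are just illustrative):

import multiprocessing as mp

if __name__ == "__main__":
    # A single shared double ('d'), protected by its own lock
    latest_reading = mp.Value('d', 0.0)
    latest_reading.value = 3.14

    # A queue for handing results from one process to another, one at a time
    results = mp.Queue()
    results.put(42)
    print(results.get())

    # A Manager can share richer objects, such as dicts and lists
    with mp.Manager() as manager:
        shared_state = manager.dict()
        shared_state['status'] = 'running'
        shared_list = manager.list([1, 2, 3])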
def heavy_pipeline_task(shared_values):
There's nothing special about this function declaration except that it accepts the shared variable as an argument. The function itself performs a pointless but expensive array operation, and also maintains a counter so that you can see missed and repeated readings in the parent process.
pipeline_job = mp.Process(
Initialize a separate process.
target=heavy_pipeline_task,
Identify the function that the process will run.
args=(shared_values,),
Provide all the input arguments for the function in the form of a tuple.
daemon=True)
Declare the new child process to be a daemon. This means that when the parent process dies for any reason, the child process will be killed too. This saves you from having to explicitly track and clean up after each process separately. However, if you need to close your child process explicitly, for instance, to gracefully shut down a connection or close a file, you can skip the daemon argument and use an Event to coordinate the shutdown.
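As a rough sketch of that approach, the parent can set an Event that a hypothetical worker function checks before cleaning up and exiting:

import multiprocessing as mp
import time

def worker(stop_event):
    # Keep working until the parent signals shutdown.
    while not stop_event.is_set():
        time.sleep(.1)
    # Clean-up code goes here: close files, connections, etc.
    print("worker shut down cleanly")

if __name__ == "__main__":
    stop_event = mp.Event()
    job = mp.Process(target=worker, args=(stop_event,))
    job.start()
    time.sleep(1)
    stop_event.set()   # ask the child to finish up
    job.join()         # wait for it to exit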
pipeline_job.start()
Kick off the new process.
print(f"pass {i}: {int(shared_values[0])}, {shared_values[1]}")
Reference the shared object as often as you like from the parent process. In this code, you might read the same thing more than once, and you might miss a lot of updates, but you have the benefit of fully decoupled processes. (If you want to get all the updates once and only once, a queue is your best bet.)
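If you do want every update exactly once, the pattern might look something like this sketch, where the producer function and the None sentinel are my own illustrative choices:

import multiprocessing as mp
import numpy as np

def producer(queue):
    # Push every result onto the queue so none are missed or re-read.
    for i in range(5):
        total = np.sum(np.random.normal(size=(1000, 1000)))
        queue.put((i, total))
    queue.put(None)  # sentinel to signal that the producer is done

if __name__ == "__main__":
    queue = mp.Queue()
    job = mp.Process(target=producer, args=(queue,), daemon=True)
    job.start()
    while True:
        item = queue.get()   # blocks until the next result arrives
        if item is None:
            break
        i, total = item
        print(f"update {i}: {total}")
    job.join()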
What about threading?
In situations where you're not running low on CPU cycles, Python's threading library is a fine alternative for decoupling your code. (Here's an example of how.) There's about 90% similarity between the two libraries, so you can almost copy and paste multiprocessing code to get multi-threaded code.
Threading lets different pieces of code run asynchronously while keeping the code easy to read. But it's best suited to cases where the bottleneck is waiting around, for example, waiting for some external input/output process to provide or read a value. It still limits your code to running in a single process. If you have expensive computations, like large array operations, they can still block other threads from running, because Python's global interpreter lock means the process can only execute one piece of Python code at a time. Running multiple processes is a better way to ensure that the two tasks really are parallel and that one won't gum up the other.
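To illustrate how close the two libraries are, here's roughly what a threaded version of the example above could look like. Because threads share memory, a plain list can stand in for the mp.Array:

import threading
import numpy as np
import time

shared_values = [0.0, 0.0]  # threads share memory, so a plain list works

def heavy_pipeline_task(shared_values):
    counter = 0
    while True:
        shared_values[0] = counter
        shared_values[1] = np.sum(np.random.normal(size=(1000, 1000)))
        counter += 1

pipeline_job = threading.Thread(
    target=heavy_pipeline_task,
    args=(shared_values,),
    daemon=True)
pipeline_job.start()

for i in range(1000):
    time.sleep(.4)
    print(f"pass {i}: {int(shared_values[0])}, {shared_values[1]}")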