5 min read

Python Multiprocessing: Having a Pickle with Pickling

Python Multiprocessing: Having a Pickle with Pickling
A pickled python. Unhappy...

Pickling is one of the most used serialization protocols specific to Python. It is popular due to its ease ouf use, as virtually all Python objects can be serialized to a bytestream and written on disk or sent over a network.

Originally, the term is derived from the process of pickling food for preservation. Similarly, we want to "Python" objects, usually in a .pickle file for later use. For many, that's the only place where they directly encounter pickling. However, it happens in many places under the hood. Without realizing, we may use it and since our program seems to run just fine, why bother? In somewhat typical Python fashion: Out of mind, out of sight. Python is supposed to be an easy language right?

Not so. If you care about speed and efficiency, it is important to understand what happens beyond the single line of Python code that does so much. I know I mentioned speed and Python in a single sentence (how dare you?). We need to keep in mind, that at the time of writing, large parts of the world are running on Python. Especially since the meteoric rise of modern machine learning and AI[1]. So we should pay attention.

A Pickle stuck in a Pipe

There are physical limitations when trying to increase the clock speed of CPUs (Central Processing Units). Therefore, chip manufacturers have resorted to increasing the number of cores in CPUs, allowing for parallel processing of instructions (rather than processing them faster). The most widely used implementation of Python (CPython) is not well suited for this kind of programming paradigm due to the Global Interpreter Lock (GIL)[2] that prevents Python programs (or threads) from ending up in race conditions.

Without the GIL, Python might get tangled up...

This is a major issue since Python is used for many applications in scientific computing that rely on parallel computations to crunch enormous amount of numbers (most notably applications in AI and high-performance computing).

For this reason, many programmers resort to spinning up multiple processes (with their independent Python instances) and employing inter-process communication (IPC) techniques (e.g, Pipes or Networking Sockets) to coordinate those processes to work together on a computation. This paradigm has multiple downsides. Here is a quote:

In PyTorch, Python is commonly used to orchestrate ~8 GPUs and ~64 CPU threads, growing to 4k GPUs and 32k CPU threads for big models. While the heavy lifting is done outside of Python, the speed of GPUs makes even just the orchestration in Python not scalable. We often end up with 72 processes in place of one because of the GIL. Logging, debugging, and performance tuning are orders-of-magnitude more difficult in this regime, continuously causing lower developer productivity.
-- Zachary DeVito, PyTorch core developer and researcher at FAIR (Meta AI)

Processes are not nimble entities and come with a lot of overhead. Also, making those processes communicate over IPC can be slow. This is where pickling enters the scene.

Python processes are heavy-weight...

Unbeknownst to many, popular methods for IPC (of the "message-passing" variant) such as multiprocessing.Queue use pickling in the background. As data gets sent from one process to another, it needs to be serialized into a bytestream and then deserialized again[3]. It is well known, that this is not exactly efficient[4].

Shared Memory to the Rescue

I would like to offer an interesting alternative. We won't get rid of processes (in favor of light-weight threads, this will take a couple of years until Python is capable of doing that. Note that C API Extension for Python can release the GIL for long running processes. However, this does not fully replace "true" multi-threading) for our CPU-bound computations. However, we can get rid of that pickle. Below I present a simplified solution of using shared memory for IPC. Shared Memory allows processes to access the same memory region rather than sending each other messages.

The example we will look at aims to do the following:

  • Create a custom SharedMemoryProcessPool class that can accept workloads (tasks) and assign them to a pool of worker processes. We want to minimize the pickling overhead though SharedMemory as much as possible.
  • Make sure that the dispatched workloads ("futures") remain in order. I do this by using a normal queue (not a multi-processing queue!). These to not use any pickling.
  • We restrict our workload to numpy arrays. But this is not a hard requirement.
  • Both input array and output array are shared via SharedMemory. Note that the future only returns a bool, not the result of the computation. Futures use pickling in the background and we want to avoid transmitting potentially large amounts of data (the result of the tasks). We settle for returning a single bool (which will be pickled) and use it as a sentinel.

The snippet below shows a custom class SharedMemoryProcessPool. It accepts workloads through its put function. I assumed that work will happen on a numpy array arr for which the function creates a Shared Memory region using the multiprocessing.shared_memory module. The memory for the result of the task is pre-allocated as well. The reason for this is that we cannot easily allocate it dynamically in the worker process because we will get a segmentation fault as the memory region seems to be de-allocated as soon as the worker process finishes the task.

There is no free lunch. Pickling offers you more flexibility at the cost of efficiency. However, if you can specify your workloads (in terms of memory requirements) then using shared memory is a good option. With the implementation below I achieved a speed up of almost 50% compared to a simple implementation using futures (without shared memory).

class SharedMemoryProcessPool:
    def __init__(self) -> None:
        self.executor = ProcessPoolExecutor(max_workers=4)
        self.task_queue = queue.Queue()

    def put(self, func, arr, task_type):

        shm = SharedMemory(create=True, size=arr.nbytes)
        shm_result = SharedMemory(create=True, size=task_type.result_bytes)
        copy_array_to_shared_memory(arr, shm.name, arr.shape, arr.dtype)
        self.task_queue.put(
            (
                shm,
                shm_result,
                task_type,
                self.executor.submit(
                    func,
                    shm.name,
                    arr.shape,
                    arr.dtype,
                    shm_result.name,
                    task_type.result_shape,
                    task_type.result_dtype,
                ),
            )
        )

    def get(self):
        input_shm: SharedMemory
        f: Future
        input_shm, result_shm, task_type, f = self.task_queue.get()

        result, success = None, False
        try:
            success = f.result()

            if success:
                result = np.array(
                      np.ndarray(shape=task_type.result_shape, dtype=task_type.result_dtype, buffer=result_shm.buf)
                )

            return result, success

        finally:
            deallocate_shared_memory(input_shm)
            deallocate_shared_memory(result_shm)

    def empty(self):
        return self.task_queue.empty()

    def _clear_task_queue(self):
        while not self.task_queue.empty():
            input_shm, result_shm, _, future = self.task_queue.get()
            future.cancel()
            deallocate_shared_memory(input_shm)
            deallocate_shared_memory(result_shm)

    def shutdown(self):
        self._clear_task_queue()
        self.executor.shutdown()

See GitHub for full code

[1] See Statista 2023.
[2] There seems to be recent work going on trying to remove the GIL from CPython (PEP 0703). The rollout of such can only happen gradually, because many important Python libraries implicitly depend on the GIL. Simply removing it would break a lot of stuff.
[3] Documentation for Pipes and Queues in the multiprocessing library
[4] Don't pickle your data