dernière modification : 2024

Multi-processing en Python

Le multi-processing consiste à exécuter, en même temps et indépendamment, un ensemble de tâches, chacune dans son contexte. L’isolation est forte : les tâches s’exécutent dans des espaces mémoire séparés, et ne partagent pas leurs variables. Elles possèdent chacunes leur propre interpéteur Python. Conséquence, les contextes peuvent être volumineux, leur gestion (création, destruction et commutation) peut être coûteuses (overhead).

Le module multiprocessing offre le moyen de faire du multi-processing en Python, en offrant au programmeur la possibilité de créer explicitement les tâches (sous forme d’instances de classe).

1. Tâche

Créer une tâche consiste à créer une sous-classe de multiprocessing.Process. Cette sous-classe doit être dotée d’une méthode run implémentant le traitement à réaliser. Elle dispose (via l’héritage) d’une méthode start destinée à lancer l’exécution de la tâche. Exemple :

import multiprocessing
...

class MyProcess(multiprocessing.Process):
  def __init__(self,...):
    multiprocessing.Process.__init__(self)
    ...

  def run(self):
    ...

if __name__ == '__main__':
  myprocess = MyProcess(...)
  myprocess.start()
  ...
  myprocess.join()
  ...

Bien que programmation multi-thread et programmation multi-processus correspondent à 2 réalités bien différentes dans leur fonctionnement, il existe une ressemblance d’utilisation entre le module multithreading et le module multiprocessing. C’est une faculté de la POO de proposer 2 classes offrant des interfaces similaires, alors qu’en réalité leurs implémentations diffèrent fortement.

2. Le multi-processing vu d’Unix

Les processus sur une machine Unix sont organisés en une arborescence, visualisable avec la commande pstree. Chaque processus possède un identifiant entier unique, son PID (visible avec pstree -p). Hormis le premier processus, créé au boot de la machine, tous les autres possèdent également un PPID, qui n’est autre que le PID de leur père (visible avec pstree -g).

La terminaison d’un processus n’est pas suffisant pour qu’il disparaisse sans laisser de trace : même s’il ne fait plus rien, les ressources qui lui ont été affectées (par exemple la mémoire) restent mobilisées. C’est à son père de le faire disparaitre proprement, et ainsi restituer les ressources qu’il mobilisait. Concrètement, le père doit acquitter la terminaison de son ou ses fils, avec la méthode join.

Tout processus dont la terminaison n’est pas acquittée par son père subsiste en tant que processus zombie (defunct). L’accumulation de tels processus encombre la machine, qui peut finir par manquer de ressources et conduire à une congestion dont seul un reboot peut en sortir. Conclusion : un programme ne doit pas laisser subsister des processus zombies.

3. Communication inter-processus

Les processus ne partagent pas d’espace commun (donc pas de variables partagées), ils doivent échanger leur données en communiquant.

3.1. multiprocessing.Pipe

C’est un tuyau, un canal de communication bidirectionnelle entre 2 processus, leur permettant d’échanger des données. multiprocessing.Pipe() renvoie une paire d’objets de type Connection représentant les 2 extrémités du tuyau.

création d’un tuyau
from multiprocessing import Pipe
(a,b) = Pipe()
print(type(a).__name__)  # ->  'Connection'
print(type(b).__name__)  # ->  'Connection'

Un object Connection possède (entre autres) les méthodes send() qui envoie des données dans le tuyau, recv() qui récupère les données du tuyau, poll() qui indique si des données sont présentes dans le tuyau, et close() qui ferme la connexion.

Chacun des 2 processus embarque l’un des 2 objets Connection :

émission, dans le 1er processus, via a
a.send([1, 'hello', None])
réception, dans le 2nd processus, via b
print(b.recv())  # [1, 'hello', None]

3.2. multiprocessing.Queue

La classe multiprocessing.Queue implémente l’échange de données entre processus. Elle s’utilise aussi simplement qu’une liste. Ce type est bien adapté au modèle producteur/consommateur. Les processus qui veulent échanger partagent une même instance de multiprocessing.Queue.

Queue vs Pipe

Le tuyau est un concept de plus bas niveau que la queue, il nécessite la création explicite de connexions entre 2 processus. Conséquence, le tuyau est plus efficace et plus rapide, mais il est limité à 2 processus. Une queue est plus abstraite, elle est manipulée comme une variable liste partagée entre plusieurs processus. Elle n’est pas limités à 2 processus.

Une queue peut avoir une taille donnée :

queue de taille 100
queue = multiprocessing.Queue(maxsize=100)

ou non fixée :

queue sans taille spécifiée
queue = multiprocessing.Queue()

On peut tester sa taille (qsize()), si elle est vide (empty()) ou pleine (full()). L’ajout de valeurs se fait avec put(), et le retrait avec get().

Exemple :

import multiprocessing as mp

q = mp.Queue()
# remplissage
for i in range(10):
  q.put(i,block=False)
...
# vidage
while q.qsize()>0:
  item = q.get(block=True)
  print(item)

Par défaut, ces opérations sont bloquantes. Donc put() attend qu’il y ait de la place pour déposer la valeur :

ajouter en mode bloquant
queue.put(item, block=True)
queue.put(item)               # équivalent

et get() attend qu’il y ait quelque chose à prendre :

retirer en mode bloquant
item = queue.get(block=True)
item = queue.get()            # équivalent

On peut aussi les utiliser dans un mode non bloquant, auxquel cas get() et put() lancent une exception en cas d’impossibilité. Dans ce mode, put() lance une Full exception lorsque la file est pleine :

ajouter sans attendre
try:
  queue.put(item, block=False)
except queue.Full:
  # ...

et get() lance une Empty exception lorsque la file est vide :

retirer sans attendre
try:
  item = queue.get(block=False)
except queue.Empty:
  # ...

On a aussi un mode mixte, dans lequel l’exception est lancée après un certain temps :

essayer d’ajouter pendant 5s
try:
  queue.put(item, timeout=5)
except queue.Full:
  # ...
essayer de prendre pendant 10s
try:
  item = queue.get(timeout=10)
except queue.Empty:
  # ...

On ne doit pas gérer la concurrence par soi-même. Par exemple, ce code :

if not queue.full():
  queue.put(item, block=False)

n’est pas correct dans un environnement d’exécution concurrent (car il est interruptible).

Exemple du producteur/consommateur

producteur
def producteur(queue):
    print('Producteur: début', flush=True)
    for i in range(10):
        value = random()
        sleep(value)
        queue.put(value)
    queue.put(None)
    print('Producteur: fin', flush=True)
consommateur
def consommateur(queue):
    print('Consommateur: début', flush=True)
    while True:
        item = queue.get()
        if not item:
            break
        print(f'>retire {item}', flush=True)
    print('Consommateur: fin', flush=True)

3.3. Listener et Client

Les communications précédentes sont limitées à la machine locale. multiprocessing.connection.Listener et multiprocessing.connection.Cient permettent des communications entre machines.

Listener
from multiprocessing.connection import Listener

address = ('localhost', 6000)

with Listener(address, authkey=b'secret password') as listener:
  with listener.accept() as conn:
    print('connexion acceptée par', listener.last_accepted)
    data = conn.recv()
    print(f"reçu: {data}")
    ...
    print(f"envoi de {result})
    conn.send(result)
Client
from multiprocessing.connection import Client

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
  print(f"envoi de {data})
  conn.send(data)
  result = conn.recv()
  print(f"reçu: {result}")

Là encore, la méthode send() envoie des données, et recv() les récupère.