Multi-processing en Python
Le multi-processing consiste à exécuter, en même temps et indépendamment, un ensemble de tâches, chacune dans son contexte. L’isolation est forte : les tâches s’exécutent dans des espaces mémoire séparés, et ne partagent pas leurs variables. Elles possèdent chacunes leur propre interpéteur Python. Conséquence, les contextes peuvent être volumineux, leur gestion (création, destruction et commutation) peut être coûteuses (overhead).
Le module multiprocessing offre le moyen de faire du multi-processing en Python,
en offrant au
programmeur la possibilité de créer explicitement les tâches (sous forme d’instances de classe).
1. Tâche
Créer une tâche consiste à créer une sous-classe de multiprocessing.Process.
Cette sous-classe doit être dotée d’une méthode run
implémentant le traitement à réaliser.
Elle dispose (via l’héritage) d’une méthode start destinée à
lancer l’exécution de la tâche.
Exemple :
import multiprocessing
...
class MyProcess(multiprocessing.Process):
def __init__(self,...):
multiprocessing.Process.__init__(self)
...
def run(self):
...
if __name__ == '__main__':
myprocess = MyProcess(...)
myprocess.start()
...
myprocess.join()
...
|
Bien que programmation multi-thread et programmation multi-processus
correspondent à 2 réalités bien différentes dans leur fonctionnement,
il existe une ressemblance d’utilisation
entre le module |
2. Le multi-processing vu d’Unix
Les processus sur une machine Unix sont organisés en une arborescence,
visualisable avec la commande pstree.
Chaque processus possède un identifiant entier unique, son PID (visible avec pstree -p).
Hormis le premier processus, créé au boot de la machine,
tous les autres possèdent également un PPID,
qui n’est autre que le PID de leur père (visible avec pstree -g).
La terminaison d’un processus n’est pas suffisant pour qu’il disparaisse
sans laisser de trace :
même s’il ne fait plus rien,
les ressources qui lui ont été affectées (par exemple la mémoire) restent mobilisées.
C’est à son père de le faire disparaitre proprement, et ainsi restituer les ressources qu’il mobilisait.
Concrètement, le père doit acquitter la terminaison de son ou ses fils,
avec la méthode join.
|
Tout processus dont la terminaison n’est pas acquittée par son père subsiste en tant que processus zombie (defunct). L’accumulation de tels processus encombre la machine, qui peut finir par manquer de ressources et conduire à une congestion dont seul un reboot peut en sortir. Conclusion : un programme ne doit pas laisser subsister des processus zombies. |
3. Communication inter-processus
Les processus ne partagent pas d’espace commun (donc pas de variables partagées), ils doivent échanger leur données en communiquant.
3.1. multiprocessing.Pipe
C’est un tuyau, un canal de communication bidirectionnelle entre 2 processus, leur permettant d’échanger des données.
multiprocessing.Pipe()
renvoie une paire d’objets de type Connection représentant les 2 extrémités du tuyau.
from multiprocessing import Pipe
(a,b) = Pipe()
print(type(a).__name__) # -> 'Connection'
print(type(b).__name__) # -> 'Connection'
Un object Connection possède (entre autres) les méthodes
send() qui envoie des données dans le tuyau,
recv() qui récupère les données du tuyau,
poll() qui indique si des données sont présentes dans le tuyau, et
close() qui ferme la connexion.
Chacun des 2 processus embarque l’un des 2 objets Connection :
aa.send([1, 'hello', None])
bprint(b.recv()) # [1, 'hello', None]
3.2. multiprocessing.Queue
La classe multiprocessing.Queue implémente l’échange de données entre processus.
Elle s’utilise aussi simplement qu’une liste.
Ce type est bien adapté au modèle producteur/consommateur.
Les processus qui veulent échanger partagent une même instance de multiprocessing.Queue.
Queue vs PipeLe tuyau est un concept de plus bas niveau que la queue, il nécessite la création explicite de connexions entre 2 processus. Conséquence, le tuyau est plus efficace et plus rapide, mais il est limité à 2 processus. Une queue est plus abstraite, elle est manipulée comme une variable liste partagée entre plusieurs processus. Elle n’est pas limités à 2 processus. |
Une queue peut avoir une taille donnée :
queue = multiprocessing.Queue(maxsize=100)
ou non fixée :
queue = multiprocessing.Queue()
On peut tester sa taille (qsize()), si elle est vide (empty()) ou pleine (full()).
L’ajout de valeurs se fait avec put(), et le retrait avec get().
Exemple :
import multiprocessing as mp
q = mp.Queue()
# remplissage
for i in range(10):
q.put(i,block=False)
...
# vidage
while q.qsize()>0:
item = q.get(block=True)
print(item)
Par défaut, ces opérations sont bloquantes.
Donc put() attend qu’il y ait de la place pour déposer la valeur :
queue.put(item, block=True)
queue.put(item) # équivalent
et get() attend qu’il y ait quelque chose à prendre :
item = queue.get(block=True)
item = queue.get() # équivalent
On peut aussi les utiliser dans un mode non bloquant, auxquel cas get() et put() lancent une exception en cas d’impossibilité.
Dans ce mode, put() lance une Full exception lorsque la file est pleine :
try:
queue.put(item, block=False)
except queue.Full:
# ...
et get() lance une Empty exception lorsque la file est vide :
try:
item = queue.get(block=False)
except queue.Empty:
# ...
On a aussi un mode mixte, dans lequel l’exception est lancée après un certain temps :
try:
queue.put(item, timeout=5)
except queue.Full:
# ...
try:
item = queue.get(timeout=10)
except queue.Empty:
# ...
On ne doit pas gérer la concurrence par soi-même. Par exemple, ce code :
if not queue.full():
queue.put(item, block=False)
n’est pas correct dans un environnement d’exécution concurrent (car il est interruptible).
Exemple du producteur/consommateur
def producteur(queue):
print('Producteur: début', flush=True)
for i in range(10):
value = random()
sleep(value)
queue.put(value)
queue.put(None)
print('Producteur: fin', flush=True)
def consommateur(queue):
print('Consommateur: début', flush=True)
while True:
item = queue.get()
if not item:
break
print(f'>retire {item}', flush=True)
print('Consommateur: fin', flush=True)
3.3. Listener et Client
Les communications précédentes sont limitées à la machine locale.
multiprocessing.connection.Listener
et
multiprocessing.connection.Cient
permettent des communications entre machines.
from multiprocessing.connection import Listener
address = ('localhost', 6000)
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connexion acceptée par', listener.last_accepted)
data = conn.recv()
print(f"reçu: {data}")
...
print(f"envoi de {result})
conn.send(result)
from multiprocessing.connection import Client
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(f"envoi de {data})
conn.send(data)
result = conn.recv()
print(f"reçu: {result}")
Là encore, la méthode
send() envoie des données, et
recv() les récupère.