Utilizarea corectă a mutexurilor în Python (Programare, Python, Multithreading, Mutex)

Richard a intrebat.

Încep cu mai multe fire în python (sau cel puțin este posibil ca scriptul meu să creeze mai multe fire). ar fi acest algoritm utilizarea corectă a unui Mutex? Nu am testat încă acest cod și probabil că nici nu va funcționa. Vreau doar ca ProcessData să ruleze într-un fir (unul câte unul), iar bucla principală while să continue să ruleze, chiar dacă există un fir în coadă.

from threading import Thread
from win32event import CreateMutex
mutex = CreateMutex(None, False, "My Crazy Mutex")
while(1)
    t = Thread(target=self.processData, args=(some_data,))
    t.start()
    mutex.lock()

def processData(self, data)
    while(1)
        if mutex.test() == False:
            do some stuff
            break

Edit: relectând codul meu, văd că este greșit în mod grosolan. dar hei, de aceea sunt aici cerând ajutor.

Comentarii

  • Este foarte greu să-mi dau seama ce încerci să faci. Va trebui să vă explicați intenția mai în detaliu. –  > Por Marcelo Cantos.
  • @Marcelo Cantos, îmi pare rău, probabil că aveți dreptate. Vreau ca codul meu din processData să înceapă într-un nou tread. Vreau ca un singur fir să poată procesa datele la un moment dat și în ordinea în care au fost trimise datele pentru a procesa datele. Vreau, de asemenea, ca bucla principală while să continue bucla în timp ce alte fire sunt în coadă. –  > Por Richard.
  • @Richard: De ce vrei să folosești fire de execuție dacă oricum plănuiești să serializezi toată procesarea? Ce este în neregulă cu o simplă buclă? De asemenea, de ce vrei ca firul principal să continue să facă buclă? Nu va face decât să ardă CPU, putând să înfometeze alte fire. –  > Por Marcelo Cantos.
  • @Marcelo Cantos, Acesta nu este programul meu real. Scriptul meu real a ajuns la peste 500 de linii de cod, care includ intrări în baza de date și e-mail. Vreau ca firul principal să citească doar un port serial, astfel încât să existe mai puține șanse ca bufferul să fie umplut înainte ca datele primite să fie procesate. Corectați-mă dacă greșesc, dar cred că aici este exact locul în care s-ar putea folosi fie fire de execuție, fie multiprocesare –  > Por Richard.
  • @Richard: Acest lucru are sens pentru firul principal, dar dacă toate celelalte procesări vor fi secvențiale, atunci ați putea crea un alt fir pentru a face toate celelalte activități. –  > Por Marcelo Cantos.
4 răspunsuri
Chris B.

Nu știu de ce folosiți Mutexul de la Window în loc de cel de la Python. Folosind metodele Python, acest lucru este destul de simplu:

from threading import Thread, Lock

mutex = Lock()

def processData(data):
    mutex.acquire()
    try:
        print('Do some stuff')
    finally:
        mutex.release()

while True:
    t = Thread(target = processData, args = (some_data,))
    t.start()

Dar rețineți că, din cauza arhitecturii CPython (și anume Blocarea globală a interpretului), veți avea efectiv un singur fir de execuție la un moment dat – acest lucru este în regulă dacă mai multe dintre ele sunt legate de I/O, deși veți dori să eliberați blocajul cât mai mult posibil, astfel încât firul legat de I/O să nu blocheze alte fire de execuție.

O alternativă, pentru Python 2.6 și versiunile ulterioare, este să folosiți funcția Python’s multiprocessing din Python. Acesta oglindește threading dar va crea procese complet noi care poate rula simultan. Este banal să vă actualizați exemplul:

from multiprocessing import Process, Lock

mutex = Lock()

def processData(data):
    with mutex:
        print('Do some stuff')

if __name__ == '__main__':
    while True:
        p = Process(target = processData, args = (some_data,))
        p.start()

Comentarii

  • Am încercat codul dvs. și am primit această eroare: with mutex: SyntaxError: sintaxă invalidă. Presupun că aș putea folosi try: except: în funcția mea, folosesc python 2.4 –  > Por Richard.
  • with este Python 2.5, iar multiprocessing este Python 2.6. Editat în consecință. –  > Por Chris B..
  • „acest lucru este în regulă dacă o parte dintre ele sunt legate de I/O” De fapt, este de asemenea în regulă dacă sunt legate de CPU, atât timp cât acest lucru se întâmplă în apelurile către biblioteci scrise în C și nu în cod Python pur (de exemplu, dacă manipulați matrici mari în numpy), deoarece GIL este deblocat în timpul acestor apeluri. –  > Por Arthur Tacca.
  • @demented hedgehog: the mutex modulul este depreciat. Crearea de mutexuri utilizând modulul threading și multiprocessing module, ceea ce se face în răspuns, nu este depreciat. –  > Por tjalling.
  • excelent pentru alinierea specială a mesajelor de I/O –  > Por eusoubrasileiro.
Richard

Aceasta este soluția pe care am găsit-o:

import time
from threading import Thread
from threading import Lock

def myfunc(i, mutex):
    mutex.acquire(1)
    time.sleep(1)
    print "Thread: %d" %i
    mutex.release()


mutex = Lock()
for i in range(0,10):
    t = Thread(target=myfunc, args=(i,mutex))
    t.start()
    print "main loop %d" %i

Ieșire:

main loop 0
main loop 1
main loop 2
main loop 3
main loop 4
main loop 5
main loop 6
main loop 7
main loop 8
main loop 9
Thread: 0
Thread: 1
Thread: 2
Thread: 3
Thread: 4
Thread: 5
Thread: 6
Thread: 7
Thread: 8
Thread: 9

Comentarii

    18

  • Există un potențial blocaj. Dacă instrucțiunea de imprimare aruncă o excepție, nu veți elibera niciodată mutexul. Trebuie să folosiți try/release sau with pentru a vă asigura că blocajul este eliberat. Consultați răspunsul meu. –  > Por Chris B..
  • De asemenea, nu este nevoie să treceți mutexul ca argument al funcției. Acesta este disponibil în domeniul global. –  > Por Chris B..
Guillaume Lebourgeois

Trebuie să vă deblocați Mutex-ul la un moment dat…

Teoman shipahi

Aș dori să îmbunătățesc puțin răspunsul de la chris-ba.

Vedeți mai jos pentru codul meu:

from threading import Thread, Lock
import threading
mutex = Lock()


def processData(data, thread_safe):
    if thread_safe:
        mutex.acquire()
    try:
        thread_id = threading.get_ident()
        print('
Processing data:', data, "ThreadId:", thread_id)
    finally:
        if thread_safe:
            mutex.release()


counter = 0
max_run = 100
thread_safe = False
while True:
    some_data = counter        
    t = Thread(target=processData, args=(some_data, thread_safe))
    t.start()
    counter = counter + 1
    if counter >= max_run:
        break

În prima execuție, dacă setați thread_safe = False în bucla while loop, mutexul nu va fi folosit, iar firele vor trece una peste alta în metoda print ca mai jos;

dar, dacă setați thread_safe = True și o executați, veți vedea că toate ieșirile vin perfect;

Sper că vă ajută.