Tkinter: Cum să folosiți fire de execuție pentru a preveni „înghețarea” buclei principale de evenimente (Programare, Python, Multithreading, Tkinter, Bară De Progres, Bucla De Eveniment)

Pinguinul murdar a intrebat.

Am un mic test GUI cu un buton „Start” și o bară de progres. Comportamentul dorit este:

  • Faceți clic pe Start
  • Bara de progres oscilează timp de 5 secunde
  • Bara de progres se oprește

Comportamentul observat este că butonul „Start” îngheață timp de 5 secunde, apoi se afișează o bară de progres (fără oscilații).

Iată codul meu de până acum:

class GUI:
    def __init__(self, master):
        self.master = master
        self.test_button = Button(self.master, command=self.tb_click)
        self.test_button.configure(
            text="Start", background="Grey",
            padx=50
            )
        self.test_button.pack(side=TOP)

    def progress(self):
        self.prog_bar = ttk.Progressbar(
            self.master, orient="horizontal",
            length=200, mode="indeterminate"
            )
        self.prog_bar.pack(side=TOP)

    def tb_click(self):
        self.progress()
        self.prog_bar.start()
        # Simulate long running process
        t = threading.Thread(target=time.sleep, args=(5,))
        t.start()
        t.join()
        self.prog_bar.stop()

root = Tk()
root.title("Test Button")
main_ui = GUI(root)
root.mainloop()

Pe baza informațiilor furnizate de Bryan Oakley aici, am înțeles că trebuie să folosesc fire de execuție. Am încercat să creez un fir, dar presupun că, din moment ce firul este pornit din cadrul firului principal, nu ajută.

Am avut ideea de a plasa partea de logică într-o clasă diferită și de a instanția GUI-ul din interiorul acelei clase, similar cu codul de exemplu al lui A. Rodas de aici.

Întrebarea mea:

Nu reușesc să îmi dau seama cum să o codific astfel încât această comandă:

self.test_button = Button(self.master, command=self.tb_click)

să apeleze o funcție care se află în cealaltă clasă. Este acesta un lucru rău de făcut sau este posibil? Cum aș putea crea o a 2-a clasă care să se ocupe de self.tb_click? Am încercat să urmez exemplul de cod al lui A. Rodas, care funcționează de minune. Dar nu reușesc să îmi dau seama cum să implementez soluția sa în cazul unui widget Button care declanșează o acțiune.

Dacă ar trebui în schimb să gestionez firul din interiorul clasei unice GUI, cum s-ar putea crea un fir care să nu interfereze cu firul principal?

4 răspunsuri
A. Rodas

Atunci când alăturați noul fir în firul principal, acesta va aștepta până când firul se termină, astfel încât GUI se va bloca chiar dacă folosiți multithreading.

Dacă doriți să plasați partea logică într-o clasă diferită, puteți subclasa Thread direct și apoi să porniți un nou obiect din această clasă atunci când apăsați butonul. Constructorul acestei subclase de Thread poate primi un obiect Queue și apoi veți putea comunica cu partea de interfață grafică. Așadar, sugestia mea este următoarea:

  1. Creați un obiect Queue în firul principal
  2. Creați un nou fir de execuție cu acces la coada respectivă
  3. verificați periodic coada de așteptare în firul principal

Apoi, trebuie să rezolvați problema a ceea ce se întâmplă dacă utilizatorul face clic de două ori pe același buton (se va genera un nou fir de execuție la fiecare clic), dar puteți rezolva această problemă prin dezactivarea butonului de pornire și activarea lui din nou după ce apelați self.prog_bar.stop().

import Queue

class GUI:
    # ...

    def tb_click(self):
        self.progress()
        self.prog_bar.start()
        self.queue = Queue.Queue()
        ThreadedTask(self.queue).start()
        self.master.after(100, self.process_queue)

    def process_queue(self):
        try:
            msg = self.queue.get(0)
            # Show result of the task if needed
            self.prog_bar.stop()
        except Queue.Empty:
            self.master.after(100, self.process_queue)

class ThreadedTask(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
    def run(self):
        time.sleep(5)  # Simulate long running process
        self.queue.put("Task finished")

Comentarii

  • Un alt exemplu frumos. Mulțumesc A. Rodas 🙂 Am o întrebare suplimentară: Dacă comentez self.master.after(100, self.process_queue) și îl înlocuiesc cu pur și simplu self.process_queue(), comportamentul este același. Există vreun motiv întemeiat pentru a avea partea self.master.after…? –  > Por Pinguinul murdar.
  • Da, cu self.master.after(100, self.process_queue) programați această metodă la fiecare 100 de milisecunde, în timp ce self.process_queue() constant o execută fără nicio întârziere între fiecare apel. Nu este nevoie să faceți asta, așa că after este o soluție mai bună pentru a verifica conținutul peridic. –  > Por A. Rodas.
  • Scuze, Rodas, sunt într-o situație similară cu cea explicată de OP, dar în cazul meu ar trebui să chem o funcție dintr-o altă clasă atunci când apăs un buton, dar continuă să înghețe. Nu sunt atât de familiarizat cu threading, de aceea întreb cum ar trebui să procedez. Din moment ce trebuie să apelez o funcție (doar atunci când apăs pe butonul din GUI) a unui obiect creat în constructorul aplicației mele GUI, ar trebui să fac cealaltă clasă să derive din Thread oricum? –  > Por nbro.
  • @citizen2077 Dacă doriți să împiedicați utilizatorii să facă acest lucru, vă puteți ocupa de WM_DELETE_PROTOCOL și să distrugeți GUI numai dacă firul nu mai este în viață. –  > Por A. Rodas.
  • @citizen2077 Adăugarea unui handler ar fi primul pas pentru a defini ce se întâmplă dacă rădăcina este închisă cu ajutorul managerului de ferestre, dar puteți folosi și un steag pentru a comunica firului că ar trebui să își oprească execuția. Simțiți-vă liber să vă puneți întrebarea separat, deoarece nu este strict legată de întrebarea lui OP. –  > Por A. Rodas.
jmihalicza

Problema este că t.join() blochează evenimentul click, firul principal nu se întoarce la bucla de evenimente pentru a procesa repictările. vezi De ce apare bara de progres ttk după proces în Tkinter sau Bara de progres TTK blocată la trimiterea e-mailului

BuvinJ

Voi prezenta baza pentru o soluție alternativă. Nu este specifică unei bare de progres Tk în sine, dar cu siguranță poate fi implementată foarte ușor pentru aceasta.

Iată câteva clase care vă permit să executați alte sarcini în fundalul Tk, să actualizați controalele Tk atunci când doriți și să nu blocați gui!

Iată clasele TkRepeatingTask și BackgroundTask:

import threading

class TkRepeatingTask():

    def __init__( self, tkRoot, taskFuncPointer, freqencyMillis ):
        self.__tk_   = tkRoot
        self.__func_ = taskFuncPointer        
        self.__freq_ = freqencyMillis
        self.__isRunning_ = False

    def isRunning( self ) : return self.__isRunning_ 

    def start( self ) : 
        self.__isRunning_ = True
        self.__onTimer()

    def stop( self ) : self.__isRunning_ = False

    def __onTimer( self ): 
        if self.__isRunning_ :
            self.__func_() 
            self.__tk_.after( self.__freq_, self.__onTimer )

class BackgroundTask():

    def __init__( self, taskFuncPointer ):
        self.__taskFuncPointer_ = taskFuncPointer
        self.__workerThread_ = None
        self.__isRunning_ = False

    def taskFuncPointer( self ) : return self.__taskFuncPointer_

    def isRunning( self ) : 
        return self.__isRunning_ and self.__workerThread_.isAlive()

    def start( self ): 
        if not self.__isRunning_ :
            self.__isRunning_ = True
            self.__workerThread_ = self.WorkerThread( self )
            self.__workerThread_.start()

    def stop( self ) : self.__isRunning_ = False

    class WorkerThread( threading.Thread ):
        def __init__( self, bgTask ):      
            threading.Thread.__init__( self )
            self.__bgTask_ = bgTask

        def run( self ):
            try :
                self.__bgTask_.taskFuncPointer()( self.__bgTask_.isRunning )
            except Exception as e: print repr(e)
            self.__bgTask_.stop()

Iată un test Tk care demonstrează utilizarea acestora. Dacă doriți să vedeți demonstrația în acțiune, atașați-o în partea de jos a modulului care conține aceste clase:

def tkThreadingTest():

    from tkinter import Tk, Label, Button, StringVar
    from time import sleep

    class UnitTestGUI:

        def __init__( self, master ):
            self.master = master
            master.title( "Threading Test" )

            self.testButton = Button( 
                self.master, text="Blocking", command=self.myLongProcess )
            self.testButton.pack()

            self.threadedButton = Button( 
                self.master, text="Threaded", command=self.onThreadedClicked )
            self.threadedButton.pack()

            self.cancelButton = Button( 
                self.master, text="Stop", command=self.onStopClicked )
            self.cancelButton.pack()

            self.statusLabelVar = StringVar()
            self.statusLabel = Label( master, textvariable=self.statusLabelVar )
            self.statusLabel.pack()

            self.clickMeButton = Button( 
                self.master, text="Click Me", command=self.onClickMeClicked )
            self.clickMeButton.pack()

            self.clickCountLabelVar = StringVar()            
            self.clickCountLabel = Label( master,  textvariable=self.clickCountLabelVar )
            self.clickCountLabel.pack()

            self.threadedButton = Button( 
                self.master, text="Timer", command=self.onTimerClicked )
            self.threadedButton.pack()

            self.timerCountLabelVar = StringVar()            
            self.timerCountLabel = Label( master,  textvariable=self.timerCountLabelVar )
            self.timerCountLabel.pack()

            self.timerCounter_=0

            self.clickCounter_=0

            self.bgTask = BackgroundTask( self.myLongProcess )

            self.timer = TkRepeatingTask( self.master, self.onTimer, 1 )

        def close( self ) :
            print "close"
            try: self.bgTask.stop()
            except: pass
            try: self.timer.stop()
            except: pass            
            self.master.quit()

        def onThreadedClicked( self ):
            print "onThreadedClicked"
            try: self.bgTask.start()
            except: pass

        def onTimerClicked( self ) :
            print "onTimerClicked"
            self.timer.start()

        def onStopClicked( self ) :
            print "onStopClicked"
            try: self.bgTask.stop()
            except: pass
            try: self.timer.stop()
            except: pass                        

        def onClickMeClicked( self ):
            print "onClickMeClicked"
            self.clickCounter_+=1
            self.clickCountLabelVar.set( str(self.clickCounter_) )

        def onTimer( self ) :
            print "onTimer"
            self.timerCounter_+=1
            self.timerCountLabelVar.set( str(self.timerCounter_) )

        def myLongProcess( self, isRunningFunc=None ) :
            print "starting myLongProcess"
            for i in range( 1, 10 ):
                try:
                    if not isRunningFunc() :
                        self.onMyLongProcessUpdate( "Stopped!" )
                        return
                except : pass   
                self.onMyLongProcessUpdate( i )
                sleep( 1.5 ) # simulate doing work
            self.onMyLongProcessUpdate( "Done!" )                

        def onMyLongProcessUpdate( self, status ) :
            print "Process Update: %s" % (status,)
            self.statusLabelVar.set( str(status) )

    root = Tk()    
    gui = UnitTestGUI( root )
    root.protocol( "WM_DELETE_WINDOW", gui.close )
    root.mainloop()

if __name__ == "__main__": 
    tkThreadingTest()

Două puncte de import pe care le voi sublinia în legătură cu BackgroundTask:

1) Funcția pe care o executați în sarcina de fundal trebuie să ia un pointer de funcție pe care îl va invoca și respecta, ceea ce permite anularea sarcinii la jumătatea drumului – dacă este posibil.

2) Trebuie să vă asigurați că sarcina de fundal este oprită atunci când ieșiți din aplicație. Acest fir va continua să ruleze chiar dacă gui-ul este închis dacă nu abordați acest aspect!

Comentarii

  • Wow, nu cred că înțelegeți cum se face after() funcționează metoda. În răspunsul acceptat, metoda self.master.after(100, self.process_queue) nu apelează self.process_queue recursiv. Doar programează pentru a fi executat din nou în 100 ms. Al doilea argument este doar numele funcției, nu un apel la aceasta – și face acest lucru doar atunci când excepția Queue.Empty a fost ridicată, ceea ce înseamnă că ThreadedTask nu a pus încă nimic în coada de așteptare, așa că trebuie să continue să verifice. –  > Por martineau.
  • @martineau Sper că aveți dreptate! Am rulat asta cu câteva modificări ușoare și s-a blocat din cauza faptului că avea prea multe apeluri recursive. În alte limbaje și biblioteci am folosit cronometre repetitive foarte asemănătoare fără probleme. Mi-ar plăcea să văd că funcționează așa cum pare că ar trebui (adică fără recursivitate). Mă voi juca cu asta și îmi voi retrage răspunsul atunci când voi avea succes. Deși clasa mea BackgroundTask încă funcționează bine, cel puțin în exemplul meu – nu am testat-o suficient de mult pentru a ști de ce se va bloca, tk nefiind totuși un thread safe, ceea ce m-a îngrijorat! –  > Por BuvinJ.
  • Sunt foarte încrezător în ceea ce am spus. Faptul că Tkinter nu este thread-safe nu înseamnă că nu îl poți folosi într-o aplicație multi-thread. Doar că trebuie să limitezi numărul de fire de execuție care accesează Tkinter concomitent la unul singur (iar acest lucru este de obicei lăsat la latitudinea firului principal). Răspunsul meu la o altă întrebare referitoare la Tkinter conține un exemplu în care se face acest lucru. –  > Por martineau.
  • Aveți perfectă dreptate! Îmi retrag comentariile mele dure. Mi-am modificat radical postul. Am văzut cu desăvârșire acel accident de recursivitate, dar trebuie să se fi întâmplat altceva. –  > Por BuvinJ.
  • Ai putea să faci în așa fel încât nu a fost necesar să opriți sarcinile de fundal înainte de a ieși din aplicație prin setarea lor daemon la True. Consultați răspunsul meu la o altă întrebare pentru mai multe detalii și linkuri către documentația relevantă. –  > Por martineau.
aleksk

Am folosit RxPY, care are câteva funcții frumoase de threading pentru a rezolva acest lucru într-un mod destul de curat. Nu există cozi și am furnizat o funcție care se execută pe firul principal după finalizarea firului de fundal. Iată un exemplu de lucru:

import rx
from rx.scheduler import ThreadPoolScheduler
import time
import tkinter as tk

class UI:
   def __init__(self):
      self.root = tk.Tk()
      self.pool_scheduler = ThreadPoolScheduler(1) # thread pool with 1 worker thread
      self.button = tk.Button(text="Do Task", command=self.do_task).pack()

   def do_task(self):
      rx.empty().subscribe(
         on_completed=self.long_running_task, 
         scheduler=self.pool_scheduler
      )

   def long_running_task(self):
      # your long running task here... eg:
      time.sleep(3)
      # if you want a callback on the main thread:
      self.root.after(5, self.on_task_complete)

   def on_task_complete(self):
       pass # runs on main thread

if __name__ == "__main__":
    ui = UI()
    ui.root.mainloop()

O altă modalitate de a utiliza această construcție care ar putea fi mai curată (în funcție de preferințe):

tk.Button(text="Do Task", command=self.button_clicked).pack()

...

def button_clicked(self):

   def do_task(_):
      time.sleep(3) # runs on background thread
             
   def on_task_done():
      pass # runs on main thread

   rx.just(1).subscribe(
      on_next=do_task, 
      on_completed=lambda: self.root.after(5, on_task_done), 
      scheduler=self.pool_scheduler
   )