python多线程编程

    科技2022-08-07  94

    0x00多线程有啥用?

    唱歌和跳舞是同时进行的,打架时xxx和打人的动作是同时进行的。 然而,就我们目前的知识量而言,我们只能唱完歌再跳舞,骂完人再打人。(挺那啥的,是不是?) 怎么让他们同时进行? 多线程。 然而,单核CPU上的多线程有啥用呢?都是一个CPU在干活,多线程不仅不会加速运行,还会增加线程切换的开销。 这样理解没问题。不过还是这个问题,唱歌和跳舞是同时进行的(至少应该看上去是同时执行的)。所以需要多线程。 而真正的多核CPU,可以真正起到加速执行的效果。不过对于计算密集型的任务,python多线程搞不定(GIL保证了只能有一个线程在占用cpu)。IO密集型的任务可以交给python处理。对于需要计算密集型的任务,可以交给其他语言实现的程序去处理。

    并行是什么?并发呢? 并行就是真正的多核CPU,多个核心一起执行。 并发就是从宏观上看上去是并行的,但是微观上不是。 然而,CPU的核心往往比任务数少很多,所以现实的应用中大多是并发。

    0x01 第一个多线程程序

    import threading import time def sing(): for i in range(5): print('singing.') time.sleep(1) def song(): for i in range(5): print('songing.') time.sleep(1) def main(): # 创建一个sing线程 t1 = threading.Thread(target=sing) # 创建一个song线程 t2 = threading.Thread(target=song) # 开启sing线程 t1.start() # 开启song线程 t2.start() # 阻塞主线程 直到t1结束 t1.join() # 阻塞主线程 直到t2结束 t2.join() print('test') main()

    0x02 实现多线程的另一种方式

    import threading class MyThread(threading.Thread): def run(self) -> None: for i in range(3): print(self.name) t = MyThread() t.start()

    0x03多线程实现方式总结

    实现一个函数,将之传递给threading.Thread(target=, args=, name=), target值是这个函数的名字, args值是这个函数的参数列表,它是一个元组。 name是线程的名字。这是实现多线程的方法一。实现一个类,此类继承自Thread类。然后重写这个类的run方法。这是实现多线程的方法二。threading.Thread(),main线程会等待子线程结束后再结束,因为主线程要负责给子线程收尸(清理一些环境,资源之类的事情)。threading.enumerate() 可以查看当前程序活动的线程。t1.start() 这行代码执行之后才会产生新线程。t1.start()会调用t1.run().线程之间的调度是OS说了算的,我们干涉不了。t1.join()会阻塞,直到t1线程运行结束才会解阻塞。self.name 是线程的名字。我们可以更改。多个线程之间,是共享全局变量的。如果牵扯到对全局变量的重新指向,则要加global。否则的话不用。

    0x04 多线程操作全局变量

    多个线程是共享全局变量的(这也是我们实现线程通信的重要形式)。由于线程的调度是os决定的,我们不知道os会在什么时候决定某一个线程进入阻塞态,所以多线程程序对于全局变量的操作是不可控的(会造成资源竞争)。来看一个demo.

    import threading nums = 0 def decline(): global nums for i in range(1000000): nums += 1 def increase(): global nums for i in range(1000000): nums -= 1 def main(): t1 = threading.Thread(target=decline, name='-1进程') t2 = threading.Thread(target=increase, name='+1进程') t1.start() t2.start() t1.join() t2.join() print(nums) main()

    分析一下。按道理说,我们的线程1执行了100万次-1操作,线程2执行了100万次+1操作。但是我们最后打印出来nums,它讲道理是0,结果不是。为啥呢? 分析一番: nums -= 1等同于 nums = nums - 1 我们的cpu计算出来右值之后,然后把结果赋值给左值。 假设nums是0, nums-1计算出来是-1。 我们要做的事情是把-1赋值给nums. 在我们将要把-1赋值给nums时,cpu突然把它休眠了。去执行线程2了。 线程2一看,哦,nums是0啊, nums += 1 嗯,赋值给nums 为1. 然后休眠了,去执行线程1. 线程1醒了,一看,哦,我要把nums赋值成-1, 刚才睡着了,继续。 然后它就把nums改成了-1.

    所以说,2个线程,一个减,一个加, 大家都做了一次,结果却不是0了。这样就出错了。我们需要有一个机制,让它变成正确的。

    0x05 线程同步

    怎么解决上述问题? 线程同步。 啥是同步?通俗地讲就是说,如果线程A和线程B需要配合行动,线程A需要依赖线程B的某个结果,那么就让线程B先执行,然后出来结果了再让A执行。即有个谁先谁后。 所谓是 整体上是并行的,局部上是串行的。 实现同步的一个方式就是加锁。threading.Lock()这个锁叫做互斥锁。 加锁的目的是啥? 把会产生资源竞争的代码做成原子的。这样就不会有竞争了。 比如说,我和我媳妇在家里,我想看体育频道,我媳妇想看韩剧。咋整呢?我让给我媳妇先看,我等她看完了再看,这样就没事了。hhhh

    import threading nums = 0 # 申请一把锁 lock = threading.Lock() def decline(): global nums for i in range(1000000): lock.acquire() nums += 1 lock.release() def increase(): global nums for i in range(1000000): lock.acquire() nums -= 1 lock.release() def main(): t1 = threading.Thread(target=decline, name='-1进程') t2 = threading.Thread(target=increase, name='+1进程') t1.start() t2.start() t1.join() t2.join() print(nums) main()

    这里要注意啊,我们锁定竞争资源要用同一把锁。不然的话就白锁了。再次重申,加锁是怎么实现的?是说锁上的代码被CPU执行完了才会放过CPU吗? 当然不是。加锁后的代码和加锁前的代码对于OS的调度来说并没有什么差别。只能说,对于threading.Lock()这个锁,acquire()是上锁,release()是开锁。 一旦上了锁,再遇到加锁操作就阻塞。 所以,加锁保护的其实是个啥? 心里应该有答案了。

    0x06 - 死锁

    啥叫死锁?由于同一个锁来说,在acquire()执行之后再acquire()就会阻塞,所以有多个互斥锁存在的话就有可能导致死锁的情况发生。(大家都在等对方释放资源,但是对方由于锁的存在没办法释放资源)。

    import threading lock1 = threading.Lock() lock2 = threading.Lock() def test(): lock1.acquire() print(11) print(12) print(13) lock2.acquire() print(14) print(15) print(16) lock2.release() def test2(): lock2.acquire() print(21) print(22) lock1.acquire() print(23) print(24) print(25) print(26) lock1.release() t1 = threading.Thread(target=test) t2 = threading.Thread(target=test2) t1.start() t2.start()

    0x06 - threading模块中一些有用的函数

    threading.active_count()

    这个函数会打印当前存活的Thread()类的对象数。它的返回值跟enumerate()函数的长度是一致的。 例子1:

    import threading import time def test(): time.sleep(10) for i in range(100000): pass def test2(): time.sleep(10) for i in range(100000): pass t1 = threading.Thread(target=test) t2 = threading.Thread(target=test2) print(threading.active_count()) # 1 t1.start() print(threading.active_count()) # 2 t2.start() print(threading.active_count()) # 3

    例子2:

    import threading import time def test(): time.sleep(10) for i in range(100000): pass def test2(): time.sleep(10) for i in range(100000): pass t1 = threading.Thread(target=test) t2 = threading.Thread(target=test2) # True print(threading.active_count() == threading.enumerate().__len__()) t1.start() # True print(threading.active_count() == threading.enumerate().__len__()) t2.start() # True print(threading.active_count() == threading.enumerate().__len__())

    threading.current_thread()

    返回执行这段代码的线程对象。

    import threading import time def test(): print(threading.current_thread()) time.sleep(10) for i in range(100000): pass def test2(): print(threading.current_thread()) time.sleep(10) for i in range(100000): pass t1 = threading.Thread(target=test) t2 = threading.Thread(target=test2) t1.start() t2.start() print(threading.current_thread())

    输出:

    <Thread(Thread-1, started 123145323687936)> <Thread(Thread-2, started 123145328943104)> <_MainThread(MainThread, started 140736190260160)>

    threading.enumerate()

    以列表形式返回当前所有存活的 Thread 对象。 该列表包含守护线程,current_thread() 创建的虚拟线程对象和主线程。它不包含已终结的线程和尚未开始的线程。

    import threading import time def test(): time.sleep(10) for i in range(100000): pass def test2(): time.sleep(10) for i in range(100000): pass t1 = threading.Thread(target=test) t2 = threading.Thread(target=test2) t1.start() t2.start() print(threading.enumerate())

    输出:

    [<_MainThread(MainThread, started 140736190260160)>, <Thread(Thread-1, started 123145369214976)>, <Thread(Thread-2, started 123145374470144)>]

    threading.main_thread()

    返回python解释器创建的线程。 也是主线程。

    import threading import time def test(): time.sleep(10) for i in range(100000): pass def test2(): time.sleep(10) for i in range(100000): pass t1 = threading.Thread(target=test) t2 = threading.Thread(target=test2) t1.start() t2.start() print(threading.main_thread())

    输出:

    <_MainThread(MainThread, started 140736190260160)>

    threading.local()

    存储线程本地数据。它返回一个对象。每个线程都可以对这个对象增加一些属性,谁增加的属性谁就可以操作哪个属性,这个属性对别的线程不可见。

    import threading import time mydata = threading.local() def test(): mydata.x = 1 time.sleep(10) for i in range(100000): pass # test线程添加的x只对自己可见,是1 print(mydata.x, threading.current_thread().name) def test2(): mydata.x = False time.sleep(10) for i in range(100000): pass # test2线程添加的x只对自己可见, 是False print(mydata.x, threading.current_thread().name) t1 = threading.Thread(target=test) t2 = threading.Thread(target=test2) t1.start() t2.start() t1.join() t2.join()

    0x07-Thread对象中一些有用的方法

    Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

    构造方法如上所示。

    group 应该为 None;为了日后扩展 ThreadGroup 类实现而保留。target 是用于 run() 方法调用的可调用对象。默认是 None,表示不需要调用任何方法。name 是线程名称。默认情况下,由 “Thread-N” 格式构成一个唯一的名称,其中 N 是小的十进制数。args 是用于调用目标函数的参数元组。默认是 ()。kwargs 是用于调用目标函数的关键字参数字典。默认是 {}。如果不是 None,daemon 参数将显式地设置该线程是否为守护模式。 如果是 None (默认值),线程将继承当前线程的守护模式属性。如果子类型重载了构造函数,它一定要确保在做任何事前,先发起调用基类构造器(Thread.__init__())。

    start()

    开始线程活动。它在一个线程里最多只能被调用一次。它会自动调用run()方法。

    run()

    代表线程活动的方法。可以在子类型里重载这个方法。 标准的 run() 方法会对作为 target 参数传递给该对象构造器的可调用对象(如果存在)发起调用,并附带从 args 和 kwargs 参数分别获取的位置和关键字参数。 当然,如果要实现自己的线程类,一定不要忘记重写run()方法。

    join(timeout=None)

    等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用 join() 的线程终结 – 不管是正常终结还是抛出未处理异常 – 或者直到发生超时,超时选项是可选的。 简而言之的说,要么等被join的这个线程死了,要么等join()到超时时间了,都会接阻塞。

    import threading import time def test(): time.sleep(10) for i in range(100000): pass def test2(): time.sleep(10) for i in range(100000): pass t1 = threading.Thread(target=test) t2 = threading.Thread(target=test2) t1.start() t2.start() # 设置超时时间为1。超过这个时间就不阻塞了 t1.join(timeout=1) print('t1怎么还不完, 不等了, 接阻塞了啊', time.time()) t2.join(timeout=1) print('t2怎么还不完, 不等了, 接阻塞了啊', time.time())

    输出:

    t1怎么还不完, 不等了, 接阻塞了啊 1601989420.751092 t2怎么还不完, 不等了, 接阻塞了啊 1601989421.754227

    name

    只用于识别的字符串。

    is_alive()

    返回线程是否存活。所谓的存活,是指当 run() 方法刚开始直到 run() 方法刚结束,这个方法返回 True。 enumerate()返回的列表里的线程对象都是存活的。

    daemon

    这个值标识了某个进程是否是守护线程。一定要在调用 start() 前设置好,不然就出错了。主线程不是守护线程,因此主线程创建的所有线程默认都是 daemon = False。 当没有存活的非守护线程时,整个Python程序才会退出, 所以这才使得我们之前得出结论:主线程会等待子线程都结束之后再结束。 如果主线程运行完了,那么守护线程会被强制结束。 守护线程的作用是为其他线程提供服务。比如GC。

    import threading import time def test(): time.sleep(3) for i in range(100000): pass with open('testthread.txt', 'w') as f: f.write('test done') def test2(): time.sleep(3) for i in range(100000): pass with open('test2thread.txt', 'w') as f: f.write('test2 done') t1 = threading.Thread(target=test) t2 = threading.Thread(target=test2) t1.daemon = True t2.daemon = True t1.start() t2.start() time.sleep(4) print(t1.is_alive()) print(t2.is_alive())

    输出:

    False False

    0x08锁机制

    原始锁是一个在锁定时不属于特定线程的同步基元组件。原始锁处于 “锁定” 或者 “非锁定” 两种状态之一。它被创建时为非锁定状态。它有两个基本方法, acquire() 和 release() 。当状态为非锁定时, acquire() 将状态改为 锁定 并立即返回。当状态是锁定时, acquire() 将阻塞至其他线程调用 release() 将其改为非锁定状态,然后 acquire() 调用重置其为锁定状态并返回。 release() 只在锁定状态下调用; 它将状态改为非锁定并立即返回。

    互斥锁(原始锁)Lock的方法

    acquire(blocking=True, timeout=-1)

    当浮点型 timeout 参数被设置为正值调用时,只要无法获得锁,将最多阻塞 timeout 设定的秒数。timeout 参数被设置为 -1 时将无限等待。如果成功获得锁,则返回 True,否则返回 False (例如发生 超时 的时候)。

    release()

    释放一个锁。这个方法可以在任何线程中调用,不单指获得锁的线程。

    locked()

    如果获得了锁则返回真值。

    递归锁RLock

    重入锁(递归锁)是一个可以被同一个线程多次获取的同步基元组件。在内部,它在基元锁(就是上面那个原始的Lock)的锁定/非锁定状态上附加了 “所属线程” 和 “递归等级” 的概念。 若要锁定锁,线程调用其 acquire() 方法;一旦线程拥有了锁,方法将返回。若要解锁,线程调用 release() 方法。 acquire()/release() 对可以嵌套;只有最终 release() (最外面一对的 release() ) 将锁解开,才能让其他线程继续处理 acquire() 阻塞。

    其实递归锁就可以当做Lock使用,只不过它在同一个线程中可以被acquire很多次。 总之,就是说,同一个线程可以多次获取锁,执行acquire().

    条件对象

    import threading import time condition = threading.Condition() def thread1(): condition.acquire() print('thread1- start') # 暂时释放锁 condition.wait() print('threading1-end') # 把thread2给我叫醒 condition.notify() condition.release() def thread2(): # 保证线程1先执行 time.sleep(1) condition.acquire() print('thread2- start') # 把thread1叫起来 condition.notify() # 暂时释放锁 condition.wait() print('threading2-end') condition.release() def main(): t1 = threading.Thread(target=thread1) t2 = threading.Thread(target=thread2) t1.start() t2.start() main()

    wait()方法

    wait() 方法释放锁,然后阻塞直到其它线程调用 notify() 方法或 notify_all() 方法唤醒它。一旦被唤醒, wait() 方法重新获取锁并返回。它也可以指定超时时间。

    这就很有意思了。wait()方法可以假的释放锁,然后会阻塞到其他线程用notify唤醒它。这就像一个条件语句。运行到wait()时,这个线程就假死,让别的线程去运行。别的线程去运行完了,再用notify来唤醒它,继续运行这个线程。

    acquire(*args)

    请求底层锁。

    release()

    释放底层锁。

    wait(timeout=None)

    等待直到被通知或发生超时。如果线程在调用此方法时没有获得锁,将会引发 RuntimeError 异常。 这个方法释放底层锁,然后阻塞,直到在另外一个线程中调用同一个条件变量的 notify() 或 notify_all() 唤醒它,或者直到可选的超时发生。一旦被唤醒或者超时,它重新获得锁并返回。

    notify(n=1)

    默认唤醒一个等待这个条件的线程。如果调用线程在没有获得锁的情况下调用这个方法,会引发 RuntimeError 异常。 这个方法唤醒最多 n 个正在等待这个。条件变量的线程;如果没有线程在等待,这是一个空操作。notify() 不会释放锁

    notify_all()

    唤醒所有正在等待这个条件的线程。

    信号量对象

    一个信号量管理一个内部计数器,该计数器因 acquire() 方法的调用而递减,因 release() 方法的调用而递增。 计数器的值永远不会小于零;当 acquire() 方法发现计数器为零时,将会阻塞,直到其它线程调用 release() 方法。

    信号量决定了最多能同时运行的线程的数量。

    import threading import time semaphore = threading.Semaphore(5) def thread1(): semaphore.acquire() print(threading.current_thread().name + '正在运行') time.sleep(1) semaphore.release() def main(): threads = [] for i in range(22): threads.append(threading.Thread(target=thread1)) for thread in threads: thread.start() main()

    我们看到打印出来是5个5个一起出。 最后2个是2个一起出。 所以这样就验证了信号量的用法。

    事件对象

    一个事件对象管理一个内部标志,调用 set() 方法可将其设置为true,调用 clear() 方法可将其设置为false,调用 wait() 方法将进入阻塞直到标志为true。

    is_set()

    当且仅当内部旗标为时返回 True。

    set()

    将内部标志设置为true。所有正在等待这个事件的线程将被唤醒。当标志为true时,调用 wait() 方法的线程不会被阻塞。

    clear()

    将内部标志设置为false。之后调用 wait() 方法的线程将会被阻塞,直到调用 set() 方法将内部标志再次设置为true。

    wait(timeout=None)

    阻塞线程直到内部变量为true。如果调用时内部标志为true,将立即返回。否则将阻塞线程,直到调用 set() 方法将标志设置为true或者发生可选的超时。

    import threading import time event = threading.Event() def thread1(): print('thread1 is running.') event.wait() print('thread1 ended.') def thread2(): # 让thread1先执行 time.sleep(1) print('thread2 is running.') # 唤醒thread1 event.set() print('thread2 ended.') def main(): t1 = threading.Thread(target=thread1) t2 = threading.Thread(target=thread2) t1.start() t2.start() t1.join() t2.join() if __name__ == '__main__': main()

    定时器对象

    此类表示一个操作应该在等待一定的时间之后运行 — 相当于一个定时器。 Timer 类是 Thread 类的子类,因此可以像一个自定义线程一样工作。 与线程一样,通过调用 start() 方法启动定时器。而 cancel() 方法可以停止计时器(在计时结束前), 定时器在执行其操作之前等待的时间间隔可能与用户指定的时间间隔不完全相同。

    threading.Timer(interval, function, args=None, kwargs=None)

    创建一个定时器,在经过 interval 秒的间隔事件后,将会用参数 args 和关键字参数 kwargs 调用 function。如果 args 为 None (默认值),则会使用一个空列表。如果 kwargs 为 None (默认值),则会使用一个空字典。

    cancel()

    停止定时器并取消执行计时器将要执行的操作。仅当计时器仍处于等待状态时有效。

    举个栗子:

    def hello(): print("hello, world") t = Timer(30.0, hello) t.start() # after 30 seconds, "hello, world" will be printed
    Processed: 0.009, SQL: 8