python 进程与线程是并发编程的两种常见方式。进程是操作系统中的一个基本概念,表示程序在操作系统中的一次执行过程,拥有独立的地址空间、资源、优先级等属性。线程是进程中的一条执行路径,可以看做是轻量级的进程,与同一个进程中的其他线程共享相同的地址空间和资源。
线程和进程都可以实现并发编程,但是它们之间有几点不同:
线程间共享进程的内存空间,但进程间的内存空间是相互独立的;
线程创建和销毁的开销较小,但是线程切换的开销较大;
进程间通信需要较为复杂的 IPC(Inter-Process Communication)机制,线程间通信则可以直接读写共享内存;
多进程可以充分利用多核 CPU 的性能,但是多线程受 GIL(Global Interpreter Lock)限制,只能利用单核 CPU 的性能。
在选择使用进程还是线程时,需要根据具体场景和需求进行权衡和选择。如果任务需要充分利用多核 CPU,且任务之间互不影响,可以选择多进程;如果任务之间需要共享资源和数据,可以选择多线程。同时,需要注意在 python 中使用多线程时,由于 GIL 的存在,可能无法实现真正的并行。
8.1 创建并使用线程 线程是操作系统调度的最小执行单元,是进程中的一部分,能够提高程序的效率。在python中,创建线程需要使用threading模块。该模块的实现方法是底层调用了C语言的原生函数来实现线程的创建和管理。在系统中,所有的线程看起来都是同时执行的,但实际上是由操作系统进行时间片轮转调度的。
使用函数创建线程: 创建线程并传递参数实现指定函数多线程并发,使用join
方法,等待线程执行完毕后的返回结果.
import os,timeimport threadingnow = lambda :time.time() def MyThread (x,y ): time.sleep(5 ) print ("传递的数据:%s,%s" %(x,y)) if __name__ == "__main__" : ThreadPool = [] start = now() for item in range (10 ): thread = threading.Thread(target=MyThread,args=(item,item+1 ,)) thread.start() ThreadPool.append(thread) for item in ThreadPool: item.join() print ("[+] 线程信息: {}" .format (item)) stop = now() print ("[+] 线程总耗时: {} s" .format (int (stop-start)))
使用类创建内部线程: 通过定义类,将线程函数与类进行结合实现一体化该方式调用方便思维明确.
import os,timeimport threadingclass MyThread (threading.Thread): def __init__ (self,x,y ): super (MyThread, self).__init__() self.x = x self.y = y def run (self ): print ("[+] 当前执行运算: {} + {}" .format (self.x,self.y)) self.result = self.x + self.y def get_result (self ): try : return self.result except Exception: return None if __name__ == "__main__" : ThreadPool = [] for item in range (1 ,10 ): obj = MyThread(item,item+1 ) obj.start() ThreadPool.append(obj) for item in ThreadPool: item.join() print ("[+] 获取返回: " ,item.get_result())
使用类创建外部线程: 该定义方式与上方完全不同,我们可以将执行过程定义到类的外部为单独函数,然后类内部去调用传参.
import os,timeimport threadingdef MyThreadPrint (x,y ): print ("[+] 当前执行运算: {} + {}" .format (x,y)) result = x + y return result class MyThread (threading.Thread): def __init__ (self,func,args=( ) ): super (MyThread, self).__init__() self.func = func self.args = args def run (self ): self.result = self.func(*self.args) def get_result (self ): try : return self.result except Exception: return None if __name__ == "__main__" : ThreadPool = [] for item in range (1 ,10 ): obj = MyThread(func=MyThreadPrint,args=(item,item+1 )) obj.start() ThreadPool.append(obj) for item in ThreadPool: item.join() print ("[+] 获取返回: " ,item.get_result())
在线程中创建子线程: 通过创建一个守护线程,并让守护线程调用子线程,从而实现线程中调用线程,线程嵌套调用.
import timeimport threadingdef run (num ): print ("这是第 {} 个子线程" .format (num)) time.sleep(2 ) def main (): for each in range (5 ): thread = threading.Thread(target=run,args=(each,)) thread.start() print ("启动子线程: {} 编号: {}" .format (thread.getName(),each)) thread.join() if __name__ == "__main__" : daemon = threading.Thread(target=main,args=()) daemon.setDaemon(True ) daemon.start() daemon.join(timeout=10 )
简单的线程互斥锁(Semaphore): 同时允许一定数量的线程更改数据,也就是限制每次允许执行的线程数.
import threading,timesemaphore = threading.BoundedSemaphore(5 ) def run (n ): semaphore.acquire() time.sleep(1 ) print ("运行这个线程中: %s" %n) semaphore.release() if __name__ == '__main__' : for i in range (20 ): t = threading.Thread(target=run, args=(i,)) t.start() while threading.active_count() != 1 : pass else : print ('----所有线程执行完毕了---' )
import threading,timeclass mythreading (threading.Thread): def run (self ): semaphore.acquire() print ('running the thread:' ,self.getName()) time.sleep(2 ) semaphore.release() if __name__ == "__main__" : semaphore = threading.BoundedSemaphore(3 ) for i in range (20 ): t1 = mythreading() t1.start() t1.join()
线程全局锁(Lock): 添加本全局锁以后,能够保证在同一时间内保证只有一个线程具有权限.
import timeimport threadingnum = 0 thread_list = [] lock = threading.Lock() def SumNumber (): global num time.sleep(2 ) lock.acquire() num += 1 lock.release() for x in range (50 ): thread = threading.Thread(target=SumNumber) thread.start() thread_list.append(thread) for y in thread_list: y.join() print ("计算结果: " ,num)
线程递归锁(RLock): 递归锁和全局锁差不多,递归锁就是在大锁中还要添加个小锁,递归锁是常用的锁.
import threadingimport timenum = 0 lock = threading.RLock() def fun1 (): lock.acquire() global num num += 1 lock.release() return num def fun2 (): lock.acquire() res = fun1() print ("计算结果: " ,res) lock.release() if __name__ == "__main__" : for x in range (10 ): thread = threading.Thread(target=fun2) thread.start() while threading.active_count() != 1 : print (threading.active_count()) else : print ("所有线程运行完成..." ) print (num)
线程互斥锁量控制并发: 使用BoundedSemaphore
定义默认信号10,既可以实现控制单位时间内的程序并发量.
import os,timeimport threadingdef MyThread (x ): lock.acquire() print ("执行数据: {}" .format (x)) lock.release() time.sleep(2 ) threadmax.release() if __name__ == "__main__" : threadmax = threading.BoundedSemaphore(10 ) lock = threading.Lock() ThreadPool = [] for item in range (1 ,100 ): threadmax.acquire() thread = threading.Thread(target=MyThread,args=(item,)) thread.start() ThreadPool.append(thread) for item in ThreadPool: item.join()
线程驱动事件(Event): 线程事件用于主线程控制其他线程的执行,事件主要提供了三个方法set、wait、clear、is_set
,分别用于设置检测和清除标志.
import threadingevent = threading.Event() def func (x,event ): print ("函数被执行了: %s 次.." %x) event.wait() print ("加载执行结果: %s" %x) for i in range (10 ): thread = threading.Thread(target=func,args=(i,event,)) thread.start() print ("当前状态: %s" %event.is_set()) event.clear() temp=input ("输入yes解锁新姿势: " ) if temp == "yes" : event.set () print ("当前状态: %s" %event.is_set())
import threadingdef show (event ): event.wait() print ("执行一次线程操作" ) if __name__ == "__main__" : event_obj = threading.Event() for i in range (10 ): t1 = threading.Thread(target=show,args=(event_obj,)) t1.start() inside = input ('>>>:' ) if inside == '1' : event_obj.set () event_obj.clear()
线程实现条件锁: 条件(Condition) 使得线程等待,只有满足某条件时,才释放N个线程.
import threadingdef condition_func (): ret = False inp = input (">> " ) if inp == '1' : ret = True return ret def run (n ): con.acquire() con.wait_for(condition_func) print ('running...' ,n) con.release() if __name__ == "__main__" : con = threading.Condition() for i in range (10 ): t = threading.Thread(target=run,args=(i,)) t.start() t.join()
单线程异步并发执行: 在单线程下实现异步执行多个函数,返回耗时取决于最后一个函数的执行时间.
import time,asyncionow = lambda :time.time() async def GetSystemMem (sleep ): print ("[+] 执行获取内存异步函数." ) await asyncio.sleep(sleep) return 1 async def GetSystemCPU (sleep ): print ("[+] 执行获取CPU异步函数." ) await asyncio.sleep(sleep) return 1 if __name__ == "__main__" : stop = now() mem = GetSystemMem(1 ) cpu = GetSystemCPU(4 ) task=[ asyncio.ensure_future(mem), asyncio.ensure_future(cpu) ] loop=asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(task)) for item in task: print ("[+] 返回结果: " ,item.result()) print ('总耗时: {}' .format (stop - now()))
8.2 创建并使用进程 进程是指正在执行的程序,创建进程需要使用multiprocessing
模块,创建方法和线程相同,但由于进程之间的数据需要各自持有一份,所以创建进程需要更大的开销。进程间数据不共享,多进程可以用来处理多任务,但很消耗资源。计算密集型任务最好交给多进程来处理,I/O密集型任务最好交给多线程来处理。另外,进程的数量应该和CPU的核心数保持一致,以充分利用系统资源。
使用进程函数执行命令: 通过系统提供的进程线程函数完成对系统命令的调用与执行.
>>> import os,subprocess>>> >>> os.system("ping -n 1 www.baidu.com" ) >>> >>> ret = os.popen("ping -n 1 www.baidu.com" ) >>> ret.read()>>> >>> subprocess.run("ping www.baidu.com" ,shell=True )>>> subprocess.call("ping www.baidu.com" , shell=True )>>> >>> ret = subprocess.Popen("ping www.baidu.com" ,shell=True ,stdout=subprocess.PIPE)>>> ret.stdout.read()
创建多进程与子线程: 通过使用multiprocessing库,循环创建4个主进程,而在每个主进程内部又起了5个子线程.
import multiprocessingimport threading,osdef ThreadingFunction (): print ("[-] ----> 子线程PPID: {}" .format (threading.get_ident())) def ProcessFunction (number ): print ("[*] -> 主进程PID: {} 父进程: {}" .format (os.getpid(),os.getppid())) for i in range (5 ): thread = threading.Thread(target=ThreadingFunction,) thread.start() if __name__ == "__main__" : for item in range (4 ): proc = multiprocessing.Process(target=ProcessFunction,args=(item,)) proc.start() proc.join()
使用基于类的方式创建进程: 除了使用函数式方式创建进程以外,我们还可以使用基于类的方式创建.
import os,timefrom multiprocessing import Processclass Myprocess (Process ): def __init__ (self,person ): super ().__init__() self.person = person def run (self ): print ("[*] -> 当前PID: {}" .format (os.getpid())) print ("--> 传入的人名: {}" .format (self.person)) time.sleep(3 ) if __name__ == '__main__' : process = Myprocess("lyshark" ) process.start()
进程锁(Lock): 进程中也有锁,可以实现进程之间数据的一致性,也就是进程数据的同步,保证数据不混乱.
import multiprocessingdef func (loc,num ): loc.acquire() print ("hello ---> %s" %num) loc.release() if __name__ == "__main__" : lock = multiprocessing.Lock() for number in range (10 ): proc = multiprocessing.Process(target=func,args=(lock,number,)) proc.start()
异步进程池: 进程池内部维护一个进程序列,当使用时则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止.
import multiprocessingimport timedef ProcessFunction (number ): time.sleep(2 ) print ("[+] 进程执行ID: {}" .format (number)) def ProcessCallback (arg ): print ("[-] 进程执行结束,执行回调函数" ) if __name__ == "__main__" : pool = multiprocessing.Pool(processes=5 ) for item in range (10 ): pool.apply_async(func=ProcessFunction,args=(item,),callback=ProcessCallback) pool.close() pool.join()
from multiprocessing import Pool, TimeoutErrorimport time,osdef f (x ): return x*x if __name__ == '__main__' : with Pool(processes=4 ) as pool: print (pool.map (f, range (10 ))) for i in pool.imap_unordered(f, range (10 )): print (i,end=' ' ) res = pool.apply_async(f,(20 ,)) print ('\n' ,res.get(timeout=1 )) res = pool.apply_async(os.getpid,()) print (res.get(timeout=1 )) multiple_results = [pool.apply_async(os.getpid, ()) for i in range (4 )] print ([res.get(timeout=1 ) for res in multiple_results]) res = pool.apply_async(time.sleep, (10 ,)) try : print (res.get(timeout=1 )) except TimeoutError: print ("We lacked patience and got a multiprocessing.TimeoutError" )
8.3 多进程数据共享 一般当我们创建两个进程后,进程各自持有一份数据,默认无法共享数据,如果我们想要共享数据必须通过一个中间件来实现数据的交换,来帮你把数据进行一个投递,要实现进程之间的数据共享,其主要有以下几个方法来实现进程间数据的共享.
共享队列(Queue): 这个Queue主要实现进程与进程之间的数据共享,与线程中的Queue不同.
from multiprocessing import Processfrom multiprocessing import queuesimport multiprocessing def foo (i,arg ): arg.put(i) print ('say hi' ,i,arg.qsize()) li = queues.Queue(20 ,ctx=multiprocessing) for i in range (10 ): p = Process(target=foo,args=(i,li,)) p.start()
共享整数(int): 整数之间的共享,只需要使用multiprocessing.Value
方法,即可实现.
import multiprocessingdef func (num ): num.value = 1024 print ("函数中的数值: %s" %num.value) if __name__ == "__main__" : num = multiprocessing.Value("d" ,10.0 ) print ("这个共享数值: %s" %num.value) for i in range (5 ): num = multiprocessing.Value("d" , i) proc = multiprocessing.Process(target=func,args=(num,)) proc.start() print ("最后打印数值: %s" %num.value)
共享数组(Array): 数组之间的共享,只需要使用multiprocessing.Array
方法,即可实现.
import multiprocessingdef func (ary ): ary[0 ]=100 ary[1 ]=200 ary[2 ]=300 ''' i所对应的类型是ctypes.c_int,其他类型如下参考: 'c': ctypes.c_char, 'u': ctypes.c_wchar, 'b': ctypes.c_byte, 'B': ctypes.c_ubyte, 'h': ctypes.c_short, 'H': ctypes.c_ushort, 'i': ctypes.c_int, 'I': ctypes.c_uint, 'l': ctypes.c_long, 'L': ctypes.c_ulong, 'f': ctypes.c_float, 'd': ctypes.c_double ''' if __name__ == "__main__" : ary = multiprocessing.Array("i" ,[1 ,2 ,3 ]) for i in range (5 ): proc = multiprocessing.Process(target=func,args=(ary,)) print (ary[:]) proc.start()
共享字典(dict): 通过使用Manager方法,实现两个进程中的,字典与列表的数据共享.
import multiprocessingdef func (mydict, mylist ): mydict["字典1" ] = "值1" mydict["字典2" ] = "值2" mylist.append(1 ) mylist.append(2 ) mylist.append(3 ) if __name__ == "__main__" : mydict = multiprocessing.Manager().dict () mylist = multiprocessing.Manager().list () proc = multiprocessing.Process(target=func,args=(mydict,mylist)) proc.start() proc.join() print ("列表中的元素: %s" %mylist) print ("字典中的元素: %s" %mydict)
管道共享(Pipe): 通过Pipe
管道的方式在两个进程之间共享数据,类似于Socket套接字.
import multiprocessingdef func (conn ): conn.send("你好我是子进程." ) print ("父进程传来了:" ,conn.recv()) conn.close() if __name__ == "__main__" : parent_conn,child_conn = multiprocessing.Pipe() proc = multiprocessing.Process(target=func,args=(child_conn,)) proc.start() print ("子进程传来了:" ,parent_conn.recv()) parent_conn.send("我是父进程,收到消息了.." )