Python进程、线程、协程
一、概念理解
需求创造了功能,为了满足各种实际需求,就出现了各种功能。
并发:为了能一起执行多个程序,于是就出现了并发;并发现象表现在视觉上就是多个程序一起执行,但是实际上是多个cpu同时运行多个程序呢,还是一个cpu在多个程序之间切换呢,这就不得而知了,反正这就是并发。
进程:并发有一个问题就是程序之间的数据,变量这些程序独有的东西容易混乱;于是需求出现了,为了解决并发带来的问题,进程出现了,进程能够将每个程序的地址空间,内存,数据等进行独立管理,避免混乱。
并行:后来cpu核数多了,那不能让其他人闲着吧,大家一起跑吧,这就是并行。
线程:但是后来发现,进程一多,当正在占用cpu的进程卡住了(可能是I\O阻塞、时钟阻塞等),那不能一直等你啊,我得把cpu给下一个进程用啊,那就切换嘛;但是切换进程需要进入内核,置换掉一大堆状态,进程数一高,大部分系统资源就被进程切换给吃掉了。所以,线程就出现了。在一个进程里面搞多个线程,一个线程卡住了,那我另一个线程还可以跑啊。而线程之间的切换就比进程的切换要省事多了,因为进程内的所有线程共享同内存、变量、数据等,甚至连进程号都相同。
协程:原本的线程太过僵硬了,必须要等到有人操作或者规定的时间到了才会切换,这样可能就会导致有程序阻塞后其他没有执行的线程只能看着,而协程就是用户自己写的逻辑流调度程序,即可以利用到并发优势,又可以避免反复系统调用,还有进程切换造成的开销。
首先协程算是用户态的线程,优势主要是少了内核态用户态的切换和能自己来做调度。然后协程一般只在有IO操作的时候才能用到,对于一些会阻塞的IO操作,可以自己选择协程切换,等IO就绪了再切回来,可以更充分利用CPU。
他们之间的关系是:系统中可以运行多个进程,一个进程中可有多个线程,一个线程中可有多个协程
深入了解推荐:
在python中,由于大部分编译器中都默认设置了GIL机制,所以python在使用这些编译器的时候会受到GIL锁的限制,GIL锁的的存在就是为了防止多线程并发执行机器码的互斥锁(mutex)(为了确保线程运行安全)。GIL的运行机制是只有拿到GIL的线程才能调用CPU运行。一个线程有两种情况下会释放全局解释器锁,一种情况是在该线程进入IO操作之前,会主动释放GIL,另一种情况是解释器不间断运行了1000字节码(Py2)或运行15毫秒(Py3)后,该线程也会放弃GIL。所以python的多线程其实并不是同时运行,而是交替运行,只是交替的速度很快,看起来像是多线程。
二、多进程实现
python中想实现多进程需要依赖multiprocessing或jobli包,multiprocessing包中又有多种实现多进程的方法,详细如下:
2.1、Process(用于创建进程模块)
2.1.0、方法属性
点击查看
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) ## 参数说明 # 常用参数 target: 要要调用的函数或方法; args/kwargs: 要传入方法的参数。args=(a,);kwargs={'name'=a,} # 以下是不常用参数 group: 线程组,目前还没有实现,库引用中提示必须是None; # 每个进程都属于一个进程组(PG,Process Group),进程组可以包含多个进程。进程组有一个进程组长(Leader),进程组长的ID(PID, Process ID)就作为整个进程组的ID(PGID,Process Groupd ID)。 name: 进程名; ## 方法实例 需要注意的是start(),join(),is_alive(), terminate()和exitcode方法只能由创建进程对象的过程调用。 is_alive():返回进程是否存活。从start() 方法返回到子进程终止的那一刻,进程对象仍处于活动状态。 join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。 start():进程准备就绪,等待CPU调度 run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。 terminate():不管任务是否完成,立即停止工作进程 daemon:进程的守护进程标志,一个布尔值。必须在start()调用之前设置。 # 在创建线程实例的时候可以设置该参数(实例中默认为None),或者在start()前设置这个参数,如果该参数为True,则会将该进程设置为主进程的守护进程,只要是其他子进程结束且主进程执行完毕,不管被设置为守护进程的程序是否执行完,主线程都会关闭。 name:进程名字。 pid:进程号。 ## 代码示例 from multiprocessing import Process import time import os def info(): print('module name:', __name__) print('parent process:', os.getppid()) print('process id:', os.getpid()) def f(name): info() time.sleep(3) print('hello', name) if __name__ == '__main__': info() p = Process(target=f, args=('bob',)) # p.daemon = False print(p.daemon) p.start() p.join(1) print('name:', p.name) print('is_alive:', p.is_alive()) print('exitcode:', p.exitcode) ''' ------------------------------------------------------------ module name: __main__ parent process: 1188 process id: 13060 False module name: __mp_main__ parent process: 13060 process id: 13424 name: Process-1 is_alive: True exitcode: None hello bob ------------------------------------------------------------ ''' if __name__ == '__main__': info() p = Process(target=f, args=('bob',)) p.daemon = True print(p.daemon) p.start() # p.join(1) print('name:', p.name) print('is_alive:', p.is_alive()) print('exitcode:', p.exitcode) ''' ------------------------------------------------------------ module name: __main__ parent process: 1188 process id: 1668 True name: Process-1 is_alive: True exitcode: None ------------------------------------------------------------ '''
2.1.1、直接创建
点击查看
# 1.导入包和模块 import multiprocessing import time def sing(): for i in range(3): print("i am sing ooo~") time.sleep(0.5) if __name__ == '__main__': # 2.使用进程类创建进程对象 # target :指定进程执行的函数名,不加括号 sing_process = multiprocessing.Process(target=sing) # 3. 使用进程对象启动进程执行指定任务 sing_process.start()
2.1.2、循环创建(🉑️;重点关注获取返回值)
点击查看
# 1.导入包和模块 import multiprocessing import time def main_province2(data_prov, return_list): k = data_prov return_list.append(k) return return_list return_list = multiprocessing.Manager().list() # 使用multiprocessing的Manager下的list,dict等模块接收返回值 jobs = [] for data_prov in data_prov_list: p = multiprocessing.Process(target=main_province2, args=(data_prov,return_list)) # 将返回值送到函数去接收返回值 jobs.append(p) p.start() # 开始进程 for proc in jobs: proc.join() # 阻塞进程,等待所有进程执行完成 return_list # 最终的接收值
2.2、Pool(用于创建管理进程池)
Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。
2.2.0、方法属性
点击查看
multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]]) ## 参数详解 # processes: 是要使用的工作进程数。如果进程是None,那么使用返回的数字os.cpu_count()。也就是说根据本地的cpu个数决定,processes小于等于本地的cpu个数; # initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。 # maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。 # context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。 ## 方法实例 apply(func [,args [,kwds ] ]) # pool.apply(test, args=(i,)) # 使用堵塞方式调用函数,必须等待上一个进程退出才能执行下一个进程 apply_async(func [,args [,kwds [,callback [,error_callback ] ] ] ]) # pool.apply_async(test, args=(i,)) # 使用非阻塞方式调用函数,可以并行执行 map(func,iterable [,chunksize ]) # pool.map(test, lists) # 使用堵塞方式调用函数,此方法将iterable内的每一个对象作为单独的任务提交给进程池 map_async(func,iterable [,chunksize [,callback [,error_callback ] ] ]) # pool.map_async(test, range(500)) # 使用非阻塞方式调用函数,将iterable内的每一个对象作为单独的任务提交给进程池 apply_async 和 map_async 的区别 imap(func,iterable [,chunksize ]) # 使用和map相同,但是该方法将返回迭代器,使用next()获取迭代器内的内容 close():不在接受新的任务进入进程池,等待正在运行的所有进程结束后,关闭线程池 terminal() — 立即结束所有进程,即使在运行中的进程也会被停止 join() — 主进程阻塞等待子进程执行完之后再执行, join方法要在close或terminate之后使用。(守护进程) ## 异步方法返回的结果获取 get([timeout]) # result = pool.apply_async(time.sleep, (10,)) # print(result.get(timeout=1)) # 获取返回结果,timeout可以不设置。如果timeout不是None并且结果没有在timeout秒内到达,则 multiprocessing.TimeoutError 被引发。
2.2.1、apply_async循环启用进程池中的进程
点击查看
# 导入包和模块 import multiprocessing # 创建进程池 pool = multiprocessing.Pool(20) #异步调用 results = [] for data_4a in data_prov_system_acct4a_lists: result = pool.apply_async(main_province, args=(data_4a,)) results.append(result) pool.close() pool.join() #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束 #获取处理结果 data_acct4a_out_list_2 = [res.get() for res in results] data_acct4a_out_df_2 = pd.concat(data_acct4a_out_list_2, join='inner', axis=0)
使用apply_async方法时,子进程不执行的情况
- 参数需要以元组的形式传递,并在最后一个参数后面加上 ,号,如果没有加,子进程不会执行
- 关闭进程池之前使用get()函数会导致进程阻塞(可以使用
callback = log_result
参数来代替get调回函数结果)
2.2.2、map_async映射启用进程池中的进程(🉑️)
点击查看
# 导入包和模块 import multiproceing # 创建进程池 pool = multiprocessing.Pool(20) #异步调用 res = pool.map_async(main_province, data_prov_system_acct4a_lists) #获取处理结果 data_acct4a_out_list_1 = [res_img for res_img in res.get()] data_acct4a_out_df_1 = pd.concat(data_acct4a_out_list_1, join='inner', axis=0)
它们的主要区别在于它们处理任务和返回结果的方式。
apply_async
:一次只提交一个任务。返回一个
AsyncResult
对象,你可以调用它的 get()
方法来获取结果。允许你为每个任务提供不同的参数。map_async
:类似于内置的
map
函数,它接受一个函数和一个可迭代的参数列表,然后将函数并行地应用于所有的参数。返回一个 AsyncResult
对象,你可以调用它的 get()
方法来获取一个包含所有结果的列表。所有任务使用相同的函数,但是参数从提供的可迭代对象中获取。2.3、jobli包实现多线程
推荐阅读:
pip install joblib from math import sqrt from joblib import Parallel, delayed Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10)) [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] # Parallel(n_jobs=2): 指定两个CPU(默认是分配给不同的CPU) # 后面的delayed(sqrt)表示要用的函数是sqrt,这里这种用法就非常类似C++里面的委托(delegate)。 # (i**2) for i in range(10): 这里注意(i**2)的括号和delayed(sqrt)是紧挨着的。这一小段表示要传递给delayed中指定的函数的参数是i^2。
2.4、队列
2.4.1、队列应用场景
有时候在启用多进程时, 多个进程可能同时访问和修改共享的数据,这可能导致数据不一致性和竞争条件。为了解决这个问题,可以考虑使用进程间通信机制(IPC)来同步和协调进程之间的操作。
1, 使用进程池(
multiprocessing.Pool
)中的锁机制。可以使用multiprocessing.Lock()
创建锁对象,然后在进程内部的关键区域使用lock.acquire()
获取锁,完成关键操作后使用lock.release()
释放锁。这样可以确保同一时间只有一个进程在访问共享数据。使用锁机制确保了数据的正确性,但是同时也降低了并行的效率,因为多个进程需要竞争同一个锁。2, 使用进程池(
multiprocessing.Pool
)中的队列(multiprocessing.Queue
)机制。可以将数据放入队列中,然后让进程从队列中获取数据并进行处理。这种方式可以确保多个进程之间的数据互不干扰,避免了数据竞争的问题。数据进入不同进程是经过拷贝的2.4.2、方法属性
import multiprocessing queue = multiprocessing.Queue(队列长度) put # queue.put(数据),放入数据(如队列已满,则程序进入阻塞状态,等待队列取出后再放入) put_nowait # queue.put_nowati(数据),放入数据(如队列已满,则不等待队列信息取出后再放入,直接报错) get # queue.get(数据),取出数据(如队列为空,则程序进入阻塞状态,等待队列防如数据后再取出) get_nowait # queue.get_nowait(数据),取出数据(如队列为空,则不等待队列放入信息后取出数据,直接报错),放入数据后立马判断是否为空有时为True,原因是放入值和判断同时进行 qsize # queue.qsize(),消息数量 empty # queue.empty()(返回值为True或False),判断是否为空 full # queue.full()(返回值为True或False),判断是否为满
2.4.3、实例
import multiprocessing def square(n): return n * n if __name__ == '__main__': data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] # 创建进程池和队列 pool = multiprocessing.Pool() queue = multiprocessing.Manager().Queue() # 将要处理的数据放入队列 for n in data: queue.put(n) # 并发处理数据 results = [] while not queue.empty(): n = queue.get() result = pool.apply_async(square, args=(n,)) results.append(result) # 关闭进程池 pool.close() pool.join() # 获取处理结果 final_results = [r.get() for r in results] print(final_results)
2.5、资源查看
os.cpu_count() # 查看cpu核数 os.getpid() # 当前进程id os.getppid() # 当前父进程id
2.6、推荐阅读
三、多线程实现
多线程主要是通过threading包实现
python多线程详解
什么是线程?
线程也叫轻量级进程,是操作系统能够进行运算调度的最小单位,它被包涵在进程之中,是进程中的实际运作单位。
线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其他线程共享进程所
拥有的全部资源。一个线程可以创建和撤销另一个线程,同一个进程中的多个线程之间可以并发执行
为什么要使用多线程?
线程在程序中是独立的、并发的执行流。与分隔的进程相比,进程中线程之间的隔离程度要小,它们共享内存、文件句柄
和其他进程应有的状态。
因为线程的划分尺度小于进程,使得多线程程序的并发性高。进程在执行过程之中拥有独立的内存单元,而多个线程共享
内存,从而极大的提升了程序的运行效率。
线程比进程具有更高的性能,这是由于同一个进程中的线程都有共性,多个线程共享一个进程的虚拟空间。线程的共享环境
包括进程代码段、进程的共有数据等,利用这些共享的数据,线程之间很容易实现通信。
操作系统在创建进程时,必须为改进程分配独立的内存空间,并分配大量的相关资源,但创建线程则简单得多。因此,使用多线程
来实现并发比使用多进程的性能高得要多。
总结起来,使用多线程编程具有如下几个优点:
进程之间不能共享内存,但线程之间共享内存非常容易。
操作系统在创建进程时,需要为该进程重新分配系统资源,但创建线程的代价则小得多。因此使用多线程来实现多任务并发执行比使用多进程的效率高
python语言内置了多线程功能支持,而不是单纯地作为底层操作系统的调度方式,从而简化了python的多线程编程。
实现多线程
点击查看
import threading from threading import Lock,Thread import time,os # 普通创建方式 def run(n): print('task',n) time.sleep(1) print('2s') time.sleep(1) print('1s') time.sleep(1) print('0s') time.sleep(1) if __name__ == '__main__': t1 = threading.Thread(target=run,args=('t1',)) # target是要执行的函数名(不是函数),args是函数对应的参数,以元组的形式存在 t2 = threading.Thread(target=run,args=('t2',)) t1.start() t2.start() # 自定义线程:继承threading.Thread来定义线程类,其本质是重构Thread类中的run方法 class MyThread(threading.Thread): def __init__(self,n): super(MyThread,self).__init__() #重构run函数必须写 self.n = n def run(self): print('task',self.n) time.sleep(1) print('2s') time.sleep(1) print('1s') time.sleep(1) print('0s') time.sleep(1) if __name__ == '__main__': t1 = MyThread('t1') t2 = MyThread('t2') t1.start() t2.start()
守护线程
下面这个例子,这里使用setDaemon(True)把所有的子线程都变成了主线程的守护线程,
因此当主线程结束后,子线程也会随之结束,所以当主线程结束后,整个程序就退出了。
所谓’线程守护’,就是主线程不管该线程的执行情况,只要是其他子线程结束且主线程执行完毕,主线程都会关闭。也就是说:主线程不等待该守护线程的执行完再去关闭。
点击查看
def run(n): print('task',n) time.sleep(1) print('3s') time.sleep(1) print('2s') time.sleep(1) print('1s') if __name__ == '__main__': t=threading.Thread(target=run,args=('t1',)) t.setDaemon(True) t.start() print('end') # 通过执行结果可以看出,设置守护线程之后,当主线程结束时,子线程也将立即结束,不再执行 # 主线程等待子线程结束 # 为了让守护线程执行结束之后,主线程再结束,我们可以使用join方法,让主线程等待子线程执行 def run(n): print('task',n) time.sleep(2) print('5s') time.sleep(2) print('3s') time.sleep(2) print('1s') if __name__ == '__main__': t=threading.Thread(target=run,args=('t1',)) t.setDaemon(True) #把子线程设置为守护线程,必须在start()之前设置 t.start() t.join() #设置主线程等待子线程结束 print('end') # 多线程共享全局变量 # 线程时进程的执行单元,进程时系统分配资源的最小执行单位,所以在同一个进程中的多线程是共享资源的 g_num = 100 def work1(): global g_num for i in range(3): g_num+=1 print('in work1 g_num is : %d' % g_num) def work2(): global g_num print('in work2 g_num is : %d' % g_num) if __name__ == '__main__': t1 = threading.Thread(target=work1) t1.start() time.sleep(1) t2=threading.Thread(target=work2) t2.start()
GIL和其他
点击查看
''' 由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据, 所以出现了线程锁,即同一时刻允许一个线程执行操作。线程锁用于锁定资源,可以定义多个锁,像下面的代码,当需要独占 某一个资源时,任何一个锁都可以锁定这个资源,就好比你用不同的锁都可以把这个相同的门锁住一样。 由于线程之间是进行随机调度的,如果有多个线程同时操作一个对象,如果没有很好地保护该对象,会造成程序结果的不可预期, 我们因此也称为“线程不安全”。 为了防止上面情况的发生,就出现了互斥锁(Lock) ''' # def work(): # global n # lock.acquire() # temp = n # time.sleep(0.1) # n = temp-1 # lock.release() # # # if __name__ == '__main__': # lock = Lock() # n = 100 # l = [] # for i in range(100): # p = Thread(target=work) # l.append(p) # p.start() # for p in l: # p.join() ''' 递归锁:RLcok类的用法和Lock类一模一样,但它支持嵌套,在多个锁没有释放的时候一般会使用RLock类 ''' # def func(lock): # global gl_num # lock.acquire() # gl_num += 1 # time.sleep(1) # print(gl_num) # lock.release() # # # if __name__ == '__main__': # gl_num = 0 # lock = threading.RLock() # for i in range(10): # t = threading.Thread(target=func,args=(lock,)) # t.start() ''' 信号量(BoundedSemaphore类) 互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据,比如厕所有3个坑, 那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去 ''' # def run(n,semaphore): # semaphore.acquire() #加锁 # time.sleep(3) # print('run the thread:%s\n' % n) # semaphore.release() #释放 # # # if __name__== '__main__': # num=0 # semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行 # for i in range(22): # t = threading.Thread(target=run,args=('t-%s' % i,semaphore)) # t.start() # while threading.active_count() !=1: # pass # else: # print('----------all threads done-----------') ''' python线程的事件用于主线程控制其他线程的执行,事件是一个简单的线程同步对象,其主要提供以下的几个方法: clear将flag设置为 False set将flag设置为 True is_set判断是否设置了flag wait会一直监听flag,如果没有检测到flag就一直处于阻塞状态 事件处理的机制:全局定义了一个Flag,当Flag的值为False,那么event.wait()就会阻塞,当flag值为True, 那么event.wait()便不再阻塞 ''' event = threading.Event() def lighter(): count = 0 event.set() #初始者为绿灯 while True: if 5 < count <=10: event.clear() #红灯,清除标志位 print("\33[41;lmred light is on...\033[0m]") elif count > 10: event.set() #绿灯,设置标志位 count = 0 else: print('\33[42;lmgreen light is on...\033[0m') time.sleep(1) count += 1 def car(name): while True: if event.is_set(): #判断是否设置了标志位 print('[%s] running.....'%name) time.sleep(1) else: print('[%s] sees red light,waiting...'%name) event.wait() print('[%s] green light is on,start going...'%name) # startTime = time.time() light = threading.Thread(target=lighter,) light.start() car = threading.Thread(target=car,args=('MINT',)) car.start() endTime = time.time() # print('用时:',endTime-startTime)
GIL 全局解释器
在非python环境中,单核情况下,同时只能有一个任务执行。多核时可以支持多个线程同时执行。但是在python中,无论有多少个核同时只能执行一个线程。究其原因,这就是由于GIL的存在导致的。
GIL的全程是全局解释器,来源是python设计之初的考虑,为了数据安全所做的决定。某个线程想要执行,必须先拿到GIL,我们可以把GIL看做是“通行证”,并且在一个python进程之中,GIL只有一个。拿不到线程的通行证,并且在一个python进程中,GIL只有一个,拿不到通行证的线程,就不允许进入CPU执行。GIL只在cpython中才有,因为cpython调用的是c语言的原生线程,所以他不能直接操作cpu,而只能利用GIL保证同一时间只能有一个线程拿到数据。而在pypy和jpython中是没有GIL的python在使用多线程的时候,调用的是c语言的原生过程。
python针对不同类型的代码执行效率也是不同的
1、CPU密集型代码(各种循环处理、计算等),在这种情况下,由于计算工作多,ticks技术很快就会达到阀值,然后触发GIL的释放与再竞争(多个线程来回切换当然是需要消耗资源的),所以python下的多线程对CPU密集型代码并不友好。
2、IO密集型代码(文件处理、网络爬虫等设计文件读写操作),多线程能够有效提升效率(单线程下有IO操作会进行IO等待,造成不必要的时间浪费,而开启多线程能在线程A等待时,自动切换到线程B,可以不浪费CPU的资源,从而能提升程序的执行效率)。所以python的多线程对IO密集型代码比较友好。
主要要看任务的类型,我们把任务分为I/O密集型和计算密集型,而多线程在切换中又分为I/O切换和时间切换。如果任务属于是I/O密集型,若不采用多线程,我们在进行I/O操作时,势必要等待前面一个I/O任务完成后面的I/O任务才能进行,在这个等待的过程中,CPU处于等待状态,这时如果采用多线程的话,刚好可以切换到进行另一个I/O任务。这样就刚好可以充分利用CPU避免CPU处于闲置状态,提高效率。但是如果多线程任务都是计算型,CPU会一直在进行工作,直到一定的时间后采取多线程时间切换的方式进行切换线程,此时CPU一直处于工作状态,此种情况下并不能提高性能,相反在切换多线程任务时,可能还会造成时间和资源的浪费,导致效能下降。这就是造成上面两种多线程结果不能的解释。
结论:
I/O密集型任务,建议采取多线程,还可以采用多进程+协程的方式(例如:爬虫多采用多线程处理爬取的数据);
对于计算密集型任务,python此时就不适用了。
四、异步编程(协程)
异步编程(Asynchronous Programming)是一种并发编程的形式,允许程序在等待某个耗时操作完成(如 I/O 操作)时,继续执行其他任务,而不是阻塞在当前操作上。这种方式可以大幅提高程序的执行效率,特别是在处理 I/O 密集型任务(如文件读写、网络请求、数据库操作)时。
可以简单的理解为多个相互协作的子程序。在同一个线程中,当一个子程序阻塞时,我们可以让程序马上从一个子程序切换到另一个子程序,从而避免CPU因程序阻塞而闲置,这样就可以提升CPU的利用率,相当于用一种协作的方式加速了程序的执行。所以,我们可以言简意赅的说:协程实现了协作式并发。
异步编程可以通过多种方式实现,协程是其中一种常用的实现方式。在 Python 中,
asyncio
模块提供了一个完整的框架来实现异步编程,协程是这个框架的核心组件。此外,异步编程还可以通过回调函数、事件循环、生成器等方式实现。异步函数不同于普通函数,调用普通函数会得到返回值,而调用异步函数会得到一个协程对象。我们需要将协程对象放到一个事件循环中才能达到与其他协程对象协作的效果,因为事件循环会负责处理子程序切换的操作,简单的说就是让阻塞的子程序让出CPU给可以执行的子程序。
异步编程的核心概念
- 同步 vs 异步
- 同步编程:代码按顺序执行,一个任务未完成,后续任务必须等待。例如,传统的函数调用是同步的,函数未返回时,调用者会一直等待。
- 异步编程:代码可以在某些操作未完成时继续执行其他任务。等待中的操作完成时会通过回调或其他方式通知主程序。
- 并发 vs 并行
- 并发:程序在单个处理器上通过分时机制执行多个任务,任务间交替执行,但在某一时刻只有一个任务在运行。异步编程是实现并发的一种方式。
- 并行:程序在多个处理器或多核处理器上同时执行多个任务。
- 事件循环 (Event Loop)
- 事件循环是异步编程的调度中心。它负责管理协程的执行,并在协程暂停时切换到其他协程。
asyncio
库提供了一个默认的事件循环,可以通过asyncio.run()
来运行异步代码。事件循环会不断检查是否有准备好执行的任务,并将它们调度执行。确保在一个协程暂停时,其他协程可以利用 CPU 资源继续执行。
- 协程 (Coroutine)
- 协程是 Python 中异步编程的基础,它是一种比线程更加轻量级的并发方式。可以在执行过程中暂停并在稍后恢复。在 Python 中,协程使用
async def
关键字定义,并通过await
关键字来暂停执行。协程可以在等待 I/O 操作时挂起,允许其他协程继续执行。
Python 中的异步编程
Python 通过
asyncio
模块来支持异步编程。自 Python 3.5 起,async
和 await
关键字被引入,使得异步编程更加直观和易于使用。关键字和函数
async def
:定义一个异步函数(协程)。
await
:等待一个异步操作完成。只能在协程中使用。(暂停当前协程,释放控制权给事件循环,等待异步操作完成。)
asyncio.run()
:启动一个异步任务。
asyncio.create_task()
:并发运行多个协程。
asyncio.gather()
:将多个协程包装成一个,等待所有协程完成。
异步编程示例
示例 1:简单的异步函数
import asyncio async def say_hello(): print("Hello...") await asyncio.sleep(1) # 模拟一个异步 I/O 操作 print("...World!") asyncio.run(say_hello())
- 在这个例子中,
say_hello
是一个异步函数,await asyncio.sleep(1)
会让出控制权,使事件循环可以调度其他任务。
示例 2:并发执行多个异步任务
import asyncio async def task(name, duration): print(f"Task {name} started...") await asyncio.sleep(duration) print(f"Task {name} finished after {duration} seconds") async def main(): task1 = asyncio.create_task(task("A", 2)) task2 = asyncio.create_task(task("B", 1)) await task1 await task2 asyncio.run(main())
- 这里的
main()
函数通过asyncio.create_task()
创建了两个异步任务,它们会并发执行,最终的输出顺序不会因为任务的启动顺序而改变,而是由任务的持续时间决定。
示例 3:等待多个任务的完成
import asyncio async def task(name, duration): print(f"Task {name} started...") await asyncio.sleep(duration) print(f"Task {name} finished after {duration} seconds") return name, duration async def main(): results = await asyncio.gather( task("A", 2), task("B", 1), task("C", 3) ) print("All tasks finished:", results) asyncio.run(main())
asyncio.gather()
同时运行多个任务,并等待它们全部完成后再返回结果。这个示例会并发执行任务A
、B
和C
,并在所有任务完成后输出结果。
示例4:执行顺序
import asyncio async def task1(): print("Task 1 started") await asyncio.sleep(2) # 假设这个操作需要 2 秒 print("Task 1 finished") async def task2(): print("Task 2 started") await asyncio.sleep(1) # 假设这个操作需要 1 秒 print("Task 2 finished") async def main(): await asyncio.gather(task1(), task2()) asyncio.run(main())
- 当
task1()
遇到await asyncio.sleep(2)
时,任务会暂停,事件循环会切换去执行task2()
。
task2()
遇到await asyncio.sleep(1)
时也会暂停,事件循环又会检查是否有其他任务可以运行。
- 由于所有任务都在等待,这时事件循环会空闲等待,直到第一个
await
操作完成。
- 1 秒后,
task2
的await
操作完成,事件循环会恢复执行task2
的后续代码,打印Task 2 finished
。
- 再过 1 秒后,
task1
的await
操作完成,事件循环会恢复执行task1
的后续代码,打印Task 1 finished
。
异步编程的优点和注意事项
优点
- 高效的 I/O 操作:在网络请求、文件读写等 I/O 操作中,异步编程可以显著提高效率。
- 减少阻塞:避免程序在等待 I/O 操作时阻塞,从而更好地利用 CPU 资源。
- 更好的响应性:在 GUI 编程或 Web 服务中,异步编程可以使程序对用户操作更加响应及时。
注意事项
- 学习曲线:异步编程可能比同步编程更难理解,特别是在处理复杂的逻辑时。
- 调试困难:异步代码的调试和错误处理相对复杂,需要更多的注意力。
- 非 I/O 密集型任务:异步编程主要在 I/O 密集型任务中表现出色,而对于 CPU 密集型任务,可能并没有显著优势。
总结
异步编程是一种强大的工具,可以显著提高程序在处理 I/O 密集型任务时的性能。通过理解事件循环、协程和
asyncio
库,你可以编写出更高效、响应更快的 Python 程序。随着 Python 对异步编程支持的逐步增强,掌握这些概念和技术对现代 Python 开发者来说至关重要。五、进程、线程、协程对比
对比项 | 进程 (Process) | 线程 (Thread) | 协程 (Coroutine) |
定义 | 独立的执行单元,有自己的内存空间和资源 | 进程内的执行单元,线程间共享内存和资源 | 程序内轻量级的协作任务,通过让出控制权实现并发。协程的并发运行实际上是通过迅速切换任务来实现的 |
构建方式 | multiprocessing 模块 | threading 模块 | asyncio 模块和 async /await 关键字 |
优点 | 独立性强,能充分利用多核 CPU | 轻量级,共享内存,切换速度快 | 最轻量级,创建和切换开销小,适合大量并发任务 |
缺点 | 创建和销毁开销大,进程间通信复杂 | 受 GIL 限制,CPU 密集型任务中无法充分利用多核 CPU | 需要显式控制让出点,调试和错误处理较复杂 |
适用场景 | CPU 密集型任务,如数据处理、科学计算 | I/O 密集型任务,如网络请求、文件读写 | I/O 密集型任务,如网络服务器、高并发应用 |
CPU 使用情况 | 能够充分利用多核 CPU,真正的并行 | 受 GIL 限制,主要在 I/O 操作时有效 | 所有协程都在同一个线程中执行,因此是共享同一个 CPU 核心的资源。 |
内存共享 | 否,每个进程有独立的内存空间 | 是,线程间共享同一进程的内存 | 是,协程共享同一线程的内存 |
调度方式 | 由操作系统调度 | 由操作系统调度 | 由程序自身调度,非抢占式 |
构建代码示例 | multiprocessing 示例见下 | threading 示例见下 | asyncio 示例见下 |
multiprocessing
示例
import multiprocessing import os def worker(num): print(f"Worker {num} is running on process {os.getpid()}") if __name__ == "__main__": processes = [] for i in range(5): p = multiprocessing.Process(target=worker, args=(i,)) processes.append(p) p.start() for p in processes: p.join()
threading
示例
import threading def worker(num): print(f"Worker {num} is running on thread {threading.current_thread().name}") threads = [] for i in range(5): t = threading.Thread(target=worker, args=(i,)) threads.append(t) t.start() for t in threads: t.join()
asyncio
示例
import asyncio async def worker(num): print(f"Worker {num} is running") await asyncio.sleep(1) print(f"Worker {num} finished") async def main(): tasks = [worker(i) for i in range(5)] await asyncio.gather(*tasks) asyncio.run(main())
使用过程中遇到的问题
1、python版本限制进程池处理的数据量
报错代码:'i' format requires -2147483648 <= number <= 2147483647
问题原因:多进程时候,进程间数据交换是通过pickling,因为处理的文本都比较大,当数据pickled时候超过了i struct的限制,-2147483648 <= number <= 2147483647;Python3.8中在非windows平台修复了这个问题,可以支持最大4EB的数据。
解决办法:
- 将每个进程结果写入文件,最后汇总处理,这样避免进程间大量数据传递
- 代码中检查Python版本,低于3.8时候不使用多进程。
- 使用dask dataframe等Python并行处理库
具体思路:主要路线是上面👆的第一条
尝试:
1、换成process方式实现(缺点是进程数会跟着省份数增加,不固定)
2、由于使用pool方式的循环创建并不能缩短时间,所以还是使用pool直接创建,但是可以把process中的数据接收参数使用进来
2、进程池中进程的数量增加,而运行效率并没有提升
进程还需要处理I/O请求,在一定数据量内,进程数量和处理速度会有一个平衡点。
3、OSError: [Errno 28] No space left on device
严格来讲这并不是一个多进程引发的报错。
背景:
Linux 中的tmpfs是一种基于内存的文件系统,它使用系统的RAM或交换空间来存储文件和目录。tmpfs的特点是速度快,因为数据存储在内存中,而不是硬盘上。这使得对文件的读写操作比传统的基于磁盘的文件系统要快得多。当系统重启或关闭时,存储在tmpfs上的数据会丢失,因为内存的内容在断电后不会保留。
/dev/shm
是tmpfs文件系统的一个常见挂载点,全称为"shared memory",即共享内存。它允许不同的进程共享内存中的数据,从而提高程序之间通信的效率。在很多Linux发行版中,/dev/shm
默认就是以tmpfs的形式挂载的,这意味着/dev/shm
实际上就是一个内存中的文件系统,用于存储临时文件,这些文件可以被系统上运行的任何进程访问。一般共享内存对应的大小时 64M(不知道是不是因为有Docker的原因)原因:
一般来说,如果一个程序进程正常结束,那么这个程序在共享内存中对应的临时文件就会自动删除;但是当程训报错或者非正常结束,则临时文件就会堆积,而一般服务器也不会频繁关机重启,从而就导致
/dev/shm
对应的空间越来越小。当共享内存被占满时再运行程序就会导致报错:OSError: [Errno 28] No space left on device