100字范文,内容丰富有趣,生活中的好帮手!
100字范文 > Python多线程多进程 异步 异常处理等高级用法

Python多线程多进程 异步 异常处理等高级用法

时间:2022-08-21 02:33:04

相关推荐

Python多线程多进程 异步 异常处理等高级用法

文章目录

前言多线程多进程多线程多进程协程总结 异步基本概念异步编程asyncioaiohttp 异常常见异常异常处理自定义异常 lambda表达式lambda表达式用法 高阶函数functoolsitertools无穷迭代器最短输入序列长度停止迭代器排列组合迭代器

前言

本篇博客主要记录Python的一些高级用法。虽说是高级用法,但实际上是本人的一些薄弱项,以这篇博客作为记录。内容包括多线程,多进程,异常处理等方面的知识,里面有些内容与之前的博客有重叠部分,如多线程多进程可以参考:深度学习部署经验,也有一些薄弱的部分没有在此博客记录,如:Python的迭代器等知识。

多线程多进程

多线程多进程可以参考:深度学习部署经验,具体代码实现也可以参考深度学习部署经验一文。

多线程

让一个进程同时执行一段代码,用起来类似于多进程,但是区别在于线程与线程之间能够共享资源。python不太推荐用多线程,因为GIL的存在。推荐使用multiprocessing或者concurrent.futures.ProcessPoolExecutor。但是如果想要同时运行多个I/O密集型任务,多线程仍然是一个合适的模型。线程之间的对于共享进程的数据需要考虑线程安全的问题,由于进程之间是隔离的,拥有独立的内存空间资源,相对比较安全。

多线程官网地址:python多线程参考文档

多进程

多进程的通信方式:管道,FIFO,消息队列,信号,共享内存,socket,stream流。同步方式是PV信号量,管程。

多进程官网地址:python3.8 多进程,python3.8 多进程共享内存

协程

协程运行与线程之上,当一个协程完成后,可以选择主动让出,让另一个协程运行在当前线程上。协程并没有增加线程数量,只是在线程的基础上通过分时复用的方式运行多核协程,而且协程的切换在用户态完成,切换的代价比线程从用户态到内核态的代价小很多。最有效的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。

协程最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。其次协程不需要多线程的机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

gevent官网

总结

Python的多线程多进程比较特殊。由于Python的GIL锁存在,在同一个进程下开启多线程,同一时刻只能有一个线程在运行。在这个背景下,有些任务想要通过使用多线程进行加速的话,反而适得其反,比如计算密集型任务,在线程的切换需要耗费大量时间,利用不了多核优势,从而导致执行效率不比单线程高。针对这种任务可以使用多进程。而有些任务如IO密集型任务,比如频繁的读写文件,可以使用到多线程,因为IO阻塞足以提供足够的时间给解释器进行线程切换。想要达到提升效率,一般会使用多进程+协程的方案。在这里就有协程的概念。协程是比线程还要小的概念。协程是运行在当前线程之上,没有增加线程数量,因此也无需做协程切换,增加程序开销。对一些任务如爬虫等可以使用多进程+协程的方式,既可以充分利用多核,又充分发挥协程的高效率,获得更高的性能。

异步

关于异步操作,之前可以通过消息队列,如RabbitMQ和redis构造简单的消息队列来进行处理,但是添加了组件会增加系统的复杂度,因此学习Python的一些异步编程来进行横向对比,看看两者的方法。其实异步与前面提到的多线程多进程也有一些知识点是重叠的。这一部分主要参考的是深入理解Python异步编程。

基本概念

阻塞:程序未得到所需计算资源时被挂起的状态;程序在等待某个操作完成期间,自身无法继续干别的事情,则称该程序在该操作上是阻塞的。常见的阻塞形式有:网络I/O阻塞、磁盘I/O阻塞、用户输入阻塞等。非阻塞:程序在等待某操作过程中,自身不被阻塞,可以继续运行干别的事情,则称该程序在该操作上是非阻塞的。非阻塞并不是在任何程序级别、任何情况下都可以存在的。仅当程序封装的级别可以囊括独立的子程序单元时,它才可能存在非阻塞状态。同步:同步意味着有序。不同程序单元为了完成某个任务,在执行过程中需靠某种通信方式以协调一致,称这些程序单元是同步执行的。这是串行的。异步:为完成某个任务,不同程序单元之间过程中无需通信协调,也能完成任务的方式。不相关的程序单元之间可以是异步的。这是并行。

并行是为了利用多核加速多任务完成的进度。并发是为了让独立的子任务都有机会被尽快执行,但不一定能加速整体进度。非阻塞是为了提高程序整体执行效率。异步是高效地组织非阻塞任务的方式。要支持并发,必须拆分为多任务,不同任务相对而言才有阻塞/非阻塞、同步/异步。所以,并发、异步、非阻塞三个词总是如影随形。

异步编程

以进程、线程、协程、函数/方法作为执行任务程序的基本单位,结合回调、事件循环、信号量等机制,以提高程序整体执行效率和并发能力的编程方式。如果在某程序的运行时,能根据已经执行的指令准确判断它接下来要进行哪个具体操作,那它是同步程序,反之则为异步程序。(无序与有序的区别)同步/异步、阻塞/非阻塞并非水火不容,要看讨论的程序所处的封装级别。例如购物程序在处理多个用户的浏览请求可以是异步的,而更新库存时必须是同步的。异步编程有如下难处:

执行顺序不可预料,当下正要发生什么事件不可预料。在并行情况下更为复杂和艰难。如果某事件处理程序需要长时间执行,所有其他部分都会被阻塞。程序下一步行为往往依赖上一步执行结果,如何知晓上次异步调用已完成并获取结果?回调(Callback)成了必然选择。那又需要面临“回调地狱”的折磨。同步代码改为异步代码,必然破坏代码结构。解决问题的逻辑也要转变,不再是一条路走到黑,需要精心安排异步任务。

asyncio

复制代码import timeimport asyncio# 定义异步函数async def hello():await asyncio.sleep(1)print('Hello World:%s' % time.time())if __name__ =='__main__':loop = asyncio.get_event_loop()tasks = [hello() for i in range(5)]loop.run_until_complete(asyncio.wait(tasks))

aiohttp

import asynciofrom aiohttp import ClientSessiontasks = []url = "/{}"async def hello(url):async with ClientSession() as session:async with session.get(url) as response:response = await response.read()print(response)if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(hello(url))

收集Http响应:

import timeimport asynciofrom aiohttp import ClientSessiontasks = []url = "/{}"async def hello(url):async with ClientSession() as session:async with session.get(url) as response:# print(response)print('Hello World:%s' % time.time())return await response.read()def run():for i in range(5):task = asyncio.ensure_future(hello(url.format(i)))tasks.append(task)result = loop.run_until_complete(asyncio.gather(*tasks))print(result)if __name__ == '__main__':loop = asyncio.get_event_loop()run()

限制并发数量,限制最大并发数量:

#coding:utf-8import time,asyncio,aiohttpurl = '/'async def hello(url,semaphore):async with semaphore:async with aiohttp.ClientSession() as session:async with session.get(url) as response:return await response.read()async def run():semaphore = asyncio.Semaphore(500) # 限制并发量为500to_get = [hello(url.format(),semaphore) for _ in range(1000)] #总共1000任务await asyncio.wait(to_get)if __name__ == '__main__':# now=lambda :time.time()loop = asyncio.get_event_loop()loop.run_until_complete(run())loop.close()

异常

Python的异常处理主要是了解常见异常、捕捉处理异常的方法和自定义异常。

常见异常

异常处理

捕捉异常可以使用try/except语句:

try:<语句> #运行别的代码except <名字>:<语句> #如果在try部份引发了'name'异常except <名字>,<数据>:<语句> #如果引发了'name'异常,获得附加的数据else:<语句> #如果没有异常发生

或者try/finnally语句:

try:<语句>finally:<语句> #退出try时总会执行raise

自定义异常

class Networkerror(RuntimeError):def __init__(self, arg):self.args = arg

触发异常:

try:raise Networkerror("Bad hostname")except Networkerror as e:print(e)

lambda表达式

lambda表达式也称为匿名表达式,其实不太建议使用lambda表达式,因为使用lambda表达式代码可读性不是特别强,但不使用的话代码的风格不够Pythonic。lambda表达式的形式如下:

lambda argument_list:expersion

lambda表达式用法

像一般函数一样调用:

c=lambda x,y,z:x*y*zc(2,3,4)24

或者

(lambda x:x**2)(3)9

将lambda函数作为参数传递给其他函数比如说结合map、filter、sorted、reduce等一些Python内置函数使用。

fliter(lambda x:x%3==0,[1,2,3,4,5,6])[3,6]squares = map(lambda x:x**2,range(5))print(lsit(squares))[0,1,4,9,16]

Names = ['Anne', 'Amy', 'Bob', 'David', 'Carrie', 'Barbara', 'Zach']B_Name= filter(lambda x: x.startswith('B'),Names)print(B_Name)['Bob', 'Barbara']

嵌套使用将lambda函数嵌套到普通函数中,lambda函数本身做为return的值。

def increment(n):return lambda x:x+nf=increment(4)f(2)6

高阶函数

Python的常见的高阶函数包含有mapreducefiltersortedpartial。具体可以参考Python的高阶函数模块functools

functools

functools.cmp_to_key(func)functools.total_ordering(cls)functools.reduce(function, iterable[, initializer])functools.partial(func[, args][, *keywords])functools.update_wrapper(wrapper, wrapped[, assigned][, updated])functools.wraps(wrapped[, assigned][, updated])functools.lru_cache(maxsize=128, typed=False)functools.partialmethod(func, *args, **keywords)functools.singledispatch(default)

具体可以参考:functools—高阶函数和可调用对象上的操作

itertools

itertools可以参考:为高效循环而创建迭代器的函数,里面包含有如下迭代器:

无穷迭代器

Count

def count(start=0, step=1):# count(10) --> 10 11 12 13 14 ...# count(2.5, 0.5) -> 2.5 3.0 3.5 ...n = startwhile True:yield nn += step

cycle

def cycle(iterable):# cycle('ABCD') --> A B C D A B C D A B C D ...saved = []for element in iterable:yield elementsaved.append(element)while saved:for element in saved:yield element

repeat

def repeat(object, times=None):# repeat(10, 3) --> 10 10 10if times is None:while True:yield objectelse:for i in range(times):yield object

最短输入序列长度停止迭代器

accumulate

def accumulate(iterable, func=operator.add, *, initial=None):'Return running totals'# accumulate([1,2,3,4,5]) --> 1 3 6 10 15# accumulate([1,2,3,4,5], initial=100) --> 100 101 103 106 110 115# accumulate([1,2,3,4,5], operator.mul) --> 1 2 6 24 120it = iter(iterable)total = initialif initial is None:try:total = next(it)except StopIteration:returnyield totalfor element in it:total = func(total, element)yield total

chain

def chain(*iterables):# chain('ABC', 'DEF') --> A B C D E Ffor it in iterables:for element in it:yield element

compress

def compress(data, selectors):# compress('ABCDEF', [1,0,1,0,1,1]) --> A C E Freturn (d for d, s in zip(data, selectors) if s)

dropwhile

def dropwhile(predicate, iterable):# dropwhile(lambda x: x<5, [1,4,6,4,1]) --> 6 4 1iterable = iter(iterable)for x in iterable:if not predicate(x):yield xbreakfor x in iterable:yield x

filterfalse

def filterfalse(predicate, iterable):# filterfalse(lambda x: x%2, range(10)) --> 0 2 4 6 8if predicate is None:predicate = boolfor x in iterable:if not predicate(x):yield x

groupby

groups = []uniquekeys = []data = sorted(data, key=keyfunc)for k, g in groupby(data, keyfunc):groups.append(list(g))# Store group iterator as a listuniquekeys.append(k)

islice

def islice(iterable, *args):# islice('ABCDEFG', 2) --> A B# islice('ABCDEFG', 2, 4) --> C D# islice('ABCDEFG', 2, None) --> C D E F G# islice('ABCDEFG', 0, None, 2) --> A C E Gs = slice(*args)start, stop, step = s.start or 0, s.stop or sys.maxsize, s.step or 1it = iter(range(start, stop, step))try:nexti = next(it)except StopIteration:# Consume *iterable* up to the *start* position.for i, element in zip(range(start), iterable):passreturntry:for i, element in enumerate(iterable):if i == nexti:yield elementnexti = next(it)except StopIteration:# Consume to *stop*.for i, element in zip(range(i + 1, stop), iterable):pass

takewhile

def takewhile(predicate, iterable):# takewhile(lambda x: x<5, [1,4,6,4,1]) --> 1 4for x in iterable:if predicate(x):yield xelse:break

tee

def tee(iterable, n=2):it = iter(iterable)deques = [collections.deque() for i in range(n)]def gen(mydeque):while True:if not mydeque: # when the local deque is emptytry:newval = next(it) # fetch a new value andexcept StopIteration:returnfor d in deques: # load it to all the dequesd.append(newval)yield mydeque.popleft()return tuple(gen(d) for d in deques)

zip_longest

def zip_longest(*args, fillvalue=None):# zip_longest('ABCD', 'xy', fillvalue='-') --> Ax By C- D-iterators = [iter(it) for it in args]num_active = len(iterators)if not num_active:returnwhile True:values = []for i, it in enumerate(iterators):try:value = next(it)except StopIteration:num_active -= 1if not num_active:returniterators[i] = repeat(fillvalue)value = fillvaluevalues.append(value)yield tuple(values)

排列组合迭代器

product

def product(*args, repeat=1):# product('ABCD', 'xy') --> Ax Ay Bx By Cx Cy Dx Dy# product(range(2), repeat=3) --> 000 001 010 011 100 101 110 111pools = [tuple(pool) for pool in args] * repeatresult = [[]]for pool in pools:result = [x+[y] for x in result for y in pool]for prod in result:yield tuple(prod)

permutations

def permutations(iterable, r=None):# permutations('ABCD', 2) --> AB AC AD BA BC BD CA CB CD DA DB DC# permutations(range(3)) --> 012 021 102 120 201 210pool = tuple(iterable)n = len(pool)r = n if r is None else rif r > n:returnindices = list(range(n))cycles = list(range(n, n-r, -1))yield tuple(pool[i] for i in indices[:r])while n:for i in reversed(range(r)):cycles[i] -= 1if cycles[i] == 0:indices[i:] = indices[i+1:] + indices[i:i+1]cycles[i] = n - ielse:j = cycles[i]indices[i], indices[-j] = indices[-j], indices[i]yield tuple(pool[i] for i in indices[:r])breakelse:return

combinations

def combinations(iterable, r):# combinations('ABCD', 2) --> AB AC AD BC BD CD# combinations(range(4), 3) --> 012 013 023 123pool = tuple(iterable)n = len(pool)if r > n:returnindices = list(range(r))yield tuple(pool[i] for i in indices)while True:for i in reversed(range(r)):if indices[i] != i + n - r:breakelse:returnindices[i] += 1for j in range(i+1, r):indices[j] = indices[j-1] + 1yield tuple(pool[i] for i in indices)

combinations_with_replacement

def combinations_with_replacement(iterable, r):# combinations_with_replacement('ABC', 2) --> AA AB AC BB BC CCpool = tuple(iterable)n = len(pool)if not n and r:returnindices = [0] * ryield tuple(pool[i] for i in indices)while True:for i in reversed(range(r)):if indices[i] != n - 1:breakelse:returnindices[i:] = [indices[i] + 1] * (r - i)yield tuple(pool[i] for i in indices)

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。