文章目录
前言多线程多进程多线程多进程协程总结 异步基本概念异步编程asyncioaiohttp 异常常见异常异常处理自定义异常 lambda表达式lambda表达式用法 高阶函数functoolsitertools无穷迭代器最短输入序列长度停止迭代器排列组合迭代器前言
本篇博客主要记录Python的一些高级用法。虽说是高级用法,但实际上是本人的一些薄弱项,以这篇博客作为记录。内容包括多线程,多进程,异常处理等方面的知识,里面有些内容与之前的博客有重叠部分,如多线程多进程可以参考:深度学习部署经验,也有一些薄弱的部分没有在此博客记录,如:Python的迭代器等知识。
多线程多进程
多线程多进程可以参考:深度学习部署经验,具体代码实现也可以参考深度学习部署经验一文。
多线程
让一个进程同时执行一段代码,用起来类似于多进程,但是区别在于线程与线程之间能够共享资源。python不太推荐用多线程,因为GIL的存在。推荐使用multiprocessing或者concurrent.futures.ProcessPoolExecutor。但是如果想要同时运行多个I/O密集型任务,多线程仍然是一个合适的模型。线程之间的对于共享进程的数据需要考虑线程安全的问题,由于进程之间是隔离的,拥有独立的内存空间资源,相对比较安全。
多线程官网地址:python多线程参考文档
多进程
多进程的通信方式:管道,FIFO,消息队列,信号,共享内存,socket,stream流。同步方式是PV信号量,管程。
多进程官网地址:python3.8 多进程,python3.8 多进程共享内存
协程
协程运行与线程之上,当一个协程完成后,可以选择主动让出,让另一个协程运行在当前线程上。协程并没有增加线程数量,只是在线程的基础上通过分时复用的方式运行多核协程,而且协程的切换在用户态完成,切换的代价比线程从用户态到内核态的代价小很多。最有效的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。
协程最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。其次协程不需要多线程的机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。
gevent官网
总结
Python的多线程多进程比较特殊。由于Python的GIL锁存在,在同一个进程下开启多线程,同一时刻只能有一个线程在运行。在这个背景下,有些任务想要通过使用多线程进行加速的话,反而适得其反,比如计算密集型任务,在线程的切换需要耗费大量时间,利用不了多核优势,从而导致执行效率不比单线程高。针对这种任务可以使用多进程。而有些任务如IO密集型任务,比如频繁的读写文件,可以使用到多线程,因为IO阻塞足以提供足够的时间给解释器进行线程切换。想要达到提升效率,一般会使用多进程+协程的方案。在这里就有协程的概念。协程是比线程还要小的概念。协程是运行在当前线程之上,没有增加线程数量,因此也无需做协程切换,增加程序开销。对一些任务如爬虫等可以使用多进程+协程的方式,既可以充分利用多核,又充分发挥协程的高效率,获得更高的性能。
异步
关于异步操作,之前可以通过消息队列,如RabbitMQ和redis构造简单的消息队列来进行处理,但是添加了组件会增加系统的复杂度,因此学习Python的一些异步编程来进行横向对比,看看两者的方法。其实异步与前面提到的多线程多进程也有一些知识点是重叠的。这一部分主要参考的是深入理解Python异步编程。
基本概念
阻塞:程序未得到所需计算资源时被挂起的状态;程序在等待某个操作完成期间,自身无法继续干别的事情,则称该程序在该操作上是阻塞的。常见的阻塞形式有:网络I/O阻塞、磁盘I/O阻塞、用户输入阻塞等。非阻塞:程序在等待某操作过程中,自身不被阻塞,可以继续运行干别的事情,则称该程序在该操作上是非阻塞的。非阻塞并不是在任何程序级别、任何情况下都可以存在的。仅当程序封装的级别可以囊括独立的子程序单元时,它才可能存在非阻塞状态。同步:同步意味着有序。不同程序单元为了完成某个任务,在执行过程中需靠某种通信方式以协调一致,称这些程序单元是同步执行的。这是串行的。异步:为完成某个任务,不同程序单元之间过程中无需通信协调,也能完成任务的方式。不相关的程序单元之间可以是异步的。这是并行。并行是为了利用多核加速多任务完成的进度。并发是为了让独立的子任务都有机会被尽快执行,但不一定能加速整体进度。非阻塞是为了提高程序整体执行效率。异步是高效地组织非阻塞任务的方式。要支持并发,必须拆分为多任务,不同任务相对而言才有阻塞/非阻塞、同步/异步。所以,并发、异步、非阻塞三个词总是如影随形。
异步编程
以进程、线程、协程、函数/方法作为执行任务程序的基本单位,结合回调、事件循环、信号量等机制,以提高程序整体执行效率和并发能力的编程方式。如果在某程序的运行时,能根据已经执行的指令准确判断它接下来要进行哪个具体操作,那它是同步程序,反之则为异步程序。(无序与有序的区别)同步/异步、阻塞/非阻塞并非水火不容,要看讨论的程序所处的封装级别。例如购物程序在处理多个用户的浏览请求可以是异步的,而更新库存时必须是同步的。异步编程有如下难处:
执行顺序不可预料,当下正要发生什么事件不可预料。在并行情况下更为复杂和艰难。如果某事件处理程序需要长时间执行,所有其他部分都会被阻塞。程序下一步行为往往依赖上一步执行结果,如何知晓上次异步调用已完成并获取结果?回调(Callback)成了必然选择。那又需要面临“回调地狱”的折磨。同步代码改为异步代码,必然破坏代码结构。解决问题的逻辑也要转变,不再是一条路走到黑,需要精心安排异步任务。
asyncio
复制代码import timeimport asyncio# 定义异步函数async def hello():await asyncio.sleep(1)print('Hello World:%s' % time.time())if __name__ =='__main__':loop = asyncio.get_event_loop()tasks = [hello() for i in range(5)]loop.run_until_complete(asyncio.wait(tasks))
aiohttp
import asynciofrom aiohttp import ClientSessiontasks = []url = "/{}"async def hello(url):async with ClientSession() as session:async with session.get(url) as response:response = await response.read()print(response)if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(hello(url))
收集Http响应:
import timeimport asynciofrom aiohttp import ClientSessiontasks = []url = "/{}"async def hello(url):async with ClientSession() as session:async with session.get(url) as response:# print(response)print('Hello World:%s' % time.time())return await response.read()def run():for i in range(5):task = asyncio.ensure_future(hello(url.format(i)))tasks.append(task)result = loop.run_until_complete(asyncio.gather(*tasks))print(result)if __name__ == '__main__':loop = asyncio.get_event_loop()run()
限制并发数量,限制最大并发数量:
#coding:utf-8import time,asyncio,aiohttpurl = '/'async def hello(url,semaphore):async with semaphore:async with aiohttp.ClientSession() as session:async with session.get(url) as response:return await response.read()async def run():semaphore = asyncio.Semaphore(500) # 限制并发量为500to_get = [hello(url.format(),semaphore) for _ in range(1000)] #总共1000任务await asyncio.wait(to_get)if __name__ == '__main__':# now=lambda :time.time()loop = asyncio.get_event_loop()loop.run_until_complete(run())loop.close()
异常
Python的异常处理主要是了解常见异常、捕捉处理异常的方法和自定义异常。
常见异常
异常处理
捕捉异常可以使用try/except语句:
try:<语句> #运行别的代码except <名字>:<语句> #如果在try部份引发了'name'异常except <名字>,<数据>:<语句> #如果引发了'name'异常,获得附加的数据else:<语句> #如果没有异常发生
或者try/finnally语句:
try:<语句>finally:<语句> #退出try时总会执行raise
自定义异常
class Networkerror(RuntimeError):def __init__(self, arg):self.args = arg
触发异常:
try:raise Networkerror("Bad hostname")except Networkerror as e:print(e)
lambda表达式
lambda表达式也称为匿名表达式,其实不太建议使用lambda表达式,因为使用lambda表达式代码可读性不是特别强,但不使用的话代码的风格不够Pythonic。lambda表达式的形式如下:
lambda argument_list:expersion
lambda表达式用法
像一般函数一样调用:c=lambda x,y,z:x*y*zc(2,3,4)24
或者
(lambda x:x**2)(3)9
将lambda函数作为参数传递给其他函数比如说结合map、filter、sorted、reduce等一些Python内置函数使用。
fliter(lambda x:x%3==0,[1,2,3,4,5,6])[3,6]squares = map(lambda x:x**2,range(5))print(lsit(squares))[0,1,4,9,16]
Names = ['Anne', 'Amy', 'Bob', 'David', 'Carrie', 'Barbara', 'Zach']B_Name= filter(lambda x: x.startswith('B'),Names)print(B_Name)['Bob', 'Barbara']
嵌套使用将lambda函数嵌套到普通函数中,lambda函数本身做为return的值。
def increment(n):return lambda x:x+nf=increment(4)f(2)6
高阶函数
Python的常见的高阶函数包含有map
、reduce
、filter
、sorted
、partial
。具体可以参考Python的高阶函数模块functools
。
functools
functools.cmp_to_key(func)functools.total_ordering(cls)functools.reduce(function, iterable[, initializer])functools.partial(func[, args][, *keywords])functools.update_wrapper(wrapper, wrapped[, assigned][, updated])functools.wraps(wrapped[, assigned][, updated])functools.lru_cache(maxsize=128, typed=False)functools.partialmethod(func, *args, **keywords)functools.singledispatch(default)具体可以参考:functools—高阶函数和可调用对象上的操作
itertools
itertools可以参考:为高效循环而创建迭代器的函数,里面包含有如下迭代器:
无穷迭代器
Countdef count(start=0, step=1):# count(10) --> 10 11 12 13 14 ...# count(2.5, 0.5) -> 2.5 3.0 3.5 ...n = startwhile True:yield nn += step
cycle
def cycle(iterable):# cycle('ABCD') --> A B C D A B C D A B C D ...saved = []for element in iterable:yield elementsaved.append(element)while saved:for element in saved:yield element
repeat
def repeat(object, times=None):# repeat(10, 3) --> 10 10 10if times is None:while True:yield objectelse:for i in range(times):yield object
最短输入序列长度停止迭代器
accumulatedef accumulate(iterable, func=operator.add, *, initial=None):'Return running totals'# accumulate([1,2,3,4,5]) --> 1 3 6 10 15# accumulate([1,2,3,4,5], initial=100) --> 100 101 103 106 110 115# accumulate([1,2,3,4,5], operator.mul) --> 1 2 6 24 120it = iter(iterable)total = initialif initial is None:try:total = next(it)except StopIteration:returnyield totalfor element in it:total = func(total, element)yield total
chain
def chain(*iterables):# chain('ABC', 'DEF') --> A B C D E Ffor it in iterables:for element in it:yield element
compress
def compress(data, selectors):# compress('ABCDEF', [1,0,1,0,1,1]) --> A C E Freturn (d for d, s in zip(data, selectors) if s)
dropwhile
def dropwhile(predicate, iterable):# dropwhile(lambda x: x<5, [1,4,6,4,1]) --> 6 4 1iterable = iter(iterable)for x in iterable:if not predicate(x):yield xbreakfor x in iterable:yield x
filterfalse
def filterfalse(predicate, iterable):# filterfalse(lambda x: x%2, range(10)) --> 0 2 4 6 8if predicate is None:predicate = boolfor x in iterable:if not predicate(x):yield x
groupby
groups = []uniquekeys = []data = sorted(data, key=keyfunc)for k, g in groupby(data, keyfunc):groups.append(list(g))# Store group iterator as a listuniquekeys.append(k)
islice
def islice(iterable, *args):# islice('ABCDEFG', 2) --> A B# islice('ABCDEFG', 2, 4) --> C D# islice('ABCDEFG', 2, None) --> C D E F G# islice('ABCDEFG', 0, None, 2) --> A C E Gs = slice(*args)start, stop, step = s.start or 0, s.stop or sys.maxsize, s.step or 1it = iter(range(start, stop, step))try:nexti = next(it)except StopIteration:# Consume *iterable* up to the *start* position.for i, element in zip(range(start), iterable):passreturntry:for i, element in enumerate(iterable):if i == nexti:yield elementnexti = next(it)except StopIteration:# Consume to *stop*.for i, element in zip(range(i + 1, stop), iterable):pass
takewhile
def takewhile(predicate, iterable):# takewhile(lambda x: x<5, [1,4,6,4,1]) --> 1 4for x in iterable:if predicate(x):yield xelse:break
tee
def tee(iterable, n=2):it = iter(iterable)deques = [collections.deque() for i in range(n)]def gen(mydeque):while True:if not mydeque: # when the local deque is emptytry:newval = next(it) # fetch a new value andexcept StopIteration:returnfor d in deques: # load it to all the dequesd.append(newval)yield mydeque.popleft()return tuple(gen(d) for d in deques)
zip_longest
def zip_longest(*args, fillvalue=None):# zip_longest('ABCD', 'xy', fillvalue='-') --> Ax By C- D-iterators = [iter(it) for it in args]num_active = len(iterators)if not num_active:returnwhile True:values = []for i, it in enumerate(iterators):try:value = next(it)except StopIteration:num_active -= 1if not num_active:returniterators[i] = repeat(fillvalue)value = fillvaluevalues.append(value)yield tuple(values)
排列组合迭代器
productdef product(*args, repeat=1):# product('ABCD', 'xy') --> Ax Ay Bx By Cx Cy Dx Dy# product(range(2), repeat=3) --> 000 001 010 011 100 101 110 111pools = [tuple(pool) for pool in args] * repeatresult = [[]]for pool in pools:result = [x+[y] for x in result for y in pool]for prod in result:yield tuple(prod)
permutations
def permutations(iterable, r=None):# permutations('ABCD', 2) --> AB AC AD BA BC BD CA CB CD DA DB DC# permutations(range(3)) --> 012 021 102 120 201 210pool = tuple(iterable)n = len(pool)r = n if r is None else rif r > n:returnindices = list(range(n))cycles = list(range(n, n-r, -1))yield tuple(pool[i] for i in indices[:r])while n:for i in reversed(range(r)):cycles[i] -= 1if cycles[i] == 0:indices[i:] = indices[i+1:] + indices[i:i+1]cycles[i] = n - ielse:j = cycles[i]indices[i], indices[-j] = indices[-j], indices[i]yield tuple(pool[i] for i in indices[:r])breakelse:return
combinations
def combinations(iterable, r):# combinations('ABCD', 2) --> AB AC AD BC BD CD# combinations(range(4), 3) --> 012 013 023 123pool = tuple(iterable)n = len(pool)if r > n:returnindices = list(range(r))yield tuple(pool[i] for i in indices)while True:for i in reversed(range(r)):if indices[i] != i + n - r:breakelse:returnindices[i] += 1for j in range(i+1, r):indices[j] = indices[j-1] + 1yield tuple(pool[i] for i in indices)
combinations_with_replacement
def combinations_with_replacement(iterable, r):# combinations_with_replacement('ABC', 2) --> AA AB AC BB BC CCpool = tuple(iterable)n = len(pool)if not n and r:returnindices = [0] * ryield tuple(pool[i] for i in indices)while True:for i in reversed(range(r)):if indices[i] != n - 1:breakelse:returnindices[i:] = [indices[i] + 1] * (r - i)yield tuple(pool[i] for i in indices)