100字范文,内容丰富有趣,生活中的好帮手!
100字范文 > python 任务队列 huey_使用python的分布式任务队列huey实现任务的异步化

python 任务队列 huey_使用python的分布式任务队列huey实现任务的异步化

时间:2022-12-07 03:37:53

相关推荐

python 任务队列 huey_使用python的分布式任务队列huey实现任务的异步化

前言:

一个轻型的任务队列,功能和相关的broker没有celery强大,重在轻型,而且代码读起来也比较的简单。

这次算是原文的翻译了.... 一开始看到这个东西的时候,想看看有没有中文的资料,能立马的入门,结果一看老外用的倒是挺多的,算了 既然是看文档,顺便也按照自己的意思翻译下把。

关于huey的介绍: (比celery轻型,比mrq、rq要好用 !)

a lightweight alternative.written in python

no deps outside stdlib, except redis (or roll your own backend)

support for django

supports:multi-threaded task execution

scheduled execution at a given time

periodic execution, like a crontab

retrying tasks that fail

task result storage

安装:Installing

hueycanbeinstalledveryeasilyusingpip.

pipinstallhuey

hueyhasnodependenciesoutsidethestandardlibrary,butcurrentlytheonlyfully-implementedqueuebackenditshipswithrequiresredis.Tousetheredisbackend,youwillneedtoinstallthepythonclient.

pipinstallredis

Usinggit

Ifyouwanttoruntheverylatest,feelfreetopulldowntherepofromgithubandinstallbyhand.

gitclone/coleifer/huey.git

cdhuey

pythonsetup.pyinstall

Youcanrunthetestsusingthetest-runner:

pythonsetup.pytest

关于huey的api,下面有详细的介绍及参数介绍的。

fromhueyimportRedisHuey,crontab

huey=RedisHuey('my-app',host='')

@huey.task()

defadd_numbers(a,b):

returna+b

@huey.periodic_task(crontab(minute='0',hour='3'))

defnightly_backup():

sync_all_data()

juey作为woker的时候,一些cli参数。

常用的是:

-l 关于日志文件的执行 。

-w workers的数目,-w的数值大了,肯定是增加任务的处理能力

-p --periodic 启动huey worker的时候,他会从tasks.py里面找到 需要crontab的任务,会派出几个线程专门处理这些事情。

-n 不启动关于crontab里面的预周期执行,只有你触发的时候,才会执行周期星期的任务。

--threads 意思你懂的。#原文:

Thefollowingtableliststheoptionsavailablefortheconsumeraswellastheirdefaultvalues.

-l,--logfile

Pathtofileusedforlogging.Whenafileisspecified,bydefaultHueywillusearotatingfilehandler(1MB/chunk)withamaximumof3backups.Youcanattachyourownhandler(huey.logger)aswell.ThedefaultloglevelisINFO.

-v,--verbose

Verboselogging(equatestoDEBUGlevel).Ifnologfileisspecifiedandverboseisset,thentheconsumerwilllogtotheconsole.Thisisveryusefulfortesting/debugging.

-q,--quiet

Onlylogerrors.ThedefaultloglevelfortheconsumerisINFO.

-w,--workers

Numberofworkerthreads,thedefaultis1threadbutforapplicationsthathavemanyI/Oboundtasks,increasingthisnumbermayleadtogreaterthroughput.

-p,--periodic

Indicatethatthisconsumerprocessshouldstartathreaddedicatedtoenqueueing“periodic”tasks(crontab-likefunctionality).ThisdefaultstoTrue,soshouldnotneedtobespecifiedinpractice.

-n,--no-periodic

Indicatethatthisconsumerprocessshouldnotenqueueperiodictasks.

-d,--delay

Whenusinga“polling”-typequeuebackend,theamountoftimetowaitbetweenpollingthebackend.Defaultis0.1seconds.

-m,--max-delay

Themaximumamountoftimetowaitbetweenpolling,ifusingweightedbackoff.Defaultis10seconds.

-b,--backoff

Theamounttoback-offwhenpollingforresults.Mustbegreaterthanone.Defaultis1.15.

-u,--utc

IndicatesthattheconsumershoulduseUTCtimeforalltasks,crontabsandscheduling.DefaultisTrue,soinpracticeyoushouldnotneedtospecifythisoption.

--localtime

Indicatesthattheconsumershoulduselocaltimeforalltasks,crontabsandscheduling.DefaultisFalse.

Examples

Runningtheconsumerwith8threads,alogfileforerrorsonly,andaveryshortpollinginterval:

huey_consumer.pymy.app.huey-l/var/log/app.huey.log-w8-b1.1-m1.0

任务队列huey 是靠着redis来实现queue的任务存储,所以需要咱们提前先把redis-server和redis-py都装好。 安装的方法就不说了,自己搜搜吧。

我们首先创建下huey的链接实例 :#config.py

fromhueyimportHuey

fromhuey.backends.redis_backendimportRedisBlockingQueue

queue=RedisBlockingQueue('test-queue',host='localhost',port=6379)

huey=Huey(queue)

然后就是关于任务的,也就是你想让谁到任务队列这个圈子里面,和celey、rq,mrq一样,都是用tasks.py表示的。

fromconfigimporthuey#importthehueyweinstantiatedinconfig.py

@huey.task()

defcount_beans(num):

print'--counted%sbeans--'%num

再来一个真正去执行的 。 main.py 相当于生产者,tasks.py相当于消费者的关系。 main.py负责喂数据。main.py

fromconfigimporthuey#importour"huey"object

fromtasksimportcount_beans#importourtask

if__name__=='__main__':

beans=raw_input('Howmanybeans?')

count_beans(int(beans))

print'Enqueuedjobtocount%sbeans'%beans

Ensure you have Redis running locally

Ensure you have installed huey

Start the consumer: huey_consumer.py main.huey (notice this is “main.huey” and not “config.huey”).

Run the main program: python main.py

和celery、rq一样,他的结果获取是需要在你的config.py或者主代码里面指明他的存储的方式,现在huey还仅仅是支持redis,但相对他的特点和体积,这已经很足够了 !

只是那几句话而已,导入RedisDataStore库,申明下存储的地址。fromhueyimportHuey

fromhuey.backends.redis_backendimportRedisBlockingQueue

fromhuey.backends.redis_backendimportRedisDataStore#ADDTHISLINE

queue=RedisBlockingQueue('test-queue',host='localhost',port=6379)

result_store=RedisDataStore('results',host='localhost',port=6379)#ADDED

huey=Huey(queue,result_store=result_store)#ADDEDresultstore

这个时候,我们在ipython再次去尝试的时候,会发现可以获取到tasks.py里面的return值了 其实你在main.py里面获取的时候,他还是通过uuid从redis里面取出来的。>>>frommainimportcount_beans

>>>res=count_beans(100)

>>>res#whatis"res"?

>>>res.get()#gettheresultofthistask

'Counted100beans'

huey也是支持celey的延迟执行和crontab的功能 。 这些功能很是重要,可以自定义的优先级或者不用再借助linux本身的crontab。

用法很简单,多加一个delay的时间就行了,看了下huey的源码,他默认是立马执行的。当然还是要看你的线程是否都是待执行的状态了。>>>importdatetime

>>>res=count_beans.schedule(args=(100,),delay=60)

>>>res

>>>res.get()#thisreturnsNone,nodataisready

>>>res.get()#stillnodata...

>>>res.get(blocking=True)#ok,let'sjustblockuntilitsready

'Counted100beans'

再来一个重试retry的介绍,huey也是有retry,这个很是实用的东西。 如果大家有看到我的上面文章关于celery重试机制的介绍,应该也能明白huey是个怎么个回事了。 是的,他其实也是在tasks里具体函数的前面做了装饰器,装饰器里面有个func try 异常重试的逻辑 。 大家懂的。#tasks.py

fromdatetimeimportdatetime

fromconfigimporthuey

@huey.task(retries=3,retry_delay=10)

deftry_thrice():

print'trying....%s'%datetime.now()

raiseException('nope')

huey是给你反悔的机会饿 ~ 也就是说,你做了deley的计划任务后,如果你又想取消,那好看,直接revoke就可以了。#countsomebeans

res=count_beans(10000000)

res.revoke()

Thesameappliestotasksthatarescheduledinthefuture:

res=count_beans.schedule(args=(100000,),eta=in_the_future)

res.revoke()

@huey.task(crontab(minute='*'))

defprint_time():

printdatetime.now()

task() - 透明的装饰器,让你的函数变得优美点。

periodic_task() - 这个是周期性的任务

crontab() - 启动worker的时候,附带的crontab的周期任务。

BaseQueue - 任务队列

BaseDataStore - 任务执行后,可以把 结果塞入进去。 BAseDataStore可以自己重写。

原文:

官方的huey的git库里面是提供了相关的测试代码的:

main.pyfromconfigimporthuey

fromtasksimportcount_beans

if__name__=='__main__':

beans=raw_input('Howmanybeans?')

count_beans(int(beans))

print('Enqueuedjobtocount%sbeans'%beans)

tasks.py

importrandom

importtime

fromhueyimportcrontab

fromconfigimporthuey

@huey.task()

defcount_beans(num):

print"start..."

print('--counted%sbeans--'%num)

time.sleep(3)

print"end..."

return'Counted%sbeans'%num

@huey.periodic_task(crontab(minute='*/5'))

defevery_five_mins():

print('Consumerprintsthisevery5mins')

@huey.task(retries=3,retry_delay=10)

deftry_thrice():

ifrandom.randint(1,3)==1:

print('OK')

else:

print('Abouttofail,willretryin10seconds')

raiseException('Crapsomethingwentwrong')

@huey.task()

defslow(n):

time.sleep(n)

print('slept%s'%n)

run.sh#!/bin/bash

echo"HUEYCONSUMER"

echo"-------------"

echo"Inanotherterminal,run'pythonmain.py'"

echo"StoptheconsumerusingCtrl+C"

PYTHONPATH=.:$PYTHONPATH

python../../huey/bin/huey_consumer.pymain.huey--threads=2

=>

咱们可以先clone下huey的代码库。 里面有个examples例子目录,可以看到他是支持django的,但是这不是重点 !

[xiaorui@devops/tmp]$gitclone/coleifer/huey.git

Cloninginto'huey'...

remote:Countingobjects:1423,done.

remote:Compressingobjects:100%(9/9),done.

Receivingobjects:34%(497/1423),388.00KiB|29.00KiB/sKiB/s

Receivingobjects:34%(498/1423),628.00KiB|22.00KiB/s

remote:Total1423(delta0),reused0(delta0)

Receivingobjects:100%(1423/1423),2.24MiB|29.00KiB/s,done.

Resolvingdeltas:100%(729/729),done.

Checkingconnectivity...done.

[xiaorui@devops/tmp]$cdhuey/examples/simple

[xiaorui@devopssimple(master)]$ll

total40

-rw-r--r--1xiaoruiwheel79B9808:49README

-rw-r--r--1xiaoruiwheel0B9808:49__init__.py

-rw-r--r--1xiaoruiwheel56B9808:49config.py

-rwxr-xr-x1xiaoruiwheel227B9808:49cons.sh

-rw-r--r--1xiaoruiwheel205B9808:49main.py

-rw-r--r--1xiaoruiwheel607B9808:49tasks.py

[xiaorui@devopssimple(master)]$

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。