打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
Django中使用Celery执行异步和定时任务的注意事项

WEB前端开发社区 昨天

1. Windows中使用Celery 4.0及以后版本
Celery 4.0+及以后版本不支持在windows系统上运行。如果你希望在windows系统上使用celery, 有两种方法。

方法一:安装3.1.25版本

pip install celery==3.1.25
方法二:安装gevent



pip install gevent# 启动workercelery -A <module> worker -l info -P gevent

2. @task与@shared_task的区别

当我们使用@app.task装饰器定义我们的异步任务时,那么这个任务依赖于根据项目名myproject生成的Celery实例。





app = Celery('myproject')
@app.task(bind=True)def debug_task(self): print('Request: {0!r}'.format(self.request))
然而我们在进行Django开发时为了保证每个app的可重用性,我们经常会在每个app文件夹下编写异步任务,这些任务并不依赖于具体的Django项目名。使用@shared_task 装饰器能让我们避免对某个项目名对应Celery实例的依赖,使app的可移植性更强。






from __future__ import absolute_importfrom celery import shared_task
@shared_taskdef add(x, y): return x + y

3. 如果异步的任务包括耗时的I/O操作

一个无限期阻塞的任务会使得工作单元无法再做其他事情。如果你的任务里有 I/O 操作,请确保给这些操作加上超时时间,例如使用 requests 库时给网络请求添加一个超时时间:


connect_timeout, read_timeout = 5.0, 30.0response = requests.get(URL, timeout=(connect_timeout, read_timeout))
默认的 prefork 池调度器对长时间任务不是很友好,所以如果你的任务需要运行很长时间,确保在启动工作单元时使能了 -ofair 选项。

4. 使用多装饰器
当使用多个装饰器装饰任务函数时,确保 task 装饰器最后应用(在python中,这意味它必须在第一个位置):





@app.task@decorator2@decorator1def add(x, y):    return x + y

5. 使用bind=True绑定任务
一个绑定任务意味着任务函数的第一个参数总是任务实例本身(self),就像 Python 绑定方法类似,如下例所示:






from celery.utils.log import get_task_loggerlogger = get_task_logger(__name__)
@task(bind=True)def add(self, x, y): logger.info(self.request.id)
绑定任务在这些情况下是必须的:任务重试(使用 app.Task.retry() ),访问当前任务请求的信息,以及你添加到自定义任务基类的附加功能。

6. 忽略不想要的结果
如果你不在意任务的返回结果,可以设置 ignore_result 选项,因为存储结果耗费时间和资源。你还可以可以通过 task_ignore_result 设置全局忽略任务结果。



@app.task(ignore_result=True)def mytask():    something()

7. 避免启动同步子任务
让一个任务等待另外一个任务的返回结果是很低效的,并且如果工作单元池被耗尽的话这将会导致死锁。

# 坏例子

















@app.taskdef update_page_info(url):    page = fetch_page.delay(url).get()    info = parse_page.delay(url, page).get()    store_page_info.delay(url, info)
@app.taskdef fetch_page(url): return myhttplib.get(url)
@app.taskdef parse_page(url, page): return myparser.parse_document(page)
@app.taskdef store_page_info(url, info): return PageInfo.objects.create(url, info)
# 好例子

















def update_page_info(url):    # fetch_page -> parse_page -> store_page    chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)    chain()
@app.task()def fetch_page(url): return myhttplib.get(url)
@app.task()def parse_page(page): return myparser.parse_document(page)
@app.task(ignore_result=True)def store_page_info(info, url): PageInfo.objects.create(url=url, info=info)
在好例子里,我们将不同的任务签名链接起来创建一个任务链,三个子任务按顺序执行警告:不建议同步执行子任务!


8.  Django的模型对象不应该作为参数传递给任务
Django 的模型对象。他们不应该作为参数传递给任务。几乎总是在任务运行时从数据库获取对象是最好的,因为老的数据会导致竞态条件。假象有这样一个场景,你有一篇文章,以及自动展开文章中缩写的任务:








class Article(models.Model):    title = models.CharField()    body = models.TextField()
@app.taskdef expand_abbreviations(article): article.body.replace('MyCorp', 'My Corporation') article.save()
首先,作者创建一篇文章并保存,这时作者点击一个按钮初始化一个缩写展开任务:


>>> article = Article.objects.get(id=102)>>> expand_abbreviations.delay(article)
现在,队列非常忙,所以任务在2分钟内都不会运行。与此同时,另一个作者修改了这篇文章,当这个任务最终运行,因为老版本的文章作为参数传递给了这个任务,所以这篇文章会回滚到老的版本。修复这个竞态条件很简单,只要参数传递文章的 id 即可,此时可以在任务中重新获取这篇文章:





@app.taskdef expand_abbreviations(article_id):    article = Article.objects.get(id=article_id)    article.body.replace('MyCorp', 'My Corporation')    article.save()

9. 使用on_commit函数处理事务
我们再看另外一个celery中处理事务的例子。这是在数据库中创建一个文章对象的 Django 视图,此时传递主键给任务。它使用 commit_on_success 装饰器,当视图返回时该事务会被提交,当视图抛出异常时会进行回滚。






from django.db import transaction
@transaction.commit_on_successdef create_article(request): article = Article.objects.create() expand_abbreviations.delay(article.pk)
如果在事务提交之前任务已经开始执行会产生一个竞态条件;数据库对象还不存在。解决方案是使用 on_commit 回调函数来在所有事务提交成功后启动任务。




from django.db.transaction import on_commitdef create_article(request):    article = Article.objects.create()    on_commit(lambda: expand_abbreviations.delay(article.pk))
10. 自定义重试延迟
当任务发送例外时,app.Task.retry() 函数可以用来重新执行任务。当一个任务被重试,它在重试前会等待给定的时间,并且默认的由 default_retry_delay 属性定义。默认设置为 3 分钟。注意延迟设置的单位是秒(int 或者 float)。你可以通过提供 countdown 参数覆盖这个默认值。








# retry in 30 minutes.@app.task(bind=True, default_retry_delay=30 * 60)  def add(self, x, y):try:        something_raising()except Exception as exc:# overrides the default delay to retry after 1 minuteraise self.retry(exc=exc, countdown=60)


声明:
本文于网络整理,版权归原作者所有,如来源信息有误或侵犯权益,请联系我们删除或授权事宜。
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Django blog项目《二十五》:项目优化《1》使用celery异步任务和定时任务
Celery定时任务
python测试开发django-157.celery异步与redis环境搭建
python 分布式库celery介绍
两万字长文让你彻底掌握 celery
celery的建议
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服