您现在的位置是: 网站首页 >Python Python

通过celery_one避免Celery定时任务重复执行

admin2019年2月21日 14:24 Django | Python 1411人已围观

# 通过celery_one 在使用Celery统计每日访问数量的时候,发现一个任务会同时执行两次,发现同一时间内(1s内)竟然同时发送了两次任务,也就是同时产生了两个worker,造成统计两次,一直找不到原因。 参考:https://blog.csdn.net/qq_41333582/article/details/83899884 有人使用 Redis 实现了分布式锁,然后也有人使用了 Celery Once。 Celery Once 也是利用 Redis 加锁来实现, Celery Once 在 Task 类基础上实现了 QueueOnce 类,该类提供了任务去重的功能,所以在使用时,我们自己实现的方法需要将 QueueOnce 设置为 base ```python @task(base=QueueOnce, once={'graceful': True}) ``` 后面的 `once` 参数表示,在遇到重复方法时的处理方式,默认 `graceful` 为 `False`,那样 Celery 会抛出 `AlreadyQueued` 异常,手动设置为 `True`,则静默处理。 另外如果要手动设置任务的 key,可以指定 keys 参数 ```python @celery.task(base=QueueOnce, once={'keys': ['a']}) def slow_add(a, b): sleep(30) return a + b ``` ## 解决步骤 Celery One允许你将Celery任务排队,防止多次执行 安装 ```bash pip install -U celery_once ``` 要求,需要Celery4.0,老版本可能运行,但不是官方支持的。 使用`celery_once`,`tasks`需要继承一个名为`QueueOnce`的抽象`base tasks` `Once`安装完成后,需要配置一些关于`ONCE`的选项在Celery配置中 ```python from celery import Celery from celery_once import QueueOnce from time import sleep celery = Celery('tasks', broker='amqp://guest@localhost//') # 一般之前的配置没有这个,需要添加上 celery.conf.ONCE = { 'backend': 'celery_once.backends.Redis', 'settings': { 'url': 'redis://localhost:6379/0', 'default_timeout': 60 * 60 } } # 在原本没有参数的里面加上base @celery.task(base=QueueOnce) def slow_task(): sleep(30) return "Done!" ``` 要确定配置,需要取决于使用哪个`backend`进行锁定,查看[Backends](https://pypi.org/project/celery_once/#backends) 在后端,这将覆盖`apply_async`和`delay`。它不影响直接调用任务。 在运行任务时,`celery_once`检查是否没有锁定(针对Redis键)。否则,任务将正常运行。一旦任务完成(或由于异常而结束),锁将被清除。如果在任务完成之前尝试再次运行该任务,将会引发`AlreadyQueued `异常。 ```python example.delay(10) example.delay(10) Traceback (most recent call last): .. AlreadyQueued() ``` ```python result = example.apply_async(args=(10)) result = example.apply_async(args=(10)) Traceback (most recent call last): .. AlreadyQueued() ``` **graceful**:如果在任务的选项中设置了`once={'graceful': True}`,或者在运行时设置了`apply_async`,则任务可以返回`None`,而不是引发`AlreadyQueued`异常。 ```python from celery_once import AlreadyQueued # Either catch the exception, try: example.delay(10) except AlreadyQueued: pass # Or, handle it gracefully at run time. result = example.apply(args=(10), once={'graceful': True}) # or by default. @celery.task(base=QueueOnce, once={'graceful': True}) def slow_task(): sleep(30) return "Done!" ``` 其他功能请访问:https://pypi.org/project/celery_once/

很赞哦! (0)

文章交流

  • emoji
0人参与,0条评论

当前用户

未登录,点击   登录

站点信息

  • 建站时间:网站已运行2077天
  • 系统信息:Linux
  • 后台程序:Python: 3.8.10
  • 网站框架:Django: 3.2.6
  • 文章统计:256 篇
  • 文章评论:60 条
  • 腾讯分析网站概况-腾讯分析
  • 百度统计网站概况-百度统计
  • 公众号:微信扫描二维码,关注我们
  • QQ群:QQ加群,下载网站的学习源码
返回
顶部
标题 换行 登录
网站