python异步任务处理框架celery 2020年06月03日 未雨晴空 0评论 543阅读 1喜欢 阅读模式 隐藏边栏 显示边栏 ## 关于Celery >Celery 是一款非常简单、灵活、可靠的分布式系统,可用于处理大量消息,并且提供了一整套操作此系统的一系列工具,同时Celery也是一款消息队列工具,可用于处理实时数据以及任务调度。 ## Celery关键词概念 * Task >异步任务和定时任务 * Broker >表示中间人,作用是负责接收生产者发布的任务并将任务存入队列,然后等待任务的消费者也就是下面的Worker来处理。但是Celery本身不提供队列服务,但是提供了配置项来来实现,一般通过Redis或RabbitMQ实现队列服务。 * Worker >字面意思是工人,实际上是执行任务的消费者,它实时监控消息队列,如果有任务就获取任务并执行它。 * Beat >定时任务调度器,根据配置定时相关参数将指定的任务按照指定的时间发送给Broker(中间人)。 * Backend >用于存储任务的执行结果。可以配置redis或者database作为backend ## celery的使用方式 >比方说现在站点注册需要在用户注册完成后发送激活邮件给用户,而后台发送邮件时间需要一定时间,而又不能同步等待邮件发送完成再响应页面,这样用户体验非常不好,这个时候我们就需要一个异步框架(celery)来帮我们完成这些任务。 ## 基于命令行下使用(非django环境下) * 本地测试环境:Windows 10+Python 3.6+Celery 4.3.0+redis 2.4.5 **`pip install redis` 安装的是客户端方便连接的,本地也还要起一个redis服务端**) * 通过pycharm初始化名为celerydemo的项目 * 新建tasks.py文件,内容如下: ```python from celery import Celery #第一个参数"my_task"是celery实例应用名 app=Celery("my_task",broker="redis://localhost:6379/0",backend="redis://localhost:6379/1") @app.task def send_mail(): print("发送邮件中****************************") return "邮件发送成功" ``` * 新建app.py文件,内容如下: ```python from tasks import send_mail if __name__ == '__main__': result=send_mail.delay() print(result) ``` * 通过pycharm打开terminal,如下图所示:  * 执行命令`celery worker -A tasks -l info -P eventlet` >-A表示当前的任务的模块名,这里就是task.py的文件名; -l表示celery的日志等级如info、debug -P 表示Pool implementation,线程池实现类? * 再新开一个terminal窗口执行命令`python app.py`,结果如下图: 上图红色箭头所指的需要关注,可以看到该任务已经执行完毕,并且返回值也输出来了 * 再通过pycharm打开python console界面,依次执行如下命令 ```python >>>from tasks import send_mail >>>send_mail.name 'tasks.send_mail' >>>send_mail.app >>>result=send_mail.delay() result >>>result.ready() True >>>result.get() '邮件发送成功' ``` ## 基于配置方式使用 * 在根目录下新建名为celery_app的**python package** * 在celery_app的目录下新建celeryconfig.py文件,内容如下: ```python BROKER_URL = 'redis://127.0.0.1:6379/0' CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1' CELERY_TIMEZONE = 'Asia/Shanghai' # 导入指定的任务模块 CELERY_IMPORTS=['celery_app.task'] ``` 关于更多的配置参数请参考[Celery配置参数](https://www.jianshu.com/p/581fd8f92509)以及[celery的configuration.](http://docs.jinkan.org/docs/celery/configuration.html) * 在celery_app的__init__.py初始化celery应用实例,内容如下: ```python from celery import Celery app=Celery("demo") # 可以从配置对象中进行加载配置。 app.config_from_object("celery_app.celeryconfig") ``` * 在celery_app目录下新建task.py文件,内容如下: ```python from celery_app import app @app.task def send_mail(): print("发送邮件中****************************") return "邮件发送成功" ``` * 打开terminal窗口在根目录下执行如下命令: `celery worker -A celery_app -l info -P eventlet`,结果如下图: 可以看到配置文件内容生效以及celery也识别了创建的任务 * 再新开一个terminal窗口执行命令`python app.py`,结果如下图:  * 再打开python console界面操作如下: ```python >>> from celery_app.task import send_mail >>>send_mail.name 'celery_app.task.send_mail' >>> result=send_mail.delay() >>> result.ready() True >>> result.get() '邮件发送成功' ``` ## 定时任务使用 * 将celeryconfig.py里面的内容修改成如下: ```python from celery.schedules import crontab,timedelta BROKER_URL = 'redis://127.0.0.1:6379/0' CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1' CELERY_TIMEZONE = 'Asia/Shanghai' # 导入指定的任务模块 CELERY_IMPORTS=['celery_app.task'] # 定时任务 CELERYBEAT_SCHEDULE={ 'task1':{ 'task':'celery_app.task.send_mail',#具体到函数的路径 'schedule':timedelta(seconds=10),#每10秒钟执行邮件发送 } } ``` * 打开terminal执行命令`celery beat -A celery_app -l info`,结果如下: *再开一个terminal界面,执行以下命令: `celery worker -A celery_app -l info -P eventlet`: 因为设置的秒级别的,所以一旦执行beat命令则会立刻发送定时任务之后再等待0秒重复执行 ## 参考资料 * [celery-3.1.7中文文档](http://docs.jinkan.org/docs/celery/index.html)(该文档详细说明了命令行参数的用法) * [celery官方文档](http://www.celeryproject.org/docs-and-support/) * [celery-4.3.0中文文档](https://www.celerycn.io/) ## 遇到的问题以及解决办法 * **not enough values to unpack (expected 3, got 0)** win10上更对运行celery4.x就会出现这个问题,需要通过`pip install eventlet`,同时在启动worker添加一个参数,如:`celery -A worker -l info -P eventlet`即可 * **在[tasks]可以看到任务,但是通过调用`result.get()`提示任务未注册** 这种情况出现的原因可能是你两次执行的导入方式不一样,从而导致自动生成的任务名称不一样,简单的方法就是给任务**显式添加任务名**可参考该篇文章[Celery-4.1 用户指南: Task描述](https://blog.csdn.net/libing_thinking/article/details/78547816) * **RROR/MainProcess] consumer: Cannot connect to redis** 出现连接不上redis,可以尝试将localhost改成127.0.0.1 * **无法接收任务,执行不执行,但是celery处于就绪状态** 在视图调用任务时候是否添加了`delay()`方法 * **报Object of type 'byte' is not JSON serializable** 可以在django的settings.py 里面关于celery的配置参数添加如下内容: ```python CELERY_TASK_SERIALIZER = 'pickle' CELERY_RESULT_SERIALIZER = 'pickle' CELERY_ACCEPT_CONTENT = ['pickle', 'json'] ``` 添加pickle,这样的话任务函数如果含有对象参数,执行任务时就不会报对象序列化的问题,关于pickle的具体内容可参考[pickle模块详解](https://www.cnblogs.com/baby-lily/p/10990026.html)。 © 著作权归作者所有,欢迎转载,转载请说明出处:未雨晴空博客,谢谢理解! 喜欢 打赏 分享 上一篇 下一篇 发表评论 取消回复 电子邮件地址不会被公开。 表情 请输入以http或https开头的URL,格式如:https://oneisall.top 提交评论