01、消费者和生产者模式
--基本解释: 1、由生产者生成元素存入队列中,消费者不断问讯取出生成的元素做处理 --实际意义: 1、常见的解耦方式之一,找到中间人broker,使得两个任务之间没有直接联系 --应用: --生产者生成消息,缓存到消息队列中,消费者读取消息队列中的消息并执行。 --这里消息队列一般是rabbitMQ | redis,非常好用的 --redis会出现一个问题即突然断电会导致数据丢失,这里可以使用redis持久化策略部分解决问题,但会降低性能 例如: 商城登陆时,生成发送短信消息,缓存到消息队列中,消费者读取消息队列中的发送短信消息并执行。而不是等待消息发送之后, 我们再返回消息已发送 --生产者 消费者之间除了简单的逻辑还有以下问题待解决: --生产者生成的消息可以理解为一个带输入值的key值,input(num_01, str_01, str_02, ***) --消费者需要实现具体的任务逻辑:即从消息队列中取出key值,找到对应任务,用对应的输入和任务编号执行相应代码解决问题 --高并发问题:就是生产者一次生产的消息数量过多 --后台多任务处理:对应高并发的消息,后台如何处理多任务并发的情况,例如:增加容器存储消息、增加后台服务器[反正celery也是一个服务器,一定有很多方法解决后台分布式连接的问题] --耗时的任务有许多种,每一种之间有重复怎么办 --取到消息之后什么时候执行,以什么样的方式执行 --以上这些问题可以使用原生的python进程、协程进行解决。但是如果使用celery则效率要高很多02、celery初步介绍
--celery简介: --一个简单、灵活且可靠、处理大量消息的分布式系统,可以在一台或者多台机器上运行。 --单个 Celery 进程每分钟可处理数以百万计的任务。 --通过消息进行通信,使用消息队列(broker)在客户端和消费者之间进行协调。 --安装:pip install -U Celery ---U就是 --upgrade,意思是如果已安装就升级到最新版 --pip install -h命令可以查看其它参数的作用 --celery将生产者--消费者设计模式封装起来,并提供接口 --总结: --celery相当于一个服务器,部署了各种预设好的任务,由它自己安排异步执行计划 --定时任务:就是celery自己按照设置好的计划表执行服务器中各项任务,甚至可以一个任务的输出作为下一个计划任务的输入 --异步任务:多用于解耦与项目主体关联不大的一些任务,或者耗时的任务。例如发送验证码,又比如像03、celery -- 异步任务范例的创建:使用redis作为消息队列
--创建python文件夹 python_celery_redis_test
--在文件夹 python_celery_redis_test下,创建任务定义后的调度文件diaoyong.py,并输入以下代码:
# coding=UTF-8 from proj.tasks import add import time t1 = time.time() r1 = add.delay(1, 2) r2 = add.delay(2, 4) r3 = add.delay(3, 6) r4 = add.delay(4, 8) r5 = add.delay(5, 10) r_list = [r1, r2, r3, r4, r5] for r in r_list: while not r.ready(): pass print(r.result) t2 = time.time() print('共耗时:%s' % str(t2-t1)) --这个代码是为了对接下来启动的celery任务服务器进行调度的 py文件,一共定义了5个文件 --代码中是逐个调用,每个会sleep1秒,正常来说是5s左右。实际上应该是异步调用,执行时间实际为1s多--在文件夹 python_celery_redis_test下,创建python文件夹 proj 【celery任务文件夹】
--在文件夹 python_celery_redis_test/proj下,创建app_test.py。这个是任务的详细入口
from celery import Celery # 设置任务名称为 proj # include参数必须为 list,每一个就是一个任务的python导入顺序 import proj.tasks app = Celery('proj', include=['proj.tasks']) # 这里是为了加载配置文件,'proj.celeryconfig' 也就是 import proj.celeryconfig app.config_from_object('proj.celeryconfig') if __name__ == '__main__': app.start()--在文件夹 python_celery_redis_test/proj下,创建celeryconfig.py。这个是celery配置,也就是app_test.py文件中的app.config_from_object('proj.celeryconfig')
BROKER_URL = 'redis://:@localhost:6379/0' # 使用Redis作为消息代理 CELERY_RESULT_BACKEND = 'redis://:@localhost:6379/0' # 把任务结果存在了Redis CELERY_TASK_SERIALIZER = 'msgpack' # 任务序列化和反序列化使用msgpack方案 CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间 CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的内容类型--在文件夹 python_celery_redis_test/proj下,创建tasks.py。这个才是真正定义celery做什么的文件也就是app_test.py文件中的app = Celery('proj', include=['proj.tasks'])
import time from proj.app_test import app # 这个装饰器是固定写法 @app.task def add(x, y): time.sleep(1) return x + y04、celery -- 异步任务范例的使用:
--鉴于windows下,celery已经非常不好用了,这里就不赘述了。希望你也别看了用不到的。 --windows下坑非常多,非常不建议去折腾没有意义 --启动终端 01 --启动 celery服务器: --celery -A proj.app_test worker -l INFO 这个命令是为了能够看到执行的任务的输出 --celery -A proj.app_test worker -l INFO --logfile="celery.log" 这个命令启动之后不会看到输出,所有数据都在log中 --celery -A proj.app_test worker -l INFO -P eventlet -c 1000 这个命令才是真正能够实现异步的命令,如果不是这样启动celery则无法发挥异步特性 --启动之后如下所示: --启动终端 02 -- python3 diaoyong.py --最终结果如下: --这里不应该是 5 s,因为我是在虚拟机中做的,因此可能有问题,所以会在公司服务器中再试一次。这里应该是 1.01235812563871s 这样的 1.几s才对。毕竟celery是异步调度 --异步调用失败是 启动命令错误导致的,建议参考上面启动命令中第三条05、celery -- 异步任务范例的使用:使用守护进程的方式启动celery
--记住这里守护进程其实就是后台执行 ******************************** 01、supervisor安装 ******************************* pip install supervisor || supervisord -v ******************************** 02、supervisor配置如下 ******************************* --创建supervisord.conf文件,并输入以下代码,注意celery位置修改 --当然也可以直接使用 celery,因为celery安装时已经注册过 --这里就是使用的celery异步启动命令 [program:celeryd] command=/opt/buildtools/python/bin/celery -A proj.app_test worker -l INFO -P eventlet -c 1000 stdout_logfile=/data/lwx898295/celery_test/time_sche/celeryd.log stderr_logfile=/data/lwx898295/celery_test/time_sche/celeryd.log autostart=true autorestart=true startsecs=10 [supervisord] ******************************** 03、supervisord启动如下 ******************************* supervisord -c supervisord.conf ******************************** 03、仍然像04中那样启动调试即可,只是此时运行内容放在日志文件 *******************************