Celery

    科技2025-11-18  13

    Celery学习之踩坑日记一

    一 、先说问题情况, 生产者:获取短信验证码,短信任务放入redis队列, 消费者:然后进行启动消费者,使用如下命令获取

    celery -A izufang worker -l DEBUG &

    二、解决方法 消费者切换一下模式使用如下命令:

    celery -A izufang worker -P threads -l DEBUG &

    三、环境和情况 环境:

    Python3.7解释器 django 2.2

    在项目中:

    项目结构

    izufang ----- izufang ------------- __init__.py ------------- settings.py ------------- … ----- api ------------- aliyuncn.py ------------- views.py ------------- … ----- …

    代码:

    __init__.py import celery # 注册环境变量量 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'izufang.settings') # 创建celery对象 main 为当前包位置,也是指向项目或者app为起始文件位置 # broker 存储消息队列 backend 持久化方式 # 这里的redis我没填真实的自己的,看着来 app = celery.Celery(main='izufang', broker='redis://:1qaz2wsx@127.0.0.1/1', backend='redis://:1qaz2wsx@127.0.0.1/2', ) # 如果从项目的配置文件读取Celery配置信息 app.config_from_object('django.conf:settings') # 让Celery自动从参数指定的应用中发现异步任务/定时任务 app.autodiscover_tasks(['common', ]) aliyuncn.py

    我用的是阿里云的接口

    @app.task def send_sms_by_aliyun(tel, message): '''发送验证码''' client = AcsClient('<AccessKey ID>', '<AccessKey Secret>', 'cn-shenzhen') request = CommonRequest() request.set_accept_format('json') request.set_domain('dysmsapi.aliyuncs.com') request.set_method('POST') request.set_protocol_type('https') # https | http request.set_version('2017-05-25') request.set_action_name('SendSms') request.add_query_param('RegionId', "cn-shenzhen") request.add_query_param('PhoneNumbers', f"{tel}") request.add_query_param('SignName', "租房网") request.add_query_param('TemplateCode', "SMS_204126343") request.add_query_param('TemplateParam', "{'code':" + f'{message}' + "}") response = client.do_action_with_exception(request) return str(response, encoding='utf-8') views.py @api_view(('GET',)) def get_code_by_sms(request, tel): """获取短信验证码""" if check_tel(tel): if caches['default'].get(f'{tel}:block'): resp = DefaultResponse(*CODE_TOO_FREQUENCY) else: code = gen_mobile_code() # 异步化的执行函数(把函数放到消息队列中去执行)----> 消息的生产者 send_sms_by_aliyun.delay(tel, code) caches['default'].set(f'{tel}:block', code, timeout=120) # caches['default'].set(f'{tel}:valid', code, timeout=600) resp = DefaultResponse(*MOBILE_CODE_SUCCESS) else: resp = DefaultResponse(*INVALID_TEL_NUM) return resp

    问题描述:情况如上,代码一切正常 使用执行函数delay开启异步化执行时

    先调用发送短信验证码的接口,一共发送了两次

    接着开启消费者模式 当使用如下命令开启消费者模式

    celery -A izufang worker -l DEBUG &

    有两条待执行的消息队列

    [2020-10-08 18:02:04,366: INFO/MainProcess] Received task: api.aliyuncn.send_sms_by_aliyun[aaad6915-78e9-40e0-9a65-51dcbf16d138] [2020-10-08 18:02:04,376: DEBUG/MainProcess] TaskPool: Apply <function _trace_task_ret at 0x000001EC27D49168> (args:(‘api.aliyuncn.send_sms_by_aliyun’, ‘aaad6915-78e9-40e0-9a65-51dcbf16d138’, {‘lang’: ‘py’, ‘task’: ‘api.aliyuncn. send_sms_by_aliyun’, ‘id’: ‘手动修改了一下’, ‘shadow’: None, ‘eta’: None, ‘expires’: None, ‘group’: None, ‘group_index’: None, ‘retries’: 0, ‘timelimit’: [None, None], ‘root_id’: ‘手动修改了一下’, ‘parent_id’: None, ‘argsrepr’: “(手动遮挡电话, ‘457751’)”, ‘kwargsrepr’: ‘{}’, ‘origin’: ‘gen12016@DESKTOP-PCQ23G8’, ‘reply_to’: ‘手动修改了一下’, ‘correlation_id’: ‘手动修改了一下’, ‘hostname’: ‘celery@DESKTOP-PCQ23G8’, ‘delivery_info’: {‘exchange’: ‘’, ‘routing_key’: ‘celery’, ‘priority’: 0, ‘redelivered’: None}, ‘args’: [手动遮挡电话, ‘457751’], ‘kwargs’: {}}, b’[[手动遮挡电话, “457751”], {}, {“call backs”: null, “errbacks”: null, “chain”: null, “chord”: null}]’, ‘application/json’, ‘utf-8’) kwargs:{}) [2020-10-08 18:02:04,436: INFO/MainProcess] Received task: api.aliyuncn.send_sms_by_aliyun[240c4c77-0307-46ce-85a0-5d9c91f9b5ce] [2020-10-08 18:02:04,436: DEBUG/MainProcess] TaskPool: Apply <function _trace_task_ret at 0x000001EC27D49168> (args:(‘api.aliyuncn.send_sms_by_aliyun’, ‘240c4c77-0307-46ce-85a0-5d9c91f9b5ce’, {‘lang’: ‘py’, ‘task’: ‘api.aliyuncn. send_sms_by_aliyun’, ‘id’: ‘240c4c77-0307-46ce-85a0-5d9c91f9b5ce’, ‘shadow’: None, ‘eta’: None, ‘expires’: None, ‘group’: None, ‘group_index’: None, ‘retries’: 0, ‘timelimit’: [None, None], ‘root_id’: ‘240c4c77-0307-46ce-85a0-5d9 c91f9b5ce’, ‘parent_id’: None, ‘argsrepr’: “(手动遮挡电话, ‘948426’)”, ‘kwargsrepr’: ‘{}’, ‘origin’: ‘gen12016@DESKTOP-PCQ23G8’, ‘reply_to’: ‘48a36444-b9b8-31b1-8d40-80ca42836176’, ‘correlation_id’: ‘240c4c77-0307-46ce-85a0-5d9c91 f9b5ce’, ‘hostname’: ‘celery@DESKTOP-PCQ23G8’, ‘delivery_info’: {‘exchange’: ‘’, ‘routing_key’: ‘celery’, ‘priority’: 0, ‘redelivered’: None}, ‘args’: [手动遮挡电话, ‘948426’], ‘kwargs’: {}}, b’[[手动遮挡电话, “948426”], {}, {“call backs”: null, “errbacks”: null, “chain”: null, “chord”: null}]’, ‘application/json’, ‘utf-8’) kwargs:{}) [2020-10-08 18:02:05,188: DEBUG/MainProcess] Timer wake-up! Next ETA 0.9529999999995198 secs.

    redis代码

    127.0.0.1:54321[1]> flushall OK 127.0.0.1:54321[1]> keys * (empty list or set) 127.0.0.1:54321[1]> keys *

    “celery”“_kombu.binding.celery” 127.0.0.1:54321[1]> lrange celery 0 -1“{“body”: “手动遮挡=”, “content-encoding”: “utf-8”, “content-type”: “application/json”, “headers”: {“lang”: “py”, “task”: “api.aliyuncn.send_sms_by_aliyun”, “id”: “aaad6915-78e9-40e0-9a65-51dcbf16d138”, “shadow”: null, “eta”: null, “expires”: null, “group”: null, “group_index”: null, “retries”: 0, “timelimit”: [null, null], “root_id”: “aaad6915-78e9-40e0-9a65-51dcbf16d138”, “parent_id”: null, “argsrepr”: “(手动遮挡电话, ‘457751’)”, “kwargsrepr”: “{}”, “origin”: “gen12016@DESKTOP-PCQ23G8”, “redelivered”: true}, “properties”: {“correlation_id”: “aaad6915-78e9-40e0-9a65-51dcbf16d138”, “reply_to”: “48a36444-b9b8-31b1-8d40-80ca42836176”, “delivery_mode”: 2, “delivery_info”: {“exchange”: “”, “routing_key”: “celery”}, “priority”: 0, “body_encoding”: “base64”, “delivery_tag”: “bb169987-bae2-408a-9a4a-74b9a3781187”}}”“{“body”: “W1sxNzYyNTY1MTAyNCwgIjk0ODQyNiJdLCB7fSwgeyJjYWxsYmFja3MiOiBudWxsLCAiZXJyYmFja3MiOiBudWxsLCAiY2hhaW4iOiBudWxsLCAiY2hvcmQiOiBudWxsfV0=”, “content-encoding”: “utf-8”, “content-type”: “application/json”, “headers”: {“lang”: “py”, “task”: “api.aliyuncn.send_sms_by_aliyun”, “id”: “240c4c77-0307-46ce-85a0-5d9c91f9b5ce”, “shadow”: null, “eta”: null, “expires”: null, “group”: null, “group_index”: null, “retries”: 0, “timelimit”: [null, null], “root_id”: “240c4c77-0307-46ce-85a0-5d9c91f9b5ce”, “parent_id”: null, “argsrepr”: “(手动遮挡电话, ‘948426’)”, “kwargsrepr”: “{}”, “origin”: “gen12016@DESKTOP-PCQ23G8”, “redelivered”: true}, “properties”: {“correlation_id”: “240c4c77-0307-46ce-85a0-5d9c91f9b5ce”, “reply_to”: “48a36444-b9b8-31b1-8d40-80ca42836176”, “delivery_mode”: 2, “delivery_info”: {“exchange”: “”, “routing_key”: “celery”}, “priority”: 0, “body_encoding”: “base64”, “delivery_tag”: “fe577fde-237b-4c88-8dae-373a17db222d”}}” 127.0.0.1:54321[1]>

    看到有两条消息 但是没有被生产者执行 当我ctrl+c后提示

    Restoring 2 unacknowledged message(s)

    后面我又执行了下面命令

    celery -A izufang worker -P threads -l DEBUG &

    解决问题

    参考链接:

    https://www.v2ex.com/t/177589

    Processed: 0.009, SQL: 8