2020-10-08

    科技2025-03-28  10

    1.注册容联云账号

    1.1注册账号

    http://www.yuntongxun.com/user/login

    1.2登录即可看到开发者账号信息

    1.3添加测试账号

    2.使用容联云发送代码测试

    '''1. 安装容联云sdk''' pip install ronglian_sms_sdk # 免费测试文档地址:https://doc.yuntongxun.com/p/5a531a353b8496dd00dcdfe2 '''2. 短信发送代码''' # libs/rl_sms.py from ronglian_sms_sdk import SmsSDK accId = '8a216da8747ac98201749c0de38723b7' accToken = '86072b540b4648229b27400414150ef2' appId = '8a216da8747ac98201749c0de45123be' def send_message(phone, datas): sdk = SmsSDK(accId, accToken, appId) tid = '1' # 测试模板id为: 1. 内容为: 【云通讯】您的验证码是{1},请于{2}分钟内正确输 入。 # mobile = '13303479527' # datas = ('666777', '5') # 模板中的参数按照位置传递 resp = sdk.sendMessage(tid, phone, datas) return resp

    3.在视图函数中使用

    3.1在verifications/urls.py中添加路由

    urlpatterns = [ path('sms_codes/', views.SmsCodeView.as_view()), ] ## 3.2写视图函数 ```python # verifications/views.py from rest_framework.permissions import AllowAny from rest_framework.views import APIView from rest_framework.response import Response import re import random from utils.rl_sms import send_message class SmsCodeView(APIView): # 1. 所有人可以访问 permission_classes = (AllowAny,) def post(self, request): # 1. 获取参数 phone = request.data.get('phone') image_code = request.data.get('image_code') image_code_uuid = request.data.get('image_code_uuid') # 2. 检查参数 if not all([phone, image_code, image_code_uuid]): return Response({"code": 999, "msg": "参数不全"}) if not re.match(r'^1[3456789]\d{9}$', phone): return Response({"code": 999, "msg": "手机号码不正确"}) # 3. 检查是否发送 redis_client = get_redis_connection('img_code') phone_exists = redis_client.get(phone) if phone_exists: return Response({"code": 999, "msg": "频繁发送, 请稍后再试"}) redis_image_code = redis_client.get(image_code_uuid) # bytes if redis_image_code: # bytes 转成 string redis_image_code = redis_image_code.decode() # 比较用户提供的图片内容是否和redis中保存的一致 if image_code.upper() != redis_image_code: return Response({'code': 999, 'msg': '图片验证码不正确'}) # 4. 发送 code = '%06d' % random.randint(0, 999999) # 随机6位验证码 send_resp = send_message(phone, (code, "5")) # 5.1 保存code 到 redis中 # redis_client.setex(phone, 60 * 5, code) # phone:code, 5分钟有效期 # 5.2 从redis中删除这个图片验证码, 以防再次被使用 # redis_client.delete(image_code_uuid) # 5.3 使用 pipeline 批量操作 pl = redis_client.pipeline() pl.setex(phone, 60 * 5, code) pl.delete(image_code_uuid) pl.execute() # 6. 返回结果 return Response({"code": 0, "msg": "短信发送成功"})```

    1Celery介绍

    http://www.cnblogs.com/xiaoq/p/11166235.html#i1

    1.1celery应用举例

    Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,如果你的业务场景中需要用到异步任务,就可以考虑使用celery你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情Celery 在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis

    1.2Celery有以下优点

    简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务快速:一个单进程的celery每分钟可处理上百万个任务灵活: 几乎celery的各个组件都可以被扩展及自定制

    1.3Celery特性

    方便查看定时任务的执行情况, 如 是否成功, 当前状态, 执行任务花费的时间等.可选 多进程, Eventlet 和 Gevent 三种模型并发执行.Celery 是语言无关的.它提供了python 等常见语言的接口支持.

    2.celery组件

    https://www.cnblogs.com/xiaonq/p/11166235.html#i2

    2.1 Celery 扮演生产者和消费者的角色

    Celery Beat:任务调度器. Beat 进程会读取配置文件的内容, 周期性的将配置中到期需要执行的任务发送给任务队列.Celery Worker:执行任务的消费者, 通常会在多台服务器运行多个消费者, 提高运行效率.Broker:消息代理, 队列本身. 也称为消息中间件. 接受任务生产者发送过来的任务消息, 存进队列再按序分发给任务消费方(通常是消息队列或者数据库).Producer:任务生产者. 调用 Celery API , 函数或者装饰器, 而产生任务并交给任务队列处理的都是任务生产者.Result Backend:任务处理完成之后保存状态信息和结果, 以供查询.

    2.2celery架构图

    2.3产生任务的方式

    发布者发布任务(WEB 应用)任务调度按期发布任务(定时任务)

    2.4celery 依赖三个库: 这三个库, 都由 Celery 的开发者开发和维护.

    billiard : 基于 Python2.7 的 multisuprocessing 而改进的库, 主要用来提高性能和稳定性.librabbitmp :C 语言实现的 Python 客户端kombu : Celery 自带的用来收发消息的库, 提供了符合 Python 语言习惯的, 使用 AMQP 协议的高级借口.

    1.celery配置与基本使用

    1.1安装celery

    pip install celery @ https://github.com/celery/celery/tarball/master

    1.2新建celery/manin.py配置celery

    # celery_task/main.py import os from celery import Celery # 定义celery实例, 需要的参数, 1, 实例名, 2, 任务发布位置, 3, 结果保存位置 app = Celery('mycelery', broker='redis://127.0.0.1:6379/14', # 任务存放的地方 backend='redis://127.0.0.1:6379/15') # 结果存放的地方 @app.task def add(x, y): return x + y

    2.测试celery

    2.1启动celery

    '''1.启动celery''' #1.1 单进程启动celery celery -A main worker -l INFO #1.2 celery管理 celery multi start celery_test -A celery_test -l debug --autoscale=50,5 # celery并发数:最多50个,最少5个 ps auxww|grep "celery worker"|grep -v grep|awk '{print $2}'|xargs kill -9 # 关闭所有celery进程

    1.使用celery异步发送短信

    1.1在celery_task/mian.py中添加发送短信函数

    # celery项目中的所有导包地址, 都是以CELERY_BASE_DIR为基准设定. # 执行celery命令时, 也需要进入CELERY_BASE_DIR目录执行. CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__)) @app.task(bind=True) def send_sms_code(self, mobile, datas): sys.path.insert(0, os.path.join(CELERY_BASE_DIR, '../syl')) # 在方法中导包 from libs.rl_sms import send_message # time.sleep(5) try: # 用 res 接收发送结果, 成功是:0, 失败是:-1 res = send_message(mobile, datas) except Exception as e: res = '-1' if res == '-1': # 如果发送结果是 -1 就重试. self.retry(countdown=5, max_retries=3, exc=Exception('短信发送失败'))

    1.2在verifications/views.py中添加celery发送短信试图函数

    class SmsCodeView(APIView): """使用apiview的限流""" # 1. 所有人可以访问 permission_classes = (AllowAny,) def post(self, request): # 1. 获取参数 phone = request.data.get('phone') # 手机号 image_code = request.data.get('image_code') # 图片验证码 image_code_uuid = request.data.get('image_code_uuid') # 前端生成的uuid # 2. 检查参数 if not all([phone, image_code, image_code_uuid]): return Response({"code": 999, "msg": "参数不全"}) if not re.match(r'^1[3456789]\d{9}$', phone): return Response({"code": 999, "msg": "手机号码不正确"}) # 3. 检查是否发送 redis_client = get_redis_connection('img_code') phone_exists = redis_client.get(phone) if phone_exists: return Response({"code": 999, "msg": "频繁发送, 请稍后再试"}) # 验证图形验证码 redis_image_code = redis_client.get(image_code_uuid) # bytes if redis_image_code: # bytes 转成 string redis_image_code = redis_image_code.decode() # 比较用户提供的图片内容是否和redis中保存的一致 if image_code.upper() != redis_image_code: return Response({'code': 999, 'msg': '图片验证码不正确'}) # 4. 发送 code = '%06d' % random.randint(0, 999999) # 随机6位验证码 from syl.settings import BASE_DIR sys.path.insert(0, os.path.join(BASE_DIR, '../celery_task')) from main import send_sms_code # 必须这么写, 从main中导包 send_sms_code.delay(phone, (code, "5")) print(code) # 5.使用 pipeline 批量操作 pl = redis_client.pipeline() # 实例化pipeline对象 pl.setex(phone, 60 * 5, code) # 存储phone:code, 5分钟有效期 pl.delete(image_code_uuid) # 从redis中删除这个图片验证码, 以防再次被使用 pl.execute() # 6. 返回结果 return Response({"code": 0, "msg": "短信发送成功"})

    1.3添加路由

    urlpatterns = [ path('sms_codes/', views.SmsCodeView.as_view()), ]

    2.测试接口

    接口URL

    http://192.168.56.100:8888/user/sms_codes/

    请求携带参数 { "phone": 18538752511, "image_code":"aed3", # 前端生成的 图形验证码 "image_code_uuid":"de8edce2-fc9f-11ea-9325-005056c00008" # 前端生成的uuid }

    1.vue发送短信逻辑在这里插入代码片

    前端函数如下,js方法代码无需更改,前端代码逻辑在components\common\lab_header.vue只需要修改components\axios_api\http.js中调用的后端地址 // axios.defaults.baseURL = "http://127.0.0.1:8000/" axios.defaults.baseURL = "http://192.168.56.100:8888/" // 获取手机验证码 sendcode() { // 0. 判断是否发送中 if (this.is_send) { return } this.check_phone() this.check_imgcode() if (this.phone_error || this.imgCode_error) { return false } // 3、短信发送 // imgCode: '', // uuid: '', var data = { phone: this.phone, image_code_uuid: this.uuid, image_code: this.imgCode } this.is_send = true send_phone_code_post(data).then((res) => { console.log(res) if (res.code != 0) { this.errorMsg = res.msg return } let t = 10 let si = setInterval(() => { this.msgButtonText = t t = t - 1 if (t == 0) { this.is_send = false this.msgButtonText = '获取手机验证码' clearInterval(si) } }, 1000) // if (res.data.code == 200) { // console.log('短信发送成功') // alert(res.data.message) // } else { // alert(res.data.message) // } }).catch((err) => { console.log(err) }) },

    1.django添加检查用户名和手机号数量接口

    1.1 在user/urls.py中添加

    urlpatterns = [ path('count/', views.RegCountView.as_view()), # 查询用户名手机号使用量的视图, /user/count/ ]

    1.2 在user/views.py中添加视图函数

    # 查询用户数量接口 class RegCountView(APIView): # 注册时需要验证的用户名和手机号是否使用 # 自定义权限类 permission_classes = (AllowAny,) def post(self, request): # 接收参数: 验证的内容type: username/phone, data: '用户名' 或者 '手机号', datatype = request.data.get('type') data = request.data.get('data') if not all([data, datatype]): return Response({'code': 999, 'msg': '参数不完整'}) if datatype == 'username': count = User.objects.filter(username=data).count() if datatype == 'phone': count = User.objects.filter(phone=data).count() return Response({'code': 0, 'msg': '查询成功', 'data': {'type': datatype, 'count': count}})

    2.测试接口

    测试接口URL http://192.168.56.100:8888/user/count/ 演示结果

    1.vue检查用户名是否重复

    前端函数如下,js方法代码无需更改,前端代码逻辑在components\common\lab_header.vue

    只需要修改components\axios_api\http.js中调用的后端地址

    // axios.defaults.baseURL = "http://127.0.0.1:8000/" axios.defaults.baseURL = "http://192.168.56.100:8888/" // 检查用户名 是否使用 check_username() { console.log('判断用户名') console.log(this.username == '') var reg = new RegExp(/^[a-zA-Z0-9_-]{3,16}$/); //字符串正则表达式 4到14位(字母,数字,下划线,减号) if (this.username == '') { this.username_message = '用户名不能为空' this.username_error = true return false } if (!reg.test(this.username)) { this.username_message = '用户名格式不正确' this.username_error = true return false } else { // 去后端检查用户名使用数量 user_count({ type: 'username', data: this.username }).then((res) => { console.log(res) if (res.data.count > 0) { this.username_message = '用户名已存在' this.username_error = true } else { this.username_message = '' this.username_error = false } }) } },

    2.vue检查手机号是否重复

    // 检查手机号是否使用 check_phone() { console.log('检查手机号') var reg = new RegExp(/^[1]([3-9])[0-9]{9}$/) if (this.phone == '') { this.phone_message = '手机号不能为空' this.phone_error = true } if (!reg.test(this.phone)) { this.phone_message = '手机号格式不正确' this.phone_error = true return false } else { // 去后端查用户数量 user_count({ type: 'phone', data: this.phone }).then((res) => { console.log(res) if (res.data.count > 0) { this.phone_message = '手机号已存在' this.phone_error = true } else { this.phone_message = '' this.phone_error = false } }) } },

    1.完善注册接口

    1.1 修改user/views.py中完善视图函数

    # 注册接口 class RegisterView(APIView): """ 用户注册, 权限是: 匿名用户可访问 """ # 自定义权限类 permission_classes = (AllowAny,) def post(self, request): """ 接收用户名,密码,手机号和验证码, 前端校验两遍一致性, 注册成功后返回成功, 然后用户自行登录获取token 1. 用户名 2. 密码 3. 手机号 4. 验证码 :param request: :return: {'code':0,'msg':'注册成功'} code: "260361" password: "123123" phone: "13303479527" username: "liangxuepeng" """ username = request.data.get('username') phone = request.data.get('phone') code = request.data.get('code') passwrod = request.data.get('password') if all([username, passwrod, phone, code]): pass else: return Response({'code': 999, 'msg': '参数不全'}) # rand_name = self.randomUsername() # 验证手机验证码 redis_client = get_redis_connection('verify_code') code_redis = redis_client.get(phone) if code_redis: code_redis = code_redis.decode() if not code == code_redis: return Response({'code': 999, 'msg': '手机验证码错误'}) user = User(username=username, phone=phone) user.set_password(passwrod) user.save() return Response({'code': 0, 'msg': '注册成功'})
    Processed: 0.012, SQL: 8