使用方法:
(1)新建连接
dal = mysql()(2)执行sql语句
sql = f''' SELECT * FROM a1 ''' dal.exec(sql)(3)获取查询的行
dal.rows # 元组结果 dal.lines # 字典结果注:自动提交我默认开了
写成一行的话:lines = mysql().exec(sql).lines
修改查询字符串的占位符,在exec的参数中加入参数列表
dal.exec(查询字符串, 参数列表)
from a011 import * # 使用自定义配置 mysql_conf = { "db":"test2", } dal = mysql(mysql_conf) # 不安全查 print("\n不安全查:") # any' or 1=1 -- 最后有空格 name = input() sql = f''' SELECT * FROM a1 WHERE name = '{name}' ''' lines = dal.exec(sql).lines for i in lines: print(i) # 安全查,参数化查询 print("\n安全查:") # any' or 1=1 -- 最后有空格 name = input() sql = f''' SELECT * FROM a1 WHERE name = %s ''' params = [name] lines = dal.exec(sql, params).lines for i in lines: print(i)
调用函数call :dal.call("存储过程名", 参数列表)
in和out参数值回写到参数列表,存储过程查询的值通过 dal.rows 或 dal.lines 获取
from a011 import * # 使用自定义配置 mysql_conf = { "db": "test2", } dal = mysql(mysql_conf) # 清空 sql = f''' TRUNCATE TABLE a1 ''' dal.exec(sql).commit() # 加值 sql = f''' INSERT INTO a1(id) VALUES(%s) ''' params_list = [[i] for i in range(1, 300 + 1)] dal.cursor.executemany(sql, params_list) dal.commit() # 查全部 # sql = f''' # SELECT * FROM a1 # ''' # dal.exec(sql) # print(dal.count) # 存储过程 """ DELIMITER $$ CREATE PROCEDURE `p1`(IN `id_in` int,OUT `all_count_out` int) BEGIN #Routine body goes here... select count(*) into all_count_out from a1; select * from a1 where id > id_in; END$$ DELIMITER ; """ params = [101, 0] dal.cursor.callproc("p1", params) dal.commit() print(dal.cursor.fetchall()) # 结果集 dal.cursor.execute("select @_p1_1") # out参数要再用查询语句去查 print(dal.cursor.fetchone()[0]) print() # 使用封装后的函数 rows = dal.call("p1", params).commit().rows print(rows) print(params[1])
a011.py
import pymysql # region mysql _mysql_conf = { "host": "127.0.0.1", "port": 3306, "user": "root", "password": "root", "db": "test", "charset": "utf8" } def _get_mysql_conf(new_conf={}): # if not new_conf: # return copy.deepcopy(_mysql_conf) conf = {} conf["host"] = new_conf.get("host", _mysql_conf["host"]) conf["port"] = new_conf.get("port", _mysql_conf["port"]) conf["user"] = new_conf.get("user", _mysql_conf["user"]) conf["password"] = new_conf.get("password", _mysql_conf["password"]) conf["db"] = new_conf.get("db", _mysql_conf["db"]) conf["charset"] = new_conf.get("charset", _mysql_conf["charset"]) return conf class Mysql: def __init__(self, conf=_mysql_conf): self.conn = pymysql.connect(host=conf["host"], port=conf["port"], user=conf["user"], password=conf["password"], db=conf["db"], charset=conf["charset"]) self.cursor = self.conn.cursor() self.count = 0 self.rows = [] self.lines = [] self.conn.autocommit(True) def __del__(self): if self.conn: self.conn.close() @staticmethod def 实例化(new_conf={}): conf = _get_mysql_conf(new_conf) return Mysql(conf) def exec(self, sql: str, params=None): if params: self.cursor.execute(sql, params) else: self.cursor.execute(sql) self.rows = self.cursor.fetchall() self.lines = self._rows_to_lines(self.rows, self.cursor) self.count = self.cursor.rowcount return self def call(self, proc_name: str, params=[]): self.cursor.callproc(proc_name, params) self.rows = self.cursor.fetchall() self.lines = self._rows_to_lines(self.rows, self.cursor) self.count = self.cursor.rowcount if params: select_params = ",".join([f'@_{proc_name}_{i}' for i in range(len(params))]) self.cursor.execute(f"select {select_params}") in_out = self.cursor.fetchone() for i in range(len(params)): params[i] = in_out[i] return self def begin(self): self.conn.begin() return self def commit(self): self.conn.commit() return self def rollback(self): self.conn.rollback() return self def _rows_to_lines(self, rows, cursor): try: col_names = [c[0] for c in cursor.description] except: pass lines = [] for row in rows: r_dict = {} for i, col in enumerate(row): r_dict[col_names[i]] = col lines.append(r_dict) return lines def mysql(new_conf={}): return Mysql.实例化(new_conf) # endregion mysql