python3 查询mysql,对pymysql进行封装,并使用

    科技2022-07-11  88

    使用方法:

    (1)新建连接

    dal = mysql()

    (2)执行sql语句

    sql = f''' SELECT * FROM a1 '''
    dal.exec(sql)

    (3)获取查询的行

    dal.rows   # 元组结果
    dal.lines  # 字典结果

    注:连接默认已开启自动提交(autocommit),下文示例中的 .commit() 只是显式写出,并非必需。

    写成一行的话:lines = mysql().exec(sql).lines

     

    一、基础使用

    # Basic usage demo: drop/create a table, then insert, delete, update and
    # select rows through the `mysql` wrapper from a011.
    import datetime

    from a011 import mysql

    # dal = mysql()  # alternative: connect with a011's built-in defaults

    # Custom connection settings passed to the wrapper.
    mysql_conf = {
        "host": "127.0.0.1",
        "port": 3306,
        "user": "root",
        "password": "root",
        "db": "test2",
        "charset": "utf8",
    }
    dal = mysql(mysql_conf)

    # NOTE: the statements below splice values into the SQL text with
    # f-strings. That is tolerable here only because every value is a
    # hard-coded constant; anything coming from a user must be passed as
    # parameters instead: dal.exec(sql, params).

    # Drop the table if it already exists.
    print("\n删表:")
    sql = ''' drop TABLE if exists a1 '''
    dal.exec(sql).commit()
    print(dal.count)

    # Create the table.
    print("\n建表:")
    sql = ''' CREATE TABLE a1 ( id int, name varchar(255), 日期时间 datetime ) '''
    dal.exec(sql).commit()
    print(dal.count)

    # Insert one row per name; ids start at 1.
    names = ["百度", "谷歌", "必应"]
    for row_id, name in enumerate(names, start=1):
        print(f"\n增 — {row_id} :")
        created_at = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        sql = f''' INSERT INTO a1(id, name, 日期时间) VALUES({row_id},'{name}',str_to_date('{created_at}','%Y-%m-%d %H:%i:%s')) '''
        dal.exec(sql).commit()
        print(dal.count)

    # Delete the first row.
    print("\n删:")
    row_id = 1
    sql = f''' DELETE FROM a1 WHERE id={row_id} '''
    dal.exec(sql).commit()
    print(dal.count)

    # Update the second row.
    print("\n改:")
    row_id = 2
    name = "google谷歌"
    created_at = "2077-11-19 00:00:00"
    sql = f''' UPDATE a1 SET name = '{name}' ,日期时间 = str_to_date('{created_at}','%Y-%m-%d %H:%i:%s') WHERE id = {row_id} '''
    dal.exec(sql).commit()
    print(dal.count)

    # Select everything and show it as a count, as tuples and as dicts.
    print("\n查:")
    sql = ''' SELECT * FROM a1 '''
    dal.exec(sql)
    print(dal.count)
    print(dal.rows)
    print(dal.lines)
    print()

    # One formatted line per row, read from the dict form of the result.
    for line in dal.lines:
        print(f'{line.get("id")} {line.get("name")} {line.get("日期时间")}')

     

    二、参数化查询

    修改查询字符串的占位符,在exec的参数中加入参数列表

    dal.exec(查询字符串, 参数列表)

    # Demo contrasting a string-built query with a parameterized one.
    from a011 import mysql

    # Custom settings: only the database name is supplied here.
    mysql_conf = {
        "db": "test2",
    }
    dal = mysql(mysql_conf)

    # Unsafe lookup -- kept ONLY to demonstrate SQL injection.
    # WARNING: the user's text is spliced straight into the SQL statement.
    # Never do this outside of a demonstration.
    print("\n不安全查:")
    # Try entering:  any' or 1=1 --   (note the trailing space after the dashes)
    name = input()
    sql = f''' SELECT * FROM a1 WHERE name = '{name}' '''
    lines = dal.exec(sql).lines
    for line in lines:
        print(line)

    # Safe lookup: the value travels as a parameter and is escaped by the driver.
    print("\n安全查:")
    # Try the same input again:  any' or 1=1 --   (with the trailing space)
    name = input()
    # Plain string on purpose: %s is the driver's placeholder, so nothing
    # should be interpolated into this text by Python itself.
    sql = ''' SELECT * FROM a1 WHERE name = %s '''
    params = [name]
    lines = dal.exec(sql, params).lines
    for line in lines:
        print(line)

     

    三、调用存储过程

    调用函数call :dal.call("存储过程名", 参数列表)

    in和out参数值回写到参数列表,存储过程查询的值通过 dal.rows 或 dal.lines 获取

    # Demo: calling a stored procedure, first through the raw cursor and then
    # through the wrapper's call() helper.
    from a011 import mysql

    # Custom settings: only the database name is supplied here.
    mysql_conf = {
        "db": "test2",
    }
    dal = mysql(mysql_conf)

    # Empty the table.
    sql = ''' TRUNCATE TABLE a1 '''
    dal.exec(sql).commit()

    # Bulk-insert ids 1..300 with a single parameterized statement.
    sql = ''' INSERT INTO a1(id) VALUES(%s) '''
    params_list = [[i] for i in range(1, 300 + 1)]
    dal.cursor.executemany(sql, params_list)
    dal.commit()

    # Optional sanity check of the inserted rows:
    #     dal.exec(''' SELECT * FROM a1 ''')
    #     print(dal.count)

    # The stored procedure used below must already exist in the database:
    #
    #     DELIMITER $$
    #     CREATE PROCEDURE `p1`(IN `id_in` int,OUT `all_count_out` int)
    #     BEGIN
    #         #Routine body goes here...
    #         select count(*) into all_count_out from a1;
    #         select * from a1 where id > id_in;
    #     END$$
    #     DELIMITER ;

    # Raw-cursor version.
    params = [101, 0]
    dal.cursor.callproc("p1", params)
    dal.commit()
    print(dal.cursor.fetchall())  # the result set produced by the procedure
    # The OUT parameter is not written back; it has to be read with a query.
    dal.cursor.execute("select @_p1_1")
    print(dal.cursor.fetchone()[0])
    print()

    # Wrapper version: rows come back on the object and the IN/OUT values are
    # written back into `params`.
    rows = dal.call("p1", params).commit().rows
    print(rows)
    print(params[1])

     

     

    a011.py

    import pymysql

    # region mysql

    # Default connection settings. Any key missing from a caller-supplied
    # config falls back to the value given here.
    _mysql_conf = {
        "host": "127.0.0.1",
        "port": 3306,
        "user": "root",
        "password": "root",
        "db": "test",
        "charset": "utf8"
    }


    def _get_mysql_conf(new_conf=None):
        """Return a complete connection config.

        Args:
            new_conf: optional dict overriding any of the keys in
                ``_mysql_conf``. Keys not present in ``_mysql_conf`` are ignored.

        Returns:
            A new dict with exactly the keys of ``_mysql_conf``.
        """
        overrides = new_conf or {}
        return {
            key: overrides.get(key, default)
            for key, default in _mysql_conf.items()
        }


    class Mysql:
        """Small convenience wrapper around one pymysql connection and cursor.

        After every ``exec()`` / ``call()`` the result is kept on the instance:
            count: ``cursor.rowcount`` of the statement.
            rows:  the rows as returned by ``cursor.fetchall()``.
            lines: the same rows as dicts keyed by column name.

        ``exec``, ``call``, ``begin``, ``commit`` and ``rollback`` return
        ``self`` so calls can be chained.
        """

        def __init__(self, conf=_mysql_conf):
            """Connect using *conf* and switch the connection to autocommit.

            Args:
                conf: dict with the keys host, port, user, password, db, charset.
            """
            # Assigned before connecting so that __del__ stays safe when
            # pymysql.connect() raises.
            self.conn = None
            self.cursor = None
            self.count = 0
            self.rows = []
            self.lines = []

            self.conn = pymysql.connect(
                host=conf["host"],
                port=conf["port"],
                user=conf["user"],
                password=conf["password"],
                db=conf["db"],
                charset=conf["charset"],
            )
            self.cursor = self.conn.cursor()
            self.conn.autocommit(True)

        def __del__(self):
            """Close the connection if one was opened and is still open."""
            conn = getattr(self, "conn", None)
            # Skip connections that were never created or were already closed
            # by the caller; closing those again would raise.
            if conn is not None and conn.open:
                conn.close()

        @staticmethod
        def 实例化(new_conf=None):
            """Build a Mysql from a partial config, filling gaps with defaults."""
            conf = _get_mysql_conf(new_conf)
            return Mysql(conf)

        def exec(self, sql: str, params=None):
            """Execute *sql* and store rows, lines and count on the instance.

            Args:
                sql: the statement; use ``%s`` placeholders together with *params*.
                params: optional parameter sequence. An empty or missing value
                    means the statement is sent without parameter substitution.

            Returns:
                self, for chaining.
            """
            # `params or None` keeps the original rule: falsy params -> no args.
            self.cursor.execute(sql, params or None)
            self._store_result()
            return self

        def call(self, proc_name: str, params=None):
            """Call a stored procedure and write IN/OUT values back into *params*.

            Args:
                proc_name: name of the stored procedure.
                params: optional mutable list of argument values. After the
                    call each element is replaced by the value of the matching
                    ``@_<proc_name>_<index>`` server variable.

            Returns:
                self, for chaining. The procedure's result set is available
                through ``rows`` / ``lines``.
            """
            args = [] if params is None else params
            self.cursor.callproc(proc_name, args)
            self._store_result()

            if args:
                # The argument values are only reachable through the
                # server-side variables, so read them back with a query.
                variables = ",".join(
                    f'@_{proc_name}_{index}' for index in range(len(args))
                )
                self.cursor.execute(f"select {variables}")
                in_out = self.cursor.fetchone()
                for index, value in enumerate(in_out):
                    args[index] = value
            return self

        def begin(self):
            """Start a transaction on the connection; returns self."""
            self.conn.begin()
            return self

        def commit(self):
            """Commit on the connection; returns self."""
            self.conn.commit()
            return self

        def rollback(self):
            """Roll back on the connection; returns self."""
            self.conn.rollback()
            return self

        def _store_result(self):
            """Copy the cursor's current result into rows, lines and count."""
            self.rows = self.cursor.fetchall()
            self.lines = self._rows_to_lines(self.rows, self.cursor)
            self.count = self.cursor.rowcount

        def _rows_to_lines(self, rows, cursor):
            """Convert row tuples into dicts keyed by column name.

            Args:
                rows: iterable of row sequences.
                cursor: object whose ``description`` holds one entry per column,
                    with the column name as the first item of each entry.

            Returns:
                A list with one dict per row; an empty list when the cursor has
                no column description.
            """
            description = cursor.description
            if not description:
                # No column description means there are no column names to map.
                return []
            col_names = [column[0] for column in description]
            return [dict(zip(col_names, row)) for row in rows]


    def mysql(new_conf=None):
        """Create a connected Mysql instance.

        Args:
            new_conf: optional dict overriding any of the default settings.
        """
        return Mysql.实例化(new_conf)

    # endregion mysql

     

    Processed: 0.012, SQL: 8