cx_Oracle版本 7.3
oracle 客户端版本 11g
使用方法:
(1)新建连接
dal = oracle()(2)执行sql语句
sql = f''' SELECT * FROM a1 ''' dal.exec(sql).commit()(3)获取查询的行
dal.rows # 元组结果 dal.lines # 字典结果注:要手动commit()
修改查询字符串的占位符,在exec的参数中加入参数列表
dal.exec(查询字符串, 参数列表)
from b011 import * dal = oracle() # 不安全查 print("\n不安全查:") # any' or 1=1 -- 最后有空格 name = input() sql = f''' SELECT * FROM a1 WHERE name = '{name}' ''' lines = dal.exec(sql).lines stream(lines).print() # 安全查,参数化查询 print("\n安全查:") # any' or 1=1 -- 最后有空格 name = input() sql = f''' SELECT * FROM a1 WHERE name = :s ''' params = [name] lines = dal.exec(sql, params).lines stream(lines).print()
调用函数call,dal.call("存储过程名", 参数列表)
in和out参数值回写到参数列表
仅将最后一个游标的值放到 dal.rows 和 dal.lines
from b011 import * dal = oracle() # 清空 sql = f''' TRUNCATE TABLE a1 ''' dal.exec(sql).commit() # 加值 sql = f''' INSERT INTO a1(id) VALUES(:s) ''' params_list = [[i] for i in range(1, 300 + 1)] dal.cursor.executemany(sql, params_list) dal.commit() # # 查全部 # sql = f''' # SELECT * FROM a1 # ''' # dal.exec(sql) # print(dal.count) # stream(dal.rows).print() # 存储过程 """ CREATE procedure p1 ( --定义输入、输出参数-- id_in in int, all_count_out out int, cur_out out sys_refcursor ) as --定义变量-- -- 变量名 变量数据类型;如: -- numCount integer; begin --处理方法-- select count(*) into all_count_out from a1; open cur_out for select * from a1 where id > id_in; end; """ # 原生用法 all_count_out = dal.cursor.var(cx_Oracle.NUMBER) cur_out = dal.cursor.var(cx_Oracle.CURSOR) params = [101,all_count_out,cur_out] dal.cursor.callproc("p1", params) dal.commit() print(cur_out.getvalue().fetchall()) # 结果集 print(all_count_out.getvalue()) # out参数 # 使用封装后的函数 all_count_out = dal.cursor.var(cx_Oracle.NUMBER) cur_out = dal.cursor.var(cx_Oracle.CURSOR) params = [101,all_count_out,cur_out] rows = dal.call("p1", params).commit().rows print() print(rows) print(params[1])
b011.py
import os import cx_Oracle # region oracle # os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8' # select userenv('language') from dual; os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.AL32UTF8' _oracle_conf = { "host": "192.168.15.132", "port": 1521, "user": "c##dba", "password": "oracle", "db": "orcl" } def _get_oracle_conf(new_conf={}): conf = {} conf["host"] = new_conf.get("host", _oracle_conf["host"]) conf["port"] = new_conf.get("port", _oracle_conf["port"]) conf["user"] = new_conf.get("user", _oracle_conf["user"]) conf["password"] = new_conf.get("password", _oracle_conf["password"]) conf["db"] = new_conf.get("db", _oracle_conf["db"]) return f'{conf["user"]}/{conf["password"]}@{conf["host"]}:{conf["port"]}/{conf["db"]}' class Oracle: def __init__(self, conf=_get_oracle_conf()): self.conn = cx_Oracle.connect(conf) self.cursor = self.conn.cursor() self.count = 0 self.rows = [] self.lines = [] def __del__(self): if self.cursor: self.cursor.close() if self.conn: self.conn.close() @staticmethod def 实例化(new_conf={}): conf = _get_oracle_conf(new_conf) return Oracle(conf) def exec(self, sql: str, params=None): if params: cursor = self.cursor.execute(sql, params) else: cursor = self.cursor.execute(sql) if cursor: cursor = self.cursor self.rows = cursor.fetchall() self.lines = self._rows_to_lines(self.rows, cursor) self.count = cursor.rowcount else: self.rows = () self.lines = {} self.count = 0 return self def call(self, proc_name: str, params=[]): in_out = self.cursor.callproc(proc_name, params) cur_index = -1; for i in range(len(params)): params[i] = in_out[i] if repr(type(in_out[i])) == "<class 'cx_Oracle.Cursor'>": cur_index = i if cur_index != -1 and in_out[i]: cursor = in_out[i] self.rows = cursor.fetchall() self.lines = self._rows_to_lines(self.rows, cursor) self.count = cursor.rowcount else: self.rows = () self.lines = {} self.count = 0 return self def begin(self): self.conn.begin() return self def commit(self): self.conn.commit() return self def rollback(self): self.conn.rollback() return self def _rows_to_lines(self, rows, cursor): try: col_names = [c[0] for c in cursor.description] except: pass lines = [] for row in rows: r_dict = {} for i, col in enumerate(row): r_dict[col_names[i]] = col lines.append(r_dict) return lines def oracle(new_conf={}): return Oracle.实例化(new_conf) # endregion oracle # region 流式计算 class ListStream: def __init__(self, my_list=[]): self.list = list(my_list) def filter(self, func): self.list = list(filter(func, self.list)) return self def map(self, func): self.list = list(map(func, self.list)) return self def forEach(self, func): list(map(func, self.list)) return self def print(self): self.forEach(lambda item: print(item)) return self def collect(self): return self.list class DictStream(ListStream): def __init__(self, my_dict={}): self.list = self.dict_to_list(my_dict) def collect(self, is_to_dict=True): if is_to_dict: return self.list_to_dict(self.list) else: return self.list def dict_to_list(self, old_dict): new_list = [] for i in old_dict.keys(): temp_dict = {} temp_dict["key"] = i temp_dict["value"] = old_dict[i] new_list.append(temp_dict) return new_list def list_to_dict(self, old_list): new_dict = {} for i in old_list: new_dict[i["key"]] = i["value"] return new_dict def stream(iteration): def list_处理(): return ListStream(iteration) def dict_处理(): return DictStream(iteration) def default(): raise Exception("stream化失败,参数类型未支持") switch = { "<class 'list'>": list_处理, "<class 'tuple'>": list_处理, "<class 'str'>": list_处理, "<class 'dict'>": dict_处理 } return switch.get(repr(type(iteration)), default)() # endregion 流式计算
客户端文件 64位
蓝奏云: https://wws.lanzous.com/i9un1hzi7xa