【注意:此文章为博主原创文章!转载需注意,请带原文链接,至少也要是txt格式!】
自己重新封装了一个Mysql的命令执行的类,但是经常会自动关闭mysql数据库的连接,自己写法完全按照官方的来的,但是就是会出问题,自己也是一脸懵逼,后来启用dbutils解决此问题了,但是又出新问题,就是查询返回数据是tuple,而且是0,1,2,3,4……等对应查询的值,并非字段返回字段对应值,艾玛,想着不可能有这么大缺陷,然后一顿狂看接口API,最终找到解决方法。下看如下代码。
DBUtils 是 Python 的一个用于实现数据库连接池的模块 , 此连接池有两种连接模式
模式一 🍀
为每个线程创建一个连接 , 线程即使调用了 close 方法 , 也不会关闭 , 只是把连接重新放到连接池 , 供自己线程再次使用 , 当线程终止时 , 连接自动关闭
import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PersistentDB(
# 使用链接数据库的模块
creator=pymysql,
# 一个链接最多被重复使用的次数,None表示无限制
maxusage=None,
# 开始会话前执行的命令列表,如:["set datestyle to ...", "set time zone ..."]
setsession=[],
# ping MySQL服务端,检查是否服务可用
# 0 = None = never,
# 1 = default = whenever it is requested,
# 2 = when a cursor is created,
# 4 = when a query is executed,
# 7 = always
ping=0,
# 如果为False,conn.close()实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接
# 如果为True,conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
closeable=False,
# 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
threadlocal=None,
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='pooldb',
charset='utf8',
cursorclass=DictCursor ###重点注意这里,如果没加此行,则查询表返回数据就是tuple格式,加了此行返回数据就是list字段对应值。
def func():
conn = POOL.connection(shareable=False)
cursor = conn.cursor()
cursor.execute('select * from tb1')
result = cursor.fetchall()
cursor.close()
conn.close()
func()
模式二 🍀
创建一批连接到连接池 , 供所有线程共享使用 (由于 pymysql , MySQLdb 等 threadsafety 值为 1 , 所以该模式连接池中的线程会被所有线程共享)
import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PooledDB(
# 使用链接数据库的模块
creator=pymysql,
# 连接池允许的最大连接数,0和None表示不限制连接数
maxconnections=6,
# 初始化时,链接池中至少创建的空闲的链接,0表示不创建
mincached=2,
# 链接池中最多闲置的链接,0和None不限制
maxcached=5,
# 链接池中最多共享的链接数量,0和None表示全部共享
# PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享
maxshared=3,
# 连接池中如果没有可用连接后,是否阻塞等待;True,等待;False,不等待然后报错
blocking=True,
# 一个链接最多被重复使用的次数,None表示无限制
maxusage=None,
# 开始会话前执行的命令列表,如:["set datestyle to ...", "set time zone ..."]
setsession=[],
# ping MySQL服务端,检查是否服务可用
# 0 = None = never,
# 1 = default = whenever it is requested,
# 2 = when a cursor is created,
# 4 = when a query is executed,
# 7 = always
ping=0,
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='pooldb',
charset='utf8',
cursorclass=DictCursor ###重点注意这里,如果没加此行,则查询表返回数据就是tuple格式,加了此行返回数据就是list字段对应值。
def func():
# 检测当前正在运行连接数的是否小于最大链接数,如果不小于则等待或报raise TooManyConnections异常
# 否则则优先去初始化时创建的链接中获取链接 SteadyDBConnection,
# 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回,
# 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回,
# 一旦关闭链接后,连接就返回到连接池让后续线程继续使用
conn = POOL.connection()
# print(th, '链接被拿走了', conn1._con)
# print(th, '池子里目前有', pool._idle_cache, '\r\n')
cursor = conn.cursor()
cursor.execute('select * from tb1')
result = cursor.fetchall()
conn.close()
func()
以上数据来源:https://github.com/lyonyang/blogs/blob/8042919c035f4586196ccf34d85ec4dbe95d06ea/04-Web-Framework/Flask/DBUtils.md
下面奉上我自己封装的python3 mysql调用类:
# -*-coding:utf-8-*-
import pymysql
import socket
import struct
import time
from flask import jsonify
from pymysql.cursors import DictCursor
from DBUtils.PooledDB import PooledDB
class MysqlPool:
def __init__(self, dbname, username, password, host='127.0.0.1', port=3306, charset='utf8'):
self.pool = PooledDB(
# 使用链接数据库的模块
creator=pymysql,
host=host,
port=port,
user=username,
password=password,
database=dbname,
charset=charset,
cursorclass=DictCursor
def Time_Y_M_D(self, time):
return time.strftime('%Y-%m-%d %H:%M:%S', time)
def ip2int(self, ip):
return struct.unpack("!I", socket.inet_aton(ip))[0]
def int2ip(self, ip_num):
return socket.inet_ntoa(struct.pack("!I", ip_num))
def linux_10_nowtime(self):
return int(time.time())
def sql_execute(self, sql, sql_args=None):
conn = self.pool.connection()
with conn.cursor() as cursor:
cursor.execute(sql, sql_args)
conn.commit()
return 1
except Exception as e:
return jsonify({"status": 500, "data": "sql命令执行失败。{}".format(e)})
finally:
# cursor.close()
print("")
def insert_execute(self, sql, sql_args=None):
conn = self.pool.connection()
# 和sql_execute一样的,分开来写,仅仅是为了可读性
with conn.cursor() as cursor:
cursor.execute(sql, sql_args)
conn.commit()
return 1
except Exception as e:
return jsonify({"status": 500, "data": "添加失败。{}".format(e)})
def update_execute(self, sql, sql_args=None):
conn = self.pool.connection()
# 和sql_execute一样的,分开来写,仅仅是为了可读性
with conn.cursor() as cursor:
cursor.execute(sql, sql_args=None)
conn.commit()
return 1
except Exception as e:
return jsonify({"status": 500, "data": "更新失败。{}".format(e)})
def delete_execute(self, sql, sql_args=None):
conn = self.pool.connection()
# 和sql_execute一样的,分开来写,仅仅是为了可读性
with conn.cursor() as cursor:
cursor.execute(sql, sql_args)
conn.commit()
return 1
except Exception as e:
return jsonify({"status": 500, "data": "删除失败。{}".format(e)})
def select_count(self, sql, sql_args=None):
num = ""
conn = self.pool.connection()
with conn.cursor() as cursor:
cursor.execute(sql, sql_args)
result = cursor.fetchall()
if isinstance(result[0], dict):
num = result[0]["count(*)"]
elif isinstance(result[0], list):
num = result[0][0]
return num # return是在finnally之后执行
except Exception as e: # 异常的时候返回None
return jsonify({"status": 500, "data": "数量查询失败。{}".format(e)})
def select_execute(self, sql, sql_args=None):
conn = self.pool.connection()
with conn.cursor() as cursor:
cursor.execute(sql, sql_args)
result = cursor.fetchall() # result的type为tuple,已经是个具体的集合了
return result
except Exception as e: # 异常的时候返回None
return jsonify({"status": 500, "data": "sql执行失败。{}".format(e)})
def select_one(self, sql, sql_args=None):
conn = self.pool.connection()
with conn.cursor() as cursor:
cursor.execute(sql, sql_args)
result = cursor.fetchone()
return result # return是在finnally之后执行
except Exception as e: # 异常的时候返回None
return jsonify({"status": 500, "data": "sql获取一条数据失败。{}".format(e)})
def __del__(self):
self.pool.close()
# -*-coding:utf-8-*-
import pymysql
import socket
import struct
import time
from flask import jsonify
from pymysql.cursors import DictCursor
from DBUtils.PooledDB import PooledDB
class MysqlPool:
def __init__(self, dbname, username, password, host='127.0.0.1', port=3306, charset='utf8'):
self.pool = PooledDB(
# 使用链接数据库的模块
creator=pymysql,
host=host,
port=port,
user=username,
password=password,
database=dbname,
charset=charset,
cursorclass=DictCursor
def Time_Y_M_D(self, time):
return time.strftime('%Y-%m-%d %H:%M:%S', time)
def ip2int(self, ip):
return struct.unpack("!I", socket.inet_aton(ip))[0]
def int2ip(self, ip_num):
return socket.inet_ntoa(struct.pack("!I", ip_num))
def linux_10_nowtime(self):
return int(time.time())
def sql_execute(self, sql, sql_args=None):
conn = self.pool.connection()
with conn.cursor() as cursor:
cursor.execute(sql, sql_args)
conn.commit()
return 1
except Exception as e:
return jsonify({"status": 500, "data": "sql命令执行失败。{}".format(e)})
finally:
# cursor.close()
print("")
def insert_execute(self, sql, sql_args=None):
conn = self.pool.connection()
# 和sql_execute一样的,分开来写,仅仅是为了可读性
with conn.cursor() as cursor:
cursor.execute(sql, sql_args)
conn.commit()
return 1
except Exception as e:
return jsonify({"status": 500, "data": "添加失败。{}".format(e)})
def update_execute(self, sql, sql_args=None):
conn = self.pool.connection()
# 和sql_execute一样的,分开来写,仅仅是为了可读性
with conn.cursor() as cursor:
cursor.execute(sql, sql_args=None)
conn.commit()
return 1
except Exception as e:
return jsonify({"status": 500, "data": "更新失败。{}".format(e)})
def delete_execute(self, sql, sql_args=None):
conn = self.pool.connection()
# 和sql_execute一样的,分开来写,仅仅是为了可读性
with conn.cursor() as cursor:
cursor.execute(sql, sql_args)
conn.commit()
return 1
except Exception as e:
return jsonify({"status": 500, "data": "删除失败。{}".format(e)})
def select_count(self, sql, sql_args=None):
num = ""
conn = self.pool.connection()
with conn.cursor() as cursor:
cursor.execute(sql, sql_args)
result = cursor.fetchall()
if isinstance(result[0], dict):
num = result[0]["count(*)"]
elif isinstance(result[0], list):
num = result[0][0]
return num # return是在finnally之后执行
except Exception as e: # 异常的时候返回None
return jsonify({"status": 500, "data": "数量查询失败。{}".format(e)})
def select_execute(self, sql, sql_args=None):
conn = self.pool.connection()
with conn.cursor() as cursor:
cursor.execute(sql, sql_args)
result = cursor.fetchall() # result的type为tuple,已经是个具体的集合了
return result
except Exception as e: # 异常的时候返回None
return jsonify({"status": 500, "data": "sql执行失败。{}".format(e)})
def select_one(self, sql, sql_args=None):
conn = self.pool.connection()
with conn.cursor() as cursor:
cursor.execute(sql, sql_args)
result = cursor.fetchone()
return result # return是在finnally之后执行
except Exception as e: # 异常的时候返回None
return jsonify({"status": 500, "data": "sql获取一条数据失败。{}".format(e)})
def __del__(self):
self.pool.close()
1抖音、小红书最新APP抓包方法2搭建最新的Vision和Reality防止VPS...3v2ray 反向代理/内网穿透 方法详解4(已被封)北京西二旗和上海张江程序...5最新burpsuite pro 瘦身版 2023.6 ...
1(已被封)北京西二旗和上海张江程序员的终极悲惨宿命
2frida-server 使用详解
3Frida objection 使用详解
4frida-dexdump 安卓app脱壳 详解
5frida Hook某金融APP加密算法
6牙龈红肿 牙龈疼 牙花子疼治疗方案
7嗓子有痰如何祛痰 祛痰方法
8jumpserver最新re-auth复现(伪随机经典案例)
【升级版】最新特别完善版XSS平台源代码 - 61,780 views解决:failed to handler mux client connection > v2ray/core/proxy/vmess - 58,069 viewsiOS 苹果帐号共享 (下载:Shadowrocket、Kitsunebi、Panda、IGnition)等专用 - 54,775 views关于俺 - 52,128 views绕过微信各种限制 – 无限无实名注册微信帐号 - 49,461 views安卓抓包 封包 改包 无限发送数据包 - 43,592 views