添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
相关文章推荐
乐观的苦瓜  ·  Governo de Timor-Leste·  2 月前    · 
眼睛小的铁链  ·  澳洲Go8-C9 ...·  4 月前    · 
茫然的筷子  ·  伟大的悲剧_百度百科·  4 月前    · 
愤怒的豆芽  ·  获取用户 - Microsoft ...·  1 年前    · 

【注意:此文章为博主原创文章!转载需注意,请带原文链接,至少也要是txt格式!】

自己重新封装了一个Mysql的命令执行的类,但是经常会自动关闭mysql数据库的连接,自己写法完全按照官方的来的,但是就是会出问题,自己也是一脸懵逼,后来启用dbutils解决此问题了,但是又出新问题,就是查询返回数据是tuple,而且是0,1,2,3,4……等对应查询的值,并非字段返回字段对应值,艾玛,想着不可能有这么大缺陷,然后一顿狂看接口API,最终找到解决方法。下看如下代码。

DBUtils 是 Python 的一个用于实现数据库连接池的模块 , 此连接池有两种连接模式

模式一 🍀

为每个线程创建一个连接 , 线程即使调用了 close 方法 , 也不会关闭 , 只是把连接重新放到连接池 , 供自己线程再次使用 , 当线程终止时 , 连接自动关闭

import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PersistentDB(
    # 使用链接数据库的模块
    creator=pymysql,  
    # 一个链接最多被重复使用的次数,None表示无限制
    maxusage=None,  
    # 开始会话前执行的命令列表,如:["set datestyle to ...", "set time zone ..."]
    setsession=[],  
    # ping MySQL服务端,检查是否服务可用
    # 0 = None = never, 
    # 1 = default = whenever it is requested, 
    # 2 = when a cursor is created, 
    # 4 = when a query is executed, 
    # 7 = always
    ping=0,
    # 如果为False,conn.close()实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接
    # 如果为True,conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
    closeable=False,
    # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
    threadlocal=None,  
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8',
    cursorclass=DictCursor    ###重点注意这里,如果没加此行,则查询表返回数据就是tuple格式,加了此行返回数据就是list字段对应值。
def func():
    conn = POOL.connection(shareable=False)
    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    cursor.close()
    conn.close()
func()

模式二 🍀

创建一批连接到连接池 , 供所有线程共享使用 (由于 pymysql , MySQLdb 等 threadsafety 值为 1 , 所以该模式连接池中的线程会被所有线程共享)

import time
import pymysql
import threading
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PooledDB(
    # 使用链接数据库的模块
    creator=pymysql,  
    # 连接池允许的最大连接数,0和None表示不限制连接数
    maxconnections=6,  
    # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    mincached=2,  
    # 链接池中最多闲置的链接,0和None不限制
    maxcached=5,  
    # 链接池中最多共享的链接数量,0和None表示全部共享
    # PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享
    maxshared=3,
    # 连接池中如果没有可用连接后,是否阻塞等待;True,等待;False,不等待然后报错
    blocking=True,  
    # 一个链接最多被重复使用的次数,None表示无限制
    maxusage=None,  
    # 开始会话前执行的命令列表,如:["set datestyle to ...", "set time zone ..."]
    setsession=[],  
    # ping MySQL服务端,检查是否服务可用
    # 0 = None = never, 
    # 1 = default = whenever it is requested, 
    # 2 = when a cursor is created, 
    # 4 = when a query is executed, 
    # 7 = always
    ping=0,
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8',
    cursorclass=DictCursor    ###重点注意这里,如果没加此行,则查询表返回数据就是tuple格式,加了此行返回数据就是list字段对应值。
def func():
    # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则等待或报raise TooManyConnections异常
    # 否则则优先去初始化时创建的链接中获取链接 SteadyDBConnection,
    # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回,
    # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回,
    # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用
    conn = POOL.connection()
    # print(th, '链接被拿走了', conn1._con)
    # print(th, '池子里目前有', pool._idle_cache, '\r\n')
    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    conn.close()
func()

以上数据来源:https://github.com/lyonyang/blogs/blob/8042919c035f4586196ccf34d85ec4dbe95d06ea/04-Web-Framework/Flask/DBUtils.md

下面奉上我自己封装的python3 mysql调用类:

# -*-coding:utf-8-*-
import pymysql
import socket
import struct
import time
from flask import jsonify
from pymysql.cursors import DictCursor
from DBUtils.PooledDB import PooledDB
class MysqlPool:
    def __init__(self, dbname, username, password, host='127.0.0.1', port=3306, charset='utf8'):
        self.pool = PooledDB(
            # 使用链接数据库的模块
            creator=pymysql,
            host=host,
            port=port,
            user=username,
            password=password,
            database=dbname,
            charset=charset,
            cursorclass=DictCursor
    def Time_Y_M_D(self, time):
        return time.strftime('%Y-%m-%d %H:%M:%S', time)
    def ip2int(self, ip):
        return struct.unpack("!I", socket.inet_aton(ip))[0]
    def int2ip(self, ip_num):
        return socket.inet_ntoa(struct.pack("!I", ip_num))
    def linux_10_nowtime(self):
        return int(time.time())
    def sql_execute(self, sql, sql_args=None):
            conn = self.pool.connection()
            with conn.cursor() as cursor:
                cursor.execute(sql, sql_args)
            conn.commit()
            return 1
        except Exception as e:
            return jsonify({"status": 500, "data": "sql命令执行失败。{}".format(e)})
        finally:
            # cursor.close()
            print("")
    def insert_execute(self, sql, sql_args=None):
            conn = self.pool.connection()
            # 和sql_execute一样的,分开来写,仅仅是为了可读性
            with conn.cursor() as cursor:
                cursor.execute(sql, sql_args)
            conn.commit()
            return 1
        except Exception as e:
            return jsonify({"status": 500, "data": "添加失败。{}".format(e)})
    def update_execute(self, sql, sql_args=None):
            conn = self.pool.connection()
            # 和sql_execute一样的,分开来写,仅仅是为了可读性
            with conn.cursor() as cursor:
                cursor.execute(sql, sql_args=None)
            conn.commit()
            return 1
        except Exception as e:
            return jsonify({"status": 500, "data": "更新失败。{}".format(e)})
    def delete_execute(self, sql, sql_args=None):
            conn = self.pool.connection()
            # 和sql_execute一样的,分开来写,仅仅是为了可读性
            with conn.cursor() as cursor:
                cursor.execute(sql, sql_args)
            conn.commit()
            return 1
        except Exception as e:
            return jsonify({"status": 500, "data": "删除失败。{}".format(e)})
    def select_count(self, sql, sql_args=None):
            num = ""
            conn = self.pool.connection()
            with conn.cursor() as cursor:
                cursor.execute(sql, sql_args)
            result = cursor.fetchall()
            if isinstance(result[0], dict):
                num = result[0]["count(*)"]
            elif isinstance(result[0], list):
                num = result[0][0]
            return num  # return是在finnally之后执行
        except Exception as e:  # 异常的时候返回None
            return jsonify({"status": 500, "data": "数量查询失败。{}".format(e)})
    def select_execute(self, sql, sql_args=None):
            conn = self.pool.connection()
            with conn.cursor() as cursor:
                cursor.execute(sql, sql_args)
            result = cursor.fetchall()  # result的type为tuple,已经是个具体的集合了
            return result
        except Exception as e:  # 异常的时候返回None
            return jsonify({"status": 500, "data": "sql执行失败。{}".format(e)})
    def select_one(self, sql, sql_args=None):
            conn = self.pool.connection()
            with conn.cursor() as cursor:
                cursor.execute(sql, sql_args)
            result = cursor.fetchone()
            return result  # return是在finnally之后执行
        except Exception as e:  # 异常的时候返回None
            return jsonify({"status": 500, "data": "sql获取一条数据失败。{}".format(e)})
    def __del__(self):
        self.pool.close()

# -*-coding:utf-8-*- import pymysql import socket import struct import time from flask import jsonify from pymysql.cursors import DictCursor from DBUtils.PooledDB import PooledDB class MysqlPool: def __init__(self, dbname, username, password, host='127.0.0.1', port=3306, charset='utf8'): self.pool = PooledDB( # 使用链接数据库的模块 creator=pymysql, host=host, port=port, user=username, password=password, database=dbname, charset=charset, cursorclass=DictCursor def Time_Y_M_D(self, time): return time.strftime('%Y-%m-%d %H:%M:%S', time) def ip2int(self, ip): return struct.unpack("!I", socket.inet_aton(ip))[0] def int2ip(self, ip_num): return socket.inet_ntoa(struct.pack("!I", ip_num)) def linux_10_nowtime(self): return int(time.time()) def sql_execute(self, sql, sql_args=None): conn = self.pool.connection() with conn.cursor() as cursor: cursor.execute(sql, sql_args) conn.commit() return 1 except Exception as e: return jsonify({"status": 500, "data": "sql命令执行失败。{}".format(e)}) finally: # cursor.close() print("") def insert_execute(self, sql, sql_args=None): conn = self.pool.connection() # 和sql_execute一样的,分开来写,仅仅是为了可读性 with conn.cursor() as cursor: cursor.execute(sql, sql_args) conn.commit() return 1 except Exception as e: return jsonify({"status": 500, "data": "添加失败。{}".format(e)}) def update_execute(self, sql, sql_args=None): conn = self.pool.connection() # 和sql_execute一样的,分开来写,仅仅是为了可读性 with conn.cursor() as cursor: cursor.execute(sql, sql_args=None) conn.commit() return 1 except Exception as e: return jsonify({"status": 500, "data": "更新失败。{}".format(e)}) def delete_execute(self, sql, sql_args=None): conn = self.pool.connection() # 和sql_execute一样的,分开来写,仅仅是为了可读性 with conn.cursor() as cursor: cursor.execute(sql, sql_args) conn.commit() return 1 except Exception as e: return jsonify({"status": 500, "data": "删除失败。{}".format(e)}) def select_count(self, sql, sql_args=None): num = "" conn = self.pool.connection() with conn.cursor() as cursor: cursor.execute(sql, sql_args) result = cursor.fetchall() if isinstance(result[0], dict): num = result[0]["count(*)"] elif isinstance(result[0], list): num = result[0][0] return num # return是在finnally之后执行 except Exception as e: # 异常的时候返回None return jsonify({"status": 500, "data": "数量查询失败。{}".format(e)}) def select_execute(self, sql, sql_args=None): conn = self.pool.connection() with conn.cursor() as cursor: cursor.execute(sql, sql_args) result = cursor.fetchall() # result的type为tuple,已经是个具体的集合了 return result except Exception as e: # 异常的时候返回None return jsonify({"status": 500, "data": "sql执行失败。{}".format(e)}) def select_one(self, sql, sql_args=None): conn = self.pool.connection() with conn.cursor() as cursor: cursor.execute(sql, sql_args) result = cursor.fetchone() return result # return是在finnally之后执行 except Exception as e: # 异常的时候返回None return jsonify({"status": 500, "data": "sql获取一条数据失败。{}".format(e)}) def __del__(self): self.pool.close()

  • 1抖音、小红书最新APP抓包方法
  • 2搭建最新的Vision和Reality防止VPS...
  • 3v2ray 反向代理/内网穿透 方法详解
  • 4(已被封)北京西二旗和上海张江程序...
  • 5最新burpsuite pro 瘦身版 2023.6 ...
  • 1(已被封)北京西二旗和上海张江程序员的终极悲惨宿命
  • 2frida-server 使用详解
  • 3Frida objection 使用详解
  • 4frida-dexdump 安卓app脱壳 详解
  • 5frida Hook某金融APP加密算法
  • 6牙龈红肿 牙龈疼 牙花子疼治疗方案
  • 7嗓子有痰如何祛痰 祛痰方法
  • 8jumpserver最新re-auth复现(伪随机经典案例)
  • 【升级版】最新特别完善版XSS平台源代码 - 61,780 views
  • 解决:failed to handler mux client connection > v2ray/core/proxy/vmess - 58,069 views
  • iOS 苹果帐号共享 (下载:Shadowrocket、Kitsunebi、Panda、IGnition)等专用 - 54,775 views
  • 关于俺 - 52,128 views
  • 绕过微信各种限制 – 无限无实名注册微信帐号 - 49,461 views
  • 安卓抓包 封包 改包 无限发送数据包 - 43,592 views
  •