添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
	def test09_EquallyShared(self):
		"""Connections from the pool are used and shared equally.

		NOTE(review): relies on the module-level ``dbapi`` mock and
		``PooledDB`` from DBUtils' test suite -- the exact counts below
		encode that pool's round-robin dispensing behaviour; confirm
		against the fixtures before editing.
		"""
		for threadsafety in (1, 2):
			dbapi.threadsafety = threadsafety
			shareable = threadsafety > 1
			# Pool with 5 initial idle, 5 max cached, 5 max shared connections.
			pool = PooledDB(dbapi, 5, 5, 5)
			self.assertEqual(len(pool._idle_cache), 5)
			# 15 sequential dedicated (non-shareable) connections: each of
			# the 5 pooled connections should be handed out exactly 3 times.
			for i in range(15):
				db = pool.connection(False)
				db.cursor().execute('select test')
				db.close()
			self.assertEqual(len(pool._idle_cache), 5)
			for i in range(5):
				con = pool._idle_cache[i]
				self.assertEqual(con._usage, 3)
				self.assertEqual(con._con.num_queries, 3)
			# 35 concurrently-held shareable connections over 5 real ones:
			# each real connection ends up shared by 7 clients.
			cache = []
			for i in range(35):
				db = pool.connection()
				db.cursor().execute('select test')
				cache.append(db)
				del db
			self.assertEqual(len(pool._idle_cache), 0)
			if shareable:
				self.assertEqual(len(pool._shared_cache), 5)
				for i in range(5):
					con = pool._shared_cache[i]
					self.assertEqual(con.shared, 7)
					con = con.con
					self.assertEqual(con._usage, 10)
					self.assertEqual(con._con.num_queries, 10)
			# Dropping every client returns all connections to the idle cache.
			del cache
			self.assertEqual(len(pool._idle_cache), 5)
			if shareable:
				self.assertEqual(len(pool._shared_cache), 0)
def generate_word_url():
    pool = PooledDB(MySQLdb, 1, host="localhost", user="******", passwd="123456", db="abnormal")
    conn = pool.connection()
    cur = conn.cursor()
    objFoleder = "Report"
    index = 1
    count = 0
    for objFile in os.listdir(objFoleder):
        reportId = objFile.split("_")[1]
        soup = BeautifulSoup(open(os.path.join(objFoleder, objFile)))
        pageTags = soup.find_all(class_="page")
        pageCountA, pageCountB = 1, 1
        if len(pageTags) > 0:
            pageCountA = len(pageTags[0].find_all("a")) - 1
        if len(pageTags) > 1:
            pageCountB = len(pageTags[1].find_all("a")) - 1
        for i in xrange(2, pageCountA + 1):
            url = "http://service.account.weibo.com/aj/showblog?type=0&rid=%s&page=%d&_t=0" % (reportId, i)
            sql = 'insert into wordlinks values (%d, "%s", 0)' % (index, url)
            cur.execute(sql)
            index += 1
        for i in xrange(2, pageCountB + 1):
            url = "http://service.account.weibo.com/aj/showblog?type=1&rid=%s&page=%d&_t=0" % (reportId, i)
            sql = 'insert into wordlinks values (%d, "%s", 0)' % (index, url)
            cur.execute(sql)
            index += 1
        print count
        count += 1
    conn.commit()
    cur.close()
    conn.close()
class PooledConnectionPolicy(DatabaseConnectionPolicyIface):
    """This connection policy maintains a pool of connections that are doled
    out as needed for each transaction.

    NOTE: Appropriate for multi-threaded applications.
    NOTE: The connections are NOT shared concurrently between threads.

    Fixes over the original block: the docstrings were left unterminated
    (syntax errors) and the ``ConnectionWrapper(...)`` call was missing its
    closing parenthesis; both are repaired here without behaviour changes.
    """

    def __init__(self):
        """Construct an instance. The instance's open() method must be
        called to make it ready for acquireConnection() calls.
        """
        self._logger = _getLogger(self.__class__)
        self._logger.debug("Opening")
        # Pool construction arguments come from the shared SteadyDB config.
        self._pool = PooledDB(**_getCommonSteadyDBArgsDict())
        self._logger.info("Created %s", self.__class__.__name__)
        return

    def close(self):
        """Close the policy instance and its database connection pool."""
        self._logger.info("Closing")
        if self._pool is not None:
            self._pool.close()
            self._pool = None
        else:
            self._logger.warning(
                "close() called, but connection policy was already closed")
        return

    def acquireConnection(self):
        """Get a connection from the pool.

        Returns: A ConnectionWrapper instance. NOTE: Caller is responsible
        for calling the ConnectionWrapper instance's release() method or
        using it in a context manager expression (with ... as:) to release
        resources.
        """
        self._logger.debug("Acquiring connection")
        # shareable=False: each transaction gets a dedicated connection.
        dbConn = self._pool.connection(shareable=False)
        connWrap = ConnectionWrapper(
            dbConn=dbConn,
            cursor=dbConn.cursor(),
            releaser=self._releaseConnection,
            logger=self._logger)
        return connWrap

    def _releaseConnection(self, dbConn, cursor):
        """Release database connection and cursor; passed as a callback to
        ConnectionWrapper.
        """
        self._logger.debug("Releasing connection")
        # Close the cursor ...
        cursor.close()
        # ... then return the db connection back to the pool.
        dbConn.close()
        return
def generate_accuser_url():
    pool = PooledDB(MySQLdb, 1, host="localhost", user="******", passwd="123456", db="abnormal")
    conn = pool.connection()
    cur = conn.cursor()
    # objFoleder = 'Report'
    objFile = open("ParseResult")
    index = 1
    # for objFile in os.listdir(objFoleder):
    count = 0
    while True:
        line = objFile.readline()
        if not line:
            break
            # print replace_struct_time(line)
        parDict = eval(replace_struct_time(line))
        reportId = parDict["reportId"]
        # soup = BeautifulSoup(os.path.join(objFoleder, objFile))
        # countText = soup.find(class_='W_f12 W_textb').text
        # accuserCount = int(patternNumber.search(countText).group())
        accuserCount = min(parDict["accuserCount"], 20)
        if accuserCount > 1:
            for i in xrange(accuserCount - 1):
                url = "http://service.account.weibo.com/aj/reportuser?rid=%s&page=%d&_t=0" % (reportId, i)
                sql = 'insert into userlinks values (%d, "%s", 0)' % (index, url)
                cur.execute(sql)
                index += 1
        print count
        count += 1
    conn.commit()
    cur.close()
    conn.close()
# NOTE(review): the following two lines are whitespace-mangled fragments of
# several distinct test methods from DBUtils' PooledDB test suite (setsession
# handling, maxusage recycling, transaction reset) fused with three unrelated
# connection-factory snippets.  The enclosing `def` lines and indentation were
# lost, so this is not runnable as-is; preserved verbatim pending recovery of
# the original structure.
for threadsafety in (1, 2): dbapi.threadsafety = threadsafety setsession = ('set time zone', 'set datestyle') pool = PooledDB(dbapi, 0, 0, 0, 1, False, None, setsession) self.assertEqual(pool._setsession, setsession) db = pool.connection(False) self.assertEqual(db._setsession_sql, setsession) self.assertEqual(db._con._con.session, ['time zone', 'datestyle']) db.cursor().execute('select test') db.cursor().execute('set test1') self.assertEqual(db._usage, 2) self.assertEqual(db._con._con.num_uses, 4) self.assertEqual(db._con._con.num_queries, 1) self.assertEqual(db._con._con.session, ['time zone', 'datestyle', 'test1']) db.close() db = pool.connection(False) self.assertEqual(db._setsession_sql, setsession) self.assertEqual(db._con._con.session, ['time zone', 'datestyle', 'test1', 'rollback']) db._con._con.close() db.cursor().execute('select test') db.cursor().execute('set test2') self.assertEqual(db._con._con.session, ['time zone', 'datestyle', 'test2']) dbapi.threadsafety = threadsafety for maxusage in (0, 3, 7): pool = PooledDB(dbapi, 0, 0, 0, 1, False, maxusage) self.assertEqual(pool._maxusage, maxusage) self.assertEqual(len(pool._idle_cache), 0) db = pool.connection(False) self.assertEqual(db._con._maxusage, maxusage) self.assertEqual(len(pool._idle_cache), 0) self.assertEqual(db._con._con.open_cursors, 0) self.assertEqual(db._usage, 0) self.assertEqual(db._con._con.num_uses, 0) self.assertEqual(db._con._con.num_queries, 0) for i in range(20): cursor=db.cursor() self.assertEqual(db._con._con.open_cursors, 1) cursor.execute('select test%i' % i) r = cursor.fetchone() self.assertEqual(r, 'test%i' % i) cursor.close() self.assertEqual(db._con._con.open_cursors, 0) if maxusage: j = i % maxusage + 1 else: j = i + 1 self.assertEqual(db._usage, j) self.assertEqual(db._con._con.num_uses, j) self.assertEqual(db._con._con.num_queries, j) db.cursor().callproc('test') self.assertEqual(db._con._con.open_cursors, 0) self.assertEqual(db._usage, j + 1) 
self.assertEqual(db._con._con.num_uses, j + 1) self.assertEqual(db._con._con.num_queries, j) dbapi.threadsafety = threadsafety pool = PooledDB(dbapi, 0, 1) self.assertEqual(len(pool._idle_cache), 0) db = pool.connection(False) self.assertEqual(len(pool._idle_cache), 0) self.assertEqual(db._con._con.open_cursors, 0) cursor = db.cursor() self.assertEqual(db._con._con.open_cursors, 1) cursor.execute('set doit1') db.commit() cursor.execute('set dont1') cursor.close() self.assertEqual(db._con._con.open_cursors, 0) del db self.assertEqual(len(pool._idle_cache), 1) db = pool.connection(False) self.assertEqual(len(pool._idle_cache), 0) self.assertEqual(db._con._con.open_cursors, 0) cursor = db.cursor() self.assertEqual(db._con._con.open_cursors, 1) cursor.execute('set doit2') cursor.close() self.assertEqual(db._con._con.open_cursors, 0) db.commit() session = db._con._con.session db.close() self.assertEqual(session, [ 'doit1', 'commit', 'dont1', 'rollback', 'doit2', 'commit', 'rollback']) if MySQLDB_.pool is None: pool = PooledDB(MySQLdb, host=host, user=user, passwd=passwd, charset=charset, port=3306, db=db, mincached=1, maxcached=20, maxshared=2, maxconnections=2) return pool.connection() if Mysql.__pool is None: __pool = PooledDB(creator=MySQLdb, mincache=1, maxcached=20, host=Config.DBHOST, port=Config.DBPORT, user=Config.DBPWD, db=Config.DBNAME, use_unicode=False, charset=Config.DBCHAR, cursorclass=DictCursor) return __pool.connection() global SQLPOOL if SQLPOOL is None: SQLPOOL = PooledDB(creator=MySQLdb ,mincached=self.__cachemin , maxcached=0 ,maxshared=0,maxconnections=0,blocking=True,maxusage=0, host=self.__host , port=self.__port , user=self.__user , passwd=self.__passwd, db=self.__db,use_unicode=False,charset=self.__charset ,cursorclass=DictCursor return SQLPOOL.connection()
    def __getConn():
        """Return a pooled database connection, creating the class-level
        pool on first use.

        Bug fix: the original bound the pool to a function-local ``__pool``
        and never assigned ``Mysqldb.__pool``, so a brand-new pool was built
        on every call (and the cached-pool branch could never be taken).
        The pool is now stored on, and reused from, the class attribute.
        """
        if Mysqldb.__pool is None:
            Mysqldb.__pool = PooledDB(MySQLdb, mincached=1, maxcached=20, maxconnections=20
                              , host=MysqlConfig.host, port=MysqlConfig.port
                              , user=MysqlConfig.user, passwd=MysqlConfig.password
                              , db=MysqlConfig.dbname#, use_unicode=False, charset=MysqlConfig.charset
                              , cursorclass=DictCursor)
        return Mysqldb.__pool.connection()
mincached=config.db_mincached, maxcached=config.db_maxcached, maxshared=config.db_maxshared, maxconnections=config.db_maxconnections, cursorclass=cursors.DictCursor def execute(self, sql, param): db = self._db_pool.connection() cursor = db.cursor() result = cursor.execute(sql, param) except Exception as e: print "MySQL Error Execute [%s] %r" % (sql, param) db.close() raise RError(1) db.commit() db.close() return result def query(self, sql, param): db = self._db_pool.connection() cursor = db.cursor() result = cursor.execute(sql, param) except Exception as e: print "MySQL Error [%s] %r" % (sql, param) db.close() raise RError(1) result = cursor.fetchall() db.close() return result def begin(self): db = self._db_pool.connection() except Exception as e: print "MySQL Error when begin an execute." db.close() raise RError(1) return RDataBaseConnection(db) def commit(self, con): return con.commit()
	def test16_ThreeThreadsTwoConnections(self):
		"""Three threads compete for a pool capped at two connections.

		Fix: restores the ``try:`` statements that were stripped from this
		block (the orphaned ``except`` clauses made it a syntax error).
		The try/except TypeError pairs exist because very old ``Queue.get``
		implementations did not accept a timeout argument.
		"""
		for threadsafety in (1, 2):
			dbapi.threadsafety = threadsafety
			# maxconnections=2, blocking=True: a third requester must wait.
			pool = PooledDB(dbapi, 2, 2, 0, 2, True)
			from Queue import Queue, Empty
			queue = Queue(3)
			def connection():
				try:
					queue.put(pool.connection(), 1, 1)
				except Exception:
					queue.put(pool.connection(), 1)
			from threading import Thread
			for i in range(3):
				Thread(target=connection).start()
			try:
				db1 = queue.get(1, 1)
				db2 = queue.get(1, 1)
			except TypeError:
				db1 = queue.get(1)
				db2 = queue.get(1)
			self.assertNotEqual(db1, db2)
			db1_con = db1._con
			db2_con = db2._con
			self.assertNotEqual(db1_con, db2_con)
			# The third thread is still blocked waiting for a connection.
			try:
				self.assertRaises(Empty, queue.get, 1, 0.1)
			except TypeError:
				self.assertRaises(Empty, queue.get, 0)
			del db1
			# Releasing db1 unblocks the waiting thread, which reuses it.
			try:
				db1 = queue.get(1, 1)
			except TypeError:
				db1 = queue.get(1)
			self.assertNotEqual(db1, db2)
			self.assertNotEqual(db1._con, db2._con)
			self.assertEqual(db1._con, db1_con)
			# Second round: same scenario with one shared connection allowed,
			# but both initial connections requested as dedicated.
			pool = PooledDB(dbapi, 2, 2, 1, 2, True)
			db1 = pool.connection(False)
			db2 = pool.connection(False)
			self.assertNotEqual(db1, db2)
			db1_con = db1._con
			db2_con = db2._con
			self.assertNotEqual(db1_con, db2_con)
			Thread(target=connection).start()
			try:
				self.assertRaises(Empty, queue.get, 1, 0.1)
			except TypeError:
				self.assertRaises(Empty, queue.get, 0)
			del db1
			try:
				db1 = queue.get(1, 1)
			except TypeError:
				db1 = queue.get(1)
			self.assertNotEqual(db1, db2)
			self.assertNotEqual(db1._con, db2._con)
			self.assertEqual(db1._con, db1_con)
if Mysql.__pool is None: __pool = PooledDB(creator=MySQLdb, mincached=1 , maxcached=20 , host="172.16.130.87" , port=3306 , user="******" , passwd="hta@123" , db="finance_spiderdata",use_unicode=True,charset="utf8",cursorclass=DictCursor) return __pool.connection()
class TRtgHandler:
    """Realtime event handler: fetches the rows affected by an event from
    MySQL and pushes formatted messages onto the outgoing event queue.

    Fix: ``response()`` passed ``(r_id)`` -- a parenthesized value, not a
    tuple -- as the query parameter; it is now the 1-tuple ``(r_id,)``,
    matching the ``(d_id,)`` usage in ``conversation()``.
    """

    def __init__(self,config,queue):
        # Pool of MySQL connections shared by all handler callbacks.
        self.pool = PooledDB(creator=MySQLdb,mincached=10,host=config.MYSQL_SERVER,user=config.MYSQL_USER,passwd=config.MYSQL_PASSWORD,db=config.MYSQL_DATABASE)
        self.queue = queue

    def response(self,r_id):
        """Broadcast a newly posted response (reply) to its conversation
        channel and to the global /happening feed."""
        conn = self.pool.connection()
        cur = conn.cursor()
        cur.execute('select d_id,response.user_id,replyDate,response.content,cat_id,category.thumb,conversation.title from response inner join (conversation inner join category using (cat_id)) using(d_id) \
                    where r_id=%s',(r_id,))
        res = cur.fetchone()
        user = database.fetchUserNoCache(cur,res[1])
        escaped = util.escape(res[3])
        newContent = util.replaceMentions(cur,escaped)
        # True -> shortened variant for the feed preview.
        shortContent = util.replaceMentions(cur,escaped,True)
        cur.close()
        conn.close()
        payload = {'date':res[2].isoformat(),'content':newContent,'short':shortContent,'user':user,'r_id':r_id,'d_id':res[0]}
        self.queue.put(event.Message('/conversation/%d' % (res[0]), 'response',payload))
        happening_data = {'user':user,'date':res[2].isoformat(),'category_image':res[5],'category_id':res[4],'d_id':res[0],'title': res[6],'r_id':r_id,'content':newContent}
        self.queue.put(event.Message('/happening','happening',{'type':'response','data':happening_data}))

    def conversation(self,d_id):
        """Broadcast a newly created conversation to its category channel
        and to the global /happening feed."""
        conn = self.pool.connection()
        cur = conn.cursor()
        cur.execute('select user_id,postDate,content,category.thumb,cat_id,title from conversation inner join category using (cat_id) \
                     where d_id=%s',(d_id,))
        convo = cur.fetchone()
        user = database.fetchUserNoCache(cur,convo[0])
        newContent = util.escape(convo[2])
        payload = {'id':d_id,'date':convo[1].isoformat(),'title':convo[5],'user':user,'content':newContent,'short':util.replaceMentions(cur,newContent,True)}
        cur.close()
        conn.close()
        self.queue.put(event.Message('/category/%d' % (convo[4]),'conversation',payload))
        happening_data = {'user':user,'date':convo[1].isoformat(),'category_image':convo[3],'d_id':d_id,'title':convo[5],'content':newContent}
        self.queue.put(event.Message('/happening','happening',{'type':'post','data':happening_data}))

    def auth(self,auth):
        """Forward a fresh auth key to the event queue."""
        self.queue.put(event.NewAuthKey(auth.user_id,auth.key))

    def userModified(self,user_id):
        """Re-fetch a user's record and broadcast it on the user channel."""
        conn = self.pool.connection()
        cur = conn.cursor()
        user = database.fetchUserNoCache(cur,user_id)
        cur.close()
        conn.close()
        self.queue.put(event.Message('/user/%d' % user_id,'user',user))
mincached=config.db_mincached, maxcached=config.db_maxcached, maxshared=config.db_maxshared, maxconnections=config.db_maxconnections, cursorclass=cursors.DictCursor def execute(self, sql, param): db = self._db_pool.connection() cursor = db.cursor() cursor.execute("INSERT into input_params VALUE (now(), %s, %s)", (str(sql), str(param))) result = cursor.execute(sql, param) except Exception as e: print "MySQL Error Execute [%s] %r" % (sql, param) db.close() raise RError(1) db.commit() db.close() return result def query(self, sql, param): db = self._db_pool.connection() cursor = db.cursor() cursor.execute("INSERT into input_params VALUE (now(), %s, %s)", (str(sql), str(param))) result = cursor.execute(sql, param) except Exception as e: print "MySQL Error [%s] %r" % (sql, param) db.close() raise RError(1) result = cursor.fetchall() db.close() return result def begin(self): db = self._db_pool.connection() except Exception as e: print "MySQL Error when begin an execute." db.close() raise RError(1) return RDataBaseConnection(id) def commit(self, con): return con.commit() if Mysql.__pool is None: dbConfig = json.load(open('config.json', 'r')).get('db') #载入配置文件 __pool = PooledDB(creator=MySQLdb, mincached=1 , maxcached=20 , host=dbConfig['host'] , port=dbConfig['port'] , user=dbConfig['user'], passwd=dbConfig['passwd'] , db=dbConfig['db'],charset=dbConfig['charset'],cursorclass=DictCursor) return __pool.connection()
 def reconnect(self):
     """Re-establish ``self._db`` from the connection pool, creating the
     cx_Oracle-backed pool lazily on the first call."""
     existing_pool = getattr(self, "_pool", None)
     if existing_pool is None:
         # First reconnect: build the pool once and cache it on the instance.
         existing_pool = PooledDB(
             cx_Oracle,
             user=self.user,
             password=self.password,
             dsn=self.dsn,
             mincached=2,
             maxcached=20,
             maxshared=20,
             maxconnections=20,
         )
         self._pool = existing_pool
     self._db = existing_pool.connection()
# NOTE(review): whitespace-mangled fragment: a DataAccess singleton class
# (InsertRow/SelectRow with stripped `try:` lines), followed by a flat weibo
# crawler script (login, URL queue, thread pool management) and yet another
# config-file pool factory.  Indentation and structure were lost; preserved
# verbatim pending recovery of the original sources.
def __new__(clz): if not DataAccess.__singleInstance: DataAccess.__singleInstance = object.__new__(clz) return DataAccess.__singleInstance def __init__(self): # mysql self.pool = PooledDB(MySQLdb, self.pool_size, db=db_set.db_dbname , user=db_set.db_user, passwd=db_set.db_pwd, host=db_set.db_ip, charset="utf8") def InsertRow(self, insertStr): conn = self.pool.connection() cursor = conn.cursor() cursor.execute('SET NAMES utf8') cursor.execute(insertStr) conn.commit() return cursor.lastrowid except: print("InsertRow: Unexpected error:" , sys.exc_info(), sys.exc_traceback.tb_lineno) return 0 finally: if conn: conn.close() def SelectRow(self, selectStr, where = None): conn = self.pool.connection() cursor = conn.cursor() cursor.execute('SET NAMES utf8') if where is not None: cursor.execute(selectStr, where) else: cursor.execute(selectStr) res = cursor.fetchall() return res except: print("SelectRow: Unexpected error:" , sys.exc_info(), sys.exc_traceback.tb_lineno) finally: if conn: conn.close() def debug(self, *print_me): if self.debug_level > 0: print print_me '''登录微博''' paramDict = read_config() if not login(paramDict['username'], paramDict['password']): exit() '''与数据库建立连接和指针''' pool = PooledDB(MySQLdb, int(paramDict['threadnum']), host = paramDict['dbhost'], user = paramDict['dbuser'], passwd = paramDict['dbpasswd'], db = paramDict['dbname']) conn = pool.connection() cur = conn.cursor() '''读取未爬取的链接列表放入队列''' urlQLock = threading.Lock() tableName = 'users' sql = 'select id, uid from %s where isCrawled = 0' % tableName cur.execute(sql) result = cur.fetchall() urlQ = Queue(len(result)) for entry in result: urlQ.put(entry) '''建立线程''' for i in xrange(int(paramDict['threadnum'])): thr = DownloadThread(pool, urlQ, urlQLock) threadPool.append(thr) thr.start() '''检查是否存在结束的线程,若有,则重新建立新的线程''' while True: sleep(60) '''当队列为空时,跳出循环''' if not urlQ.qsize(): break if threading.activeCount() < int(paramDict['threadnum']) + 1: '''检查哪个线程已经结束,将其清除''' i = 0 for thr in threadPool: if not 
thr.isAlive(): thr.clear() del threadPool[i] newThr = DownloadThread(pool, urlQ, urlQLock) threadPool.append(newThr) newThr.start() else: i += 1 except: print sys.exc_info()[0] for thr in threadPool: thr.end() break print 'Main thread end!' if Mysql.__pool is None: cf = Mysql.__getConf() __pool = PooledDB(creator=MySQLdb, mincached=1, maxcached=20, host=cf.get("mysqldb", "host"), port=int(cf.get("mysqldb", "port")), user=cf.get("mysqldb", "user"), passwd=cf.get("mysqldb", "passwd"), db=cf.get("mysqldb", "db"), use_unicode=False, charset=cf.get("mysqldb", "charset"), cursorclass=DictCursor) return __pool.connection()
 def getConn():
     if ConnFactortyReport.__pool is None :
         __pool = PooledDB(creator=DBConfig.dbapi, mincached=DBConfig.mincached , maxcached=DBConfig.maxcached ,
                           host=DBConfig.host , port=DBConfig.port , user=DBConfig.username , passwd=DBConfig.password ,
                           db=DBConfig.database_name, use_unicode=DBConfig.use_unicode, charset=DBConfig.charset)
     tryCount = 0; 
     while(tryCount < 50):
             return  __pool.connection()
         except:
             print sys.exc_info()[0], sys.exc_info()[1] 
             tryCount += 1
     raise Error("数据库链接错误!");
# NOTE(review): whitespace-mangled fragments of DBUtils' PooledDB close/GC
# tests and shared-connection usage tests, fused with two more pool-factory
# snippets (MysqlClient, DbManager).  Enclosing `def` lines and indentation
# were lost; preserved verbatim pending recovery of the original structure.
shareable = threadsafety > 1 pool = PooledDB(dbapi, 10) self.assertEqual(len(pool._idle_cache), 10) pool.close() self.assertEqual(len(pool._idle_cache), 0) pool = PooledDB(dbapi, 10) closed = ['no'] def close(what=closed): what[0] = 'yes' pool._idle_cache[7]._con.close = close self.assertEqual(closed, ['no']) del pool self.assertEqual(closed, ['yes']) pool = PooledDB(dbapi, 10, 10, 5) self.assertEqual(len(pool._idle_cache), 10) if shareable: self.assertEqual(len(pool._shared_cache), 0) cache = [] for i in range(5): cache.append(pool.connection()) self.assertEqual(len(pool._idle_cache), 5) if shareable: self.assertEqual(len(pool._shared_cache), 5) else: self.assertEqual(len(pool._idle_cache), 5) pool.close() self.assertEqual(len(pool._idle_cache), 0) if shareable: self.assertEqual(len(pool._shared_cache), 0) pool = PooledDB(dbapi, 10, 10, 5) closed = [] def close_idle(what=closed): what.append('idle') def close_shared(what=closed): what.append('shared') if shareable: cache = [] for i in range(5): cache.append(pool.connection()) pool._shared_cache[3].con.close = close_shared else: pool._idle_cache[7]._con.close = close_shared pool._idle_cache[3]._con.close = close_idle self.assertEqual(closed, []) del pool if shareable: del cache self.assertEqual(closed, ['idle', 'shared']) if MysqlClient.__pool is None: __pool = PooledDB(creator=MySQLdb, mincached=1 , maxcached=20 , host=Config.DBHOST , port=Config.DBPORT , user=Config.DBUSER , passwd=Config.DBPWD , db=Config.DBNAME,use_unicode=True,charset=Config.DBCHAR,cursorclass=DictCursor) if MysqlClient.__mutex is None: MysqlClient.__mutex = threading.Lock() return __pool.connection() if DbManager.__pool is None: __pool = PooledDB(creator=MySQLdb, mincached=pf.getProfileValue('db', 'mincached', 'int'), maxcached=pf.getProfileValue('db', 'maxcached', 'int'), cursorclass=DictCursor, use_unicode=True, charset=pf.getProfileValue('db', 'charset', 'string'), **pf.getDbUserInfo()) return __pool.connection() dbapi.threadsafety = 
threadsafety shareable = threadsafety > 1 pool = PooledDB(dbapi, 0, 0, 5) cache = [] for i in range(35): db = pool.connection() db.cursor().execute('select test1') db.cursor().execute('select test2') db.cursor().callproc('test3') cache.append(db) del db self.assertEqual(len(pool._idle_cache), 0) if shareable: self.assertEqual(len(pool._shared_cache), 5) for i in range(5): con = pool._shared_cache[i] self.assertEqual(con.shared, 7) con = con.con self.assertEqual(con._usage, 21) self.assertEqual(con._con.num_queries, 14) cache[3] = cache[8] = cache[33] = None cache[12] = cache[17] = cache[34] = None self.assertEqual(len(pool._shared_cache), 5) self.assertEqual(pool._shared_cache[0].shared, 7) self.assertEqual(pool._shared_cache[1].shared, 7) self.assertEqual(pool._shared_cache[2].shared, 5) self.assertEqual(pool._shared_cache[3].shared, 4) self.assertEqual(pool._shared_cache[4].shared, 6) for db in cache: if db: db.cursor().callproc('test4') for i in range(6): db = pool.connection() db.cursor().callproc('test4') cache.append(db) del db for i in range(5): con = pool._shared_cache[i] self.assertEqual(con.shared, 7) con = con.con self.assertEqual(con._usage, 28) self.assertEqual(con._con.num_queries, 14) del cache if shareable: self.assertEqual(len(pool._idle_cache), 5) self.assertEqual(len(pool._shared_cache), 0) else: self.assertEqual(len(pool._idle_cache), 35)
class DBUtils():
    """Helper around a pymysql connection pool for the twitter crawler.

    Fixes over the original block: restores the ``try:`` statements that
    were stripped (every method had an orphaned ``except`` clause), and
    switches %-interpolated SQL to parameterized queries so values with
    quotes cannot break the statement or inject SQL.
    """

    def __init__(self):
        # 50 pooled connections against the local ``twitter`` database.
        self.pool = PooledDB(pymysql,50,host='127.0.0.1',user='******',passwd='lgroot',db='twitter',port=3306,charset="utf8")

    def getTwitterById(self, twitterId):
        """Return True if a tweet with ``twitterId`` exists, else False.

        Returns None if the query raises (error is printed and rolled back).
        """
        db = self.pool.connection()
        cue = db.cursor()
        try:
            cue.execute("SELECT * FROM twitter WHERE twitter_id = %s", (twitterId,))
            results = cue.fetchall()
            if len(results) == 0:
                return False
            else:
                return True
        except Exception as e:
            print('Insert error:',e)
            db.rollback()
        else:
            db.commit()

    # Save one tweet record.
    def saveTwitter(self, item):
        """Insert one tweet row; commits on success, rolls back on error."""
        db = self.pool.connection()
        cue = db.cursor()
        try:
            GMT_FORMAT = '%I:%M %p - %d %b %Y'
            # Parse the tweet's displayed timestamp.
            t = time.strptime(item['twitter_time'], GMT_FORMAT)
            # Current time, stored as the row's create_date.
            dt=datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            cue.execute("insert into twitter (twitter_id,twitter_author,twitter_content,twitter_time,twitter_reply,twitter_trunsmit,twitter_zan,twitter_img,create_date\
                ) values(%s, %s, %s, %s, %s, %s, %s, %s, %s)", \
                (item["twitter_id"], item["twitter_author"], item["twitter_content"], t, item["twitter_reply"], item["twitter_trunsmit"], item["twitter_zan"], item["twitter_img"], dt))
            print("insert success")  # debug trace
        except Exception as e:
            print('Insert error:',e)
            db.rollback()
        else:
            db.commit()

    def getSeendNameAll(self):
        """Return all seed account names from ``twitter_seed``."""
        db = self.pool.connection()
        cue = db.cursor()
        try:
            cue.execute("SELECT seed_twitter_name FROM twitter_seed")
            results = cue.fetchall()
            return results
        except Exception as e:
            print('Insert error:',e)
            db.rollback()
        else:
            db.commit()

    # Mark this seed's full history as crawled.
    def updateSeedTag(self, name):
        db = self.pool.connection()
        cue = db.cursor()
        try:
            cue.execute("UPDATE twitter_seed SET seed_twitter_tag = 1 WHERE seed_twitter_name = %s", (name,))
        except Exception as e:
            print('Insert error:',e)
            db.rollback()
        else:
            db.commit()

    # Increment the seed's crawl count and record the crawl position.
    def updateSeedCountLocation(self, name, twitterId):
        db = self.pool.connection()
        cue = db.cursor()
        try:
            cue.execute("UPDATE twitter_seed SET seed_twitter_count = seed_twitter_count + 1, seed_twitter_location = %s WHERE seed_twitter_name = %s", (twitterId, name))
        except Exception as e:
            print('Insert error:',e)
            db.rollback()
        else:
            db.commit()

    # Is the current tweet the last one recorded for this seed?
    def isSeedLocation(self, spider_name, next_page_id):
        db = self.pool.connection()
        cue = db.cursor()
        try:
            cue.execute("SELECT * FROM twitter_seed WHERE seed_twitter_name = %s AND seed_twitter_location = %s", (spider_name, next_page_id))
            if len(cue.fetchall()) == 1:
                return True
            else:
                return False
        except Exception as e:
            print('Insert error:',e)
            db.rollback()
        else:
            db.commit()
# NOTE(review): whitespace-mangled fragment of a cursor-wrapper class
# (insertOne/insertMany/update/delete/transaction helpers -- note the
# 'roollback'/'roolback' typos passed to end(), which its own comparison
# treats as rollback) fused with a module-level pool and an unrelated
# redis+pymssql main block.  `try:` lines and indentation were stripped;
# preserved verbatim pending recovery of the original structure.
self._cursor.execute('SELECT @@IDENTITY AS id') result = self._cursor.fetchall() return result[0]['id'] # return result except Exception as e: print('------------获取影响的记录的id') print(e) def insertOne(self, sql): self._cursor.execute(sql) return self.__getInsertId() except Exception as e: print('-------------插入一条数据出现问题') print(e) return False def insertMany(self, sql, values): count = self._cursor.execute(sql, values) return count except Exception as e: print('--------------插入多条数据出现问题') print(e) def update(self, sql, param=None): return self.__query(sql, param) except Exception as e: print('--------------更新出现问题') print(e) return False def __query(self, sql, param=None): if param is None: count = self._cursor.execute(sql) else: count = self._cursor.execute(sql, param) return count except Exception as e: print('-------------获取影响的id,出现问题') print(e) def delete(self, sql, param=None): id = self.__query(sql, param) return id except Exception as e: print('--------删除语句,出现问题') print(e) def begin(self): self._conn.autocommit(1) except Exception as e: print('------------------开启事务,出现问题') print(e) def end(self, option='commit'): if option == 'commit': self._conn.commit() else: self._conn.rollback() except Exception as e: print('-----------------结束事务,出现问题') print(e) def errdispose(self, isEnd=1): if isEnd == 1: self.end('roollback') self._cursor.close() self._conn.close() except Exception as e: print('-------------事务回滚,关闭连接出现问题') print(e) def dispose(self, isEnd=1): if isEnd == 1: self.end('commit') else: self.end('roolback') self._cursor.close() self._conn.close() except Exception as e: print('----------------释放资源出现问题') print(e) def isexist(self, sql, param=None): if param is None: result = self._cursor.execute(sql) else: result = self._cursor.execute(sql, param) return result except Exception as e: print('--------------判断记录是否存在出现问题') print(e) return 0 pooldb=PooledDB(pymysql,10,host="localhost",user="******",passwd="123123",db="vega") # pool=pooldb.connection() except Exception 
as e: print(e) if __name__ == '__main__': # redis连接池 redis_conn_pool = redis.ConnectionPool(host=setting.redis_host, port=setting.redis_port, max_connections=5) # sql连接池 sql_conn_pool = PooledDB(creator=pymssql, mincached=2, maxcached=5, maxconnections=6, blocking=True, maxshared=3, host=setting.db_host, port=setting.db_port, user=setting.db_user, password=setting.db_password, database=setting.db_database) # 创建表具基本资料及配置处理线程 get_thread = threading.Thread(target=DatabaseOperation.meter_get, args=(redis_conn_pool, sql_conn_pool, 0.1)) update_thread = threading.Thread(target=DatabaseOperation.meter_update, args=(redis_conn_pool, sql_conn_pool, 0.1)) get_thread.start() update_thread.start() while True:
 def __init__(self):
     """Build the MySQL connection pool from the test-environment Config."""
     self.Pool = PooledDB(
         creator=pymysql,
         mincached=Config.DB_MIN_CACHED,
         maxcached=Config.DB_MAX_CACHED,
         maxshared=Config.DB_MAX_SHARED,
         maxconnections=Config.DB_MAX_CONNECYIONS,
         blocking=Config.DB_BLOCKING,
         maxusage=Config.DB_MAX_USAGE,
         setsession=Config.DB_SET_SESSION,
         host=Config.DB_TEST_HOST,
         port=Config.DB_TEST_PORT,
         user=Config.DB_TEST_USER,
         passwd=Config.DB_TEST_PASSWORD,
         db=Config.DB_TEST_DBNAME,
         use_unicode=False,
         charset=Config.DB_CHARSET,
     )
 def __init__(self, db_type, cursor_type, **kwargs):
     """Create a pool for ``db_type`` (extra kwargs go to PooledDB) and
     keep one connection plus a ``cursor_type`` cursor on the instance."""
     connection_pool = PooledDB(db_type, **kwargs)
     self.pool = connection_pool
     self.conn = connection_pool.connection()
     self.cr = self.conn.cursor(cursor_type)
import mysql.connector from DBUtils.PooledDB import PooledDB from dota2api.src.exceptions import APIError from api_caller import get_match_details, get_latest_match from db_operations import insert_match, get_latest_match_id, get_earliest_match_id PROCESS_NUMBER = multiprocessing.cpu_count() DB_CONNECTIONS_NUMBER = PROCESS_NUMBER + 2 DATABASE_POOL = PooledDB(mysql.connector, DB_CONNECTIONS_NUMBER, db='dota', user='******', autocommit=True) # My private API key. API_KEY = '352CBD8EBE2C282BB58AF974C9DE1FD2' DOTA_API = dota2api.Initialise(api_key=API_KEY) def getting_match(args): Call get_match_details() to get a dict() of match data. :param args: Contains a dota2api instance and a match id. :return: An array of match data if exists. raw_match = get_match_details(args[0], args[1])
class Db(object):
    """Small MySQL helper backed by a DBUtils connection pool.

    A single pooled connection and cursor are created up front and reused
    for the lifetime of the instance.
    """

    def __init__(self, db_host, db_port, db_name, db_user, db_passwd, charset):
        self.conn = PooledDB(
            creator=pymysql,
            maxconnections=10,  # max connections the pool allows; 0/None = unlimited
            mincached=2,  # idle connections created at start-up; 0 = create none
            maxcached=5,  # max idle connections kept in the pool; 0/None = unlimited
            blocking=True,  # block and wait when the pool is exhausted instead of raising
            host=db_host,
            port=int(db_port),
            user=db_user,
            passwd=db_passwd,
            database=db_name,
            charset=charset
        ).connection()
        self.cursor = self.conn.cursor()

    # insert one row
    def db_insert(self, tableName, dataDict):
        """Insert ``dataDict`` as one row into ``tableName``.

        Values are bound as query parameters, so string values need no
        manual quoting and every value type the driver supports
        (str, int, float, None, ...) is accepted — the old code silently
        dropped anything that was not str/int, producing malformed SQL.

        Returns True when exactly one row was inserted, False otherwise.
        """
        if not dataDict:
            return False
        columns = ", ".join("`{}`".format(field) for field in dataDict)
        placeholders = ", ".join(["%s"] * len(dataDict))
        # NOTE: table/column identifiers cannot be parameterized; they are
        # assumed to come from trusted code, never from user input.
        sql = "INSERT INTO `{}` ({}) VALUES ({})".format(
            tableName, columns, placeholders)
        self.cursor.execute(sql, tuple(dataDict.values()))
        self.conn.commit()
        return self.cursor.rowcount == 1

    # update data (not implemented yet)
    def db_updata(self):
        pass

    # fetch data; returns a tuple of rows
    def db_getdata(self, tableName, field):
        # identifiers cannot be bound as parameters; callers must pass
        # trusted table/field names
        sql = "SELECT " + field + " FROM " + tableName
        print(sql)
        self.cursor.execute(sql)
        return self.cursor.fetchall()

    # delete data (not implemented yet)
    def db_deldata(self):
        pass

    # query data (not implemented yet)
    def db_selectdata(self):
        pass

    # release database resources
    def __del__(self):
        # __init__ may have failed before the attributes existed, and the
        # interpreter may be shutting down; never raise from a destructor.
        try:
            self.cursor.close()
            self.conn.close()
        except Exception:
            pass
无,创建该画师记录 2. get_total 查询pixiv表中有多少表该画师id字段信息 3. update_latest_id 符合更新条件,更新pxusers表中该画师id的最新插画id 4. 队列中,check_illust 查询pixiv表中是否有该pid的记录 5. 队列中,update_illust 每次作品网络请求,都会进行该pid数据的更新 6. 队列中,insert_illust 满足插入条件,向pixiv表中插入该pid的记录 收藏作品-数据库流程: 1. 判断更新条件 2. 队列中,check_illust 查询pixiv表中是否有该pid的记录 3. 队列中,update_illust 每次作品网络请求,都会进行该pid数据的更新 4. 队列中,insert_illust 满足插入条件,向pixiv表中插入该pid的记录 def __init__(self, thread_num=16): self.class_name = self.__class__.__name__ if DB_ENABLE == False: return log_str(TEMP_MSG["DB_INST"].format(self.class_name)) self.pool = PooledDB( creator=pymysql, maxconnections=thread_num, # 连接池允许的最大连接 mincached=1, # 连接池中的初始空闲连接数 maxcached=1, # 连接池中最大闲置连接数 # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 blocking=True, host=DB_HOST,user=DB_USER,passwd=DB_PASSWD,db=DB_DATABASE,port=DB_PORT,charset=DB_CHARSET except pymysql.err.OperationalError as e: log_str(TEMP_MSG["DB_CONNECT_ERROR_INFO"].format(e)) exit() def get_conn(self): 从数据库连接池中取出一个链接 # connection()获取数据库连接 conn = self.pool.connection() cur = conn.cursor(DictCursor) return conn,cur def check_user(self, u): 数据库中画师记录的latest_id与接口返回的latest_id是否一致 相同 --> False,不需要更新或下载该画师的作品 判断pxusers表是否含有该画师uid的记录 无 --> sql_2 有 --> sql_3 :params u: 用户数据 :return: latest_id conn,cur = self.get_conn() # 查询画师记录sql sql_1 = "SELECT COUNT(uid) FROM pxusers WHERE uid=%s" # 插入画师记录sql sql_2 = '''INSERT INTO pxusers(uid,userName,latest_id,path) VALUES(%s,%s,%s,%s)''' # 查询latest_id sql sql_3 = "SELECT latest_id FROM pxusers WHERE uid=%s" uid = u["uid"] data = ( u["uid"],u["userName"],u["latest_id"],u["path"] # 确认数据库是否有该画师记录 cur.execute(sql_1,uid) res = cur.fetchall() e = res[0]["COUNT(uid)"] # log_str("查询结果 :{}".format(e)) if e >= 1: # 返回数据库中查询的latest_id cur.execute(sql_3,uid) d = cur.fetchall()[0] latest_id = d["latest_id"] return latest_id else: cur.execute(sql_2,data) conn.commit() except Exception as e: log_str(e) conn.rollback() # 默认全更新 return u["latest_id"] else: return u["latest_id"] finally: cur.close() conn.close() def 
get_total(self, u): 查询数据库中有多少条[画师uid]的数据 :params u: 作品数据 :return: 画师作品数量 conn,cur = self.get_conn() sql = '''SELECT COUNT(1) FROM pixiv WHERE uid=%s''' data = u["uid"] cur.execute(sql,data) d = cur.fetchall()[0] # d_total = d["COUNT(*)"] d_total = d["COUNT(1)"] return d_total def update_latest_id(self, u): 更新latest_id :params u: 作品数据 :return: conn,cur = self.get_conn() # 更新latest _id sql sql = """UPDATE pxusers SET latest_id=%s WHERE uid=%s""" data = ( u["latest_id"],u["uid"] cur.execute(sql,data) conn.commit() except Exception as e: log_str("{} | {}".format(e,u)) conn.rollback() finally: cur.close() conn.close() def check_illust(self, value, key="pid", table="pixiv", database=None): 查询数据库中是否有该id的作品,table为非pixiv,bookmark时采用通用sql :parmas key: 对应字段名 :parmas value: 对应记录值 :parmas table: 数据表 :return: (True,path)/(False,"") Result--fetchall获取的原始数据 data in db: [{'COUNT(1)': 1, 'path': 'None'}] data not in db: () conn,cur = self.get_conn() if key == "": return False,"" if value == "": return False,"" # 切换数据库 if database != None: conn.select_db(database) # 查询id sql if table in ["pixiv","bookmark"]: # path为下载地址,不存在该记录时为None sql = """SELECT COUNT(1),path FROM {} """.format(table) + """WHERE {}=%s GROUP BY path""".format(key) else: sql = """SELECT COUNT(1) FROM {} """.format(table) + """WHERE {}=%s""".format(key) # log_str(sql) data = (value) cur.execute(sql,data) except Exception as e: log_str("{}:check_illust | {}".format(self.class_name,e)) return False,"" else: # 未使用GROUP BY path,非严格模式报1140 # 使用GROUP BY path,不存在对应pid记录时,fetchall结果为() d = cur.fetchall() if d != () and d[0]["COUNT(1)"] >= 1: return True,d[0].get("path","") else: return False,"" finally: cur.close() conn.close() def insert_illust(self, u, table="pixiv"): :params u 数据 :parmas table: 操作数据表 :return: True/False conn,cur = self.get_conn() sql = '''INSERT INTO {} '''.format(table) + '''(uid,userName,pid,purl,title,tag,pageCount,\ illustType,is_r18,score,illust_level,viewCount,bookmarkCount,likeCount,\ 
commentCount,urls,original,path) VALUES(%s,%s,%s,%s,\ %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''' data = ( u["uid"],u["userName"],u["pid"],u["purl"],u["title"],u["tag"], u["pageCount"],u["illustType"],u["is_r18"],u["score"],u["illust_level"], u["viewCount"],u["bookmarkCount"],u["likeCount"],u["commentCount"], u["urls"],u["original"],u["path"] cur.execute(sql,data) conn.commit() except Exception as e: log_str("{} | {}".format(e,u)) conn.rollback() return False else: return True finally: cur.close() conn.close() def update_illust(self, u, table="pixiv"): 更新作品数据,主要是浏览数,收藏数,评论数,喜欢数,path :params u:作品数据 :parmas table: 操作数据表 :return: True/False 更新11个字段 tag,pageCount,illustType,is_r18,score,illust_level, viewCount,bookmarkCount,likeCount,commentCount,path conn,cur = self.get_conn() # 更新sql sql = """UPDATE {} """.format(table) + """SET tag=%s,pageCount=%s,\ illustType=%s,is_r18=%s,score=%s,illust_level=%s,viewCount=%s,\ bookmarkCount=%s,likeCount=%s,commentCount=%s,path=%s WHERE pid=%s""" # 更新数据 data = ( u["tag"],u["pageCount"],u["illustType"],u["is_r18"],u["score"],u["illust_level"], u["viewCount"],u["bookmarkCount"],u["likeCount"],u["commentCount"],u["path"],u["pid"] cur.execute(sql,data) conn.commit() except Exception as e: log_str(TEMP_MSG["DB_UPDATE_ILLUST_ERROR_INFO"].format(self.class_name,u["pid"],e)) log_str(u) conn.rollback() return False else: return True finally: cur.close() conn.close() def select_illust(self, pid, table="pixiv"): 查询作品数据,对接API接口方法 :params pid:作品pid :parmas table: 操作数据表 :return : conn,cur = self.get_conn() sql = """SELECT * FROM {} """.format(table) + """WHERE pid=%s""" data = (pid,) cur.execute(sql,data) except Exception as e: log_str(e) return else: r = cur.fetchall() if len(r) != 0: # API处增加[0]下标 # res = r[0] return r else: return finally: cur.close() conn.close() def random_illust(self, extra=None, limit=None, illust_level=None, is_r18=True, table="pixiv" 对接API-random接口 :params extra: 指定tag组(str),如原创,碧蓝航线;最多两个 :params limit: 指定最低收藏数(str), 
:params illust_level: 指定单个或多个评分等级(str) str;如:SR或R,SR,SSR,UR :params is_r18: 是否开启R18; 默认True开启,False为关闭,关闭则会过滤掉tag中包含'R-18'的结果 :parmas table: 数据表 返回符合条件的所有pid 删除urls,path,t2.id等非必要字段/中间字段 conn,cur = self.get_conn() sql = """SELECT pid FROM {} WHERE 1 = 1 """.format(table) # 指定tag e = """AND tag LIKE "%{}%" """ if extra: ex = extra.split(",")[:2] for i in ex: sql = sql + e.format(i) # 指定最低收藏数限制 if limit: limit_sql = """AND bookmarkCount > {} """.format(str(limit)) sql += limit_sql # 指定评分等级 if illust_level: illust_level = ",".join(["'{}'".format(_) for _ in illust_level.split(",")]) illust_level_sql = """AND illust_level in ({}) """.format(str(illust_level)) sql += illust_level_sql # 关闭r18 if not is_r18: is_r18_sql = """AND tag NOT LIKE "%R-18%" """ sql += is_r18_sql print(sql) cur.execute(sql) pid_list = cur.fetchall() if len(pid_list) == 0: return [] else: return pid_list def delete_user_illust(self, key="uid", value=None, table="pixiv"): 删除指定user的所有/单条作品记录 :params key: 用于判断的key,默认为uid :params value: 用于判断的值 :params table: 指定数据表,默认为pixiv :return: 默认None,异常则False if value == None: return False conn,cur = self.get_conn() sql = """DELETE FROM {} WHERE {} = %s""".format(table,str(key)) data = (value,) cur.execute(sql,data) conn.commit() except Exception as e: log_str("{} | {}".format(e,(key,value))) conn.rollback() return False else: return True finally: cur.close() conn.close() def pixiv_re_proxy(self, u): 根据作品数据反代 动图单图:pixiv.cat/{id}.{Suffix} 多图:pixiv.cat/{id}-{num}.{Suffix} :params u:作品数据 :returnL 反代链接 h = "https://pixiv.cat/" pid = u["pid"] suffix = u["original"].split(".")[-1] if u["pageCount"] > 1: num = random.randint(1,u["pageCount"]) # 暂时为1 num = 1 reverse_url = "{}{}-{}.{}".format(h,pid,num,suffix) else: reverse_url = "{}{}.{}".format(h,pid,suffix) return reverse_url # DBClient = db_client() def __init__(self, host, user, passwd, db_name, port, charset): self.db_pool = PooledDB(pymysql, host=host, user=user, passwd=passwd, db=db_name, port=port, 
charset=charset) conn = self.db_pool.connection() cursor = conn.cursor() conn.commit() def select_mysql(self, table_name, where_str): conn = self.db_pool.connection() cursor = conn.cursor() # now_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) sql = "select * from {table_name} {where_str}".format( table_name=table_name, where_str=where_str) # print(sql) cursor.execute(sql) datas = cursor.fetchall() cursor.close() conn.close() return datas except Exception as e: logger.info(" msyql {}".format(e)) return False table_name: 查询表 search_str: 查询内容 where_str: 条件 def select_mysql_many(self, table_name, search_str, where_str): conn = self.db_pool.connection() cursor = conn.cursor() # now_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) sql = "set sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION';" cursor.execute(sql) sql = "select {search_str} from {table_name} {where_str}".format( search_str=search_str, table_name=table_name, where_str=where_str) # print(sql) cursor.execute(sql) datas = cursor.fetchall() cursor.close() conn.close() return datas except Exception as e: logger.info(" msyql {}".format(e)) return False def select_mysql_count(self, table_name, where_str): conn = self.db_pool.connection() cursor = conn.cursor() # now_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) sql = "select count(*) from {table_name} {where_str}".format( table_name=table_name, where_str=where_str) # print(sql) cursor.execute(sql) numbers = cursor.fetchone() cursor.close() conn.close() return numbers[0] except Exception as e: logger.info(" msyql {}".format(e)) return False def insert_mysql(self, table_name, datas): conn = self.db_pool.connection() cursor = conn.cursor() nub = 0 for data in datas: keys = ','.join(data.keys()) values = ','.join(['%s'] * len(data)) sql = 'insert ignore into {table_name} ({keys}) values({values})'.format( table_name=table_name, keys=keys, 
values=values) # print(sql) nub_ = cursor.execute(sql, tuple(data.values())) conn.commit() nub += nub_ except Exception as e: logger.info(" msyql {}".format(e)) print(data) cursor.close() conn.close() return nub def update_mysql(self, sql): conn = self.db_pool.connection() cursor = conn.cursor() numbers = cursor.execute(sql) conn.commit() cursor.close() conn.close() return numbers except Exception as e: logger.info(" msyql {}".format(e)) return False # print(PROXY) def get_shanghai_next_level_res(url, params): all_area_data = np.zeros(0, dtype=dict) # print(url) # url = "https://baike.baidu.com/item/%E6%B5%A6%E4%B8%9C%E6%96%B0%E5%8C%BA/5232458?fromtitle=%E4%B8%8A%E6%B5%B7%E5%B8%82%E6%B5%A6%E4%B8%9C%E6%96%B0%E5%8C%BA&fromid=15403686&fr=aladdin" header = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.122 Safari/537.36" # conn.set_character_set('utf8') csor2.execute('SET NAMES utf8') csor2.execute("SET CHARACTER SET utf8") csor2.execute("SET character_set_connection=utf8") def initCap(): sqCat = dict() db=2, decode_responses=True) # self.red = redis.Redis(host="127.0.0.1", port=6379, db=2, decode_responses=True) print('连接成功') print('连接mysql服务器') db_config = { "host": '172.18.115.15', "port": 3306, "user": '******', "passwd": '123456', "db": 'Im', "charset": 'utf8' # db_config = {"host": '127.0.0.1', "port": 3306, "user": '******', "passwd": '123456', "db": 'sys', # "charset": 'utf8'} self.pool = PooledDB(pymysql, 5, **db_config) # 5为连接池里的最少连接数 self.conn = self.pool.connection() # 以后每次需要数据库连接就是用connection()函数获取连接就好了 self.cur = self.conn.cursor() def process_item(self, item, spider): # 获取redis所有为NewsTitle的key值 keys = self.red.keys('NewsTitle') # 转变类型为字符串 key = ''.join(keys) # lrange获取所有key值为NewsTitle的内容 value = self.red.lrange('%s' % key, '0', '-1') # 判断内容是否为空 if len(value) >= 0: NewsTitless = base64.b64encode( item['NewsTitle'].encode('utf-8')) NewsTitles = str(NewsTitless, 'utf-8') # 
判断爬取的title是否在redis key值为NewsTitle里在提示已存在,不在执行添加 if NewsTitles not in value: if item['NewsTitle'] == '' or item[ 'NewsContent'] == '' or item['NewsContent'] == '': else: i = datetime.datetime.now() b = "%s0%s" % (i.year, i.month) self.red.lpush( 'NewsTitle', base64.b64encode( item['NewsTitle'].encode('utf-8'))) sql1 = 'insert ignore into tbl_NewsDetails{0}(NewsID, NewsCategory, SourceCategory, NewsType, NewsTitle, NewsRawUrl, SourceName, InsertDate, NewsContent, NewsDate, NewsClickLike, NewsBad, NewsRead, NewsOffline)' \ 'VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'.format(b) sql2 = 'insert into tbl_NewsFileManager{0}(FileID, FileType, FileDirectory, FileDirectoryCompress, FileDate, FileLength, FileUserID, Description, NewsID,image_url)' \ 'VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)'.format(b) self.cur.execute( sql1, (item['NewsID'], item["NewsCategory"], item["SourceCategory"], item["NewsType"], item["NewsTitle"], item["NewsRawUrl"], item["SourceName"], item["InsertDate"], item["NewsContent"], item['NewsDate'], item['NewsClickLike'], item['NewsBad'], item['NewsRead'], item['NewsOffline'])) for dic in item['FileList']: self.cur.execute( sql2, (dic['FileID'], dic["FileType"], dic["FileDirectory"], dic["FileDirectoryCompress"], dic["FileDate"], dic["FileLength"], dic["FileUserID"], dic["Description"], dic["NewsID"], dic["image_url"])) self.conn.commit() except Exception as e: print(e) print("执行sql语句失败") items = json.dumps(dict(item)) self.red.lpush(b + 'news' + item['NewsCategory'], items) return item else: print('redis数据已存在') else: print('出错') except: print('错误操作') def close_conn(self, spider): # 关闭链接 self.conn.close() # 关闭游标 self.cur.close()
class MysqlTool:
    """MySQL access layer for the Instagram scraper tables.

    Keeps one plain pymysql connection (used for bulk ``executemany``
    writes) and a DBUtils pool (used for short-lived read/write
    connections). The original extraction lost every ``try:`` line; this
    version restores the intended try/except structure and binds values
    as query parameters instead of %-interpolating them into the SQL.
    """

    def __init__(self):
        self.connect = pymysql.connect(host="127.0.0.1",
                                       user="******",
                                       password="",
                                       database="chiccess",
                                       port=3306)
        self.pool = PooledDB(pymysql,
                             host="127.0.0.1",
                             user='******',
                             passwd='',
                             db='chiccess',
                             port=3306)

    def save_user_profile(self, json_data):
        """Insert a user profile row; fall back to updating the counters
        when the insert fails (e.g. the username already exists)."""
        data = json_data['graphql']['user']
        conn = self.pool.connection()
        cursor = conn.cursor()
        try:
            # parameterized to avoid SQL injection from profile text
            _sql = ('insert into ins_user_profile(id,profile_pic_url,'
                    'username,full_name,follow,followed_by,media_num,'
                    'video_num)values (%s,%s,%s,%s,%s,%s,%s,%s)')
            cursor.execute(
                _sql, (data['id'], data['profile_pic_url'], data['username'],
                       data['full_name'], data['edge_follow']['count'],
                       data['edge_followed_by']['count'],
                       data['edge_owner_to_timeline_media']['count'],
                       data['edge_felix_video_timeline']['count']))
        except Exception:
            _sql = ('update ins_user_profile set follow=%s,followed_by=%s,'
                    'media_num=%s,video_num=%s where username=%s')
            cursor.execute(
                _sql,
                (data['edge_follow']['count'],
                 data['edge_followed_by']['count'],
                 data['edge_owner_to_timeline_media']['count'],
                 data['edge_felix_video_timeline']['count'], data['username']))
        conn.commit()
        cursor.close()
        conn.close()

    def get_ins_cookie(self):
        """Return every cookie string stored in ins_cookies."""
        conn = self.pool.connection()
        cursor = conn.cursor()
        cursor.execute("select cookie from ins_cookies")
        ret = cursor.fetchall()
        cursor.close()
        conn.close()
        return [row[0] for row in ret]

    def save_pics(self, ret_list):
        """Bulk-insert picture rows; on failure roll back and retry row by
        row so a single bad row does not discard the whole batch."""
        print('save_pics{}'.format(ret_list[0]['username']))
        rows = []
        for i in ret_list:
            try:
                rows.append(
                    (i['short'], i['time'], pymysql.escape_string(i['text']),
                     i['content'], i['user_id'], i['username'], i['like_num'],
                     i['comment_num'], i['pic_tagged']))
            except Exception:
                continue  # skip items missing an expected key
        ret_list = rows
        cursor = self.connect.cursor()
        sql_template = ("insert into ins_pics(short,time,text,content,"
                        "user_id,username,like_num,comment_num,pic_tagged)"
                        "values (%s,%s,%s,%s,%s,%s,%s,%s,%s)")
        try:
            cursor.executemany(sql_template, ret_list)
            self.connect.commit()
        except Exception:
            self.connect.rollback()
            for ret in ret_list:
                try:
                    # bound parameters — %-interpolating the tuple produced
                    # unquoted strings and broke the fallback path
                    cursor.execute(sql_template, ret)
                except Exception as e:
                    print(e.args)
                    print(traceback.format_exc())
            self.connect.commit()

    def save_tagged(self, ret_list):
        """Bulk-insert tagged-media rows with the same batch-then-row
        fallback strategy as save_pics."""
        print('save_tagged{}'.format(ret_list[0]['username']))
        ret_list = [(i['short'], i['time'], pymysql.escape_string(i['text']),
                     i['content'], i['_typename'], i['user_id'], i['username'],
                     i['owner_id'], i['owner_name'], i['comment_num'])
                    for i in ret_list]
        cursor = self.connect.cursor()
        sql_template = ("insert into ins_tagged(short,time,text,content,"
                        "typename,user_id,username,owner_id,owner_name,"
                        "comment_num)values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
        try:
            cursor.executemany(sql_template, ret_list)
            self.connect.commit()
        except Exception:
            self.connect.rollback()
            for ret in ret_list:
                try:
                    cursor.execute(sql_template, ret)
                except Exception:
                    print('insert error')
            self.connect.commit()

    def get_short(self, type):
        """Return shorts present in ins_pics but missing from ins_<type>."""
        conn = self.pool.connection()
        cursor = conn.cursor()
        # NOTE: `type` is interpolated into a table name and cannot be a
        # bound parameter; callers must pass a trusted value
        # ('tagged', 'liked', 'comment', ...).
        cursor.execute(
            " select short from ins_pics where short not in(select short from ins_{}); "
            .format(type))
        ret = cursor.fetchall()
        cursor.close()
        conn.close()
        return ret

    def save_started(self, ret_list):
        """Insert the users who liked a post into ins_liked.

        Expected item keys: short, owner_id, owner_name, full_name,
        profile_url.
        """
        print('save_star')
        conn = self.pool.connection()
        cursor = conn.cursor()
        ret_list = [(i['short'], i['owner_id'], i['owner_name'],
                     i['full_name'], i['profile_url']) for i in ret_list]
        try:
            sql_template = ("insert into ins_liked(short,user_id,username,"
                            "fullname,profile_url)values (%s,%s,%s,%s,%s)")
            cursor.executemany(sql_template, ret_list)
            conn.commit()
        except Exception:
            conn.rollback()  # best effort: drop the whole batch on failure
        finally:
            cursor.close()
            conn.close()

    def save_comments(self, ret_list):
        """Bulk-insert comment rows, retrying row by row on failure."""
        print('save_comment')
        conn = self.pool.connection()
        cursor = conn.cursor()
        ret_list = [(i['_id'], i['short'], i['time'],
                     pymysql.escape_string(i['comment']), i['owner'],
                     i['owner_name'], i['liked']) for i in ret_list]
        try:
            sql_template = ("insert into ins_comment(id,short,time,comment,"
                            "user_id,username,like_num)"
                            "values (%s,%s,%s,%s,%s,%s,%s)")
            cursor.executemany(sql_template, ret_list)
            conn.commit()
        except Exception:
            # the batch went through `conn`, so roll back on `conn` (the
            # original rolled back self.connect, the wrong connection)
            conn.rollback()
            for ret in ret_list:
                try:
                    cursor.execute(sql_template, ret)
                except Exception:
                    print('insert error')
            conn.commit()
        finally:
            cursor.close()
            conn.close()
POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession= [], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='******', password='******', database='youku30', charset='utf8', autocommit='True') # 传入需要连接的数据库的名称dbname和待执行的sql语句sql def __init__(self, configUtil, host, port, username, password, db): self.config = configUtil self.host = host self.port = port self.db = db self.username = username self.password = password self.logger = Logger("db", configUtil).getlog() self.pool = PooledDB( creator=pymysql, # 指定数据库连接驱动 mincached=1, # 连接池中空闲的最多连接数,0和None表示没有限制 maxcached=20, # 连接池允许的最大连接数,0和None表示没有限制 host=self.host, port=int(self.port), user=self.username, passwd=self.password, db=self.db, use_unicode=True, charset="utf8") def get_connection(self): mysql_con = self.pool.connection() return mysql_con except Exception as e: self.logger.error(e) for i in range(3): time.sleep(5) mysql_con = self.pool.connection() return mysql_con except Exception as e: self.logger.error(e) self.logger.error("数据库连接异常执行" + str(i + 1) + "次连接") sys.exit(1) def release_connection(self, connection): connection.close() except Exception as e: self.logger.error(e) self.logger.error("mysql connection 关闭异常") sys.exit(1) def query(self, sql): # def query(self, sql): # results = '' # conn = pymysql.connect(host=self.host, # port=self.port, # user=self.username, # passwd=self.password, # db=self.db) # cursor = conn.cursor() # try: # # 执行SQL语句 # 
cursor.execute(sql) # # 获取所有记录列表 # results = cursor.fetchall() # except Exception as data: # print('Error: 执行查询失败,%s' % data) # conn.close() # return results connection = self.get_connection() cursor = connection.cursor() cursor.execute(sql) rows = cursor.fetchall() cursor.close() self.release_connection(connection) return rows except Exception as e: self.logger.error("执行查询:%s 出错" % (e)) sys.exit(1) def insert_dict_into_table(self, table_name, data_dict): cols = ','.join(data_dict.keys()) qmarks = ','.join(['%s'] * len(data_dict)) insert_sql = 'insert into %s (%s) values(%s)' % (table_name, cols, qmarks) self.insert(insert_sql, data_dict.values()) def insert(self, sql, values): connection = self.get_connection() cursor = connection.cursor() cursor.execute(sql, values) connection.commit() cursor.close() self.release_connection(connection) except Exception as e: self.logger.error("执行查询:%s 出错:%s" % (sql, e)) connection.rollback() sys.exit(1) finally: self.release_connection(connection) def delete(self, sql): connection = self.get_connection() cursor = connection.cursor() cursor.execute(sql) connection.commit() cursor.close() self.release_connection(connection) except Exception as e: self.logger.error("执行查询:%s 出错:%s" % (sql, e)) connection.rollback() sys.exit(1) finally: self.release_connection(connection) def __init__(self): _MYSQL = my_config.MYSQL_POOL self.POOL = PooledDB(creator=_MYSQL['creator'], host=_MYSQL['host'], port=_MYSQL['port'], user=_MYSQL['user'], password=_MYSQL['password'], database=_MYSQL['database'], charset=_MYSQL['charset'], mincached=_MYSQL['mincached'], maxcached=_MYSQL['maxcached'], maxshared=_MYSQL['maxshared']) def __new__(cls, *args, **kw): 启用单例模式 :param args: :param kw: :return: if not hasattr(cls, '_instance'): cls._instance = object.__new__(cls) return cls._instance def connect(self): :return: conn = self.POOL.connection() cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) return conn, cursor def connect_close(self, conn, cursor): 
:param conn: :param cursor: :return: cursor.close() conn.close() self.POOL = None def fetch_one(self, sql, args): 查询单条数据 :param sql: :param args: :return: conn, cursor = self.connect() cursor.execute(sql, args) result = cursor.fetchone() self.connect_close(conn, cursor) return result def fetch_all(self, sql, args): :param sql: :param args: :return: conn, cursor = self.connect() cursor.execute(sql, args) record_list = cursor.fetchall() self.connect_close(conn, cursor) return record_list def insert(self, sql, args): :param sql: :param args: :return: return self.execute(sql, args) def insert_many(self, sql, args): 批量新增数据 :param sql: :param args: :return: return self.execute_many(sql, args) def update(self, sql, args): :param sql: :param args: :return: return self.execute(sql, args) def delete(self, sql, args): :param sql: :param args: :return: return self.execute(sql, args) def execute(self, sql, args): 执行单条写入操作 :param sql: :param args: :return: conn, cursor = self.connect() row = cursor.execute(sql, args) conn.commit() except pymysql.err.Error: conn.rollback() return False finally: self.connect_close(conn, cursor) return row def execute_many(self, sql, args): 执行批量写入操作 :param sql: :param args: :return: conn, cursor = self.connect() row = cursor.executemany(sql, args) conn.commit() self.connect_close(conn, cursor) return row from datetime import datetime from datetime import timedelta from DBUtils.PooledDB import PooledDB ############### Global varables ####################3 match_head = {} #数据库头部结构字典 match_info = {} MATCH_TABLE = 't_match_nba' TODAY = '' TOMORROW = '' POOL = PooledDB(creator=pymysql, mincached=3, maxcached=3, maxshared=3, maxconnections=3, host='localhost', user='******', passwd='3G_', db='guessing',\ charset='utf8', port=3306, cursorclass=pymysql.cursors.DictCursor) g_conn = POOL.connection() #连接数据库 API_URL_FMT = 'http://matchweb.sports.qq.com/kbs/list?from=NBA_PC&columnId=100000&startTime={}&endTime={}&_=1542078952802' SQL_GET_MATCH_FMT = 'select 
home_club_name, match_id, guest_club_name, match_time from {} where from_unixtime(match_time, "%Y-%m-%d") BETWEEN "{}" and "{}"' SQL_INS_MATCH_FMT = 'insert into {} (home_club_name, home_club_logo, home_score, \ guest_club_name, guest_club_logo, guest_score, \ total_player, total_oc, win_bettor, win_oc, \ lose_bettor, lose_oc, match_time) \ value ({}, {},{},{},{},{},\ 0, 0,0,0, 0,0, {:d});"' ############### error infomation define #################### err_no_data = {"code": 101, "msg": "Unable to fetch data from database."} err_exec_failed = {"code": 102, "msg": "Failed to execute SQL Statement."} err_ivd_match_type = {"code": 105, "msg": "Invalid match type."} err_bet_time_out = {"code": 104, "msg": "Betting timed out."} self.base_store = BaseStore() self.article_table = 'articles' self.tag_table = 'tags' # 存储标签 self.art_cor_table = 'tag_article_correspond' # 存储原文章和伪原创文章对应关系 tag_article_correspond self.last_ignore_artid_table = 'last_and_ignore_artid' # 存储已经参与伪原创的最大article_id和无需伪原创的article_id self.article_pool = PooledDB(pymysql, 1, 5, **config.ARTICLE_DB_CONFIG) self.tag_pool = PooledDB(pymysql, 1, 5, **config.TAG_SORT_ARTICLE_CONFIG) def query_last_id(self): 查询last_id query_last_id_sql = 'select `last_id` from {}'.format(self.last_ignore_artid_table) connection = self.tag_pool.connection() result = self.base_store.query(query_last_id_sql, connection) if result is not None: last_id = result[0] else: last_id = 0 # 向表中插入一条数据 insert_one_data = {'last_id': 0} keys = 'last_id' values = '%s' insert_sql = 'insert into {table}({keys}) values ({values})'.format(table=self.last_ignore_artid_table, keys=keys, values=values) self.base_store.insert(insert_sql, insert_one_data, connection) return last_id def query_article(self): 从last_id开始查询文章 last_id = self.query_last_id() query_sql = "select `id`, `title`, `content` from {} where id > {} and title != ''".format(self.article_table, last_id) connection = self.article_pool.connection() result = self.base_store.query(query_sql, 
connection) return result except: print("query_article error") traceback.print_exc() def query_tag_id(self, tag): 查询tag_id query_tag_id_sql = 'select `id` from {} where tag="{}"'.format(self.tag_table, tag) connection = self.tag_pool.connection() result = self.base_store.query(query_tag_id_sql, connection) return result except: print("query_tag_id error") traceback.print_exc() def insert_tag(self, tag): 存储tag insert_tag_data = {"tag": tag} keys = ','.join(insert_tag_data.keys()) values = ','.join(['%s'] * len(insert_tag_data)) insert_tag_sql = 'insert ignore into {table}({keys}) values ({values})'.format(table=self.tag_table, keys=keys, values=values) connection = self.tag_pool.connection() self.base_store.insert(insert_tag_sql, insert_tag_data, connection) except: print("insert_tag error") traceback.print_exc() def insert_tagid_score(self, article_id, tag, score): 将article_id,tag和score的对应关系存储数据库 tag_id = self.query_tag_id(tag) insert_tag_article_data = { "article_id": article_id, "tag_id": tag_id[0], "score": score keys = ','.join(insert_tag_article_data.keys()) values = ','.join(['%s'] * len(insert_tag_article_data)) insert_tag_articleid_sql = 'insert ignore into {table}({keys}) values ({values})'.format( table=self.art_cor_table, keys=keys, values=values) connection = self.tag_pool.connection() self.base_store.insert(insert_tag_articleid_sql, insert_tag_article_data, connection) except: print("insert_tagid_score error") traceback.print_exc() def update_last_id(self, article_id): 更改last_id update_sql = 'update {} set last_id = {} where id > 0'.format(self.last_ignore_artid_table, article_id) connection = self.tag_pool.connection() self.base_store.update(update_sql, connection) except: print("update_is_used error") traceback.print_exc()
 def __init__(self):
     """Create the shared MySQL connection pool from mysql_config."""
     # The pool keeps at least one idle connection and caches up to
     # twenty; DictCursor makes every query return rows as dictionaries.
     self.pool = PooledDB(
         pymysql,
         mincached=1,
         maxcached=20,
         host=mysql_config.DBHOST,
         port=mysql_config.DBPORT,
         user=mysql_config.DBUSER,
         passwd=mysql_config.DBPWD,
         db=mysql_config.DBNAME,
         use_unicode=True,
         charset=mysql_config.DBCHAR,
         cursorclass=DictCursor,
     )
def getMyIp(self): r = requests.get("http://2017.ip138.com/ic.asp") self.MyIp = re.findall('\d{0,4}\.\d{0,4}\.\d{0,4}\.\d{0,4}', r.text)[0] def excuteSql(self, s): conn = self.pool.connection() cur = conn.cursor() SQL = s r = cur.execute(SQL) conn.commit() r = cur.fetchall() cur.close() conn.close() return r def check(self): global cur while self.Queue.empty() == False: ip = self.Queue.get() #print(ip) host = ip.split(":")[0] proxies = { # "http": "http://60.215.194.73:8888", # "https": "http://10.10.1.10:1080", "http": "http://" + ip r = requests.get('http://2017.ip138.com/ic.asp', timeout=10, proxies=proxies) r.encoding = "gb2312" delay = round(r.elapsed.total_seconds(), 2) except: continue print('ip:' + ip + '不可用') else: if re.findall('\d{0,4}\.\d{0,4}\.\d{0,4}\.\d{0,4}', r.text) == []: continue elif re.findall('\d{0,4}\.\d{0,4}\.\d{0,4}\.\d{0,4}', r.text)[0] == host: print('高匿ip:' + ip + ' delay:' + str(delay) + 's') sql = "INSERT INTO ip VALUE('" + ip + "',TRUE," + str( delay) + ",0,0) " self.excuteSql(sql) except Exception as e: print('ip重复:' + ip + str(e)) continue elif re.findall('\d{0,4}\.\d{0,4}\.\d{0,4}\.\d{0,4}', r.text)[0] == self.MyIp: print('普通ip:' + ip + ' delay:' + str(delay) + 's') sql = "INSERT INTO ip VALUE('" + ip + "',FALSE," + str( delay) + ",0,0) " self.excuteSql(sql) except: #没加这个就卡了 print('ip重复:' + ip) continue def getIP(self): #url1 print('正在获取ip') rIP = requests.get( 'http://www.89ip.cn/apijk/?&tqsl=100000&sxa=&sxb=&tta=&ports=&ktip=&cf=1' rIP.encoding = "gb2312" IPs = re.findall('\d{0,4}\.\d{0,4}\.\d{0,4}\.\d{0,4}:\d{1,5}', rIP.text) for i in range(len(IPs)): self.I = self.I + 1 self.Queue.put(IPs[i]) print('url1获取ip:' + str(self.I)) except: print('url1获取ip失败') #url2 p = {'http': '127.0.0.1:8080'} headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:56.0) Gecko/20100101 Firefox/56.0', 'Host': 'www.66ip.cn' rIP = requests.get( 
'http://www.66ip.cn/nmtq.php?getnum=999900&isp=0&anonymoustype=0&start=&ports=&export=&ipaddress=&area=0&proxytype=2&api=66ip', headers=headers, timeout=10) IPs = re.findall('\d{0,4}\.\d{0,4}\.\d{0,4}\.\d{0,4}:\d{1,5}', rIP.text) count = 0 for i in range(len(IPs)): self.I += 1 count = count + 1 self.Queue.put(IPs[i]) print('url2获取ip:' + str(count)) count = 0 except: print('url2获取ip失败') #url3 url1 = "http://www.xicidaili.com/nn/" header = { 'Connection': 'keep-alive', 'Cache-Control': 'max-age=0', 'Upgrade-Insecure-Requests': '1', 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko)', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Encoding': 'gzip, deflate, sdch', 'Accept-Language': 'zh-CN,zh;q=0.8', for i in range(1, 10): rurl = url1 + str(i) html = requests.get(rurl, headers=header).text soup = BeautifulSoup(html, 'html.parser') tags = soup.select('#ip_list')[0].select('tr') for tag in tags: ip = tag.select('td')[1].string + ":" + tag.select( 'td')[2].string self.I += 1 self.Queue.put(ip) count += 1 #sum += 1 except IndexError: print('url3获取ip:' + str(count)) count = 0 except: print('url3获取ip失败') #url4 url4 = "http://www.kuaidaili.com/free/inha/" header = { 'Host': 'www.kuaidaili.com', 'Cache-Control': 'max-age=0', 'Upgrade-Insecure-Requests': 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', 'Accept-Encoding': 'gzip, deflate, sdch', 'Accept-Language': 'zh-CN,zh;q=0.8', 'Cookie': 'yd_cookie=edf7538f-6e8f-42a6f381178111fa513b62651a51827dc817; _ydclearance=a46bb1ff737a3ffaf93464b4-f7f3-484d-b800-fd9da69f7504-1513701173; _gat=1; channelid=0; sid=1513692518622292; _ga=GA1.2.1184806693.1513693989; _gid=GA1.2.91565562.1513693989' for i in range(1, 6): rurl = url4 + str(i) html = requests.get(rurl, 
headers=header).text iplist = re.findall( '\d{0,4}\.\d{0,4}\.\d{0,4}\.\d{0,4}</td>\n <td data-title="PORT">\d{0,6}', html) for ip in iplist: self.Queue.put( ip.replace( '</td>\n <td data-title="PORT">', ':'))) self.I += 1 count += 1 print('url4获取ip:' + str(count)) except Exception as e: print('url4获取ip失败:' + str(e)) print('.........get ip finished all:' + str(self.I)) def start(self): self.getMyIp() print('MyIp:' + self.MyIp) self.getIP() # 检测ip是否可用 for i in range(int(self.Threds)): t = threading.Thread(target=self.check) threadslist.append(t) t.start() for i in threadslist: i.join()
                        format='%(asctime)s - PID: %(process)d - %(levelname)s - %(pathname)s - lineno:%(lineno)d, %(message)s')
    # Parse vendor-DB connection options from the command line.
    parser = argparse.ArgumentParser(description="Import product info for recommendation engine")
    parser.add_argument('--vendordbuser', default='root')
    parser.add_argument('--vendordbpassword', default='admin')
    parser.add_argument('--vendordbhost', default='localhost')
    parser.add_argument('--vendordbdatabase', default='jinbag')
    parser.add_argument('--vendordbport', default='3306')
    args = parser.parse_args()
    # Build a MySQLdb-backed connection pool from the CLI arguments.
    pool = PooledDB(
        creator=MySQLdb,
        host=args.vendordbhost,
        port=int(args.vendordbport),
        user=args.vendordbuser,
        passwd=args.vendordbpassword,
        db=args.vendordbdatabase,
        charset='utf8'
    )  # FIX: closing parenthesis was missing, a syntax error for everything below
    user_id = get_user_id(pool)
    # Seed a per-user accumulator at zero (filled from the HBase dump below).
    cal_res = {}
    for it in user_id:
        cal_res[it] = 0
    events = ['view', 'plan', 'buy', 'fav']
    # NOTE(review): handle is never closed in the visible chunk -- consider
    # `with open(...)`; also assumes this absolute path exists on the host.
    hbase_data = open("/home/zhenping/zhenping_07_08_2018/part-00000")
    for line in hbase_data:
MYSQL数据库对象,负责产生数据库连接 , 此类中的连接采用连接池实现获取连接对象:conn = Mysql.getConn() 释放连接对象;conn.close()或del conn # 连接池对象 __pool = None def __init__(self): # 数据库构造函数,从连接池中取出连接,并生成操作游标 self._conn = Mysql.__getConn(self) self._cursor = self._conn.cursor() @staticmethod def __getConn(self): @summary: 静态方法,从连接池中取出连接 @return MySQLdb.connection if self.__pool is None: self.__pool = PooledDB(creator=MySQLdb, mincached=1, maxcached=20, host=config.MYSQL_HOST, port=config.MYSQL_PORT, user=config.MYSQL_USER, passwd=config.MYSQL_PWD, db=config.MYSQL_NAME, use_unicode=False, charset=config.MYSQL_CHAR, cursorclass=DictCursor) return self.__pool.connection() def getAll(self, sql, param=None): @summary: 执行查询,并取出所有结果集 @param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来 @param param: 可选参数,条件列表值(元组/列表) @return: result list(字典对象)/boolean 查询到的结果集 if param is None: count = self._cursor.execute(sql) else: count = self._cursor.execute(sql, param) if count > 0: result = self._cursor.fetchall() else: result = False return result def getOne(self, sql, param=None): @summary: 执行查询,并取出第一条 @param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来 @param param: 可选参数,条件列表值(元组/列表) @return: result list/boolean 查询到的结果集 if param is None: count = self._cursor.execute(sql) else: count = self._cursor.execute(sql, param) if count > 0: result = self._cursor.fetchone() else: result = False return result def getMany(self, sql, num, param=None): @summary: 执行查询,并取出num条结果 @param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来 @param num:取得的结果条数 @param param: 可选参数,条件列表值(元组/列表) @return: result list/boolean 查询到的结果集 if param is None: count = self._cursor.execute(sql) else: count = self._cursor.execute(sql, param) if count > 0: result = self._cursor.fetchmany(num) else: result = False return result def insertOne(self, sql, value): @summary: 向数据表插入一条记录 @param sql:要插入的SQL格式 @param value:要插入的记录数据tuple/list @return: insertId 受影响的行数 self._cursor.execute(sql, value) return self.__getInsertId() def insertMany(self, sql, values): @summary: 
向数据表插入多条记录 @param sql:要插入的SQL格式 @param values:要插入的记录数据tuple(tuple)/list[list] @return: count 受影响的行数 count = self._cursor.executemany(sql, values) return count def __getInsertId(self): 获取当前连接最后一次插入操作生成的id,如果没有则为0 self._cursor.execute("SELECT @@IDENTITY AS id") result = self._cursor.fetchall() return result[0]['id'] def __query(self, sql, param=None): if param is None: count = self._cursor.execute(sql) else: count = self._cursor.execute(sql, param) return count def update(self, sql, param=None): @summary: 更新数据表记录 @param sql: SQL格式及条件,使用(%s,%s) @param param: 要更新的 值 tuple/list @return: count 受影响的行数 return self.__query(sql, param) def delete(self, sql, param=None): @summary: 删除数据表记录 @param sql: SQL格式及条件,使用(%s,%s) @param param: 要删除的条件 值 tuple/list @return: count 受影响的行数 return self.__query(sql, param) def begin(self): @summary: 开启事务 self._conn.autocommit(0) def end(self, option='commit'): @summary: 结束事务 if option == 'commit': self._conn.commit() else: self._conn.rollback() def dispose(self, isEnd=1): @summary: 释放连接池资源 if isEnd == 1: self.end('commit') else: self.end('rollback') self._cursor.close() self._conn.close()
class DB_CONN(object):
    """Pooled pymysql helper configured from the config file at $DATA_PATH.

    Query methods return the live cursor; insert/update/batch methods return
    the affected-row count.  Reconstruction note: the original text had its
    `try:` lines and docstring quote delimiters stripped; they are restored
    here, and db_Insert/db_Update/db_Batch -- which referenced a `self.conn`
    that was never assigned (guaranteed AttributeError) -- now check a
    connection out of the pool as the commented-out lines intended.
    """

    def __init__(self, **kwargs):
        """Build the connection pool from the [database] section of the config.

        @param kwargs: optional `database` key naming the config section
                       (defaults to 'DATABASE')
        """
        parse = ParserConf(os.environ['DATA_PATH'])
        database = kwargs.get('database', 'DATABASE')
        ip = parse.get_config_value_by_key(database, "ip")
        port = parse.get_config_value_by_key(database, "port")
        user = parse.get_config_value_by_key(database, "user")
        password = parse.get_config_value_by_key(database, "password")
        charset = parse.get_config_value_by_key(database, "charset")
        try:
            self.__pool = PooledDB(creator=pymysql,
                                   maxusage=None,
                                   maxconnections=10,
                                   mincached=5,
                                   maxcached=10,
                                   maxshared=10,
                                   blocking=True,
                                   host=ip,
                                   port=int(port),
                                   user=user,
                                   passwd=password,
                                   charset=charset)
        except Exception:
            print('数据库连接失败')

    def db_query_count(self, sql):
        """Execute *sql* and return the DictCursor on success, None on error.

        NOTE(review): the finally block closes the cursor before the caller
        sees it; fetching from the returned cursor relies on pymysql's
        buffered rows surviving close() -- confirm before relying on it.
        """
        conn = self.__pool.connection()
        cur = conn.cursor(cursor=pymysql.cursors.DictCursor)
        try:
            cur.execute(sql)
            conn.commit()
            return cur
        except Exception as e:
            print('数据查询失败', e)
        finally:
            cur.close()
            conn.close()

    def db_Query_Json(self, sql):
        """Query returning a dict-row (JSON-style) cursor; caller fetches with fetchall()/fetchone()/fetchmany().

        Retries up to 5 times, sleeping 1s between attempts, until the query
        yields at least one row.
        :param sql: query statement
        :return: DictCursor (possibly empty after the retries), None on error
        """
        conn = self.__pool.connection()
        cur = conn.cursor(cursor=pymysql.cursors.DictCursor)
        try:
            # Renamed from `len` (shadowed the builtin); the original
            # `if len == 0:` guard was always true and is dropped.
            for i in range(5):
                row_count = cur.execute(sql)
                if row_count > 0:
                    conn.commit()
                    return cur
                time.sleep(1)
            return cur
        except Exception as e:
            print('数据查询失败', e)
        finally:
            # NOTE(review): closes the cursor being returned (see
            # db_query_count) -- kept to preserve the original contract.
            cur.close()
            conn.close()

    def db_Query_tuple(self, sql, params=None):
        """Query returning a tuple-row cursor; caller fetches with fetchall()/fetchone()/fetchmany().

        :param sql: query statement (%s placeholders when *params* is given)
        :param params: optional parameter tuple/list
        :return: cursor, or None on error
        """
        conn = self.__pool.connection()
        cur = conn.cursor()
        try:
            if params:
                cur.execute(sql, params)
            else:
                cur.execute(sql)
            return cur
        except Exception as e:
            print('数据库查询失败')
        finally:
            cur.close()
            conn.close()

    # 数据库插入
    def db_Insert(self, sql, params):
        """Insert one record.

        :param sql: INSERT statement with %s placeholders
        :param params: values to insert
        :return: affected-row count, or None on error (after rollback)
        """
        # FIX: original used `self.conn`, which is never assigned anywhere in
        # the class -- every call raised AttributeError.  Check out a pooled
        # connection locally, as the commented-out lines intended.
        conn = self.__pool.connection()
        cur = conn.cursor()
        try:
            data_counts = cur.execute(sql, params)
            conn.commit()
            return data_counts
        except Exception as e:
            conn.rollback()
        finally:
            cur.close()
            conn.close()

    # 数据库更新
    def db_Update(self, sql, params=None):
        """Update records.

        :param sql: UPDATE statement (%s placeholders when *params* is given)
        :param params: optional parameter tuple/list
        :return: affected-row count, or None on error (after rollback)
        """
        # FIX: same undefined-`self.conn` defect as db_Insert.
        conn = self.__pool.connection()
        cur = conn.cursor()
        try:
            if params:
                data_counts = cur.execute(sql, params)
            else:
                data_counts = cur.execute(sql)
            conn.commit()
            return data_counts
        except Exception as e:
            conn.rollback()
        finally:
            cur.close()
            conn.close()

    def db_Batch(self, sql, params):
        """Batch-execute *sql* with a sequence of parameter rows.

        :return: affected-row count on success; on failure, a
                 (False, message) tuple after rollback (original contract).
        """
        # FIX: same undefined-`self.conn` defect as db_Insert.
        conn = self.__pool.connection()
        cur = conn.cursor()
        try:
            data_counts = cur.executemany(sql, params)
            conn.commit()
            return data_counts
        except Exception as e:
            conn.rollback()
            return False, '***执行更新失败,请检查数据!错误信息:%s' % e + "查询语句为:" + sql
        finally:
            cur.close()
            conn.close()
# 单例模式的初始化函数必须先判断 if(not hasattr(self, "pool")): self.pool = PooledDB(creator=pymysql, mincached=1, maxcached=10, maxconnections=100, blocking=True,host= "127.0.0.1", port=3306, user='******', passwd='123456',db='yiibaidb', charset='utf8',) page_end = 1 # 爬虫字段 host,ip,端口,协议,国家,省份,城市,默认不需要更改(建议不要修改) fields = ['host', 'ip', 'port', 'protocol', 'country', 'region', 'city'] # port,protocol,country,region,city,host pool = PooledDB(pymysql, host=host, user=user, passwd=pwd, db=db_name, port=port, charset=charset) connection = pool.connection() cursor = connection.cursor() session = requests.session() # 请求头 headers = { 'Upgrade-Insecure-Requests': 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.110 Safari/537.36'