def test09_EquallyShared(self):
    """Connections handed out by PooledDB are used/shared equally.

    Runs once per DB-API threadsafety level; shared-cache assertions only
    apply when threadsafety > 1 (shareable connections).
    """
    for threadsafety in (1, 2):
        dbapi.threadsafety = threadsafety
        shareable = threadsafety > 1
        # 5 initial idle, 5 max idle, 5 max shared connections.
        pool = PooledDB(dbapi, 5, 5, 5)
        self.assertEqual(len(pool._idle_cache), 5)
        # 15 sequential dedicated connections -> 3 uses per pooled con.
        for i in range(15):
            db = pool.connection(False)
            db.cursor().execute('select test')
            db.close()
        self.assertEqual(len(pool._idle_cache), 5)
        for i in range(5):
            con = pool._idle_cache[i]
            self.assertEqual(con._usage, 3)
            self.assertEqual(con._con.num_queries, 3)
        # 35 concurrently held shareable connections spread over 5 real ones.
        cache = []
        for i in range(35):
            db = pool.connection()
            db.cursor().execute('select test')
            cache.append(db)
            del db
        self.assertEqual(len(pool._idle_cache), 0)
        if shareable:
            self.assertEqual(len(pool._shared_cache), 5)
            for i in range(5):
                con = pool._shared_cache[i]
                self.assertEqual(con.shared, 7)
                con = con.con
                self.assertEqual(con._usage, 10)
                self.assertEqual(con._con.num_queries, 10)
        del cache
        self.assertEqual(len(pool._idle_cache), 5)
        if shareable:
            self.assertEqual(len(pool._shared_cache), 0)
def generate_word_url():
    """Scan the "Report" folder and insert one `wordlinks` row per extra
    result page (pages 2..N) of each weibo report, for both listing types.
    """
    pool = PooledDB(MySQLdb, 1, host="localhost", user="******", passwd="123456", db="abnormal")
    conn = pool.connection()
    cur = conn.cursor()
    objFoleder = "Report"
    index = 1
    count = 0
    for objFile in os.listdir(objFoleder):
        # File names look like "<prefix>_<reportId>_..." -- TODO confirm.
        reportId = objFile.split("_")[1]
        soup = BeautifulSoup(open(os.path.join(objFoleder, objFile)))
        pageTags = soup.find_all(class_="page")
        pageCountA, pageCountB = 1, 1
        # Page count = number of pager links minus the "next" link.
        if len(pageTags) > 0:
            pageCountA = len(pageTags[0].find_all("a")) - 1
        if len(pageTags) > 1:
            pageCountB = len(pageTags[1].find_all("a")) - 1
        for i in xrange(2, pageCountA + 1):
            url = "http://service.account.weibo.com/aj/showblog?type=0&rid=%s&page=%d&_t=0" % (reportId, i)
            # Parameterized insert (was vulnerable %-string formatting).
            cur.execute('insert into wordlinks values (%s, %s, 0)', (index, url))
            index += 1
        for i in xrange(2, pageCountB + 1):
            url = "http://service.account.weibo.com/aj/showblog?type=1&rid=%s&page=%d&_t=0" % (reportId, i)
            cur.execute('insert into wordlinks values (%s, %s, 0)', (index, url))
            index += 1
        print(count)
        count += 1
    conn.commit()
    cur.close()
    conn.close()
class PooledConnectionPolicy(DatabaseConnectionPolicyIface):
    """This connection policy maintains a pool of connections that are doled out
    as needed for each transaction.  NOTE: Appropriate for multi-threaded
    applications.  NOTE: The connections are NOT shared concurrently between
    threads.
    """

    def __init__(self):
        """Construct an instance. The instance's open() method must be
        called to make it ready for acquireConnection() calls.
        """
        self._logger = _getLogger(self.__class__)
        self._logger.debug("Opening")
        self._pool = PooledDB(**_getCommonSteadyDBArgsDict())
        self._logger.info("Created %s", self.__class__.__name__)
        return

    def close(self):
        """ Close the policy instance and its database connection pool. """
        self._logger.info("Closing")
        if self._pool is not None:
            self._pool.close()
            self._pool = None
        else:
            # "alredy" typo in the original log message fixed.
            self._logger.warning(
                "close() called, but connection policy was already closed")
        return

    def acquireConnection(self):
        """ Get a connection from the pool.

        Parameters:
        ----------------------------------------------------------------
        retval: A ConnectionWrapper instance. NOTE: Caller
                is responsible for calling the ConnectionWrapper
                instance's release() method or use it in a context manager
                expression (with ... as:) to release resources.
        """
        self._logger.debug("Acquiring connection")
        # Dedicated (non-shared) connection per transaction.
        dbConn = self._pool.connection(shareable=False)
        connWrap = ConnectionWrapper(
            dbConn=dbConn,
            cursor=dbConn.cursor(),
            releaser=self._releaseConnection,
            logger=self._logger)
        return connWrap

    def _releaseConnection(self, dbConn, cursor):
        """ Release database connection and cursor; passed as a callback to
        ConnectionWrapper.
        """
        self._logger.debug("Releasing connection")
        # Close the cursor
        cursor.close()
        # ... then return db connection back to the pool
        dbConn.close()
        return
def generate_accuser_url():
    """Read parsed report records from "ParseResult" and insert one
    `userlinks` row per accuser listing page of each report.
    """
    pool = PooledDB(MySQLdb, 1, host="localhost", user="******", passwd="123456", db="abnormal")
    conn = pool.connection()
    cur = conn.cursor()
    objFile = open("ParseResult")
    index = 1
    count = 0
    while True:
        line = objFile.readline()
        if not line:
            break
        # HACK: eval() on file content -- only acceptable because ParseResult
        # is generated locally; never run this on untrusted input.
        parDict = eval(replace_struct_time(line))
        reportId = parDict["reportId"]
        # Cap at 20 -- presumably the site's page limit; TODO confirm.
        accuserCount = min(parDict["accuserCount"], 20)
        if accuserCount > 1:
            for i in xrange(accuserCount - 1):
                url = "http://service.account.weibo.com/aj/reportuser?rid=%s&page=%d&_t=0" % (reportId, i)
                # Parameterized insert (was vulnerable %-string formatting).
                cur.execute('insert into userlinks values (%s, %s, 0)', (index, url))
                index += 1
        print(count)
        count += 1
    conn.commit()
    cur.close()
    conn.close()
# NOTE(review): fragment of a PooledDB "setsession" test (DBUtils-style).
# The enclosing "def test..." header and all indentation were lost in the
# mangling, so the statements are kept verbatim below.
for threadsafety in (1, 2):
dbapi.threadsafety = threadsafety
setsession = ('set time zone', 'set datestyle')
pool = PooledDB(dbapi, 0, 0, 0, 1, False, None, setsession)
self.assertEqual(pool._setsession, setsession)
db = pool.connection(False)
self.assertEqual(db._setsession_sql, setsession)
self.assertEqual(db._con._con.session,
['time zone', 'datestyle'])
db.cursor().execute('select test')
db.cursor().execute('set test1')
self.assertEqual(db._usage, 2)
self.assertEqual(db._con._con.num_uses, 4)
self.assertEqual(db._con._con.num_queries, 1)
self.assertEqual(db._con._con.session,
['time zone', 'datestyle', 'test1'])
db.close()
db = pool.connection(False)
self.assertEqual(db._setsession_sql, setsession)
self.assertEqual(db._con._con.session,
['time zone', 'datestyle', 'test1', 'rollback'])
db._con._con.close()
db.cursor().execute('select test')
db.cursor().execute('set test2')
self.assertEqual(db._con._con.session,
['time zone', 'datestyle', 'test2'])
# NOTE(review): fragment of a PooledDB "maxusage" test; the enclosing test
# method header and indentation were lost.  Kept verbatim.
dbapi.threadsafety = threadsafety
for maxusage in (0, 3, 7):
pool = PooledDB(dbapi, 0, 0, 0, 1, False, maxusage)
self.assertEqual(pool._maxusage, maxusage)
self.assertEqual(len(pool._idle_cache), 0)
db = pool.connection(False)
self.assertEqual(db._con._maxusage, maxusage)
self.assertEqual(len(pool._idle_cache), 0)
self.assertEqual(db._con._con.open_cursors, 0)
self.assertEqual(db._usage, 0)
self.assertEqual(db._con._con.num_uses, 0)
self.assertEqual(db._con._con.num_queries, 0)
for i in range(20):
cursor=db.cursor()
self.assertEqual(db._con._con.open_cursors, 1)
cursor.execute('select test%i' % i)
r = cursor.fetchone()
self.assertEqual(r, 'test%i' % i)
cursor.close()
self.assertEqual(db._con._con.open_cursors, 0)
# Usage wraps around when maxusage is set (connection gets reset).
if maxusage:
j = i % maxusage + 1
else:
j = i + 1
self.assertEqual(db._usage, j)
self.assertEqual(db._con._con.num_uses, j)
self.assertEqual(db._con._con.num_queries, j)
db.cursor().callproc('test')
self.assertEqual(db._con._con.open_cursors, 0)
self.assertEqual(db._usage, j + 1)
self.assertEqual(db._con._con.num_uses, j + 1)
self.assertEqual(db._con._con.num_queries, j)
# NOTE(review): fragment of a PooledDB commit/rollback session-log test;
# enclosing method header and indentation lost.  Kept verbatim.
dbapi.threadsafety = threadsafety
pool = PooledDB(dbapi, 0, 1)
self.assertEqual(len(pool._idle_cache), 0)
db = pool.connection(False)
self.assertEqual(len(pool._idle_cache), 0)
self.assertEqual(db._con._con.open_cursors, 0)
cursor = db.cursor()
self.assertEqual(db._con._con.open_cursors, 1)
cursor.execute('set doit1')
db.commit()
cursor.execute('set dont1')
cursor.close()
self.assertEqual(db._con._con.open_cursors, 0)
del db
self.assertEqual(len(pool._idle_cache), 1)
db = pool.connection(False)
self.assertEqual(len(pool._idle_cache), 0)
self.assertEqual(db._con._con.open_cursors, 0)
cursor = db.cursor()
self.assertEqual(db._con._con.open_cursors, 1)
cursor.execute('set doit2')
cursor.close()
self.assertEqual(db._con._con.open_cursors, 0)
db.commit()
session = db._con._con.session
db.close()
# Uncommitted work is rolled back when the connection returns to the pool.
self.assertEqual(session, [
'doit1', 'commit', 'dont1', 'rollback',
'doit2', 'commit', 'rollback'])
# NOTE(review): fragment of a get-connection helper; the enclosing def and
# indentation were lost.  MySQLDB_.pool is tested but never assigned here,
# so as written a new pool is built on every call -- confirm against the
# full class before relying on pool reuse.
if MySQLDB_.pool is None:
pool = PooledDB(MySQLdb, host=host, user=user, passwd=passwd,
charset=charset, port=3306, db=db, mincached=1,
maxcached=20, maxshared=2, maxconnections=2)
return pool.connection()
# NOTE(review): fragment.  "mincache" is almost certainly a typo for
# "mincached", and user=Config.DBPWD (with no passwd argument at all)
# looks like a copy/paste slip -- verify against Config before fixing.
# Also: only the local __pool is bound, so Mysql.__pool stays None and the
# pool is rebuilt on every call.
if Mysql.__pool is None:
__pool = PooledDB(creator=MySQLdb, mincache=1, maxcached=20, host=Config.DBHOST, port=Config.DBPORT, user=Config.DBPWD, db=Config.DBNAME, use_unicode=False, charset=Config.DBCHAR, cursorclass=DictCursor)
return __pool.connection()
# NOTE(review): fragment of a lazy global-pool initializer (enclosing def
# lost).  The PooledDB(...) call below is missing its closing parenthesis
# before the return -- the statement was truncated when the file was mangled.
global SQLPOOL
if SQLPOOL is None:
SQLPOOL = PooledDB(creator=MySQLdb ,mincached=self.__cachemin , maxcached=0 ,maxshared=0,maxconnections=0,blocking=True,maxusage=0,
host=self.__host , port=self.__port , user=self.__user , passwd=self.__passwd,
db=self.__db,use_unicode=False,charset=self.__charset
,cursorclass=DictCursor
return SQLPOOL.connection()
def __getConn():
    """Return a pooled MySQL connection (rows as dicts via DictCursor).

    Fix: the original bound the pool to a local variable, so despite the
    Mysqldb.__pool guard a brand-new pool was built on every call.  The
    pool is now cached on Mysqldb.__pool so the guard actually works.
    """
    if Mysqldb.__pool is None:
        Mysqldb.__pool = PooledDB(MySQLdb, mincached=1, maxcached=20, maxconnections=20,
                                  host=MysqlConfig.host, port=MysqlConfig.port,
                                  user=MysqlConfig.user, passwd=MysqlConfig.password,
                                  db=MysqlConfig.dbname,
                                  # use_unicode/charset were deliberately commented
                                  # out in the original -- left disabled.
                                  cursorclass=DictCursor)
    return Mysqldb.__pool.connection()
# NOTE(review): orphaned keyword arguments -- the opening
# "PooledDB(creator=..., " line of this call was lost in the mangling.
mincached=config.db_mincached,
maxcached=config.db_maxcached,
maxshared=config.db_maxshared,
maxconnections=config.db_maxconnections,
cursorclass=cursors.DictCursor
def execute(self, sql, param):
    """Run a write statement with *param*, commit, and return the
    cursor.execute() result; logs and raises RError(1) on failure.
    """
    db = self._db_pool.connection()
    cursor = db.cursor()
    try:
        result = cursor.execute(sql, param)
    except Exception as e:
        print("MySQL Error Execute [%s] %r" % (sql, param))
        db.close()
        raise RError(1)
    db.commit()
    db.close()
    return result
def query(self, sql, param):
    """Run a read statement with *param* and return cursor.fetchall();
    logs and raises RError(1) on failure.
    """
    db = self._db_pool.connection()
    cursor = db.cursor()
    try:
        result = cursor.execute(sql, param)
    except Exception as e:
        print("MySQL Error [%s] %r" % (sql, param))
        db.close()
        raise RError(1)
    result = cursor.fetchall()
    db.close()
    return result
def begin(self):
    """Take a dedicated connection for an explicit transaction and wrap it.

    NOTE(review): if connection() itself raises, `db` is unbound and the
    db.close() in the handler raises NameError -- preserved as the original
    behaved; confirm intent before changing.
    """
    try:
        db = self._db_pool.connection()
    except Exception as e:
        print("MySQL Error when begin an execute.")
        db.close()
        raise RError(1)
    return RDataBaseConnection(db)
def commit(self, con):
    """Commit the transaction on *con* and pass its result through."""
    outcome = con.commit()
    return outcome
def test16_ThreeThreadsTwoConnections(self):
    """Three threads compete for a pool capped at two connections.

    The try/except TypeError pairs fall back to the old Queue API that
    lacks block/timeout arguments.  (Restored -- the `try:` lines were
    lost when the file was mangled.)
    """
    for threadsafety in (1, 2):
        dbapi.threadsafety = threadsafety
        pool = PooledDB(dbapi, 2, 2, 0, 2, True)
        from Queue import Queue, Empty
        queue = Queue(3)
        def connection():
            try:
                queue.put(pool.connection(), 1, 1)
            except Exception:
                queue.put(pool.connection(), 1)
        from threading import Thread
        for i in range(3):
            Thread(target=connection).start()
        try:
            db1 = queue.get(1, 1)
            db2 = queue.get(1, 1)
        except TypeError:
            db1 = queue.get(1)
            db2 = queue.get(1)
        self.assertNotEqual(db1, db2)
        db1_con = db1._con
        db2_con = db2._con
        self.assertNotEqual(db1_con, db2_con)
        # Third thread must be blocked: both connections are taken.
        try:
            self.assertRaises(Empty, queue.get, 1, 0.1)
        except TypeError:
            self.assertRaises(Empty, queue.get, 0)
        del db1
        try:
            db1 = queue.get(1, 1)
        except TypeError:
            db1 = queue.get(1)
        self.assertNotEqual(db1, db2)
        self.assertNotEqual(db1._con, db2._con)
        self.assertEqual(db1._con, db1_con)
        # Same scenario with maxshared=1.
        pool = PooledDB(dbapi, 2, 2, 1, 2, True)
        db1 = pool.connection(False)
        db2 = pool.connection(False)
        self.assertNotEqual(db1, db2)
        db1_con = db1._con
        db2_con = db2._con
        self.assertNotEqual(db1_con, db2_con)
        Thread(target=connection).start()
        try:
            self.assertRaises(Empty, queue.get, 1, 0.1)
        except TypeError:
            self.assertRaises(Empty, queue.get, 0)
        del db1
        try:
            db1 = queue.get(1, 1)
        except TypeError:
            db1 = queue.get(1)
        self.assertNotEqual(db1, db2)
        self.assertNotEqual(db1._con, db2._con)
        self.assertEqual(db1._con, db1_con)
# NOTE(review): fragment; same lazy-init pattern as elsewhere in this file.
# Mysql.__pool is tested but only the local __pool is assigned, so the
# cached-pool check never becomes true and the pool is rebuilt per call.
if Mysql.__pool is None:
__pool = PooledDB(creator=MySQLdb, mincached=1 , maxcached=20 ,
host="172.16.130.87" , port=3306 , user="******" , passwd="hta@123" ,
db="finance_spiderdata",use_unicode=True,charset="utf8",cursorclass=DictCursor)
return __pool.connection()
class TRtgHandler:
    """Dispatches realtime events (responses, conversations, auth keys,
    user updates) from MySQL rows onto the outgoing message queue.
    """

    def __init__(self, config, queue):
        self.pool = PooledDB(creator=MySQLdb, mincached=10, host=config.MYSQL_SERVER,
                             user=config.MYSQL_USER, passwd=config.MYSQL_PASSWORD,
                             db=config.MYSQL_DATABASE)
        self.queue = queue

    def response(self, r_id):
        """Publish a new response to its conversation channel and to /happening."""
        conn = self.pool.connection()
        cur = conn.cursor()
        cur.execute(
            'select d_id,response.user_id,replyDate,response.content,cat_id,'
            'category.thumb,conversation.title from response inner join '
            '(conversation inner join category using (cat_id)) using(d_id) '
            'where r_id=%s', (r_id,))  # was (r_id): not a tuple; matches (d_id,) below
        res = cur.fetchone()
        user = database.fetchUserNoCache(cur, res[1])
        escaped = util.escape(res[3])
        newContent = util.replaceMentions(cur, escaped)
        shortContent = util.replaceMentions(cur, escaped, True)
        cur.close()
        conn.close()
        payload = {'date': res[2].isoformat(), 'content': newContent, 'short': shortContent,
                   'user': user, 'r_id': r_id, 'd_id': res[0]}
        self.queue.put(event.Message('/conversation/%d' % (res[0]), 'response', payload))
        happening_data = {'user': user, 'date': res[2].isoformat(), 'category_image': res[5],
                          'category_id': res[4], 'd_id': res[0], 'title': res[6],
                          'r_id': r_id, 'content': newContent}
        self.queue.put(event.Message('/happening', 'happening',
                                     {'type': 'response', 'data': happening_data}))

    def conversation(self, d_id):
        """Publish a new conversation to its category channel and to /happening."""
        conn = self.pool.connection()
        cur = conn.cursor()
        cur.execute(
            'select user_id,postDate,content,category.thumb,cat_id,title '
            'from conversation inner join category using (cat_id) '
            'where d_id=%s', (d_id,))
        convo = cur.fetchone()
        user = database.fetchUserNoCache(cur, convo[0])
        newContent = util.escape(convo[2])
        payload = {'id': d_id, 'date': convo[1].isoformat(), 'title': convo[5], 'user': user,
                   'content': newContent, 'short': util.replaceMentions(cur, newContent, True)}
        cur.close()
        conn.close()
        self.queue.put(event.Message('/category/%d' % (convo[4]), 'conversation', payload))
        happening_data = {'user': user, 'date': convo[1].isoformat(), 'category_image': convo[3],
                          'd_id': d_id, 'title': convo[5], 'content': newContent}
        self.queue.put(event.Message('/happening', 'happening',
                                     {'type': 'post', 'data': happening_data}))

    def auth(self, auth):
        """Forward a fresh auth key to the event queue."""
        self.queue.put(event.NewAuthKey(auth.user_id, auth.key))

    def userModified(self, user_id):
        """Re-read a user row and broadcast it on the user's channel."""
        conn = self.pool.connection()
        cur = conn.cursor()
        user = database.fetchUserNoCache(cur, user_id)
        cur.close()
        conn.close()
        self.queue.put(event.Message('/user/%d' % user_id, 'user', user))
# NOTE(review): orphaned keyword arguments (duplicate of the fragment
# earlier in this file) -- the opening "PooledDB(creator=..., " line of
# this call was lost in the mangling.
mincached=config.db_mincached,
maxcached=config.db_maxcached,
maxshared=config.db_maxshared,
maxconnections=config.db_maxconnections,
cursorclass=cursors.DictCursor
def execute(self, sql, param):
    """Audit-log the call into input_params, run *sql*, commit, and return
    the execute() result; logs and raises RError(1) on failure.
    """
    db = self._db_pool.connection()
    cursor = db.cursor()
    try:
        # Audit trail: record every statement together with its parameters.
        cursor.execute("INSERT into input_params VALUE (now(), %s, %s)", (str(sql), str(param)))
        result = cursor.execute(sql, param)
    except Exception as e:
        print("MySQL Error Execute [%s] %r" % (sql, param))
        db.close()
        raise RError(1)
    db.commit()
    db.close()
    return result
def query(self, sql, param):
    """Audit-log the call into input_params, run *sql*, and return
    cursor.fetchall(); logs and raises RError(1) on failure.
    """
    db = self._db_pool.connection()
    cursor = db.cursor()
    try:
        # Audit trail: record every statement together with its parameters.
        cursor.execute("INSERT into input_params VALUE (now(), %s, %s)", (str(sql), str(param)))
        result = cursor.execute(sql, param)
    except Exception as e:
        print("MySQL Error [%s] %r" % (sql, param))
        db.close()
        raise RError(1)
    result = cursor.fetchall()
    db.close()
    return result
def begin(self):
    """Take a dedicated connection for an explicit transaction and wrap it.

    Bug fix: the original returned RDataBaseConnection(id) -- passing the
    `id` builtin -- instead of the freshly acquired connection `db`
    (compare the sibling begin() earlier in this file).
    """
    try:
        db = self._db_pool.connection()
    except Exception as e:
        print("MySQL Error when begin an execute.")
        db.close()
        raise RError(1)
    return RDataBaseConnection(db)
def commit(self, con):
    """Delegate the commit to the wrapped connection, returning its result."""
    result = con.commit()
    return result
# NOTE(review): fragment; lazy pool init driven by config.json.  Only the
# local __pool is bound, so Mysql.__pool stays None and the pool is
# rebuilt on every call -- confirm against the full class.
if Mysql.__pool is None:
dbConfig = json.load(open('config.json', 'r')).get('db') # load the config file
__pool = PooledDB(creator=MySQLdb, mincached=1 , maxcached=20 ,
host=dbConfig['host'] , port=dbConfig['port'] , user=dbConfig['user'], passwd=dbConfig['passwd'] ,
db=dbConfig['db'],charset=dbConfig['charset'],cursorclass=DictCursor)
return __pool.connection()
def reconnect(self):
    """Closes the existing database connection and re-opens it.

    Lazily creates the cx_Oracle connection pool on first use, then always
    draws the working connection from it.
    """
    if getattr(self, "_pool", None) is not None:
        self._db = self._pool.connection()
    else:
        pool = PooledDB(cx_Oracle, user=self.user, password=self.password, dsn=self.dsn,
                        mincached=2, maxcached=20, maxshared=20, maxconnections=20)
        self._pool = pool
        self._db = pool.connection()
def __new__(clz):
    """Return the process-wide singleton DataAccess instance, creating it
    on first use."""
    if not DataAccess.__singleInstance:
        DataAccess.__singleInstance = object.__new__(clz)
    return DataAccess.__singleInstance
def __init__(self):
    # mysql: pool sized by self.pool_size, settings from db_set.
    self.pool = PooledDB(MySQLdb, self.pool_size, db=db_set.db_dbname,
                         user=db_set.db_user, passwd=db_set.db_pwd,
                         host=db_set.db_ip, charset="utf8")
def InsertRow(self, insertStr):
    """Execute *insertStr* on a pooled connection; return lastrowid,
    or 0 on any error.  The connection is always returned to the pool.
    """
    conn = self.pool.connection()
    try:
        cursor = conn.cursor()
        cursor.execute('SET NAMES utf8')
        cursor.execute(insertStr)
        conn.commit()
        return cursor.lastrowid
    except:
        # NOTE(review): sys.exc_traceback is Python-2-only.
        print("InsertRow: Unexpected error:"
              , sys.exc_info(), sys.exc_traceback.tb_lineno)
        return 0
    finally:
        if conn:
            conn.close()
def SelectRow(self, selectStr, where = None):
    """Run *selectStr* (parameterized by *where* when given); return
    fetchall(), or None on error.  The connection is always released.
    """
    conn = self.pool.connection()
    try:
        cursor = conn.cursor()
        cursor.execute('SET NAMES utf8')
        if where is not None:
            cursor.execute(selectStr, where)
        else:
            cursor.execute(selectStr)
        res = cursor.fetchall()
        return res
    except:
        # NOTE(review): sys.exc_traceback is Python-2-only.
        print("SelectRow: Unexpected error:"
              , sys.exc_info(), sys.exc_traceback.tb_lineno)
    finally:
        if conn:
            conn.close()
def debug(self, *print_me):
    """Print the collected arguments when debugging is enabled."""
    if self.debug_level > 0:
        # Single-argument print of the args tuple: identical output as a
        # Py2 print statement, but also valid Python 3.
        print(print_me)
# NOTE(review): flattened main-script fragment of a weibo crawler.  The
# triple-quoted strings are no-op "comments"; the bare `except:` near the
# end has lost its `try:` (it presumably wrapped the monitoring loop body,
# with `break` leaving the while loop).  threadPool is referenced but not
# initialized in this view.  Kept verbatim.
'''登录微博'''
paramDict = read_config()
if not login(paramDict['username'], paramDict['password']):
exit()
'''与数据库建立连接和指针'''
pool = PooledDB(MySQLdb, int(paramDict['threadnum']), host = paramDict['dbhost'], user = paramDict['dbuser'], passwd = paramDict['dbpasswd'], db = paramDict['dbname'])
conn = pool.connection()
cur = conn.cursor()
'''读取未爬取的链接列表放入队列'''
urlQLock = threading.Lock()
tableName = 'users'
sql = 'select id, uid from %s where isCrawled = 0' % tableName
cur.execute(sql)
result = cur.fetchall()
urlQ = Queue(len(result))
for entry in result:
urlQ.put(entry)
'''建立线程'''
for i in xrange(int(paramDict['threadnum'])):
thr = DownloadThread(pool, urlQ, urlQLock)
threadPool.append(thr)
thr.start()
'''检查是否存在结束的线程,若有,则重新建立新的线程'''
while True:
sleep(60)
'''当队列为空时,跳出循环'''
if not urlQ.qsize():
break
if threading.activeCount() < int(paramDict['threadnum']) + 1:
'''检查哪个线程已经结束,将其清除'''
i = 0
for thr in threadPool:
if not thr.isAlive():
thr.clear()
del threadPool[i]
newThr = DownloadThread(pool, urlQ, urlQLock)
threadPool.append(newThr)
newThr.start()
else:
i += 1
except:
print sys.exc_info()[0]
for thr in threadPool:
thr.end()
break
print 'Main thread end!'
# NOTE(review): fragment; lazy pool init from an ini-style config.  Only
# the local __pool is bound, so Mysql.__pool stays None and the pool is
# rebuilt on every call -- confirm against the full class.
if Mysql.__pool is None:
cf = Mysql.__getConf()
__pool = PooledDB(creator=MySQLdb, mincached=1, maxcached=20,
host=cf.get("mysqldb", "host"), port=int(cf.get("mysqldb", "port")),
user=cf.get("mysqldb", "user"), passwd=cf.get("mysqldb", "passwd"),
db=cf.get("mysqldb", "db"), use_unicode=False, charset=cf.get("mysqldb", "charset"),
cursorclass=DictCursor)
return __pool.connection()
def getConn():
if ConnFactortyReport.__pool is None :
__pool = PooledDB(creator=DBConfig.dbapi, mincached=DBConfig.mincached , maxcached=DBConfig.maxcached ,
host=DBConfig.host , port=DBConfig.port , user=DBConfig.username , passwd=DBConfig.password ,
db=DBConfig.database_name, use_unicode=DBConfig.use_unicode, charset=DBConfig.charset)
tryCount = 0;
while(tryCount < 50):
return __pool.connection()
except:
print sys.exc_info()[0], sys.exc_info()[1]
tryCount += 1
raise Error("数据库链接错误!");
# NOTE(review): fragment of PooledDB close/garbage-collection tests; the
# enclosing test method header (which bound threadsafety/self) and all
# indentation were lost.  Kept verbatim.
shareable = threadsafety > 1
pool = PooledDB(dbapi, 10)
self.assertEqual(len(pool._idle_cache), 10)
pool.close()
self.assertEqual(len(pool._idle_cache), 0)
pool = PooledDB(dbapi, 10)
closed = ['no']
def close(what=closed):
what[0] = 'yes'
# Monkey-patch a pooled connection's close to observe GC behavior.
pool._idle_cache[7]._con.close = close
self.assertEqual(closed, ['no'])
del pool
self.assertEqual(closed, ['yes'])
pool = PooledDB(dbapi, 10, 10, 5)
self.assertEqual(len(pool._idle_cache), 10)
if shareable:
self.assertEqual(len(pool._shared_cache), 0)
cache = []
for i in range(5):
cache.append(pool.connection())
self.assertEqual(len(pool._idle_cache), 5)
if shareable:
self.assertEqual(len(pool._shared_cache), 5)
else:
self.assertEqual(len(pool._idle_cache), 5)
pool.close()
self.assertEqual(len(pool._idle_cache), 0)
if shareable:
self.assertEqual(len(pool._shared_cache), 0)
pool = PooledDB(dbapi, 10, 10, 5)
closed = []
def close_idle(what=closed):
what.append('idle')
def close_shared(what=closed):
what.append('shared')
if shareable:
cache = []
for i in range(5):
cache.append(pool.connection())
pool._shared_cache[3].con.close = close_shared
else:
pool._idle_cache[7]._con.close = close_shared
pool._idle_cache[3]._con.close = close_idle
self.assertEqual(closed, [])
del pool
if shareable:
del cache
self.assertEqual(closed, ['idle', 'shared'])
# NOTE(review): fragment.  Only the local __pool is bound, so
# MysqlClient.__pool stays None and the pool is rebuilt on every call;
# the class-level mutex is lazily created alongside it.
if MysqlClient.__pool is None:
__pool = PooledDB(creator=MySQLdb, mincached=1 , maxcached=20 ,
host=Config.DBHOST , port=Config.DBPORT , user=Config.DBUSER , passwd=Config.DBPWD ,
db=Config.DBNAME,use_unicode=True,charset=Config.DBCHAR,cursorclass=DictCursor)
if MysqlClient.__mutex is None:
MysqlClient.__mutex = threading.Lock()
return __pool.connection()
# NOTE(review): fragment; pool settings read from a profile object.  Only
# the local __pool is bound, so DbManager.__pool stays None and the pool
# is rebuilt on every call -- confirm against the full class.
if DbManager.__pool is None:
__pool = PooledDB(creator=MySQLdb,
mincached=pf.getProfileValue('db', 'mincached', 'int'),
maxcached=pf.getProfileValue('db', 'maxcached', 'int'),
cursorclass=DictCursor,
use_unicode=True,
charset=pf.getProfileValue('db', 'charset', 'string'),
**pf.getDbUserInfo())
return __pool.connection()
# NOTE(review): fragment of a PooledDB shared-cache distribution test; the
# enclosing test method header and indentation were lost.  Kept verbatim.
dbapi.threadsafety = threadsafety
shareable = threadsafety > 1
pool = PooledDB(dbapi, 0, 0, 5)
cache = []
for i in range(35):
db = pool.connection()
db.cursor().execute('select test1')
db.cursor().execute('select test2')
db.cursor().callproc('test3')
cache.append(db)
del db
self.assertEqual(len(pool._idle_cache), 0)
if shareable:
self.assertEqual(len(pool._shared_cache), 5)
for i in range(5):
con = pool._shared_cache[i]
self.assertEqual(con.shared, 7)
con = con.con
self.assertEqual(con._usage, 21)
self.assertEqual(con._con.num_queries, 14)
# Release six of the shared proxies and recheck the share counts.
cache[3] = cache[8] = cache[33] = None
cache[12] = cache[17] = cache[34] = None
self.assertEqual(len(pool._shared_cache), 5)
self.assertEqual(pool._shared_cache[0].shared, 7)
self.assertEqual(pool._shared_cache[1].shared, 7)
self.assertEqual(pool._shared_cache[2].shared, 5)
self.assertEqual(pool._shared_cache[3].shared, 4)
self.assertEqual(pool._shared_cache[4].shared, 6)
for db in cache:
if db:
db.cursor().callproc('test4')
for i in range(6):
db = pool.connection()
db.cursor().callproc('test4')
cache.append(db)
del db
for i in range(5):
con = pool._shared_cache[i]
self.assertEqual(con.shared, 7)
con = con.con
self.assertEqual(con._usage, 28)
self.assertEqual(con._con.num_queries, 14)
del cache
if shareable:
self.assertEqual(len(pool._idle_cache), 5)
self.assertEqual(len(pool._shared_cache), 0)
else:
self.assertEqual(len(pool._idle_cache), 35)
class DBUtils():
    """MySQL access layer for the twitter spider (pooled pymysql connections).

    The lost `try:` lines of each method's try/except/else block are
    restored.  Statements originally built with "%s" string formatting are
    parameterized because their values come from scraped (untrusted)
    content; result semantics are unchanged for ordinary ids/names.
    """

    def __init__(self):
        # 50 = hard cap on connections handed out by the pool.
        self.pool = PooledDB(pymysql, 50, host='127.0.0.1', user='******',
                             passwd='lgroot', db='twitter', port=3306, charset="utf8")

    def getTwitterById(self, twitterId):
        """Return True when a tweet row with *twitterId* already exists."""
        db = self.pool.connection()
        cue = db.cursor()
        try:
            cue.execute("SELECT * FROM twitter WHERE twitter_id = %s", (twitterId,))
            results = cue.fetchall()
            if len(results) == 0:
                return False
            else:
                return True
        except Exception as e:
            print('Insert error:', e)
            db.rollback()
        else:
            db.commit()

    # Save one twitter post.
    def saveTwitter(self, item):
        """Insert one scraped tweet (dict *item*) into the twitter table."""
        db = self.pool.connection()
        cue = db.cursor()
        GMT_FORMAT = '%I:%M %p - %d %b %Y'
        try:
            # Parse the scraped timestamp.
            t = time.strptime(item['twitter_time'], GMT_FORMAT)
            # Current time, stored as create_date.
            dt = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            cue.execute("insert into twitter (twitter_id,twitter_author,twitter_content,twitter_time,twitter_reply,twitter_trunsmit,twitter_zan,twitter_img,create_date) values(%s, %s, %s, %s, %s, %s, %s, %s, %s)",
                        (item["twitter_id"], item["twitter_author"], item["twitter_content"], t,
                         item["twitter_reply"], item["twitter_trunsmit"], item["twitter_zan"],
                         item["twitter_img"], dt))
            print("insert success")
        except Exception as e:
            print('Insert error:', e)
            db.rollback()
        else:
            db.commit()

    def getSeendNameAll(self):
        """Return all seed account names."""
        db = self.pool.connection()
        cue = db.cursor()
        try:
            cue.execute("SELECT seed_twitter_name FROM twitter_seed")
            results = cue.fetchall()
            return results
        except Exception as e:
            print('Insert error:', e)
            db.rollback()
        else:
            db.commit()

    # Mark this seed's whole history as crawled.
    def updateSeedTag(self, name):
        db = self.pool.connection()
        cue = db.cursor()
        try:
            cue.execute("UPDATE twitter_seed SET seed_twitter_tag = 1 WHERE seed_twitter_name = %s", (name,))
        except Exception as e:
            print('Insert error:', e)
            db.rollback()
        else:
            db.commit()

    # Bump the seed's crawl counter and remember the crawl position.
    def updateSeedCountLocation(self, name, twitterId):
        db = self.pool.connection()
        cue = db.cursor()
        try:
            cue.execute("UPDATE twitter_seed SET seed_twitter_count = seed_twitter_count + 1, seed_twitter_location = %s WHERE seed_twitter_name = %s", (twitterId, name))
        except Exception as e:
            print('Insert error:', e)
            db.rollback()
        else:
            db.commit()

    # Is the current tweet the last one grabbed previously?
    def isSeedLocation(self, spider_name, next_page_id):
        db = self.pool.connection()
        cue = db.cursor()
        try:
            cue.execute("SELECT * FROM twitter_seed WHERE seed_twitter_name = %s AND seed_twitter_location = %s", (spider_name, next_page_id))
            if len(cue.fetchall()) == 1:
                return True
            else:
                return False
        except Exception as e:
            print('Insert error:', e)
            db.rollback()
        else:
            db.commit()
# NOTE(review): tail of a "__getInsertId"-style helper -- its def line,
# the surrounding try, and indentation were lost.  Kept verbatim.
self._cursor.execute('SELECT @@IDENTITY AS id')
result = self._cursor.fetchall()
return result[0]['id']
# return result
except Exception as e:
print('------------获取影响的记录的id')
print(e)
def insertOne(self, sql):
    """Execute one INSERT and return the new row id; False on error."""
    try:
        self._cursor.execute(sql)
        return self.__getInsertId()
    except Exception as e:
        print('-------------插入一条数据出现问题')
        print(e)
        return False
def insertMany(self, sql, values):
    """executemany-style insert of *values*; returns the affected count,
    or None on error."""
    try:
        count = self._cursor.execute(sql, values)
        return count
    except Exception as e:
        print('--------------插入多条数据出现问题')
        print(e)
def update(self, sql, param=None):
    """UPDATE helper; returns the affected count via __query, False on error."""
    try:
        return self.__query(sql, param)
    except Exception as e:
        print('--------------更新出现问题')
        print(e)
        return False
def __query(self, sql, param=None):
    """Execute *sql* (optionally with *param*) and return the affected
    row count; None on error."""
    try:
        if param is None:
            count = self._cursor.execute(sql)
        else:
            count = self._cursor.execute(sql, param)
        return count
    except Exception as e:
        print('-------------获取影响的id,出现问题')
        print(e)
def delete(self, sql, param=None):
    """DELETE helper; returns __query's affected count, or None on error."""
    try:
        # Renamed from `id` (shadowed the builtin) -- behavior unchanged.
        affected = self.__query(sql, param)
        return affected
    except Exception as e:
        print('--------删除语句,出现问题')
        print(e)
def begin(self):
    """Start a transaction.

    NOTE(review): autocommit(1) turns autocommit ON, which is unusual for
    a begin(); an explicit transaction would normally set autocommit(0).
    Kept as the original behaved -- confirm before changing.
    """
    try:
        self._conn.autocommit(1)
    except Exception as e:
        print('------------------开启事务,出现问题')
        print(e)
def end(self, option='commit'):
    """Finish a transaction: commit when option == 'commit', else rollback."""
    try:
        if option == 'commit':
            self._conn.commit()
        else:
            self._conn.rollback()
    except Exception as e:
        print('-----------------结束事务,出现问题')
        print(e)
def errdispose(self, isEnd=1):
    """Roll back (when isEnd == 1) and release cursor + connection.

    'roollback' typo fixed; behavior is unchanged because end() treats any
    non-'commit' option as rollback.
    """
    try:
        if isEnd == 1:
            self.end('rollback')
        self._cursor.close()
        self._conn.close()
    except Exception as e:
        print('-------------事务回滚,关闭连接出现问题')
        print(e)
def dispose(self, isEnd=1):
    """Commit (isEnd == 1) or roll back, then release cursor + connection.

    'roolback' typo fixed; behavior is unchanged because end() treats any
    non-'commit' option as rollback.
    """
    try:
        if isEnd == 1:
            self.end('commit')
        else:
            self.end('rollback')
        self._cursor.close()
        self._conn.close()
    except Exception as e:
        print('----------------释放资源出现问题')
        print(e)
def isexist(self, sql, param=None):
    """Return execute()'s row count, used as an existence flag; 0 on error."""
    try:
        if param is None:
            result = self._cursor.execute(sql)
        else:
            result = self._cursor.execute(sql, param)
        return result
    except Exception as e:
        print('--------------判断记录是否存在出现问题')
        print(e)
        return 0
# NOTE(review): fragment -- the `except` below has lost its `try:`;
# presumably the PooledDB construction was wrapped in try/except.
pooldb=PooledDB(pymysql,10,host="localhost",user="******",passwd="123123",db="vega")
# pool=pooldb.connection()
except Exception as e:
print(e)
# NOTE(review): flattened __main__ fragment; the trailing "while True:"
# has lost its loop body (truncated when the file was mangled).
if __name__ == '__main__':
# redis connection pool
redis_conn_pool = redis.ConnectionPool(host=setting.redis_host,
port=setting.redis_port,
max_connections=5)
# sql connection pool
sql_conn_pool = PooledDB(creator=pymssql,
mincached=2,
maxcached=5,
maxconnections=6,
blocking=True,
maxshared=3,
host=setting.db_host,
port=setting.db_port,
user=setting.db_user,
password=setting.db_password,
database=setting.db_database)
# create the meter base-data and config processing threads
get_thread = threading.Thread(target=DatabaseOperation.meter_get,
args=(redis_conn_pool, sql_conn_pool, 0.1))
update_thread = threading.Thread(target=DatabaseOperation.meter_update,
args=(redis_conn_pool, sql_conn_pool,
0.1))
get_thread.start()
update_thread.start()
while True:
def __init__(self):
    # mysql: all pool and connection settings come from Config.
    self.Pool = PooledDB(creator=pymysql, mincached=Config.DB_MIN_CACHED,
                         maxcached=Config.DB_MAX_CACHED, maxshared=Config.DB_MAX_SHARED,
                         maxconnections=Config.DB_MAX_CONNECYIONS, blocking=Config.DB_BLOCKING,
                         maxusage=Config.DB_MAX_USAGE, setsession=Config.DB_SET_SESSION,
                         host=Config.DB_TEST_HOST, port=Config.DB_TEST_PORT,
                         user=Config.DB_TEST_USER, passwd=Config.DB_TEST_PASSWORD,
                         db=Config.DB_TEST_DBNAME, use_unicode=False,
                         charset=Config.DB_CHARSET)
def __init__(self, db_type, cursor_type, **kwargs):
    """Build a pool for *db_type* and open one connection plus a cursor
    of *cursor_type*; extra kwargs go straight to PooledDB."""
    self.pool = PooledDB(db_type, **kwargs)
    self.conn = self.pool.connection()
    self.cr = self.conn.cursor(cursor_type)
import mysql.connector
from DBUtils.PooledDB import PooledDB
from dota2api.src.exceptions import APIError
from api_caller import get_match_details, get_latest_match
from db_operations import insert_match, get_latest_match_id, get_earliest_match_id
# NOTE(review): module-level constants for a dota match crawler, plus the
# start of getting_match() whose docstring lost its triple quotes and
# whose body is truncated.  SECURITY: a private API key is hard-coded
# below -- it should live in configuration, not source.
PROCESS_NUMBER = multiprocessing.cpu_count()
DB_CONNECTIONS_NUMBER = PROCESS_NUMBER + 2
DATABASE_POOL = PooledDB(mysql.connector,
DB_CONNECTIONS_NUMBER,
db='dota',
user='******',
autocommit=True)
# My private API key.
API_KEY = '352CBD8EBE2C282BB58AF974C9DE1FD2'
DOTA_API = dota2api.Initialise(api_key=API_KEY)
def getting_match(args):
Call get_match_details() to get a dict() of match data.
:param args: Contains a dota2api instance and a match id.
:return: An array of match data if exists.
raw_match = get_match_details(args[0], args[1])
class Db(object):
    """Thin pymysql helper bound to a single pooled connection."""

    def __init__(self, db_host, db_port, db_name, db_user, db_passwd, charset):
        # Draw one connection from a fresh pool and keep a cursor on it.
        self.conn = PooledDB(
            creator=pymysql,
            maxconnections=10,  # max connections the pool allows; 0/None = unlimited
            mincached=2,        # idle connections created at startup
            maxcached=5,        # max idle connections kept in the pool
            blocking=True,      # wait for a free connection instead of raising
            host=db_host,
            port=int(db_port),
            user=db_user,
            passwd=db_passwd,
            database=db_name,
            charset=charset
        ).connection()
        self.cursor = self.conn.cursor()

    # Insert one row.
    def db_insert(self, tableName, dataDict):
        """Insert *dataDict* into *tableName*; True when exactly one row
        was affected.

        NOTE(review): the statement is built by string concatenation, so
        tableName/dataDict must come from trusted code only (SQL injection
        risk on untrusted input).  Only str and int values are serialized.
        """
        str_field = ""
        str_value = ""
        for filed, value in dataDict.items():
            str_field += "`" + filed + "`,"
            if (type(value) == type("kkk")):
                str_value += "'" + str(value) + "'" + ","
            elif (type(value) == type(123)):
                str_value += str(value) + ","
        sql = "INSERT INTO `" + tableName + "`(" + str_field[:-1] + ")VALUE(" + str_value[:-1] + ")"
        self.cursor.execute(sql)
        self.conn.commit()
        get_rows = self.cursor.rowcount
        if get_rows == 1:
            return True
        else:
            return False

    # Update data (not implemented).
    def db_updata(self):
        pass

    # Fetch data; returns a tuple of rows.
    def db_getdata(self, tableName, field):
        sql = "SELECT " + field + " FROM " + tableName
        print(sql)
        self.cursor.execute(sql)
        data_tuple = self.cursor.fetchall()
        return data_tuple

    # Delete data (not implemented).
    def db_deldata(self):
        pass

    # Query data (not implemented).
    def db_selectdata(self):
        pass

    # Release database resources.
    def __del__(self):
        self.cursor.close()
        self.conn.close()
无,创建该画师记录
2. get_total 查询pixiv表中有多少表该画师id字段信息
3. update_latest_id 符合更新条件,更新pxusers表中该画师id的最新插画id
4. 队列中,check_illust 查询pixiv表中是否有该pid的记录
5. 队列中,update_illust 每次作品网络请求,都会进行该pid数据的更新
6. 队列中,insert_illust 满足插入条件,向pixiv表中插入该pid的记录
收藏作品-数据库流程:
1. 判断更新条件
2. 队列中,check_illust 查询pixiv表中是否有该pid的记录
3. 队列中,update_illust 每次作品网络请求,都会进行该pid数据的更新
4. 队列中,insert_illust 满足插入条件,向pixiv表中插入该pid的记录
def __init__(self, thread_num=16):
    """Open the pymysql connection pool, unless DB support is disabled.

    The lost `try:` around the PooledDB construction (whose closing
    parenthesis was also truncated) is restored.
    """
    self.class_name = self.__class__.__name__
    if DB_ENABLE == False:
        return
    log_str(TEMP_MSG["DB_INST"].format(self.class_name))
    try:
        self.pool = PooledDB(
            creator=pymysql,
            maxconnections=thread_num,  # max connections the pool allows
            mincached=1,                # initial idle connections
            maxcached=1,                # max idle connections kept
            # Block and wait when no connection is free (instead of raising).
            blocking=True,
            host=DB_HOST, user=DB_USER, passwd=DB_PASSWD,
            db=DB_DATABASE, port=DB_PORT, charset=DB_CHARSET)
    except pymysql.err.OperationalError as e:
        log_str(TEMP_MSG["DB_CONNECT_ERROR_INFO"].format(e))
        exit()
def get_conn(self):
    """Take one connection from the pool; return (conn, DictCursor cursor)."""
    # connection() draws a (possibly cached) connection from the pool.
    conn = self.pool.connection()
    cur = conn.cursor(DictCursor)
    return conn, cur
def check_user(self, u):
    """Return the latest_id to compare against for painter *u*.

    If pxusers already holds a row for this painter's uid, return the
    stored latest_id (equal to the API's latest_id means nothing new to
    download).  Otherwise insert a fresh row and fall back to
    u["latest_id"], i.e. treat everything as new.

    :params u: painter data dict (uid, userName, latest_id, path)
    :return: latest_id
    """
    conn, cur = self.get_conn()
    # Count existing rows for this painter.
    sql_1 = "SELECT COUNT(uid) FROM pxusers WHERE uid=%s"
    # Insert a new painter row.
    sql_2 = '''INSERT INTO pxusers(uid,userName,latest_id,path) VALUES(%s,%s,%s,%s)'''
    # Fetch the stored latest_id.
    sql_3 = "SELECT latest_id FROM pxusers WHERE uid=%s"
    uid = u["uid"]
    data = (
        u["uid"], u["userName"], u["latest_id"], u["path"]
    )
    try:
        # Does the DB already know this painter?
        cur.execute(sql_1, uid)
        res = cur.fetchall()
        e = res[0]["COUNT(uid)"]
        if e >= 1:
            # Known painter: hand back the latest_id recorded in the DB.
            cur.execute(sql_3, uid)
            d = cur.fetchall()[0]
            latest_id = d["latest_id"]
            return latest_id
        else:
            cur.execute(sql_2, data)
            conn.commit()
    except Exception as e:
        log_str(e)
        conn.rollback()
        # Default to a full update on failure.
        return u["latest_id"]
    else:
        return u["latest_id"]
    finally:
        cur.close()
        conn.close()
def get_total(self, u):
    """Count stored illustrations for a painter uid.

    :param u: dict containing the painter ``uid``
    :return: number of rows in ``pixiv`` for that uid
    """
    conn, cur = self.get_conn()
    try:
        sql = '''SELECT COUNT(1) FROM pixiv WHERE uid=%s'''
        cur.execute(sql, u["uid"])
        d = cur.fetchall()[0]
        return d["COUNT(1)"]
    finally:
        # Original leaked the pooled connection; always return it.
        cur.close()
        conn.close()
def update_latest_id(self, u):
    """Persist the newest illustration id for a painter.

    :param u: dict with ``latest_id`` and ``uid``
    """
    conn, cur = self.get_conn()
    sql = """UPDATE pxusers SET latest_id=%s WHERE uid=%s"""
    data = (u["latest_id"], u["uid"])
    try:
        cur.execute(sql, data)
        conn.commit()
    except Exception as e:
        log_str("{} | {}".format(e, u))
        conn.rollback()
    finally:
        cur.close()
        conn.close()
def check_illust(self, value, key="pid", table="pixiv", database=None):
    """Return whether a record with ``key == value`` exists in ``table``.

    For the pixiv/bookmark tables the stored download ``path`` is returned
    alongside the hit flag; other tables use a generic COUNT query.

    :param value: record value to look up
    :param key: column name to match on
    :param table: table to query
    :param database: optional database to switch to first
    :return: ``(True, path)`` on a hit, ``(False, "")`` otherwise
    """
    # Nothing to look up: bail out before touching the pool (the original
    # leaked a pooled connection on this path).
    if key == "" or value == "":
        return False, ""
    conn, cur = self.get_conn()
    # Optionally switch databases first.
    if database is not None:
        conn.select_db(database)
    if table in ["pixiv", "bookmark"]:
        # path is the download location ('None' string when absent).
        # GROUP BY path avoids error 1140 in non-strict mode.
        sql = """SELECT COUNT(1),path FROM {} """.format(table) + """WHERE {}=%s GROUP BY path""".format(key)
    else:
        sql = """SELECT COUNT(1) FROM {} """.format(table) + """WHERE {}=%s""".format(key)
    try:
        cur.execute(sql, (value,))
    except Exception as e:
        log_str("{}:check_illust | {}".format(self.class_name, e))
        return False, ""
    else:
        # With GROUP BY the result is empty when no row matches.
        d = cur.fetchall()
        if d != () and d[0]["COUNT(1)"] >= 1:
            return True, d[0].get("path", "")
        return False, ""
    finally:
        cur.close()
        conn.close()
def insert_illust(self, u, table="pixiv"):
    """Insert one illustration record.

    :param u: illustration data dict (18 fields)
    :param table: target table
    :return: True on success, False on failure (rolled back)
    """
    conn, cur = self.get_conn()
    sql = ('INSERT INTO {} '
           '(uid,userName,pid,purl,title,tag,pageCount,illustType,is_r18,'
           'score,illust_level,viewCount,bookmarkCount,likeCount,'
           'commentCount,urls,original,path) '
           'VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)').format(table)
    data = (
        u["uid"], u["userName"], u["pid"], u["purl"], u["title"], u["tag"],
        u["pageCount"], u["illustType"], u["is_r18"], u["score"], u["illust_level"],
        u["viewCount"], u["bookmarkCount"], u["likeCount"], u["commentCount"],
        u["urls"], u["original"], u["path"],
    )
    try:
        cur.execute(sql, data)
        conn.commit()
    except Exception as e:
        log_str("{} | {}".format(e, u))
        conn.rollback()
        return False
    else:
        return True
    finally:
        cur.close()
        conn.close()
def update_illust(self, u, table="pixiv"):
    """Refresh the mutable fields of an illustration record.

    Updates 11 fields: tag, pageCount, illustType, is_r18, score,
    illust_level, viewCount, bookmarkCount, likeCount, commentCount, path.

    :param u: illustration data dict (must include ``pid``)
    :param table: target table
    :return: True on success, False on failure (rolled back)
    """
    conn, cur = self.get_conn()
    sql = ("UPDATE {} SET tag=%s,pageCount=%s,illustType=%s,is_r18=%s,"
           "score=%s,illust_level=%s,viewCount=%s,bookmarkCount=%s,"
           "likeCount=%s,commentCount=%s,path=%s WHERE pid=%s").format(table)
    data = (
        u["tag"], u["pageCount"], u["illustType"], u["is_r18"], u["score"],
        u["illust_level"], u["viewCount"], u["bookmarkCount"], u["likeCount"],
        u["commentCount"], u["path"], u["pid"],
    )
    try:
        cur.execute(sql, data)
        conn.commit()
    except Exception as e:
        log_str(TEMP_MSG["DB_UPDATE_ILLUST_ERROR_INFO"].format(self.class_name, u["pid"], e))
        log_str(u)
        conn.rollback()
        return False
    else:
        return True
    finally:
        cur.close()
        conn.close()
def select_illust(self, pid, table="pixiv"):
    """Fetch illustration rows by pid (backend for the API layer).

    :param pid: illustration id
    :param table: table to query
    :return: non-empty result rows, or None on error / no match
    """
    conn, cur = self.get_conn()
    sql = """SELECT * FROM {} """.format(table) + """WHERE pid=%s"""
    try:
        cur.execute(sql, (pid,))
    except Exception as e:
        log_str(e)
        return None
    else:
        r = cur.fetchall()
        # The API layer indexes [0] itself; empty result maps to None.
        return r if len(r) != 0 else None
    finally:
        cur.close()
        conn.close()
def random_illust(self,
                  extra=None,
                  limit=None,
                  illust_level=None,
                  is_r18=True,
                  table="pixiv"):
    """Return pids matching the given filters (API ``random`` endpoint).

    :param extra: comma-separated tag names; at most two are applied
    :param limit: minimum bookmark count
    :param illust_level: comma-separated rating levels, e.g. ``"SR,SSR"``
    :param is_r18: when False, rows whose tag contains 'R-18' are excluded
    :param table: table to query
    :return: list of matching pid rows (possibly empty)
    """
    conn, cur = self.get_conn()
    try:
        # NOTE(review): filters are interpolated directly into the SQL
        # string — inputs must be trusted/validated by the API layer.
        sql = """SELECT pid FROM {} WHERE 1 = 1 """.format(table)
        tag_clause = """AND tag LIKE "%{}%" """
        if extra:
            # Only the first two requested tags are honoured.
            for tag in extra.split(",")[:2]:
                sql += tag_clause.format(tag)
        if limit:
            sql += """AND bookmarkCount > {} """.format(str(limit))
        if illust_level:
            levels = ",".join("'{}'".format(lv) for lv in illust_level.split(","))
            sql += """AND illust_level in ({}) """.format(levels)
        if not is_r18:
            sql += """AND tag NOT LIKE "%R-18%" """
        print(sql)
        cur.execute(sql)
        pid_list = cur.fetchall()
        return pid_list if len(pid_list) != 0 else []
    finally:
        # Original never returned the pooled connection; do it here.
        cur.close()
        conn.close()
def delete_user_illust(self, key="uid", value=None, table="pixiv"):
    """Delete all/one illustration record(s) matching ``key = value``.

    :param key: column used for matching (default uid)
    :param value: value to match; None is rejected
    :param table: target table (default pixiv)
    :return: True on success, False on error or missing value
    """
    if value is None:
        return False
    conn, cur = self.get_conn()
    sql = """DELETE FROM {} WHERE {} = %s""".format(table, str(key))
    try:
        cur.execute(sql, (value,))
        conn.commit()
    except Exception as e:
        log_str("{} | {}".format(e, (key, value)))
        conn.rollback()
        return False
    else:
        return True
    finally:
        cur.close()
        conn.close()
def pixiv_re_proxy(self, u):
    """Build a pixiv.cat reverse-proxy URL for an illustration.

    Single image / ugoira: ``pixiv.cat/{pid}.{suffix}``
    Multi-page:            ``pixiv.cat/{pid}-{num}.{suffix}``

    :param u: illustration data (pid, original, pageCount)
    :return: reverse-proxy URL
    """
    host = "https://pixiv.cat/"
    pid = u["pid"]
    suffix = u["original"].split(".")[-1]
    if u["pageCount"] > 1:
        num = random.randint(1, u["pageCount"])
        # Pinned to the first page for now (random page disabled).
        num = 1
        return "{}{}-{}.{}".format(host, pid, num, suffix)
    return "{}{}.{}".format(host, pid, suffix)
# DBClient = db_client()
def __init__(self, host, user, passwd, db_name, port, charset):
    """Build the pymysql connection pool for this helper."""
    self.db_pool = PooledDB(
        pymysql,
        host=host,
        user=user,
        passwd=passwd,
        db=db_name,
        port=port,
        charset=charset,
    )
    # Check out one connection so misconfiguration fails immediately.
    conn = self.db_pool.connection()
    cursor = conn.cursor()
    conn.commit()
def select_mysql(self, table_name, where_str):
    """Run ``select * from <table> <where>`` and return all rows.

    :return: fetched rows, or False on error
    """
    try:
        conn = self.db_pool.connection()
        cursor = conn.cursor()
        sql = "select * from {table_name} {where_str}".format(
            table_name=table_name, where_str=where_str)
        cursor.execute(sql)
        datas = cursor.fetchall()
        cursor.close()
        conn.close()
        return datas
    except Exception as e:
        # Fixed log label typo ("msyql" -> "mysql").
        logger.info(" mysql {}".format(e))
        return False
def select_mysql_many(self, table_name, search_str, where_str):
    """Run a column-restricted select under strict SQL mode.

    :param table_name: table to query
    :param search_str: columns to select
    :param where_str: where clause
    :return: fetched rows, or False on error
    """
    try:
        conn = self.db_pool.connection()
        cursor = conn.cursor()
        # Force strict mode for this session before querying.
        cursor.execute(
            "set sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,"
            "ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION';")
        sql = "select {search_str} from {table_name} {where_str}".format(
            search_str=search_str,
            table_name=table_name,
            where_str=where_str)
        cursor.execute(sql)
        datas = cursor.fetchall()
        cursor.close()
        conn.close()
        return datas
    except Exception as e:
        logger.info(" mysql {}".format(e))
        return False
def select_mysql_count(self, table_name, where_str):
    """Return ``count(*)`` for ``<table> <where>``, or False on error."""
    try:
        conn = self.db_pool.connection()
        cursor = conn.cursor()
        sql = "select count(*) from {table_name} {where_str}".format(
            table_name=table_name, where_str=where_str)
        cursor.execute(sql)
        numbers = cursor.fetchone()
        cursor.close()
        conn.close()
        return numbers[0]
    except Exception as e:
        logger.info(" mysql {}".format(e))
        return False
def insert_mysql(self, table_name, datas):
    """Insert dict rows with ``insert ignore``, one by one.

    :param datas: iterable of dicts (column -> value)
    :return: total number of rows actually inserted
    """
    conn = self.db_pool.connection()
    cursor = conn.cursor()
    nub = 0
    for data in datas:
        try:
            keys = ','.join(data.keys())
            values = ','.join(['%s'] * len(data))
            sql = 'insert ignore into {table_name} ({keys}) values({values})'.format(
                table_name=table_name, keys=keys, values=values)
            nub_ = cursor.execute(sql, tuple(data.values()))
            conn.commit()
            nub += nub_
        except Exception as e:
            # One bad row must not abort the batch.
            logger.info(" mysql {}".format(e))
            print(data)
    cursor.close()
    conn.close()
    return nub
def update_mysql(self, sql):
    """Execute a write statement, commit, and return the affected row count.

    :return: affected rows, or False on error
    """
    try:
        conn = self.db_pool.connection()
        cursor = conn.cursor()
        numbers = cursor.execute(sql)
        conn.commit()
        cursor.close()
        conn.close()
        return numbers
    except Exception as e:
        logger.info(" mysql {}".format(e))
        return False
# print(PROXY)
def get_shanghai_next_level_res(url, params):
    # Scrape a Baidu Baike district page for next-level area data.
    # NOTE(review): this function is truncated/garbled in this source — the
    # header dict is never closed, and the csor2 charset statements below
    # appear to belong to a different (lost) scope. Kept byte-for-byte.
    all_area_data = np.zeros(0, dtype=dict)
    # print(url)
    # url = "https://baike.baidu.com/item/%E6%B5%A6%E4%B8%9C%E6%96%B0%E5%8C%BA/5232458?fromtitle=%E4%B8%8A%E6%B5%B7%E5%B8%82%E6%B5%A6%E4%B8%9C%E6%96%B0%E5%8C%BA&fromid=15403686&fr=aladdin"
    header = {
        "User-Agent":
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.122 Safari/537.36"
    # conn.set_character_set('utf8')
    # Force utf8 on the session — presumably csor2 is a cursor created in the
    # missing lines above; TODO confirm.
    csor2.execute('SET NAMES utf8')
    csor2.execute("SET CHARACTER SET utf8")
    csor2.execute("SET character_set_connection=utf8")
def initCap():
    # NOTE(review): truncated — only the first statement of this function
    # survives; everything below down to the cursor creation looks like the
    # remains of a different scope (a pipeline __init__ with redis + mysql
    # setup). Kept byte-for-byte.
    sqCat = dict()
db=2,
decode_responses=True)
# self.red = redis.Redis(host="127.0.0.1", port=6379, db=2, decode_responses=True)
print('连接成功')
print('连接mysql服务器')
# MySQL connection settings for the pool built below.
db_config = {
    "host": '172.18.115.15',
    "port": 3306,
    "user": '******',
    "passwd": '123456',
    "db": 'Im',
    "charset": 'utf8'
# db_config = {"host": '127.0.0.1', "port": 3306, "user": '******', "passwd": '123456', "db": 'sys',
#              "charset": 'utf8'}
self.pool = PooledDB(pymysql, 5, **db_config)
# 5 is the minimum number of connections kept in the pool
self.conn = self.pool.connection()
# afterwards, call connection() whenever a DB connection is needed
self.cur = self.conn.cursor()
def process_item(self, item, spider):
    """Dedupe news by title via Redis, persist the item (and attachments)
    to the month-suffixed MySQL tables, then queue it back onto Redis.

    :param item: scrapy item with news fields and a FileList
    :return: the item (scrapy pipeline contract) when stored
    """
    try:
        # All titles recorded so far under the 'NewsTitle' key.
        keys = self.red.keys('NewsTitle')
        key = ''.join(keys)
        value = self.red.lrange('%s' % key, '0', '-1')
        # NOTE(review): len(value) >= 0 is always true, so the final
        # else-branch below is dead code — kept for fidelity.
        if len(value) >= 0:
            NewsTitless = base64.b64encode(item['NewsTitle'].encode('utf-8'))
            NewsTitles = str(NewsTitless, 'utf-8')
            # Skip items whose title is already recorded in Redis.
            if NewsTitles not in value:
                if item['NewsTitle'] == '' or item[
                        'NewsContent'] == '' or item['NewsContent'] == '':
                    # Incomplete item: nothing to store.
                    # NOTE(review): this branch body was missing in the
                    # mangled source — restored as a no-op.
                    pass
                else:
                    i = datetime.datetime.now()
                    b = "%s0%s" % (i.year, i.month)  # table suffix, e.g. 201905
                    self.red.lpush(
                        'NewsTitle',
                        base64.b64encode(item['NewsTitle'].encode('utf-8')))
                    sql1 = ('insert ignore into tbl_NewsDetails{0}(NewsID, NewsCategory, SourceCategory, NewsType, NewsTitle, NewsRawUrl, SourceName, InsertDate, NewsContent, NewsDate, NewsClickLike, NewsBad, NewsRead, NewsOffline)'
                            'VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)').format(b)
                    sql2 = ('insert into tbl_NewsFileManager{0}(FileID, FileType, FileDirectory, FileDirectoryCompress, FileDate, FileLength, FileUserID, Description, NewsID,image_url)'
                            'VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)').format(b)
                    try:
                        self.cur.execute(
                            sql1,
                            (item['NewsID'], item["NewsCategory"],
                             item["SourceCategory"], item["NewsType"],
                             item["NewsTitle"], item["NewsRawUrl"],
                             item["SourceName"], item["InsertDate"],
                             item["NewsContent"], item['NewsDate'],
                             item['NewsClickLike'], item['NewsBad'],
                             item['NewsRead'], item['NewsOffline']))
                        for dic in item['FileList']:
                            self.cur.execute(
                                sql2,
                                (dic['FileID'], dic["FileType"],
                                 dic["FileDirectory"],
                                 dic["FileDirectoryCompress"],
                                 dic["FileDate"], dic["FileLength"],
                                 dic["FileUserID"], dic["Description"],
                                 dic["NewsID"], dic["image_url"]))
                        self.conn.commit()
                    except Exception as e:
                        print(e)
                        print("执行sql语句失败")
                    items = json.dumps(dict(item))
                    self.red.lpush(b + 'news' + item['NewsCategory'], items)
                return item
            else:
                print('redis数据已存在')
        else:
            print('出错')
    except:
        print('错误操作')
def close_conn(self, spider):
    """Release the DB handles when the spider shuts down."""
    # Connection first, then cursor — same order as the original.
    self.conn.close()
    self.cur.close()
class MysqlTool:
    """MySQL helper for the Instagram crawler: one direct pymysql connection
    for bulk writes plus a DBUtils pool for short-lived reads.
    """

    def __init__(self):
        self.connect = pymysql.connect(host="127.0.0.1",
                                       user="******",
                                       password="",
                                       database="chiccess",
                                       port=3306)
        self.pool = PooledDB(pymysql,
                             host="127.0.0.1",
                             user='******',
                             passwd='',
                             db='chiccess',
                             port=3306)

    def save_user_profile(self, json_data):
        """Insert a user profile row, falling back to UPDATE when the
        insert fails (e.g. the user already exists)."""
        data = json_data['graphql']['user']
        conn = self.pool.connection()
        cursor = conn.cursor()
        _sql = 'insert into ins_user_profile(id,profile_pic_url,username,full_name,follow,followed_by,media_num,video_num)values ("%s","%s","%s","%s","%s","%s","%s","%s")'
        try:
            cursor.execute(
                _sql % (data['id'], data['profile_pic_url'], data['username'],
                        data['full_name'], data['edge_follow']['count'],
                        data['edge_followed_by']['count'],
                        data['edge_owner_to_timeline_media']['count'],
                        data['edge_felix_video_timeline']['count']))
        except Exception:
            # NOTE(review): the username placeholder was masked ("******") in
            # the source; restored to %s so the 5 format args line up.
            _sql = 'update ins_user_profile set follow="%s",followed_by="%s",media_num="%s",video_num="%s" where username="%s"'
            cursor.execute(
                _sql %
                (data['edge_follow']['count'],
                 data['edge_followed_by']['count'],
                 data['edge_owner_to_timeline_media']['count'],
                 data['edge_felix_video_timeline']['count'], data['username']))
        conn.commit()

    def get_ins_cookie(self):
        """Return all stored cookies as a flat list of strings."""
        conn = self.pool.connection()
        cursor = conn.cursor()
        cursor.execute("select cookie from ins_cookies")
        ret = cursor.fetchall()
        cursor.close()
        conn.close()
        ret = [i[0] for i in ret]
        return ret

    def save_pics(self, ret_list):
        """Bulk-insert post rows; on failure roll back and retry row by row."""
        print('save_pics{}'.format(ret_list[0]['username']))
        r = []
        try:
            for i in ret_list:
                r.append(
                    (i['short'], i['time'], pymysql.escape_string(i['text']),
                     i['content'], i['user_id'], i['username'], i['like_num'],
                     i['comment_num'], i['pic_tagged']))
        except Exception:
            # Best effort: keep whatever rows converted cleanly.
            pass
        ret_list = r
        cursor = self.connect.cursor()
        sql_template = "insert into ins_pics(short,time,text,content,user_id,username,like_num,comment_num,pic_tagged)values (%s,%s,%s,%s,%s,%s,%s,%s,%s)"
        try:
            cursor.executemany(sql_template, ret_list)
            self.connect.commit()
        except Exception:
            self.connect.rollback()
            for ret in ret_list:
                try:
                    cursor.execute(sql_template % (ret))
                except Exception as e:
                    print(e.args)
                    print(traceback.format_exc())
            self.connect.commit()

    def save_tagged(self, ret_list):
        """Insert tagged-post rows, retrying row by row on bulk failure."""
        print('save_tagged{}'.format(ret_list[0]['username']))
        ret_list = [(i['short'], i['time'], pymysql.escape_string(i['text']),
                     i['content'], i['_typename'], i['user_id'], i['username'],
                     i['owner_id'], i['owner_name'], i['comment_num'])
                    for i in ret_list]
        cursor = self.connect.cursor()
        sql_template = "insert into ins_tagged(short,time,text,content,typename,user_id,username,owner_id,owner_name,comment_num)values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
        try:
            cursor.executemany(sql_template, ret_list)
            self.connect.commit()
        except Exception:
            self.connect.rollback()
            for ret in ret_list:
                try:
                    cursor.execute(sql_template % (ret))
                except Exception:
                    print('inert error')
            self.connect.commit()

    def get_short(self, type):
        """Return shortcodes present in ins_pics but missing from ins_<type>."""
        conn = self.pool.connection()
        cursor = conn.cursor()
        cursor.execute(
            " select short from ins_pics where short not in(select short from ins_{}); "
            .format(type))
        ret = cursor.fetchall()
        cursor.close()
        conn.close()
        return ret

    def save_started(self, ret_list):
        '''item['owner_id'] = i['node']['id']
        item['owner_name'] = i['node']['username']
        item['full_name'] = i['node']['full_name']
        item['profile_url'] = i['node']['profile_pic_url']
        item['short']'''
        print('save_star')
        conn = self.pool.connection()
        cursor = conn.cursor()
        ret_list = [(i['short'], i['owner_id'], i['owner_name'],
                     i['full_name'], i['profile_url']) for i in ret_list]
        sql_template = "insert into ins_liked(short,user_id,username,fullname,profile_url)values (%s,%s,%s,%s,%s)"
        try:
            cursor.executemany(sql_template, ret_list)
            conn.commit()
        except Exception:
            # Silent best-effort insert (original swallowed errors too).
            pass
        finally:
            cursor.close()
            conn.close()

    def save_comments(self, ret_list):
        """Insert comment rows; on bulk failure retry one row at a time."""
        print('save_comment')
        conn = self.pool.connection()
        cursor = conn.cursor()
        ret_list = [(i['_id'], i['short'], i['time'],
                     pymysql.escape_string(i['comment']), i['owner'],
                     i['owner_name'], i['liked']) for i in ret_list]
        sql_template = "insert into ins_comment(id,short,time,comment,user_id,username,like_num)values (%s,%s,%s,%s,%s,%s,%s)"
        try:
            cursor.executemany(sql_template, ret_list)
            conn.commit()
        except Exception:
            self.connect.rollback()
            for ret in ret_list:
                try:
                    sql_tem = "insert into ins_comment(id,short,time,comment,user_id,username,like_num)values ('%s','%s','%s','%s','%s','%s','%s')"
                    cursor.execute(sql_tem % (ret))
                except Exception:
                    print('inert error')
            conn.commit()
        finally:
            cursor.close()
            conn.close()
POOL = PooledDB(
    creator=pymysql,  # DB driver module used to create raw connections
    maxconnections=6,  # max connections allowed; 0/None = unlimited
    mincached=2,  # idle connections opened at startup; 0 = none
    maxcached=5,  # max idle connections kept in the pool; 0/None = unlimited
    maxshared=3,
    # Max shared connections; 0/None = share all. NOTE: effectively unused —
    # pymysql/MySQLdb report threadsafety 1, so the shared cache stays at 0
    # and every connection is dedicated.
    blocking=True,  # block and wait when the pool is exhausted (False raises)
    maxusage=None,  # times a single connection may be reused; None = unlimited
    setsession=
    [],  # session-setup commands, e.g. ["set datestyle to ...", "set time zone ..."]
    ping=0,
    # When to ping the server: 0/None = never, 1 = whenever requested,
    # 2 = on cursor creation, 4 = on query execution, 7 = always.
    host='127.0.0.1',
    port=3306,
    user='******',
    password='******',
    database='youku30',
    charset='utf8',
    autocommit='True')  # NOTE(review): string 'True' is merely truthy; the bool True was likely intended
# 传入需要连接的数据库的名称dbname和待执行的sql语句sql
def __init__(self, configUtil, host, port, username, password, db):
    """Store connection settings and build the pymysql connection pool."""
    self.config = configUtil
    self.host = host
    self.port = port
    self.db = db
    self.username = username
    self.password = password
    self.logger = Logger("db", configUtil).getlog()
    self.pool = PooledDB(
        creator=pymysql,  # database driver
        mincached=1,  # idle connections opened at startup
        maxcached=20,  # maximum idle connections kept in the pool
        host=self.host,
        port=int(self.port),
        user=self.username,
        passwd=self.password,
        db=self.db,
        use_unicode=True,
        charset="utf8")
def get_connection(self):
    """Return a pooled connection, retrying 3 times (5 s apart) on failure.

    Exits the process when every attempt fails.
    """
    try:
        return self.pool.connection()
    except Exception as e:
        self.logger.error(e)
    for i in range(3):
        time.sleep(5)
        try:
            mysql_con = self.pool.connection()
            return mysql_con
        except Exception as e:
            self.logger.error(e)
            self.logger.error("数据库连接异常执行" + str(i + 1) + "次连接")
    sys.exit(1)
def release_connection(self, connection):
    """Return a connection to the pool; exit the process on close failure."""
    try:
        connection.close()
    except Exception as e:
        self.logger.error(e)
        self.logger.error("mysql connection 关闭异常")
        sys.exit(1)
def query(self, sql):
    """Run a read query and return all rows; exits the process on error.

    :param sql: complete SELECT statement
    :return: tuple of rows
    """
    try:
        connection = self.get_connection()
        cursor = connection.cursor()
        cursor.execute(sql)
        rows = cursor.fetchall()
        cursor.close()
        self.release_connection(connection)
        return rows
    except Exception as e:
        self.logger.error("执行查询:%s 出错" % (e))
        sys.exit(1)
def insert_dict_into_table(self, table_name, data_dict):
    """Insert one row described by ``data_dict`` into ``table_name``."""
    columns = ','.join(data_dict.keys())
    placeholders = ','.join(['%s'] * len(data_dict))
    statement = 'insert into %s (%s) values(%s)' % (table_name, columns,
                                                    placeholders)
    self.insert(statement, data_dict.values())
def insert(self, sql, values):
    """Execute a parameterized write and commit; exits the process on failure."""
    connection = self.get_connection()
    try:
        cursor = connection.cursor()
        cursor.execute(sql, values)
        connection.commit()
        cursor.close()
    except Exception as e:
        self.logger.error("执行查询:%s 出错:%s" % (sql, e))
        connection.rollback()
        sys.exit(1)
    finally:
        # Single release point — the original also released inside the try,
        # returning the connection to the pool twice.
        self.release_connection(connection)
def delete(self, sql):
    """Execute a delete statement and commit; exits the process on failure."""
    connection = self.get_connection()
    try:
        cursor = connection.cursor()
        cursor.execute(sql)
        connection.commit()
        cursor.close()
    except Exception as e:
        self.logger.error("执行查询:%s 出错:%s" % (sql, e))
        connection.rollback()
        sys.exit(1)
    finally:
        # Single release point (original double-released on success).
        self.release_connection(connection)
def __init__(self):
    """Build the shared connection pool from ``my_config.MYSQL_POOL``."""
    _MYSQL = my_config.MYSQL_POOL
    self.POOL = PooledDB(creator=_MYSQL['creator'],
                         host=_MYSQL['host'],
                         port=_MYSQL['port'],
                         user=_MYSQL['user'],
                         password=_MYSQL['password'],
                         database=_MYSQL['database'],
                         charset=_MYSQL['charset'],
                         mincached=_MYSQL['mincached'],
                         maxcached=_MYSQL['maxcached'],
                         maxshared=_MYSQL['maxshared'])
def __new__(cls, *args, **kw):
    """Singleton: always return the one cached instance.

    :return: the shared instance of this class
    """
    if not hasattr(cls, '_instance'):
        cls._instance = object.__new__(cls)
    return cls._instance
def connect(self):
    """Open a pooled connection plus a DictCursor on it.

    :return: ``(connection, cursor)``
    """
    conn = self.POOL.connection()
    cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
    return conn, cursor
def connect_close(self, conn, cursor):
    """Close cursor and connection and drop the pool reference.

    NOTE(review): nulling ``self.POOL`` after every call means the singleton
    cannot hand out further connections — confirm this is intended.
    """
    cursor.close()
    conn.close()
    self.POOL = None
def fetch_one(self, sql, args):
    """Run a query and return the first row.

    :param sql: parameterized SQL
    :param args: placeholder arguments
    :return: first row (dict) or None
    """
    conn, cursor = self.connect()
    cursor.execute(sql, args)
    result = cursor.fetchone()
    self.connect_close(conn, cursor)
    return result
def fetch_all(self, sql, args):
    """Run a query and return every row.

    :param sql: parameterized SQL
    :param args: placeholder arguments
    :return: list of rows
    """
    conn, cursor = self.connect()
    cursor.execute(sql, args)
    record_list = cursor.fetchall()
    self.connect_close(conn, cursor)
    return record_list
def insert(self, sql, args):
    """Insert a single row; delegates to :meth:`execute`.

    :return: affected row count, or False on failure
    """
    return self.execute(sql, args)
def insert_many(self, sql, args):
    """Insert multiple rows; delegates to :meth:`execute_many`.

    :return: affected row count
    """
    return self.execute_many(sql, args)
def update(self, sql, args):
    """Update rows; delegates to :meth:`execute`.

    :return: affected row count, or False on failure
    """
    return self.execute(sql, args)
def delete(self, sql, args):
    """Delete rows; delegates to :meth:`execute`.

    :return: affected row count, or False on failure
    """
    return self.execute(sql, args)
def execute(self, sql, args):
    """Run a single write statement and commit.

    :return: affected row count, or False when the statement failed
    """
    conn, cursor = self.connect()
    try:
        row = cursor.execute(sql, args)
        conn.commit()
    except pymysql.err.Error:
        conn.rollback()
        return False
    finally:
        self.connect_close(conn, cursor)
    return row
def execute_many(self, sql, args):
    """Run a batched write statement and commit.

    :return: affected row count
    """
    conn, cursor = self.connect()
    row = cursor.executemany(sql, args)
    conn.commit()
    self.connect_close(conn, cursor)
    return row
from datetime import datetime
from datetime import timedelta
from DBUtils.PooledDB import PooledDB
############### Global varables ####################3
match_head = {} #数据库头部结构字典
match_info = {}
MATCH_TABLE = 't_match_nba'
TODAY = ''
TOMORROW = ''
POOL = PooledDB(creator=pymysql, mincached=3, maxcached=3, maxshared=3, maxconnections=3, host='localhost', user='******', passwd='3G_', db='guessing',\
charset='utf8', port=3306, cursorclass=pymysql.cursors.DictCursor)
g_conn = POOL.connection() #连接数据库
API_URL_FMT = 'http://matchweb.sports.qq.com/kbs/list?from=NBA_PC&columnId=100000&startTime={}&endTime={}&_=1542078952802'
SQL_GET_MATCH_FMT = 'select home_club_name, match_id, guest_club_name, match_time from {} where from_unixtime(match_time, "%Y-%m-%d") BETWEEN "{}" and "{}"'
SQL_INS_MATCH_FMT = 'insert into {} (home_club_name, home_club_logo, home_score, \
guest_club_name, guest_club_logo, guest_score, \
total_player, total_oc, win_bettor, win_oc, \
lose_bettor, lose_oc, match_time) \
value ({}, {},{},{},{},{},\
0, 0,0,0, 0,0, {:d});"'
############### error infomation define ####################
err_no_data = {"code": 101, "msg": "Unable to fetch data from database."}
err_exec_failed = {"code": 102, "msg": "Failed to execute SQL Statement."}
err_ivd_match_type = {"code": 105, "msg": "Invalid match type."}
err_bet_time_out = {"code": 104, "msg": "Betting timed out."}
# NOTE(review): the `def __init__` line for these assignments is missing from
# this chunk — they are the tail of a store-class constructor.
self.base_store = BaseStore()
self.article_table = 'articles'
self.tag_table = 'tags'  # stores tags
self.art_cor_table = 'tag_article_correspond'  # maps originals to rewritten articles
self.last_ignore_artid_table = 'last_and_ignore_artid'  # highest processed article_id + ids to skip
self.article_pool = PooledDB(pymysql, 1, 5, **config.ARTICLE_DB_CONFIG)
self.tag_pool = PooledDB(pymysql, 1, 5, **config.TAG_SORT_ARTICLE_CONFIG)
def query_last_id(self):
    """Fetch last_id from the bookkeeping table, seeding it with 0 if empty.

    :return: the stored last_id, or 0 when no row existed yet
    """
    query_last_id_sql = 'select `last_id` from {}'.format(self.last_ignore_artid_table)
    connection = self.tag_pool.connection()
    result = self.base_store.query(query_last_id_sql, connection)
    if result is not None:
        last_id = result[0]
    else:
        last_id = 0
        # Seed the table so later updates have a row to modify.
        insert_one_data = {'last_id': 0}
        keys = 'last_id'
        values = '%s'
        insert_sql = 'insert into {table}({keys}) values ({values})'.format(
            table=self.last_ignore_artid_table, keys=keys, values=values)
        self.base_store.insert(insert_sql, insert_one_data, connection)
    return last_id
def query_article(self):
    """Fetch id/title/content of articles newer than last_id.

    :return: query result, or None when the lookup failed
    """
    try:
        last_id = self.query_last_id()
        query_sql = "select `id`, `title`, `content` from {} where id > {} and title != ''".format(self.article_table, last_id)
        connection = self.article_pool.connection()
        result = self.base_store.query(query_sql, connection)
        return result
    except Exception:
        print("query_article error")
        traceback.print_exc()
def query_tag_id(self, tag):
    """Look up the id of *tag* in the tags table.

    :return: query result, or None when the lookup failed
    """
    try:
        query_tag_id_sql = 'select `id` from {} where tag="{}"'.format(self.tag_table, tag)
        connection = self.tag_pool.connection()
        result = self.base_store.query(query_tag_id_sql, connection)
        return result
    except Exception:
        print("query_tag_id error")
        traceback.print_exc()
def insert_tag(self, tag):
    """Store *tag*, ignoring duplicates (``insert ignore``)."""
    try:
        insert_tag_data = {"tag": tag}
        keys = ','.join(insert_tag_data.keys())
        values = ','.join(['%s'] * len(insert_tag_data))
        insert_tag_sql = 'insert ignore into {table}({keys}) values ({values})'.format(
            table=self.tag_table, keys=keys, values=values)
        connection = self.tag_pool.connection()
        self.base_store.insert(insert_tag_sql, insert_tag_data, connection)
    except Exception:
        print("insert_tag error")
        traceback.print_exc()
def insert_tagid_score(self, article_id, tag, score):
    """Persist the (article_id, tag, score) relation into the mapping table."""
    try:
        tag_id = self.query_tag_id(tag)
        insert_tag_article_data = {
            "article_id": article_id,
            "tag_id": tag_id[0],
            "score": score,
        }
        keys = ','.join(insert_tag_article_data.keys())
        values = ','.join(['%s'] * len(insert_tag_article_data))
        insert_tag_articleid_sql = 'insert ignore into {table}({keys}) values ({values})'.format(
            table=self.art_cor_table,
            keys=keys, values=values)
        connection = self.tag_pool.connection()
        self.base_store.insert(insert_tag_articleid_sql, insert_tag_article_data, connection)
    except Exception:
        print("insert_tagid_score error")
        traceback.print_exc()
def update_last_id(self, article_id):
    """Advance the stored last_id to *article_id*."""
    try:
        update_sql = 'update {} set last_id = {} where id > 0'.format(self.last_ignore_artid_table, article_id)
        connection = self.tag_pool.connection()
        self.base_store.update(update_sql, connection)
    except Exception:
        # Fixed wrong label in the original message ("update_is_used error").
        print("update_last_id error")
        traceback.print_exc()
def __init__(self):
    """Build the DictCursor connection pool from mysql_config settings."""
    # self.pool = PooledDB(pymysql, 5, host='localhost', user='******', passwd='111', db='test', port=3306)  # 5 = minimum pooled connections
    self.pool = PooledDB(pymysql, mincached = 1, maxcached = 20, \
        host = mysql_config.DBHOST, port = mysql_config.DBPORT, user = mysql_config.DBUSER, passwd = mysql_config.DBPWD, \
        db =mysql_config.DBNAME, use_unicode = True, charset = mysql_config.DBCHAR, cursorclass = DictCursor)
def getMyIp(self):
    """Look up this host's public IP via ip138 and cache it on ``self.MyIp``."""
    resp = requests.get("http://2017.ip138.com/ic.asp")
    self.MyIp = re.findall('\d{0,4}\.\d{0,4}\.\d{0,4}\.\d{0,4}', resp.text)[0]
def excuteSql(self, s):
    """Run *s* on a pooled connection, commit, and return ``fetchall()``."""
    connection = self.pool.connection()
    cursor = connection.cursor()
    cursor.execute(s)
    connection.commit()
    rows = cursor.fetchall()
    cursor.close()
    connection.close()
    return rows
def check(self):
    """Worker: pull proxy candidates off the queue, verify them against
    ip138, and record anonymous/transparent ones in the ``ip`` table.
    """
    global cur
    while self.Queue.empty() == False:
        ip = self.Queue.get()
        host = ip.split(":")[0]
        proxies = {
            # "http": "http://60.215.194.73:8888",
            # "https": "http://10.10.1.10:1080",
            "http": "http://" + ip
        }
        try:
            r = requests.get('http://2017.ip138.com/ic.asp',
                             timeout=10,
                             proxies=proxies)
            r.encoding = "gb2312"
            delay = round(r.elapsed.total_seconds(), 2)
        except:
            # Fixed unreachable print: the original continued before printing.
            print('ip:' + ip + '不可用')
            continue
        else:
            if re.findall('\d{0,4}\.\d{0,4}\.\d{0,4}\.\d{0,4}',
                          r.text) == []:
                continue
            elif re.findall('\d{0,4}\.\d{0,4}\.\d{0,4}\.\d{0,4}',
                            r.text)[0] == host:
                # Reported IP equals the proxy host: high anonymity.
                print('高匿ip:' + ip + ' delay:' + str(delay) + 's')
                sql = "INSERT INTO ip VALUE('" + ip + "',TRUE," + str(
                    delay) + ",0,0) "
                try:
                    self.excuteSql(sql)
                except Exception as e:
                    print('ip重复:' + ip + str(e))
                    continue
            elif re.findall('\d{0,4}\.\d{0,4}\.\d{0,4}\.\d{0,4}',
                            r.text)[0] == self.MyIp:
                # Our own IP leaked through: transparent proxy.
                print('普通ip:' + ip + ' delay:' + str(delay) + 's')
                sql = "INSERT INTO ip VALUE('" + ip + "',FALSE," + str(
                    delay) + ",0,0) "
                try:
                    self.excuteSql(sql)
                except:
                    # Without this guard the worker used to hang on duplicates.
                    print('ip重复:' + ip)
                    continue
def getIP(self):
    """Harvest proxy candidates from four public sources into ``self.Queue``.

    Each source is best-effort: a failure is logged and the next source tried.
    """
    # source 1: 89ip.cn
    print('正在获取ip')
    try:
        rIP = requests.get(
            'http://www.89ip.cn/apijk/?&tqsl=100000&sxa=&sxb=&tta=&ports=&ktip=&cf=1'
        )
        rIP.encoding = "gb2312"
        IPs = re.findall('\d{0,4}\.\d{0,4}\.\d{0,4}\.\d{0,4}:\d{1,5}',
                         rIP.text)
        for i in range(len(IPs)):
            self.I = self.I + 1
            self.Queue.put(IPs[i])
        print('url1获取ip:' + str(self.I))
    except:
        print('url1获取ip失败')
    # source 2: 66ip.cn
    p = {'http': '127.0.0.1:8080'}
    headers = {
        'User-Agent':
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:56.0) Gecko/20100101 Firefox/56.0',
        'Host': 'www.66ip.cn'
    }
    try:
        rIP = requests.get(
            'http://www.66ip.cn/nmtq.php?getnum=999900&isp=0&anonymoustype=0&start=&ports=&export=&ipaddress=&area=0&proxytype=2&api=66ip',
            headers=headers,
            timeout=10)
        IPs = re.findall('\d{0,4}\.\d{0,4}\.\d{0,4}\.\d{0,4}:\d{1,5}',
                         rIP.text)
        count = 0
        for i in range(len(IPs)):
            self.I += 1
            count = count + 1
            self.Queue.put(IPs[i])
        print('url2获取ip:' + str(count))
        count = 0
    except:
        print('url2获取ip失败')
    # source 3: xicidaili.com (HTML table scrape)
    url1 = "http://www.xicidaili.com/nn/"
    header = {
        'Connection': 'keep-alive',
        'Cache-Control': 'max-age=0',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent':
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko)',
        'Accept':
        'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Encoding': 'gzip, deflate, sdch',
        'Accept-Language': 'zh-CN,zh;q=0.8',
    }
    try:
        count = 0
        for i in range(1, 10):
            rurl = url1 + str(i)
            html = requests.get(rurl, headers=header).text
            soup = BeautifulSoup(html, 'html.parser')
            tags = soup.select('#ip_list')[0].select('tr')
            for tag in tags:
                try:
                    ip = tag.select('td')[1].string + ":" + tag.select(
                        'td')[2].string
                    self.I += 1
                    self.Queue.put(ip)
                    count += 1
                except IndexError:
                    # Header rows have no td cells; skip them.
                    pass
        print('url3获取ip:' + str(count))
        count = 0
    except:
        print('url3获取ip失败')
    # source 4: kuaidaili.com
    url4 = "http://www.kuaidaili.com/free/inha/"
    header = {
        'Host':
        'www.kuaidaili.com',
        'Cache-Control':
        'max-age=0',
        # NOTE(review): the value for Upgrade-Insecure-Requests was lost in
        # the mangled source; '1' is the conventional value.
        'Upgrade-Insecure-Requests': '1',
        'User-Agent':
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36',
        'Accept':
        'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Encoding':
        'gzip, deflate, sdch',
        'Accept-Language':
        'zh-CN,zh;q=0.8',
        'Cookie':
        'yd_cookie=edf7538f-6e8f-42a6f381178111fa513b62651a51827dc817; _ydclearance=a46bb1ff737a3ffaf93464b4-f7f3-484d-b800-fd9da69f7504-1513701173; _gat=1; channelid=0; sid=1513692518622292; _ga=GA1.2.1184806693.1513693989; _gid=GA1.2.91565562.1513693989'
    }
    try:
        count = 0
        for i in range(1, 6):
            rurl = url4 + str(i)
            html = requests.get(rurl, headers=header).text
            iplist = re.findall(
                '\d{0,4}\.\d{0,4}\.\d{0,4}\.\d{0,4}</td>\n                    <td data-title="PORT">\d{0,6}',
                html)
            for ip in iplist:
                self.Queue.put(
                    ip.replace(
                        '</td>\n                    <td data-title="PORT">',
                        ':'))
                self.I += 1
                count += 1
        print('url4获取ip:' + str(count))
    except Exception as e:
        print('url4获取ip失败:' + str(e))
    print('.........get ip finished all:' + str(self.I))
def start(self):
    """Entry point: discover our public IP, harvest proxies, then verify
    them concurrently with ``self.Threds`` worker threads."""
    self.getMyIp()
    print('MyIp:' + self.MyIp)
    self.getIP()
    # Spawn verifier threads; threadslist is presumably a module-level list —
    # TODO confirm it is reset between runs.
    for i in range(int(self.Threds)):
        t = threading.Thread(target=self.check)
        threadslist.append(t)
        t.start()
    # Wait for every verifier to drain the queue.
    for i in threadslist:
        i.join()
# NOTE(review): the opening `logging.basicConfig(` line was lost in the
# mangled source; restored here from the surviving format= argument.
logging.basicConfig(
    format='%(asctime)s - PID: %(process)d - %(levelname)s - %(pathname)s - lineno:%(lineno)d, %(message)s')
parser = argparse.ArgumentParser(description="Import product info for recommendation engine")
parser.add_argument('--vendordbuser', default='root')
parser.add_argument('--vendordbpassword', default='admin')
parser.add_argument('--vendordbhost', default='localhost')
parser.add_argument('--vendordbdatabase', default='jinbag')
parser.add_argument('--vendordbport', default='3306')
args = parser.parse_args()
pool = PooledDB(
    creator=MySQLdb,
    host=args.vendordbhost,
    port=int(args.vendordbport),
    user=args.vendordbuser,
    passwd=args.vendordbpassword,
    db=args.vendordbdatabase,
    charset='utf8'
)
# Seed a zeroed accumulator per user before replaying the HBase event dump.
user_id = get_user_id(pool)
cal_res = {}
for it in user_id:
    cal_res[it] = 0
events = ['view', 'plan', 'buy', 'fav']
hbase_data = open("/home/zhenping/zhenping_07_08_2018/part-00000")
for line in hbase_data:
MYSQL数据库对象,负责产生数据库连接 , 此类中的连接采用连接池实现获取连接对象:conn = Mysql.getConn()
释放连接对象;conn.close()或del conn
# 连接池对象
__pool = None
def __init__(self):
    """Build a DB handle: borrow a pooled connection and open a cursor on it."""
    # grab a connection from the pool, then keep both it and its cursor
    pooled_conn = Mysql.__getConn(self)
    self._conn = pooled_conn
    self._cursor = pooled_conn.cursor()
@staticmethod
def __getConn(self):
    """Fetch a connection from the shared pool, creating the pool on first use.

    @return: a pooled MySQLdb connection (DictCursor rows)
    """
    # NOTE(review): declared @staticmethod yet takes `self` and is invoked as
    # Mysql.__getConn(self) from __init__ — quirky, but preserved for callers.
    if self.__pool is None:
        # fix: the original assigned to self.__pool, which creates an
        # *instance* attribute and leaves the class attribute None — every new
        # Mysql() then built its own pool. Assign to the class so the pool is
        # actually shared.
        Mysql.__pool = PooledDB(creator=MySQLdb,
                                mincached=1,
                                maxcached=20,
                                host=config.MYSQL_HOST,
                                port=config.MYSQL_PORT,
                                user=config.MYSQL_USER,
                                passwd=config.MYSQL_PWD,
                                db=config.MYSQL_NAME,
                                use_unicode=False,
                                charset=config.MYSQL_CHAR,
                                cursorclass=DictCursor)
    return self.__pool.connection()
def getAll(self, sql, param=None):
    """Run a SELECT and return the complete result set.

    (fix: the docstring had lost its triple quotes and was a syntax error)

    @param sql: query SQL; use placeholders and pass values via *param*
    @param param: optional tuple/list of placeholder values
    @return: list of row dicts when rows matched, otherwise False
    """
    if param is None:
        count = self._cursor.execute(sql)
    else:
        count = self._cursor.execute(sql, param)
    if count > 0:
        result = self._cursor.fetchall()
    else:
        result = False
    return result
def getOne(self, sql, param=None):
    """Run a SELECT and return only the first row.

    (fix: the docstring had lost its triple quotes and was a syntax error)

    @param sql: query SQL; use placeholders and pass values via *param*
    @param param: optional tuple/list of placeholder values
    @return: first row when rows matched, otherwise False
    """
    if param is None:
        count = self._cursor.execute(sql)
    else:
        count = self._cursor.execute(sql, param)
    if count > 0:
        result = self._cursor.fetchone()
    else:
        result = False
    return result
def getMany(self, sql, num, param=None):
    """Run a SELECT and return at most *num* rows.

    (fix: the docstring had lost its triple quotes and was a syntax error)

    @param sql: query SQL; use placeholders and pass values via *param*
    @param num: maximum number of rows to fetch
    @param param: optional tuple/list of placeholder values
    @return: list of up to *num* rows when rows matched, otherwise False
    """
    if param is None:
        count = self._cursor.execute(sql)
    else:
        count = self._cursor.execute(sql, param)
    if count > 0:
        result = self._cursor.fetchmany(num)
    else:
        result = False
    return result
def insertOne(self, sql, value):
    """Insert a single record.

    (fix: the docstring had lost its triple quotes and was a syntax error)

    @param sql: INSERT statement with placeholders
    @param value: tuple/list of values for the placeholders
    @return: id generated by this insert (via __getInsertId)
    """
    self._cursor.execute(sql, value)
    return self.__getInsertId()
def insertMany(self, sql, values):
    """Insert several records in one executemany() round-trip.

    (fix: the docstring had lost its triple quotes and was a syntax error)

    @param sql: INSERT statement with placeholders
    @param values: sequence of tuples/lists, one per record
    @return: number of affected rows
    """
    count = self._cursor.executemany(sql, values)
    return count
def __getInsertId(self):
    """Return the id generated by the last insert on this connection (0 if none).

    (fix: the docstring had lost its triple quotes and was a syntax error)
    """
    self._cursor.execute("SELECT @@IDENTITY AS id")
    result = self._cursor.fetchall()
    # DictCursor rows: first row's 'id' column
    return result[0]['id']
def __query(self, sql, param=None):
    """Execute *sql* (optionally parameterised) and return the affected-row count."""
    if param is None:
        return self._cursor.execute(sql)
    return self._cursor.execute(sql, param)
def update(self, sql, param=None):
    """Update records.

    (fix: the docstring had lost its triple quotes and was a syntax error)

    @param sql: UPDATE statement with (%s, %s) style placeholders
    @param param: tuple/list of values to update with
    @return: number of affected rows
    """
    return self.__query(sql, param)
def delete(self, sql, param=None):
    """Delete records.

    (fix: the docstring had lost its triple quotes and was a syntax error)

    @param sql: DELETE statement with (%s, %s) style placeholders
    @param param: tuple/list of condition values
    @return: number of affected rows
    """
    return self.__query(sql, param)
def begin(self):
    """Start a transaction by switching the connection out of autocommit.

    (fix: the docstring had lost its triple quotes and was a syntax error)
    """
    self._conn.autocommit(0)
def end(self, option='commit'):
    """Finish the current transaction.

    (fix: the docstring had lost its triple quotes and was a syntax error)

    @param option: 'commit' commits; any other value rolls back
    """
    if option == 'commit':
        self._conn.commit()
    else:
        self._conn.rollback()
def dispose(self, isEnd=1):
    """Finish the transaction and release the cursor and pooled connection.

    (fix: the docstring had lost its triple quotes and was a syntax error)

    @param isEnd: 1 commits before closing; any other value rolls back
    """
    if isEnd == 1:
        self.end('commit')
    else:
        self.end('rollback')
    self._cursor.close()
    self._conn.close()
class DB_CONN(object):
    """pymysql-backed DB helper: one PooledDB per instance (settings read from
    the config file named by the DATA_PATH environment variable), plus
    query/insert/update/batch convenience methods.

    (fix: every ``try:`` line and all docstring quoting had been lost during a
    bad paste, leaving orphan ``except``/``finally`` clauses — syntax errors.
    The obvious try/except/finally structure is restored around each pooled
    operation. The write methods also referenced a never-assigned ``self.conn``;
    they now acquire a pooled connection locally, matching the commented-out
    intent visible in the original.)
    """

    def __init__(self, **kwargs):
        # Connection settings come from the DATA_PATH config file; the section
        # defaults to 'DATABASE' unless overridden via kwargs['database'].
        parse = ParserConf(os.environ['DATA_PATH'])
        database = kwargs.get('database', 'DATABASE')
        ip = parse.get_config_value_by_key(database, "ip")
        port = parse.get_config_value_by_key(database, "port")
        user = parse.get_config_value_by_key(database, "user")
        password = parse.get_config_value_by_key(database, "password")
        charset = parse.get_config_value_by_key(database, "charset")
        try:
            self.__pool = PooledDB(creator=pymysql,
                                   maxusage=None,
                                   maxconnections=10,
                                   mincached=5,
                                   maxcached=10,
                                   maxshared=10,
                                   blocking=True,
                                   host=ip,
                                   port=int(port),
                                   user=user,
                                   passwd=password,
                                   charset=charset)
        except Exception:
            print('数据库连接失败')

    def db_query_count(self, sql):
        """Execute *sql* with a dict-row cursor, commit, and return the cursor.

        NOTE(review): the cursor is closed by ``finally`` before the caller
        receives it; buffered pymysql cursors still allow fetch*() afterwards,
        but this relies on that implementation detail — confirm callers only
        fetch already-buffered rows.
        """
        try:
            conn = self.__pool.connection()
            cur = conn.cursor(cursor=pymysql.cursors.DictCursor)
            cur.execute(sql)
            conn.commit()
            return cur
        except Exception as e:
            print('数据查询失败', e)
        finally:
            cur.close()
            conn.close()

    def db_Query_Json(self, sql):
        """Run *sql* and return a dict-row ("JSON") cursor; caller fetches via
        fetchall()/fetchone()/fetchmany().

        Retries up to 5 times, sleeping 1 s between attempts, until the query
        yields at least one row (polling for data that may not have landed yet).
        """
        try:
            conn = self.__pool.connection()
            cur = conn.cursor(cursor=pymysql.cursors.DictCursor)
            # poll: re-run the query until it returns rows, at most 5 attempts
            for _ in range(5):
                row_count = cur.execute(sql)
                if row_count > 0:
                    conn.commit()
                    return cur
                time.sleep(1)
            return cur
        except Exception as e:
            print('数据查询失败', e)
        finally:
            cur.close()
            conn.close()

    def db_Query_tuple(self, sql, params=None):
        """Run *sql* (optionally parameterised) and return a tuple-row cursor;
        caller fetches via fetchall()/fetchone()/fetchmany().
        """
        try:
            conn = self.__pool.connection()
            cur = conn.cursor()
            if params:
                cur.execute(sql, params)
            else:
                cur.execute(sql)
            return cur
        except Exception as e:
            print('数据库查询失败')
        finally:
            cur.close()
            conn.close()

    # 数据库插入
    def db_Insert(self, sql, params):
        """Insert one record.

        @param sql: INSERT statement with placeholders
        @param params: values for the placeholders
        @return: number of affected rows (None if the insert failed)
        """
        try:
            # fix: original used a never-assigned self.conn
            conn = self.__pool.connection()
            cur = conn.cursor()
            data_counts = cur.execute(sql, params)
            conn.commit()
            return data_counts
        except Exception as e:
            conn.rollback()
        finally:
            cur.close()
            conn.close()

    # 数据库更新
    def db_Update(self, sql, params=None):
        """Update records (optionally parameterised).

        @return: number of affected rows (None if the update failed)
        """
        try:
            # fix: original used a never-assigned self.conn
            conn = self.__pool.connection()
            cur = conn.cursor()
            if params:
                data_counts = cur.execute(sql, params)
            else:
                data_counts = cur.execute(sql)
            conn.commit()
            return data_counts
        except Exception as e:
            conn.rollback()
        finally:
            cur.close()
            conn.close()

    def db_Batch(self, sql, params):
        """Batch write via executemany().

        @return: number of affected rows, or a (False, message) tuple after a
                 rollback on failure
        """
        try:
            # fix: original used a never-assigned self.conn
            conn = self.__pool.connection()
            cur = conn.cursor()
            data_counts = cur.executemany(sql, params)
            conn.commit()
            return data_counts
        except Exception as e:
            conn.rollback()
            return False, '***执行更新失败,请检查数据!错误信息:%s' % e + "查询语句为:" + sql
        finally:
            cur.close()
            conn.close()
# Singleton-style init: only build the pool on first construction — the
# hasattr guard prevents re-creating it if __init__ runs again on the same
# (reused) instance. (Enclosing __init__ is above this chunk.)
if(not hasattr(self, "pool")):
    self.pool = PooledDB(creator=pymysql, mincached=1, maxcached=10, maxconnections=100,
                         blocking=True, host="127.0.0.1", port=3306, user='******',
                         passwd='123456', db='yiibaidb', charset='utf8',)
page_end = 1
# Crawler fields: host, ip, port, protocol, country, region, city — keep the
# default order (presumably matches the scraped table columns; verify against
# the consuming code below this chunk).
fields = ['host', 'ip', 'port', 'protocol', 'country', 'region', 'city']
# port,protocol,country,region,city,host
# NOTE(review): host/user/pwd/db_name/port/charset are defined earlier in the
# file, outside this chunk — confirm before refactoring.
pool = PooledDB(pymysql,
                host=host,
                user=user,
                passwd=pwd,
                db=db_name,
                port=port,
                charset=charset)
# Shared pooled connection + cursor and an HTTP session reused by the crawler.
connection = pool.connection()
cursor = connection.cursor()
session = requests.session()
# 请求头
headers = {
'Upgrade-Insecure-Requests':
'User-Agent':
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.110 Safari/537.36'