await page.evaluate("""
() =>{
Object.defineProperties(navigator,{
webdriver:{
get: () => false
""")
我们会看到这一步非常关键,因为puppeteer
出于政策考虑(这个词用的不是很好,就是那个意思)会设置window.navigator.webdriver
为true
,告诉网站我是一个 webdriver 驱动的浏览器。有些网站比较聪明(反爬措施做得比较好),就会通过这个来判断对方是不是爬虫程序。详情情参考这篇文章:一日一技:如何正确移除Selenium中window.navigator.webdriver的值
这等价于在 devtools 里面输入那一段 js 代码。
还可以加载一个 js 文件:
await page.addScriptTag(path=path_to_your_js_file)
通过注入 js 脚本能完成很多很多有用的操作,比如自动下拉页面等。
截获 request 和 response
await page.setRequestInterception(True)
page.on('request', intercept_request)
page.on('response', intercept_response)
intercept_request
和intercept_response
相当于是注册的两个回调函数,在浏览器发出请求和获取到请求之前指向这两个函数。
比如可以这样禁止获取图片、多媒体资源和发起 websocket 请求:
async def intercept_request(req):
"""请求过滤"""
if req.resourceType in ['image', 'media', 'eventsource', 'websocket']:
await req.abort()
else:
await req.continue_()
然后每次获取到请求之后将内容打印出来(这里只打印了fetch
和xhr
类型response 的内容):
async def intercept_response(res):
resourceType = res.request.resourceType
if resourceType in ['xhr', 'fetch']:
resp = await res.text()
print(resp)
一共有哪些resourceType,pyppeteer文档里面有:
拼多多搜索爬虫
页面自动下拉
拼多多的搜索界面是一个无限下拉的页面,我们希望能够实现无限下拉页面,并且能够控制程序提前退出,不然一直下拉也不好,我们可能并不需要那么多数据。
js 脚本
async () => {
await new Promise((resolve, reject) => {
// 允许下滑的最大高度,防止那种可以无限下拉的页面无法结束
const maxScrollHeight = null;
// 控制下拉次数
const maxScrollTimes = null;
let currentScrollTimes = 0;
// 记录上一次scrollHeight,便于判断此次下拉操作有没有成功,从而提前结束下拉
let scrollHeight = 0;
// maxTries : 有时候无法下拉可能是网速的原因
let maxTries = 5;
let tried = 0;
const timer = setInterval(() => {
// 下拉失败,提前退出
// BUG : 如果网速慢的话,这一步会成立~
// 所以设置一个 maxTried 变量
if (document.body.scrollHeight === scrollHeight) {
tried += 1;
if (tried >= maxTries) {
console.log("reached the end, now finished!");
clearInterval(timer);
resolve();
scrollHeight = document.body.scrollHeight;
window.scrollTo(0, scrollHeight);
window.scrollBy(0, -10);
// 判断是否设置了maxScrollTimes
if (maxScrollTimes) {
if (currentScrollTimes >= maxScrollTimes) {
clearInterval(timer);
resolve();
// 判断是否设置了maxScrollHeight
if (maxScrollHeight) {
if (scrollHeight >= maxScrollHeight) {
if (currentScrollTimes >= maxScrollTimes) {
clearInterval(timer);
resolve();
currentScrollTimes += 1;
// 还原 tried
tried = 0;
}, 1000);
这里面有几个重要的参数:
interval : 下拉间隔时间,以毫秒为单位
maxScrollHeight : 运行页面下拉最大高度
maxScrollTimes : 最多下拉多少次(推荐使用,可以更好控制爬取多少数据)
maxTries : 下拉不成功时最多重试几次,比如有时候会因为网络原因导致没能在 interval ms 内成功下拉
把这些替换成你需要的。同时你可以打开 chrome 的开发者工具运行一下这段 js 脚本。
这段代码一共也就只有70多行,比较简陋,情根据自己的实际需求更改。
import os
import time
import json
from urllib.parse import urlsplit
import asyncio
import pyppeteer
from scripts import scripts
BASE_DIR = os.path.dirname(__file__)
async def intercept_request(req):
"""请求过滤"""
if req.resourceType in ['image', 'media', 'eventsource', 'websocket']:
await req.abort()
else:
await req.continue_()
async def intercept_response(res):
resourceType = res.request.resourceType
if resourceType in ['xhr', 'fetch']:
resp = await res.text()
url = res.url
tokens = urlsplit(url)
folder = BASE_DIR + '/' + 'data/' + tokens.netloc + tokens.path + "/"
if not os.path.exists(folder):
os.makedirs(folder, exist_ok=True)
filename = os.path.join(folder, str(int(time.time())) + '.json')
with open(filename, 'w', encoding='utf-8') as f:
f.write(resp)
async def main():
browser = await pyppeteer.launch({
'executablePath': '/Users/changjiang/apps/Chromium.app/Contents/MacOS/Chromium',
'args': [
'--disable-extensions',
'--hide-scrollbars',
'--disable-bundled-ppapi-flash',
'--mute-audio',
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-gpu',
'dumpio': True,
page = await browser.newPage()
await page.setRequestInterception(True)
page.on('request', intercept_request)
page.on('response', intercept_response)
await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
'(KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299')
await page.setViewport({'width': 1080, 'height': 960})
await page.goto('http://yangkeduo.com')
await page.evaluate("""
() =>{
Object.defineProperties(navigator,{
webdriver:{
get: () => false
""")
await page.evaluate("你的那一段页面自动下拉 js 脚本")
await browser.close()
if __name__ == '__main__':
asyncio.run(main())
如果你像我一样真正热爱计算机科学,喜欢研究底层逻辑,欢迎关注我的微信公众号:
liaochangjiang
搬砖工 @ Authing
134.9k
粉丝