系列文章总目录: 【2022 年】Python3 爬虫学习教程 ,本教程内容多数来自于《Python3网络爬虫开发实战(第二版)》一书,目前截止 2022 年,可以将爬虫基本技术进行系统讲解,同时将最新前沿爬虫技术如异步、JavaScript 逆向、AST、安卓逆向、Hook、智能解析、群控技术、WebAssembly、大规模分布式、Docker、Kubernetes 等,市面上目前就仅有 《Python3 网络爬虫开发实战(第二版)》 一书了, 点击了解详情 。
在上一节中,我们介绍了异步爬虫的基本原理和 asyncio 的基本用法,并且在最后简单提及了使用 aiohttp 来实现网页爬取的过程。在本节中,我们来介绍一下 aiohttp 的常见用法。
前面介绍的 asyncio 模块内部实现了对 TCP、UDP、SSL 协议的异步操作,但是对于 HTTP 请求来说,我们就需要用到 aiohttp 来实现了。
aiohttp 是一个基于 asyncio 的异步 HTTP 网络模块,它既提供了服务端,又提供了客户端。其中我们用服务端可以搭建一个支持异步处理的服务器,就是用来处理请求并返回响应的,类似于 Django、Flask、Tornado 等一些 Web 服务器。而客户端可以用来发起请求,类似于使用 requests 发起一个 HTTP 请求然后获得响应,但 requests 发起的是同步的网络请求,aiohttp 则是异步的。
本节中,我们主要了解一下 aiohttp 客户端部分的用法。
首先,我们来看一个基本的 aiohttp 请求案例,代码如下:
import aiohttpimport asyncioasync def fetch(session, url): async with session.get(url) as response: return await response.text(), response.statusasync def main(): async with aiohttp.ClientSession() as session: html, status = await fetch(session, 'https://cuiqingcai.com') print(f'html: {html[:100]}...') print(f'status: {status}')if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main())
这里网页源码过长,只截取输出了一部分。可以看到,这里我们成功获取了网页的源代码及响应状态码 200,也就完成了一次基本的 HTTP 请求,即我们成功使用 aiohttp 通过异步的方式来进行了网页爬取。当然,这个操作用之前讲的 requests 也可以做到。
可以看到,其请求方法的定义和之前有了明显的区别,主要有如下几点:
async
with as
coroutine
await
response
text
main
fetch
run_until_complete
注意:在 Python 3.7 及以后的版本中,我们可以使用 asyncio.run(main()) 来代替最后的启动操作,不需要显示声明事件循环, run 方法内部会自动启动一个事件循环。但这里为了兼容更多的 Python 版本,依然还是显式声明了事件循环。
asyncio.run(main())
run
对于 URL 参数的设置,我们可以借助于 params 参数,传入一个字典即可,示例如下:
params
import aiohttpimport asyncioasync def main(): params = {'name': 'germey', 'age': 25} async with aiohttp.ClientSession() as session: async with session.get('https://httpbin.org/get', params=params) as response: print(await response.text())if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main())
{ "args": { "age": "25", "name": "germey" }, "headers": { "Accept": "*/*", "Accept-Encoding": "gzip, deflate", "Host": "httpbin.org", "User-Agent": "Python/3.7 aiohttp/3.6.2", "X-Amzn-Trace-Id": "Root=1-5e85eed2-d240ac90f4dddf40b4723ef0" }, "origin": "17.20.255.122", "url": "https://httpbin.org/get?name=germey&age=25"}
这里可以看到,其实际请求的 URL 为 https://httpbin.org/get?name=germey&age=25,其 URL 请求参数就对应了 params 的内容。
另外,aiohttp 还支持其他请求类型,如 POST、PUT、DELETE 等,这和 requests 的使用方式有点类似,示例如下:
session.post('http://httpbin.org/post', data=b'data')session.put('http://httpbin.org/put', data=b'data')session.delete('http://httpbin.org/delete')session.head('http://httpbin.org/get')session.options('http://httpbin.org/get')session.patch('http://httpbin.org/patch', data=b'data')
import aiohttpimport asyncioasync def main(): data = {'name': 'germey', 'age': 25} async with aiohttp.ClientSession() as session: async with session.post('https://httpbin.org/post', data=data) as response: print(await response.text())if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main())
{ "args": {}, "data": "", "files": {}, "form": { "age": "25", "name": "germey" }, "headers": { "Accept": "*/*", "Accept-Encoding": "gzip, deflate", "Content-Length": "18", "Content-Type": "application/x-www-form-urlencoded", "Host": "httpbin.org", "User-Agent": "Python/3.7 aiohttp/3.6.2", "X-Amzn-Trace-Id": "Root=1-5e85f0b2-9017ea603a68dc285e0552d0" }, "json": null, "origin": "17.20.255.58", "url": "https://httpbin.org/post"}
{ "args": {}, "data": "{\"name\": \"germey\", \"age\": 25}", "files": {}, "form": {}, "headers": { "Accept": "*/*", "Accept-Encoding": "gzip, deflate", "Content-Length": "29", "Content-Type": "application/json", "Host": "httpbin.org", "User-Agent": "Python/3.7 aiohttp/3.6.2", "X-Amzn-Trace-Id": "Root=1-5e85f03e-c91c9a20c79b9780dbed7540" }, "json": { "age": 25, "name": "germey" }, "origin": "17.20.255.58", "url": "https://httpbin.org/post"}
import aiohttpimport asyncioasync def main(): data = {'name': 'germey', 'age': 25} async with aiohttp.ClientSession() as session: async with session.post('https://httpbin.org/post', data=data) as response: print('status:', response.status) print('headers:', response.headers) print('body:', await response.text()) print('bytes:', await response.read()) print('json:', await response.json())if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main())
status: 200headers: <CIMultiDictProxy('Date': 'Thu, 02 Apr 2020 14:13:05 GMT', 'Content-Type': 'application/json', 'Content-Length': '503', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>body: { "args": {}, "data": "", "files": {}, "form": { "age": "25", "name": "germey" }, "headers": { "Accept": "*/*", "Accept-Encoding": "gzip, deflate", "Content-Length": "18", "Content-Type": "application/x-www-form-urlencoded", "Host": "httpbin.org", "User-Agent": "Python/3.7 aiohttp/3.6.2", "X-Amzn-Trace-Id": "Root=1-5e85f2f1-f55326ff5800b15886c8e029" }, "json": null, "origin": "17.20.255.58", "url": "https://httpbin.org/post"}bytes: b'{\n "args": {}, \n "data": "", \n "files": {}, \n "form": {\n "age": "25", \n "name": "germey"\n }, \n "headers": {\n "Accept": "*/*", \n "Accept-Encoding": "gzip, deflate", \n "Content-Length": "18", \n "Content-Type": "application/x-www-form-urlencoded", \n "Host": "httpbin.org", \n "User-Agent": "Python/3.7 aiohttp/3.6.2", \n "X-Amzn-Trace-Id": "Root=1-5e85f2f1-f55326ff5800b15886c8e029"\n }, \n "json": null, \n "origin": "17.20.255.58", \n "url": "https://httpbin.org/post"\n}\n'json: {'args': {}, 'data': '', 'files': {}, 'form': {'age': '25', 'name': 'germey'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '18', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'httpbin.org', 'User-Agent': 'Python/3.7 aiohttp/3.6.2', 'X-Amzn-Trace-Id': 'Root=1-5e85f2f1-f55326ff5800b15886c8e029'}, 'json': None, 'origin': '17.20.255.58', 'url': 'https://httpbin.org/post'}
这里我们可以看到有些字段前面需要加 await ,有的则不需要。其原则是,如果它返回的是一个 coroutine 对象(如 async 修饰的方法),那么前面就要加 await ,具体可以看 aiohttp 的 API,其链接为: https://docs.aiohttp.org/en/stable/client_reference.html。
对于超时设置,我们可以借助 ClientTimeout 对象,比如这里要设置 1 秒的超时,可以这么实现:
ClientTimeout
import aiohttpimport asyncioasync def main(): timeout = aiohttp.ClientTimeout(total=1) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get('https://httpbin.org/get') as response: print('status:', response.status)if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main())
如果超时的话,会抛出 TimeoutError 异常,其类型为 asyncio.TimeoutError ,我们再进行异常捕获即可。
TimeoutError
asyncio.TimeoutError
另外,声明 ClientTimeout 对象时还有其他参数,如 connect 、 socket_connect 等,详细可以参考官方文档: https://docs.aiohttp.org/en/stable/client_quickstart.html#timeouts。
connect
socket_connect
由于 aiohttp 可以支持非常大的并发,比如上万、十万、百万都是能做到的,但对于这么大的并发量,目标网站很可能在短时间内无法响应,而且很可能瞬时间将目标网站爬挂掉,所以我们需要控制一下爬取的并发量。
一般情况下,我们可以借助于 asyncio 的 Semaphore 来控制并发量,示例如下:
Semaphore
import asyncioimport aiohttpCONCURRENCY = 5URL = 'https://www.baidu.com'semaphore = asyncio.Semaphore(CONCURRENCY)session = Noneasync def scrape_api(): async with semaphore: print('scraping', URL) async with session.get(URL) as response: await asyncio.sleep(1) return await response.text()async def main(): global session session = aiohttp.ClientSession() scrape_index_tasks = [asyncio.ensure_future(scrape_api()) for _ in range(10000)] await asyncio.gather(*scrape_index_tasks)if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main())
这里我们声明了 CONCURRENCY (代表爬取的最大并发量)为 5,同时声明爬取的目标 URL 为百度。接着,我们借助于 Semaphore 创建了一个信号量对象,将其赋值为 semaphore ,这样我们就可以用它来控制最大并发量了。怎么使用呢?这里我们把它直接放置在对应的爬取方法里面,使用 async with 语句将 semaphore 作为上下文对象即可。这样的话,信号量可以控制进入爬取的最大协程数量,即我们声明的 CONCURRENCY 的值。
CONCURRENCY
semaphore
async with
在 main 方法里面,我们声明了 10000 个 task ,将其传递给 gather 方法运行。倘若不加以限制,这 10000 个 task 会被同时执行,并发数量太大。但有了信号量的控制之后,同时运行的 task 的数量最大会被控制在 5 个,这样就能给 aiohttp 限制速度了。
task
gather
本节我们了解了 aiohttp 的基本使用方法,更详细的内容还是推荐大家到官方文档查阅,详见 https://docs.aiohttp.org/。
本节代码: https://github.com/Python3WebSpider/AsyncTest。