Guzzle 是一个非常强大而且稳定的 HTTP client。不同于一般的 cURL 封装组件,Guzzle 内部使用了多种请求方式,来实现 HTTP 请求,cURL 只是最常用的方式,并且 Guzzle 提供了强大的异步、并发功能,使得构建一个 HTTP 请求十分容易而且易拓展。现在 Guzzle 已经被 Drupal 整合到核心模块中了,可靠性不言而喻。Guzzle 目前使用了 PSR-7 规范,拓展性和兼容性也更加优秀了。之前 在一次重构记录中 提到过,但是没有深入分析过,这次决定介绍一些使用例子并深入分析其底层实现原理,如果有问题,请留言指出,共同进步。
注意:为了尽量缩减阅读量,部分源码分析只列出了关键步骤。
环境
本文使用的 Guzzle 版本为 6.3.0,composer.json 文件内容为
"require": { "guzzlehttp/guzzle": "^6.3"配置
Guzzle 的各种配置都和 HTTP 请求相关,比如是否跟踪 302 跳转,是否携带 cookies,是否使用 SSL、超时等等。
配置项是以数组形式在创建 client 对象的时候传入的, 所有的配置都在这里 。Guzzle 会提供一个默认配置,会和自定义配置进行合并, 并优先自定义配置 。
public function __construct(array $config = [])
$this->configureDefaults($config);
private function configureDefaults(array $config)
// 自定义配置和默认配置,在这里合并,并赋值给了成员变量
$this->config = $config + $defaults;
比如这样:
$config = [
'allow_redirects' => [
'max' => 5,
'referer' => true,
'http_errors' => false,
'decode_content' => true,
'cookies' => true,
'connect_timeout' => 1.5,
'timeout' => 2.5,
'headers' => [
'User-Agent' => 'test client for chengxiaobai.com',
$client = new \GuzzleHttp\Client($config);
你也可以在构建请求的时候传入配置,这个时候会和构造方法中传入的配置合并,并且只对当前请求有效。
private function prepareDefaults($options)
$defaults = $this->config;
// 这里这是赋值给了局部变量,所以只对当前请求有效
$result = $options + $defaults;
return $result;
比如这样:
$client = new \GuzzleHttp\Client($config);
$client->request('GET', 'https://www.chengxiaobai.com/',
'allow_redirects' => [
'max' => 1,
'referer' => false,
特殊的 handler 参数
如果按照面向对象的来描述的话,就是你必须得实现一个这样的接口,Chengxiaobai\handler
。
interface Chengxiaobai
* handler interface
* @param RequestInterface $request
* @param array $options
* @return Psr\Http\Message\ResponseInterface | GuzzleHttp\Promise\PromiseInterface
public function handler(Psr\Http\Message\RequestInterface $request,array $options);
这样对 handler 结构就很明确了吧。我们看源码怎么解析 handler 配置的。
public function __construct(array $config = [])
if (!isset($config['handler'])) {
// 创建一个默认的 handler 栈
$config['handler'] = HandlerStack::create();
} elseif (!is_callable($config['handler'])) {
throw new \InvalidArgumentException('handler must be a callable');
很明显,如果自定义了 handler 就会放弃 Guzzle 默认提供的 handlerStack。除非你有足够的把握,请不要随意操作。
举个自定义 handler 操作的例子,比如对任意一个请求都返回 404。
$client = new \GuzzleHttp\Client($config);
$response = $client->request('GET', 'www.chengxiaobai.com/archives/',
'handler' => function (\Psr\Http\Message\RequestInterface $request, array $options) {
return new \GuzzleHttp\Psr7\Response(404);
echo $response->getStatusCode();// 404
上面我们说 Guzzle 本身自带了一些 handler,我们先看看默认创建的 handlerStack 都是些什么,先不管每个 handler 里面的实现,在请求处理阶段会详细说。
public static function create(callable $handler = null)
// 这里定义了底层请求实现方法
$stack = new self($handler ?: choose_handler());
// 下面都会添加一些 Middleware 中间件
$stack->push(Middleware::httpErrors(), 'http_errors');
$stack->push(Middleware::redirect(), 'allow_redirects');
$stack->push(Middleware::cookies(), 'cookies');
$stack->push(Middleware::prepareBody(), 'prepare_body');
return $stack;
注意 choose_handler 这个方法,这个方法决定了实现请求的底层方法,通过它能让我们对 Guzzle 请求的实现的底层方法有个初步了解,也就是,所有的请求都是通过它发送出去的。仔细看源码注释,很关键。
function choose_handler()
$handler = null;
// 判定 curl 方法,如果并发和常规 curl 同时存在
if (function_exists('curl_multi_exec') && function_exists('curl_exec')) {
// 注册并发 curl 为默认请求方式,常规 curl 为同步请求方式
$handler = Proxy::wrapSync(new CurlMultiHandler(), new CurlHandler());
} elseif (function_exists('curl_exec')) {
// 如果两种 curl 方法同时只有一个存在,则优先常规 curl
$handler = new CurlHandler();
} elseif (function_exists('curl_multi_exec')) {
$handler = new CurlMultiHandler();
// 如果 allow_url_fopen 开启
if (ini_get('allow_url_fopen')) {
$handler = $handler
// 已有 handler ? 再注册一个流处理 handler
? Proxy::wrapStreaming($handler, new StreamHandler())
// 否则只有流处理 handler
: new StreamHandler();
} elseif (!$handler) {
throw new \RuntimeException('GuzzleHttp requires cURL, the '
. 'allow_url_fopen ini setting, or a custom HTTP handler.');
return $handler;
创建完 handler 后,会往 stack 中添加一些 middleware,也就是中间件。简单介绍下,push 函数第一个参数是闭包,第二个参数是字符串,中间件的名字,middleware 主要由闭包组成,可能有的 middleware 嵌套有点多,显得有点复杂,但是无论结构如何复杂,本质上就是用来处理各种请求数据,其结构类型和 Chengxiaobai\handler
一样。
需要深入了解 Handlers 和 Middleware 的可以点击这里看官方文档,个人觉得需要对闭包掌握的比较好,才能很好的理解其设计思路。
根据上面的源码分析你可能注意到,系统默认提供的 handler 是以对象的形式存在的。但其真正使用的时候是当做闭包使用的,这里介绍的是真正发挥作用的闭包结构,而不是表面的 HandlerStack 对象。后面” 处理请求 “章节会详细介绍。
构建请求
其实所有的请求在处理上都是异步的,同步请求只不过是异步请求构建后立即要求返回结果,异步转同步。但是异步和同步的请求构建都是类似的,不同处我会说明。
public function request($method, $uri = '', array $options = [])
$options[RequestOptions::SYNCHRONOUS] = true;
// requestAsync 就是异步请求,不过直接调用了 wait 转同步
return $this->requestAsync($method, $uri, $options)->wait();
请求的 uri 参数
private function buildUri($uri, array $config)
// for BC we accept null which would otherwise fail in uri_for
$uri = Psr7\uri_for($uri === null ? '' : $uri);
if (isset($config['base_uri'])) {
$uri = Psr7\UriResolver::resolve(Psr7\uri_for($config['base_uri']), $uri);
// 这里使用了 PSR-7 规范,返回的是一个实现了 UriInterface 的对象
return $uri->getScheme() === '' && $uri->getHost() !== '' ? $uri->withScheme('http') : $uri;
比如这样就会报错
$client = new \GuzzleHttp\Client();
$response = $client->request('GET', '/history.html');
* ountput :
* Fatal error: Uncaught GuzzleHttp\Exception\RequestException:
* cURL error 3: <url> malformed (see http://curl.haxx.se/libcurl/c/libcurl-errors.html)
* in /app/vendor/guzzlehttp/guzzle/src/Handler/CurlFactory.php on line 187
详细的规则可以参考 RFC 3986, section 2。官方帮我们整理了一些快速了解的例子。我这里梳理出 4 种情况。
base_uri
result
chengxiaobai.com/first/
/second
chengxiaobai.com/second
chengxiaobai.com/first/
second
chengxiaobai.com/first/second
chengxiaobai.com/first
/second
chengxiaobai.com/second
chengxiaobai.com/first
second
chengxiaobai.com/second
保险的情况就是每次都使用绝对路径就好了,但是有时候相对路径在做爬取的时候很有用,依据实际需求使用。
构建 reqsest
Guzzle 内部使用的 request 对象都是 Psr\Http\Message\RequestInterface
的实现,这样只要你能按照 PSR-7 的规范来就很容易拓展 Guzzle。
这里再次提醒大家,modern PHP 开发,应该遵循 PSR 规范,有利于社区更好的协作和稳健发展。
public function requestAsync($method, $uri = '', array $options = [])
$request = new Psr7\Request($method, $uri, $headers, $body, $version);
return $this->transfer($request, $options);
丰富 request
transfer 的结构类型和 Chengxiaobai\handler
一样。
private function transfer(RequestInterface $request, array $options)
// 这个方法会根据你的请求类型,构建更具体的请求对象
$request = $this->applyOptions($request, $options);
$handler = $options['handler'];
applyOptions
从名字能看出来,这个方法会根据你的配置,构建出相匹配的 request 对象。比如根据请求类型的不同,进行参数 encode,设置 body 比如 json、stream,设置 header 等请求细节。
注意配置传入的是一个引用,所以里面对配置的任何修改,都会影响后续操作。
private function applyOptions(RequestInterface $request, array &$options)
// 各种判定,修改 $options,如果有没覆盖到的,会新生成一个 $modify 说明需要重新构建 $request
// 构建新的对象方法
$request = Psr7\modify_request($request, $modify);
return $request;
如果没有需要修改的,就直接返回,如果有的话,会重新构建一个新的 request 对象。
注意需要的参数,有些构建参数是从 $changes
取的,但是有些是从原本的 $request
对象取的,本质上就是,有新的就用新的,没有就用老的保持不变。
function modify_request(RequestInterface $request, array $changes)
if (!$changes) {
return $request;
return new Request(
isset($changes['method']) ? $changes['method'] : $request->getMethod(),
$uri,
$headers,
isset($changes['body']) ? $changes['body'] : $request->getBody(),
isset($changes['version'])
? $changes['version']
: $request->getProtocolVersion()
promise 简介
关于 promise,属于 guzzlehttp/promises
类库,是一个很值得学习的类库,有机会我会专门分析下它的实现原理,目前我们还是着重分析请求实现过程。
通看源码会发现,虽然 Guzzle 内部大量使用了 promise 并且夹杂着闭包很复杂,但是 promise 发挥的作用都是一样的。目前可以这么理解,就是 promise 是一个状态机,它有三种状态:等待、满足、拒绝。
下面的这个例子,只是示例会如何执行,promise 规范是有各种要求的,具体见 Promises/A + 规范,Guzzle 使用的 promise 也是该规范的一个实现。
$promise = new Promise(
function () {
echo 'wait';
function () {
echo 'cancle';
$promise->then(
function () {
echo 'onFulfilled';
function () {
echo 'onRejected';
)->then(
function () {
echo 'onFulfilled';
function () {
echo 'onRejected';
从等待状态开始执行,满足了就执行 onFulfilled,拒绝了就执行 onRejected,一连串下来,依靠不同的状态去执行不同的方法,配合 HTTP 请求要么成功要么失败,不会有第三种状态的场景,就可以很顺畅的理解了。
private function transfer(RequestInterface $request, array $options)
$handler = $options['handler'];
try {
return Promise\promise_for($handler($request, $options));
} catch (\Exception $e) {
return Promise\rejection_for($e);
成功就是 promise_for 失败就是 rejection_for。
promise_for
function promise_for($value)
// 如果是一个 promise 对象就直接返回
if ($value instanceof PromiseInterface) {
return $value;
// 如果是一个包含 then 方法的对象,会把它转换成一个 promise 对象
if (method_exists($value, 'then')) {
// 如果里面有 wait、cancel、resolve、reject 等方法,会把它添加进去作为默认方法,否则置为 null
$wfn = method_exists($value, 'wait') ? [$value, 'wait'] : null;
$cfn = method_exists($value, 'cancel') ? [$value, 'cancel'] : null;
$promise = new Promise($wfn, $cfn);
$value->then([$promise, 'resolve'], [$promise, 'reject']);
return $promise;
// 前俩者都不满不足的情况下,直接返回一个满足状态的 promise。
return new FulfilledPromise($value);
rejection_for
异常情况会走入到 rejection_for 方法。同理进行 “数据清洗”,并返回一个拒绝状态的 promise。
function rejection_for($reason)
if ($reason instanceof PromiseInterface) {
return $reason;
return new RejectedPromise($reason);
handler 处理
还是 transfer 方法,在传给 promise_for 之前,先调用了一个 $handler,也就是配置中的 handler 函数。接着就是返回一个 promise 对象,用于外层异步调用。
private function transfer(RequestInterface $request, array $options)
$handler = $options['handler'];
try {
// 这里会先调用配置中的 handler 方法
return Promise\promise_for($handler($request, $options));
} catch (\Exception $e) {
return Promise\rejection_for($e);
处理请求
对于上面的 handler 处理小节,你可能会有疑惑,为什么就调用了 handler 函数,那不是直接开始处理请求了吗?
我们之前介绍过 handler 的数据结构,是一个是 handlerStack 对象,但是其调用本质是一系列组合闭包。但数据结构上是一个对象,怎么使用的时候就成了闭包呢?
当尝试以调用函数的方式调用一个对象时,__invoke() 方法会被自动调用。
有了这个前提,我们看下 handlerStack 源码。
从 handlerStack 的名字上,我们就能知道它是一个” 栈 “数据结构,其满足” 后进先出 “的特性。
public function __invoke(RequestInterface $request, array $options)
// 这个函数主要是实现 Middleware 中间件操作
$handler = $this->resolve();
//这里下面紧接着会有分析
return $handler($request, $options);
public function resolve()
// 变量缓存,能优化部分性能
if (!$this->cached) {
// 这个 handler 就是之前选择的 实现请求的底层方法
// 如果没有的话,请求都无法实现,就别折腾了,抛个异常终止吧
if (!($prev = $this->handler)) {
throw new \LogicException('No handler has been specified');
// 反转顺序,实现”后进先出“特性,调用每个中间件
foreach (array_reverse($this->stack) as $fn) {
// 中间件的注册是 [$middleware, $name] 形式的
// 所以取第一个元素是其具体实现,第二个参数只是名字
// 调用第一次传入的是 handler,后续传入的就是上一次处理的结果
$prev = $fn[0]($prev);
// 所有的都处理完毕,缓存起来
$this->cached = $prev;
return $this->cached;
上面就是很经典的中间件模型实现,laravel 中实现的略有区别,主要用到了 array_reduce
这个函数,但是原理上大同小异,知道其原理一通百通。
我们再继续看看源码。还是这个方法,不过我们分析其最终调用的实现。
根据 Middleware 流程图,我们知道最后一个调用的是 http_errors,我们就来分析它吧,没有任何特殊性,其他的 Middleware 结构都是一样的,只是有些中间件多次使用了 __invoke()
魔术方法而已。
Middleware 里面闭包结构复杂,好好理解下。
public function __invoke(RequestInterface $request, array $options)
// 这个函数主要是实现 Middleware 中间件操作
$handler = $this->resolve();
// 现在我们分析这个
return $handler($request, $options);
public static function httpErrors()
// 第一次调用返回!传入一个 闭包-A
return function (callable $handler) {
// 第二次调用返回!传入 $request,$options
return function ($request, array $options) use ($handler) {
// Middleware 自己的逻辑判定返回什么样的闭包
if (empty($options['http_errors'])) {
// 第三次调用返回!返回 闭包-A 的处理结果
// 这里根据配置 没有注册 then 函数,直接进行下一步处理
return $handler($request, $options);
// 第三次调用返回!返回 闭包-A,附加 promise
// 根据上面我们说到的 promise 特性,这里用 then
// 附加了 闭包-A 处理完毕之后要调用的逻辑
return $handler($request, $options)->then(
function (ResponseInterface $response) use ($request, $handler) {
$code = $response->getStatusCode();
if ($code < 400) {
return $response;
throw RequestException::create($request, $response);
关于返回层数,可以根据 return
来迅速定位,一个 return
就对应一次调用返回。
现在我们先梳理下到这步 handlerStack 被调用的次数,知道这三层闭包分别在哪里被调用了,有利于我们得出最终结果。
// 第一次
public static function create(callable $handler = null)
$stack->push(Middleware::httpErrors(), 'http_errors');
// 第二次
public function resolve()
$prev = $fn[0]($prev);
// 第三次
public function __invoke(RequestInterface $request, array $options)
$handler($request, $options);
最后的结果应该是,如果按照 Middleware 结构应该是这样的:
$handler($request, $options)->then('http_errors')
->then('allow_redirects')
->then('cookies')
->then('prepare_body')
这个 $handler
就是最开始传入的请求实现底层方法。
整个 Middleware 就实现了,传入的时候先处理一遍请求数据,请求完了,通过 then 再处理一遍请求结果。
注意!!!由于 Middleware 的作用不同,可能有的 Middleware 并不会处理请求结果,就不会注册 then 函数。这里描述的是 Middleware 的整个流程,并没有对其中某个做特殊分析,因为其需求场景不同,逻辑处理会有细微变化。
这个 $handler
具体是哪个请求方法呢?还记得 choose_handler()
方法吗,它决定了到底使用哪种底层方法去实现请求,现在我们终于执行到发起请求的步骤了。
再回顾下 choose_handler()
方法。
function choose_handler()
$handler = Proxy::wrapSync(new CurlMultiHandler(), new CurlHandler());
$handler = new CurlHandler();
$handler = new CurlHandler();
$handler = $handler
? Proxy::wrapStreaming($handler, new StreamHandler())
: new StreamHandler();
return $handler;
这两个方法都有源码分析,没有印象的可以再回去看看。
不同 $handler
都是在 __invoke()
方法上做文章。
我们分析第一个 $handler = Proxy::wrapSync(new CurlMultiHandler(), new CurlHandler());
。
public static function wrapSync(
callable $default,
callable $sync
return function (RequestInterface $request, array $options) use ($default, $sync) { // 注意这里的三目运算符,判定同步请求选项是否为空
return empty($options[RequestOptions::SYNCHRONOUS])
// 默认是并发请求 new CurlMultiHandler()
? $default($request, $options)
// 这里是同步请求 new CurlHandler()
: $sync($request, $options);
现在,异步和同步请求终于出现了区别。我们先看同步请求。
同步请求
public function request($method, $uri = '', array $options = [])
// 这里往配置中添加了一个选项,设置该请求为同步的
$options[RequestOptions::SYNCHRONOUS] = true;
所以,这里走的是同步请求,我们来分析 CurlHandler()
。
public function __invoke(RequestInterface $request, array $options)
// 如果设置了延迟请求,会在这里阻塞一会
if (isset($options['delay'])) {
usleep($options['delay'] * 1000);
// 创建一个 handler 抽象对象
$easy = $this->factory->create($request, $options);
// 执行
curl_exec($easy->handle);
$easy->errno = curl_errno($easy->handle);
// 请求处理结束
return CurlFactory::finish($this, $easy, $this->factory);
这里需要分析一下工厂类 CurlFactory
,里面大都涉及到 cURL 的一些配置,有兴趣的可以看下源码学习,配置的含义官方文档有专门的介绍,我这里就不在分析它们了,主要流程的分析还是不会缺的。
public function create(RequestInterface $request, array $options)
if (isset($options['curl']['body_as_string'])) {
$options['_body_as_string'] = $options['curl']['body_as_string'];
unset($options['curl']['body_as_string']);
// handle 的一个抽象对象
$easy = new EasyHandle;
$easy->request = $request;
$easy->options = $options;
// 获取默认配置
$conf = $this->getDefaultConf($easy);
// 解析请求方法
$this->applyMethod($easy, $conf);
// 解析配置
$this->applyHandlerOptions($easy, $conf);
// 解析头部
$this->applyHeaders($easy, $conf);
unset($conf['_headers']);
// 解析自定义 curl 配置
if (isset($options['curl'])) {
$conf = array_replace($conf, $options['curl']);
// 设置回调函数用于处理返回头
$conf[CURLOPT_HEADERFUNCTION] = $this->createHeaderFn($easy);
// 从 handle 池 获取一个 handle,没有就新建一个
$easy->handle = $this->handles
? array_pop($this->handles)
: curl_init();
curl_setopt_array($easy->handle, $conf);
return $easy;
public static function finish(
callable $handler,
EasyHandle $easy,
CurlFactoryInterface $factory
// 这里会调用配置用设置的 on_stats 函数
if (isset($easy->options['on_stats'])) {
self::invokeStats($easy);
// 有错误的话走错误处理流程
if (!$easy->response || $easy->errno) {
return self::finishError($handler, $easy, $factory);
// 释放资源,还到 handle 池
$factory->release($easy);
// 处理 流数据
$body = $easy->response->getBody();
if ($body->isSeekable()) {
$body->rewind();
// 返回一个满足状态的 promise
return new FulfilledPromise($easy->response);
根据源码分析,同步请求在这一步就已经发出了请求,并且回调了配置中的 on_stats
函数,拿到了未经处理的返回值原始返回值,并且同步请求 handler 池,也就是复用的请求句柄为 3 个,这个没有办法修改,写死在代码中的。
异步请求
public function __invoke(RequestInterface $request, array $options)
$easy = $this->factory->create($request, $options);
// 为每个请求生成一个 ID
$id = (int) $easy->handle;
// 注册一个 promise,分别是调用执行和关闭方法
$promise = new Promise(
[$this, 'execute'],
// 依据 ID 来关闭请求
function () use ($id) { return $this->cancel($id); }
// 添加请求 底层是 curl_multi_add_handle 方法
$this->addRequest(['easy' => $easy, 'deferred' => $promise]);
return $promise;
工厂类 CurlFactory
在上面已经分析过,这里不再赘述。但是异步请求这个时候并没有发起最终的请求,先是为每个请求生成一个 ID,然后将请求添加到批处理回话句柄 (curl_multi_add_handle) 中,最后返回了一个 promise 对象,里面注册了 execute
函数和 cancel
函数,用于后面发起和关闭请求。
需要注意的就是设定了延迟执行的请求,是在 addRequest()
方法中处理的。后面在” 返回结果 “章节会讲到延迟请求处理。
流处理
配置中如果 stream 选项不为空,就会启用它,如果你没有 cURL,那就只能用它了。
public function __invoke(RequestInterface $request, array $options)
// 如果设置了延迟请求,会在这里阻塞一会
if (isset($options['delay'])) {
usleep($options['delay'] * 1000);
// 流处理本身信息较少,所以为了补全一些信息,这里记录处理开始时间
$startTime = isset($options['on_stats']) ? microtime(true) : null;
try {
// 不支持 expect header.
$request = $request->withoutHeader('Expect');
// 当内容为 0 的时候,依然添加一个头信息
if (0 === $request->getBody()->getSize()) {
$request = $request->withHeader('Content-Length', 0);
// 发起请求,然后回调 on_stats 函数
// 解析结果,同样返回一个满足状态的 promise
return $this->createResponse(
$request,
$options,
$this->createStream($request, $options),
$startTime
} catch (\InvalidArgumentException $e) {
throw $e;
} catch (\Exception $e) {
// Determine if the error was a networking error.
$message = $e->getMessage();
// This list can probably get more comprehensive.
if (strpos($message, 'getaddrinfo') // DNS lookup failed
|| strpos($message, 'Connection refused')
|| strpos($message, "couldn't connect to host") // error on HHVM
$e = new ConnectException($e->getMessage(), $request, $e);
$e = RequestException::wrapException($request, $e);
$this->invokeStats($options, $request, $startTime, null, $e);
return \GuzzleHttp\Promise\rejection_for($e);
关于流处理,因为其底层实现是 fopen()
函数,其支持的协议比较多,不止有 HTTP,它支持的协议和封装协议在这里可以看到,所以 Guzzle 对其做了一些特殊处理以满足业务需要。
返回结果
根据上面的分析我们已经知道 transfer
方法返回的结果是什么了,然后就是获取返回结果。
同步请求
同步请求因为在 transfer
方法中,实际的请求已经发出去,已经拿到了未经处理的原始返回结果。
public function send(RequestInterface $request, array $options = [])
// 我们注意到最后调用的 wait 方法
return $this->sendAsync($request, $options)->wait();
在同步请求方法中,直接调用了 wait()
方法,所以直接走 promise 对象的 wait()
方法及注册的 then()
方法。还记得之前的 Middleware 里面注册了一些 then()
方法吗?这里主要就是调用它们了,完成中间件 “处理返回结果” 的这一步骤,当然还有一些在逻辑处理中注册的 then()
方法,在此不再举例。
异步请求
异步请求在 transfer
方法中返回的是一个 Promise,此时实际请求并没有发送。我们从官方例子来分析发送请求并且获取返回结果的方式。
$promise = $client->requestAsync('GET', 'https://www.chengxiaobai.com');
$promise->then(
function (ResponseInterface $res) {
echo $res->getStatusCode() . "\n";
function (RequestException $e) {
echo $e->getMessage() . "\n";
echo $e->getRequest()->getMethod();
这种方式是对每个异步请求单独注册 then()
方法,说明这个请求成功了怎么处理,失败了怎么处理。
$client = new Client(['base_uri' => 'https://www.chengxiaobai.com']);
// 注册多个异步请求,实现并发
$promises = [
'image' => $client->getAsync('/image'),
'png' => $client->getAsync('/image/png'),
'jpeg' => $client->getAsync('/image/jpeg'),
'webp' => $client->getAsync('/image/webp')
// 有一个失败就终止
$results = Promise\unwrap($promises);
// 忽略某些请求的异常,保证所有请求都发送出去
$results = Promise\settle($promises)->wait();
这个是设定多个异步请求,实现并发,并选择对部分请求错误是否忽略进行处理。
$client = new Client();
$requests = function ($total) use ($client) {
for ($i = 1; $i < $total; $i++) {
$uri = 'https://www.chengxiaobai.com/page/' . $i;
// 这里用到了协程
yield function() use ($client, $uri) {
return $client->getAsync($uri.$i);
$pool = new Pool($client, $requests(10), [
// 并发数
'concurrency' => 5,
'fulfilled' => function ($response, $index) {
echo $res->getStatusCode() . "\n";
'rejected' => function ($reason, $index) {
echo $e->getMessage() . "\n";
// 初始化 Promise
$promise = $pool->promise();
// 发起请求处理
$promise->wait();
这个是对大批量请求做出一个批量处理,类似一个请求池的的概念,设定了出口速率 (concurrency),使用统一的处理逻辑,处理请求池当中的数据。
我们来分析下 Pool 的源码,主要是构造函数。
public function __construct(
ClientInterface $client,
$requests,
array $config = []
// 设定请求池大小
if (isset($config['pool_size'])) {
$config['concurrency'] = $config['pool_size'];
} elseif (!isset($config['concurrency'])) {
// 默认并发数 25
$config['concurrency'] = 25;
if (isset($config['options'])) {
$opts = $config['options'];
unset($config['options']);
} else {
$opts = [];
// 将请求列表转换为一个迭代器
$iterable = \GuzzleHttp\Promise\iter_for($requests);
$requests = function () use ($iterable, $client, $opts) {
// 遍历请求列表
foreach ($iterable as $key => $rfn) {
// 如果是一个 request 的实现,转换为一个异步请求
if ($rfn instanceof RequestInterface) {
yield $key => $client->sendAsync($rfn, $opts);
} elseif (is_callable($rfn)) {
// 如过是一个闭包,直接调用
yield $key => $rfn($opts);
} else {
throw new \InvalidArgumentException('...');
// 支持迭代的 promise 对象
$this->each = new EachPromise($requests(), $config);
我们可以看到,Pool 模式下,所有的请求配置 $opts
都是一样的,所以每个请求的处理逻辑都是一样的,如果每个请求都有有定制化需求,Pool 模式可能不太适合,当然可以使用修改源码的方式,不过这个已经不符合 Pool 模式设计的初衷了。
不管哪种形式,都可以发现触发最终调用的都是 wait()
方法。这个和 promise 的规范有关。
我们看下异步如何处理请求的。
还记得异步请求返回的 promise 吗?
$promise = new Promise(
[$this, 'execute'],
// 依据 ID 来关闭请求
function () use ($id) { return $this->cancel($id); }
wait()
方法调用的就是 [$this, 'execute']
,我们来分析它的实现。在此之前,我们需要特别说明下延迟请求。
延迟时间
对于延迟请求,同步请求和流请求很好处理,直接阻塞就好了,如果是 20 个异步请求中包含 10 个延迟请求,每个延迟时间还不相等,这个时候延迟请求的处理就得好好考虑下了。
在” 请求处理 “章节我们说过,延迟请求是没有立即加到批处理请求句柄的,它被暂时存放在 $this->delays
队列中。直到你决定发起请求了,延迟请求才被拿出来计算其是否应该被加到批处理请求句柄中。计算逻辑我们从源码看看如何计算阻塞时间。
public function execute()
$queue = P\queue();
while ($this->handles || !$queue->isEmpty()) {
// 如果没有在进行的请求,并且延迟请求队列不为空,就开始阻塞
if (!$this->active && $this->delays) {
usleep($this->timeToNext());
$this->tick();
private function timeToNext()
$currentTime = microtime(true);
$nextTime = PHP_INT_MAX;
// 找出现有延迟请求队列中最小的延迟时间
foreach ($this->delays as $time) {
if ($time < $nextTime) {
$nextTime = $time;
return max(0, $nextTime - $currentTime) * 1000000;
execute
主要是调用了 tick()
这个方法。
public function tick()
// 如果延迟请求队列不为空,处理延迟请求
if ($this->delays) {
$currentTime = microtime(true);
// $this->delays[$id] = microtime(true) + ($easy->options['delay'] / 1000);
foreach ($this->delays as $id => $delay) {
// 延迟任务已经达到延迟预期时间,开始处理
if ($currentTime >= $delay) {
// 将它从延迟任务队列中删除
unset($this->delays[$id]);
// 添加到批量请求句柄中
curl_multi_add_handle(
$this->_mh,
$this->handles[$id]['easy']->handle
// 执行队列中的任务
P\queue()->run();
// 执行请求
if ($this->active &&
curl_multi_select($this->_mh, $this->selectTimeout) === -1
// See: https://bugs.php.net/bug.php?id=61141
usleep(250);
while (curl_multi_exec($this->_mh, $this->active) === CURLM_CALL_MULTI_PERFORM);
// 获取请求结果信息,移除请求成功的请求
$this->processMessages();
然后异步处理流程就很清晰了:
如果延迟请求队列不为空并且当前没有在执行的请求,先阻塞最小的延迟时间,以保证延迟请求队列在每次请求都至少被消耗一个。如果有正在执行的请求或者延迟请求队列不为空,直接执行 2。
发起一次批量请求。
获取请求信息,移除成功的请求。
如果请求队列不为空,执行 1-3。
从上面的流程,我们可以分析得出,即使你的并发数大于请求数,也并不意味着只请求一次,可能会有重试或者延迟请求造成多次请求。并且根据步骤 1 我们也可以知道,非延迟任务也会跟着一起被阻塞。
和同步请求一样,异步请求下每个请求处理完毕后,都会执行相应的 then()
方法完成返回结果处理。
流请求
因为流请求本质上是基于 fopen
的,发起请求逻辑比较简单。
public function __invoke(RequestInterface $request, array $options)
// 延迟请求直接 delay 操作
if (isset($options['delay'])) {
usleep($options['delay'] * 1000);
// 重点 1:解析返回值
return $this->createResponse(
$request,
$options,
// 重点 2:发起请求
$this->createStream($request, $options),
$startTime
先看如何发起请求的,重点在配置项的处理。
private function createStream(RequestInterface $request, array $options)
$params = [];
// 这里设置了默认请求参数
$context = $this->getDefaultContext($request);
// 这里方法主要是依据配置项调用了
// add_proxy,add_timeout,add_verify,add_cert,add_progress,add_debug
// 其实本质上就是用自定义配置覆盖默认请求参数
if (!empty($options)) {
foreach ($options as $key => $value) {
$method = "add_{$key}";
if (isset($methods[$method])) {
$this->{$method}($request, $context, $value, $params);
// 这里也是用自定义配置覆盖默认请求参数
if (isset($options['stream_context'])) {
if (!is_array($options['stream_context'])) {
throw new \InvalidArgumentException('stream_context must be an array');
$context = array_replace_recursive(
$context,
$options['stream_context']
// 解析 host,支持强制 IP 解析,v4 和 v6 都支持
$uri = $this->resolveHost($request, $options);
$context = $this->createResource(
function () use ($context, $params) {
// 这里创建资源流
return stream_context_create($context, $params);
return $this->createResource(
function () use ($uri, &$http_response_header, $context, $options) {
// 这里发起请求
$resource = fopen((string) $uri, 'r', null, $context);
$this->lastHeaders = $http_response_header;
// 设置超时时间
if (isset($options['read_timeout'])) {
$readTimeout = $options['read_timeout'];
$sec = (int) $readTimeout;
$usec = ($readTimeout - $sec) * 100000;
stream_set_timeout($resource, $sec, $usec);
return $resource;
从代码看,默认启用了 HTTPS。
这里的自定义配置和默认配置合并,不再是之前简单的数组合并操作了,因为某个配置的修改,可能会涉及到其他配置项的变动,所以对几个主要选项 (proxy,timeout,verify,cert,progress,debug)
做了封装。
毕竟 fopen
这个功能强大的函数在设计之初其目标就是操作 resource,所以它的配置项也根据 resource 的不同而有差异,其针对 HTTP 的配置项在这里可以看到。
然后就是处理返回值,如果使用 cURL 这些都能方便的处理,但是在流处理中,就得自己去解析它,相当于要自己要完成一部分 cURL 的工作。
private function createResponse(
RequestInterface $request,
array $options,
$stream,
$startTime
$hdrs = $this->lastHeaders;
$this->lastHeaders = [];
$parts = explode(' ', array_shift($hdrs), 3);
$ver = explode('/', $parts[0])[1];
$status = $parts[1];
$reason = isset($parts[2]) ? $parts[2] : null;
// 解析 header
$headers = \GuzzleHttp\headers_from_lines($hdrs);
// 解析返回类型
list ($stream, $headers) = $this->checkDecode($options, $headers, $stream);
// 构建一个 Psr7\StreamInterface 的 Stream 对象
$stream = Psr7\stream_for($stream);
$sink = $stream;
$response = new Psr7\Response($status, $headers, $sink, $ver, $reason);
// 回调 on_headers 函数
if (isset($options['on_headers'])) {
try {
$options['on_headers']($response);
} catch (\Exception $e) {
$msg = 'An error was encountered during the on_headers event';
$ex = new RequestException($msg, $request, $response, $e);
return \GuzzleHttp\Promise\rejection_for($ex);
// 回调 on_stats 函数
$this->invokeStats($options, $request, $startTime, $response, null);
return new FulfilledPromise($response);
整个过程都是在解析数据,响应内容是通过 stream_get_contents
拿到的,在 Psr7\StreamInterface
实例中有体现。
这里单独说下 on_headers
这个函数。这个函数是在拿到返回头之后,依据返回头里面的信息,来判定如何响应后面的操作,在返回数据比较大的时候可以做到提前拦截,避免浪费资源。
这个设置在所有请求方式中都有效,只是用在流处理中意义更大。
$client->request('GET', 'http://httpbin.org/stream/1024', [
'on_headers' => function (ResponseInterface $response) {
if ($response->getHeaderLine('Content-Length') > 1024) {
throw new \Exception('The file is too big!');
彩蛋