添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
www.chengxiaobai.cn

Guzzle 是一个非常强大而且稳定的 http client。不同于一般的 cURL 封装组件, Guzzle 内部使用了多种请求方式,来实现 http 请求,cURL 只是最常用的方式,并且 Guzzle 提供了强大的异步、并发功能,使得构建一个 http 请求十分容易而且易拓展。现在 Guzzle 已经被 drupal 整合到核心模块中了,可靠性不言而喻。Guzzle 目前使用了 Psr7 规范,拓展性和兼容性也更加优秀了。之前 在一次重构记录中 提到过,但是没有深入分析过,这次决定介绍一些使用例子并深入分析其底层实现原理,如果有问题,请留言指出,共同进步。

  • 特殊的 handler 参数
  • 请求的 uri 参数
  • 构建 reqsest
  • 丰富 request
  • promise 简介
  • promise_for
  • rejection_for
  • handler 处理
  • 注意:为了尽量缩减阅读量,部分源码分析只列出了关键步骤。

    本文使用的 Guzzle 版本为 6.3.0,composer.json 文件内容为

    "require" : { "guzzlehttp/guzzle" : "^6.3" } 复制代码

    Guzzle 的各种配置都和 http 请求相关,比如是否跟踪 302 跳转,是否携带 cookies,是否使用 ssl、超时等等。

    配置项是以数组形式在创建 client 对象的时候传入的, 所有的配置都在这里 。Guzzle 会提供一个默认配置,会和自定义配置进行合并, 并优先自定义配置

    public function __construct(array $config = [])
      $this->configureDefaults($config);
    private function configureDefaults(array $config)
      // 自定义配置和默认配置,在这里合并,并赋值给了成员变量
      $this->config = $config + $defaults;
    }复制代码

    比如这样:

    $config = [
        'allow_redirects' => [
            'max'     => 5,
            'referer' => true,
        'http_errors'     => false,
        'decode_content'  => true,
        'cookies'         => true,
        'connect_timeout' => 1.5,
        'timeout'         => 2.5,
        'headers'         => [
            'User-Agent' => 'test client for chengxiaobai.cn',
    $client = new \GuzzleHttp\Client($config);复制代码

    你也可以在构建请求的时候传入配置,这个时候会和构造方法中传入的配置合并, 并且只对当前请求有效

    private function prepareDefaults($options)
      $defaults = $this->config;
      // 这里这是赋值给了局部变量,所以只对当前请求有效
      $result = $options + $defaults;
      return $result;
    }复制代码

    比如这样:

    $client = new \GuzzleHttp\Client($config);
    $client->request('GET', 'https://www.chengxiaobai.cn/',
                         'allow_redirects' => [
                             'max'     => 1,
                             'referer' => false,
                     ]);复制代码

    特殊的 handler 参数

    handler 参数比较特殊, 它必须是闭包 ,并且参数为 Psr7\Http\Message\RequestInterface 和一个 array 类型的参数,并且必须返回 GuzzleHttp\Promise\PromiseInterface 或者在成功时满足 Psr7\Http\Message\ResponseInterface

    如果按照面向对象的来描述的话,就是你必须得实现一个这样的接口, Chengxiaobai\handler

    interface Chengxiaobai
         * handler interface
         * @param RequestInterface $request
         * @param array            $options
         * @return Psr\Http\Message\ResponseInterface | GuzzleHttp\Promise\PromiseInterface
        public function handler(Psr\Http\Message\RequestInterface $request,array $options);
    }复制代码

    这样对 handler 结构就很明确了吧。我们看源码怎么解析 handler 配置的。

    public function __construct(array $config = [])
      if (!isset($config['handler'])) {
        // 创建一个默认的 handler 栈
        $config['handler'] = HandlerStack::create();
      } elseif (!is_callable($config['handler'])) {
        throw new \InvalidArgumentException('handler must be a callable');
    }复制代码

    很明显,如果自定义了 handler 就会放弃 Guzzle 默认提供的 handlerStack 。除非你有足够的把握,请不要随意操作 。

    举个自定义 handler 操作的例子,比如对任意一个请求都返回404。

    $client = new \GuzzleHttp\Client($config);
    $response = $client->request('GET', 'www.chengxiaobai.cn/history.html',
                                     'handler' => function (\Psr\Http\Message\RequestInterface $request, array $options) {
                                         return new \GuzzleHttp\Psr7\Response(404);
    echo $response->getStatusCode();// 404复制代码

    上面我们说 Guzzle 本身自带了一些 handler ,我们先看看默认创建的 handlerStack 都是些什么,先不管每个 handler 里面的实现,在请求处理阶段会详细说。

    public static function create(callable $handler = null)
      // 这里定义了底层请求实现方法
      $stack = new self($handler ?: choose_handler());
      // 下面都会添加一些 Middleware 中间件
      $stack->push(Middleware::httpErrors(), 'http_errors');
      $stack->push(Middleware::redirect(), 'allow_redirects');
      $stack->push(Middleware::cookies(), 'cookies');
      $stack->push(Middleware::prepareBody(), 'prepare_body');
      return $stack;
    }复制代码

    注意 choose_handler 这个方法,这个方法决定了实现请求的底层方法,通过它能让我们对 Guzzle 请求的实现的底层方法有个初步了解,也就是,所有的请求都是通过它发送出去的。仔细看源码注释,很关键。

    function choose_handler()
        $handler = null;
        // 判定 curl 方法,如果并发和常规 curl 同时存在
        if (function_exists('curl_multi_exec') && function_exists('curl_exec')) {
            // 注册并发 curl 为默认请求方式,常规 curl 为同步请求方式
            $handler = Proxy::wrapSync(new CurlMultiHandler(), new CurlHandler());
        } elseif (function_exists('curl_exec')) {
            // 如果两种 curl 方法同时只有一个存在,则优先常规 curl
            $handler = new CurlHandler();
        } elseif (function_exists('curl_multi_exec')) {
            $handler = new CurlMultiHandler();
        // 如果 allow_url_fopen 开启
        if (ini_get('allow_url_fopen')) {
            $handler = $handler
                // 已有 handler ? 再注册一个流处理 handler 
                ? Proxy::wrapStreaming($handler, new StreamHandler())
                // 否则只有流处理 handler
                : new StreamHandler();
        } elseif (!$handler) {
            throw new \RuntimeException('GuzzleHttp requires cURL, the '
                . 'allow_url_fopen ini setting, or a custom HTTP handler.');
        return $handler;
    }复制代码

    创建完 handler 后,会往 stack 中添加一些 middleware,也就是中间件。简单介绍下,push 函数第一个参数是闭包,第二个参数是字符串,中间件的名字,middleware 主要由闭包组成,可能有的 middleware 嵌套有点多,显得有点复杂,但是无论结构如何复杂,本质上就是用来处理各种请求数据,其结构类型和 Chengxiaobai\handler 一样。

    需要深入了解 Handlers 和 Middleware 的可以 点击这里看官方文档 ,个人觉得需要对闭包掌握的比较好,才能很好的理解其设计思路。

    根据上面的源码分析你可能注意到,系统默认提供的 handler 是以对象的形式存在的。但其真正使用的时候是当做闭包使用的,这里介绍的是真正发挥作用的闭包结构,而不是表面的 HandlerStack 对象。后面”处理请求“章节会详细介绍。

    其实所有的请求在处理上都是异步的 ,同步请求只不过是异步请求构建后立即要求返回结果,异步转同步。但是异步和同步的请求构建都是类似的,不同处我会说明。

    public function request($method, $uri = '', array $options = [])
      $options[RequestOptions::SYNCHRONOUS] = true;
      // requestAsync 就是异步请求,不过直接调用了 wait 转同步
      return $this->requestAsync($method, $uri, $options)->wait();
    }复制代码

    请求的 uri 参数

    如果你在配置中定义了 base_uri 参数,这个时候可以使用相对地址,如果没有,则不支持相对地址,Guzzle 并没有帮你校验最终 uri 参数是否正确,只有等到请求发出去了,才知道 uri 是否正确。

    private
    
    
    
    
        
     function buildUri($uri, array $config)
      // for BC we accept null which would otherwise fail in uri_for
      $uri = Psr7\uri_for($uri === null ? '' : $uri);
      if (isset($config['base_uri'])) {
        $uri = Psr7\UriResolver::resolve(Psr7\uri_for($config['base_uri']), $uri);
      // 这里使用了 psr7 规范,返回的是一个实现了 UriInterface 的对象
      return $uri->getScheme() === '' && $uri->getHost() !== '' ? $uri->withScheme('http') : $uri;
    }复制代码

    比如这样就会报错

    $client = new \GuzzleHttp\Client();
    $response = $client->request('GET', '/history.html');
     * ountput :
     * Fatal error: Uncaught GuzzleHttp\Exception\RequestException:
     * cURL error 3: <url> malformed (see http://curl.haxx.se/libcurl/c/libcurl-errors.html)
     * in /app/vendor/guzzlehttp/guzzle/src/Handler/CurlFactory.php on line 187
     */复制代码

    详细的规则可以参考 RFC 3986, section 2 官方帮我们整理了一些快速了解的例子 。我这里梳理出 4 种情况。

    base_uri result

    保险的情况就是每次都使用绝对路径就好了,但是有时候相对路径在做爬取的时候很有用,依据实际需求使用。

    构建 reqsest

    Guzzle 内部使用的 request 对象都是 Psr\Http\Message\RequestInterface 的实现,这样只要你能按照 psr7 的规范来就很容易拓展 Guzzle。

    这里再次提醒大家,modern php 开发,应该遵循 psr 规范,有利于社区更好的协作和稳健发展。

    public function requestAsync($method, $uri = '', array $options = [])
      $request = new Psr7\Request($method, $uri, $headers, $body, $version); 
      return $this->transfer($request, $options);
    }复制代码

    丰富 request

    transfer 的结构类型和 Chengxiaobai\handler 一样。

    private function transfer(RequestInterface $request, array $options)
      // 这个方法会根据你的请求类型,构建更具体的请求对象
      $request = $this->applyOptions($request, $options);
      $handler = $options['handler'];
    }复制代码

    applyOptions 从名字能看出来,这个方法会根据你的配置,构建出相匹配的 request 对象。比如根据请求类型的不同,进行参数 encode,设置 body 比如 json、stream,设置 header 等请求细节。

    注意配置传入的是一个引用,所以里面对配置的任何修改,都会影响后续操作。

    private function applyOptions(RequestInterface $request, array &$options)
        // 各种判定,修改 $options,如果有没覆盖到的,会新生成一个 $modify 说明需要重新构建 $request
        // 构建新的对象方法
        $request = Psr7\modify_request($request, $modify);
        return $request;
    }复制代码

    如果没有需要修改的,就直接返回,如果有的话,会重新构建一个新的 request 对象。

    注意需要的参数,有些构建参数是从 $changes 取的,但是有些是从原本的 $request 对象取的,本质上就是,有新的就用新的,没有就用老的保持不变。

    function modify_request(RequestInterface $request, array $changes)
        if (!$changes) {
            return $request;
      return new Request(
        isset($changes['method']) ? $changes['method'] : $request->getMethod(),
        $uri,
        $headers,
        isset($changes['body']) ? $changes['body'] : $request->getBody(),
        isset($changes['version'])
        ? $changes['version']
        : $request->getProtocolVersion()
    }复制代码

    promise 简介

    关于 promise ,属于 guzzlehttp/promises 类库,是一个很值得学习的类库,有机会我会专门分析下它的实现原理,目前我们还是着重分析请求实现过程。

    通看源码会发现,虽然 Guzzle 内部大量使用了 promise 并且夹杂着闭包很复杂,但是 promise 发挥的作用都是一样的。目前可以这么理解,就是 promise 是一个状态机,它有三种状态:等待、满足、拒绝。

    下面的这个例子,只是示例会如何执行,promise 规范是有各种要求的,具体见 Promises/A+规范 ,Guzzle 使用的 promise 也是该规范的一个实现。

    $promise = new Promise(
        function () {
            echo 'wait';
        function () {
            echo 'cancle';
    $promise->then(
        function () {
            echo 'onFulfilled';
        function () {
            echo 'onRejected';
    )->then(
        function () {
            echo 'onFulfilled';
        function () {
            echo 'onRejected';
    );复制代码

    从等待状态开始执行,满足了就执行 onFulfilled ,拒绝了就执行 onRejected,一连串下来,依靠不同的状态去执行不同的方法,配合 http 请求要么成功要么失败,不会有第三种状态的场景,就可以很顺畅的理解了。

    private function transfer(RequestInterface $request, array $options)
      $handler = $options['handler'];
      try {
        return Promise\promise_for($handler($request, $options));
      } catch (\Exception $e) {
        return Promise\rejection_for($e);
    }复制代码

    成功就是 promise_for 失败就是 rejection_for。

    promise_for

    这个方法主要是用来保证返回的是一个 promise 对象,因为经过 $handler 处理后的值可能是一个 promise 对象( $handler 如何处理紧接着会说),也能是一个 response 对象,也可能是一个异常,所以需要对对数据做一个“清洗转换”,并返回一个满足状态的 promise。

    function promise_for($value)
        // 如果是一个 promise 对象就直接返回
        if ($value instanceof PromiseInterface) {
            return $value;
        // 如果是一个包含 then 方法的对象,会把它转换成一个 promise 对象
        if (method_exists($value, 'then')) {
            // 如果里面有 wait、cancel、resolve、reject 等方法,会把它添加进去作为默认方法,否则置为 null
            $wfn = method_exists($value, 'wait') ? [$value, 'wait'] : null;
            $cfn = method_exists($value, 'cancel') ? [$value, 'cancel'] : null;
            $promise = new Promise($wfn, $cfn);
            $value->then([$promise, 'resolve'], [$promise, 'reject']);
            return $promise;
        // 前俩者都不满不足的情况下,直接返回一个满足状态的 promise。
        return new FulfilledPromise($value);
    }复制代码

    rejection_for

    异常情况会走入到 rejection_for 方法。同理进行“数据清洗”,并返回一个拒绝状态的 promise 。

    function rejection_for($reason)
        if ($reason instanceof PromiseInterface) {
            return $reason;
        return new RejectedPromise($reason);
    }复制代码

    handler 处理

    还是 transfer 方法,在传给 promise_for 之前,先调用了一个 $handler,也就是配置中的 handler 函数。接着就是返回一个 Promise 对象,用于外层异步调用。

    private function transfer(RequestInterface $request, array $options)
      $handler = $options['handler'];
      try {
        // 这里会先调用配置中的 handler 方法
        return Promise\promise_for($handler($request, $options));
      } catch (\Exception $e) {
        return Promise\rejection_for($e);
    }复制代码

    对于上面的 handler 处理小节,你可能会有疑惑,为什么就调用了 handler 函数,那不是直接开始处理请求了吗?

    我们之前介绍过 handler 的数据结构,是一个是 handlerStack 对象,但是其调用本质是一系列组合闭包。但数据结构上是一个对象,怎么使用的时候就成了闭包呢?

    当尝试以调用函数的方式调用一个对象时, __invoke() 方法会被自动调用。

    有了这个前提,我们看下 handlerStack 源码。

    从 handlerStack 的名字上,我们就能知道它是一个”栈“数据结构,其满足”后进先出“的特性。

    public function __invoke(RequestInterface $request, array $options)
      // 这个函数主要是实现 Middleware 中间件操作
    
    
    
    
        
    
      $handler = $this->resolve();
      //这里下面紧接着会有分析
      return $handler($request, $options);
    public function resolve()
      // 变量缓存,能优化部分性能
      if (!$this->cached) {
        // 这个 handler 就是之前选择的 实现请求的底层方法
        // 如果没有的话,请求都无法实现,就别折腾了,抛个异常终止吧
        if (!($prev = $this->handler)) {
          throw new \LogicException('No handler has been specified');
        // 反转顺序,实现”后进先出“特性,调用每个中间件
        foreach (array_reverse($this->stack) as $fn) {
          // 中间件的注册是 [$middleware, $name] 形式的
          // 所以取第一个元素是其具体实现,第二个参数只是名字
          // 调用第一次传入的是 handler,后续传入的就是上一次处理的结果
          $prev = $fn[0]($prev);
        // 所有的都处理完毕,缓存起来
        $this->cached = $prev;
      return $this->cached;
    }复制代码

    上面就是很经典的中间件模型实现,laravel 中实现的略有区别,主要用到了 array_reduce 这个函数,但是原理上大同小异,知道其原理一通百通。

    我们再继续看看源码。还是这个方法,不过我们分析其最终调用的实现。

    根据 Middleware 流程图,我们知道最后一个调用的是 http_errors,我们就来分析它吧,没有任何特殊性,其他的 Middleware 结构都是一样的,只是有些中间件多次使用了 __invoke() 魔术方法而已。

    Middleware 里面闭包结构复杂,好好理解下。

    public function __invoke(RequestInterface $request, array $options)
      // 这个函数主要是实现 Middleware 中间件操作
      $handler = $this->resolve();
      // 现在我们分析这个
      return $handler($request, $options);
    public static function httpErrors()
      // 第一次调用返回!传入一个 闭包-A
      return function (callable $handler) {
        // 第二次调用返回!传入 $request,$options
        return function ($request, array $options) use ($handler) {
          // Middleware 自己的逻辑判定返回什么样的闭包
          if (empty($options['http_errors'])) {
            // 第三次调用返回!返回 闭包-A 的处理结果
            // 这里根据配置 没有注册 then 函数,直接进行下一步处理
            return $handler($request, $options);
          // 第三次调用返回!返回 闭包-A,附加 promise 
          // 根据上面我们说到的 promise 特性,这里用 then 
          // 附加了 闭包-A 处理完毕之后要调用的逻辑
          return $handler($request, $options)->then(
            function (ResponseInterface $response) use ($request, $handler) {
              $code = $response->getStatusCode();
              if ($code < 400) {
                return $response;
              throw RequestException::create($request, $response);
    }复制代码

    关于返回层数,可以根据 return 来迅速定位,一个 return 就对应一次调用返回。

    现在我们先梳理下到这步 handlerStack 被调用的次数,知道这三层闭包分别在哪里被调用了,有利于我们得出最终结果。

    // 第一次
    public static function create(callable $handler = null)
      $stack->push(Middleware::httpErrors(), 'http_errors');
    // 第二次
    public function resolve()
      $prev = $fn[0]($prev);
    // 第三次
    public function __invoke(RequestInterface $request, array $options)
      $handler($request, $options);
    }复制代码

    最后的结果应该是,如果按照 Middleware 结构应该是这样的:

    $handler($request, $options)->then('http_errors')
                                ->then('allow_redirects')
                                ->then('cookies')
                                ->then('prepare_body')复制代码

    这个 $handler 就是最开始传入的请求实现底层方法。

    整个 Middleware 就实现了,传入的时候先处理一遍请求数据,请求完了,通过 then 再处理一遍请求结果。

    注意!!!由于 Middleware 的作用不同,可能有的 Middleware 并不会处理请求结果,就不会注册 then 函数。这里描述的是 Middleware 的整个流程,并没有对其中某个做特殊分析,因为其需求场景不同,逻辑处理会有细微变化。

    这个 $handler 具体是哪个请求方法呢?还记得 choose_handler() 方法吗,它决定了到底使用哪种底层方法去实现请求,现在我们终于执行到发起请求的步骤了。

    再回顾下 choose_handler() 方法。

    function choose_handler()
        $handler = Proxy::wrapSync(new CurlMultiHandler(), new CurlHandler());
        $handler = new CurlHandler();
        $handler = new CurlHandler();
        $handler = $handler
            ? Proxy::wrapStreaming($handler, new StreamHandler())
            : new StreamHandler();
        return $handler;
    }复制代码

    这两个方法都有源码分析,没有印象的可以再回去看看。

    不同 $handler 都是在 __invoke() 方法上做文章。

    我们分析第一个 $handler = Proxy::wrapSync(new CurlMultiHandler(), new CurlHandler());

    public static function wrapSync(
      callable $default,
      callable $sync
      return function (RequestInterface $request, array $options) use ($default, $sync) {       // 注意这里的三目运算符,判定同步请求选项是否为空
        return empty($options[RequestOptions::SYNCHRONOUS])
          // 默认是并发请求 new CurlMultiHandler()
          ? $default($request, $options)
          // 这里是同步请求 new CurlHandler()
          : $sync($request, $options);
    }复制代码

    现在,异步和同步请求终于出现了区别。我们先看同步请求。

    我们回顾下 request() 方法,注意到一个步骤。

    public function request($method, $uri = '', array $options = [])
      // 这里往配置中添加了一个选项,设置该请求为同步的
      $options[RequestOptions::SYNCHRONOUS] = true;
    }复制代码

    所以,这里走的是同步请求,我们来分析 CurlHandler()

    public function __invoke(RequestInterface $request, array $options)
      // 如果设置了延迟请求,会在这里阻塞一会
      if (isset($options['delay'])) {
        usleep($options['delay'] * 1000);
      // 创建一个 handler 抽象对象
      $easy = $this->factory->create($request, $options);
      // 执行
      curl_exec($easy->handle);
      $easy->errno = curl_errno($easy->handle);
      // 请求处理结束
      return CurlFactory::finish($this, $easy, $this->factory);
    }复制代码

    这里需要分析一下工厂类 CurlFactory ,里面大都涉及到 cURL的一些配置,有兴趣的可以看下源码学习,配置的含义官方文档有专门的介绍,我这里就不在分析它们了,主要流程的分析还是不会缺的。

    public function create(RequestInterface $request, array $options)
      if (isset($options['curl']['body_as_string'])) {
        $options['_body_as_string'] = $options['curl']['body_as_string'];
        unset($options['curl']['body_as_string']);
      // handle 的一个抽象对象
      $easy = new EasyHandle;
      $easy->request = $request;
      $easy->options = $options;
      // 获取默认配置
      $conf = $this->getDefaultConf($easy);
      // 解析请求方法
      $this->applyMethod($easy, $conf);
      // 解析配置
      $this->applyHandlerOptions($easy, $conf);
      // 解析头部
      $this->applyHeaders($easy, $conf);
      unset($conf['_headers']);
      // 解析自定义 curl 配置
      if (isset($options['curl'
    
    
    
    
        
    ])) {
        $conf = array_replace($conf, $options['curl']);
      // 设置回调函数用于处理返回头
      $conf[CURLOPT_HEADERFUNCTION] = $this->createHeaderFn($easy);
      // 从 handle池 获取一个 handle,没有就新建一个
      $easy->handle = $this->handles
        ? array_pop($this->handles)
        : curl_init();
      curl_setopt_array($easy->handle, $conf);
      return $easy;
    public static function finish(
      callable $handler,
      EasyHandle $easy,
      CurlFactoryInterface $factory
      // 这里会调用配置用设置的 on_stats 函数
      if (isset($easy->options['on_stats'])) {
        self::invokeStats($easy);
      // 有错误的话走错误处理流程
      if (!$easy->response || $easy->errno) {
        return self::finishError($handler, $easy, $factory);
      // 释放资源,还到 handle池
      $factory->release($easy);
      // 处理 流数据
      $body = $easy->response->getBody();
      if ($body->isSeekable()) {
        $body->rewind();
      // 返回一个满足状态的 promise
      return new FulfilledPromise($easy->response);
    }复制代码

    根据源码分析,同步请求在这一步就已经发出了请求,并且回调了配置中的 on_stats 函数,拿到了未经处理的返回值原始返回值,并且同步请求 handler池,也就是复用的请求句柄为3个,这个没有办法修改,写死在代码中的。

        public function __invoke(RequestInterface $request, array $options)
            $easy = $this->factory->create($request, $options);
            // 为每个请求生成一个 ID
            $id = (int) $easy->handle;
            // 注册一个 promise,分别是调用执行和关闭方法
            $promise = new Promise(
                [$this, 'execute'],
                // 依据 ID 来关闭请求
                function () use ($id) { return $this->cancel($id); }
            // 添加请求 底层是 curl_multi_add_handle 方法
            $this->addRequest(['easy' => $easy, 'deferred' => $promise]);
            return $promise;
        }复制代码

    工厂类 CurlFactory 在上面已经分析过,这里不再赘述。但是异步请求这个时候并没有发起最终的请求,先是为每个请求生成一个 ID,然后将请求添加到批处理回话句柄( curl_multi_add_handle )中,最后返回了一个 Promise 对象,里面注册了 execute 函数和 cancel 函数,用于后面发起和关闭请求。

    需要注意的就是设定了延迟执行的请求,是在 addRequest() 方法中处理的。后面在”返回结果“章节会讲到延迟请求处理。

    配置中如果 stream 选项不为空,就会启用它,如果你没有 cURL,那就只能用它了。

    public function __invoke(RequestInterface $request, array $options)
      // 如果设置了延迟请求,会在这里阻塞一会
      if (isset($options['delay'])) {
        usleep($options['delay'] * 1000);
      // 流处理本身信息较少,所以为了补全一些信息,这里记录处理开始时间
      $startTime = isset($options['on_stats']) ? microtime(true) : null;
      try {
        // 不支持 expect header.
        $request = $request->withoutHeader('Expect');
        // 当内容为0的时候,依然添加一个头信息
        if (0 === $request->getBody()->getSize()) {
          $request = $request->withHeader('Content-Length', 0);
        // 发起请求,然后回调 on_stats 函数
        // 解析结果,同样返回一个满足状态的 promise
        return $this->createResponse(
          $request,
          $options,
          $this->createStream($request, $options),
          $startTime
      } catch (\InvalidArgumentException $e) {
        throw $e;
      } catch (\Exception $e) {
        // Determine if the error was a networking error.
        $message = $e->getMessage();
        // This list can probably get more comprehensive.
        if (strpos($message, 'getaddrinfo') // DNS lookup failed
            || strpos($message, 'Connection refused')
            || strpos($message, "couldn't connect to host") // error on HHVM
          $e = new ConnectException($e->getMessage(), $request, $e);
        $e = RequestException::wrapException($request, $e);
        $this->invokeStats($options, $request, $startTime, null, $e);
        return \GuzzleHttp\Promise\rejection_for($e);
    }复制代码

    关于流处理,因为其底层实现是 fopen() 函数,其支持的协议比较多,不止有 http,它 支持的协议和封装协议在这里 可以看到,所以 Guzzle 对其做了一些特殊处理以满足业务需要。

    根据上面的分析我们已经知道 transfer 方法返回的结果是什么了,然后就是获取返回结果。

    同步请求因为在 transfer 方法中,实际的请求已经发出去,已经拿到了未经处理的原始返回结果。

    public function send(RequestInterface $request, array $options = [])
      // 我们注意到最后调用的 wait 方法
      return $this->sendAsync($request, $options)->wait();
    }复制代码

    在同步请求方法中,直接调用了 wait() 方法,所以直接走 Promise 对象的 wait() 方法及注册的 then() 方法。还记得之前的 Middleware 里面注册了一些 then() 方法吗?这里主要就是调用它们了,完成中间件“处理返回结果”的这一步骤,当然还有一些在逻辑处理中注册的 then() 方法,在此不再举例。

    异步请求在 transfer 方法中返回的是一个 Promise,此时实际请求并没有发送。我们从官方例子来分析发送请求并且获取返回结果的方式。

    $promise = $client->requestAsync('GET', 'https://www.chengxiaobai.cn');
    $promise->then(
        function (ResponseInterface $res) {
            echo $res->getStatusCode() . "\n";
        function (RequestException $e) {
            echo $e->getMessage() . "\n";
            echo $e->getRequest()->getMethod();
    );复制代码

    这种方式是对每个异步请求单独注册 then() 方法,说明这个请求成功了怎么处理,失败了怎么处理。

    $client = new Client(['base_uri' => 'https://www.chengxiaobai.cn']);
    // 注册多个异步请求,实现并发
    $promises = [
        'image' => $client->getAsync('/image'),
        'png'   => $client->getAsync('/image/png'),
        'jpeg'  => $client->getAsync('/image/jpeg'),
        'webp'  => $client->getAsync('/image/webp')
    // 有一个失败就终止
    $results = Promise\unwrap($promises);
    // 忽略某些请求的异常,保证所有请求都发送出去
    $results = Promise\settle($promises)->wait();复制代码

    这个是设定多个异步请求,实现并发,并选择对部分请求错误是否忽略进行处理。

    $client = new Client();
    $requests = function ($total) use ($client) {
        for ($i = 1; $i < $total; $i++) {
          $uri = 'https://www.chengxiaobai.cn/page/' . $i;
            // 这里用到了协程
            yield function() use ($client, $uri) {
                return $client->getAsync($uri.$i);
    $pool = new Pool($client, $requests(10), [
        // 并发数
        'concurrency' => 5,
        'fulfilled' => function ($response, $index) {
            echo $res->getStatusCode() . "\n";
        'rejected' => function ($reason, $index) {
            echo $e->getMessage() . "\n";
    // 初始化 Promise
    
    
    
    
        
    
    $promise = $pool->promise();
    // 发起请求处理
    $promise->wait();复制代码

    这个是对大批量请求做出一个批量处理,类似一个请求池的的概念,设定了出口速率( concurrency ),使用统一的处理逻辑,处理请求池当中的数据。

    我们来分析下 Pool 的源码,主要是构造函数。

    public function __construct(
      ClientInterface $client,
      $requests,
      array $config = []
      // 设定请求池大小
      if (isset($config['pool_size'])) {
        $config['concurrency'] = $config['pool_size'];
      } elseif (!isset($config['concurrency'])) {
        // 默认并发数 25
        $config['concurrency'] = 25;
      if (isset($config['options'])) {
        $opts = $config['options'];
        unset($config['options']);
      } else {
        $opts = [];
      // 将请求列表转换为一个迭代器
      $iterable = \GuzzleHttp\Promise\iter_for($requests);
      $requests = function () use ($iterable, $client, $opts) {
        // 遍历请求列表
        foreach ($iterable as $key => $rfn) {
          // 如果是一个 request 的实现,转换为一个异步请求
          if ($rfn instanceof RequestInterface) {
            yield $key => $client->sendAsync($rfn, $opts);
          } elseif (is_callable($rfn)) {
            // 如过是一个闭包,直接调用 
            yield $key => $rfn($opts);
          } else {
            throw new \InvalidArgumentException('...');
      // 支持迭代的 Promise 对象
      $this->each = new EachPromise($requests(), $config);
    }复制代码

    我们可以看到,Pool 模式下,所有的请求配置 $opts 都是一样的,所以每个请求的处理逻辑都是一样的,如果每个请求都有有定制化需求,Pool 模式可能不太适合,当然可以使用修改源码的方式,不过这个已经不符合 Pool 模式设计的初衷了。

    不管哪种形式,都可以发现触发最终调用的都是 wait() 方法。这个和 Promise 的规范有关。

    我们看下异步如何处理请求的。

    还记得异步请求返回的 Promise 吗?

    $promise = new Promise(
                [$this, 'execute'],
                // 依据 ID 来关闭请求
                function () use ($id) { return $this->cancel($id); }
            );复制代码

    wait() 方法调用的就是 [$this, 'execute'] ,我们来分析它的实现。在此之前,我们需要特别说明下延迟请求。

    对于延迟请求,同步请求和流请求很好处理,直接阻塞就好了,如果是20个异步请求中包含10个延迟请求,每个延迟时间还不相等,这个时候延迟请求的处理就得好好考虑下了。

    在”请求处理“章节我们说过,延迟请求是没有立即加到批处理请求句柄的,它被暂时存放在 $this->delays 队列中。直到你决定发起请求了,延迟请求才被拿出来计算其是否应该被加到批处理请求句柄中。计算逻辑我们从源码看看如何计算阻塞时间。

    public function execute()
      $queue = P\queue();
      while ($this->handles || !$queue->isEmpty()) {
        // 如果没有在进行的请求,并且延迟请求队列不为空,就开始阻塞
        if (!$this->active && $this->delays) {
          usleep($this->timeToNext());
        $this->tick();
    private function timeToNext()
      $currentTime = microtime(true);
      $nextTime = PHP_INT_MAX;
      // 找出现有延迟请求队列中最小的延迟时间
      foreach ($this->delays as $time) {
        if ($time < $nextTime) {
          $nextTime = $time;
      return max(0, $nextTime - $currentTime) * 1000000;
    }复制代码

    execute 主要是调用了 tick() 这个方法。

    public function tick()
      // 如果延迟请求队列不为空,处理延迟请求
      if ($this->delays) {
        $currentTime = microtime(true);
        // $this->delays[$id] = microtime(true) + ($easy->options['delay'] / 1000);
        foreach ($this->delays as $id => $delay) {
          // 延迟任务已经达到延迟预期时间,开始处理
          if ($currentTime >= $delay) {
            // 将它从延迟任务队列中删除
            unset($this->delays[$id]);
            // 添加到批量请求句柄中
            curl_multi_add_handle(
              $this->_mh,
              $this->handles[$id]['easy']->handle
      // 执行队列中的任务
      P\queue()->run();
      // 执行请求
      if ($this->active &&
          curl_multi_select($this->_mh, $this->selectTimeout) === -1
        // See: https://bugs.php.net/bug.php?id=61141
        usleep(250);
      while (curl_multi_exec($this->_mh, $this->active) === CURLM_CALL_MULTI_PERFORM);
      // 获取请求结果信息,移除请求成功的请求
      $this->processMessages();
    }复制代码

    然后异步处理流程就很清晰了:

  • 如果延迟请求队列不为空并且当前没有在执行的请求,先阻塞最小的延迟时间,以保证延迟请求队列在每次请求都至少被消耗一个。如果有正在执行的请求或者延迟请求队列不为空,直接执行2。
  • 发起一次批量请求。
  • 获取请求信息,移除成功的请求。
  • 如果请求队列不为空,执行1-3。
  • 从上面的流程,我们可以分析得出,即使你的并发数大于请求数,也并不意味着只请求一次,可能会有重试或者延迟请求造成多次请求。并且根据步骤 1 我们也可以知道,非延迟任务也会跟着一起被阻塞。

    和同步请求一样,异步请求下每个请求处理完毕后,都会执行相应的 then() 方法完成返回结果处理。

    因为流请求本质上是基于 fopen 的,发起请求逻辑比较简单。

    public function __invoke(RequestInterface $request, array $options)
      // 延迟请求直接 delay 操作
      if (isset($options['delay'])) {
        usleep($options['delay'] * 1000);
      // 重点1:解析返回值
      return $this->createResponse(
        $request,
        $options,
        // 重点2:发起请求
        $this->createStream($request, $options),
        $startTime
    }复制代码

    先看如何发起请求的,重点在配置项的处理。

    private function createStream(RequestInterface $request, array $options)
        $params = [];
        // 这里设置了默认请求参数
        $context = $this->getDefaultContext($request);
        // 这里方法主要是依据配置项调用了
        // add_proxy,add_timeout,add_verify,add_cert,add_progress,add_debug
        // 其实本质上就是用自定义配置覆盖默认请求参数
        if (!empty($options)) {
            foreach ($options as $key => $value) {
                $method = "add_{$key}";
                if (isset($methods[$method])) {
                    $this->{$method}($request, $context, $value, $params);
        // 这里也是用自定义配置覆盖默认请求参数
        if (isset($options['stream_context'])) {
            if (!is_array($options['stream_context'])) {
                throw new \InvalidArgumentException('stream_context must be an array');
            $context = array_replace_recursive(
                $context,
                $options['stream_context']
        // 解析 host ,支持强制 IP 解析,v4 和 v6 都支持
        $uri = $this->resolveHost($request, $options);
        $context = $this->createResource(
            function () use ($context, $params) {
                // 这里创建资源流
                return stream_context_create($context, $params);
        return $this->createResource(
            function () use ($uri, &$http_response_header, $context, $options) {
                // 这里发起请求
                $resource = fopen((string) $uri, 'r', null, $context);
                $this->lastHeaders = $http_response_header;
                // 设置超时时间
                if (isset($options['read_timeout'])) {
                    $readTimeout = $options['read_timeout'];
                    $sec = (int) $readTimeout;
                    $usec = ($readTimeout - $sec) * 100000;
                    stream_set_timeout($resource, $sec, $usec);
                return $resource;
    }复制代码

    从代码看,默认启用了 https 。

    这里的自定义配置和默认配置合并,不再是之前简单的数组合并操作了,因为某个配置的修改,可能会涉及到其他配置项的变动,所以对几个主要选项 (proxy,timeout,verify,cert,progress,debug) 做了封装。

    毕竟 fopen 这个功能强大的函数在设计之初其目标就是操作 resource ,所以它的配置项也根据 resource 的不同而有差异, 其针对 http 的配置项在这里可以看到。

    然后就是处理返回值,如果使用 cURL 这些都能方便的处理,但是在流处理中,就得自己去解析它,相当于要自己要完成一部分 cURL 的工作。

    private function createResponse(
            RequestInterface $request,
            array $options,
            $stream,
            $startTime
            $hdrs = $this->lastHeaders;
            $this->lastHeaders = [];
            $parts = explode(' ', array_shift($hdrs), 3);
            $ver = explode('/', $parts[0])[1];
            $status = $parts[1];
            $reason = isset($parts[2]) ? $parts[2] : null;
            // 解析 header 
            $headers = \GuzzleHttp\headers_from_lines($hdrs);
            // 解析返回类型
            list ($stream, $headers) = $this->checkDecode($options, $headers, $stream);
            // 构建一个 Psr7\StreamInterface 的 Stream 对象
            $stream = Psr7\stream_for($stream);
            $sink = $stream;
            $response = new Psr7\Response($status, $headers, $sink, $ver, $reason);
            // 回调 on_headers 函数
            if (isset($options['on_headers'])) {
                try {
                    $options['on_headers']($response);
                } catch (\Exception $e) {
                    $msg = 'An error was encountered during the on_headers event';
                    $ex = new RequestException($msg, $request, $response, $e);
                    return \GuzzleHttp\Promise\rejection_for($ex);
            // 回调 on_stats 函数
            $this->invokeStats($options, $request, $startTime, $response, null);
            return new FulfilledPromise($response);
        }复制代码

    整个过程都是在解析数据,响应内容是通过 stream_get_contents 拿到的,在 Psr7\StreamInterface 实例中有体现。

    这里单独说下 on_headers 这个函数。这个函数是在拿到返回头之后,依据返回头里面的信息,来判定如何响应后面的操作,在返回数据比较大的时候可以做到提前拦截,避免浪费资源。

    这个设置在所有请求方式中都有效,只是用在流处理中意义更大。

    $client->request('GET', 'http://httpbin.org/stream/1024', [
        'on_headers' => function (ResponseInterface $response) {
            if ($response->getHeaderLine('Content-Length') > 1024) {
                throw new \Exception('The file is too big!');
    ]);复制代码

    在分析源码的过程中,发现了一个没有被使用的类 GuzzleHttp\Promise\Coroutine ,它也是对 Promise 的一个实现,但是其是通过迭代器来实现的,会不会有协程版的 Promise 呢?我们拭目以待。

    本作品由 程小白 创作,采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可,可自由转载、引用但需署名作者且注明文章出处。
    原文地址: www.chengxiaobai.cn/php/guzzle-…