添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

Vert.x Core中的功能相当底层 —— 您在此不会找到诸如数据库访问、授权或高层Web应用的功能。您可以在 Vert.x ext (扩展包) (译者注:Vert.x的扩展包是Vert.x的子项目集合,类似 Web Web Client Data Access 等)中找到这些功能。

Vert.x Core 小而轻,您可以只使用您需要的部分。它可整体嵌入现存应用中。我们并不会强迫您用特定的方式构造您的应用。

您亦可在其它Vert.x支持的语言中使用Vert.x Core。很酷的是:我们并不强迫您在书写诸如 JavaScript 或 Ruby 时直接调用 Java API, 毕竟不同的语言有不同的代码风格,若强行让 Ruby 开发人员遵循 Java 的代码风格会很怪异,所以我们根据 Java API 自动生成了适应不同语言代码风格的 API。

From now on we’ll just use the word core to refer to Vert.x core.

如果您在使用 Maven 或 Gradle(译者注:两种常用的项目构建工具),将以下依赖项添加到您的项目描述文件的 dependencies 节点中以使用 Vert.x Core 的API:

除非您拿到 Vertx 对象,否则在Vert.x领域中您做不了太多的事情。它是 Vert.x 的控制中心,也是您做几乎一切事情的基础,包括创建客户端和服务器、获取事件总线的引用、设置定时器等等。

那么如何获取它的实例呢?

如果您用嵌入方式使用Vert.x,可通过以下代码创建实例:

Vertx vertx = Vertx.vertx();

创建集群模式的 Vert.x 对象

如果您想创建一个 集群模式 Vertx 对象(参考 Event Bus 章节了解更多事件总线集群细节),那么通常情况下您将需要使用另一种异步的方式来创建 Vertx 对象。

这是因为让不同的 Vert.x 实例组成一个集群需要一些时间(也许是几秒钟)。在这段时间内,我们不想去阻塞调用线程,所以我们将结果异步返回给您。

HttpServerResponse response = request.response();
response.putHeader("Content-Type", "text/plain");
response.write("some text");
response.end();
vertx.setPeriodic(1000, id -> {
  // This handler will get called every second
  System.out.println("timer fired!");

又或者收到一个HTTP请求:

server.requestHandler(request -> {
  // This handler will be called every time an HTTP request is received at the server
  request.response().end("hello world!");

稍后当Vert.x有一个事件要传给您的处理器时,它会 异步地 调用这个处理器。

由此引入了下面一些Vert.x中的重要概念。

Vert.x的工作方式有所不同。每个 Vertx 实例维护的是 多个Event Loop 线程。默认情况下,我们会根据机器上可用的核数量来设置 Event Loop 的数量,您亦可自行设置。

这意味着 Vertx 进程能够在您的服务器上扩展,与 Node.js 不同。

我们将这种模式称为 Multi-Reactor 模式(多反应器模式),区别于单线程的 Reactor 模式(反应器模式)。

如果上述任何一种情况停止了 Event Loop 并占用了 显著执行时间 ,那您应该去罚站(译者注:原文此处为 Naughy Step,英国父母会在家里选择一个角落作为小孩罚站或静坐的地方,被称为 naughty corner 或 naughty step),等待下一步的指示。

所以,什么是 显著执行时间

您要等多久?它取决于您的应用程序和所需的并发数量。

如果您只有单个 Event Loop,而且您希望每秒处理10000个 HTTP 请求,很明显的是每一个请求处理时间不可以超过0.1毫秒,所以您不能阻塞任何过多(大于0.1毫秒)的时间。

这个数学题并不难,将留给读者作为练习。

如果您的应用程序没有响应,可能这是一个迹象,表明您在某个地方阻塞了Event Loop。 为了帮助您诊断类似问题,若 Vert.x 检测到 Event Loop 有一段时间没有响应,将会自动记录这种警告。 若您在日志中看到类似警告,那么您需要检查您的代码。比如:

Thread vertx-eventloop-thread-3 has been blocked for 20458 ms

Vert.x 还将提供堆栈跟踪,以精确定位发生阻塞的位置。

如果想关闭这些警告或更改设置,您可以在创建 Vertx 对象之前在 VertxOptions 中完成此操作。

事实是,很多,也非所有的库,特别是在JVM生态系统中有很多同步API,这些API中许多方法都是阻塞式的。一个很好的例子就是 JDBC API,它本质上是同步的,无论多么努力地去尝试,Vert.x都不能像魔法小精灵撒尘变法一样将它转换成异步API。

我们不会将所有的内容重写成异步方式,所以我们为您提供一种在 Vert.x 应用中安全调用"传统"阻塞API的方法。

如之前讨论,您不能在 Event Loop 中直接调用阻塞式操作,因为这样做会阻止 Event Loop 执行其他有用的任务。那您该怎么做?

可以通过调用 executeBlocking 方法来指定阻塞式代码的执行以及阻塞式代码执行后处理结果的异步回调。

vertx.executeBlocking(future -> {
  // Call some blocking API that takes a significant amount of time to return
  String result = someAPI.blockingMethod("hello");
  future.complete(result);
}, res -> {
  System.out.println("The result is: " + res.result());

默认情况下,如果 executeBlocking 在同一个上下文环境中(如:同一个 Verticle 实例)被调用了多次,那么这些不同的 executeBlocking 代码块会 顺序执行 (一个接一个)。

若您不需要关心您调用 executeBlocking 的顺序,可以将 ordered 参数的值设为 false 。这样任何 executeBlocking 都会在 Worker Pool 中并行执行。

另外一种运行阻塞式代码的方法是使用Worker Verticle

一个 Worker Verticle 始终会使用 Worker Pool 中的某个线程来执行。

默认的阻塞式代码会在 Vert.x 的 Worker Pool 中执行,通过 setWorkerPoolSize 配置。

可以为不同的用途创建不同的池:

WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool");
executor.executeBlocking(future -> {
  // Call some blocking API that takes a significant amount of time to return
  String result = someAPI.blockingMethod("hello");
  future.complete(result);
}, res -> {
  System.out.println("The result is: " + res.result());

Worker Executor 在不需要的时候必须被关闭:

executor.close();

当使用同一个名字创建了许多 worker 时,它们将共享同一个 pool。当所有的 worker executor 调用了 close 方法被关闭过后,对应的 worker pool 会被销毁。

如果 Worker Executor 在 Verticle 中创建,那么 Verticle 实例销毁的同时 Vert.x 将会自动关闭这个 Worker Executor。

Worker Executor 可以在创建的时候配置:

int poolSize = 10;
// 2 minutes
long maxExecuteTime = 2;
TimeUnit maxExecuteTimeUnit = TimeUnit.MINUTES;
WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool", poolSize, maxExecuteTime, maxExecuteTimeUnit);
Future<HttpServer> httpServerFuture = Future.future();
httpServer.listen(httpServerFuture.completer());
Future<NetServer> netServerFuture = Future.future();
netServer.listen(netServerFuture.completer());
CompositeFuture.all(httpServerFuture, netServerFuture).setHandler(ar -> {
  if (ar.succeeded()) {
    // All servers started
  } else {
    // At least one server failed

所有被合并的 Future 中的操作同时运行。 当组合的处理操作完成时,该方法返回的 Future 上绑定的处理器 Handler 会被调用。 当一个操作失败(其中的某一个 Future 的状态被标记成失败),则返回的 Future 会被标记为失败。 当所有的操作都成功时,返回的 Future 将会成功完成。

您可以传入一个 Future 列表(可能为空):

CompositeFuture.all(Arrays.asList(future1, future2, future3));

不同于 all 方法的合并会等待所有的 Future 成功执行(或任一失败), any 方法的合并会等待第一个成功执行的Future。 CompositeFuture.any 方法接受多个 Future 作为参数(最多6个,或传入 List )。当任意一个 Future 成功得到结果,则该 Future 成功;当所有的 Future 都执行失败,则该 Future 失败。

CompositeFuture.any(future1, future2).setHandler(ar -> {
  if (ar.succeeded()) {
    // At least one is succeeded
  } else {
    // All failed

它也可使用 Future 列表传参:

CompositeFuture.any(Arrays.asList(f1, f2, f3));

join 方法的合并会等待所有的 Future 完成,无论成败。 CompositeFuture.join 方法接受多个 Future 作为参数(最多6个),并将结果归并成一个 Future 。 当全部 Future 成功执行完成,得到的 Future 是成功状态的;当至少一个 Future 执行失败时,得到的 Future 是失败状态的。

CompositeFuture.join(future1, future2, future3).setHandler(ar -> {
  if (ar.succeeded()) {
    // All succeeded
  } else {
    // All completed and at least one failed

它也可使用 Future 列表传参:

CompositeFuture.join(Arrays.asList(future1, future2, future3));
fut1.compose(v -> { // When the file is created (fut1), execute this: Future<Void> fut2 = Future.future(); fs.writeFile("/foo", Buffer.buffer(), fut2.completer()); return fut2; }).compose(v -> { // When the file is written (fut2), execute this: fs.move("/foo", "/bar", startFuture.completer()); // mark startFuture it as failed if any step fails. startFuture);

这里例子中,有三个操作被串起来了:

Verticle 是由 Vert.x 部署和运行的代码块。默认情况一个 Vert.x 实例维护了N(默认情况下N = CPU核数 x 2)个 Event Loop 线程。Verticle 实例可使用任意 Vert.x 支持的编程语言编写,而且一个简单的应用程序也可以包含多种语言编写的 Verticle。

您可以将 Verticle 想成 Actor Model中的 Actor。(译者注: 参与者模式

一个应用程序通常是由在同一个 Vert.x 实例中同时运行的许多 Verticle 实例组合而成。不同的 Verticle 实例通过向 Event Bus 上发送消息来相互通信。

编写 Verticle

Verticle 的实现类必须实现 Verticle 接口。

如果您喜欢的话,可以直接实现该接口,但是通常直接从抽象类 AbstractVerticle 继承更简单。

这儿有一个例子:

public class MyVerticle extends AbstractVerticle {
 // Verticle部署时调用
 public void start() {
 // 可选 - Verticle撤销时调用
 public void stop() {

通常您需要像上边例子一样重写 start 方法。

当 Vert.x 部署 Verticle 时,它的 start 方法将被调用,这个方法执行完成后 Verticle 就变成已启动状态。

您同样可以重写 stop 方法,当Vert.x 撤销一个 Verticle 时它会被调用,这个方法执行完成后 Verticle 就变成已停止状态了。

Verticle 异步启动和停止

有些时候您的 Verticle 启动会耗费一些时间,您想要在这个过程做一些事,并且您做的这些事并不想等到Verticle部署完成过后再发生。 如:您想在 start 方法中部署其他的 Verticle。

您不能在您的 start 方法中阻塞等待其他的 Verticle 部署完成,这样做会破坏 黄金法则

所以您要怎么做?

您可以实现 异步版本start 方法来做这个事。这个版本的方法会以一个 Future 作参数被调用。方法执行完时,Verticle 实例 并没有 部署好(状态不是 deployed)。

稍后,您完成了所有您需要做的事(如:启动其他Verticle),您就可以调用 Futurecomplete(或 fail )方法来标记启动完成或失败了。

这儿有一个例子:

public class MyVerticle extends AbstractVerticle {
 private HttpServeer server;
 public void start(Future<Void> startFuture) {
   server = vertx.createHttpServer().requestHandler(req -> {
     req.response()
       .putHeader("content-type", "text/plain")
       .end("Hello from Vert.x!");
   // Now bind the server:
   server.listen(8080, res -> {
     if (res.succeeded()) {
       startFuture.complete();
     } else {
       startFuture.fail(res.cause());

同样的,这儿也有一个异步版本的 stop 方法,如果您想做一些耗时的 Verticle 清理工作,您可以使用它。

public class MyVerticle extends AbstractVerticle {
 public void start() {
   // 做一些事
 public void stop(Future<Void> stopFuture) {
   obj.doSomethingThatTakesTime(res -> {
     if (res.succeeded()) {
       stopFuture.complete();
     } else {
       stopFuture.fail();

INFO: 请注意:您不需要在一个 Verticle 的 stop 方法中手工去撤销启动时部署的子 Verticle,当父 Verticle 在撤销时 Vert.x 会自动撤销任何子 Verticle。

Verticle 种类

这儿有三种不同类型的 Verticle:

Standard Verticles

这是最常用的一类 Verticle —— 它们永远运行在 Event Loop 线程上。稍后的章节我们会讨论更多。

Worker Verticles

这类 Verticle 会运行在 Worker Pool 中的线程上。一个实例绝对不会被多个线程同时执行。

Multi-threaded worker verticles

这类 Verticle 也会运行在 Worker Pool 中的线程上。一个实例可以由多个线程同时执行(译者注:因此需要开发者自己确保线程安全)。

Standard verticles

当 Standard Verticle 被创建时,它会被分派给一个 Event Loop 线程,并在这个 Event Loop 中执行它的 start 方法。当您在一个 Event Loop 上调用了 Core API 中的方法并传入了处理器时,Vert.x 将保证用与调用该方法时相同的 Event Loop 来执行这些处理器。

这意味着我们可以保证您的 Verticle 实例中 所有的代码都是在相同Event Loop中执行(只要您不创建自己的线程并调用它!)

同样意味着您可以将您的应用中的所有代码用单线程方式编写,让 Vert.x 去考虑线程和扩展问题。您不用再考虑 synchronized 和 volatile 的问题,也可以避免传统的多线程应用经常会遇到的竞态条件和死锁的问题。

Worker verticles

Worker Verticle 和 Standard Verticle 很像,但它并不是由一个 Event Loop 来执行,而是由Vert.x中的 Worker Pool 中的线程执行。

Worker Verticle 被设计来调用阻塞式代码,它不会阻塞任何 Event Loop。

如果您不想使用 Worker Verticle 来运行阻塞式代码,您还可以在一个Event Loop中直接使用 [内联阻塞式代码](#运行阻塞式代码)。

若您想要将 Verticle 部署成一个 Worker Verticle,您可以通过 setWorker 方法来设置:

DeploymentOptions options = new DeploymentOptions().setWorker(true);
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);

Worker Verticle 实例绝对不会在 Vert.x 中被多个线程同时执行,但它可以在不同时间由不同线程执行。

Multi-threaded worker verticles

一个 Multi-threaded Worker Verticle 近似于普通的 Worker Verticle,但是它可以由不同的线程同时执行。

vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle");
// Deploy a JavaScript verticle
vertx.deployVerticle("verticles/myverticle.js");
// Deploy a Ruby verticle verticle
vertx.deployVerticle("verticles/my_verticle.rb");
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", res -> {
  if (res.succeeded()) {
    System.out.println("Deployment id is: " + res.result());
  } else {
    System.out.println("Deployment failed!");

如果部署成功,这个完成处理器的结果中将会包含部署ID的字符串。

这个部署 ID可以在之后您想要撤销它时使用。

撤销Verticle

我们可以通过 undeploy 方法来撤销部署好的 Verticle。

撤销操作也是异步的,因此若您想要在撤销完成过后收到通知则可以指定另一个完成处理器:

vertx.undeploy(deploymentID, res -> {
  if (res.succeeded()) {
    System.out.println("Undeployed ok");
  } else {
    System.out.println("Undeploy failed!");
JsonObject config = new JsonObject().put("name", "tim").put("directory", "/blah");
DeploymentOptions options = new DeploymentOptions().setConfig(config);
vertx.deployVerticle("com.mycompany.MyOrderProcessorVerticle", options);

传入之后,这个配置可以通过 Context 对象或使用 config 方法访问。

这个配置会以 JSON 对象(JsonObject) 的形式返回,因此您可以用下边代码读取数据:

System.out.println("Configuration: " + config().getString("name"));

当使用隔离组时,您需要用 setIsolatedClasses 方法来提供一个您想隔离的类名列表。列表项可以是一个Java 限定类全名,如 com.mycompany.myproject.engine.MyClass ;也可以是包含通配符的可匹配某个包或子包的任何类,例如 com.mycompany.myproject.* 将会匹配所有 com.mycompany.myproject 包或任意子包中的任意类名。

请注意仅仅只有匹配的类会被隔离,其他任意类会被当前类加载器加载。

若您想要加载的类和资源不存在于主类路径(main classpath),您可使用 setExtraClasspath 方法将额外的类路径添加到这里。

DeploymentOptions options = new DeploymentOptions().setIsolationGroup("mygroup");
options.setIsolatedClasses(Arrays.asList("com.mycompany.myverticle.*",
                   "com.mycompany.somepkg.SomeClass", "org.somelibrary.*"));
vertx.deployVerticle("com.mycompany.myverticle.VerticleClass", options);

Verticle可以启用高可用方式(HA)部署。在这种方式下,当其中一个部署在 Vert.x 实例中的 Verticle 突然挂掉,这个 Verticle 可以在集群环境中的另一个 Vert.x 实例中重新部署。

若要启用高可用方式运行一个 Verticle,仅需要追加 -ha 参数:

vertx run my-verticle.js -ha

当启用高可用方式时,不需要追加 -cluster 参数。

关于高可用的功能和配置的更多细节可参考 高可用和故障转移 章节。

从命令行运行Verticle

您可以在 Maven 或 Gradle 项目中以正常方式添加 Vert.x Core 为依赖,在项目中直接使用 Vert.x。

但是,您也可以从命令行直接运行 Vert.x 的 Verticle。

为此,您需要下载并安装 Vert.x 的发行版,并且将安装的 bin 目录添加到您的 PATH 环境变量中,还要确保您的 PATH 中设置了Java 8的JDK环境。

Context context = vertx.getOrCreateContext();
if (context.isEventLoopContext()) {
  System.out.println("Context attached to Event Loop");
} else if (context.isWorkerContext()) {
  System.out.println("Context attached to Worker Thread");
} else if (context.isMultiThreadedWorkerContext()) {
  System.out.println("Context attached to Worker Thread - multi threaded worker");
} else if (! Context.isOnVertxThread()) {
  System.out.println("Context not attached to a thread managed by vert.x");

当您获取了这个 Context 对象,您就可以在 Context 中异步执行代码了。换句话说,您提交的任务将会在同一个 Context 中运行:

vertx.getOrCreateContext().runOnContext( (v) -> {
  System.out.println("This will be executed asynchronously in the same context");

当在同一个 Context 中运行了多个处理函数时,可能需要在它们之间共享数据。 Context 对象提供了存储和读取共享数据的方法。举例来说,它允许您将数据传递到 runOnContext 方法运行的某些操作中:

final Context context = vertx.getOrCreateContext();
context.put("data", "hello");
context.runOnContext((v) -> {
  String hello = context.get("data");

您还可以通过 config 方法访问 Verticle 的配置信息。查看 向 Verticle 传入配置 章节了解更多配置信息。

执行周期性/延迟性操作

在 Vert.x 中,想要延迟之后执行或定期执行操作很常见。

在 Standard Verticle 中您不能直接让线程休眠以引入延迟,因为它会阻塞 Event Loop 线程。

取而代之是使用 Vert.x 定时器。定时器可以是一次性或周期性的,两者我们都会讨论到。

一次性计时器

一次性计时器会在一定延迟后调用一个 Event Handler,以毫秒为单位计时。

您可以通过 setTimer 方法传递延迟时间和一个处理器来设置计时器的触发。

long timerID = vertx.setTimer(1000, id -> {
  System.out.println("And one second later this is printed");
System.out.println("First this is printed");

返回值是一个唯一的计时器id,该id可用于之后取消该计时器,这个计时器id会传入给处理器。

周期性计时器

您同样可以使用 setPeriodic 方法设置一个周期性触发的计时器。

第一次触发之前同样会有一段设置的延时时间。

setPeriodic 方法的返回值也是一个唯一的计时器id,若之后该计时器需要取消则使用该id。

传给处理器的参数也是这个唯一的计时器id。

请记住这个计时器将会定期触发。如果您的定时任务会花费大量的时间,则您的计时器事件可能会连续执行甚至发生更坏的情况:重叠。

这种情况,您应考虑使用 setTimer 方法,当任务执行完成时设置下一个计时器。

long timerID = vertx.setPeriodic(1000, id -> {
  System.out.println("And every second this is printed");
System.out.println("First this is printed");

Verticle worker pool

Verticle 使用 Vert.x 中的 Worker Pool 来执行阻塞式行为,例如 executeBlocking 或 Worker Verticle。

可以在部署配置项中指定不同的Worker 线程池:

vertx.deployVerticle("the-verticle", new DeploymentOptions().setWorkerPoolName("the-specific-pool"));

同任何花哨的寻址方案相比,Vert.x的地址格式并不麻烦。Vert.x中的地址是一个简单的字符串,任意字符串都合法。当然,使用某种模式来命名仍然是明智的。如:使用点号来划分命名空间。

一些合法的地址形如: europe.news.feed1, acme.games.pacman, sausages, and X.

消息在处理器(Handler)中被接收。您可以在某个地址上注册一个处理器来接收消息。

同一个地址可以注册许多不同的处理器。

一个处理器也可以注册在多个不同的地址上。

发布/订阅消息

Event Bus支持 发布消息 功能。

消息将被发布到一个地址中,发布意味着会将信息传递给 所有 注册在该地址上的处理器。

这和 发布/订阅模式 很类似。

点对点模式/请求-响应模式

Event Bus也支持 点对点消息模式

消息将被发送到一个地址中,Vert.x将会把消息分发到某个注册在该地址上的处理器。

若这个地址上有不止一个注册过的处理器,它将使用 不严格的轮询算法 选择其中一个。

点对点消息传递模式下,可在消息发送的时候指定一个应答处理器(可选)。

当接收者收到消息并且已经被处理时,它可以选择性决定回复该消息,若选择回复则绑定的应答处理器将会被调用。

当发送者收到回复消息时,它也可以回复,这个过程可以不断重复。通过这种方式可以允许在两个不同的 Verticle 之间设置一个对话窗口。

这种消息模式被称作 请求-响应 模式。

Vert.x会尽它最大努力去传递消息,并且不会主动丢弃消息。这种方式称为 尽力传输(Best-effort delivery)

但是,当 Event Bus 中的全部或部分发生故障时,则可能会丢失消息。

若您的应用关心丢失的消息,您应该编写具有幂等性的处理器,并且您的发送者可以在恢复后重试。

eb.consumer("news.uk.sport", message -> { System.out.println("I have received a message: " + message.body());

当一个消息达到您的处理器,该处理器会以 message 为参数被调用。

调用 consumer 方法会返回一个 MessageConsumer 对象。

该对象随后可用于撤销处理器、或将处理器用作流式处理。

您也可以不设置处理器而使用 consumer 方法直接返回一个 MessageConsumer,之后再来设置处理器。如:

EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.consumer("news.uk.sport");
consumer.handler(message -> {
  System.out.println("I have received a message: " + message.body());

在集群模式下的Event Bus上注册处理器时,注册信息会花费一些时间才能传播到集群中的所有节点。

若您希望在完成注册后收到通知,您可以在 MessageConsumer 对象上注册一个 completion handler

consumer.completionHandler(res -> {
  if (res.succeeded()) {
    System.out.println("The handler registration has reached all nodes");
  } else {
    System.out.println("Registration failed!");
consumer.unregister(res -> {
  if (res.succeeded()) {
    System.out.println("The handler un-registration has reached all nodes");
  } else {
    System.out.println("Un-registration failed!");
DeliveryOptions options = new DeliveryOptions();
options.addHeader("some-header", "some-value");
eventBus.send("news.uk.sport", "Yay! Someone kicked a ball", options);
MessageConsumer<String> consumer = eventBus.consumer("news.uk.sport");
consumer.handler(message -> {
  System.out.println("I have received a message: " + message.body());
  message.reply("how interesting!");
eventBus.send("news.uk.sport", "Yay! Someone kicked a ball across a patch of grass", ar -> {
  if (ar.succeeded()) {
    System.out.println("Received reply: " + ar.result().body());

在应答的消息体中可以包含有用的信息。

关于 处理中 的含义实际上是由应用程序来定义的。这完全取决于消费者如何执行,Event Bus 对此并不关心。

一些例子:

eventBus.registerCodec(myCodec);
DeliveryOptions options = new DeliveryOptions().setCodecName(myCodec.name());
eventBus.send("orders", new MyPOJO(), options);

若您总是希望某个类使用将特定的编解码器,那么您可以为这个类注册默认编解码器。 这样您就不需要在每次发送的时候使用 DeliveryOptions 来指定了:

eventBus.registerDefaultCodec(MyPOJO.class, myCodec);
eventBus.send("orders", new MyPOJO());

您可以通过 unregisterCodec 方法注销某个消息编解码器。

消息编解码器的编码和解码不一定使用同一个类型。例如您可以编写一个编解码器来发送 MyPOJO 类的对象,但是当消息发送给处理器后解码成 MyOtherPOJO 对象。

集群模式的 Event Bus

Event Bus 不仅仅存在于单个 Vert.x 实例中。通过您在网络上将不同的 Vert.x 实例集群在一起,它可以形成一个单一的、分布式的Event Bus。

通过代码的方式启用集群模式

若您用编程的方式创建 Vert.x 实例(Vertx),则可以通过将 Vert.x 实例配置成集群模式来获取集群模式的Event Bus:

VertxOptions options = new VertxOptions();
Vertx.clusteredVertx(options, res -> {
  if (res.succeeded()) {
    Vertx vertx = res.result();
    EventBus eventBus = vertx.eventBus();
    System.out.println("We now have a clustered event bus: " + eventBus);
  } else {
    System.out.println("Failed: " + res.cause());

您需要确在您的 classpath 中(或构建工具的依赖中)包含 ClusterManager 的实现类,如默认的 HazelcastClusterManager

通过命令行启用集群模式

您可以通过以下命令以集群模式运行 Vert.x 应用:

vertx run my-verticle.js -cluster

Event Bus 是可以配置的,这对于以集群模式运行的 Event Bus 是非常有用的。Event Bus 使用 TCP 连接发送和接收消息,因此可以通过 EventBusOptions 对TCP连接进行全面的配置。由于 Event Bus 同时用作客户端和服务器,因此这些配置近似于 NetClientOptionsNetServerOptions

VertxOptions options = new VertxOptions()
    .setEventBusOptions(new EventBusOptions()
        .setSsl(true)
        .setKeyStoreOptions(new JksOptions().setPath("keystore.jks").setPassword("wibble"))
        .setTrustStoreOptions(new JksOptions().setPath("keystore.jks").setPassword("wibble"))
        .setClientAuth(ClientAuth.REQUIRED)
Vertx.clusteredVertx(options, res -> {
  if (res.succeeded()) {
    Vertx vertx = res.result();
    EventBus eventBus = vertx.eventBus();
    System.out.println("We now have a clustered event bus: " + eventBus);
  } else {
    System.out.println("Failed: " + res.cause());

上边代码段描述了如何在Event Bus中使用SSL连接替换传统的TCP连接。

WARNING: 若要在集群模式下保证安全性,您 必须 将集群管理器配置成加密的或强制安全的。参考集群管理器的文档获取更多细节。

Event Bus 的配置需要在所有集群节点中保持一致性。

EventBusOptions 还允许您指定 Event Bus 是否运行在集群模式下,以及它的主机信息和端口。您可使用 setClusteredgetClusterHostgetClusterPort 方法来设置。

在容器中使用时,您也可以配置公共主机和端口号:

VertxOptions options = new VertxOptions()
    .setEventBusOptions(new EventBusOptions()
        .setClusterPublicHost("whatever")
        .setClusterPublicPort(1234)
Vertx.clusteredVertx(options, res -> {
  if (res.succeeded()) {
    Vertx vertx = res.result();
    EventBus eventBus = vertx.eventBus();
    System.out.println("We now have a clustered event bus: " + eventBus);
  } else {
    System.out.println("Failed: " + res.cause());
request.bodyHandler(buff -> {
  JsonObject jsonObject = buff.toJsonObject();
  User javaObject = jsonObject.mapTo(User.class);

请注意上述代码直接使用了 Jackson 的 ObjectMapper#convertValue() 来执行映射。 关于字段和构造函数的可见性的影响、对象引用的序列化和反序列化的问题等等可参考 Jackson 的文档获取更多信息。

在最简单的情况下,如果 Java 类中所有的字段都是 public (或者有 public 的 getter/setter)时, 并且有一个 public 的默认构造函数(或不定义构造函数),mapFrommapTo 都应该成功。

只要不存在对象的循环引用,嵌套的 Java 对象可以被序列化/反序列化为嵌套的JSON对象。

将 JSON 对象编码成字符串

您可使用 encode 方法将一个对象编码成字符串格式。

一个 Buffer 是可以读取或写入的0个或多个字节序列,并且根据需要可以自动扩容、将任意字节写入 Buffer 。您也可以将 Buffer 想象成字节数组(译者注:类似于 JDK 中的 ByteBuffer )。

创建 Buffer

可以使用静态方法 Buffer.buffer 来创建 Buffer

Buffer 可以从字符串或字节数组初始化,或者直接创建空的 Buffer

这儿有一些创建 Buffer 的例子。

创建一个空的 Buffer

Buffer buff = Buffer.buffer();

从字符串创建一个 Buffer , 这个 Buffer 中的字符会以 UTF-8 格式编码:

Buffer buff = Buffer.buffer("some string");

从字符串创建一个 Buffer ,这个字符串可以用指定的编码方式编码,例如:

Buffer buff = Buffer.buffer("some string", "UTF-16");

从字节数组 byte[] 创建 Buffer

byte[] bytes = new byte[] {1, 3, 5};
Buffer buff = Buffer.buffer(bytes);

创建一个指定初始大小的 Buffer 。若您知道您的 Buffer 会写入一定量的数据,您可以创建 Buffer 并指定它的大小。 这使得这个 Buffer 初始化时分配了更多的内存,比数据写入时重新调整大小的效率更高。

注意以这种方式创建的 Buffer空的。它不会创建一个填满了 0 的Buffer。代码如下:

Buffer buff = Buffer.buffer(10000);

向Buffer写入数据

Buffer 写入数据的方式有两种:追加和随机写入。任何一种情况下 Buffer 都会自动进行扩容,所以不可能在使用 Buffer 时遇到 IndexOutOfBoundsException

追加到Buffer

您可以使用 appendXXX 方法追加数据到 BufferBuffer 类提供了追加各种不同类型数据的追加写入方法。

因为 appendXXX 方法的返回值就是 Buffer 自身,所以它可以链式地调用:

Buffer buff = Buffer.buffer();
buff.appendInt(123).appendString("hello\n");
socket.write(buff);

您还可以指定一个索引值,通过 setXXX 方法写入数据到 Buffer ,它也存在各种不同数据类型的方法。 所有的 set 方法都会将索引值作为第一个参数 —— 这表示 Buffer 中开始写入数据的位置。

Buffer 始终根据需要进行自动扩容。

Buffer buff = Buffer.buffer();
buff.setInt(1000, 123);
buff.setString(0, "hello");
Buffer buff = Buffer.buffer();
for (int i = 0; i < buff.length(); i += 4) {
  System.out.println("int value at " + i + " is " + buff.getInt(i));

可使用 getUnsignedXXX , appendUnsignedXXXsetUnsignedXXX 方法将无符号数从 Buffer 中读取或追加/设置到 Buffer 里。这对以优化网络协议和最小化带宽消耗为目的实现的编解码器是很有用的。

下边例子中,值 200 被设置到了仅占用一个字节的特定位置:

Buffer buff = Buffer.buffer(128);
int pos = 15;
buff.setUnsignedByte(pos, (short) 200);
System.out.println(buff.getUnsignedByte(pos));

控制台中显示 200

Buffer长度

可使用 length 方法获取Buffer长度,Buffer的长度值是Buffer中包含的字节的最大索引 + 1。

拷贝Buffer

可使用 copy 方法创建一个Buffer的副本。

裁剪Buffer

裁剪得到的Buffer是基于原始Buffer的一个新的Buffer。它不会拷贝实际的数据。使用 slice 方法裁剪一个Buffer。

Buffer 重用

将Buffer写入到一个Socket或其他类似位置后,Buffer就不可被重用了。

NetServer server = vertx.createNetServer();
server.listen(1234, "localhost", res -> {
  if (res.succeeded()) {
    System.out.println("Server is now listening!");
  } else {
    System.out.println("Failed to bind!");
NetServer server = vertx.createNetServer();
server.listen(0, "localhost", res -> {
  if (res.succeeded()) {
    System.out.println("Server is now listening on actual port: " + server.actualPort());
  } else {
    System.out.println("Failed to bind!");
NetServer server = vertx.createNetServer();
server.connectHandler(socket -> {
  // Handle the connection in here

当连接成功时,您可以在回调函数中处理得到的 NetSocket 实例。

这是一个代表了实际连接的套接字接口,它允许您读取和写入数据、以及执行各种其他操作,如关闭 Socket。

从Socket读取数据

您可以在Socket上调用 handler 方法来设置用于读取数据的处理器。

每次 Socket 接收到数据时,会以 Buffer 对象为参数调用处理器。

NetServer server = vertx.createNetServer();
server.connectHandler(socket -> {
  socket.handler(buffer -> {
    System.out.println("I received some bytes: " + buffer.length());
for (int i = 0; i < 10; i++) {
  NetServer server = vertx.createNetServer();
  server.connectHandler(socket -> {
    socket.handler(buffer -> {
      // Just echo back the data
      socket.write(buffer);
  server.listen(1234, "localhost");

如果您使用的是 Verticle,您可以通过在命令行上使用 -instances 选项来简单部署更多的服务器实例:

vertx run com.mycompany.MyVerticle -instances 10

或者使用编程方式部署您的 Verticle 时:

DeploymentOptions options = new DeploymentOptions().setInstances(10);
vertx.deployVerticle("com.mycompany.MyVerticle", options);

一旦您这样做,您将发现echo服务器在功能上与之前相同,但是服务器上的所有核都可以被利用,并且可以处理更多的工作。

在这一点上,您可能会问自己:如何让多台服务器在同一主机和端口上侦听?尝试部署一个以上的实例时真的不会遇到端口冲突吗?

Vert.x在这里有一点魔法。

当您在与现有服务器相同的主机和端口上部署另一个服务器实例时,实际上它并不会尝试创建在同一主机/端口上侦听的新服务器实例。

相反,它内部仅仅维护一个服务器实例。当传入新的连接时,它以轮询的方式将其分发给任意一个连接处理器处理。

因此,Vert.x TCP 服务端可以水平扩展到多个核,并且每个实例保持单线程环境不变。

创建 TCP 客户端

使用所有默认选项创建 TCP 客户端的最简单方法如下:

NetClient client = vertx.createNetClient();
NetClientOptions options = new NetClientOptions().setConnectTimeout(10000);
NetClient client = vertx.createNetClient(options);
client.connect(4321, "localhost", res -> {
  if (res.succeeded()) {
    System.out.println("Connected!");
    NetSocket socket = res.result();
  } else {
    System.out.println("Failed to connect: " + res.cause().getMessage());
NetClientOptions options = new NetClientOptions().
  setReconnectAttempts(10).
  setReconnectInterval(500);
NetClient client = vertx.createNetClient(options);

默认情况下,多个连接尝试是被禁用的。

记录网络活动

网络活动可以被记录下来,用于调试:

NetServerOptions options = new NetServerOptions().setLogActivity(true);
NetServer server = vertx.createNetServer(options);

对于客户端:

NetClientOptions options = new NetClientOptions().setLogActivity(true);
NetClient client = vertx.createNetClient(options);

Netty 使用 DEBUG 级别和 io.netty.handler.logging.LoggingHandler 名称来记录网络活动。使用网络活动记录时,需要注意以下几点:

无论是否使用SSL/TLS,服务器和客户端的API都是相同的。通过创建客户端/服务器时使用的 NetClientOptionsNetServerOptions 来启用TLS/SSL。

在服务端启用SSL/TLS

您需要设置 ssl 配置项来启用 SSL/TLS。

默认是禁用的。

指定服务端的密钥/证书

SSL/TLS 服务端通常向客户端提供证书,以便验证服务端的身份。

可以通过以下几种方式为服务端配置证书/密钥:

第一种方法是指定包含证书和私钥的Java密钥库位置。

可以使用 JDK 附带的 keytool 实用程序来管理Java密钥存储。

还应提供密钥存储的密码:

NetServerOptions options = new NetServerOptions().setSsl(true).setKeyStoreOptions(
  new JksOptions().
    setPath("/path/to/your/server-keystore.jks").
    setPassword("password-of-your-keystore")
NetServer server = vertx.createNetServer(options);

或者,您可以自己读取密钥库到一个 Buffer ,并将它直接提供给 JksOptions

Buffer myKeyStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/server-keystore.jks");
JksOptions jksOptions = new JksOptions().
  setValue(myKeyStoreAsABuffer).
  setPassword("password-of-your-keystore");
NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setKeyStoreOptions(jksOptions);
NetServer server = vertx.createNetServer(options);

PKCS#12格式的密钥/证书 (http://en.wikipedia.org/wiki/PKCS_12) ,通常为 .pfx.p12 扩展名)也可以用与JKS密钥存储相似的方式加载:

NetServerOptions options = new NetServerOptions().setSsl(true).setPfxKeyCertOptions(
  new PfxOptions().
    setPath("/path/to/your/server-keystore.pfx").
    setPassword("password-of-your-keystore")
NetServer server = vertx.createNetServer(options);

也支持通过 Buffer 来配置:

Buffer myKeyStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/server-keystore.pfx");
PfxOptions pfxOptions = new PfxOptions().
  setValue(myKeyStoreAsABuffer).
  setPassword("password-of-your-keystore");
NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setPfxKeyCertOptions(pfxOptions);
NetServer server = vertx.createNetServer(options);

另外一种分别提供服务器私钥和证书的方法是使用 .pem 文件。

NetServerOptions options = new NetServerOptions().setSsl(true).setPemKeyCertOptions(
  new PemKeyCertOptions().
    setKeyPath("/path/to/your/server-key.pem").
    setCertPath("/path/to/your/server-cert.pem")
NetServer server = vertx.createNetServer(options);

也支持通过 Buffer 来配置:

Buffer myKeyAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/server-key.pem");
Buffer myCertAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/server-cert.pem");
PemKeyCertOptions pemOptions = new PemKeyCertOptions().
  setKeyValue(myKeyAsABuffer).
  setCertValue(myCertAsABuffer);
NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setPemKeyCertOptions(pemOptions);
NetServer server = vertx.createNetServer(options);

PKCS8, PKCS1 and X.509 certificates wrapped in a PEM block formats are supported.

NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setClientAuth(ClientAuth.REQUIRED).
  setTrustStoreOptions(
    new JksOptions().
      setPath("/path/to/your/truststore.jks").
      setPassword("password-of-your-truststore")
NetServer server = vertx.createNetServer(options);

或者您可以自己读取受信存储到 Buffer ,并将它直接提供:

Buffer myTrustStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/truststore.jks");
NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setClientAuth(ClientAuth.REQUIRED).
  setTrustStoreOptions(
    new JksOptions().
      setValue(myTrustStoreAsABuffer).
      setPassword("password-of-your-truststore")
NetServer server = vertx.createNetServer(options);

PKCS#12格式的密钥/证书 (http://en.wikipedia.org/wiki/PKCS_12) ,通常为 .pfx.p12 扩展名)也可以用与JKS密钥存储相似的方式加载:

NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setClientAuth(ClientAuth.REQUIRED).
  setPfxTrustOptions(
    new PfxOptions().
      setPath("/path/to/your/truststore.pfx").
      setPassword("password-of-your-truststore")
NetServer server = vertx.createNetServer(options);

也支持通过 Buffer 来配置:

Buffer myTrustStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/truststore.pfx");
NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setClientAuth(ClientAuth.REQUIRED).
  setPfxTrustOptions(
    new PfxOptions().
      setValue(myTrustStoreAsABuffer).
      setPassword("password-of-your-truststore")
NetServer server = vertx.createNetServer(options);

另一种提供服务器证书颁发机构的方法是使用一个 .pem 文件列表。

NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setClientAuth(ClientAuth.REQUIRED).
  setPemTrustOptions(
    new PemTrustOptions().
      addCertPath("/path/to/your/server-ca.pem")
NetServer server = vertx.createNetServer(options);

也支持通过 Buffer 来配置:

Buffer myCaAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/server-ca.pfx");
NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setClientAuth(ClientAuth.REQUIRED).
  setPemTrustOptions(
    new PemTrustOptions().
      addCertValue(myCaAsABuffer)
NetServer server = vertx.createNetServer(options);
NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setHostnameVerificationAlgorithm("HTTPS");
NetClient client = vertx.createNetClient(options);

和服务器配置相同,也可通过以下几种方式配置受信客户端:

第一种方法是指定包含证书颁发机构的Java受信库的位置。

它只是一个标准的Java密钥存储,与服务器端的密钥存储相同。通过在 jks options 上使用 path 设置客户端受信存储位置。如果服务器在连接期间提供不在客户端受信存储中的证书,则尝试连接将不会成功。

NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setTrustStoreOptions(
    new JksOptions().
      setPath("/path/to/your/truststore.jks").
      setPassword("password-of-your-truststore")
NetClient client = vertx.createNetClient(options);

它也支持 Buffer 的配置:

Buffer myTrustStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/truststore.jks");
NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setTrustStoreOptions(
    new JksOptions().
      setValue(myTrustStoreAsABuffer).
      setPassword("password-of-your-truststore")
NetClient client = vertx.createNetClient(options);

PKCS#12格式的密钥/证书 (http://en.wikipedia.org/wiki/PKCS_12) ,通常为 .pfx.p12 扩展名)也可以用与JKS密钥存储相似的方式加载:

NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setPfxTrustOptions(
    new PfxOptions().
      setPath("/path/to/your/truststore.pfx").
      setPassword("password-of-your-truststore")
NetClient client = vertx.createNetClient(options);

也支持通过 Buffer 来配置:

Buffer myTrustStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/truststore.pfx");
NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setPfxTrustOptions(
    new PfxOptions().
      setValue(myTrustStoreAsABuffer).
      setPassword("password-of-your-truststore")
NetClient client = vertx.createNetClient(options);

另一种提供服务器证书颁发机构的方法是使用一个 .pem 文件列表。

NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setPemTrustOptions(
    new PemTrustOptions().
      addCertPath("/path/to/your/ca-cert.pem")
NetClient client = vertx.createNetClient(options);

也支持通过 Buffer 来配置:

Buffer myTrustStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/ca-cert.pem");
NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setPemTrustOptions(
    new PemTrustOptions().
      addCertValue(myTrustStoreAsABuffer)
NetClient client = vertx.createNetClient(options);
NetClientOptions options = new NetClientOptions().setSsl(true).setKeyStoreOptions(
  new JksOptions().
    setPath("/path/to/your/client-keystore.jks").
    setPassword("password-of-your-keystore")
NetClient client = vertx.createNetClient(options);

也支持通过 Buffer 来配置:

Buffer myKeyStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/client-keystore.jks");
JksOptions jksOptions = new JksOptions().
  setValue(myKeyStoreAsABuffer).
  setPassword("password-of-your-keystore");
NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setKeyStoreOptions(jksOptions);
NetClient client = vertx.createNetClient(options);

PKCS#12格式的密钥/证书 (http://en.wikipedia.org/wiki/PKCS_12) ,通常为 .pfx.p12 扩展名)也可以用与JKS密钥存储相似的方式加载:

NetClientOptions options = new NetClientOptions().setSsl(true).setPfxKeyCertOptions(
  new PfxOptions().
    setPath("/path/to/your/client-keystore.pfx").
    setPassword("password-of-your-keystore")
NetClient client = vertx.createNetClient(options);

也支持通过 Buffer 来配置:

Buffer myKeyStoreAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/client-keystore.pfx");
PfxOptions pfxOptions = new PfxOptions().
  setValue(myKeyStoreAsABuffer).
  setPassword("password-of-your-keystore");
NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setPfxKeyCertOptions(pfxOptions);
NetClient client = vertx.createNetClient(options);

另一种单独提供服务器私钥和证书的方法是使用 .pem 文件。

NetClientOptions options = new NetClientOptions().setSsl(true).setPemKeyCertOptions(
  new PemKeyCertOptions().
    setKeyPath("/path/to/your/client-key.pem").
    setCertPath("/path/to/your/client-cert.pem")
NetClient client = vertx.createNetClient(options);

也支持通过 Buffer 来配置:

Buffer myKeyAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/client-key.pem");
Buffer myCertAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/client-cert.pem");
PemKeyCertOptions pemOptions = new PemKeyCertOptions().
  setKeyValue(myKeyAsABuffer).
  setCertValue(myCertAsABuffer);
NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setPemKeyCertOptions(pemOptions);
NetClient client = vertx.createNetClient(options);

请记住 pem 的配置和私钥是不加密的。

用于测试和开发目的的自签名证书

SelfSignedCertificate certificate = SelfSignedCertificate.create();
NetServerOptions serverOptions = new NetServerOptions()
  .setSsl(true)
  .setKeyCertOptions(certificate.keyCertOptions())
  .setTrustOptions(certificate.trustOptions());
NetServer server = vertx.createNetServer(serverOptions)
  .connectHandler(socket -> socket.write("Hello!").end())
  .listen(1234, "localhost");
NetClientOptions clientOptions = new NetClientOptions()
  .setSsl(true)
  .setKeyCertOptions(certificate.keyCertOptions())
  .setTrustOptions(certificate.trustOptions());
NetClient client = vertx.createNetClient(clientOptions);
client.connect(1234, "localhost", ar -> {
  if (ar.succeeded()) {
    ar.result().handler(buffer -> System.out.println(buffer));
  } else {
    System.err.println("Woops: " + ar.cause().getMessage());

客户端也可配置为信任所有证书:

NetClientOptions clientOptions = new NetClientOptions()
  .setSsl(true)
  .setTrustAll(true);

自签名证书也适用于其他基于TCP的协议,如HTTPS:

SelfSignedCertificate certificate = SelfSignedCertificate.create();
vertx.createHttpServer(new HttpServerOptions()
  .setSsl(true)
  .setKeyCertOptions(certificate.keyCertOptions())
  .setTrustOptions(certificate.trustOptions()))
  .requestHandler(req -> req.response().end("Hello!"))
  .listen(8080);
NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setTrustStoreOptions(trustOptions).
  addCrlPath("/path/to/your/crl.pem");
NetClient client = vertx.createNetClient(options);

也支持通过 Buffer 来配置:

Buffer myCrlAsABuffer = vertx.fileSystem().readFileBlocking("/path/to/your/crl.pem");
NetClientOptions options = new NetClientOptions().
  setSsl(true).
  setTrustStoreOptions(trustOptions).
  addCrlValue(myCrlAsABuffer);
NetClient client = vertx.createNetClient(options);
NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setKeyStoreOptions(keyStoreOptions).
  addEnabledCipherSuite("ECDHE-RSA-AES128-GCM-SHA256").
  addEnabledCipherSuite("ECDHE-ECDSA-AES128-GCM-SHA256").
  addEnabledCipherSuite("ECDHE-RSA-AES256-GCM-SHA384").
  addEnabledCipherSuite("CDHE-ECDSA-AES256-GCM-SHA384");
NetServer server = vertx.createNetServer(options);

密码套件可在 NetServerOptionsNetClientOptions 配置项中指定。

配置TLS协议版本

默认情况下,TLS配置将使用以下协议版本:SSLv2Hello、TLSv1、TLSv1.1 和 TLSv1.2。 协议版本可以通过显式添加启用协议进行配置:

NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setKeyStoreOptions(keyStoreOptions).
  removeEnabledSecureTransportProtocol("TLSv1").
  addEnabledSecureTransportProtocol("TLSv1.3");
NetServer server = vertx.createNetServer(options);

协议版本可在 NetServerOptionsNetClientOptions 配置项中指定。

SSL引擎

引擎实现可以配置为使用 OpenSSL 而不是JDK实现(来支持SSL)。 OpenSSL提供比JDK引擎更好的性能和CPU使用率、以及JDK版本独立性。

引擎选项可使用:

NetServerOptions options = new NetServerOptions().
  setSsl(true).
  setKeyStoreOptions(keyStoreOptions);
// Use JDK SSL engine explicitly
options = new NetServerOptions().
  setSsl(true).
  setKeyStoreOptions(keyStoreOptions).
  setJdkSslEngineOptions(new JdkSSLEngineOptions());
// Use OpenSSL engine
options = new NetServerOptions().
  setSsl(true).
  setKeyStoreOptions(keyStoreOptions).
  setOpenSslEngineOptions(new OpenSSLEngineOptions());

Server Name Indication (SNI)

Server Name Indication (SNI) is a TLS extension by which a client specifies a hostname attempting to connect: during the TLS handshake the client gives a server name and the server can use it to respond with a specific certificate for this server name instead of the default deployed certificate. If the server requires client authentication the server can use a specific trusted CA certificate depending on the indicated server name.

When SNI is active the server uses

the certificate CN or SAN DNS (Subject Alternative Name with DNS) to do an exact match, e.g www.example.com

the certificate CN or SAN DNS certificate to match a wildcard name, e.g *.example.com

otherwise the first certificate when the client does not present a server name or the presented server name cannot be matched

if JksOptions were used to set the trust options ( options ) then an exact match with the trust store alias is done

otherwise the available CA certificates are used in the same way as if no SNI is in place

You can enable SNI on the server by setting setSni to true and configured the server with multiple key/certificate pairs.

Java KeyStore files or PKCS12 files can store multiple key/cert pairs out of the box.

JksOptions keyCertOptions = new JksOptions().setPath("keystore.jks").setPassword("wibble");
NetServer netServer = vertx.createNetServer(new NetServerOptions()
    .setKeyStoreOptions(keyCertOptions)
    .setSsl(true)
    .setSni(true)

PemKeyCertOptions can be configured to hold multiple entries:

PemKeyCertOptions keyCertOptions = new PemKeyCertOptions()
    .setKeyPaths(Arrays.asList("default-key.pem", "host1-key.pem", "etc..."))
    .setCertPaths(Arrays.asList("default-cert.pem", "host2-key.pem", "etc...")
NetServer netServer = vertx.createNetServer(new NetServerOptions()
    .setPemKeyCertOptions(keyCertOptions)
    .setSsl(true)
    .setSni(true)

The client implicitly sends the connecting host as an SNI server name for Fully Qualified Domain Name (FQDN).

You can provide an explicit server name when connecting a socket

NetClient client = vertx.createNetClient(new NetClientOptions()
    .setTrustStoreOptions(trustOptions)
    .setSsl(true)
// Connect to 'localhost' and present 'server.name' server name
client.connect(1234, "localhost", "server.name", res -> {
  if (res.succeeded()) {
    System.out.println("Connected!");
    NetSocket socket = res.result();
  } else {
    System.out.println("Failed to connect: " + res.cause().getMessage());

It can be used for different purposes:

应用层协议协商 (ALPN)

ALPN(Application-Layer Protocol Negotiation)是应用层协议协商的TLS扩展,它被HTTP/2使用:在TLS握手期时,客户端给出其接受的应用协议列表,之后服务器使用它所支持的协议响应。

If you are using Java 9, you are fine and you can use HTTP/2 out of the box without extra steps.

标准的Java 8不支持ALPN,所以ALPN应该通过其他方式启用:

其中 ${version} 取决于JVM的版本,如 OpenJDK 1.8.0u74 中的 8.1.7.v20160121。这个完整列表可以在 Jetty-ALPN page 页面上找到。

这种方法主要缺点是ALPN的实现版本依赖于JVM的版本。

为了解决这个问题,可以使用 Jetty ALPN agent 。agent是一个JVM代理,它会为运行它的JVM选择正确的ALPN版本:

-javaagent:/path/to/alpn/agent
NetClientOptions options = new NetClientOptions()
  .setProxyOptions(new ProxyOptions().setType(ProxyType.SOCKS5)
    .setHost("localhost").setPort(1080)
    .setUsername("username").setPassword("secret"));
NetClient client = vertx.createNetClient(options);

DNS 解析总是在代理服务器上完成解析,为了实现 SOCKS4 客户端的功能,需要先在本地解析 DNS 地址。

.setUseAlpn(true) .setSsl(true) .setKeyStoreOptions(new JksOptions().setPath("/path/to/my/keystore")); HttpServer server = vertx.createHttpServer(options);

ALPN是一个TLS的扩展,它在客户端和服务器开始交换数据之前协商协议。

不支持ALPN的客户端仍然可以执行经典的SSL握手。

通常情况,ALPN会对 h2 协议达成一致,尽管服务器或客户端决定了仍然使用 HTTP/1.1 协议。

要处理 h2c 请求,TLS必须被禁用,服务器将升级到 HTTP/2 以满足任何希望升级到 HTTP/2 的 HTTP/1.1 请求。它还将接受以 PRI*HTTP/2.0\r\nSM\r\n 开始的 h2c 直接连接。

HttpServer server = vertx.createHttpServer();
server.listen(8080, "myhost.com", res -> {
  if (res.succeeded()) {
    System.out.println("Server is now listening!");
  } else {
    System.out.println("Failed to bind!");
HttpServer server = vertx.createHttpServer();
server.requestHandler(request -> {
  // Handle the request in here
vertx.createHttpServer().requestHandler(request -> {
  request.response().end("Hello world");
}).listen(8080);

在请求中指定的 HTTP 版本可通过 version 方法获取。

使用 method 方法读取请求中的 HTTP Method(即GET、POST、PUT、DELETE、HEAD、OPTIONS等)。

请求URI

使用 uri 方法读取请求中的URI路径。

请注意,这是在HTTP 请求中传递的实际URI,它总是一个相对的URI。

这个URI是在 Section 5.1.2 of the HTTP specification - Request-URI 中定义的。

使用 path 方法读取URI中的路径部分。

例如,请求的URI为:

a/b/c/page.html?param1=abc&param2=xyz

路径部分应该是:

/a/b/c/page.html

使用 query 读取URI中的查询部分。

例如,请求的URI为:

a/b/c/page.html?param1=abc&param2=xyz

查询部分应该是:

param1=abc&param2=xyz

使用 headers 方法获取HTTP 请求中的请求头部信息。

这个方法返回一个 MultiMap 实例。它像一个普通的Map或Hash,并且它还允许同一个键支持多个值 —— 因为HTTP允许同一个键支持多个请求头的值。

它的键值不区分大小写,这意味着您可以执行以下操作:

MultiMap headers = request.headers();
// Get the User-Agent:
System.out.println("User agent is " + headers.get("user-agent"));
// You can also do this and get the same result:
System.out.println("User agent is " + headers.get("User-Agent"));
request.handler(buffer -> {
  System.out.println("I have received a chunk of the body of length " + buffer.length());

传递给处理器的对象是一个 Buffer ,当数据从网络到达时,处理器可以多次被调用,这取决于请求体的大小。

在某些情况下(例:若请求体很小),您将需要将这个请求体聚合到内存中,以便您可以按照下边的方式进行聚合:

Buffer totalBuffer = Buffer.buffer();
request.handler(buffer -> {
  System.out.println("I have received a chunk of the body of length " + buffer.length());
  totalBuffer.appendBuffer(buffer);
request.endHandler(v -> {
  System.out.println("Full body received, length = " + totalBuffer.length());

这是一个常见的情况,Vert.x为您提供了一个 bodyHandler 方法来执行此操作。当所有请求体被收到时, bodyHandler 绑定的处理器会被调用一次:

request.bodyHandler(totalBuffer -> {
  System.out.println("Full body received, length = " + totalBuffer.length());

处理 HTML 表单

您可使用 application/x-www-form-urlencodedmultipart/form-data 这两种 content-type 来提交 HTML 表单。

对于使用 URL 编码过的表单,表单属性会被编码在URL中,如同普通查询参数一样。

对于 multipart 类型的表单,它会被编码在请求体中,而且在整个请求体被完全读取之前它是不可用的。Multipart 表单还可以包含文件上传。

Multipart 表单还可以包含文件上传。

若您想要读取 multipart 表单的属性,您应该告诉 Vert.x 您会在读取任何正文 之前 调用 setExpectMultipart 方法,然后在整个请求体都被读取后,您可以使用 formAttributes 方法来读取实际的表单属性。

server.requestHandler(request -> {
  request.setExpectMultipart(true);
  request.endHandler(v -> {
    // The body has now been fully read, so retrieve the form attributes
    MultiMap formAttributes = request.formAttributes();
server.requestHandler(request -> {
  request.setExpectMultipart(true);
  request.uploadHandler(upload -> {
    System.out.println("Got a file upload " + upload.name());

上传的文件可能很大,我们不会在单个缓冲区中包含整个上传的数据,因为这样会导致内存耗尽。相反,上传数据是以块的形式被接收的:

request.uploadHandler(upload -> {
  upload.handler(chunk -> {
    System.out.println("Received a chunk of the upload of length " + chunk.length());

上传对象实现了 ReadStream 接口,因此您可以将请求体读取到任何 WriteStream 实例中。详细说明请参阅 流和管道(泵) 章节。

若您只是想将文件上传到服务器的某个磁盘,可以使用 streamToFileSystem 方法:

request.uploadHandler(upload -> {
  upload.streamToFileSystem("myuploads_directory/" + upload.filename());
request.customFrameHandler(frame -> {
  System.out.println("Received a frame type=" + frame.type() +
      " payload" + frame.payload().toString());

HTTP/2 帧不受流量控制限制 —— 当接收到自定义帧时,不论请求是否暂停,自定义帧处理器都将立即被调用。

非标准的 HTTP 方法

OTHER HTTP 方法可用于非标准方法,在这种情况下, rawMethod 方法返回客户端发送的实际 HTTP 方法。

默认情况下,Vert.x 不会自动关闭 keep-alive 的连接,若您想要在一段空闲时间之后让 Vert.x 自动关闭 keep-alive 的连接,则使用 setIdleTimeout 方法进行配置。

HTTP/2 连接在关闭响应之前会发送 GOAWAY 帧。

设置响应头

HTTP 响应头可直接添加到 HTTP 响应中,通常直接操作 headers

HttpServerResponse response = request.response();
MultiMap headers = response.headers();
headers.set("content-type", "text/html");
headers.set("other-header", "wibble");

或您可使用 putHeader 方法:

HttpServerResponse response = request.response();
response.putHeader("content-type", "text/html").putHeader("other-header", "wibble");

响应头必须在写入响应正文消息之前进行设置。

分块 HTTP 响应和附加尾部

Vert.x 支持 分块传输编码(HTTP Chunked Transfer Encoding).

这允许HTTP 响应体以块的形式写入,通常在响应体预先不知道尺寸、需要将很大响应正文以流式传输到客户端时使用。

您可以通过如下方式开启分块模式:

HttpServerResponse response = request.response();
response.setChunked(true);

默认是不分块的,当处于分块模式,每次调用任意一个 write 方法将导致新的 HTTP 块被写出。

在分块模式下,您还可以将响应的HTTP 响应附加尾部(trailers)写入响应,这种方式实际上是在写入响应的最后一块。

HttpServerResponse response = request.response();
response.setChunked(true);
MultiMap trailers = response.trailers();
trailers.set("X-wibble", "woobble").set("X-quux", "flooble");

或者调用 putTrailer 方法:

HttpServerResponse response = request.response();
response.setChunked(true);
response.putTrailer("X-wibble", "woobble").putTrailer("X-quux", "flooble");

或者,Vert.x 提供了一种方法,允许您在一个操作中将文件从磁盘或文件系统中读取并提供给HTTP 响应。若底层操作系统支持,这会导致操作系统不通过用户空间复制而直接将文件内容中字节数据从文件传输到Socket。

这是使用 sendFile 方法完成的,对于大文件处理通常更有效,而这个方法对于小文件可能很慢。

这儿是一个非常简单的 Web 服务器,它使用 sendFile 方法从文件系统中读取并提供文件:

vertx.createHttpServer().requestHandler(request -> {
  String file = "";
  if (request.path().equals("/")) {
    file = "index.html";
  } else if (!request.path().contains("..")) {
    file = request.path();
  request.response().sendFile("web/" + file);
}).listen(8080);

发送文件是异步的,可能在调用返回一段时间后才能完成。如果要在文件写入时收到通知,可以在 sendFile 方法中设置一个处理器。 sendFile

请阅读 从 Classpath 访问文件 章节了解类路径的限制或禁用它。

try { offset = Long.parseLong(request.getParam("start")); } catch (NumberFormatException e) { // error handling... long end = Long.MAX_VALUE; try { end = Long.parseLong(request.getParam("end")); } catch (NumberFormatException e) { // error handling... request.response().sendFile("web/mybigfile.txt", offset, end); }).listen(8080);

若您想要从偏移量开始发送文件直到尾部,则不需要提供长度信息,这种情况下,您可以执行以下操作:

vertx.createHttpServer().requestHandler(request -> {
  long offset = 0;
  try {
    offset = Long.parseLong(request.getParam("start"));
  } catch (NumberFormatException e) {
    // error handling...
  request.response().sendFile("web/mybigfile.txt", offset);
}).listen(8080);

服务端响应 HttpServerResponse 也是一个 WriteStream 实例,因此您可以从任何 ReadStream 向其泵送数据,如 AsyncFile , NetSocket , WebSocketHttpServerRequest

这儿有一个例子,它回应了任何 PUT 方法的响应中的请求体,它为请求体使用了 Pump,所以即使 HTTP 请求体很大并填满了内存,任何时候它依旧会工作:

vertx.createHttpServer().requestHandler(request -> {
  HttpServerResponse response = request.response();
  if (request.method() == HttpMethod.PUT) {
    response.setChunked(true);
    Pump.pump(request, response).start();
    request.endHandler(v -> response.end());
  } else {
    response.setStatusCode(400).end();
}).listen(8080);
request.response().exceptionHandler(err -> {
  if (err instanceof StreamResetException) {
    StreamResetException reset = (StreamResetException) err;
    System.out.println("Stream reset " + reset.getCode());
    // The server is ready to push the response
    HttpServerResponse pushedResponse = ar.result();
    // Send main.js response
    pushedResponse.
        putHeader("content-type", "application/json").
        end("alert(\"Push response hello\")");
  } else {
    System.out.println("Could not push client resource " + ar.cause());
// Send the requested resource
response.sendFile("<html><head><script src=\"/main.js\"></script></head><body></body></html>");

当服务器准备推送响应时,推送响应处理器会被调用,并会发送响应。

推送响应处理器客户能会接收到失败,如:客户端可能取消推送,因为它已经在缓存中包含了 main.js ,并不在需要它。

您必须在响应结束之前调用 push 方法,但是在推送响应过后依然可以写响应。

Handling exceptions

You can set an exceptionHandler to receive any exceptions that happens before the connection is passed to the requestHandler or to the websocketHandler , e.g during the TLS handshake.

使用高于 1-2 的压缩级别通常允许仅仅保存一些字节大小 —— 它的增益不是线性的,并取决于要压缩的特定数据 —— 但它可以满足服务器所要求的CPU周期的不可控的成本(注意现在Vert.x不支持任何缓存形式的响应数据,如静态文件,因此压缩是在每个请求体生成时进行的),它可生成压缩过的响应数据、并对接收的响应解码(膨胀)—— 和客户端使用的方式一致,这种操作随着压缩级别的增长会变得更加倾向于CPU密集型。

默认情况下 —— 如果通过 setCompressionSupported 方法启用压缩,Vert.x 将使用 6 作为压缩级别,但是该参数可通过 setCompressionLevel 方法来更改。

创建 HTTP 客户端

您可通过以下方式创建一个具有默认配置的 HttpClient 实例:

HttpClient client = vertx.createHttpClient();

若您想要配置客户端选项,可按以下方式创建:

HttpClientOptions options = new HttpClientOptions().setKeepAlive(false);
HttpClient client = vertx.createHttpClient(options);

Vert.x 支持基于 TLS h2 和 TCP h2c 的 HTTP/2 协议。

默认情况下,HTTP 客户端会发送 HTTP/1.1 请求。若要执行 HTTP/2 请求,则必须调用 setProtocolVersion 方法将版本设置成 HTTP_2

对于 h2 请求,必须使用应用层协议协商(ALPN)启用TLS:

HttpClientOptions options = new HttpClientOptions().
    setProtocolVersion(HttpVersion.HTTP_2).
    setSsl(true).
    setUseAlpn(true).
    setTrustAll(true);
HttpClient client = vertx.createHttpClient(options);

对于 h2c 请求,TLS必须禁用,客户端将执行 HTTP/1.1 请求并尝试升级到 HTTP/2:

HttpClientOptions options = new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2);
HttpClient client = vertx.createHttpClient(options);

h2c 连接也可以直接建立,如连接可以使用前文提到的方式创建,当 setHttp2ClearTextUpgrade 选项设置为 false 时:建立连接后,客户端将发送 HTTP/2 连接前缀,并期望从服务端接收相同的连接偏好。

HTTP 服务端可能不支持 HTTP/2,当响应到达时,可以使用 version 方法检查响应实际HTTP版本。

当客户端连接到 HTTP/2 服务端时,它将向服务端发送其 初始设置. 。设置定义服务器如何使用连接、客户端的默认初始设置是由 HTTP/2 RFC定义的。

记录客户端网络活动

为了进行调试,可以记录网络活动:

HttpClientOptions options = new HttpClientOptions().setLogActivity(true);
HttpClient client = vertx.createHttpClient(options);

详情请参阅 记录网络活动 章节。

HTTP 客户端是很灵活的,您可以通过各种方式发出请求。

通常您希望使用 HTTP 客户端向同一个主机/端口发送很多请求。为避免每次发送请求时重复设主机/端口,您可以为客户端配置默认主机/端口:

HttpClientOptions options = new HttpClientOptions().setDefaultHost("wibble.com");
// Can also set default port if you want...
HttpClient client = vertx.createHttpClient(options);
client.getNow("/some-uri", response -> {
  System.out.println("Received response with status code " + response.statusCode());

或者您发现自己使用相同的客户端向不同主机的主机/端口发送大量请求,则可以在发出请求时简单指定主机/端口:

HttpClient client = vertx.createHttpClient();
// Specify both port and host name
client.getNow(8080, "myserver.mycompany.com", "/some-uri", response -> {
  System.out.println("Received response with status code " + response.statusCode());
// This time use the default port 80 but specify the host name
client.getNow("foo.othercompany.com", "/other-uri", response -> {
  System.out.println("Received response with status code " + response.statusCode());

用客户端发出请求的所有不同方式都支持这两种指定主机/端口的方法。

无请求体的简单请求

通常,您想发出没有请求体的HTTP 请求,这种情况通常如HTTP GET、OPTIONS 和 HEAD 请求。

使用 Vert.x HTTP Client 执行这种请求最简单的方式是使用加了 Now 后缀的请求方法,如 getNow

这些方法会创建HTTP 请求,并在单个方法调用中发送它,而且允许您提供一个处理器,当HTTP 响应发送回来时调用该处理器来处理响应结果。

HttpClient client = vertx.createHttpClient();
// Send a GET request
client.getNow("/some-uri", response -> {
  System.out.println("Received response with status code " + response.statusCode());
// Send a GET request
client.headNow("/other-uri", response -> {
  System.out.println("Received response with status code " + response.statusCode());
HttpClient client = vertx.createHttpClient();
client.request(HttpMethod.GET, "some-uri", response -> {
  System.out.println("Received response with status code " + response.statusCode());
}).end();
client.request(HttpMethod.POST, "foo-uri", response -> {
  System.out.println("Received response with status code " + response.statusCode());
}).end("some-data");
HttpClient client = vertx.createHttpClient();
HttpClientRequest request = client.post("some-uri", response -> {
  System.out.println("Received response with status code " + response.statusCode());
// Now do stuff with the request
request.putHeader("content-length", "1000");
request.putHeader("content-type", "text/plain");
request.write(body);
// Make sure the request is ended when you're done with it
request.end();
// Or fluently:
client.post("some-uri", response -> {
  System.out.println("Received response with status code " + response.statusCode());
}).putHeader("content-length", "1000").putHeader("content-type", "text/plain").write(body).end();
// Or event more simply:
client.post("some-uri", response -> {
  System.out.println("Received response with status code " + response.statusCode());
}).putHeader("content-type", "text/plain").end(body);

可以用UTF-8编码方式编码字符串和以指定方式编码编码字符串、或写 Buffer 的方法:

request.write("some data");
// Write string encoded in specific encoding
request.write("some other data", "UTF-16");
// Write a buffer
Buffer buffer = Buffer.buffer();
buffer.appendInt(123).appendLong(245l);
request.write(buffer);

若您仅需要写单个字符串或 Buffer 到HTTP请求中,您可以直接调用 end 函数完成写入和请求的发送操作。

request.end("some simple data");
// Write buffer and end the request (send it) in a single call
Buffer buffer = Buffer.buffer().appendDouble(12.34d).appendLong(432l);
request.end(buffer);

当您写入请求时,第一次调用 write 方法将先将请求头写入到请求报文中。

实际写入操作是异步的,它可能在调用返回一段时间后才发生。

带请求体的非分块 HTTP 请求需要提供 Content-Length 头。

因此,若您不使用 HTTP 分块,则必须在写入请求之前设置 Content-Length 头,否则会出错。

若您在调用其中一个 end 方法处理 String 或 Buffer,在写入请求体之前,Vert.x 将自动计算并设置 Content-Length

若您在使用HTTP 分块模式,则不需要 Content-Length 头,因此您不必先计算大小。

您可以直接使用 MultiMap 结构的 headers 来设置请求头:

MultiMap headers = request.headers();
headers.set("content-type", "application/json").set("other-header", "foo");

这个headers是一个 MultiMap 的实例,它提供了添加、设置、删除条目的操作。HTTP Header允许一个特定的键包含多个值。

您也可以使用 putHeader 方法编写头文件:

request.putHeader("content-type", "application/json").putHeader("other-header", "foo");

若您想写入请求头,则您必须在写入任何请求体之前这样做来设置请求头。

非标准的HTTP 方法

The OTHER HTTP method is used for non standard methods, when this method is used, setRawMethod must be used to set the raw method to send to the server.

发送 HTTP 请求

一旦完成了 HTTP 请求的准备工作,您必须调用其中一个 end 方法来发送该请求(结束请求)。

结束一个请求时,若请求头尚未被写入,会导致它们被写入,并且请求被标记成完成的。

请求可以通过多种方式结束。无参简单结束请求的方式如:

request.end();

或可以在调用 end 方法时提供 String 或 Buffer,这个和先调用带 String/Buffer 参数的 write 方法之后再调用无参 end 方法一样:

request.end("some-data");
// End it with a buffer
Buffer buffer = Buffer.buffer().appendFloat(12.3f).appendInt(321);
request.end(buffer);
HttpClientRequest request = client.post("some-uri", response -> {
  System.out.println("Received response with status code " + response.statusCode());
request.exceptionHandler(e -> {
  System.out.println("Received exception: " + e.getMessage());
  e.printStackTrace();

这种处理器不处理需要在 HttpClientResponse 中处理的非 2xx 响应:

HttpClientRequest request = client.post("some-uri", response -> {
  if (response.statusCode() == 200) {
    System.out.println("Everything fine");
    return;
  if (response.statusCode() == 500) {
    System.out.println("Unexpected behavior on the server side");
    return;
request.end();
HttpClientRequest request = client.post("some-uri");
request.handler(response -> {
  System.out.println("Received response with status code " + response.statusCode());
request.setChunked(true);
Pump pump = Pump.pump(file, request);
file.endHandler(v -> request.end());
pump.start();
request.exceptionHandler(err -> {
  if (err instanceof StreamResetException) {
    StreamResetException reset = (StreamResetException) err;
    System.out.println("Stream reset " + reset.getCode());
client.getNow("some-uri", response -> {
  // the status code - e.g. 200 or 404
  System.out.println("Status code is " + response.statusCode());
  // the status message e.g. "OK" or "Not Found".
  System.out.println("Status message is " + response.statusMessage());

使用流式响应

HttpClientResponse 实例也是一个 ReadStream 实例,这意味着您可以泵送数据到任何 WriteStream 实例。

响应头和尾

HTTP 响应可包含头信息。您可以使用 headers 方法来读取响应头。

该方法返回的对象是 一个 MultiMap 实例,因为 HTTP 响应头中单个键可以关联多个值。

String contentType = response.headers().get("content-type");
String contentLength = response.headers().get("content-lengh");

分块 HTTP 响应还可以包含响应尾(trailer) —— 这实际上是在发送响应体的最后一个(数据)块。

您可使用 trailers 方法读取响应尾,尾数据也是一个 MultiMap

读取请求体

当从报文中读取到响应头时,响应处理器就会被调用。

如果收到的HTTP 响应包含响应体(正文),它可能会在响应头被读取后的某个时间以分片的方式到达。在调用响应处理器之前,我们不要等待所有的响应体到达,因为它可能非常大而要等待很长时间、又或者会花费大量内存。

当响应体的某部分(数据)到达时,handler 方法绑定的回调函数将会被调用,其中传入的 Buffer 中包含了响应体的这一分片(部分)内容:

client.getNow("some-uri", response -> {
  response.handler(buffer -> {
    System.out.println("Received a part of the response body: " + buffer);

若您知道响应体不是很大,并想在处理之前在内存中聚合所有响应体数据,那么您可以自己聚合:

client.getNow("some-uri", response -> {
  // Create an empty buffer
  Buffer totalBuffer = Buffer.buffer();
  response.handler(buffer -> {
    System.out.println("Received a part of the response body: " + buffer.length());
    totalBuffer.appendBuffer(buffer);
  response.endHandler(v -> {
    // Now all the body has been read
    System.out.println("Total response body length is " + totalBuffer.length());

或者当响应已被完全读取时,您可以使用 bodyHandler 方法以便读取整个响应体:

client.getNow("some-uri", response -> {
  response.bodyHandler(totalBuffer -> {
    // Now all the body has been read
    System.out.println("Total response body length is " + totalBuffer.length());

30x 重定向处理器

客户端可配置成遵循HTTP 重定向:当客户端接收到 301302303307 状态代码时,它遵循由 Location 响应头提供的重定向,并且响应处理器将传递重定向响应以替代原始响应。

这有个例子:

client.get("some-uri", response -> {
  System.out.println("Received response with status code " + response.statusCode());
}).setFollowRedirects(true).end();

重定向策略如下:

client.get("some-uri", response -> { System.out.println("Received response with status code " + response.statusCode()); }).setFollowRedirects(true).end();

没有放之四海而皆准的策略,缺省的重定向策略可能不能满足您的需要。

默认重定向策略可使用自定义实现更改:

client.redirectHandler(response -> {
  // Only follow 301 code
  if (response.statusCode() == 301 && response.getHeader("Location") != null) {
    // Compute the redirect URI
    String absoluteURI = resolveURI(response.request().absoluteURI(), response.getHeader("Location"));
    // Create a new ready to use request that the client will use
    return Future.succeededFuture(client.getAbs(absoluteURI));
  // We don't redirect
  return null;

这个策略将会处理接收到的原始 HttpClientResponse ,并返回 nullFuture<HttpClientRequest>

HttpClientRequest request = client.put("some-uri", response -> {
  System.out.println("Received response with status code " + response.statusCode());
request.putHeader("Expect", "100-Continue");
request.continueHandler(v -> {
  // OK to send rest of body
  request.write("Some data");
  request.write("Some more data");
  request.end();

在服务端,Vert.x HTTP Server可配置成接收到 Expect: 100-Continue 头时自动发回 100 Continue 临时响应信息。

这个可通过 setHandle100ContinueAutomatically 方法来设置。

若您想要决定是否手动发送持续响应,那么此属性可设置成 false (默认值),然后您可以通过检查头信息并且调用 writeContinue 方法让客户端持续发送请求体:

httpServer.requestHandler(request -> {
  if (request.getHeader("Expect").equalsIgnoreCase("100-Continue")) {
    // Send a 100 continue response
    request.response().writeContinue();
    // The client should send the body when it receives the 100 response
    request.bodyHandler(body -> {
      // Do something with body
    request.endHandler(v -> {
      request.response().end();

您也可以通过直接发送故障状态代码来拒绝该请求:这种情况下,请求体应该被忽略或连接应该被关闭( 100-Continue 是一个性能提示,并不是逻辑协议约束):

httpServer.requestHandler(request -> {
  if (request.getHeader("Expect").equalsIgnoreCase("100-Continue")) {
    boolean rejectAndClose = true;
    if (rejectAndClose) {
      // Reject with a failure code and close the connection
      // this is probably best with persistent connection
      request.response()
          .setStatusCode(405)
          .putHeader("Connection", "close")
          .end();
    } else {
      // Reject with a failure code and ignore the body
      // this may be appropriate if the body is small
      request.response()
          .setStatusCode(405)
          .end();
HttpClientRequest request = client.get("/index.html", response -> {
  // Process index.html response
// Set a push handler to be aware of any resource pushed by the server
request.pushHandler(pushedRequest -> {
  // A resource is pushed for this request
  System.out.println("Server pushed " + pushedRequest.path());
  // Set an handler for the response
  pushedRequest.handler(pushedResponse -> {
    System.out.println("The response for the pushed request");
// End the request
request.end();

若客户端不想收到推送请求,它可重置流:

request.pushHandler(pushedRequest -> {
  if (pushedRequest.path().equals("/main.js")) {
    pushedRequest.reset();
  } else {
    // Handle it

若没有设置任何处理器时,任何被推送的流将被客户端自动重置流(错误代码 8 )。

接收自定义 HTTP/2 帧

HTTP/2 是用于 HTTP 请求/响应模型的具有各种帧的一个帧协议,该协议允许发送和接收其他类型的帧。

要接收自定义帧,您可以在请求中使用 customFrameHandler ,每次自定义帧到达时就会调用它。以下是一个例子:

response.customFrameHandler(frame -> {
  System.out.println("Received a frame type=" + frame.type() +
      " payload" + frame.payload().toString());

要告诉服务器当前客户端支持哪种压缩,则它(请求头)将包含一个 Accept-Encoding 头,其值为可支持的压缩算法,(该值可)支持多种压缩算法。这种情况 Vert.x 将添加以下头:

Accept-Encoding: gzip, deflate

服务器将从其中(算法)选择一个,您可以通过服务器发回的响应中响应头 Content-Encoding 来检测服务器是否适应这个正文。

若响应体通过 gzip 压缩,它将包含例如下边的头:

Content-Encoding: gzip

创建客户端时可使用 setTryUseCompression 设置配置项启用压缩。

默认情况压缩被禁用。

HTTP/1.x Pooling 和 Keep alive

HTTP 的 Keep Alive 允许单个 HTTP 连接用于多个请求。当您向同一台服务器发送多个请求时,可以更加有效使用连接。

对于 HTTP/1.x 版本,HTTP 客户端支持连接池,它允许您重用请求之间的连接。

为了连接池(能)工作,配置客户端时,keep alive 必须通过 setKeepAlive 方法设置成 true 。默认值为 true

当 keep alive 启用时,Vert.x 将为每一个发送的 HTTP/1.0 请求添加一个 Connection: Keep-Alive 头。 当 keep alive 禁用时,Vert.x 将为每一个 HTTP/1.1 请求添加一个 Connection: Close 头 —— 表示在响应完成后连接将被关闭。

可使用 setMaxPoolSize 方法为每个服务器配置连接池的最大连接数。

当启用连接池创建请求时,若存在少于已经为服务器创建的最大连接数,Vert.x 将创建一个新连接,否则直接将请求添加到队列中。

Keep Alive的连接将不会被客户端自动关闭,要关闭它们您可以关闭客户端实例。

keep-alive: timeout=30

或者,您可使用 setKeepAliveTimeout 设置空闲时间——在设置的时间内然后没使用的连接将被关闭。请注意空闲超时值以秒为单位而不是毫秒。

HTTP/1.1 pipe-lining

客户端还支持连接上的请求管道(pipeline)。

管道意味着在返回一个响应之前,在同一个连接上发送另一个请求,管道不适合所有请求。

若要启用管道,必须调用 setPipelining 方法,默认管道是禁止的。

当启用管道时,请求可以不等待以前的响应返回而写入到连接。

单个连接的管道请求限制数由 setPipeliningLimit 方法设置,此选项定义了发送到服务器的等待响应的最大请求数。这个限制可以确保和同一个服务器的连接分发到客户端的公平性。

HTTP/2 多路复用

HTTP/2 提倡使用服务器的单一连接,默认情况下,HTTP 客户端针对每个服务器都使用单一连接,同样服务器上的所有流都会复用到对应连接中。

当客户端需要使用连接池并使用超过一个连接时,则可使用 setHttp2MaxPoolSize

当您希望限制每个连接的多路复用流数量而使用连接池而不是单个连接时,可使用 setHttp2MultiplexingLimit

HttpClientOptions clientOptions = new HttpClientOptions().
    setHttp2MultiplexingLimit(10).
    setHttp2MaxPoolSize(3);
// Uses up to 3 connections and up to 10 streams per connection
HttpClient client = vertx.createHttpClient(clientOptions);

连接的复用限制是在客户端上设置限制单个连接的流数量,如果服务器使用 SETTINGS_MAX_CONCURRENT_STREAMS 设置了下限,则有效值可以更低。

HTTP/2 连接不会被客户端自动关闭,若要关闭它们,可以调用 close 来关闭客户端实例。

或者,您可以使用 setIdleTimeout 设置空闲时间——这个时间内没有使用的任何连接将被关闭,注意,空闲时间以秒为单位,不是毫秒。

HTTP 连接

HttpConnection 接口提供了处理HTTP 连接事件、生命周期、设置的API。

HTTP/2 实现了完整的 HttpConnection API。

HTTP/1.x 实现了 HttpConnection 中的部分API:仅关闭操作,实现了关闭处理器和异常处理器。该协议并不提供其他操作的语义。

服务端连接

connection 方法会返回服务器上的请求连接:

HttpConnection connection = request.connection();

可以在服务器上设置连接处理器,任意连接传入时可得到通知:

HttpServer server = vertx.createHttpServer(http2Options);
server.connectionHandler(connection -> {
  System.out.println("A client connected");
connection.updateSettings(new Http2Settings().setMaxConcurrentStreams(100), ar -> {
  if (ar.succeeded()) {
    System.out.println("The settings update has been acknowledged ");

相反,在收到新的远程设置时会通知 remoteSettingsHandler

connection.remoteSettingsHandler(settings -> {
  System.out.println("Received new settings");
io.vertx.examples.http.sharing.HttpServerVerticle
vertx.createHttpServer().requestHandler(request -> {
  request.response().end("Hello from server " + this);
}).listen(8080);

这个服务正在监听 8080 端口。所以,当这个 Verticle 被实例化多次,如运行以下命令: vertx run io.vertx.examples.http.sharing.HttpServerVerticle -instances 2 , 将会发生什么?如果两个 Verticle 都绑定到同一个端口,您将收到一个 Socket 异常。幸运的是,Vert.x 可以为您处理这种情况。在与现有服务端相同的主机和端口上部署另一个服务器时,实际上并不会尝试创建在同一主机/端口上监听的新服务端,它只绑定一次到Socket,当接收到请求时,会按照轮询策略调用服务端的请求处理函数。

我们现在想象一个客户端,如下:

vertx.setPeriodic(100, (l) -> {
  vertx.createHttpClient().getNow(8080, "localhost", "/", resp -> {
    resp.bodyHandler(body -> {
      System.out.println(body.toString("ISO-8859-1"));

Vert.x 将请求顺序委托给其中一个服务器:

Hello from i.v.e.h.s.HttpServerVerticle@1
Hello from i.v.e.h.s.HttpServerVerticle@2
Hello from i.v.e.h.s.HttpServerVerticle@1
Hello from i.v.e.h.s.HttpServerVerticle@2

因此,服务器可直接扩展可用的核,而每个 Vert.x 中的 Verticle 实例仍然严格使用单线程,您不需要像编写负载均衡器那样使用任何特殊技巧去编写,以便在多核机器上扩展服务器。

使用 HTTPS

Vert.x 的 HTTP 服务端和客户端可以配置成和网络服务器完全相同的方式使用 HTTPS。

有关详细信息,请参阅 配置网络服务器以使用 SSL 章节。

SSL可以通过每个请求的 RequestOptions 来启用/禁用,或在指定模式时调用 requestAbs

client.getNow(new RequestOptions()
    .setHost("localhost")
    .setPort(8080)
    .setURI("/")
    .setSsl(true), response -> {
  System.out.println("Received response with status code " + response.statusCode());

setSsl 设置将用作客户端默认配置。

setSsl 将覆盖默认客户端设置:

Server Name Indication (SNI)

Vert.x http servers can be configured to use SNI in exactly the same way as {@linkplain io.vertx.core.net net servers} .

Vert.x http client will present the actual hostname as server name during the TLS handshake.

server.websocketHandler(websocket -> {
  if (websocket.path().equals("/myapi")) {
    websocket.reject();
  } else {
    // Do something
Buffer buffer = Buffer.buffer().appendInt(123).appendFloat(1.23f);
websocket.writeBinaryMessage(buffer);
// Write a simple text message
String message = "hello";
websocket.writeTextMessage(message);

若WebSocket 消息大于使用 setMaxWebsocketFrameSize 设置的WebSocket 的帧的最大值,则Vert.x在将其发送到报文之前将其拆分为多个WebSocket 帧。

向 WebSocket 写入帧

WebSocket 消息可以由多个帧组成,在这种情况下,第一帧是二进制或文本帧(text | binary),后边跟着零个或多个 连续 帧。

消息中的最后一帧标记成 final

要发送多个帧组成的消息,请使用 WebSocketFrame.binaryFrame , WebSocketFrame.textFrameWebSocketFrame.continuationFrame 方法创建帧,并使用 writeFrame 方法将其写入WebSocket。

以下是二进制帧的示例:

WebSocketFrame frame1 = WebSocketFrame.binaryFrame(buffer1, false);
websocket.writeFrame(frame1);
WebSocketFrame frame2 = WebSocketFrame.continuationFrame(buffer2, false);
websocket.writeFrame(frame2);
// Write the final frame
WebSocketFrame frame3 = WebSocketFrame.continuationFrame(buffer2, true);
websocket.writeFrame(frame3);

许多情况下,您只需要发送一个包含了单个最终帧的 WebSocket 消息,因此我们提供了 writeFinalBinaryFramewriteFinalTextFrame 这两个快捷方法。

下边是示例:

websocket.writeFinalTextFrame("Geronimo!");
// Send a websocket messages consisting of a single final binary frame:
Buffer buff = Buffer.buffer().appendInt(12).appendString("foo");
websocket.writeFinalBinaryFrame(buff);

使用 HTTP/HTTPS 连接代理

HTTP 客户端支持通过HTTP 代理(如Squid)或 SOCKS4aSOCKS5 代理访问 HTTP/HTTPS 的 URL。CONNECT 协议使用 HTTP/1.x,但可以连接到 HTTP/1.x 和 HTTP/2 服务器。

h2c (未加密HTTP/2服务器)的连接可能不受 HTTP 代理支持,因为代理仅支持 HTTP/1.1。

您可以通过 HttpClientOptions 中的 ProxyOptions 对象配置来配置代理(包括代理类型、主机名、端口和可选用户名和密码)。

以下是使用 HTTP 代理的例子:

HttpClientOptions options = new HttpClientOptions()
    .setProxyOptions(new ProxyOptions().setType(ProxyType.HTTP)
        .setHost("localhost").setPort(3128)
        .setUsername("username").setPassword("secret"));
HttpClient client = vertx.createHttpClient(options);

当客户端连接到HTTP URL时,它连接到代理服务器,并在HTTP请求中提供完整URL ("GET http://www.somehost.com/path/file.html HTTP/1.1").

当客户端连接到HTTPS URL时,它要求代理使用 CONNECT 方法创建到远程主机的通道。

对于 SOCKS5 代理:

HttpClientOptions options = new HttpClientOptions()
    .setProxyOptions(new ProxyOptions().setType(ProxyType.SOCKS5)
        .setHost("localhost").setPort(1080)
        .setUsername("username").setPassword("secret"));
HttpClient client = vertx.createHttpClient(options);

DNS 解析会一直在代理服务器上执行。为了实现 SOCKS4 客户端的功能,需要先在本地解析 DNS 地址。

Handling of other protocols

The HTTP proxy implementation supports getting ftp:// urls if the proxy supports that, which isn’t available in non-proxy getAbs requests.

HttpClientOptions options = new HttpClientOptions()
    .setProxyOptions(new ProxyOptions().setType(ProxyType.HTTP));
HttpClient client = vertx.createHttpClient(options);
client.getAbs("ftp://ftp.gnu.org/gnu/", response -> {
  System.out.println("Received response with status code " + response.statusCode());

Support for other protocols is not available since java.net.URL does not support them (gopher:// for example).

共享数据(Shared Data)包含的功能允许您可以安全地在应用程序的不同部分之间、同一 Vert.x 实例中的不同应用程序之间或集群中的不同 Vert.x 实例之间安全地共享数据。

共享数据包括:

本地共享Map

本地共享Map Local shared maps 允许您在同一个 Vert.x 实例中的不同 Event Loop(如不同的 Verticle 中)之间安全共享数据。

本地共享Map仅允许将某些数据类型作为键值和值,这些类型必须是不可变的,或可以像 Buffer 那样复制某些其他类型。在后一种情况中,键/值将被复制,然后再放到Map中。

这样,我们可以确保在Vert.x应用程序不同线程之间 没有共享访问可变状态,因此您不必担心需要通过同步访问来保护该状态。

以下是使用 LocalMap 的示例:

SharedData sd = vertx.sharedData();
LocalMap<String, String> map1 = sd.getLocalMap("mymap1");
map1.put("foo", "bar"); // Strings are immutable so no need to copy
LocalMap<String, Buffer> map2 = sd.getLocalMap("mymap2");
map2.put("eek", Buffer.buffer().appendInt(123)); // This buffer will be copied before adding to map
// Then... in another part of your application:
map1 = sd.getLocalMap("mymap1");
String val = map1.get("foo");
map2 = sd.getLocalMap("mymap2");
Buffer buff = map2.get("eek");
In clustered mode, asynchronous shared maps rely on distributed data structures provided by the cluster manager. Beware that the latency relative to asynchronous shared map operations can be much higher in clustered than in local mode. sd.<String, String>getAsyncMap("mymap", res -> { if (res.succeeded()) { AsyncMap<String, String> map = res.result(); } else { // Something went wrong!

将数据放入Map

您可以使用 put 方法将数据放入Map。

put 方法是异步的,一旦完成它会通知处理器:

map.put("foo", "bar", resPut -> {
  if (resPut.succeeded()) {
    // Successfully put the value
  } else {
    // Something went wrong!
    Lock lock = res.result();
    // 5 seconds later we release the lock so someone else can get it
    vertx.setTimer(5000, tid -> lock.release());
  } else {
    // Something went wrong

您可以为锁设置一个超时,若在超时时间期间无法获取锁,将会进入失败状态,处理器会去处理对应的异常:

sd.getLockWithTimeout("mylock", 10000, res -> {
  if (res.succeeded()) {
    // Got the lock!
    Lock lock = res.result();
  } else {
    // Failed to get lock
// Copy file from foo.txt to bar.txt
fs.copy("foo.txt", "bar.txt", res -> {
  if (res.succeeded()) {
    // Copied ok!
  } else {
    // Something went wrong

阻塞版本的方法名为 xxxBlocking ,它要么返回结果或直接抛出异常。 很多情况下,一些潜在的阻塞操作可以快速返回(这取决于操作系统和文件系统),这就是我们为什么提供它。 但是强烈建议您在 Event Loop 中使用它之前测试使用它们究竟需要耗费多长时间,以避免打破黄金法则。

以下是使用阻塞 API的拷贝示例:

FileSystem fs = vertx.fileSystem();
// Copy file from foo.txt to bar.txt synchronously
fs.copyBlocking("foo.txt", "bar.txt");

Vert.x 文件系统支持诸如 copy、move、truncate、chmod 和许多其他文件操作。我们不会在这里列出所有内容,请参考 API 文档 获取完整列表。

让我们看看使用异步方法的几个例子:

vertx.fileSystem().readFile("target/classes/readme.txt", result -> {
  if (result.succeeded()) {
    System.out.println(result.result());
  } else {
    System.err.println("Oh oh ..." + result.cause());
// Copy a file
vertx.fileSystem().copy("target/classes/readme.txt", "target/classes/readme2.txt", result -> {
  if (result.succeeded()) {
    System.out.println("File copied");
  } else {
    System.err.println("Oh oh ..." + result.cause());
// Write a file
vertx.fileSystem().writeFile("target/classes/hello.txt", Buffer.buffer("Hello"), result -> {
  if (result.succeeded()) {
    System.out.println("File written");
  } else {
    System.err.println("Oh oh ..." + result.cause());
// Check existence and delete
vertx.fileSystem().exists("target/classes/junk.txt", result -> {
  if (result.succeeded() && result.result()) {
    vertx.fileSystem().delete("target/classes/junk.txt", r -> {
      System.out.println("File deleted");
  } else {
    System.err.println("Oh oh ... - cannot delete the file: " + result.cause());

异步文件访问

Vert.x提供了异步文件访问的抽象,允许您操作文件系统上的文件。

您可以像下边代码打开一个 AsyncFile

OpenOptions options = new OpenOptions();
fileSystem.open("myfile.txt", options, res -> {
  if (res.succeeded()) {
    AsyncFile file = res.result();
  } else {
    // Something went wrong!

AsyncFile 实现了 ReadStreamWriteStream 接口,因此您可以将文件和其他流对象配合 工作,如 NetSocket 、HTTP 请求和响应和 WebSocket 等。

它们还允许您直接读写。

随机访问写

要使用 AsyncFile 进行随机访问写,请使用 write 方法。

这个方法的参数有:

vertx.fileSystem().open("target/classes/hello.txt", new OpenOptions(), result -> {
  if (result.succeeded()) {
    AsyncFile file = result.result();
    Buffer buff = Buffer.buffer("foo");
    for (int i = 0; i < 5; i++) {
      file.write(buff, buff.length() * i, ar -> {
        if (ar.succeeded()) {
          System.out.println("Written ok!");
          // etc
        } else {
          System.err.println("Failed to write: " + ar.cause());
  } else {
    System.err.println("Cannot open file " + result.cause());
vertx.fileSystem().open("target/classes/les_miserables.txt", new OpenOptions(), result -> {
  if (result.succeeded()) {
    AsyncFile file = result.result();
    Buffer buff = Buffer.buffer(1000);
    for (int i = 0; i < 10; i++) {
      file.read(buff, i * 100, i * 100, 100, ar -> {
        if (ar.succeeded()) {
          System.out.println("Read ok!");
        } else {
          System.err.println("Failed to write: " + ar.cause());
  } else {
    System.err.println("Cannot open file " + result.cause());

打开 AsyncFile 时,您可以传递一个 OpenOptions 实例,这些选项描述了访问文件的行为。例如:您可使用 setRead , setWritesetPerms 方法配置文件访问权限。

若打开的文件已经存在,则可以使用 setCreateNewsetTruncateExisting 配置对应行为。

您可以使用 setDeleteOnClose 标记在关闭时或JVM停止时要删除的文件。

将数据刷新到底层存储

OpenOptions 中,您可以使用 setDsync . 方法在每次写入时启用/禁用内容的自动同步。这种情况下,您可以使用 flush 方法手动刷新OS缓存中的数据写入。

该方法也可使用一个处理器来调用,这个处理器在 flush 完成时被调用。

将 AsyncFile 作为 ReadStream 和 WriteStream

AsyncFile 实现了 ReadStreamWriteStream 接口。您可以使用泵将数据与其他读取和写入流进行数据*泵*送。 例如,这会将内容复制到另外一个 AsyncFile

final AsyncFile output = vertx.fileSystem().openBlocking("target/classes/plagiary.txt", new OpenOptions());
vertx.fileSystem().open("target/classes/les_miserables.txt", new OpenOptions(), result -> {
  if (result.succeeded()) {
    AsyncFile file = result.result();
    Pump.pump(file, output).start();
    file.endHandler((r) -> {
      System.out.println("Copy done");
  } else {
    System.err.println("Cannot open file " + result.cause());

您还可以使用泵将文件内容写入到HTTP 响应中,或者写入任意 WriteStream

从 Classpath 访问文件

当Vert.x找不到文件系统上的文件时,它尝试从类路径中解析该文件。请注意,类路径的资源路径不以 / 开头。

由于Java不提供对类路径资源的异步方法,所以当类路径资源第一次被访问时,该文件将复制到工作线程中的文件系统。 当第二次访问相同资源时,访问的文件直接从(工作线程的)文件系统提供。即使类路径资源发生变化(例如开发系统中),也会提供原始内容。

您可以将系统属性 vertx.disableFileCaching 设置为 true ,禁用此(文件)缓存行为。

文件缓存的路径默认为 .vertx ,它可以通过设置系统属性 vertx.cacheDirBase 进行自定义。

您还可以通过系统属性 vertx.disableFileCPResolving 设置为 true 来禁用整个类路径解析功能。

If you want to disable classpath resolving for a particular application but keep it enabled by default system-wide, you can do so via the setClassPathResolvingEnabled option.

关闭 AsyncFile

您可调用 close 方法来关闭 AsyncFile 。关闭是异步的,如果希望在关闭过后收到通知,您可指定一个处理器作为函数( close )参数传入。

DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());
Buffer buffer = Buffer.buffer("content");
// Send a Buffer
socket.send(buffer, 1234, "10.0.0.1", asyncResult -> {
  System.out.println("Send succeeded? " + asyncResult.succeeded());
// Send a String
socket.send("A string used as content", 1234, "10.0.0.1", asyncResult -> {
  System.out.println("Send succeeded? " + asyncResult.succeeded());
DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());
socket.listen(1234, "0.0.0.0", asyncResult -> {
  if (asyncResult.succeeded()) {
    socket.handler(packet -> {
      // Do something with the packet
  } else {
    System.out.println("Listen failed" + asyncResult.cause());

注意,即使 {code AsyncResult} 成功,它只意味着它可能已经写入了网络堆栈,但不保证它已经到达或者将到达远端。

若您需要这样的保证,您可在TCP之上建立一些握手逻辑。

发送多播数据包

多播允许多个Socket接收相同的数据包,该目标可以通过加入到同一个可发送数据包的多播组来实现。

我们将在下一节中介绍如何加入多播组,从而接收数据包。

现在让我们专注于如何发送多播报文,发送多播报文与发送普通数据报报文没什么不同。 唯一的区别是您可以将多播组的地址传递给 send 方法发送出去。

如下所示:

DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());
Buffer buffer = Buffer.buffer("content");
// Send a Buffer to a multicast address
socket.send(buffer, 1234, "230.0.0.1", asyncResult -> {
  System.out.println("Send succeeded? " + asyncResult.succeeded());

所有已经加入多播组 230.0.0.1 的Socket都将收到该报文。

接收多播数据包

若要接收特定多播组的数据包,您需要通过调用 DatagramSocketlisten(…​) 方法来绑定一个地址并且加入多播组,并加入多播组。

这样,您将能够接收到被发送到 DatagramSocket 所监听的地址和端口的数据报,同时也可以接收被发送到该多播组的数据报。

除此之外,您还可设置一个处理器,它在每次接收到 DatagramPacket 时会被调用。

DatagramPacket 有以下方法:

DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());
socket.listen(1234, "0.0.0.0", asyncResult -> {
  if (asyncResult.succeeded()) {
    socket.handler(packet -> {
      // Do something with the packet
    // join the multicast group
    socket.listenMulticastGroup("230.0.0.1", asyncResult2 -> {
        System.out.println("Listen succeeded? " + asyncResult2.succeeded());
  } else {
    System.out.println("Listen failed" + asyncResult.cause());
DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());
socket.listen(1234, "0.0.0.0", asyncResult -> {
    if (asyncResult.succeeded()) {
      socket.handler(packet -> {
        // Do something with the packet
      // join the multicast group
      socket.listenMulticastGroup("230.0.0.1", asyncResult2 -> {
          if (asyncResult2.succeeded()) {
            // will now receive packets for group
            // do some work
            socket.unlistenMulticastGroup("230.0.0.1", asyncResult3 -> {
              System.out.println("Unlisten succeeded? " + asyncResult3.succeeded());
          } else {
            System.out.println("Listen failed" + asyncResult2.cause());
    } else {
      System.out.println("Listen failed" + asyncResult.cause());
DatagramSocket socket = vertx.createDatagramSocket(new DatagramSocketOptions());
// Some code
// This would block packets which are send from 10.0.0.2
socket.blockMulticastGroup("230.0.0.1", "10.0.0.2", asyncResult -> {
  System.out.println("block succeeded? " + asyncResult.succeeded());
DnsClient client = vertx.createDnsClient(new DnsClientOptions()
  .setPort(53)
  .setHost("10.0.0.1")
  .setQueryTimeout(10000)

Creating the client with no arguments or omitting the server address will use the address of the server used internally for non blocking address resolution.

DnsClient client1 = vertx.createDnsClient();
// Just the same but with a different query timeout
DnsClient client2 = vertx.createDnsClient(new DnsClientOptions().setQueryTimeout(10000));

请注意,您可以传入 InetSocketAddress 参数的变量,以指定多个的DNS服务器来尝试查询解析DNS。它将按照此处指定的相同顺序查询DNS服务器,若在使用上一个DNS服务器解析时出现了错误,下一个将会被继续调用。

lookup

当尝试为一个指定名称元素获取A(ipv4)或 AAAA(ipv6)记录时,第一条被返回的(记录)将会被使用。它的操作方式和操作系统上使用 nslookup 类似。

要为 vertx.io 获取 A/AAAA 记录,您需要像下面那样做:

DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.lookup("vertx.io", ar -> {
  if (ar.succeeded()) {
    System.out.println(ar.result());
  } else {
    System.out.println("Failed to resolve entry" + ar.cause());
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.lookup4("vertx.io", ar -> {
  if (ar.succeeded()) {
    System.out.println(ar.result());
  } else {
    System.out.println("Failed to resolve entry" + ar.cause());
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.lookup6("vertx.io", ar -> {
  if (ar.succeeded()) {
    System.out.println(ar.result());
  } else {
    System.out.println("Failed to resolve entry" + ar.cause());
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.resolveA("vertx.io", ar -> {
  if (ar.succeeded()) {
    List<String> records = ar.result();
    for (String record : records) {
      System.out.println(record);
  } else {
    System.out.println("Failed to resolve entry" + ar.cause());
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.resolveAAAA("vertx.io", ar -> {
  if (ar.succeeded()) {
    List<String> records = ar.result();
    for (String record : records) {
      System.out.println(record);
  } else {
    System.out.println("Failed to resolve entry" + ar.cause());
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.resolveCNAME("vertx.io", ar -> {
  if (ar.succeeded()) {
    List<String> records = ar.result();
    for (String record : records) {
      System.out.println(record);
  } else {
    System.out.println("Failed to resolve entry" + ar.cause());
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.resolveMX("vertx.io", ar -> {
  if (ar.succeeded()) {
    List<MxRecord> records = ar.result();
    for (MxRecord record: records) {
      System.out.println(record);
  } else {
    System.out.println("Failed to resolve entry" + ar.cause());

请注意,列表将包含按照它们优先级排序的 MxRecord ,这意味着列表中优先级低的MX记录会第一个优先出现在列表中。

MxRecord 允许您通过下边提供的方法访问MX记录的优先级和名称:

record.priority();
record.name();
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.resolveTXT("vertx.io", ar -> {
  if (ar.succeeded()) {
    List<String> records = ar.result();
    for (String record: records) {
      System.out.println(record);
  } else {
    System.out.println("Failed to resolve entry" + ar.cause());
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.resolveNS("vertx.io", ar -> {
  if (ar.succeeded()) {
    List<String> records = ar.result();
    for (String record: records) {
      System.out.println(record);
  } else {
    System.out.println("Failed to resolve entry" + ar.cause());
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.resolveSRV("vertx.io", ar -> {
  if (ar.succeeded()) {
    List<SrvRecord> records = ar.result();
    for (SrvRecord record: records) {
      System.out.println(record);
  } else {
    System.out.println("Failed to resolve entry" + ar.cause());

请注意,列表将包含按照它们优先级排序的 SrvRecord ,这意味着优先级低的记录会第一个优先出现在列表中。

SrvRecord 允许您访问SRV记录本身中包含的所有信息:

record.priority();
record.name();
record.weight();
record.port();
record.protocol();
record.service();
record.target();

有关详细信息,请参阅API文档。

resolvePTR

尝试解析给定名称的PTR记录,PTR记录将 ipaddress 映射到名称。

要解析IP地址 10.0.0.1 的PTR记录,您将使用 1.0.0.10.in-addr.arpa 的PTR概念。

DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.resolvePTR("1.0.0.10.in-addr.arpa", ar -> {
  if (ar.succeeded()) {
    String record = ar.result();
    System.out.println(record);
  } else {
    System.out.println("Failed to resolve entry" + ar.cause());
DnsClient client = vertx.createDnsClient(53, "9.9.9.9");
client.reverseLookup("10.0.0.1", ar -> {
  if (ar.succeeded()) {
    String record = ar.result();
    System.out.println(record);
  } else {
    System.out.println("Failed to resolve entry" + ar.cause());

如前边部分所述, DnsClient 允许您传递一个 Handler ,一旦查询完成将会传入一个 AsyncResultHandler 并通知它。 在出现错误的情况下,通知中将包含一个 DnsException ,该异常会包含一个说明为何失败的 DnsResponseCode 。此 DnsResponseCode 可用于更详细检查原因。

可能的 DnsResponseCode 值是:

DnsClient client = vertx.createDnsClient(53, "10.0.0.1");
client.lookup("nonexisting.vert.xio", ar -> {
  if (ar.succeeded()) {
    String record = ar.result();
    System.out.println(record);
  } else {
    Throwable cause = ar.cause();
    if (cause instanceof DnsException) {
      DnsException exception = (DnsException) cause;
      DnsResponseCode code = exception.code();
      // ...
    } else {
      System.out.println("Failed to resolve entry" + ar.cause());

一个非常简单的例子是从 NetSocket 读取然后写回到同一个 NetSocket - 因为 NetSocket 既实现了 ReadStream 也实现了 WriteStream 接口。 请注意,这些操作适用于任何实现了 ReadStreamWriteStream 接口的对象,包括HTTP 请求、HTTP 响应、异步文件 I/O 和 WebSocket等。

一个最简单的方法是直接获取已经读取的数据,并立即将其写入 NetSocket

NetServer server = vertx.createNetServer(
    new NetServerOptions().setPort(1234).setHost("localhost")
server.connectHandler(sock -> {
  sock.handler(buffer -> {
    // Write the data straight back
    sock.write(buffer);
}).listen();

上面的例子有一个问题:如果从Socket读取数据的速度比写回Socket的速度快,那么它将在 NetSocket 的写队列中不断堆积,最终耗尽内存。 这是有可能会发生,例如,若Socket另一端的客户端读取速度不够快,无法快速地向连接的另一端回压。

由于 NetSocket 实现了 WriteStream 接口,我们可以在写入之前检查 WriteStream 是否已满:

NetServer server = vertx.createNetServer(
    new NetServerOptions().setPort(1234).setHost("localhost")
server.connectHandler(sock -> {
  sock.handler(buffer -> {
    if (!sock.writeQueueFull()) {
      sock.write(buffer);
}).listen();

这个例子不会耗尽内存,但如果写入队列已满,我们最终会丢失数据。我们真正想要做的是在写入队列已满时暂停读取 NetSocket

NetServer server = vertx.createNetServer(
    new NetServerOptions().setPort(1234).setHost("localhost")
server.connectHandler(sock -> {
  sock.handler(buffer -> {
    sock.write(buffer);
    if (sock.writeQueueFull()) {
      sock.pause();
}).listen();

我们已经快达到我们的目标,但还没有完全实现。现在 NetSocket 在文件已满时会暂停,但是当写队列处理完成时,我们需要取消暂停:

NetServer server = vertx.createNetServer(
    new NetServerOptions().setPort(1234).setHost("localhost")
server.connectHandler(sock -> {
  sock.handler(buffer -> {
    sock.write(buffer);
    if (sock.writeQueueFull()) {
      sock.pause();
      sock.drainHandler(done -> {
        sock.resume();
}).listen();

在这里,我们的目标实现了。当写队列准备好接收更多的数据时, drainHandler 事件处理器将被调用,它会恢复 NetSocket 的状态,允许读取更多的数据。

在编写Vert.x 应用程序时,这样做是很常见的,因此我们提供了一个名为 Pump 的帮助类,它为您完成所有这些艰苦的工作。 您只需要给 ReadStream 追加上 WriteStream ,然后启动它:

NetServer server = vertx.createNetServer(
    new NetServerOptions().setPort(1234).setHost("localhost")
server.connectHandler(sock -> {
  Pump.pump(sock, sock).start();
}).listen();

这和更加详细的例子完全一样。

现在我们来看看 ReadStreamWriteStream 的方法。

ReadStream

ReadStream (可读流) 接口的实现类包括: HttpClientResponse , DatagramSocket , HttpClientRequest , HttpServerFileUpload , HttpServerRequest , MessageConsumer , NetSocket , WebSocket , TimeoutStream , AsyncFile .

WriteStream (可写流)接口的实现类包括: HttpClientRequest , HttpServerResponse WebSocket , NetSocket , AsyncFile , and MessageProducer

setWriteQueueMaxSize : 设置写入队列被认为是 full 的对象的数量——方法 writeQueueFull 返回 true 。注意,当写队列被认为已满时,若写(操作)被调用则数据依然会被接收和排队。实际数量取决于流的实现,对于 Buffer ,尺寸代表实际写入的字节数,而并非缓冲区的数量。

writeQueueFull : 若写队列被认为已满,则返回 true

exceptionHandler : 若 WriteStream 发生异常,则被调用。

drainHandler : 若 WriteStream 被认为不再满,则处理器将被调用。

final RecordParser parser = RecordParser.newDelimited("\n", h -> {
  System.out.println(h.toString());
parser.handle(Buffer.buffer("HELLO\nHOW ARE Y"));
parser.handle(Buffer.buffer("OU?\nI AM"));
parser.handle(Buffer.buffer("DOING OK"));
parser.handle(Buffer.buffer("\n"));

我们还可以生成固定尺寸的块,如下:

RecordParser.newFixed(4, h -> {
  System.out.println(h.toString());

有关更多详细信息,请查看 RecordParser 类。

You can easily parse JSON structures but that requires to provide the JSON content at once, but it may not be convenient when you need to parse very large structures.

The non-blocking JSON parser is an event driven parser able to deal with very large structures. It transforms a sequence of input buffer to a sequence of JSON parse events.

JsonParser parser = JsonParser.newParser();
// Set handlers for various events
parser.handler(event -> {
  switch (event.type()) {
    case START_OBJECT:
      // Start an objet
      break;
    case END_OBJECT:
      // End an objet
      break;
    case START_ARRAY:
      // Start an array
      break;
    case END_ARRAY:
      // End an array
      break;
    case VALUE:
      // Handle a value
      String field = event.fieldName();
      if (field != null) {
        // In an object
      } else {
        // In an array or top level
        if (event.isString()) {
        } else {
          // ...
      break;

The parser is non-blocking and emitted events are driven by the input buffers.

JsonParser parser = JsonParser.newParser();
// start array event
// start object event
// "firstName":"Bob" event
parser.handle(Buffer.buffer("[{\"firstName\":\"Bob\","));
// "lastName":"Morane" event
// end object event
parser.handle(Buffer.buffer("\"lastName\":\"Morane\"},"));
// start object event
// "firstName":"Luke" event
// "lastName":"Lucky" event
// end object event
parser.handle(Buffer.buffer("{\"firstName\":\"Luke\",\"lastName\":\"Lucky\"}"));
// end array event
parser.handle(Buffer.buffer("]"));
// Always call end
parser.end();

Event driven parsing provides more control but comes at the price of dealing with fine grained events, which can be inconvenient sometimes. The JSON parser allows you to handle JSON structures as values when it is desired:

JsonParser parser = JsonParser.newParser();
parser.objectValueMode();
parser.handler(event -> {
  switch (event.type()) {
    case START_ARRAY:
      // Start the array
      break;
    case END_ARRAY:
      // End the array
      break;
    case VALUE:
      // Handle each object
      break;
parser.handle(Buffer.buffer("[{\"firstName\":\"Bob\"},\"lastName\":\"Morane\"),...]"));
parser.end();

The value mode can be set and unset during the parsing allowing you to switch between fine grained events or JSON object value events.

JsonParser parser = JsonParser.newParser();
parser.handler(event -> {
  // Start the object
  switch (event.type()) {
    case START_OBJECT:
      // Set object value mode to handle each entry, from now on the parser won't emit start object events
      parser.objectValueMode();
      break;
    case VALUE:
      // Handle each object
      // Get the field in which this object was parsed
      String id = event.fieldName();
      System.out.println("User with id " + id + " : " + event.value());
      break;
    case END_OBJECT:
      // Set the object event mode so the parser emits start/end object events again
      parser.objectEventMode();
      break;
parser.handle(Buffer.buffer("{\"39877483847\":{\"firstName\":\"Bob\"},\"lastName\":\"Morane\"),...}"));
parser.end();

You can do the same with arrays as well

JsonParser parser = JsonParser.newParser();
parser.handler(event -> {
  // Start the object
  switch (event.type()) {
    case START_OBJECT:
      // Set array value mode to handle each entry, from now on the parser won't emit start array events
      parser.arrayValueMode();
      break;
    case VALUE:
      // Handle each array
      // Get the field in which this object was parsed
      System.out.println("Value : " + event.value());
      break;
    case END_OBJECT:
      // Set the array event mode so the parser emits start/end object events again
      parser.arrayEventMode();
      break;
parser.handle(Buffer.buffer("[0,1,2,3,4,...]"));
parser.end();

You can also decode POJOs

parser.handler(event -> {
  // Handle each object
  // Get the field in which this object was parsed
  String id = event.fieldName();
  User user = event.mapTo(User.class);
  System.out.println("User with id " + id + " : " + user.firstName + " " + user.lastName);

Whenever the parser fails to process a buffer, an exception will be thrown unless you set an exception handler:

JsonParser parser = JsonParser.newParser();
parser.exceptionHandler(err -> {
  // Catch any parsing or decoding error

The parser also parses json streams:

例如,若您部署了一个创建 NetServer 的Verticle,该 NetServer 在处理器中提供了 NetSocket 实例,则最好始终从该Verticle的Event Loop中访问Socket 实例。

如您坚持使用标准的Vert.x Verticle部署模型,避免在 Verticle 之间分享对象,那这种情况您无需考虑。

默认情况下,Vert.x不会记录任何指标。相反,它为其他人提供了一个SPI,可以将其添加到类路径中。SPI是一项高级功能,允许实施者从Vert.x捕获事件以收集指标。有关详细信息,请参阅 API 文档

若使用 setFactory 嵌入了Vert.x实例,也可以用编程方式指定度量工厂。

Vert.x Core被打包成了 OSGi Bundle,因此可以在任何OSGi R4.2+环境中使用,如 Apache FelixEclipse Equinox ,(这个)Bundle导出 io.vertx.core*

但是 Bundle 对 Jackson 和 Netty 有一些依赖,若部署Vert.x Core Bundle则需要:

14|Active     |    1|Jackson-annotations (2.6.0)
15|Active     |    1|Jackson-core (2.6.2)
16|Active     |    1|jackson-databind (2.6.2)
18|Active     |    1|Netty/Buffer (4.0.31.Final)
19|Active     |    1|Netty/Codec (4.0.31.Final)
20|Active     |    1|Netty/Codec/HTTP (4.0.31.Final)
21|Active     |    1|Netty/Codec/Socks (4.0.31.Final)
22|Active     |    1|Netty/Common (4.0.31.Final)
23|Active     |    1|Netty/Handler (4.0.31.Final)
24|Active     |    1|Netty/Transport (4.0.31.Final)
25|Active     |    1|Netty/Transport/SCTP (4.0.31.Final)
26|Active     |    1|Vert.x Core (3.1.0)

在Equinox上,您可能需要使用下边的框架属性禁用ContextFilter: eclipse.bundle.setTCCL=false

vertx run my-verticle.js                                 (1)
vertx run my-verticle.groovy                             (2)
vertx run my-verticle.rb                                 (3)
vertx run io.vertx.example.MyVerticle                    (4)
vertx run io.vertx.example.MVerticle -cp my-verticle.jar (5)
vertx run MyVerticle.java                                (6)

-conf <config_file> - 提供了Verticle的一些配置, config_file 是包含描述Verticle配置的JSON对象的文本文件的名称,该参数是可选的。

-cp <path> - 搜索Verticle和它使用的其他任何资源的路径,默认为 . (当前目录)。若您的Verticle引用了其他脚本、类或其他资源(例如jar文件),请确保这些脚本、其他资源存在此路径上。该路径可以包含由以下内容分隔的多个路径条目: : (冒号)或 ; (分号)——这取决于操作系统。每个路径条目可以是包含脚本的目录的绝对路径或相对路径,也可以是 jarzip 文件的绝对或相对文件名。一个示例路径可能是 -cp classes:lib/otherscripts:jars/myjar.jar:jars/otherjar.jar 。始终使用路径引用您的Verticle需要的任何资源,不要将它们放在系统类路径上,因为这会导致部署的Verticle之间的隔离问题。

-instances <instances> - 要实例化的Verticle实例的数目,每个Verticle实例都是严格单线程(运行)的,以便在可用的核上扩展应用程序,您可能需要部署多个实例。若省略,则部署单个实例。

-worker - 此选项可确定一个Verticle是否为Worker Verticle。

-cluster - 此选项确定Vert.x实例是否尝试与网络上的其他Vert.x实例形成集群,集群Vert.x实例允许Vert.x与其他节点形成分布式Event Bus。默认为false(非集群模式)。

-cluster-port - 若指定了 cluster 选项,则可以确定哪个端口将用于与其他Vert.x实例进行集群通信。默认为0——这意味着“选择一个空闲的随机端口”。除非您帧需要绑定特定端口,您通常不需要指定此参数。

-cluster-host - 若指定了 cluster 选项,则可以确定哪个主机地址将用于其他Vert.x实例进行集群通信。默认情况下,它将尝试从可用的接口中选一个。若您有多个接口而您想要使用指定的一个,就在这里指定。

-ha - 若指定,该Verticle将部署为(支持)高可用性(HA)。有关详细信息,请参阅相关章节。

-quorum - 该参数需要和 -ha 一起使用,它指定集群中所有HA部署ID处于活动状态的最小节点数,默认为0。

-hagroup - 该参数需要和 -ha 一起使用,它指定此节点将加入的HA组。集群中可以有多个HA组,节点只会故障转移到同一组中的其他节点。默认为 __DEFAULT__ 。

您可以创建自己的主类并在 MANIFEST 中指定,但建议您将代码编写成Verticle,并使用Vert.x中的 Launcher 类 ( io.vertx.core.Launcher ) 作为您的主类。这是在命令行中运行Vert.x使用的主类,因此允许您指定命令行参数,如 -instances 以便更轻松地扩展应用程序。

要将您的Verticle全部部署在这个 fat-jar 中时,您必须将下边信息写入MANIFEST:

由于 start 命令产生一个新的进程,传递给JVM的java选项不会被传播,所以您必须使用 java-opts 来配置JVM( -X-D …​)。若您使用 CLASSPATH 环境变量,请确保路径下包含所有需要的jar(vertx-core、您的jar和所有依赖项)。

该命令集是可扩展的,请参考 扩展 Vert.x 启动器 部分。

实时重部署

在开发时,可以方便在文件更改时实时重新部署应用程序。 vertx 命令行工具和更普遍的 Launcher 类提供了这个功能。这里有些例子:

vertx run MyVerticle.groovy --redeploy="**&#47;*.groovy" --launcher-class=io.vertx.core.Launcher
vertx run MyVerticle.groovy --redeploy="**&#47;*.groovy,**&#47;*.rb"  --launcher-class=io.vertx.core.Launcher
java io.vertx.core.Launcher run org.acme.MyVerticle --redeploy="**&#47;*.class"  --launcher-class=io.vertx.core
.Launcher -cp ...

重新部署过程如下执行。首先,您的应用程序作为后台应用程序启动(使用 start 命令)。当发现文件更改时,该进程将停止并重新启动该应用。这样可避免泄露。

要启用实时重新部署,请将 --redeploy 选项传递给 run 命令。 --redeploy 表示要监视的文件集,这个集合可使用 Ant 样式模式(使用 *? ),您也可以使用逗号( , )分隔它们来指定多个集合。模式相当于当前工作目录。

传递给 run 命令的参数最终会传递给应用程序,可使用 --java-opts 配置JVM虚拟机选项。例如,如果想传入一个 conf 参数或是系统属性,您可以使用 --java-opts="-conf=my-conf.json -Dkey=value"

--launcher-class 选项确定应用程序的主类启动器。它通常是一个 Launcher ,单您已使用了您自己的主类。

也可以在IDE中使用重部署功能:

Eclipse - 创建一个运行配置,使用 io.vertx.core.Launcher 类作为主类。在 Program Arguments 区域(参数选项卡中),写入 run your-verticle-fully-qualified-name --redeploy=*/.java --launcher-class=io.vertx.core.Launcher ,您还可以添加其他参数。随着 Eclipse 在保存时会增量编译您的文件,重部署工作会顺利进行。

IntelliJ - 创建一个运行配置(应用),将主类设置为 io.vertx.core.Launcher 。在程序参数中写: run your-verticle-fully-qualified-name --redeploy=*/.class --launcher-class=io.vertx.core.Launcher 。要触发重新部署,您需要显示构造项目或模块(Build → Make project)。

要调试应用程序,请将运行配置创建为远程应用程序,并使用 --java-opts 配置调试器。每次重新部署后,请勿忘记重新插入(re-plug)调试器,因为它每次都会创建一个新进程。

您还可以在重新部署周期中挂接(hook)构建过程:

java -jar target/my-fat-jar.jar --redeploy="**&#47;*.java" --on-redeploy="mvn package"
java -jar build/libs/my-fat-jar.jar --redeploy="src&#47;**&#47;*.java" --on-redeploy='./gradlew shadowJar'

"on-redeploy"选项指定在应用程序关闭后和重新启动之前调用的命令。因此,如果更新某些运行时工作,则可以钩住构建工具。例如,您可以启动 gulpgrunt 来更新您的资源。如果您的应用需要 --java-opts ,不要忘记将它添加到命令参数里:

java -jar target/my-fat-jar.jar --redeploy="**&#47;*.java" --on-redeploy="mvn package" --java-opts="-Dkey=val"
java -jar build/libs/my-fat-jar.jar --redeploy="src&#47;**&#47;*.java" --on-redeploy='./gradlew shadowJar' --java-opts="-Dkey=val"

重新部署功能还支持以下设置:

Vert.x发行版中使用的默认集群管理器是使用的 Hazelcast 集群管理器,但是它可以轻松被替换成实现了Vert.x集群管理器接口的不同实现,因为Vert.x集群管理器可替换的。

集群管理器必须实现 ClusterManager 接口,Vert.x在运行时使用Java的服务加载器 Service Loader 功能查找集群管理器,以便在类路径中查找 ClusterManager 的实例。

若您在命令行中使用Vert.x并要使用集群,则应确保Vert.x安装的 lib 目录包含您的集群管理器jar。

若您在 Maven/Gradle 项目使用Vert.x,则只需将集群管理器jar作为项目依赖添加。

您也可以以编程的方式在嵌入Vert.x 时使用 setClusterManager 指定集群管理器。

配置JUL日志记录

一个JUL日志记录配置文件可以使用普通的JUL方式指定——通过提供一个名为 java.util.logging.config.file 的系统属性值为您的配置文件。更多关于此部分以及JUL配置文件结构的内容,请参阅JUL日志记录文档。

Vert.x还提供了一种更方便的方式指定配置文件,无需设置系统属性。您只需在您的类路径中提供名为 vertx-default-jul-logging.properties 的JUL配置文件(例如在您的fatjar中),Vert.x将使用该配置文件配置JUL。

使用另一个日志框架

如果您不希望Vert.x使用JUL记录日志,您可以为其配置另一个日志记录框架,例如Log4J或SLF4J。

为此,您应该设置一个名为 vertx.logger-delegate-factory-class-name 的系统属性,该属性的值是一个实现了 LogDelegateFactory 接口的Java 类名。 我们为Log4J(版本1)、Log4J 2和SLF4J提供了预设的实现,类名为: io.vertx.core.logging.Log4jLogDelegateFactoryio.vertx.core.logging.Log4j2LogDelegateFactoryio.vertx.core.logging.SLF4JLogDelegateFactory 。 如您要使用这些实现,还应确保相关的Log4J或SLF4J的jar在您的类路径上。

请注意,提供的Log4J 1代理不支持参数化消息。Log4J 2的代理使用了像SLF4J代理这样的 {} 语法,JUL代理使用如 {x} 语法。

应用中记录日志

Vert.x本身只是一个库,您可以在自己的应用程序使用任何日志库的API来记录日志。

但是,若您愿意,也可以使用上述的Vert.x日志记录工具为应用程序记录日志。

为此,您需要使用 LoggerFactory 获取一个 Logger 对象以记录日志:

Logger logger = LoggerFactory.getLogger(className);
logger.info("something happened");
logger.error("oops!", exception);
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

I这意味着您的类路径中有 SLF4J-API 却没绑定。 SLF4J 记录的消息将会丢失。您应该将绑定加入您的类路径。参考 SLF4J user manual - Binding with a logging framework at deployment time 选择绑定并配置。

请注意,Netty会寻找 SLF4-API 的jar,并在缺省情况下使用它。

对等连接重置

若您的日志显示一堆:

io.vertx.core.net.impl.ConnectionBase
SEVERE: java.io.IOException: Connection reset by peer

这意味着客户端正在重置HTTP连接,而不是关闭它。此消息还可能表示您没有读取完整的有效负荷(连接在读取完全之前被切断)。

Vertx vertx = Vertx.vertx(new VertxOptions().
    setAddressResolverOptions(
        new AddressResolverOptions().
            addServer("192.168.0.1").
            addServer("192.168.0.2:40000"))

DNS 服务器的默认端口为 53 ,当服务器使用不同的端口时,可以使用冒号分隔符设置该端口: 192.168.0.2:40000

Vertx vertx = Vertx.vertx(new VertxOptions().
    setAddressResolverOptions(
        new AddressResolverOptions().
            setHostsPath("/path/to/hosts"))
Vertx vertx = Vertx.vertx(new VertxOptions().
    setAddressResolverOptions(
        new AddressResolverOptions().addSearchDomain("foo.com").addSearchDomain("bar.com"))

当使用搜索域列表时,点数的阈值为 1 ,或从Linux上的 /etc/resolv.conf 加载,也可使用 setNdots 方法配置特定值。

Vert.x允许您运行支持高可用(HA,High Availability)的Verticle。这种情况下,当运行Verticle的Vert.x实例突然挂掉时,该Veritlce将迁移到另一个Vert.x 实例。这个Vert.x 实例必须在同一个集群中。

自动故障转移

当Vert.x启用HA运行时,若一个运行了Verticle的Vert.x 实例失败或挂掉,则此Verticle将自动重新部署到集群中的另一个Vert.x 实例中。我们称这个为 Verticle 故障转移(failover)。

若要启用HA模式,在启动 Vert.x 应用的时候需要添加 -ha 参数到命令行:

vertx run my-verticle.js -ha

现在开启了HA环境,在集群中需要多添加一个Vert.x 实例,所以假设您已经有另一个已经启动的Vert.x 实例,例如:

vertx run my-other-verticle.js -ha

如果运行了 my-verticle.js 的Vert.x 实例现在死了(您可以通过执行 kill -9 杀死进程来测试), 运行 my-other-verticle.js 的Vert.x 实例将自动重新部署 my-verticle.js ,所以现在这个Vert.x 实例正在运行两个Verticle。

当使用Vert.x运行实例时,还可以选择指定的HA组。HA组表示集群中的逻辑节点组。只有具有相同HA组的节点能执行故障转移。若不指定HA组,则使用默认组 DEFAULT

要指定一个HA组,您可以在运行该Verticle时使用 -hagroup 开关。

vertx run my-verticle.js -ha -hagroup my-group

我们来看一个例子:

在第一个终端运行:

vertx run my-verticle.js -ha -hagroup g1

在第二个终端中,让我们使用相同组运行另一个Verticle:

vertx run my-other-verticle.js -ha -hagroup g1

最后,在第三个终端中,使用不同组启动另一个Verticle:

vertx run yet-another-verticle.js -ha -hagroup g2

如果终端1中的实例被杀掉,则它将故障转移到终端2中的实例,而不是具有不同组的终端3中的实例。

若终端3中的实例被杀掉,因为这个组中没有其他Vert.x实例,则它不会故障转移。

处理网络分区 - Quora

高可用HA实现同样支持 Quora(多数派机制)。Quorum 是分布式事务必须获得的最小票数才能被允许在分布式系统中执行操作的一个参数。

在启动 Vert.x 实例时,您可以指示它在部署任何HA部署之前需要一个 quorum 。该上下文环境中,一个 quorum 是集群中特定组的最小节点数。通常您选择 quorum 大小为 Q = 1 + N / 2 ,其中N是组中节点数。若集群中的Q节点少于HA节点,HA部署将被撤销。如果/当 quorum 重新获取时,他们将重新部署。通过这样做您可以防止网络分区,也就是脑裂(split brain)。

更多关于Quorum(多数派机制)的信息,请参考 这里

若要使用 quorum 运行Vert.x实例,您可以在命令行中指定 -quorum ,例如:

在第一个终端:

vertx run my-verticle.js -ha -quorum 3

此时,Vert.x实例将启动但不部署模块(尚未)因为目前集群中只有1个节点,而不是3个。

在第二个终端:

vertx run my-other-verticle.js -ha -quorum 3

此时,Vert.x实例将启动但不部署模块(尚未)因为目前集群中只有2个节点,而不是3个。

在第三个控制台,您可以启动另一个Vert.x的实例:

vertx run yet-another-verticle.js -ha -quorum 3

妙极!—— 我们有三个节点,这是 quorum 设置的值,此时,模块将自动部署在所有实例上。

若我们现在关闭或杀死其中一个节点,那么这些模块将在其他节点上自动撤销,因为不再满足 quorum(法定人数)。

Quora 也可以与HA组合使用,在这种情况下,每个特定组会解决 Quora。

preferring native transport will not prevent the application to execute, if your application requires native transport, you need to check isNativeTransportEnabled . <groupId>io.netty</groupId> <artifactId>netty-transport-native-epoll</artifactId> <version>4.1.15.Final</version> <classifier>linux-x86_64</classifier> </dependency>

Native on Linux gives you extra networking options:

<groupId>io.netty</groupId> <artifactId>netty-transport-native-epoll</artifactId> <version>4.1.15.Final</version> <classifier>osx-x86_64</classifier> </dependency>

MacOS Sierra and above are supported.

Native on BSD gives you extra networking options:

vertx.createHttpServer().requestHandler(req -> {
  // Handle application
}).listen(SocketAddress.domainSocketAddress("/var/tmp/myservice.sock"), ar -> {
  if (ar.succeeded()) {
    // Bound to socket
  } else {
    ar.cause().printStackTrace();

As well as NetClient :

NetClient netClient = vertx.createNetClient();
// Only available on BSD and Linux
netClient.connect(SocketAddress.domainSocketAddress("/var/tmp/myservice.sock"), ar -> {
  if (ar.succeeded()) {
    // Connected
  } else {
    ar.cause().printStackTrace();

Vert.x Core提供了一个用于解析传递给程序的命令行参数API。它还可以打印帮助信息——详细说明命令行工具可用的选项。 即使这些功能远离Vert.x Core主题,该API也可在 Launcher 类中使用,可以在 fat-jar 和 vertx 命令行工具中使用。 另外,它支持多语言(可用于任何支持的语言),并可在Vert.x Shell中使用。

Vert.x CLI提供了一个描述命令行界面的模型,同时也是一个解析器,这个解析器可支持不同的语法:

CLI cli = CLI.create("copy")
    .setSummary("A command line interface to copy files.")
    .addOption(new Option()
        .setLongName("directory")
        .setShortName("R")
        .setDescription("enables directory support")
        .setFlag(true))
    .addArgument(new Argument()
        .setIndex(0)
        .setDescription("The source")
        .setArgName("source"))
    .addArgument(new Argument()
        .setIndex(1)
        .setDescription("The destination")
        .setArgName("target"));

您可以看到,您可以使用 CLI.create 创建一个新的 CLI 。 传递的字符串是 CLI 的名称。创建后,您可以设置摘要和描述,摘要的目的是简短(一行),而描述可以包含更多细节。每个选项和参数也使用 addArgumentaddOption 方法添加到 CLI 对象上。

Option 是由用户命令行中存在的 标识的命令行参数。选项至少必须有一个长名或一个短名。 长名称通常使用 -- 前缀,而短名称与单个 - 一起使用。选项可以获取用法中显示的描述(见下文)。选项可以接受0、1或几个值。 接受0值的选项是一个标志( flag ),必须使用 setFlag 声明。默认情况下,选项会接受一个值,但是您可以使用 setMultiValued 方法配置该选项接收多个值:

CLI cli = CLI.create("some-name")
    .setSummary("A command line interface illustrating the options valuation.")
    .addOption(new Option()
        .setLongName("flag").setShortName("f").setFlag(true).setDescription("a flag"))
    .addOption(new Option()
        .setLongName("single").setShortName("s").setDescription("a single-valued option"))
    .addOption(new Option()
        .setLongName("multiple").setShortName("m").setMultiValued(true)
        .setDescription("a multi-valued option"));

选项可以标记为必填项,在用户命令行中未设置必填选项在解析阶段会引发异常:

CLI cli = CLI.create("some-name")
    .addOption(new Option()
        .setLongName("mandatory")
        .setRequired(true)
        .setDescription("a mandatory option"));

非必填选项可以具有默认值,如果用户没有在命令行中设置该选项,即将使用该值:

CLI cli = CLI.create("some-name")
    .addOption(new Option()
        .setLongName("optional")
        .setDefaultValue("hello")
        .setDescription("an optional option with a default value"));

可以使用 setHidden 方法隐藏选项,隐藏选项不在用法中列出,但仍可在用户命令行中使用(针对高级用户)。

如果选项值被限制为一个固定集合,您可以设置不同的可接受选项:

CLI cli = CLI.create("some-name")
    .addOption(new Option()
        .setLongName("color")
        .setDefaultValue("green")
        .addChoice("blue").addChoice("red").addChoice("green")
        .setDescription("a color"));

也可以从JSON表单中实例化选项。

和选项不同,参数不具有 并由其索引标识。例如,在 java com.acme.Foo 中, com.acme.Foo 是一个参数。

参数没有名称,使用基于 0 的索引进行标识。第一个参数的索引为 0:

CLI cli = CLI.create("some-name")
    .addArgument(new Argument()
        .setIndex(0)
        .setDescription("the first argument")
        .setArgName("arg1"))
    .addArgument(new Argument()
        .setIndex(1)
        .setDescription("the second argument")
        .setArgName("arg2"));

如果不设置参数索引,则基于声明顺序会自动计算。

CLI cli = CLI.create("some-name")
    // will have the index 0
    .addArgument(new Argument()
        .setDescription("the first argument")
        .setArgName("arg1"))
    // will have the index 1
    .addArgument(new Argument()
        .setDescription("the second argument")
        .setArgName("arg2"));

argName 是可选的,并在消息中使用。

相比选项, Argument 可以:

CLI cli = CLI.create("copy")
    .setSummary("A command line interface to copy files.")
    .addOption(new Option()
        .setLongName("directory")
        .setShortName("R")
        .setDescription("enables directory support")
        .setFlag(true))
    .addArgument(new Argument()
        .setIndex(0)
        .setDescription("The source")
        .setArgName("source"))
    .addArgument(new Argument()
        .setIndex(0)
        .setDescription("The destination")
        .setArgName("target"));
StringBuilder builder = new StringBuilder();
cli.usage(builder);

上边生成的 usage 信息如下:

Usage: copy [-R] source target
A command line interface to copy files.
 -R,--directory   enables directory support

若需要调整 usage 信息,请查阅 UsageMessageFormatter 类的文档。

parse 解析方法返回包含值的 CommandLine 对象。默认情况下,它验证用户命令行,并检查每个必填选项和参数的设置以及每个选项接收的值的数量。 您可以通过传递 false 作为 parse 的第二个参数来禁用验证。 如果要检查参数或选项,即使解析的命令行无效,这也是有用的。

您可以使用 isValid 来检查 CommandLine 是否有效。

查询/审问阶段

解析后,您可以从 parse 方法返回的 CommandLine 对象中读取选项和参数的值:

CommandLine commandLine = cli.parse(userCommandLineArguments);
String opt = commandLine.getOptionValue("my-option");
boolean flag = commandLine.isFlagEnabled("my-flag");
String arg0 = commandLine.getArgumentValue(0);

您的一个选项可以被标记为“帮助”。如果用户命令行启用“帮助”选项,验证将不会失败,但是可以让您有机会检查用户是否需要帮助:

CLI cli = CLI.create("test")
    .addOption(
        new Option().setLongName("help").setShortName("h").setFlag(true).setHelp(true))
    .addOption(
        new Option().setLongName("mandatory").setRequired(true));
CommandLine line = cli.parse(Collections.singletonList("-h"));
// The parsing does not fail and let you do:
if (!line.isValid() && line.isAskingForHelp()) {
  StringBuilder builder = new StringBuilder();
  cli.usage(builder);
  stream.print(builder.toString());

描述 OptionArgument 类是无类型的,这意味着仅读取String值。 TypedOptionTypedArgument 可以指定一个类型,因此(String)原始值将转换为指定的类型。

CLI 定义中使用 TypedOptionTypedArgument OptionArgument

CLI cli = CLI.create("copy")
    .setSummary("A command line interface to copy files.")
    .addOption(new TypedOption<Boolean>()
        .setType(Boolean.class)
        .setLongName("directory")
        .setShortName("R")
        .setDescription("enables directory support")
        .setFlag(true))
    .addArgument(new TypedArgument<File>()
        .setType(File.class)
        .setIndex(0)
        .setDescription("The source")
        .setArgName("source"))
    .addArgument(new TypedArgument<File>()
        .setType(File.class)
        .setIndex(0)
        .setDescription("The destination")
        .setArgName("target"));

然后,您可以按下边方式获取转换的值:

CommandLine commandLine = cli.parse(userCommandLineArguments);
boolean flag = commandLine.getOptionValue("R");
File source = commandLine.getArgumentValue("source");
File target = commandLine.getArgumentValue("target");

Vert.x CLI可以转换的类:

CLI cli = CLI.create("some-name")
    .addOption(new TypedOption<Person>()
        .setType(Person.class)
        .setConverter(new PersonConverter())
        .setLongName("person"));

对于布尔值,布尔值将被评定为 true : onyes1true

若您的一个选项是 enum 类型,则(系统)会自动计算一组选项。

您还可以使用注解定义CLI。在类和 setter 方法上使用注解来定义:

&#64;Name("some-name")
&#64;Summary("some short summary.")
&#64;Description("some long description")
public class AnnotatedCli {
 private boolean flag;
 private String name;
 private String arg;
&#64;Option(shortName = "f", flag = true)
public void setFlag(boolean flag) {
  this.flag = flag;
&#64;Option(longName = "name")
public void setName(String name) {
  this.name = name;
&#64;Argument(index = 0)
public void setArg(String arg) {
 this.arg = arg;

注解后,您可以使用以下命令来定义 CLI 并注入值:

CLI cli = CLI.create(AnnotatedCli.class);
CommandLine commandLine = cli.parse(userCommandLineArguments);
AnnotatedCli instance = new AnnotatedCli();
CLIConfigurator.inject(commandLine, instance);
&#64;Name("my-command")
&#64;Summary("A simple hello command.")
public class MyCommand extends DefaultCommand {
 private String name;
 &#64;Option(longName = "name", required = true)
 public void setName(String n) {
   this.name = n;
 &#64;Override
 public void run() throws CLIException {
   System.out.println("Hello " + name);

Y您还需要实现一个 CommandFactory

public class HelloCommandFactory extends DefaultCommandFactory<HelloCommand> {
 public HelloCommandFactory() {
  super(HelloCommand.class);

然后创建 src/main/resources/META-INF/services/io.vertx.core.spi.launcher.CommandFactory 并且添加一行表示工厂类的完全限定名称:

io.vertx.core.launcher.example.HelloCommandFactory

构建包含命令的jar。确保包含了SPI文件( META-INF/services/io.vertx.core.spi.launcher.CommandFactory )。

然后,将包含该命令的jar放入fat-jar(或包含在其中)的类路径中,或放在Vert.x发行版的 lib 目录中,您将可以执行:

vertx hello vert.x
java -jar my-fat-jar.jar hello vert.x

在 fat-jar 中使用启动器

要在 fat-jar 中使用 Launcher 类,只需要将 MANIFESTMain-Class 设置为 io.vertx.core.Launcher 。 另外,将 MANIFESTMain-Verticle 条目设置为您的Main Verticle的名称。

默认情况下,它会执行 run 命令。但是,您可以通过设置 MANIFESTMain-Command 条目来配置默认命令。若在没有命令的情况下启动 fat-jar,则使用默认命令。

启动器子类

您还可以创建 Launcher 的子类来启动您的应用程序。这个类被设计成易于扩展的。

一个 Launcher 子类可以:

当 Vert.x 需要从类路径中读取文件(嵌入在 fat-jar 中,类路径中jar文件或其他文件)时,它会把文件复制到缓存目录。背后原因很简单:从 jar 或从输入流读取文件是阻塞的。 所以为了避免每次都付出代价,Vert.x 会将文件复制到其缓存目录中,并随后读取该文件。这个行为也可配置。

首先,默认情况下,Vert.x 使用 $CWD/.vertx 作为缓存目录,它在此之间创建一个唯一的目录,以避免冲突。 可以使用 vertx.cacheDirBase 系统属性配置该位置。如,若当前工作目录不可写(例如在不可变容器上下文环境中),请使用以下命令启动应用程序:

vertx run my.Verticle -Dvertx.cacheDirBase=/tmp/vertx-cache
java -jar my-fat.jar vertx.cacheDirBase=/tmp/vertx-cache

当您编辑资源(如HTML、CSS或JavaScript)时,这种缓存机制可能令人讨厌,因为它仅仅提供文件的第一个版本(因此,若您想重新加载页面,则不会看到您的编辑改变)。 要避免此行为,请使用 -Dvertx.disableFileCaching=true 启动应用程序。使用此设置,Vert.x 仍然使用缓存,但始终使用原始源刷新存储在缓存中的版本。 因此,如果您编辑从类路径提供的文件并刷新浏览器,Vert.x 会从类路径读取它,将其复制到缓存目录并从中提供。不要在生产环境使用这个设置,它很有可能影响性能。

最后,您可以使用 -Dvertx.disableFileCPResolving=true 完全禁用高速缓存。这个设置不是没有后果的。Vert.x将无法从类路径中读取任何文件(仅从文件系统)。 使用此设置时要非常小心。

Eclipse Vert.x is open source and dual-licensed under the Eclipse Public License 2.0 and Apache License 2.0.

This website is licensed under the CC BY-SA 3.0 License.
Design by Michel Krämer.