添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

响应式(Reactive) PostgreSQL 客户端

PgConnectOptions connectOptions = new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");
// 连接池参数
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);
// 创建客户端连接池
SqlClient client = PgPool.client(connectOptions, poolOptions);
// 一个简单查询
client
  .query("SELECT * FROM users WHERE id='julien'")
  .execute(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> result = ar.result();
    System.out.println("Got " + result.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  // Now close the pool
  client.close();
PgConnectOptions connectOptions = new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");
// 连接池参数
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);
// 创建一个池化的客户端
SqlClient client = PgPool.client(connectOptions, poolOptions);

池化PostgreSQL客户端使用连接池去执行数据库操作, 所有操作都会遵循从池里拿到连接、执行、释放连接到池里这三个步骤。

您可以传入一个连接池到正在运行的Vert.x实例里:

PgConnectOptions connectOptions = new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");
// 连接池参数
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);
// 创建一个池化的客户端
SqlClient client = PgPool.client(vertx, connectOptions, poolOptions);

如果不再需要客户端,您需要将其释放:

client.close();

当您想要在同一条连接上执行多个操作时,您需要从连接池中获取 connection 连接。

您可以很方便地从连接池里拿到一条连接:

PgConnectOptions connectOptions = new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");
// 连接池参数
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);
// 创建一个池化的客户端
PgPool pool = PgPool.pool(vertx, connectOptions, poolOptions);
// 从连接池中获取一个连接
pool.getConnection().compose(conn -> {
  System.out.println("Got a connection from the pool");
  // 所有操作都在一个连接中执行
  return conn
    .query("SELECT * FROM users WHERE id='julien'")
    .execute()
    .compose(res -> conn
      .query("SELECT * FROM users WHERE id='emad'")
      .execute())
    .onComplete(ar -> {
      // Release the connection to the pool
      conn.close();
}).onComplete(ar -> {
  if (ar.succeeded()) {
    System.out.println("Done");
  } else {
    System.out.println("Something went wrong " + ar.cause().getMessage());

为了连接可以重用,一旦当前连接上的操作已经完成,您需要关闭并释放连接到连接池里。

SqlClient client = PgPool.client(vertx, connectOptions, poolOptions);
// 流水线操作(Pipelined)
Future<RowSet<Row>> res1 = client.query(sql).execute();
// 连接池
PgPool pool = PgPool.pool(vertx, connectOptions, poolOptions);
// 不是流水线操作
Future<RowSet<Row>> res2 = pool.query(sql).execute();
PgPool pool = PgPool.pool(database, new PoolOptions().setMaxSize(maxSize));
vertx.deployVerticle(() -> new AbstractVerticle() {
  @Override
  public void start() throws Exception {
    // 使用连接池
}, new DeploymentOptions().setInstances(4));

您也可以用以下方式在每个 Verticle 中创建可共享的连接池:

vertx.deployVerticle(() -> new AbstractVerticle() {
  PgPool pool;
  @Override
  public void start() {
    // 创建一个可共享的连接池
    // 或获取已有的可共享连接池,并创建对原连接池的借用
    // 当 verticle 被取消部署时,借用会被自动释放
    pool = PgPool.pool(database, new PoolOptions()
      .setMaxSize(maxSize)
      .setShared(true)
      .setName("my-pool"));
}, new DeploymentOptions().setInstances(4));

第一次创建可共享的连接池时,会创建新连接池所需的资源。之后再调用该创建方法时,会复用之前的连接池,并创建 对原有连接池的借用。当所有的借用都被关闭时,该连接池的资源也会被释放。

默认情况下,客户端需要创建一个 TCP 连接时,会复用当前的 event-loop 。 这个可共享的 HTTP 客户端会 以一种安全的模式,在使用它的 verticle 中随机选中一个 verticle,并使用它的 event-loop。

您可以手动设置一个客户端可以使用的 event-loop 的数量

PgPool pool = PgPool.pool(database, new PoolOptions()
  .setMaxSize(maxSize)
  .setShared(true)
  .setName("my-pool")
  .setEventLoopSize(4));
PgConnectOptions connectOptions = new PgConnectOptions()
  .setHost("/var/run/postgresql")
  .setPort(5432)
  .setDatabase("the-db");
// 连接池参数
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);
// 创建一个池化的客户端
PgPool client = PgPool.pool(connectOptions, poolOptions);
// 创建一个池化的客户端 with a vertx instance
// Make sure the vertx instance has enabled native transports
PgPool client2 = PgPool.pool(vertx, connectOptions, poolOptions);

更多详情可以在这里找到 Vert.x 文档

PgConnectOptions connectOptions = new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");
// 连接池参数
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
// 从数据对象中创建连接池
PgPool pool = PgPool.pool(vertx, connectOptions, poolOptions);
pool.getConnection(ar -> {
  // 处理您的连接

您也可以使用 setPropertiesaddProperty 方法配置通用属性。注意 setProperties 将覆盖默认的客户端属性。

// 设置默认schema Map<String, String> props = new HashMap<>(); props.put("search_path", "myschema"); connectOptions.setProperties(props);

关于可用属性的更多信息可以在这里找到 PostgreSQL Manuals

连接 URI

除了使用 PgConnectionOptions 对象,我们也提供了另一种基于URI的可选配置方案:

String connectionUri = "postgresql://dbuser:[email protected]:5432/mydb";
// 从连接 URI 创建连接池
PgPool pool = PgPool.pool(connectionUri);
// 从连接 URI 创建连接
PgConnection.connect(vertx, connectionUri, res -> {
  // 处理您的连接

关于连接uri字符串格式的更多信息可以在这里找到 PostgreSQL 手册

目前,客户端支持以下参数:

if (ar.succeeded()) { RowSet<Row> result = ar.result(); System.out.println("Got " + result.size() + " rows "); } else { System.out.println("Failure: " + ar.cause().getMessage());

执行预查询也是一样的操作。

SQL字符通过位置引用实际的参数,并使用数据库的语法 `$1`, `$2`, etc…​

client
  .preparedQuery("SELECT * FROM users WHERE id=$1")
  .execute(Tuple.of("julien"), ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println("Got " + rows.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());

查询相关的方法为 SELECT 类型的操作提供了异步的 RowSet 实例

client
  .preparedQuery("SELECT first_name, last_name FROM users")
  .execute(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    for (Row row : rows) {
      System.out.println("User " + row.getString(0) + " " + row.getString(1));
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());

或者 UPDATE/INSERT 类型的查询:

client
  .preparedQuery("INSERT INTO users (first_name, last_name) VALUES ($1, $2)")
  .execute(Tuple.of("Julien", "Viet"),  ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println(rows.rowCount());
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());

Row对象(Row)可以让您通过索引位置获取相应的数据

System.out.println("User " + row.getString(0) + " " + row.getString(1));
String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");

您可以使用缓存过的预处理语句去执行一次性的预查询:

connectOptions.setCachePreparedStatements(true);
client
  .preparedQuery("SELECT * FROM users WHERE id = $1")
  .execute(Tuple.of("julien"), ar -> {
    if (ar.succeeded()) {
      RowSet<Row> rows = ar.result();
      System.out.println("Got " + rows.size() + " rows ");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());

您也可以创建 PreparedStatement 并自主地管理它的生命周期。

sqlConnection
  .prepare("SELECT * FROM users WHERE id = $1", ar -> {
    if (ar.succeeded()) {
      PreparedStatement preparedStatement = ar.result();
      preparedStatement.query()
        .execute(Tuple.of("julien"), ar2 -> {
          if (ar2.succeeded()) {
            RowSet<Row> rows = ar2.result();
            System.out.println("Got " + rows.size() + " rows ");
            preparedStatement.close();
          } else {
            System.out.println("Failure: " + ar2.cause().getMessage());
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julien Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));
// Execute the prepared batch
client
  .preparedQuery("INSERT INTO USERS (id, name) VALUES ($1, $2)")
  .executeBatch(batch, res -> {
  if (res.succeeded()) {
    // Process rows
    RowSet<Row> rows = res.result();
  } else {
    System.out.println("Batch failed " + res.cause());
client
  .preparedQuery("INSERT INTO color (color_name) VALUES ($1), ($2), ($3) RETURNING color_id")
  .execute(Tuple.of("white", "red", "blue"))
  .onSuccess(rows -> {
    for (Row row : rows) {
      System.out.println("generated key: " + row.getInteger("color_id"));

只要 SQL 语句中存在 RETURNING 子句,就可以生效:

client
  .query("DELETE FROM color RETURNING color_name")
  .execute()
  .onSuccess(rows -> {
    for (Row row : rows) {
      System.out.println("deleted color: " + row.getString("color_name"));

带有 RETURNING 语句的批量查询创建了一个 RowSet , 这个RowSet包含了该批量查询中的每一个元素。

client
  .preparedQuery("INSERT INTO color (color_name) VALUES ($1) RETURNING color_id")
  .executeBatch(Arrays.asList(Tuple.of("white"), Tuple.of("red"), Tuple.of("blue")))
  .onSuccess(res -> {
    for (RowSet<Row> rows = res;rows.next() != null;rows = rows.next()) {
      Integer colorId = rows.iterator().next().getInteger("color_id");
      System.out.println("generated key: " + colorId);
  .compose(connection ->
    connection
      .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES ($1, $2)")
      .executeBatch(Arrays.asList(
        Tuple.of("Julien", "Viet"),
        Tuple.of("Emad", "Alblueshi")
      .compose(res -> connection
        // Do something with rows
        .query("SELECT COUNT(*) FROM Users")
        .execute()
        .map(rows -> rows.iterator().next().getInteger(0)))
      // Return the connection to the pool
      .eventually(v -> connection.close())
  ).onSuccess(count -> {
  System.out.println("Insert users, now the number of users is " + count);

也可以通过连接对象创建预查询:

connection
  .prepare("SELECT * FROM users WHERE first_name LIKE $1")
  .compose(pq ->
    pq.query()
      .execute(Tuple.of("Julien"))
      .eventually(v -> pq.close())
  ).onSuccess(rows -> {
  // All rows
pool.withConnection(connection ->
  connection
    .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES ($1, $2)")
    .executeBatch(Arrays.asList(
      Tuple.of("Julien", "Viet"),
      Tuple.of("Emad", "Alblueshi")
    .compose(res -> connection
      // Do something with rows
      .query("SELECT COUNT(*) FROM Users")
      .execute()
      .map(rows -> rows.iterator().next().getInteger(0)))
).onSuccess(count -> {
  System.out.println("Insert users, now the number of users is " + count);
    .compose(tx -> conn
      // Various statements
      .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
      .execute()
      .compose(res2 -> conn
        .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
        .execute())
      // Commit the transaction
      .compose(res3 -> tx.commit()))
    // Return the connection to the pool
    .eventually(v -> conn.close())
    .onSuccess(v -> System.out.println("Transaction succeeded"))
    .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));

当数据库服务端返回当前事务已失败(比如常见的 current transaction is aborted, commands ignored until end of transaction block) ,事务已回滚和 completion 方法的返回值future返回了 TransactionRollbackException 异常时:

tx.completion()
  .onFailure(err -> {
  System.out.println("Transaction failed => rolled back");
pool.withTransaction(client -> client
  .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
  .execute()
  .flatMap(res -> client
    .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
    .execute()
    // Map to a message result
    .map("Users inserted")))
  .onSuccess(v -> System.out.println("Transaction succeeded"))
  .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
connection.prepare("SELECT * FROM users WHERE first_name LIKE $1", ar0 -> {
  if (ar0.succeeded()) {
    PreparedStatement pq = ar0.result();
    // Cursors require to run within a transaction
    connection.begin(ar1 -> {
      if (ar1.succeeded()) {
        Transaction tx = ar1.result();
        // Create a cursor
        Cursor cursor = pq.cursor(Tuple.of("julien"));
        // Read 50 rows
        cursor.read(50, ar2 -> {
          if (ar2.succeeded()) {
            RowSet<Row> rows = ar2.result();
            // Check for more ?
            if (cursor.hasMore()) {
              // Repeat the process...
            } else {
              // No more rows - commit the transaction
              tx.commit();

游标释放时需要同时执行关闭操作:

cursor.read(50, ar2 -> {
  if (ar2.succeeded()) {
    // Close the cursor
    cursor.close();

stream API也可以用于游标,尤其是在Rx版的客户端,可能更为方便。

connection.prepare("SELECT * FROM users WHERE first_name LIKE $1", ar0 -> {
  if (ar0.succeeded()) {
    PreparedStatement pq = ar0.result();
    // Streams require to run within a transaction
    connection.begin(ar1 -> {
      if (ar1.succeeded()) {
        Transaction tx = ar1.result();
        // Fetch 50 rows at a time
        RowStream<Row> stream = pq.createStream(50, Tuple.of("julien"));
        // Use the stream
        stream.exceptionHandler(err -> {
          System.out.println("Error: " + err.getMessage());
        stream.endHandler(v -> {
          // Close the stream to release the resources in the database
          stream.close(closed -> {
            tx.commit(committed -> {
              System.out.println("End of stream");
        stream.handler(row -> {
          System.out.println("User: " + row.getString("last_name"));

上边的stream会批量读取 50 行并同时将其转换为流,当这些行记录被传递给处理器时, 会以此类推地读取下一批的 50 行记录。

stream支持重启或暂停,已经加载到的行记录将会被保留在内存里直到被传递给处理器,此时 游标也将终止遍历。

JSON (io.vertx.core.json.JsonObject, io.vertx.core.json.JsonArray, Number, Boolean, String, io.vertx.sqlclient.Tuple#JSON_NULL)

JSONB (io.vertx.core.json.JsonObject, io.vertx.core.json.JsonArray, Number, Boolean, String, io.vertx.sqlclient.Tuple#JSON_NULL)

POINT (io.vertx.pgclient.data.Point)

LINE (io.vertx.pgclient.data.Line)

LSEG (io.vertx.pgclient.data.LineSegment)

BOX (io.vertx.pgclient.data.Box)

PATH (io.vertx.pgclient.data.Path)

POLYGON (io.vertx.pgclient.data.Polygon)

CIRCLE (io.vertx.pgclient.data.Circle)

TSVECTOR (java.lang.String)

TSQUERY (java.lang.String)

INET (io.vertx.pgclient.data.Inet)

MONEY (io.vertx.pgclient.data.Money)

Tuple tuple = Tuple.of(new String[]{ "a", "tuple", "with", "arrays" });
// 将字符串数组添加到元组
tuple.addArrayOfString(new String[]{"another", "array"});
// 获取第一个字符串数组
String[] array = tuple.getArrayOfStrings(0);
if (ar.succeeded()) { Row row = ar.result().iterator().next(); System.out.println(row.getLocalDate("LocalDate").equals(LocalDate.MAX)); } else { System.out.println("Failure: " + ar.cause().getMessage());
client
  .preparedQuery("SELECT address, (address).city FROM address_book WHERE id=$1")
  .execute(Tuple.of(3),  ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    for (Row row : rows) {
      System.out.println("Full Address " + row.getString(0) + ", City " + row.getString(1));
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());

您也可以向PostgreSQL写入字符串

client
  .preparedQuery("INSERT INTO address_book (id, address) VALUES ($1, $2)")
  .execute(Tuple.of(3, "('Anytown', 'Second Ave', false)"),  ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println(rows.rowCount());
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
client
  .preparedQuery("SELECT to_tsvector( $1 ) @@ to_tsquery( $2 )")
  .execute(Tuple.of("fat cats ate fat rats", "fat & rat"),  ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    for (Row row : rows) {
      System.out.println("Match : " + row.getBoolean(0));
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());

tsvectortsquery 可以使用java的 String 类型来从数据库中获取

client
  .preparedQuery("SELECT to_tsvector( $1 ), to_tsquery( $2 )")
  .execute(Tuple.of("fat cats ate fat rats", "fat & rat"),  ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    for (Row row : rows) {
      System.out.println("Vector : " + row.getString(0) + ", query : "+row.getString(1));
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
client
  .preparedQuery("INSERT INTO colors VALUES ($2)")
  .execute(Tuple.of("red"),  res -> {
    // ...
  .preparedQuery("INSERT INTO colors VALUES ($1)")
  .execute(Tuple.of(Color.red))
  .flatMap(res ->
    client
      .preparedQuery("SELECT color FROM colors")
      .execute()
  ).onComplete(res -> {
    if (res.succeeded()) {
      RowSet<Row> rows = res.result();
      for (Row row : rows) {
        System.out.println(row.get(Color.class, "color"));

String and PostgreSQL enumerated types 对应Java枚举类的 name() 方法的返回值。

Numbers类型对应Java枚举类的 ordinal() 方法的返回值。

Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
  row -> row.getLong("id"),
  row -> row.getString("last_name"));
// 使用收集器运行查询
client.query("SELECT * FROM users")
  .collecting(collector)
  .execute(ar -> {
  if (ar.succeeded()) {
    SqlResult<Map<Long, String>> result = ar.result();
    // 获取收集器创建的映射
    Map<Long, String> map = result.value();
    System.out.println("Got " + map);
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());

collector 式查询的结果集处理过程中不能再拿到 Row 的引用,因为 pg 客户端在处理 collector 时,只会用一个 row 处理整个集合。

Java的 Collectors 类提供了很多很有趣的预定义的 collector,比如您可以很容易 从 row 集合里得到一个字符串:

Collector<Row, ?, String> collector = Collectors.mapping(
  row -> row.getString("last_name"),
  Collectors.joining(",", "(", ")")
// 使用收集器运行查询
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
    if (ar.succeeded()) {
      SqlResult<String> result = ar.result();
      // 获取收集器创建的字符串
      String list = result.value();
      System.out.println("Got " + list);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
connection.notificationHandler(notification -> {
  System.out.println("Received " + notification.getPayload() + " on channel " + notification.getChannel());
connection
  .query("LISTEN some-channel")
  .execute(ar -> {
  System.out.println("Subscribed to channel");

PgSubscriber (PgSubscriber) 是一种用作 处理单条连接上的订阅的通道(channel)管理器:

PgSubscriber subscriber = PgSubscriber.subscriber(vertx, new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
// You can set the channel before connect
subscriber.channel("channel1").handler(payload -> {
  System.out.println("Received " + payload);
subscriber.connect(ar -> {
  if (ar.succeeded()) {
    // Or you can set the channel after connect
    subscriber.channel("channel2").handler(payload -> {
      System.out.println("Received " + payload);

channel(通道)方法的参数即通道名称(接收端)需要和PostgreSQL发送通知时的通道名称保持一致。 注意这里和SQL中的通道名称的形式不同,在 PgSubscriber 内部会把待提交的通道名称预处理为带引号的形式:

PgSubscriber subscriber = PgSubscriber.subscriber(vertx, new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
subscriber.connect(ar -> {
    if (ar.succeeded()) {
      // 复杂通道名称 - PostgreSQL 中的名称需要带引号的 ID
      subscriber.channel("Complex.Channel.Name").handler(payload -> {
        System.out.println("Received " + payload);
      subscriber.channel("Complex.Channel.Name").subscribeHandler(subscribed -> {
        subscriber.actualConnection()
          .query("NOTIFY \"Complex.Channel.Name\", 'msg'")
          .execute(notified -> {
            System.out.println("Notified \"Complex.Channel.Name\"");
      // PostgreSQL 简单 ID 强制小写
      subscriber.channel("simple_channel").handler(payload -> {
          System.out.println("Received " + payload);
      subscriber.channel("simple_channel").subscribeHandler(subscribed -> {
        // 以下简单频道标识符被强制小写
        subscriber.actualConnection()
          .query("NOTIFY Simple_CHANNEL, 'msg'")
          .execute(notified -> {
            System.out.println("Notified simple_channel");
      // 以下频道名称比当前频道名称长
      // (NAMEDATALEN = 64) - 1 == 63 个字符限制,将被截断
      subscriber.channel("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbb")
        .handler(payload -> {
        System.out.println("Received " + payload);

您可以自定义一个方法来实现重连,该方法的参数为 retries (重试次数), 返回值为 amountOfTime(重试间隔):

PgSubscriber subscriber = PgSubscriber.subscriber(vertx, new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
// 每次 100 毫秒后最多重新连接 10 次
subscriber.reconnectPolicy(retries -> {
  if (retries < 10) {
    return 100L;
  } else {
    return -1L;

默认的策略是不重连。

connection.noticeHandler(notice -> {
  System.out.println("Received notice " + notice.getSeverity() + "" + notice.getMessage());
  if (ar.succeeded()) {
    // imagine this is a long query and is still running
    System.out.println("Query success");
  } else {
    // 服务器将在取消请求后中止当前查询
    System.out.println("Failed to query due to " + ar.cause().getMessage());
connection.cancelRequest(ar -> {
  if (ar.succeeded()) {
    System.out.println("Cancelling request has been sent");
  } else {
    System.out.println("Failed to send cancelling request");

为客户端连接添加SSL的操作,您可以参考Vert.x的 NetClientPgConnectOptions 配置操作。 当前版本客户端支持全部的PostgreSql SSL模式配置,您可以通过 sslmode 配置它们。客户端默认不启用SSL模式。 ssl 参数仅作为一种设置 sslmode 的快捷方式。 setSsl(true) 等价于 setSslMode(VERIFY_CA)setSsl(false) 等价于 setSslMode(DISABLE)

PgConnectOptions options = new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
  .setSslMode(SslMode.VERIFY_CA)
  .setPemTrustOptions(new PemTrustOptions().addCertPath("/path/to/cert.pem"));
PgConnection.connect(vertx, options, res -> {
  if (res.succeeded()) {
    // 带 SSL 的连接
  } else {
    System.out.println("Could not connect " + res.cause());

更多详细信息可以在这里找到 Vert.x documentation

pool.connectHandler(conn -> {
  conn.query(sql).execute().onSuccess(res -> {
    //  将连接释放回连接池,以被该应用程序复用
    conn.close();

连接完成后,您应该释放该连接以通知连接池该数据库连接可以被使用

Single<RowSet<Row>> single = pool.query("SELECT * FROM users WHERE id='julien'").rxExecute();
// Execute the query
single.subscribe(result -> {
  System.out.println("Got " + result.size() + " rows ");
}, err -> {
  System.out.println("Failure: " + err.getMessage());
Maybe<RowSet<Row>> maybe = pool.withConnection(conn ->
    .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
    .rxExecute()
    .flatMap(result -> conn
      .query("SELECT * FROM Users")
      .rxExecute())
    .toMaybe());
maybe.subscribe(rows -> {
  // Success
}, err -> {
  // Failed
Completable completable = pool.withTransaction(conn ->
    .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
    .rxExecute()
    .flatMap(result -> conn
      .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
      .rxExecute())
    .toMaybe())
  .ignoreElement();
completable.subscribe(() -> {
  // Transaction succeeded
}, err -> {
  // Transaction failed
Observable<Row> observable = pool.rxGetConnection().flatMapObservable(conn -> conn
  .rxBegin()
  .flatMapObservable(tx ->
      .rxPrepare("SELECT * FROM users WHERE first_name LIKE $1")
      .flatMapObservable(preparedQuery -> {
        // Fetch 50 rows at a time
        RowStream<Row> stream = preparedQuery.createStream(50, Tuple.of("julien"));
        return stream.toObservable();
      .doAfterTerminate(tx::commit)));
// Then subscribe
observable.subscribe(row -> {
  System.out.println("User: " + row.getString("last_name"));
}, err -> {
  System.out.println("Error: " + err.getMessage());
}, () -> {
  System.out.println("End of stream");

下边是使用 Flowable 的例子:

Flowable<Row> flowable = pool.rxGetConnection().flatMapPublisher(conn -> conn
  .rxBegin()
  .flatMapPublisher(tx ->
      .rxPrepare("SELECT * FROM users WHERE first_name LIKE $1")
      .flatMapPublisher(preparedQuery -> {
        // Fetch 50 rows at a time
        RowStream<Row> stream = preparedQuery.createStream(50, Tuple.of("julien"));
        return stream.toFlowable();
      .doAfterTerminate(tx::commit)));
// Then subscribe
flowable.subscribe(new Subscriber<Row>() {
  private Subscription sub;
  @Override
  public void onSubscribe(Subscription subscription) {
    sub = subscription;
    subscription.request(1);
  @Override
  public void onNext(Row row) {
    sub.request(1);
    System.out.println("User: " + row.getString("last_name"));
  @Override
  public void onError(Throwable err) {
    System.out.println("Error: " + err.getMessage());
  @Override
  public void onComplete() {
    System.out.println("End of stream");