添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

Reactive DB2 客户端

< groupId > io.vertx </ groupId > < artifactId > vertx-db2-client </ artifactId > < version > 4.0.3 </ version > </ dependency >
DB2ConnectOptions connectOptions = new DB2ConnectOptions()
  .setPort(50000)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");
// 连接池选项
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);
// 创建客户端池
DB2Pool client = DB2Pool.pool(connectOptions, poolOptions);
// 简单查询
client
  .query("SELECT * FROM users WHERE id='julien'")
  .execute(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> result = ar.result();
    System.out.println("Got " + result.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  // 现在关闭客户端池
  client.close();
DB2ConnectOptions connectOptions = new DB2ConnectOptions()
  .setPort(50000)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");
// 连接池配置
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);
// 创建池化的客户端
DB2Pool client = DB2Pool.pool(connectOptions, poolOptions);

池化的客户端使用连接池,任何操作都将借用连接池中的连接来执行该操作, 并将连接释放回连接池中。

如果您使用 Vert.x 运行,您可以将 Vertx 实例传递给它:

DB2ConnectOptions connectOptions = new DB2ConnectOptions()
  .setPort(50000)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");
// 连接池配置
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);
// 创建池化的客户端
DB2Pool client = DB2Pool.pool(vertx, connectOptions, poolOptions);

当您不再需要连接池时,您需要将其释放:

pool.close();

当您需要在同一连接上执行多个操作时,您需要使用 connection 客户端。

您可以轻松地从连接池中获取一个:

DB2ConnectOptions connectOptions = new DB2ConnectOptions()
  .setPort(50000)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");
// 连接池配置
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);
// 创建池化的客户端
DB2Pool client = DB2Pool.pool(vertx, connectOptions, poolOptions);
// 从连接池获取一个连接
client.getConnection().compose(conn -> {
  System.out.println("Got a connection from the pool");
  // 以下所有操作都在同一个连接上执行
  return conn
    .query("SELECT * FROM users WHERE id='julien'")
    .execute()
    .compose(res -> conn
      .query("SELECT * FROM users WHERE id='emad'")
      .execute())
    .onComplete(ar -> {
      // 将连接释放回连接池
      conn.close();
}).onComplete(ar -> {
  if (ar.succeeded()) {
    System.out.println("Done");
  } else {
    System.out.println("Something went wrong " + ar.cause().getMessage());

连接使用完后,您必须关闭它以释放到连接池中,以便可以重复使用。

DB2ConnectOptions connectOptions = new DB2ConnectOptions()
  .setPort(50000)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");
// 连接池配置
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
// 从数据对象创建连接池
DB2Pool pool = DB2Pool.pool(vertx, connectOptions, poolOptions);
pool.getConnection(ar -> {
  // 使用连接进行处理

您也可以使用 setPropertiesaddProperty 方法配置通用配置项。但请注意调用 setProperties 方法会覆盖默认的客户端配置。

连接 URI

除了使用 DB2ConnectOptions 数据对象进行配置外,我们还为您提供了另外一种使用连接URI进行配置的方法:

String connectionUri = "db2://dbuser:[email protected]:50000/mydb";
// 从连接URI创建连接池
DB2Pool pool = DB2Pool.pool(connectionUri);
// 从连接URI创建连接
DB2Connection.connect(vertx, connectionUri, res -> {
  // 使用连接进行处理

连接字符串的URI格式为:

db2://<USERNAME>:<PASSWORD>@<HOSTNAME>:<PORT>/<DBNAME>

目前,客户端支持以下的连接 uri 参数关键字:

if (ar.succeeded()) { RowSet<Row> result = ar.result(); System.out.println("Got " + result.size() + " rows "); } else { System.out.println("Failure: " + ar.cause().getMessage());

执行预查询也是一样的操作。

SQL字符通过位置引用实际的参数,并使用数据库的语法 `?` ​

client
  .preparedQuery("SELECT * FROM users WHERE id=$1")
  .execute(Tuple.of("andy"), ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println("Got " + rows.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());

查询相关的方法为 SELECT 类型的操作提供了异步的 RowSet 实例

client
  .preparedQuery("SELECT first_name, last_name FROM users")
  .execute(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    for (Row row : rows) {
      System.out.println("User " + row.getString(0) + " " + row.getString(1));
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());

或者 UPDATE/INSERT 类型的查询:

client
  .preparedQuery("INSERT INTO users (first_name, last_name) VALUES ($1, $2)")
  .execute(Tuple.of("Andy", "Guibert"),  ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println(rows.rowCount());
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());

Row对象(Row)可以让您通过索引位置获取相应的数据

System.out.println("User " + row.getString(0) + " " + row.getString(1));

或者通过名称

System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));

客户端在此处没有做特殊处理,无论您的SQL文本时什么,列名都将使用数据库表中的名称标识。

您也可以直接访问得到多种类型

String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");

您可以使用缓存过的预处理语句去执行一次性的预查询:

connectOptions.setCachePreparedStatements(true);
client
  .preparedQuery("SELECT * FROM users WHERE id = ?")
  .execute(Tuple.of("julien"), ar -> {
    if (ar.succeeded()) {
      RowSet<Row> rows = ar.result();
      System.out.println("Got " + rows.size() + " rows ");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());

您也可以创建 PreparedStatement 并自主地管理它的生命周期。

sqlConnection
  .prepare("SELECT * FROM users WHERE id= ?", ar -> {
    if (ar.succeeded()) {
      PreparedStatement preparedStatement = ar.result();
      preparedStatement.query()
        .execute(Tuple.of("julien"), ar2 -> {
          if (ar2.succeeded()) {
            RowSet<Row> rows = ar2.result();
            System.out.println("Got " + rows.size() + " rows ");
            preparedStatement.close();
          } else {
            System.out.println("Failure: " + ar2.cause().getMessage());
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julient Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));
batch.add(Tuple.of("andy", "Andy Guibert"));
// Execute the prepared batch
client
  .preparedQuery("INSERT INTO USERS (id, name) VALUES ($1, $2)")
  .executeBatch(batch, res -> {
  if (res.succeeded()) {
    // Process rows
    RowSet<Row> rows = res.result();
  } else {
    System.out.println("Batch failed " + res.cause());

通过将查询包装在 SELECT <COLUMNS> FROM FINAL TABLE ( <SQL> ),可以获取生成的键,例如:

client
  .preparedQuery("SELECT color_id FROM FINAL TABLE ( INSERT INTO color (color_name) VALUES (?), (?), (?) )")
  .execute(Tuple.of("white", "red", "blue"), ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println("Inserted " + rows.rowCount() + " new rows.");
    for (Row row : rows) {
      System.out.println("generated key: " + row.getInteger("color_id"));
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  .compose(connection ->
    connection
      .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
      .executeBatch(Arrays.asList(
        Tuple.of("Julien", "Viet"),
        Tuple.of("Andy", "Guibert")
      .compose(res -> connection
        // Do something with rows
        .query("SELECT COUNT(*) FROM Users")
        .execute()
        .map(rows -> rows.iterator().next().getInteger(0)))
      // Return the connection to the pool
      .eventually(v -> connection.close())
  ).onSuccess(count -> {
  System.out.println("Insert users, now the number of users is " + count);

也可以通过连接对象创建预查询:

connection
  .prepare("SELECT * FROM users WHERE first_name LIKE $1")
  .compose(pq ->
    pq.query()
      .execute(Tuple.of("Andy"))
      .eventually(v -> pq.close())
  ).onSuccess(rows -> {
  // All rows
pool.withConnection(connection ->
  connection
    .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
    .executeBatch(Arrays.asList(
      Tuple.of("Julien", "Viet"),
      Tuple.of("Andy", "Guibert")
    .compose(res -> connection
      // Do something with rows
      .query("SELECT COUNT(*) FROM Users")
      .execute()
      .map(rows -> rows.iterator().next().getInteger(0)))
).onSuccess(count -> {
  System.out.println("Insert users, now the number of users is " + count);
      .compose(tx -> conn
        // Various statements
        .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
        .execute()
        .compose(res2 -> conn
          .query("INSERT INTO Users (first_name,last_name) VALUES ('Andy','Guibert')")
          .execute())
        // Commit the transaction
        .compose(res3 -> tx.commit()))
      // Return the connection to the pool
      .eventually(v -> conn.close())
      .onSuccess(v -> System.out.println("Transaction succeeded"))
      .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));

当数据库服务端返回当前事务已失败(比如常见的 current transaction is aborted, commands ignored until end of transaction block) ,事务已回滚和 completion 方法的返回值future返回了 TransactionRollbackException 异常时:

tx.completion()
  .onFailure(err -> {
    System.out.println("Transaction failed => rolled back");
pool.withTransaction(client -> client
  .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
  .execute()
  .flatMap(res -> client
    .query("INSERT INTO Users (first_name,last_name) VALUES ('Andy','Guibert')")
    .execute()
    // Map to a message result
    .map("Users inserted")))
  .onSuccess(v -> System.out.println("Transaction succeeded"))
  .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
connection.prepare("SELECT * FROM users WHERE first_name LIKE $1", ar0 -> {
  if (ar0.succeeded()) {
    PreparedStatement pq = ar0.result();
    // Cursors require to run within a transaction
    connection.begin(ar1 -> {
      if (ar1.succeeded()) {
        Transaction tx = ar1.result();
        // Create a cursor
        Cursor cursor = pq.cursor(Tuple.of("julien"));
        // Read 50 rows
        cursor.read(50, ar2 -> {
          if (ar2.succeeded()) {
            RowSet<Row> rows = ar2.result();
            // Check for more ?
            if (cursor.hasMore()) {
              // Repeat the process...
            } else {
              // No more rows - commit the transaction
              tx.commit();

游标释放时需要同时执行关闭操作:

cursor.read(50, ar2 -> {
  if (ar2.succeeded()) {
    // Close the cursor
    cursor.close();

stream API也可以用于游标,尤其是在Rx版的客户端,可能更为方便。

connection.prepare("SELECT * FROM users WHERE first_name LIKE $1", ar0 -> {
  if (ar0.succeeded()) {
    PreparedStatement pq = ar0.result();
    // Streams require to run within a transaction
    connection.begin(ar1 -> {
      if (ar1.succeeded()) {
        Transaction tx = ar1.result();
        // Fetch 50 rows at a time
        RowStream<Row> stream = pq.createStream(50, Tuple.of("julien"));
        // Use the stream
        stream.exceptionHandler(err -> {
          System.out.println("Error: " + err.getMessage());
        stream.endHandler(v -> {
          tx.commit();
          System.out.println("End of stream");
        stream.handler(row -> {
          System.out.println("User: " + row.getString("last_name"));

上边的stream会批量读取 50 行并同时将其转换为流,当这些行记录被传递给处理器时, 会以此类推地读取下一批的 50 行记录。

stream支持重启或暂停,已经加载到的行记录将会被保留在内存里直到被传递给处理器,此时 游标也将终止遍历。

.query("SELECT an_int_column FROM exampleTable") .execute(ar -> { RowSet<Row> rowSet = ar.result(); Row row = rowSet.iterator().next(); // INTEGER 类型字段读取出来是 java.lang.Integer Object value = row.getValue(0); // 转换为 java.lang.Long Long longValue = row.getLong(0);
client.preparedQuery("SELECT day_name FROM FINAL TABLE ( INSERT INTO days (day_name) VALUES (?), (?), (?) )")
.execute(Tuple.of(Days.FRIDAY, Days.SATURDAY, Days.SUNDAY), ar -> {
 if (ar.succeeded()) {
  RowSet<Row> rows = ar.result();
  System.out.println("Inserted " + rows.rowCount() + " new rows");
  for (Row row : rows) {
	  System.out.println("Day: " + row.get(Days.class, "day_name"));
 } else {
  System.out.println("Failure: " + ar.cause().getMessage());
client.preparedQuery("SELECT day_num FROM FINAL TABLE ( INSERT INTO days (day_num) VALUES (?), (?), (?) )")
   .execute(Tuple.of(Days.FRIDAY.ordinal(), Days.SATURDAY.ordinal(), Days.SUNDAY.ordinal()), ar -> {
   	if (ar.succeeded()) {
   		RowSet<Row> rows = ar.result();
   		System.out.println("Inserted " + rows.rowCount() + " new rows");
   		for (Row row : rows) {
   			System.out.println("Day: " + row.get(Days.class, "day_num"));
   	} else {
   		System.out.println("Failure: " + ar.cause().getMessage());

String类型使用Java枚举 name() 方法返回的名字进行匹配。

数值类型使用Java枚举 ordinal() 方法返回的序数进行匹配,row.get() 方法获取到的是整型值对应Java枚举序数的枚举值的 name() 值。

Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
  row -> row.getLong("id"),
  row -> row.getString("last_name"));
// 运行查询使用集合类
client.query("SELECT * FROM users")
  .collecting(collector)
  .execute(ar -> {
  if (ar.succeeded()) {
    SqlResult<Map<Long, String>> result = ar.result();
    // 获取用集合类创建的map
    Map<Long, String> map = result.value();
    System.out.println("Got " + map);
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());

集合类处理不能保留 Row 的引用,因为只有一个 Row 对象用于处理整个集合。

Java Collectors 提供了许多有趣的预定义集合类,例如, 您可以直接用 Row 中的集合轻松拼接成一个字符串:

Collector<Row, ?, String> collector = Collectors.mapping(
  row -> row.getString("last_name"),
  Collectors.joining(",", "(", ")")
// 运行查询使用集合类
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
    if (ar.succeeded()) {
      SqlResult<String> result = ar.result();
      // 获取用集合类创建的String
      String list = result.value();
      System.out.println("Got " + list);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
DB2ConnectOptions options = new DB2ConnectOptions()
  .setPort(50001)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
  .setSsl(true)
  .setTrustStoreOptions(new JksOptions()
      .setPath("/path/to/keystore.p12")
      .setPassword("keystoreSecret"));
DB2Connection.connect(vertx, options, res -> {
  if (res.succeeded()) {
    // 使用SSL进行连接
  } else {
    System.out.println("Could not connect " + res.cause());

更多详细信息,请参阅 Vert.x 文档.