添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>

2 AdminClient 学习

2.1 如何创建AdminClient

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.junit.Test;
import java.util.Properties;
public class CreateAdminClientTest {
    @Test
    public void test() {
        Properties prop = new Properties();
         // 设置kafka的地址,如果是多个使用 逗号 分隔
        prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        AdminClient adminClient = AdminClient.create(prop);

AdminClientConfig 是 kafka 提供的配置参数名称常量类;客户端支持哪些配置参数,都可以参考这个类中定义的常量。

比如这里的BOOTSTRAP_SERVERS_CONFIG,就是配置的kafka的 地址

bootstrap.servers = [localhost:9092]  ## 刚刚设置的就是这个参数
client.dns.lookup = default
client.id = 
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

2.2 创建topic

package study.wyy.kafka.java.admin;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.junit.Before;
import org.junit.Test;

import javax.lang.model.element.VariableElement;
import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class TopicTest {
    private AdminClient client;
    @Before
    public void createClient() {
        Properties prop = new Properties();
        prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        client = AdminClient.create(prop);
    @Test
    public void testCreat() {
        short rs = 1;
         * 参数一:topic的名字
         * 参数二: 分区数量
         * 参数三:副本数量
        NewTopic newTopic = new NewTopic("java-api-study", 1, rs);
        // createTopics 接收的是一个newTopic的集合,所以是可以一次创建多个topic的
        CreateTopicsResult result = client.createTopics(Arrays.asList(newTopic));
         // 避免客户端连接太快断开而导致Topic没有创建成功
        Thread.sleep(500);
        // 获取topic设置的partition数量
        System.out.println(result.numPartitions("java-api-study").get());

2.3 查询topic

  • 无条件查询
  • @Test
    public void testList() throws ExecutionException, InterruptedException {
        ListTopicsResult listTopicsResult = client.listTopics();
        // 获取名字
        KafkaFuture<Set<String>> names = listTopicsResult.names();
        names.get().forEach(System.out::println);
        System.out.println("========================");
        KafkaFuture<Collection<TopicListing>> listings = listTopicsResult.listings();
        listings.get().forEach(System.out::println);
    
    java-api-study
    ========================
    (name=java-api-study, internal=false)
    
    @Test
    public void testList2() throws ExecutionException, InterruptedException {
        ListTopicsOptions options = new ListTopicsOptions();
        // 查询内置的topic
        options.listInternal(true);
        ListTopicsResult listTopicsResult = client.listTopics(options);
        // 获取名字
        KafkaFuture<Set<String>> names = listTopicsResult.names();
        names.get().forEach(System.out::println);
        System.out.println("========================");
        KafkaFuture<Collection<TopicListing>> listings = listTopicsResult.listings();
        listings.get().forEach(System.out::println);
    
    java-api-study
    __consumer_offsets
    ========================
    (name=java-api-study, internal=false)
    (name=__consumer_offsets, internal=true)
    

    __consumer_offsets: kafka内置的topic,用于记录消费消息的偏移量

    2.4 删除topic

    @Test
    public void testDel() throws ExecutionException, InterruptedException {
        Collection<String> topics = Arrays.asList("java-api-study");
        DeleteTopicsResult deleteTopicsResult = client.deleteTopics(topics);
    

    2.5 查询topic消息信息

    @Test
    public void testDescribeTopics() throws ExecutionException, InterruptedException {
        Collection<String> topics = Arrays.asList("java-api-study");
        DescribeTopicsResult describeTopicsResult = client.describeTopics(topics);
        Map<String, TopicDescription> descriptionMap = describeTopicsResult.all().get();
        descriptionMap.forEach((k, v) -> {
            System.out.println("topicName: " + k);
            System.out.println("topicDesc: " + v);
    
    topicName: java-api-study
    topicDesc: (name=java-api-study, 
    internal=false, 
    partitions=(partition=0, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)), authorizedOperations=[])
        wyaoyao
           
    粉丝