<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
2 AdminClient 学习
2.1 如何创建AdminClient
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.junit.Test;
import java.util.Properties;
public class CreateAdminClientTest {
@Test
public void test() {
Properties prop = new Properties();
prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient adminClient = AdminClient.create(prop);
AdminClientConfig 就是 Kafka 提供的一个配置参数名称的常量类,具体可以配置客户端的哪些参数,可以参考这个类。比如这里的 BOOTSTRAP_SERVERS_CONFIG,就是配置的 Kafka 的地址:
bootstrap.servers = [localhost:9092]
client.dns.lookup = default
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
2.2 创建topic
package study.wyy.kafka.java.admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.junit.Before;
import org.junit.Test;
import javax.lang.model.element.VariableElement;
import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;
public class TopicTest {
private AdminClient client;
@Before
public void createClient() {
Properties prop = new Properties();
prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
client = AdminClient.create(prop);
@Test
public void testCreat() {
short rs = 1;
/**
 * 参数一:topic 的名字
 * 参数二:分区数量
 * 参数三:副本数量
 */
NewTopic newTopic = new NewTopic("java-api-study", 1, rs);
CreateTopicsResult result = client.createTopics(Arrays.asList(newTopic));
Thread.sleep(500);
System.out.println(result.numPartitions("java-api-study").get());
2.3 查询topic
无条件查询
@Test
public void testList() throws ExecutionException, InterruptedException {
ListTopicsResult listTopicsResult = client.listTopics();
KafkaFuture<Set<String>> names = listTopicsResult.names();
names.get().forEach(System.out::println);
System.out.println("========================");
KafkaFuture<Collection<TopicListing>> listings = listTopicsResult.listings();
listings.get().forEach(System.out::println);
java-api-study
========================
(name=java-api-study, internal=false)
@Test
public void testList2() throws ExecutionException, InterruptedException {
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);
ListTopicsResult listTopicsResult = client.listTopics(options);
KafkaFuture<Set<String>> names = listTopicsResult.names();
names.get().forEach(System.out::println);
System.out.println("========================");
KafkaFuture<Collection<TopicListing>> listings = listTopicsResult.listings();
listings.get().forEach(System.out::println);
java-api-study
__consumer_offsets
========================
(name=java-api-study, internal=false)
(name=__consumer_offsets, internal=true)
__consumer_offsets:Kafka 内置的 topic,用于记录消费消息的偏移量。
2.4 删除topic
@Test
public void testDel() throws ExecutionException, InterruptedException {
Collection<String> topics = Arrays.asList("java-api-study");
DeleteTopicsResult deleteTopicsResult = client.deleteTopics(topics);
2.5 查询topic消息信息
@Test
public void testDescribeTopics() throws ExecutionException, InterruptedException {
Collection<String> topics = Arrays.asList("java-api-study");
DescribeTopicsResult describeTopicsResult = client.describeTopics(topics);
Map<String, TopicDescription> descriptionMap = describeTopicsResult.all().get();
descriptionMap.forEach((k, v) -> {
System.out.println("topicName: " + k);
System.out.println("topicDesc: " + v);
topicName: java-api-study
topicDesc: (name=java-api-study,
internal=false,
partitions=(partition=0, leader=localhost:9092 (id: 0 rack: null), replicas=localhost:9092 (id: 0 rack: null), isr=localhost:9092 (id: 0 rack: null)), authorizedOperations=[])
wyaoyao
粉丝