我们使用KafkaTemplate.send(String data)这个方法发送消息到Kafka中,显然这个方法并不能满足我们系统的需求,那我们需要查看一下KafkaTemplate所实现的接口,看看还提供了什么方法。
当我们发送消息到Kafka后,我们又怎么去确认消息是否发送成功呢?这就涉及到KafkaTemplate的发送回调方法了。
接下来我们开始正式讲解
查看发送接口
首先我们Ctrl+鼠标左键进入KafkaTemplate的源代码中查看一下,可以看到有关发送的接口如下。
这里的参数还是比较简单的,值得一提的事,方法中有个Long类型的时间戳(timestamp)参数,这是Kafka0.10版本提供的新功能,主要用来使用时间索引进行查询数据以及日志切分清除策略。
还有一个ProducerRecord参数,这个类其实就是整合了topic、partition、data等数据的消费实体类。
topic:这里填写的是Topic的名字
partition:这里填写的是分区的id,其实也是就第几个分区,id从0开始。表示指定发送到该分区中
timestamp:时间戳,一般默认当前时间戳
key:消息的键
data:消息的数据
ProducerRecord:消息对应的封装类,包含上述字段
Message<?>:Spring自带的Message封装类,包含消息及消息头
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
View Code
使用sendDefault发送消息
首先在KafkaConfiguration编写一个带有默认Topic参数的KafkaTemplate,同时为另外一个KafkaTemplate加上@Primary注解,
@Primary注解的意思是在拥有多个同类型的Bean时优先使用该Bean,到时候方便我们使用@Autowired注解自动注入。
//这个是我们之前编写的KafkaTemplate代码,加入@Primary注解
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory());
return template;
@Bean("defaultKafkaTemplate")
@Primary
public KafkaTemplate<Integer, String> defaultKafkaTemplate() {
KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory());
template.setDefaultTopic("topic.quick.default");
return template;
View Code
接着编写测试方法,可以看到我们这里调用的是sendDefault方法,而且并没有在方法参数上添加topicName,
这是因为我们在声明defaultKafkaTemplate这个Bean的时候添加了这行代码 template.setDefaultTopic("topic.quick.default"),
只要调用sendDefault方法,kafkaTemplate会自动把消息发送到名为"topic.quick.default"的Topic中。
@Resource
private KafkaTemplate defaultKafkaTemplate;
@Test
public void testDefaultKafkaTemplate() {
defaultKafkaTemplate.sendDefault("I`m send msg to default topic");
View Code
这里也顺便测试一下其他几个吧。
@Test
public void testTemplateSend() {
//发送带有时间戳的消息
kafkaTemplate.send("topic.quick.demo", 0, System.currentTimeMillis(), 0, "send message with timestamp");
//使用ProducerRecord发送消息
ProducerRecord record = new ProducerRecord("topic.quick.demo", "use ProducerRecord to send message");
kafkaTemplate.send(record);
//使用Message发送消息
Map map = new HashMap();
map.put(KafkaHeaders.TOPIC, "topic.quick.demo");
map.put(KafkaHeaders.PARTITION_ID, 0);
map.put(KafkaHeaders.MESSAGE_KEY, 0);
GenericMessage message = new GenericMessage("use Message to send message",new MessageHeaders(map));
kafkaTemplate.send(message);
View Code
KafkaTemplate异步发送消息
发送时间较长的时候会导致进程提前关闭导致无法调用回调时间。主要是因为KafkaTemplate发送消息是采取异步方式发送的,我们可以看下KafkaTemplate的源代码
这是我们刚才调用的发送消息方法,可以看到KafkaTemplate会使用ProducerRecord把我们传递进来的参数再一次封装,最后调用doSend方法发送消息到Kafka中
send(String topic, V data)
public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, data);
return this.doSend(producerRecord);
View Code
ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord)
doSend方法先是检测是否开启事务,紧接着使用SettableListenableFuture发送消息,然后判断是否启动自动冲洗数据到Kafka中,我们再接着看看SettableListenableFuture实现了什么接口
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
if (this.transactional) {
Assert.state(this.inTransaction(), "No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record");
final Producer<K, V> producer = this.getTheProducer();
if (this.logger.isTraceEnabled()) {
this.logger.trace("Sending: " + producerRecord);
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture();
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
try {
if (exception == null) {
future.set(new SendResult(producerRecord, metadata));
if (KafkaTemplate.this.producerListener != null) {
KafkaTemplate.this.producerListener.onSuccess(producerRecord, metadata);
if (KafkaTemplate.this.logger.isTraceEnabled()) {
KafkaTemplate.this.logger.trace("Sent ok: " + producerRecord + ", metadata: " + metadata);
} else {
future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
if (KafkaTemplate.this.producerListener != null) {
KafkaTemplate.this.producerListener.onError(producerRecord, exception);
if (KafkaTemplate.this.logger.isDebugEnabled()) {
KafkaTemplate.this.logger.debug("Failed to send: " + producerRecord, exception);
} finally {
if (!KafkaTemplate.this.transactional) {
KafkaTemplate.this.closeProducer(producer, false);
if (this.autoFlush) {
this.flush();
if (this.logger.isTraceEnabled()) {
this.logger.trace("Sent: " + producerRecord);
return future;
View Code
可以看到SettableListenableFuture实现了ListenableFuture接口,ListenableFuture则实现了Future接口,
Future是Java自带的实现异步编程的接口,支持返回值的异步,而我们使用Thread或者Runnable都是不带返回值的。
public class SettableListenableFuture<T> implements ListenableFuture<T>
public interface ListenableFuture<T> extends Future<T>
View Code
KafkaTemplate同步发送消息
KafkaTemplate异步发送消息大大的提升了生产者的并发能力,但某些场景下我们并不需要异步发送消息,这个时候我们可以采取同步发送方式,实现也是非常简单的,
我们只需要在send方法后面调用get方法即可。
Future模式中,我们采取异步执行事件,等到需要返回值得时候我们再调用get方法获取future的返回值
@Test
public void testSyncSend() throws ExecutionException, InterruptedException {
kafkaTemplate.send("topic.quick.demo", "test sync send message").get();
View Code
get方法还有一个比较有意思的重载方法,get(long timeout, TimeUnit unit),当send方法耗时大于get方法所设定的参数时会抛出一个超时异常,
但需要注意,这里仅抛出异常,消息还是会发送成功的。
这里的测试方法设置send耗时必须小于 一微秒(那必须得失败呀,嘿嘿嘿),运行后我们可以看到抛出的异常,但也发现消息能发送成功并被监听器接收了。
那这功能有什么作用呢,如果还没有接触过SQL慢查询可以去了解一下,使用该方法作为SQL慢查询记录的条件。
@Test
public void testTimeOut() throws ExecutionException, InterruptedException, TimeoutException {
kafkaTemplate.send("topic.quick.demo", "test send message timeout").get(1,TimeUnit.MICROSECONDS);
View Code
2018-09-08 16:36:09.110 INFO 7724 --- [ demo-0-C-1] com.viu.kafka.listen.DemoListener : demo receive : test send message timeout
java.util.concurrent.TimeoutException
View Code
消息结果回调
一般来说我们都会去获取KafkaTemplate发送消息的结果去判断消息是否发送成功,如果消息发送失败,则会重新发送或者执行对应的业务逻辑。所以这里我们去实现这个功能。
KafkaSendResultHandler
第一步还是编写一个消息结果回调类KafkaSendResultHandler。
当我们使用KafkaTemplate发送消息成功的时候回调用OnSuccess方法,发送失败则会调用onError方法。
@Component
public class KafkaSendResultHandler implements ProducerListener {
private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
log.info("Message send success : " + producerRecord.toString());
@Override
public void onError(ProducerRecord producerRecord, Exception exception) {
log.info("Message send error : " + producerRecord.toString());
View Code
接下来就使用KafkaSendResultHandler实现消息发送结果回调,这里需要休眠,发送时间较长的时候会导致进程提前关闭导致无法调用回调时间。
主要是因为KafkaTemplate发送消息是采取异步方式发送的
@Autowired
private KafkaSendResultHandler producerListener;
@Test
public void testProducerListen() throws InterruptedException {
kafkaTemplate.setProducerListener(producerListener);
kafkaTemplate.send("topic.quick.demo", "test producer listen");
Thread.sleep(1000);
View Code
运行测试方法,我们可以看到控制台输出的日志如下
2018-09-08 15:51:39.975 INFO 10268 --- [ad | producer-1] c.v.k.handler.KafkaSendResultHandler : Message send success : ProducerRecord(topic=topic.quick.demo, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=test producer listen, timestamp=null)
View Code