Spring Kafka を利用して、Kafka に対してメッセージを送信する方法を調べたのでまとめてみました。
以下では、Spring Boot と組み合わせて利用しています。
Kafka の構築は割愛します。公式のQUICK START 参照してください。
https://kafka.apache.org/quickstart
Spring Boot アプリケーションは、pom.xml に Spring Kafka を追加します。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.0.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
また、application.yml
で Kafka が起動しているサーバを指定します。
デフォルトは localhost:9092
になっているので、そこから変更する必要がある場合のみ指定しいます。
application.yml
spring:
kafka:
bootstrap-servers: hostname:9092
KafkaTemplate
というクラスを利用して、Kafka にメッセージを送信することができます。
型引数を指定する必要があり、それぞれ送信するデータの key と value の型を指定します。
なおデフォルトでは、key、value ともに StringSerializer
によってシリアライズされるため、型引数は String
にしておくのが無難です。
送信する方法
send
メソッドで Kafka にメッセージを送信します。最低限 topic と value を指定する必要があります。
また、sendDefault
メソッドを利用すると、topic の指定を省略することができます。
デフォルトのトピックは、application.yml
の spring.kafka.template.default-topic
で設定します。
どちらのメソッドも、戻り値はListenableFuture
になっていて、非同期処理となります。
同期処理にする場合は、get()
を呼び出す必要があります。
コールバックはListenableeFutureCallback
かSuccessCallback
で定義します。
成功時のコールバックだけを定義したいなら後者でOKです。
@Component
public class HelloKafka {
@Autowired
KafkaTemplate<String, String> template;
public void helloKafka(){
// コールバックを設定しない非同期呼び出し
template.send("sample", "value");
// コールバックを設定する非同期呼び出し
template.send("sample", "value").addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result){
// 成功時の処理
@Override
public void onFailure(Throwable ex) {
// 失敗時の処理
// 同期呼び出し
template.send("sample", "value").get();
ProducerListener
KafkaTemplate
には ProducerListener
で共通のコールバックを定義できます。
デフォルトは LoggingProducerListener
が設定されていて、エラー発生時にログを出力するようになっています。
自作する場合は、ProducerListener
を実装したクラスを作成し、Bean として登録します。
@Bean
public ProducerListener<Object, Object> listener() {
return new ProducerListener<Object, Object>() {
@Override
public void onSuccess(String topic,
Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
log.info("topic: {}, partition: {}, key: {}, value: {}", topic, partition, key, value);
Serializer
上述の通り、デフォルトではStrigSerializer
を利用しています。
そのため、送信できるデータは String
および、String
にキャスト可能な型のみとなります。
Serializer を変更するには、application.yml
を編集する必要があります。
例えば JSON にシリアライズする JsonSerializer
に変更するには以下のように設定します。
application.yml
spring:
kafka:
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
key、value のそれぞれに設定が必要です。
Transaction
トランザクション制御を有効化するためには、application.yml
に transaction-id-prefix
を設定します。
application.yml
spring:
kafka:
producer:
transaction-id-prefix: tx
トランザクションを有効化した場合、トランザクション管理下でない状態で send
メソッドを利用すると、トランザクション管理下で実行してね、というエラーが発生するようになります。
@Transactinal
でトランザクション制御
@Transactional
を付与することによって、メソッドごとにトランザクションを制御することができます。
propagation なども設定できますが、isolation は Kafka が対応していないため、デフォルトの値を変更した場合、例外が発生します。
@Component
@Transactional
public class HelloKafka {
@Autowired
KafkaTemplate<String, String> template;
public void helloKafka(){
template.send("sample", "value");
// ロールバックする
throw new RuntimeException();
executeInTransaction
を利用する
KafkaTemplate
の executeInTransaction
メソッドを利用して、トランザクション制御することも可能です。
この場合、Callback で指定した処理が同一トランザクションとして取り扱われます
@Component
public class HelloKafka {
@Autowired
KafkaTemplate<String, String> template;
public void helloKafka(){
template.executeInTransaction(t -> {
// この中の処理が同一トランザクション
t.send("sample", "value");
t.send("sample", "vvvvv");
return false;
なお、@Transactional
が付与されたメソッド内で、executeInTransaction
を利用した場合、もとのトランザクションに参加するのではなく、新規トランザクションが生成されます。(REQUIRES_NEW
となる。)
動作確認時の注意
kafka-console-consumer を用いて動作確認をしていたのですが、トランザクションが思い通りの動作にならないことにしばらくハマりました。
原因は、このツールの isolation-level が、デフォルトでは read_uncommitted になっていたため、コミットされていないメッセージも表示されてしまい、トランザクション制御が上手くできていないと勘違いしてしまいました。
このツールで確認する場合は以下のように isolation-level を read_committed にしてください。
kafka-console-consumer --bootstrap-server localhost:9092 --topic sample --isolation-level read_committed
他のツール等で確認する場合にも、isolation-level が何に設定されているかを確認するといいと思います。
以上のように、手軽に Kafka にメッセージを送信することができました。
公式のリファレンスを見ると、DataSourceTransactionManager
などの他のトランザクションマネージャと同期してトランザクション管理できるだとか、ReplyingKafkaTemplate
なるものがあるらしいので、もう少し調べてみたいところです。