添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
14
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Spring Boot + Spring Kafka で Kafka にメッセージを送信してみる

Posted at

Spring Kafka を利用して、Kafka に対してメッセージを送信する方法を調べたのでまとめてみました。
以下では、Spring Boot と組み合わせて利用しています。

Kafka の構築は割愛します。公式のQUICK START 参照してください。
https://kafka.apache.org/quickstart

Spring Boot アプリケーションは、pom.xml に Spring Kafka を追加します。

pom.xml
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.0.RELEASE</version>
        <relativePath/>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

また、application.yml で Kafka が起動しているサーバを指定します。
デフォルトは localhost:9092 になっているので、そこから変更する必要がある場合のみ指定しいます。

application.yml
spring:
    kafka:
        bootstrap-servers: hostname:9092

KafkaTemplateというクラスを利用して、Kafka にメッセージを送信することができます。
型引数を指定する必要があり、それぞれ送信するデータの key と value の型を指定します。

なおデフォルトでは、key、value ともに StringSerializerによってシリアライズされるため、型引数は String にしておくのが無難です。

送信する方法

send メソッドで Kafka にメッセージを送信します。最低限 topic と value を指定する必要があります。
また、sendDefault メソッドを利用すると、topic の指定を省略することができます。
デフォルトのトピックは、application.ymlspring.kafka.template.default-topic で設定します。

どちらのメソッドも、戻り値はListenableFutureになっていて、非同期処理となります。
同期処理にする場合は、get()を呼び出す必要があります。

コールバックはListenableeFutureCallbackSuccessCallbackで定義します。
成功時のコールバックだけを定義したいなら後者でOKです。

@Component
public class HelloKafka {
    @Autowired
    KafkaTemplate<String, String> template;
    public void helloKafka(){
        // コールバックを設定しない非同期呼び出し
        template.send("sample", "value");
        // コールバックを設定する非同期呼び出し
        template.send("sample", "value").addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result){
                // 成功時の処理
            @Override
            public void onFailure(Throwable ex) {
                // 失敗時の処理
        // 同期呼び出し
        template.send("sample", "value").get();
ProducerListener

KafkaTemplate には ProducerListenerで共通のコールバックを定義できます。
デフォルトは LoggingProducerListenerが設定されていて、エラー発生時にログを出力するようになっています。

自作する場合は、ProducerListener を実装したクラスを作成し、Bean として登録します。

@Bean
public ProducerListener<Object, Object> listener() {
    return new ProducerListener<Object, Object>() {
        @Override
        public void onSuccess(String topic,




    
 Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
            log.info("topic: {}, partition: {}, key: {}, value: {}", topic, partition, key, value);
Serializer

上述の通り、デフォルトではStrigSerializer を利用しています。
そのため、送信できるデータは String および、String にキャスト可能な型のみとなります。

Serializer を変更するには、application.yml を編集する必要があります。
例えば JSON にシリアライズする JsonSerializer に変更するには以下のように設定します。

application.yml
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer

key、value のそれぞれに設定が必要です。

Transaction

トランザクション制御を有効化するためには、application.ymltransaction-id-prefix を設定します。

application.yml
spring:
  kafka:
    producer:
      transaction-id-prefix: tx

トランザクションを有効化した場合、トランザクション管理下でない状態で send メソッドを利用すると、トランザクション管理下で実行してね、というエラーが発生するようになります。

@Transactinalでトランザクション制御

@Transactional を付与することによって、メソッドごとにトランザクションを制御することができます。
propagation なども設定できますが、isolation は Kafka が対応していないため、デフォルトの値を変更した場合、例外が発生します。

@Component
@Transactional
public class HelloKafka {
    @Autowired
    KafkaTemplate<String, String> template;
    public void helloKafka(){
        template.send("sample", "value");
        // ロールバックする
        throw new RuntimeException();
executeInTransactionを利用する

KafkaTemplateexecuteInTransaction メソッドを利用して、トランザクション制御することも可能です。
この場合、Callback で指定した処理が同一トランザクションとして取り扱われます

@Component
public class HelloKafka {
    @Autowired
    KafkaTemplate<String, String> template;
    public void helloKafka(){
        template.executeInTransaction(t -> {
            // この中の処理が同一トランザクション
            t.send("sample", "value");
            t.send("sample", "vvvvv");
            return false;

なお、@Transactional が付与されたメソッド内で、executeInTransaction を利用した場合、もとのトランザクションに参加するのではなく、新規トランザクションが生成されます。(REQUIRES_NEW となる。)

動作確認時の注意

kafka-console-consumer を用いて動作確認をしていたのですが、トランザクションが思い通りの動作にならないことにしばらくハマりました。
原因は、このツールの isolation-level が、デフォルトでは read_uncommitted になっていたため、コミットされていないメッセージも表示されてしまい、トランザクション制御が上手くできていないと勘違いしてしまいました。

このツールで確認する場合は以下のように isolation-level を read_committed にしてください。

kafka-console-consumer --bootstrap-server localhost:9092 --topic sample --isolation-level read_committed

他のツール等で確認する場合にも、isolation-level が何に設定されているかを確認するといいと思います。

以上のように、手軽に Kafka にメッセージを送信することができました。
公式のリファレンスを見ると、DataSourceTransactionManagerなどの他のトランザクションマネージャと同期してトランザクション管理できるだとか、ReplyingKafkaTemplate なるものがあるらしいので、もう少し調べてみたいところです。

14
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?