/**
 * Tests for {@code KafkaItemReader}, written with class-rule / before-class style
 * annotations ({@code @ClassRule}, {@code @BeforeClass}, {@code @Before}).
 * The embedded Kafka instance is exposed through a static rule shared by the class.
 */
public class KafkaItemReaderTests {
// Embedded Kafka rule, constructed with the argument 1
// (presumably the broker count -- confirm against the EmbeddedKafkaRule constructor).
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1);
// Presumably the reader under test; it is not assigned in the visible lines.
private KafkaItemReader<String, String> reader;
// Producer-side template; re-created in setUp().
private KafkaTemplate<String, String> template;
// Consumer configuration; re-created in setUp().
private Properties consumerProperties;
/**
 * Registers the topics {@code topic1} .. {@code topic6} on the embedded broker
 * obtained from the static rule.
 * {@code topic2} and {@code topic4} are declared with 2 as the second argument
 * (partition count in Kafka's {@code NewTopic}); the others use 1. Every topic
 * passes {@code (short) 1} as the third argument (replication factor).
 */
@BeforeClass
public static void setUpTopics() {
embeddedKafka.getEmbeddedKafka().addTopics(
new NewTopic("topic1", 1, (short) 1),
new NewTopic("topic2", 2, (short) 1),
new NewTopic("topic3", 1, (short) 1),
new NewTopic("topic4", 2, (short) 1),
new NewTopic("topic5", 1, (short) 1),
new NewTopic("topic6", 1, (short) 1)
/**
 * Rebuilds the producer template and the consumer properties.
 * Both are configured from the embedded broker held by the static rule.
 */
@Before
public void setUp() {
// Producer side: properties derived from the embedded broker feed a fresh KafkaTemplate.
Map<String, Object> producerProperties = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerProperties);
this.template = new KafkaTemplate<>(producerFactory);
// Consumer side: start from an empty Properties object on every call.
this.consumerProperties = new Properties();
// Point the consumer at the embedded broker's address list.
this.consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
embeddedKafka.getEmbeddedKafka().getBrokersAsString());
// Fixed consumer group id "1".
this.consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "1");
// Keys and values are both deserialized as Strings.
this.consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
this.consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
after:
/**
 * Tests for {@code KafkaItemReader}, written with extension-based annotations
 * ({@code @ExtendWith}, {@code @BeforeAll}, {@code @BeforeEach}).
 * The embedded broker is injected as an instance field, and the class is declared
 * with a per-class test-instance lifecycle.
 */
@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class KafkaItemReaderTests {
// Injected embedded broker, used by setUpTopics() and setUp().
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
// Presumably the reader under test; it is not assigned in the visible lines.
private KafkaItemReader<String, String> reader;
// Producer-side template; re-created in setUp().
private KafkaTemplate<String, String> template;
// Consumer configuration; re-created in setUp().
private Properties consumerProperties;
/**
 * Registers the topics {@code topic1} .. {@code topic6} on the injected embedded broker.
 * This is an instance method because it reads the autowired
 * {@code embeddedKafkaBroker} field.
 * {@code topic2} and {@code topic4} are declared with 2 as the second argument
 * (partition count in Kafka's {@code NewTopic}); the others use 1. Every topic
 * passes {@code (short) 1} as the third argument (replication factor).
 */
@BeforeAll
public void setUpTopics() {
embeddedKafkaBroker.addTopics(
new NewTopic("topic1", 1, (short) 1),
new NewTopic("topic2", 2, (short) 1),
new NewTopic("topic3", 1, (short) 1),
new NewTopic("topic4", 2, (short) 1),
new NewTopic("topic5", 1, (short) 1),
new NewTopic("topic6", 1, (short) 1)
/**
 * Rebuilds the producer template and the consumer properties.
 * Both are configured from the injected embedded broker.
 */
@BeforeEach
public void setUp() {
// Producer side: properties derived from the embedded broker feed a fresh KafkaTemplate.
Map<String, Object> producerProperties = KafkaTestUtils.producerProps(embeddedKafkaBroker);
ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerProperties);
this.template = new KafkaTemplate<>(producerFactory);
// Consumer side: start from an empty Properties object on every call.
this.consumerProperties = new Properties();
// Point the consumer at the embedded broker's address list.
this.consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
embeddedKafkaBroker.getBrokersAsString());
// Fixed consumer group id "1".
this.consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "1");
// Keys and values are both deserialized as Strings.
this.consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
this.consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());