@Nonnull @Override public List<OrderedPartitionableRecord<Integer, Long>> poll(long timeout) List<OrderedPartitionableRecord<Integer, Long>> polledRecords = new ArrayList<>(); for (ConsumerRecord<byte[], byte[]> record : consumer.poll(timeout)) { polledRecords.add(new OrderedPartitionableRecord<>( record.topic(), record.partition(), record.offset(), record.value() == null ? null : ImmutableList.of(record.value()) return polledRecords;
@Test(expected = IllegalStateException.class) public void testPollWithNoSubscription() { try (KafkaConsumer<byte[], byte[]> consumer = newConsumer((String) null)) { consumer.poll(Duration.ZERO);
@Test public void testConsumerWithInvalidCredentials() { Map<String, Object> props = new HashMap<>(saslClientConfigs); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + server.port()); props.put(ConsumerConfig.GROUP_ID_CONFIG, ""); StringDeserializer deserializer = new StringDeserializer(); try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, deserializer, deserializer)) { consumer.subscribe(Arrays.asList(topic)); consumer.poll(Duration.ofSeconds(10)); fail("Expected an authentication error!"); } catch (SaslAuthenticationException e) { // OK } catch (Exception e) { throw new AssertionError("Expected only an authentication error, but another error occurred.", e);
@Test(expected = IllegalStateException.class) public void testPollWithEmptyUserAssignment() { try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) { consumer.assign(Collections.<TopicPartition>emptySet()); consumer.poll(Duration.ZERO);
@Test(expected = AuthenticationException.class) public void testPollAuthenticationFailure() { final KafkaConsumer<String, String> consumer = consumerWithPendingAuthentication(); consumer.subscribe(singleton(topic)); consumer.poll(Duration.ZERO);
@Test(expected = IllegalStateException.class) public void testPollWithEmptySubscription() { try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId)) { consumer.subscribe(Collections.<String>emptyList()); consumer.poll(Duration.ZERO);
@Test public void testNextTupleEmitsAtMostOneTuple() { //The spout should emit at most one message per call to nextTuple //This is necessary for Storm to be able to throttle the spout according to maxSpoutPending KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition); Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>(); records.put(partition, SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 10)); when(consumerMock.poll(anyLong())) .thenReturn(new ConsumerRecords<>(records)); spout.nextTuple(); verify(collectorMock, times(1)).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
private void doTestModeCannotReplayTuples(KafkaSpoutConfig<String, String> spoutConfig) {
KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1))));
spout.nextTuple();
ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
spout.fail(msgIdCaptor.getValue());
reset(consumerMock);
when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 1, 1))));
spout.nextTuple();
//The consumer should not be seeking to retry the failed tuple, it should just be continuing from the current position
verify(consumerMock, never()).seek(eq(partition), anyLong());
private void doFilterNullTupleTest(KafkaSpoutConfig.ProcessingGuarantee processingGuaranteee) {
//STORM-3059
KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1)
.setProcessingGuarantee(processingGuaranteee)
.setTupleTrackingEnforced(true)
.setRecordTranslator(new NullRecordTranslator<>())
.build();
KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1))));
spout.nextTuple();
verify(collectorMock, never()).emit(any(), any(), any());
private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String> spoutConfig) {
KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
when(consumerMock.poll(anyLong()))
.thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, spoutConfig.getMaxUncommittedOffsets()))))
.thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
SpoutWithMockedConsumerSetupHelper.createRecords(partition, spoutConfig.getMaxUncommittedOffsets() - 1, spoutConfig.getMaxUncommittedOffsets()))));
for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() * 2; i++) {
spout.nextTuple();
verify(consumerMock, times(2)).poll(anyLong());
verify(collectorMock, times(spoutConfig.getMaxUncommittedOffsets() * 2)).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
@Test public void testPollThrowsInterruptExceptionIfInterrupted() { final Time time = new MockTime(); final Metadata metadata = createMetadata(); final MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); final PartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, false); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); prepareRebalance(client, node, assignor, singletonList(tp0), null); consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE)); consumer.poll(Duration.ZERO); // interrupt the thread and call poll try { Thread.currentThread().interrupt(); expectedException.expect(InterruptException.class); consumer.poll(Duration.ZERO); } finally { // clear interrupted state again since this thread may be reused by JUnit Thread.interrupted(); consumer.close(Duration.ofMillis(0));
@SuppressWarnings("deprecation") @Test public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate() { final Time time = new MockTime(); Metadata metadata = createMetadata(); final MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); final PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); prepareRebalance(client, node, assignor, singletonList(tp0), null); consumer.poll(0L); // The underlying client SHOULD get a fetch request final Queue<ClientRequest> requests = client.requests(); Assert.assertEquals(1, requests.size()); final Class<? extends AbstractRequest.Builder> aClass = requests.peek().requestBuilder().getClass(); Assert.assertEquals(FetchRequest.Builder.class, aClass);
@Test public void verifyPollTimesOutDuringMetadataUpdate() { final Time time = new MockTime(); Metadata metadata = createMetadata(); final MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); final PartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); prepareRebalance(client, node, assignor, singletonList(tp0), null); consumer.poll(Duration.ZERO); // The underlying client should NOT get a fetch request final Queue<ClientRequest> requests = client.requests(); Assert.assertEquals(0, requests.size());
@Test public void fetchResponseWithUnexpectedPartitionIsIgnored() { Time time = new MockTime(); Metadata metadata = createMetadata(); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); PartitionAssignor assignor = new RangeAssignor(); KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true); consumer.subscribe(singletonList(topic), getConsumerRebalanceListener(consumer)); prepareRebalance(client, node, assignor, singletonList(tp0), null); Map<TopicPartition, FetchInfo> fetches1 = new HashMap<>(); fetches1.put(tp0, new FetchInfo(0, 1)); fetches1.put(t2p0, new FetchInfo(0, 10)); // not assigned and not fetched client.prepareResponseFrom(fetchResponse(fetches1), node); consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE)); ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO); assertEquals(0, records.count()); consumer.close(Duration.ofMillis(0));
@Test public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() { Time time = new MockTime(); Metadata metadata = createMetadata(); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); PartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, true); consumer.assign(singleton(tp0)); consumer.seekToBeginning(singleton(tp0));