# elasticsearch {
# hosts => ["http://localhost:9200"]
# index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
# #user => "elastic"
# #password => "changeme"
output {
kafka {
bootstrap_servers => "10.10.1.13:19092,10.10.1.12:19092,10.10.1.11:19092"
topic_id => "es613-input-01"
codec => "json"
When I start Logstash, it shuts itself down shortly afterwards.
Command line:
logstash -f es6_to_kafka.cfg --path.data /home/logstash/logstash-7.17.9/input.es6/
Log file:
[2023-07-09T12:13:11,000][INFO ][org.apache.kafka.clients.producer.ProducerConfig][main] ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [10.10.1.13:19092, 10.10.1.12:19092, 10.10.1.11:19092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 50
reconnect.backoff.ms = 50
request.timeout.ms = 40000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
[2023-07-09T12:13:11,010][DEBUG][org.apache.kafka.clients.CommonClientConfigs][main] Disabling exponential reconnect backoff because reconnect.backoff.ms is set, but reconnect.backoff.max.ms is not.
[2023-07-09T12:13:11,062][DEBUG][org.apache.kafka.clients.producer.internals.Sender][main] [Producer clientId=producer-1] Starting Kafka producer I/O thread.
[2023-07-09T12:13:11,064][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka version: 2.5.1
[2023-07-09T12:13:11,064][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka commitId: 0efa8fb0f4c73d92
[2023-07-09T12:13:11,064][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka startTimeMs: 1688875991062
[2023-07-09T12:13:11,066][DEBUG][org.apache.kafka.clients.producer.KafkaProducer][main] [Producer clientId=producer-1] Kafka producer started
[2023-07-09T12:13:11,067][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initialize connection to node 10.10.1.13:19092 (id: -1 rack: null) for sending metadata request
[2023-07-09T12:13:11,068][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initiating connection to node 10.10.1.13:19092 (id: -1 rack: null) using address /10.10.1.13
[2023-07-09T12:13:11,078][DEBUG][org.apache.kafka.common.network.Selector][main] [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
[2023-07-09T12:13:11,193][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>100, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>200, "pipeline.sources"=>["/home/logstash/logstash-7.17.9/config/es6_to_kafka.cfg"], :thread=>"#<Thread:0x4599a1c6 run>"}
[2023-07-09T12:13:11,301][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
[2023-07-09T12:13:11,301][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initiating API versions fetch from node -1.
[2023-07-09T12:13:11,444][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Recorded API versions for node -1: (Produce(0): 0 to 9 [usable: 8], Fetch(1): 0 to 13 [usable: 11], ListOffsets(2): 0 to 7 [usable: 5], Metadata(3): 0 to 12 [usable: 9], LeaderAndIsr(4): 0 to 5 [usable: 4], StopReplica(5): 0 to 3 [usable: 2], UpdateMetadata(6): 0 to 7 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 7], FindCoordinator(10): 0 to 4 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 5], DeleteTopics(20): 0 to 6 [usable: 4], DeleteRecords(21): 0 to 2 [usable: 1], InitProducerId(22): 0 to 4 [usable: 3], OffsetForLeaderEpoch(23): 0 to 4 [usable: 3], AddPartitionsToTxn(24): 0 to 3 [usable: 1], AddOffsetsToTxn(25): 0 to 3 [usable: 1], EndTxn(26): 0 to 3 [usable: 1], WriteTxnMarkers(27): 0 to 1 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 2 [usable: 2], CreateAcls(30): 0 to 2 [usable: 2], DeleteAcls(31): 0 to 2 [usable: 2], DescribeConfigs(32): 0 to 4 [usable: 2], AlterConfigs(33): 0 to 2 [usable: 1], AlterReplicaLogDirs(34): 0 to 2 [usable: 1], DescribeLogDirs(35): 0 to 2 [usable: 1], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 2], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 2 [usable: 2], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 
[usable: 0], OffsetDelete(47): 0 [usable: 0], UNKNOWN(48): 0 to 1, UNKNOWN(49): 0 to 1, UNKNOWN(50): 0, UNKNOWN(51): 0, UNKNOWN(56): 0, UNKNOWN(57): 0, UNKNOWN(60): 0, UNKNOWN(61): 0, UNKNOWN(65): 0, UNKNOWN(66): 0, UNKNOWN(67): 0)
[2023-07-09T12:13:11,446][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Sending metadata request MetadataRequestData(topics=[], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 10.10.1.13:19092 (id: -1 rack: null)
[2023-07-09T12:13:11,460][INFO ][org.apache.kafka.clients.Metadata][main] [Producer clientId=producer-1] Cluster ID: 3Btp8pAGT_-JvqZv2XADIw
[2023-07-09T12:13:11,460][DEBUG][org.apache.kafka.clients.Metadata][main] [Producer clientId=producer-1] Updated cluster metadata updateVersion 2 to MetadataCache{clusterId='3Btp8pAGT_-JvqZv2XADIw', nodes={1=10.10.1.13:19092 (id: 1 rack: null), 2=10.10.1.12:19092 (id: 2 rack: null), 3=10.10.1.11:19092 (id: 3 rack: null)}, partitions=[], controller=10.10.1.13:19092 (id: 1 rack: null)}
[2023-07-09T12:13:11,923][DEBUG][org.logstash.config.ir.CompiledPipeline][main] Compiled filter
P[filter-mutate{"remove_field"=>["@timestamp", "@version"]}|[file]/home/logstash/logstash-7.17.9/config/es6_to_kafka.cfg:15:1:```
mutate {
remove_field => ["@timestamp","@version"]
org.logstash.config.ir.compiler.ComputeStepSyntaxElement@53e5cb12
[2023-07-09T12:13:12,254][INFO ][logstash.javapipeline ][main] Pipeline Java execution initialization time {"seconds"=>1.05}
[2023-07-09T12:13:12,717][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ParNew"}
[2023-07-09T12:13:12,723][DEBUG][logstash.instrument.periodicpoller.jvm] collector name {:name=>"ConcurrentMarkSweep"}
[2023-07-09T12:13:13,427][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
[2023-07-09T12:13:13,438][DEBUG][logstash.javapipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x4599a1c6 run>"}
[2023-07-09T12:13:13,440][DEBUG][org.logstash.execution.PeriodicFlush][main] Pushing flush onto pipeline.
[2023-07-09T12:13:13,489][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2023-07-09T12:13:13,696][DEBUG][logstash.inputs.elasticsearch][main][deab56f7bcac25a9f17f1aed5dc427063fec7dd1a76875aec773b517387fdf17] Closing {:plugin=>"LogStash::Inputs::Elasticsearch"}
[2023-07-09T12:13:13,701][DEBUG][logstash.pluginmetadata ][main][deab56f7bcac25a9f17f1aed5dc427063fec7dd1a76875aec773b517387fdf17] Removing metadata for plugin deab56f7bcac25a9f17f1aed5dc427063fec7dd1a76875aec773b517387fdf17
[2023-07-09T12:13:13,704][DEBUG][logstash.javapipeline ][main] Input plugins stopped! Will shutdown filter/output workers. {:pipeline_id=>"main", :thread=>"#<Thread:0x4599a1c6 run>"}
[2023-07-09T12:13:13,712][DEBUG][logstash.javapipeline ][main] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<LogStash::WorkerLoopThread:0x6aee0225 run>"}
[2023-07-09T12:13:13,739][DEBUG][logstash.filters.mutate ][main][bdcfc005a46d9e9adc9977589c77d7915ee81b2203841c6e9074076a6203df22] filters/LogStash::Filters::Mutate: removing field {:field=>"@timestamp"}
[2023-07-09T12:13:13,832][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initialize connection to node 10.10.1.12:19092 (id: 2 rack: null) for sending metadata request
[2023-07-09T12:13:13,833][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initiating connection to node 10.10.1.12:19092 (id: 2 rack: null) using address /10.10.1.12
[2023-07-09T12:13:13,835][DEBUG][org.apache.kafka.common.network.Selector][main] [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2
[2023-07-09T12:13:13,835][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Completed connection to node 2. Fetching API versions.
[2023-07-09T12:13:13,835][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initiating API versions fetch from node 2.
[2023-07-09T12:13:13,839][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Recorded API versions for node 2: (Produce(0): 0 to 9 [usable: 8], Fetch(1): 0 to 13 [usable: 11], ListOffsets(2): 0 to 7 [usable: 5], Metadata(3): 0 to 12 [usable: 9], LeaderAndIsr(4): 0 to 5 [usable: 4], StopReplica(5): 0 to 3 [usable: 2], UpdateMetadata(6): 0 to 7 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 7], FindCoordinator(10): 0 to 4 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 5], DeleteTopics(20): 0 to 6 [usable: 4], DeleteRecords(21): 0 to 2 [usable: 1], InitProducerId(22): 0 to 4 [usable: 3], OffsetForLeaderEpoch(23): 0 to 4 [usable: 3], AddPartitionsToTxn(24): 0 to 3 [usable: 1], AddOffsetsToTxn(25): 0 to 3 [usable: 1], EndTxn(26): 0 to 3 [usable: 1], WriteTxnMarkers(27): 0 to 1 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 2 [usable: 2], CreateAcls(30): 0 to 2 [usable: 2], DeleteAcls(31): 0 to 2 [usable: 2], DescribeConfigs(32): 0 to 4 [usable: 2], AlterConfigs(33): 0 to 2 [usable: 1], AlterReplicaLogDirs(34): 0 to 2 [usable: 1], DescribeLogDirs(35): 0 to 2 [usable: 1], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 2], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 2 [usable: 2], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 
[usable: 0], OffsetDelete(47): 0 [usable: 0], UNKNOWN(48): 0 to 1, UNKNOWN(49): 0 to 1, UNKNOWN(50): 0, UNKNOWN(51): 0, UNKNOWN(56): 0, UNKNOWN(57): 0, UNKNOWN(60): 0, UNKNOWN(61): 0, UNKNOWN(65): 0, UNKNOWN(66): 0, UNKNOWN(67): 0)
[2023-07-09T12:13:13,839][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name='es613-input-01')], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node 10.10.1.12:19092 (id: 2 rack: null)
[2023-07-09T12:13:13,844][DEBUG][org.apache.kafka.clients.Metadata][main] [Producer clientId=producer-1] Updating last seen epoch for partition es613-input-01-0 from null to epoch 0 from new metadata
[2023-07-09T12:13:13,845][DEBUG][org.apache.kafka.clients.Metadata][main] [Producer clientId=producer-1] Updating last seen epoch for partition es613-input-01-3 from null to epoch 0 from new metadata
[2023-07-09T12:13:13,849][DEBUG][org.apache.kafka.clients.Metadata][main] [Producer clientId=producer-1] Updated cluster metadata updateVersion 3 to MetadataCache{clusterId='3Btp8pAGT_-JvqZv2XADIw', nodes={1=10.10.1.13:19092 (id: 1 rack: null), 2=10.10.1.12:19092 (id: 2 rack: null), 3=10.10.1.11:19092 (id: 3 rack: null)}, partitions=[PartitionMetadata(, error=NONE, partition=es613-input-01-5, leader=Optional[3], leaderEpoch=Optional[0], replicas=3,1, isr=3,1, offlineReplicas=), PartitionMetadata(, error=NONE, partition=es613-input-01-0, leader=Optional[1], leaderEpoch=Optional[0], replicas=1,3, isr=1,3, offlineReplicas=), PartitionMetadata(, error=NONE, partition=es613-input-01-2, leader=Optional[3], leaderEpoch=Optional[0], replicas=3,2, isr=3,2, offlineReplicas=), PartitionMetadata(, error=NONE, partition=es613-input-01-1, leader=Optional[2], leaderEpoch=Optional[0], replicas=2,1, isr=2,1, offlineReplicas=), PartitionMetadata(, error=NONE, partition=es613-input-01-4, leader=Optional[2], leaderEpoch=Optional[0], replicas=2,3, isr=2,3, offlineReplicas=), PartitionMetadata(, error=NONE, partition=es613-input-01-3, leader=Optional[1], leaderEpoch=Optional[0], replicas=1,2, isr=1,2, offlineReplicas=)], controller=10.10.1.13:19092 (id: 1 rack: null)}
[2023-07-09T12:13:13,868][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initiating connection to node 10.10.1.13:19092 (id: 1 rack: null) using address /10.10.1.13
[2023-07-09T12:13:13,870][DEBUG][org.apache.kafka.common.network.Selector][main] [Producer clientId=producer-1] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 1
[2023-07-09T12:13:13,870][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Completed connection to node 1. Fetching API versions.
[2023-07-09T12:13:13,870][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Initiating API versions fetch from node 1.
[2023-07-09T12:13:13,873][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Recorded API versions for node 1: (Produce(0): 0 to 9 [usable: 8], Fetch(1): 0 to 13 [usable: 11], ListOffsets(2): 0 to 7 [usable: 5], Metadata(3): 0 to 12 [usable: 9], LeaderAndIsr(4): 0 to 5 [usable: 4], StopReplica(5): 0 to 3 [usable: 2], UpdateMetadata(6): 0 to 7 [usable: 6], ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable: 8], OffsetFetch(9): 0 to 8 [usable: 7], FindCoordinator(10): 0 to 4 [usable: 3], JoinGroup(11): 0 to 7 [usable: 7], Heartbeat(12): 0 to 4 [usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5 [usable: 5], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 4 [usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 3], CreateTopics(19): 0 to 7 [usable: 5], DeleteTopics(20): 0 to 6 [usable: 4], DeleteRecords(21): 0 to 2 [usable: 1], InitProducerId(22): 0 to 4 [usable: 3], OffsetForLeaderEpoch(23): 0 to 4 [usable: 3], AddPartitionsToTxn(24): 0 to 3 [usable: 1], AddOffsetsToTxn(25): 0 to 3 [usable: 1], EndTxn(26): 0 to 3 [usable: 1], WriteTxnMarkers(27): 0 to 1 [usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 3], DescribeAcls(29): 0 to 2 [usable: 2], CreateAcls(30): 0 to 2 [usable: 2], DeleteAcls(31): 0 to 2 [usable: 2], DescribeConfigs(32): 0 to 4 [usable: 2], AlterConfigs(33): 0 to 2 [usable: 1], AlterReplicaLogDirs(34): 0 to 2 [usable: 1], DescribeLogDirs(35): 0 to 2 [usable: 1], SaslAuthenticate(36): 0 to 2 [usable: 2], CreatePartitions(37): 0 to 3 [usable: 2], CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0 to 2 [usable: 2], ExpireDelegationToken(40): 0 to 2 [usable: 2], DescribeDelegationToken(41): 0 to 2 [usable: 2], DeleteGroups(42): 0 to 2 [usable: 2], ElectLeaders(43): 0 to 2 [usable: 2], IncrementalAlterConfigs(44): 0 to 1 [usable: 1], AlterPartitionReassignments(45): 0 [usable: 0], ListPartitionReassignments(46): 0 
[usable: 0], OffsetDelete(47): 0 [usable: 0], UNKNOWN(48): 0 to 1, UNKNOWN(49): 0 to 1, UNKNOWN(50): 0, UNKNOWN(51): 0, UNKNOWN(56): 0, UNKNOWN(57): 0, UNKNOWN(60): 0, UNKNOWN(61): 0, UNKNOWN(65): 0, UNKNOWN(66): 0, UNKNOWN(67): 0)
[2023-07-09T12:13:13,947][DEBUG][logstash.javapipeline ][main] Shutdown waiting for worker thread {:pipeline_id=>"main", :thread=>"#<LogStash::WorkerLoopThread:0x584528e6 dead>"}
[2023-07-09T12:13:13,949][DEBUG][logstash.filters.mutate ][main] Closing {:plugin=>"LogStash::Filters::Mutate"}
[2023-07-09T12:13:13,950][DEBUG][logstash.pluginmetadata ][main] Removing metadata for plugin bdcfc005a46d9e9adc9977589c77d7915ee81b2203841c6e9074076a6203df22
[2023-07-09T12:13:13,951][DEBUG][logstash.outputs.kafka ][main] Closing {:plugin=>"LogStash::Outputs::Kafka"}
[2023-07-09T12:13:13,953][INFO ][org.apache.kafka.clients.producer.KafkaProducer][main] [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[2023-07-09T12:13:13,953][DEBUG][org.apache.kafka.clients.producer.internals.Sender][main] [Producer clientId=producer-1] Beginning shutdown of Kafka producer I/O thread, sending remaining records.
[2023-07-09T12:13:13,960][DEBUG][org.apache.kafka.clients.producer.internals.Sender][main] [Producer clientId=producer-1] Shutdown of Kafka producer I/O thread has completed.
[2023-07-09T12:13:13,962][DEBUG][org.apache.kafka.clients.producer.KafkaProducer][main] [Producer clientId=producer-1] Kafka producer has been closed
[2023-07-09T12:13:13,962][DEBUG][logstash.pluginmetadata ][main] Removing metadata for plugin 6d3ce71faa4075d3ba098a5d1ec2b8b3590d50460ad1978e43101e553a2a68c8
[2023-07-09T12:13:13,963][DEBUG][logstash.javapipeline ][main] Pipeline has been shutdown {:pipeline_id=>"main", :thread=>"#<Thread:0x4599a1c6 run>"}
[2023-07-09T12:13:13,965][INFO ][logstash.javapipeline ][main] Pipeline terminated {"pipeline.id"=>"main"}
[2023-07-09T12:13:14,023][DEBUG][logstash.agent ] Shutting down all pipelines {:pipelines_count=>0}
[2023-07-09T12:13:14,030][DEBUG][logstash.agent ] Converging pipelines state {:actions_count=>1}
[2023-07-09T12:13:14,034][DEBUG][logstash.agent ] Executing action {:action=>LogStash::PipelineAction::Delete/pipeline_id:main}
[2023-07-09T12:13:14,043][INFO ][logstash.pipelinesregistry] Removed pipeline from registry successfully {:pipeline_id=>:main}
[2023-07-09T12:13:14,049][DEBUG][logstash.instrument.periodicpoller.os] Stopping
[2023-07-09T12:13:14,058][DEBUG][logstash.instrument.periodicpoller.jvm] Stopping
[2023-07-09T12:13:14,060][DEBUG][logstash.instrument.periodicpoller.persistentqueue] Stopping
[2023-07-09T12:13:14,060][DEBUG][logstash.instrument.periodicpoller.deadletterqueue] Stopping
[2023-07-09T12:13:14,098][DEBUG][logstash.agent ] API WebServer has stopped running
[2023-07-09T12:13:14,099][INFO ][logstash.runner ] Logstash shut down.
Elasticsearch is a trademark of Elasticsearch BV, registered in the U.S.
and in other countries.
Brand
Code of Conduct
Apache, Apache Lucene, Apache Hadoop, Hadoop, HDFS and the yellow elephant
logo are trademarks of the Apache Software Foundation in the United States
and/or other countries.