java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;
Flink入门程序异常,记录一下跟大家分享。SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html
Flink入门程序异常,记录一下跟大家分享。
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
org.apache.flink.runtime.client.JobExecutionException: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
at Streaming.ReadFromKafka.main(ReadFromKafka.java:41)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243)
当各位遇到这个错误的时候,相信你们也是写的Flink的入门程序,读取或者写入kafka。网上的资料少之甚少,经过一番寻找还是找到了一点东西。希望大家以后可以少走弯路。
【尖叫提示】:这是入门级别的一个大坑。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
这个kafka-clients的版本一定要写这个。
如果写下面这个,则会报错:具体原因应该是1.0.0的不支持了。
org.apache.flink.runtime.client.JobExecutionException: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V |
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.6.0</version>
</dependency>
具体的代码如下:
package Streaming;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import java.util.Properties;
* Created with IntelliJ IDEA.
* User: @ziyu [email protected]
* Date: 2018-09-10
* Time: 11:25
* Description: kafka.Streaming.ReadFromKafka
public class ReadFromKafka {
public static void main(String args[]) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.2.41:9092");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer09("flink-demo", new SimpleStringSchema(), properties));
stream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
public String map(String value) throws Exception {
return "Stream Value: " + value;
}).print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
如果运行的话,只要环境修改好了,然后引入Flink连接kafka 的依赖
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.6.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
<!-- Flink Connector Kafka | exclude Kafka implementation to use MapR -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
1.kafka创建flink-demo 的主题
2.启动kafka 的生产者和消费者,观察时候可以互通
3.如果上述都没问题,启动Flink
4.运行本地程序,观察输出即可
以上为初学Flink遇到的一个比较棘手的问题,希望大家少走弯路。
所有评论(0)