Here's an exception I hit in my first Flink program; writing it down to share with everyone.

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
org.apache.flink.runtime.client.JobExecutionException: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
at Streaming.ReadFromKafka.main(ReadFromKafka.java:41)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge.assignPartitions(KafkaConsumerCallBridge.java:42)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.reassignPartitions(KafkaConsumerThread.java:405)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:243)

If you've run into this error, chances are you're also writing a beginner Flink program that reads from or writes to Kafka. There is very little material about it online, but after some digging I found the cause. Hopefully this saves you a detour.

[Heads-up]: this is a classic beginner-level pitfall.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>

Pin kafka-clients to exactly this version (0.9.0.1).

If you let a newer kafka-clients (e.g. 1.0.0) end up on the classpath instead, you'll get the error again. The reason: in Kafka 0.9, KafkaConsumer.assign takes a java.util.List, but from Kafka 0.10 onward the signature changed to java.util.Collection. FlinkKafkaConsumer09 was compiled against the old assign(List) signature, so with a newer client jar the call fails at runtime with:

org.apache.flink.runtime.client.JobExecutionException: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List;)V

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.6.0</version>
</dependency>
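To confirm which kafka-clients version Maven actually resolved (and that the 0.9.0.1 pin took effect), you can inspect the dependency tree. This is a standard Maven command; the `-Dincludes` filter just narrows the output to the Kafka client artifact:

```shell
# Run from the project root (where pom.xml lives).
# Shows every path through which kafka-clients is pulled in,
# and which version won conflict resolution.
mvn dependency:tree -Dincludes=org.apache.kafka:kafka-clients
```

If the output shows a version other than 0.9.0.1, add the explicit kafka-clients dependency above so it overrides the transitive one.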

The full code is as follows:

package Streaming;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

import java.util.Properties;

/**
 * Created with IntelliJ IDEA.
 * User: @ziyu  [email protected]
 * Date: 2018-09-10
 * Time: 11:25
 * Description: kafka.Streaming.ReadFromKafka
 */
public class ReadFromKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.2.41:9092");
        properties.setProperty("group.id", "test");

        // Consume the flink-demo topic as a stream of raw strings.
        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer09<String>("flink-demo", new SimpleStringSchema(), properties));

        stream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Stream Value: " + value;
            }
        }).print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
To run it, once your environment is set up, bring in the Flink-to-Kafka dependencies:

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.6.0</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.1</version>
        </dependency>
        <!-- Flink Connector Kafka | exclude Kafka implementation to use MapR -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

1. Create the flink-demo topic in Kafka.

2. Start a Kafka console producer and consumer, and verify that messages flow between them.

3. If the above all works, start Flink.

4. Run the local program and watch the output.
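Steps 1 and 2 can be done with the stock Kafka scripts. A rough sketch for a Kafka 0.9-era broker; the script paths, the ZooKeeper address 192.168.2.41:2181, and the broker address are assumptions based on this post's setup, so adjust them to your environment:

```shell
# 1. Create the flink-demo topic (0.9-era tooling talks to ZooKeeper).
bin/kafka-topics.sh --create --zookeeper 192.168.2.41:2181 \
  --replication-factor 1 --partitions 1 --topic flink-demo

# 2a. Console producer: type messages, one per line.
bin/kafka-console-producer.sh --broker-list 192.168.2.41:9092 --topic flink-demo

# 2b. Console consumer in a second terminal: verify the messages arrive.
bin/kafka-console-consumer.sh --zookeeper 192.168.2.41:2181 \
  --topic flink-demo --from-beginning
```

Once messages typed into the producer show up in the console consumer, the broker side is healthy and any remaining problem is in the Flink job itself.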

That's one of the trickier problems I hit while getting started with Flink; I hope it saves you some detours.
