Flink 的 RabbitMQ 连接器依赖了 “RabbitMQ AMQP Java Client”,它以三种协议发行:Mozilla Public License 1.1 (“MPL”)、GNU General Public License version 2 (“GPL”) 和 Apache License version 2 (“ASL”)。
如果用户发布的内容基于 Flink 的 RabbitMQ 连接器(进而重新发布了 “RabbitMQ AMQP Java Client”),那么一定要注意,这可能会受到 Mozilla Public License 1.1 (“MPL”)、GNU General Public License version 2 (“GPL”)、Apache License version 2 (“ASL”) 协议的限制。
// Example: consume String messages from the RabbitMQ queue "queueName" as a Flink source.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpointing is required for exactly-once or at-least-once guarantees
env.enableCheckpointing(...);

// Connection settings for the RabbitMQ broker (host and port only in this example).
final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    .build();

final DataStream<String> stream = env
    .addSource(new RMQSource<String>(
        connectionConfig,            // config for the RabbitMQ connection
        "queueName",                 // name of the RabbitMQ queue to consume
        true,                        // use correlation ids; can be false if only at-least-once is required
        new SimpleStringSchema()))   // deserialization schema to turn messages into Java objects
    .setParallelism(1);              // non-parallel source is only required for exactly-once
// Example: consume String messages from the RabbitMQ queue "queueName" as a Flink source.
val env = StreamExecutionEnvironment.getExecutionEnvironment
// checkpointing is required for exactly-once or at-least-once guarantees
env.enableCheckpointing(...)

// Connection settings for the RabbitMQ broker (host and port only in this example).
val connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    .build

val stream = env
    .addSource(new RMQSource[String](
        connectionConfig,            // config for the RabbitMQ connection
        "queueName",                 // name of the RabbitMQ queue to consume
        true,                        // use correlation ids; can be false if only at-least-once is required
        new SimpleStringSchema))     // deserialization schema to turn messages into Java objects
    .setParallelism(1)               // non-parallel source is only required for exactly-once
# Example: consume String messages from the RabbitMQ queue "queueName" as a Flink source.
env = StreamExecutionEnvironment.get_execution_environment()
# checkpointing is required for exactly-once or at-least-once guarantees
env.enable_checkpointing(...)

# Connection settings for the RabbitMQ broker (host and port only in this example).
connection_config = RMQConnectionConfig.Builder() \
    .set_host("localhost") \
    .set_port(5000) \
    .build()

stream = env \
    .add_source(RMQSource(
        connection_config,     # config for the RabbitMQ connection
        "queueName",           # name of the RabbitMQ queue to consume
        True,                  # use correlation ids (same argument position as the Java/Scala examples)
        SimpleStringSchema(),  # deserialization schema for incoming messages
    )) \
    .set_parallelism(1)
服务质量 (QoS) / 消费者预取(Consumer Prefetch)
// Example: write a DataStream of Strings to the RabbitMQ queue "queueName".
final DataStream<String> stream = ...

// Connection settings for the RabbitMQ broker (host and port only in this example).
final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    .build();

stream.addSink(new RMQSink<String>(
    connectionConfig,            // config for the RabbitMQ connection
    "queueName",                 // name of the RabbitMQ queue to send messages to
    new SimpleStringSchema()));  // serialization schema to turn Java objects to messages
// Example: write a DataStream of Strings to the RabbitMQ queue "queueName".
val stream: DataStream[String] = ...

// Connection settings for the RabbitMQ broker (host and port only in this example).
val connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    .build

stream.addSink(new RMQSink[String](
    connectionConfig,         // config for the RabbitMQ connection
    "queueName",              // name of the RabbitMQ queue to send messages to
    new SimpleStringSchema))  // serialization schema to turn Java objects to messages
# Example: write a stream of strings to the RabbitMQ queue 'queueName'.
stream = ...

# Connection settings for the RabbitMQ broker (host and port only in this example).
connection_config = RMQConnectionConfig.Builder() \
    .set_host("localhost") \
    .set_port(5000) \
    .build()

stream.add_sink(RMQSink(
    connection_config,      # config for the RabbitMQ connection
    'queueName',            # name of the RabbitMQ queue to send messages to
    SimpleStringSchema()))  # serialization schema to turn Java objects to messages