对于fields而言,创建的Flink作业运行脚本如下:CREATE TABLE orders (
order_id string,
order_channel string,
order_time string,
pay_amount double,
real_pay double,
pay_time string,
user_id string,
user_name string,
area_id string
) WITH (
'connector' = 'kafka',
'topic' = 'kafkaTopic',
'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
'properties.group.id' = 'GroupId',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
CREATE TABLE redisSink (
order_id string,
order_channel string,
order_time string,
pay_amount double,
real_pay double,
pay_time string,
user_id string,
user_name string,
area_id string,
primary key (order_id) not enforced
) WITH (
'connector' = 'redis',
'host' = 'RedisIP',
'password' = 'RedisPassword',
'data-type' = 'list',
'deploy-mode' = 'master-replica',
'schema-syntax' = 'fields'
insert into redisSink select * from orders;