1.7.2 配置项
打开配置文件,配置 Backend 规则,可使用 topic 通配符 过滤要应用的消息:
## 订阅时初始化 ACK 记录
backend.pgsql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
## 消息抵达时更新抵达状态
backend.pgsql.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
## 取消订阅时删除记录行
backend.pgsql.hook.session.unsubscribed.1= {"topic": "#", "action": {"sql": ["delete from mqtt_acked where clientid = ${clientid} and topic = ${topic}"]}, "pool": "pool1"}
1.7.3 使用示例
在 EMQ X 管理控制台 WebSocket 页面中建立连接后,订阅 QoS > 0 的主题:
此时 mqtt_acked
表将插入初始化数据行,每向主题发布一条 QoS > 0 的消息,消息抵达后数据行 mid 将自增 1:
代理订阅中满足 QoS > 0 的 topic 也会初始化记录,客户端取消订阅后相关记录将被删除。
1.8 自定义 SQL
除去插件内置函数、表结构外,emqx_backend_pgsql 还支持自定义 SQL 语句,通过使用如 ${clientid}
模板语法动态构造 SQL 语句实现如客户端连接历史、更新自定义数据表等操作。
1.8.1 SQL语句参数说明
hook | 可用参数 | 示例(sql语句中${name} 表示可获取的参数) |
---|
client.connected | clientid | insert into conn(clientid) values(${clientid}) |
client.disconnected | clientid | insert into disconn(clientid) values(${clientid}) |
session.subscribed | clientid, topic, qos | insert into sub(topic, qos) values(topic,{qos}) |
session.unsubscribed | clientid, topic | delete from sub where topic = ${topic} |
message.publish | msgid, topic, payload, qos, clientid | insert into msg(msgid, topic) values(msgid,{topic}) |
message.acked | msgid, topic, clientid | insert into ack(msgid, topic) values(msgid,{topic}) |
message.delivered | msgid, topic, clientid | insert into delivered(msgid, topic) values(msgid,{topic}) |
1.8.2 更新自定义数据表示例
应用现有设备表 clients
,具有设备连接认证、设备状态记录、设备管理等基本字段用于其他管理业务,现需要将 EMQ X 设备状态同步至该表中:
CREATE TABLE "public"."clients" (
"id" serial,
"deviceUsername" varchar(50), -- MQTT username
"client_id" varchar(50), -- MQTT client_id
"password" varchar(50), -- MQTT password
"is_super" boolean DEFAULT 'false', -- 是否 ACL super 客户端
"owner" int, -- 创建用户
"productID" int, -- 所属产品
"state" boolean DEFAULT 'false', -- 在线状态
PRIMARY KEY ("id")
-- 初始化系统中已存在示例数据,此时 state 为 false
INSERT INTO "public"."clients"("deviceUsername", "client_id", "password", "is_super", "owner", "productID", "state") VALUES('mqtt_10c61f1a1f47', 'mqtt_10c61f1a1f47', '9336EBF25087D91C818EE6E9EC29F8C1', TRUE, 1, 21, FALSE);
自定义 UPDATE SQL 语句:
## connected / disconnected hook 中配置自定义 UPDATE SQL
## 可以配置多条 SQL 语句 "SQL": ["sql_a", "sql_b", "sql_c"]
## 连接时
backend.pgsql.hook.client.connected.3 = {"action": {"sql": ["update clients set state = true where client_id = ${clientid}"]}, "pool": "pool1"}
## 断开时
backend.pgsql.hook.client.disconnected.3 = {"action": {"sql": ["update clients set state = false where client_id = ${clientid}"]}, "pool": "pool1"}
客户端上线时将填充并执行预定的 SQL 语句,更新设备在线状态 state
字段为 true
:
1.9 高级选项
backend.pgsql.time_range = 5s
backend.pgsql.max_returned_count = 500
2 EMQX 插件持久化MySQL
EMQX 插件持久化系列 (五)MySQL MQTT 数据存储 | EMQ
2.1 准备
2.1.1 初始化数据表
插件运行依赖以下几张数据表,数据表需要用户自行创建,表结构不可改动。
mqtt_client 存储设备在线状态
DROP TABLE IF EXISTS `mqtt_client`;
CREATE TABLE `mqtt_client` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`clientid` varchar(64) DEFAULT NULL,
`state` varchar(3) DEFAULT NULL, -- 在线状态 0 离线 1 在线
`node` varchar(100) DEFAULT NULL, -- 所属节点
`online_at` datetime DEFAULT NULL, -- 上线时间
`offline_at` datetime DEFAULT NULL, -- 下线时间
`created` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY `mqtt_client_idx` (`clientid`),
UNIQUE KEY `mqtt_client_key` (`clientid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
mqtt_sub 存储设备的主题订阅关系
DROP TABLE IF EXISTS `mqtt_sub`;
CREATE TABLE `mqtt_sub` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`clientid` varchar(64) DEFAULT NULL,
`topic` varchar(255) DEFAULT NULL,
`qos` int(3) DEFAULT NULL,
`created` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY `mqtt_sub_idx` (`clientid`,`topic`(255),`qos`),
UNIQUE KEY `mqtt_sub_key` (`clientid`,`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
mqtt_msg 存储 MQTT 消息
DROP TABLE IF EXISTS `mqtt_msg`;
CREATE TABLE `mqtt_msg` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`msgid` varchar(100) DEFAULT NULL,
`topic` varchar(1024) NOT NULL,
`sender` varchar(1024) DEFAULT NULL,
`node` varchar(60) DEFAULT NULL,
`qos` int(11) NOT NULL DEFAULT '0',
`retain` tinyint(2) DEFAULT NULL,
`payload` blob,
`arrived` datetime NOT NULL, -- 是否抵达(QoS > 0)
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
mqtt_retain 存储 Retain 消息
DROP TABLE IF EXISTS `mqtt_retain`;
CREATE TABLE `mqtt_retain` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`topic` varchar(200) DEFAULT NULL,
`msgid` varchar(60) DEFAULT NULL,
`sender` varchar(100) DEFAULT NULL,
`node` varchar(100) DEFAULT NULL,
`qos` int(2) DEFAULT NULL,
`payload` blob,
`arrived` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `mqtt_retain_key` (`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
mqtt_acked 存储客户端消息确认
DROP TABLE IF EXISTS `mqtt_acked`;
CREATE TABLE `mqtt_acked` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`clientid` varchar(200) DEFAULT NULL,
`topic` varchar(200) DEFAULT NULL,
`mid` int(200) DEFAULT NULL,
`created` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `mqtt_acked_key` (`clientid`,`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2.2 配置 EMQX 服务器
通过 RPM 方式安装的 EMQX,MySQL 相关的配置文件位于 /etc/emqx/plugins/emqx_backend_mysql.conf
,本文仅测试 MySQL 持久化的功能,大部分配置不需要做更改。填入用户名、密码、数据库即可:
backend.mysql.server = 127.0.0.1:3306
backend.mysql.username = root
backend.mysql.password = 123456
backend.mysql.database = mqtt
保持剩下部分的配置文件不变,然后需要启动该插件。启动插件的方式有 命令行
、 控制台
和 REST API
三种方式,读者可以任选其一。
2.2.1 通过命令行启动
emqx_ctl plugins load emqx_backend_mysql
2.2.2 通过管理控制台启动
EMQX 管理控制台 插件 页面中,找到 emqx_backend_mysql 插件,点击 启动。
2.2.3 通过 REST API 启动
使用 PUT /api/v4/nodes/:node/plugins/:plugin_name/load
API 可以启动插件。
2.3 客户端在线状态存储
客户端上下线时,插件将更新在线状态、上下线时间、节点客户端列表至 MySQL 数据库。
2.3.1 配置项
打开配置文件,配置 Backend 规则:
## hook: client.connected、client.disconnected
## action/function: on_client_connected、on_client_disconnected
## 客户端上下线
backend.mysql.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
backend.mysql.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
2.3.2 使用示例
浏览器打开 http://127.0.0.1:18083
EMQX 管理控制台,在 工具 -> Websocket 中新建一个客户端连接,指定 clientid 为 sub_client,点击连接,连接成功后手动断开:
在 MySQL Workbeanch 中点击 mqtt_client
表查看,此时将写入 / 更新一条客户端上下线记录:
2.4 客户端代理订阅
客户端上线时,存储模块直接从数据库读取预设待订阅列表,代理加载订阅主题。在客户端需要通过预定主题通信(接收消息)场景下,应用能从数据层面设定 / 改变代理订阅列表。
2.4.1 配置项
打开配置文件,配置 Backend 规则:
## hook: client.connected
## action/function: on_subscribe_lookup
backend.mysql.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
2.4.2 使用示例
当 sub_client
设备上线时,需要为其订阅 sub_client/upstream
与 sub_client/downlink
两个 QoS 1 的主题:
在 mqtt_sub
表中初始化插入代理订阅主题信息:
insert into mqtt_sub(clientid, topic, qos) values("sub_client", "sub_client/upstream", 1);
insert into mqtt_sub(clientid, topic, qos) values("sub_client", "sub_client/downlink", 1);
EMQX 管理控制台 WebSocket 页面,以 clientid sub_client
新建一个客户端连接,切换至订阅页面,可见当前客户端自动订阅了 sub_client/upstream
与 sub_client/downlink
两个 QoS 1 的主题:
切换回管理控制台 WebSocket 页面,向 sub_client/downlink
主题发布消息,可在消息订阅列表收到发布的消息。
2.5 持久化发布消息
2.5.1 配置项
打开配置文件,配置 Backend 规则,支持使用 topic
参数进行消息过滤,此处使用 #
通配符存储任意主题消息:
## hook: message.publish
## action/function: on_message_publish
backend.mysql.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
2.5.2 使用示例
在 EMQX 管理控制台 WebSocket 页面中,向主题 upstream_topic
发布多条消息,EMQX 将消息列表持久化至 mqtt_msg
表中:
暂只支持 QoS 1 2 的消息持久化。
2.6 Retain 消息持久化
2.6.1 配置项
打开配置文件,配置 Backend 规则:
## 同时开启以下规则,启用 retain 持久化三个生命周期
## 发布非空 retain 消息时 (存储)
backend.mysql.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
## 设备订阅主题时查询 retain 消息
backend.mysql.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
## 发布空 retain 消息时 (清除)
backend.mysql.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
2.6.2 使用示例
在 EMQX 管理控制台 WebSocket 页面中建立连接后,发布消息勾选保留:
发布(消息不为空)
非空的 retain 消息发布时,EMQX 将以 topic 为唯一键,持久化该条消息至 mqtt_retain
表中,相同主题下发不同的 retain 消息,只有最后一条消息会被持久化:
客户端订阅 retain 主题后,EMQX 将查询 mqtt_retain
数据表,执行投递 retain 消息操作。
发布(消息为空)
MQTT 协议中,发布空的 retain 消息将清空 retain 记录,此时 retain 记录将从 mqtt_retain
表中删除。
2.7 消息确认持久化
开启消息确认 (ACK) 持久化后,客户端订阅 QoS 1、QoS 2 级别的主题时,EMQX 将在数据库以 clientid + topic 为唯一键初始化 ACK 记录。
2.7.1 配置项
打开配置文件,配置 Backend 规则,可使用 topic 通配符 过滤要应用的消息:
## 订阅时初始化 ACK 记录
backend.mysql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
## 消息抵达时更新抵达状态
backend.mysql.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
## 取消订阅时删除记录行
backend.mysql.hook.session.unsubscribed.1= {"topic": "#", "action": {"sql": ["delete from mqtt_acked where clientid = ${clientid} and topic = ${topic}"]}, "pool": "pool1"}
2.7.2 使用示例
在 EMQX 管理控制台 WebSocket 页面中建立连接后,订阅 QoS > 0 的主题:
此时 mqtt_acked
表将插入初始化数据行,每向主题发布一条 QoS > 0 的消息,消息抵达后数据行 mid 将自增 1:
代理订阅中满足 QoS > 0 的 topic 也会初始化记录,客户端取消订阅后相关记录将被删除。
2.8 自定义 SQL
除去插件内置函数、表结构外,emqx_backend_mysql 还支持自定义 SQL 语句,通过使用如 ${clientid}
模板语法动态构造 SQL 语句实现如客户端连接历史、更新自定义数据表等操作。
2.8.1 SQL语句参数说明
hook | 可用参数 | 示例(sql语句中${name} 表示可获取的参数) |
---|
client.connected | clientid | insert into conn(clientid) values(${clientid}) |
client.disconnected | clientid | insert into disconn(clientid) values(${clientid}) |
session.subscribed | clientid, topic, qos | insert into sub(topic, qos) values(topic,{qos}) |
session.unsubscribed | clientid, topic | delete from sub where topic = ${topic} |
message.publish | msgid, topic, payload, qos, clientid | insert into msg(msgid, topic) values(${msgid}, ${topic}) |
message.acked | msgid, topic, clientid | insert into ack(msgid, topic) values(${msgid}, ${topic}) |
message.delivered | msgid, topic, clientid | insert into delivered(msgid, topic) values(${msgid}, ${topic}) |
2.8.2 客户端连接 log 示例
设计表结构如下:
CREATE TABLE `mqtt`.`connect_logs` (
`id` INT NOT NULL AUTO_INCREMENT,
`clientid` VARCHAR(255) NULL,
`created_at` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, -- 记录时间
`state` INT NOT NULL DEFAULT 0, -- 记录类型: 0 下线 1 上线
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
自定义 SQL:
## connected hook 中配置自定义 SQL
## 可以配置多条 SQL 语句 "SQL": ["sql_a", "sql_b", "sql_c"]
## 连接时
backend.mysql.hook.client.connected.3 = {"action": {"sql": ["insert into connect_logs(clientid, state) values(${clientid}, 1)"]}, "pool": "pool1"}
## 断开时
backend.mysql.hook.client.disconnected.3 = {"action": {"sql": ["insert into connect_logs(clientid, state) values(${clientid}, 0)"]}, "pool": "pool1"}
客户端上下线时将填充并执行预定的 SQL 语句,将连接记录写入 connect_logs
表。
2.9 高级选项
backend.mysql.time_range = 5s
backend.mysql.max_returned_count = 500
3 EMQX桥接数据到Kafka
物联网数据接入之EMQ免费开源版桥接Kafka(数据保存到Kafka)_我在北国不背锅的博客-CSDN博客
<物联网>SpringBoot后台客户端获取MQTT消息并保存到数据库(EMQ X Rule Engine规则引擎)_liszlove的博客-CSDN博客_mqtt 规则引擎
SpringBoot中使用MQTT接收订阅主题的信息并保存到数据库的相关问题_玉念聿辉的博客-CSDN博客_mqtt订阅并保存到数据库
MQTT将订阅到的消息存储数据库_Mr.Qubb的博客-CSDN博客_mqtt订阅并保存到数据库
EMQ X 规则引擎系列(一):数据桥接到消息队列(Kafka) - EMQX - 博客园 (cnblogs.com)
3.1 架构设计
如使用EMQ企业版,企业版支持数据转发Kafka的插件,但企业版收费。
现需要使用代码的方式将EMQ接收的数据转发到Kafka。
3.2 代码实现EMQ数据转发Kafka
3.2.1 导入maven依赖
<!--mqtt-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
<!--kafka-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.22</version>
</dependency>
3.2.2 实现EMQ Broker
主程序类实现连接 EMQ Broker,并进行消息接收的代码:
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class App {
public static void main(String[] args) {
String subTopic = "testtopic/#";
String broker = "tcp://192.168.72.141:1883";
String clientId = "mqttjs_efadb873";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient client = new MqttClient(broker, clientId, persistence);
// MQTT 连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
//connOpts.setUserName("emqx_test");
//connOpts.setPassword("emqx_test_password".toCharArray());
// 保留会话
connOpts.setCleanSession(true);
// 设置回调
client.setCallback(new OnMessageCallback());
// 建立连接
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
System.out.println("Connected");
//System.out.println("Publishing message: " + content);
// 订阅
client.subscribe(subTopic);
// 消息发布所需参数
// MqttMessage message = new MqttMessage(content.getBytes());
// message.setQos(qos);
// client.publish(pubTopic, message);
// System.out.println("Message published");
// client.disconnect();
// System.out.println("Disconnected");
// client.close();
// System.exit(0);
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
3.2.3 回调处理
回调消息处理类OnMessageCallback:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class OnMessageCallback implements MqttCallback{
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题:" + topic);
System.out.println("接收消息Qos:" + message.getQos());
System.out.println("接收消息内容:" + new String(message.getPayload()));
//接收到的消息发送到Kafka
MqttKafkaProducer.pushData(new String(message.getPayload()));
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
3.2.4 Kafka消息发送
Kafka消息发送类MqttKafkaProducer:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Date;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class MqttKafkaProducer {
public static void pushData(String msgData) {
Properties props = new Properties();
//集群地址,多个服务器用","分隔
props.put("bootstrap.servers", "192.168.72.141:9092,192.168.72.142:9092,192.168.72.143:9092");
//重新发送消息次数,到达次数返回错误
props.put("retries", 0);
//Producer会尝试去把发往同一个Partition的多个Requests进行合并,batch.size指明了一次Batch合并后Requests总大小的上限。如果这个值设置的太小,可能会导致所有的Request都不进行Batch。
props.put("batch.size", 163840);
//Producer默认会把两次发送时间间隔内收集到的所有Requests进行一次聚合然后再发送,以此提高吞吐量,而linger.ms则更进一步,这个参数为每次发送增加一些delay,以此来聚合更多的Message。
props.put("linger.ms", 1);
//在Producer端用来存放尚未发送出去的Message的缓冲区大小
props.put("buffer.memory", 33554432);
//key、value的序列化,此处以字符串为例,使用kafka已有的序列化类
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//props.put("partitioner.class", "com.kafka.demo.Partitioner");//分区操作,此处未写
props.put("acks", "1");
props.put("request.timeout.ms", "60000");
props.put("compression.type","lz4");
//创建生产者
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//通过时间做轮循,均匀分布设置的partition,提高效率。
int partition = (int) (System.currentTimeMillis() % 3);
//写入名为"test-partition-1"的topic
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("emqtopic",partition, UUID.randomUUID().toString(), msgData);
try {
producer.send(producerRecord).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
System.out.println("写入emqtopic:" + msgData);
//producer.close();
3.2.5 结果