MQTT是一个客户端服务端架构的发布/订阅模式的消息传输协议。它的设计思想是轻巧、开放、简单、规范,易于实现。这些特点使得它对很多场景来说都是很好的选择,特别是对于受限的环境如机器与机器的通信(M2M)以及物联网环境(IoT)。
dll引用
private IMqttClient mqttClient = null;
private bool isReconnect = true;
private string _clientId = string.Empty;
private string _ip = "127.0.0.1"; //本地测试IP
private string _port = "1883";
private string userName = "";
private string passWord = "";
private List<MqttResponse> receiveList = new List<MqttResponse>();
private Dictionary<string, List<MqttResponse>> dictReceives = new Dictionary<string, List<MqttResponse>>();
private List<string> comboxDataSource = new List<string>() { "下发任务指令", "下发通知指令" };
public FormNurseApp()
InitializeComponent();
private void FormNurseApp_Load(object sender, EventArgs e)
if (!DesignMode)
_clientId = $"mdsd-nurse-{Guid.NewGuid().ToString("N")}";
comboBoxTemplate.Items.Add("");
comboxDataSource.ForEach(x => { comboBoxTemplate.Items.Add(x); });
isReconnect = true;
Task.Run(async () => { await ConnectMqttServerAsync(); });
/// <summary>
/// 订阅主题
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private async void BtnSubscribe_Click(object sender, EventArgs e)
await Subscribe();
/// <summary>
/// 发送主题消息
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private async void BtnSend_Click(object sender, EventArgs e)
await Publish();
/// <summary>
/// 发布
/// </summary>
/// <returns></returns>
private async Task Publish(string topics = "", string content = "")
string topic = txtPublishTopic.Text.Trim();
if (!string.IsNullOrWhiteSpace(topics))
topic = topics;
if (string.IsNullOrEmpty(topic))
MessageBox.Show("发布主题不能为空!");
return;
string inputString = txtContent.Text.Trim();
if (!string.IsNullOrWhiteSpace(content))
inputString = content;
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(inputString)
.WithAtMostOnceQoS()
.WithRetainFlag(true)
.Build();
await mqttClient.PublishAsync(message);
/// <summary>
/// 订阅
/// </summary>
/// <returns></returns>
private async Task Subscribe()
string topic = txtSubscribeTopic.Text.Trim();
if (string.IsNullOrEmpty(topic))
MessageBox.Show("订阅主题不能为空!");
return;
if (!mqttClient.IsConnected)
MessageBox.Show("MQTT客户端尚未连接!");
return;
// 订阅一个主题
await mqttClient.SubscribeAsync(new TopicFilterBuilder()
.WithTopic(topic)
.WithAtMostOnceQoS()
.Build()
Invoke(new Action(() =>
txtReceiveMessage.AppendText($"已订阅[{topic}]主题{Environment.NewLine}");
/// <summary>
/// 连接MQTT服务器
/// </summary>
/// <returns></returns>
private async Task ConnectMqttServerAsync()
if (mqttClient == null)
var factory = new MqttFactory();
mqttClient = factory.CreateMqttClient();
mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
mqttClient.Connected += MqttClient_Connected;
mqttClient.Disconnected += MqttClient_Disconnected;
var options = new MqttClientOptionsBuilder()
.WithClientId(_clientId)
.WithTcpServer(_ip, Convert.ToInt32(_port))
.WithCredentials(userName, passWord)
.WithCleanSession()
.Build();
await mqttClient.ConnectAsync(options);
catch (Exception ex)
Invoke(new Action(() =>
txtReceiveMessage.AppendText($"连接到MQTT服务器失败!" + Environment.NewLine + ex.Message + Environment.NewLine);
/// <summary>
/// 连接MQTT服务器成功后事件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void MqttClient_Connected(object sender, EventArgs e)
Invoke(new Action(() =>
txtReceiveMessage.Clear();
txtReceiveMessage.AppendText("已连接到MQTT服务器!" + Environment.NewLine);
/// <summary>
/// 断开连接后事件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void MqttClient_Disconnected(object sender, EventArgs e)
Invoke(new Action(() =>
txtReceiveMessage.Clear();
DateTime curTime = new DateTime();
curTime = DateTime.UtcNow;
txtReceiveMessage.AppendText($">> [{curTime.ToLongTimeString()}]");
txtReceiveMessage.AppendText("已断开MQTT连接!" + Environment.NewLine);
// 重连
if (isReconnect)
Invoke(new Action(() =>
txtReceiveMessage.AppendText("正在尝试重新连接..." + Environment.NewLine);
var options = new MqttClientOptionsBuilder()
.WithClientId(_clientId)
.WithTcpServer(_ip, Convert.ToInt32(_port))
.WithCredentials(userName, passWord)
.WithCleanSession()
.Build();
Invoke(new Action(async () =>
await Task.Delay(TimeSpan.FromSeconds(5));
await mqttClient.ConnectAsync(options);
catch
txtReceiveMessage.AppendText("### 重连失败 ###" + Environment.NewLine);
Invoke(new Action(() =>
txtReceiveMessage.AppendText("已下线!" + Environment.NewLine);
/// <summary>
/// 接收消息
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
Invoke(new Action(() =>
txtReceiveMessage.AppendText($"{"【收到新消息】"}{Environment.NewLine}");
Invoke(new Action(() =>
string jsonString = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
FormatterData(jsonString);
/// <summary>
/// 解析返回结果
/// </summary>
/// <param name="jsonString"></param>
private async void FormatterData(string jsonString)
txtReceiveMessage.AppendText($">> Payload = {jsonString}{Environment.NewLine}");
//{"Command":"Notify","Content":{"SerialNo":"001","OrgCode":"MD230720","Status":2,"DeviceCode":"E56FA68B4F31","Msg":"指令发送成功"},"Timestamp":1690253539841}
JObject jObject = (JObject)JsonConvert.DeserializeObject(jsonString);
string command = jObject["Command"].ToString();
string serialNo = jObject["Content"]["SerialNo"].ToString();
string orgCode = jObject["Content"]["OrgCode"].ToString();
string status = jObject["Content"]["Status"].ToString();
string msg = jObject["Content"]["Msg"].ToString();
string deviceCode = jObject["Content"]["DeviceCode"].ToString();
string timestamp = jObject["Timestamp"].ToString();
if (string.IsNullOrWhiteSpace(deviceCode)) return;
if (command != "Task") return;
if (dictReceives.ContainsKey(serialNo))
var list = dictReceives[serialNo];
if (list != null && list.Count > 0)
var firstApply = list.FirstOrDefault();
txtReceiveMessage.AppendText($">> 任务已被{firstApply.DeviceCode}领取,时间:{firstApply.Timestamp} {Environment.NewLine}");
// 回传消息给 MQTT,通知手表等终端
MqttNotifyRequst requst = new MqttNotifyRequst();
requst.command = "Notify";
requst.timestamp = GetTimeStamp(DateTime.Now);
requst.content.content = $" 任务已被{firstApply.DeviceCode}领取";
requst.content.deviceCodes = new string[] { deviceCode };
string content = JsonConvert.SerializeObject(requst);
string topic = "commands/down";
await Publish(topic, content);
MqttResponse response = new MqttResponse()
Command = command,
DeviceCode = deviceCode,
Msg = msg,
OrgCode = orgCode,
SerialNo = serialNo,
Status = Convert.ToInt32(status),
Timestamp = GetDateTime(long.Parse(timestamp))
txtReceiveMessage.AppendText($">> 任务领取成功,时间:{response.Timestamp} {Environment.NewLine}");
List<MqttResponse> list = new List<MqttResponse> { response }; // 只记录第一次点击确认领任务的信息
dictReceives.Add(serialNo, list);
// 回传消息给 MQTT,通知手表等终端
MqttNotifyRequst requst = new MqttNotifyRequst();
requst.command = "Notify";
requst.timestamp = GetTimeStamp(DateTime.Now);
requst.content.content = $" 任务领取成功";
requst.content.deviceCodes = new string[] { deviceCode };
string content = JsonConvert.SerializeObject(requst);
string topic = "commands/down";
await Publish(topic, content);
/// <summary>
/// 下拉选择事件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void comboBoxTemplate_SelectedValueChanged(object sender, EventArgs e)
List<string> list = new List<string>();
if (this.comboBoxTemplate.Text == "下发任务指令")
MqttTaskRequest request = new MqttTaskRequest();
request.command = "Task";
request.timestamp = GetTimeStamp(DateTime.Now);
request.content.content = $"接患者任务";
request.content.serialNo = "001";
if (ckbWatch1.Checked)
list.Add("E56FA68B4F31");
if (ckbWatch2.Checked)
list.Add("E6A1ECE6E3C3");
request.content.code = "1";
request.content.deviceCodes = list.ToArray();
txtContent.Text = JsonConvert.SerializeObject(request);
else if (this.comboBoxTemplate.Text == "下发通知指令")
//txtContent.Text= "{\r\n\"command\": \"Notify\",\r\n\"content\":\r\n{\r\n\"serialNo\": \"001\",\r\n\"orgCode\": \"MD230720\",\r\n\"deviceCodes\":\r\n[\r\n\"E6A1ECE6E3C3\",\r\n\"E56FA68B4F31\",\r\n],\r\n\"title\": \"通知\",\r\n\"content\": \"接患者\"\r\n},\r\n\"timestamp\": 1667974675354\r\n}";
MqttNotifyRequst request = new MqttNotifyRequst();
request.command = "Notify";
request.timestamp = GetTimeStamp(DateTime.Now);
request.content.content = $"任务领取成功";
request.content.serialNo = "001"; // 指令流水号,当天唯一
if (ckbWatch1.Checked)
list.Add("E56FA68B4F31");
if (ckbWatch2.Checked)
list.Add("E6A1ECE6E3C3");
request.content.deviceCodes = list.ToArray();
txtContent.Text = JsonConvert.SerializeObject(request);
txtContent.Text = "";