添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
MQTT是一个客户端服务端架构的发布/订阅模式的消息传输协议。它的设计思想是轻巧、开放、简单、规范,易于实现。这些特点使得它对很多场景来说都是很好的选择,特别是对于受限的环境如机器与机器的通信(M2M)以及物联网环境(IoT)。 dll引用 private IMqttClient mqttClient = null; private bool isReconnect = true; private string _clientId = string.Empty; private string _ip = "127.0.0.1"; //本地测试IP private string _port = "1883"; private string userName = ""; private string passWord = ""; private List<MqttResponse> receiveList = new List<MqttResponse>(); private Dictionary<string, List<MqttResponse>> dictReceives = new Dictionary<string, List<MqttResponse>>(); private List<string> comboxDataSource = new List<string>() { "下发任务指令", "下发通知指令" }; public FormNurseApp() InitializeComponent(); private void FormNurseApp_Load(object sender, EventArgs e) if (!DesignMode) _clientId = $"mdsd-nurse-{Guid.NewGuid().ToString("N")}"; comboBoxTemplate.Items.Add(""); comboxDataSource.ForEach(x => { comboBoxTemplate.Items.Add(x); }); isReconnect = true; Task.Run(async () => { await ConnectMqttServerAsync(); }); /// <summary> /// 订阅主题 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private async void BtnSubscribe_Click(object sender, EventArgs e) await Subscribe(); /// <summary> /// 发送主题消息 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private async void BtnSend_Click(object sender, EventArgs e) await Publish(); /// <summary> /// 发布 /// </summary> /// <returns></returns> private async Task Publish(string topics = "", string content = "") string topic = txtPublishTopic.Text.Trim(); if (!string.IsNullOrWhiteSpace(topics)) topic = topics; if (string.IsNullOrEmpty(topic)) MessageBox.Show("发布主题不能为空!"); return; string inputString = txtContent.Text.Trim(); if (!string.IsNullOrWhiteSpace(content)) inputString = content; var message = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(inputString) .WithAtMostOnceQoS() .WithRetainFlag(true) .Build(); await mqttClient.PublishAsync(message); /// <summary> /// 订阅 /// </summary> /// <returns></returns> private async Task Subscribe() string topic = txtSubscribeTopic.Text.Trim(); if (string.IsNullOrEmpty(topic)) MessageBox.Show("订阅主题不能为空!"); return; if (!mqttClient.IsConnected) MessageBox.Show("MQTT客户端尚未连接!"); return; // 订阅一个主题 await mqttClient.SubscribeAsync(new TopicFilterBuilder() .WithTopic(topic) .WithAtMostOnceQoS() .Build() Invoke(new Action(() => txtReceiveMessage.AppendText($"已订阅[{topic}]主题{Environment.NewLine}"); /// <summary> /// 连接MQTT服务器 /// </summary> /// <returns></returns> private async Task ConnectMqttServerAsync() if (mqttClient == null) var factory = new MqttFactory(); mqttClient = factory.CreateMqttClient(); mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived; mqttClient.Connected += MqttClient_Connected; mqttClient.Disconnected += MqttClient_Disconnected; var options = new MqttClientOptionsBuilder() .WithClientId(_clientId) .WithTcpServer(_ip, Convert.ToInt32(_port)) .WithCredentials(userName, passWord) .WithCleanSession() .Build(); await mqttClient.ConnectAsync(options); catch (Exception ex) Invoke(new Action(() => txtReceiveMessage.AppendText($"连接到MQTT服务器失败!" + Environment.NewLine + ex.Message + Environment.NewLine); /// <summary> /// 连接MQTT服务器成功后事件 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void MqttClient_Connected(object sender, EventArgs e) Invoke(new Action(() => txtReceiveMessage.Clear(); txtReceiveMessage.AppendText("已连接到MQTT服务器!" + Environment.NewLine); /// <summary> /// 断开连接后事件 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void MqttClient_Disconnected(object sender, EventArgs e) Invoke(new Action(() => txtReceiveMessage.Clear(); DateTime curTime = new DateTime(); curTime = DateTime.UtcNow; txtReceiveMessage.AppendText($">> [{curTime.ToLongTimeString()}]"); txtReceiveMessage.AppendText("已断开MQTT连接!" + Environment.NewLine); // 重连 if (isReconnect) Invoke(new Action(() => txtReceiveMessage.AppendText("正在尝试重新连接..." + Environment.NewLine); var options = new MqttClientOptionsBuilder() .WithClientId(_clientId) .WithTcpServer(_ip, Convert.ToInt32(_port)) .WithCredentials(userName, passWord) .WithCleanSession() .Build(); Invoke(new Action(async () => await Task.Delay(TimeSpan.FromSeconds(5)); await mqttClient.ConnectAsync(options); catch txtReceiveMessage.AppendText("### 重连失败 ###" + Environment.NewLine); Invoke(new Action(() => txtReceiveMessage.AppendText("已下线!" + Environment.NewLine); /// <summary> /// 接收消息 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) Invoke(new Action(() => txtReceiveMessage.AppendText($"{"【收到新消息】"}{Environment.NewLine}"); Invoke(new Action(() => string jsonString = Encoding.UTF8.GetString(e.ApplicationMessage.Payload); FormatterData(jsonString); /// <summary> /// 解析返回结果 /// </summary> /// <param name="jsonString"></param> private async void FormatterData(string jsonString) txtReceiveMessage.AppendText($">> Payload = {jsonString}{Environment.NewLine}"); //{"Command":"Notify","Content":{"SerialNo":"001","OrgCode":"MD230720","Status":2,"DeviceCode":"E56FA68B4F31","Msg":"指令发送成功"},"Timestamp":1690253539841} JObject jObject = (JObject)JsonConvert.DeserializeObject(jsonString); string command = jObject["Command"].ToString(); string serialNo = jObject["Content"]["SerialNo"].ToString(); string orgCode = jObject["Content"]["OrgCode"].ToString(); string status = jObject["Content"]["Status"].ToString(); string msg = jObject["Content"]["Msg"].ToString(); string deviceCode = jObject["Content"]["DeviceCode"].ToString(); string timestamp = jObject["Timestamp"].ToString(); if (string.IsNullOrWhiteSpace(deviceCode)) return; if (command != "Task") return; if (dictReceives.ContainsKey(serialNo)) var list = dictReceives[serialNo]; if (list != null && list.Count > 0) var firstApply = list.FirstOrDefault(); txtReceiveMessage.AppendText($">> 任务已被{firstApply.DeviceCode}领取,时间:{firstApply.Timestamp} {Environment.NewLine}"); // 回传消息给 MQTT,通知手表等终端 MqttNotifyRequst requst = new MqttNotifyRequst(); requst.command = "Notify"; requst.timestamp = GetTimeStamp(DateTime.Now); requst.content.content = $" 任务已被{firstApply.DeviceCode}领取"; requst.content.deviceCodes = new string[] { deviceCode }; string content = JsonConvert.SerializeObject(requst); string topic = "commands/down"; await Publish(topic, content); MqttResponse response = new MqttResponse() Command = command, DeviceCode = deviceCode, Msg = msg, OrgCode = orgCode, SerialNo = serialNo, Status = Convert.ToInt32(status), Timestamp = GetDateTime(long.Parse(timestamp)) txtReceiveMessage.AppendText($">> 任务领取成功,时间:{response.Timestamp} {Environment.NewLine}"); List<MqttResponse> list = new List<MqttResponse> { response }; // 只记录第一次点击确认领任务的信息 dictReceives.Add(serialNo, list); // 回传消息给 MQTT,通知手表等终端 MqttNotifyRequst requst = new MqttNotifyRequst(); requst.command = "Notify"; requst.timestamp = GetTimeStamp(DateTime.Now); requst.content.content = $" 任务领取成功"; requst.content.deviceCodes = new string[] { deviceCode }; string content = JsonConvert.SerializeObject(requst); string topic = "commands/down"; await Publish(topic, content); /// <summary> /// 下拉选择事件 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void comboBoxTemplate_SelectedValueChanged(object sender, EventArgs e) List<string> list = new List<string>(); if (this.comboBoxTemplate.Text == "下发任务指令") MqttTaskRequest request = new MqttTaskRequest(); request.command = "Task"; request.timestamp = GetTimeStamp(DateTime.Now); request.content.content = $"接患者任务"; request.content.serialNo = "001"; if (ckbWatch1.Checked) list.Add("E56FA68B4F31"); if (ckbWatch2.Checked) list.Add("E6A1ECE6E3C3"); request.content.code = "1"; request.content.deviceCodes = list.ToArray(); txtContent.Text = JsonConvert.SerializeObject(request); else if (this.comboBoxTemplate.Text == "下发通知指令") //txtContent.Text= "{\r\n\"command\": \"Notify\",\r\n\"content\":\r\n{\r\n\"serialNo\": \"001\",\r\n\"orgCode\": \"MD230720\",\r\n\"deviceCodes\":\r\n[\r\n\"E6A1ECE6E3C3\",\r\n\"E56FA68B4F31\",\r\n],\r\n\"title\": \"通知\",\r\n\"content\": \"接患者\"\r\n},\r\n\"timestamp\": 1667974675354\r\n}"; MqttNotifyRequst request = new MqttNotifyRequst(); request.command = "Notify"; request.timestamp = GetTimeStamp(DateTime.Now); request.content.content = $"任务领取成功"; request.content.serialNo = "001"; // 指令流水号,当天唯一 if (ckbWatch1.Checked) list.Add("E56FA68B4F31"); if (ckbWatch2.Checked) list.Add("E6A1ECE6E3C3"); request.content.deviceCodes = list.ToArray(); txtContent.Text = JsonConvert.SerializeObject(request); txtContent.Text = "";