900字范文,内容丰富有趣,生活中的好帮手!
900字范文 > 前端websocket连接mqtt服务器(Paho-mqtt mqttws31.js)以及断开重连

前端websocket连接mqtt服务器(Paho-mqtt mqttws31.js)以及断开重连

时间:2022-03-08 20:57:19

相关推荐

前端websocket连接mqtt服务器(Paho-mqtt mqttws31.js)以及断开重连

项目需求,java端连接C#,想在前台处理订阅消息,用了websocket和mqtt,查了一上午,因为本机没有node.js,关于paho-mqtt.js的例子连接不了,比如301错误码、什么二进制数据、decode、握手失败等等,后来看了源码,总结了一个连接成功的例子,先写几个问题:

websocket的使用::websocket允许服务端向客户端主动推送消息,是在jQuery/js中使用mqtt的基础。mqtt客户端:

paho-mqtt.js:基于浏览器的库,使用websocket连接到mqtt

mqtt.js:是用 JavaScript 编写的,可用于 Node.js 和浏览器。有node.js的话还是推荐这个。paho-mqtt.js:一定要下载对,中间下了一个,源码有问题,一直提示Client未定义,推一个网址,或者去我的资源里面下载:MQTT服务端端口号:由于是用websocket,所以一定要选择服务端里对用的mqtt over websocket端口号,因为请求路径是ws或wss开头,切记!!!!!! 不然连接不上

/ajax/libs/paho-mqtt/1.0.1/mqttws31.js

<script src="/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script>

1. 简单的基础实现代码

基础版不支持重连,源码里options中没有重连参数,需要手动写,带重连机制的在demo里面,配置reconnect参数

js:

// 连接参数var option = {"ServerUri": "", // 服务器hostname"ServerPort": 8080, // port端口号"UserName": "admin ",// 用户名"Password": "123",//密码"ClientId": "C_client",//客户端id,自己设置"TimeOut": 5000,// 超时"KeepAlive": 0,// 心跳"CleanSession": false, // 清除session,设置为true时,每次都以新session信息向服务器沟留~"SSL":false// 是否加密 (加密是wss前缀)}// 创建一个客户端 var client = new Paho.MQTT.Client(option.ServerUri, Number(option.ServerPort), "clientId");client.onConnectionLost = onConnectionLost; // 定义连接丢失方法client.onMessageArrived = onMessageArrived; // 定义消息送达方法client.connect({onSuccess:onConnect}); // 连接// 连接成功后回调function onConnect() {// Once a connection has been made, make a subscription and send a message.console.log("onConnect");// 订阅主题client.subscribe("/World");// 新建消息message = new Paho.MQTT.Message("Hello");message.destinationName = "/World";// 发送消息client.send(message);};// 连接丢失处理function onConnectionLost(responseObject) {if (responseObject.errorCode !== 0)console.log("onConnectionLost:"+responseObject.errorMessage);};// 消息送达处理function onMessageArrived(message) {console.log("onMessageArrived:"+message.payloadString);client.disconnect();};

连接成功!

2. 方法说明:

1. 客户端实例化var client = new Paho.MQTT.Client(host, port, path, clientId);// host => 客户端hostname(ip),string// port => 客户端端口号,number// path => 路径 默认"/mqtt",string// clientId => 客户端id,String// Client()方法写两个参数(hostname和port)时,会默认将port值赋值给clientId, path默认为/mqtt// 写三个参数(hostname、port、clientId),path依旧默认为/mqtt2. 连接断开回调函数,断开后重连 client.onConnectionLost = onConnectionLost;//连接丢失事件function onConnectionLost(e) {if (e.errorCode !== 0) {WriteToStatus("连接异常断开:" + e.errorMessage);// 定时器重连 reconnect = setInterval(function () {client.connect({invocationContext: {host: option.ServerUri,//IP地址port: option.ServerPort,//端口号path: client.path,clientId: option.ClientId//标识},timeout: option.TimeOut,//连接超时时间keepAliveInterval: option.KeepAlive,//心跳间隔cleanSession: option.CleanSession,//是否清理SessionuseSSL: option.SSL,//是否启用SSLuserName: option.UserName, //用户名password: option.Password, //密码onSuccess: onConnect,//连接成功回调事件onFailure: onError//连接失败回调事件});},1000);}}3. 收到消息回调函数client.onMessageArrived = onMessageArrived;//接收消息事件function onMessageArrived(data) {WriteToStatus("收到消息:" + data.payloadString);// 写一些自己的逻辑什么的...}4. 订阅主题client.subscribe(filter,subscribeOptions);// filter: 主题名称(可以只传filter参数)// subscribeOptions: 参数,包括qos、OnSuccess、onFailure、invocationContext、timeout// 源码部分:this.subscribe = function (filter, subscribeOptions) {if (typeof filter !== "string")throw new Error("Invalid argument:"+filter);subscribeOptions = subscribeOptions || {} ;validate(subscribeOptions, {qos:"number",invocationContext:"object",onSuccess:"function",onFailure:"function",timeout:"number"});if (subscribeOptions.timeout && !subscribeOptions.onFailure)throw new Error("subscribeOptions.timeout specified with no onFailure callback.");if (typeof subscribeOptions.qos !== "undefined"&& !(subscribeOptions.qos === 0 || subscribeOptions.qos === 1 || subscribeOptions.qos === 2 ))throw new Error(format(ERROR.INVALID_ARGUMENT, [subscribeOptions.qos, "subscribeOptions.qos"]));client.subscribe(filter, subscribeOptions);};5. 取消订阅主题client.unsubscribe(filter, unsubscribeOptions);// 同订阅方法(可以只传filter参数)6. 连接服务器client.connect({invocationContext: {host: option.ServerUri,//IP地址port: option.ServerPort,//端口号path: client.path,clientId: option.ClientId//标识},timeout: option.TimeOut,//连接超时时间keepAliveInterval: option.KeepAlive,//心跳间隔cleanSession: option.CleanSession,//是否清理SessionuseSSL: option.SSL,//是否启用SSLuserName: option.UserName, //用户名password: option.Password, //密码onSuccess: onConnect,//连接成功回调事件onFailure: onError//连接失败回调事件});7. 断开连接client.disconnect();8. 重连这个js里没有配置重连的参数,只能自己写一个定时器,连接断开的时候开启,连接成功后clear9. 发送消息client.send(topic,payload,qos,retained); // topic: 主题// payload: 内容// qos: 发送频率// retained:是否保留(选择true,其他客户端订阅时会自动获取内容)也可以将参数封装: eg:let message = new Paho.MQTT.Message($("#txtContent").val());message.destinationName = currentTopic.TopicName;message.retained = true;client.send(message);// 源码:this.send = function (topic,payload,qos,retained) {var message ;if(arguments.length == 0){throw new Error("Invalid argument."+"length");}else if(arguments.length == 1) {if (!(topic instanceof Message) && (typeof topic !== "string"))throw new Error("Invalid argument:"+ typeof topic);message = topic;if (typeof message.destinationName === "undefined")throw new Error(format(ERROR.INVALID_ARGUMENT,[message.destinationName,"Message.destinationName"]));client.send(message);}else {//parameter checking in Message objectmessage = new Message(payload);message.destinationName = topic;if(arguments.length >= 3)message.qos = qos;if(arguments.length >= 4)message.retained = retained;client.send(message);}};

3. 测试版demo

demo是看一个博客上的,然后拿过来自己改了一些适合我的js和服务器,完善了几个功能点:

html:

<!DOCTYPE html><html><head><meta http-equiv="Content-Type" content="text/html; charset=utf-8"/><title>paho-mqtt测试demo</title><meta charset="utf-8" /><script src="/assets/expand/plugins/jquery/jquery-3.2.1.min.js"></script><script src="/assets/module/mqtt/mqttws31.js" type="text/javascript"></script><style>#content{background-color: white;width: 800px;}ul{padding-top: 20px;}li,#subTopics{display: inline;}#logResult > li{display: block;}#logDiv {height: 400px;overflow-y: auto;background-color: white;border: 1px solid #eaeaea;width: 700px;margin-left: 40px;}</style></head><body><div id="content"><ul><li><input id="txtIp" type="text" value="192.168.1.102"/></li><li><input id="txtPort" type="text" value="1803"/></li><li><input id="btnConnect" type="button" value="连接"/></li><li><input id="btnDisconnect" type="button" value="断开" disabled="disabled"/></li></ul><ul><li><div id="subTopics"></div></li><li><input id="btnSubscribe" type="button" value="订阅" disabled="disabled"/></li><li><input id="btnUnSubscribe" type="button" value="取消订阅" disabled="disabled"/></li><li><input id="btnClear" type="button" value="清空日志" /></li></ul><div id="logDiv"><ul id="logResult"></ul></div><ul style="padding-bottom: 15px;"><li><select id="pubTopics"></select></li><li><input id="txtContent" type="text"/></li><li><input id="btnPublish" type="button" value="发布" disabled="disabled"/></li></ul></div><script type="application/javascript" >//所有主题var allTopics = [// TopicName: 主题名称// TopicStr: 显示在多选框上的主题{"TopicName": "topic1", "TopicStr": "topic1" },{"TopicName": "topic2", "TopicStr": "topic2" },{"TopicName": "topic3", "TopicStr": "topic3" }];//选中订阅主题var selectedTopics = [];//客户端选项var option = {"ServerUri": "192.168.1.102","ServerPort": 1803,"UserName": "admin","Password": "123","ClientId": "","TimeOut": 5,"KeepAlive": 100,"CleanSession": false,"SSL":false,"reconnect" : true}// 定义客户端var client;$(function () {// 订阅主题选项 显示subTopicsShow(allTopics);// 发布主题选项 显示pubTopicsShow(allTopics);// 订阅主题选中事件$("#subTopics input[type=checkbox]").on("click", function () {let topic;for(let i in allTopics){if (allTopics[i].TopicName == $(this).val()) {topic = allTopics[i];}}if ($(this).is(":checked")) {//选中selectedTopics.push(topic);}else {//取消选择if(selectedTopics.length>0){for(var i in selectedTopics){var tmp = selectedTopics[i];if (tmp.TopicName == t) {selectedTopics.splice(i, 1);}}}}});// 订阅按钮点击$("#btnSubscribe").on("click", function () {if (!client) {alert("请连接服务端");return;}if(selectedTopics.length==0){alert("请选择要订阅的主题!");return;}var msg = "";for(var i in selectedTopics){var t = selectedTopics[i];client.subscribe(t.TopicName);msg += t.TopicName+";"}logConsole("成功订阅主题:" + msg);enable($("#btnUnSubscribe"), true);});/*取消订阅按钮点击事件*/$("#btnUnSubscribe").click(function () {if(selectedTopics.length == 0){alert("请选择要取消订阅的主题!");return;}let msg = "";for(let i in selectedTopics){client.unsubscribe(selectedTopics[i].TopicName);msg += selectedTopics[i].TopicName+";"}logConsole("成功取消主题订阅:" + msg);});//发布按钮点击事件$("#btnPublish").on("click", function () {if(!client){alert("请连接服务端");return;}if($("#txtContent").val()==""){alert("请输入要发布的内容");return;}let message = new Paho.MQTT.Message($("#txtContent").val());message.destinationName = $("#pubTopics").val();message.retained = true;client.send(message);logConsole("发布了主题为" + message.destinationName + "的消息:" + $("#txtContent").val())});});// 全部订阅主题选项显示function subTopicsShow(topics) {let html = "";for (let i = 0; i < topics.length;i++){html += topics[i].TopicStr;html += '<input type="checkbox" value="'+topics[i].TopicName+'"/>';}$("#subTopics").html(html);}// 发布主题选项显示function pubTopicsShow(topics) {let html = "";for (let i = 0; i < topics.length; i++) {html += '<option value="' + topics[i].TopicName + '">' + topics[i].TopicStr + '</option>';}$("#pubTopics").html(html);}/*清空按钮点击事件*/$("#btnClear").click(function () {$("#logResult").empty();});$(function () {//连接按钮点击事件$("#btnConnect").on("click", function () {if ($("#txtIp").val()!="") {option.ServerUri = $("#txtIp").val();}else {alert("请输入服务端IP!");return;}if($("#txtPort").val()!=""){option.ServerPort = Number($("#txtPort").val());}else {alert("请输入端口号!");return;}//客户端实例化client = new Paho.MQTT.Client(option.ServerUri, Number(option.ServerPort), "/mqtt","web");client.onConnectionLost = onConnectionLost;//绑定连接断开事件client.onMessageArrived = onMessageArrived;//绑定接收消息事件//连接服务端client.connect({invocationContext: {host: option.ServerUri,//IP地址port: option.ServerPort,//端口号path: client.path,clientId: option.ClientId//标识},timeout: option.TimeOut,//连接超时时间keepAliveInterval: option.KeepAlive,//心跳间隔cleanSession: option.CleanSession,//是否清理SessionuseSSL: option.SSL,//是否启用SSLuserName: option.UserName, //用户名password: option.Password, //密码onSuccess: onConnect,//连接成功回调事件onFailure: onError,//连接失败回调事件,reconnect: option.reconnect});});//断开按钮点击事件$("#btnDisconnect").on("click", function () {client.disconnect();console.log(client);client = null;logConsole("已断开连接!");enable($("#btnConnect"), true);enable($("#btnDisconnect"), false);enable($("#btnPublish"), false);enable($("#btnSubscribe"), false);enable($("#btnUnSubscribe"), false);});});//连接成功回调事件function onConnect() {logConsole("连接成功!");enable($("#btnConnect"), false);enable($("#btnDisconnect"), true);enable($("#btnPublish"), true);enable($("#btnSubscribe"), true);enable($("#btnUnSubscribe"), false);}//连接失败回调事件function onError(e) {logConsole("连接失败:" + e);enable($("#btnConnect"), true);enable($("#btnDisconnect"), true);enable($("#btnPublish"), false);enable($("#btnSubscribe"), false);}//连接断开事件function onConnectionLost(e) {if (e.errorCode !== 0) {logConsole("连接异常断开:" + e.errorMessage);enable($("#btnConnect"), true);enable($("#btnDisconnect"), true);enable($("#btnPublish"), false);enable($("#btnSubscribe"), false);}}//接收消息事件function onMessageArrived(data) {logConsole("收到消息:" + data.payloadString);}// 日志显示输出function logConsole(data) {var now = new Date();var message = '[' + now.toLocaleTimeString() + ']' + data;$("#logResult").append('<li>' + message + '</li>');}//切换按钮状态function enable(button,enabled) {if (enabled) {button.removeAttr("disabled");}else {button.attr("disabled", "disabled");}}</script></body></html>

效果:

demo里配置了断开重连参数,这个js跟上面分享的js文件多了reconnect部分,里面源码改了一个地方,之前重连成功后没有进入onSuccess方法,更改后重连触发成功事件,目前测试没有问题,如果有问题请大佬们指出!

js文件我放在我的资源里,或者从百度云下载也可!

链接:/s/1L85YGbD1x6zBIuIqqDBYjg 提取码:bmq9

4. 放几个公用的mqtt服务器:

TCP端口: 18831

WEBSOCKET端口:

1883 : MQTT, 未加密

8883 : MQTT, 加密

8884 : MQTT, 加密, client certificate required

8080 : MQTT over WebSockets, 未加密

8081 : MQTT over WebSockets, 加密

没有用户名和密码,不写或者随便写都行~

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。