MQTT client connecting to an EMQ server: connect() waits forever when reconnecting after a disconnect

Date: 2019-07-28 01:19:15

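Background: after our devices finished updating, a large number of them reconnected to EMQ at once, driving EMQ's CPU straight to 100%. At that point:

Error log:

Info log:

In the two logs above, the error log reports "client is not connected" when I publish a message, yet the info log below it shows that the receiver was still there 2 milliseconds later, so the client should have been connected.

Background:

The real problem is this: a large number of clients connected to EMQ within a short time and pushed EMQ's CPU to 100%, which caused EMQ to drop its connection to my server. My server then took the disconnect-and-reconnect path but never completed the full client-to-broker connection handshake, so it stayed stuck waiting.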

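Cause of the bug

Prerequisite knowledge:

Heartbeat mechanism:

Keep Alive specifies the maximum idle time T of a connection. When the client detects that the connection has been idle for longer than T, it must send a PINGREQ heartbeat packet to the broker, and the broker answers with a PINGRESP heartbeat response. If the broker receives no heartbeat request for more than 1.5T, it closes the connection and delivers the will message to subscribers. Likewise, if the client has still not received a PINGRESP after a certain time, it closes the connection.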
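The timing rules above can be sketched as a few lines of arithmetic. This is only an illustration: the class and method names are hypothetical, and the "longer than T" and "more than 1.5T" comparisons follow the wording of the paragraph above rather than any particular broker implementation.

```java
import java.time.Duration;

/** Illustrative keep-alive arithmetic; all names here are hypothetical. */
final class KeepAliveTiming {
    private KeepAliveTiming() {}

    /** The client must send PINGREQ once the connection has been idle for longer than T. */
    static boolean clientMustPing(Duration idle, Duration keepAlive) {
        // A keep-alive of zero means the mechanism is switched off.
        return !keepAlive.isZero() && idle.compareTo(keepAlive) > 0;
    }

    /** The broker's grace period is 1.5 * T. */
    static Duration brokerDeadline(Duration keepAlive) {
        return keepAlive.multipliedBy(3).dividedBy(2);
    }

    /** The broker closes the connection after more than 1.5 * T of silence. */
    static boolean brokerShouldDisconnect(Duration silent, Duration keepAlive) {
        return !keepAlive.isZero() && silent.compareTo(brokerDeadline(keepAlive)) > 0;
    }

    public static void main(String[] args) {
        Duration t = Duration.ofSeconds(60);
        System.out.println("broker deadline (ms): " + brokerDeadline(t).toMillis());
        System.out.println("ping needed after 61 s idle: " + clientMustPing(Duration.ofSeconds(61), t));
    }
}
```

With T = 60 seconds the broker tolerates up to 90 seconds of silence, which is the window an overloaded broker or client has to get a heartbeat through.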

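(1) Why does the client get disconnected from EMQ when EMQ is under heavy load?

EMQ and my server share a heartbeat mechanism that checks whether the client-to-broker connection is healthy; if the heartbeat check fails within a certain time, the connection is closed. Because EMQ's CPU had reached 100% and it could not keep up with message processing, the server's PINGREQ never got a PINGRESP in reply. EMQ then kicked the client, and the server went into disconnect-and-reconnect.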

MQTT client source code walkthrough

1. The MQTT client source has a heartbeat mechanism: it starts a PingTask that keeps checking heartbeat liveness

    private class PingTask extends TimerTask {
        private static final String methodName = "PingTask.run";

        public void run() {
            //@Trace 660=Check schedule at {0}
            log.fine(CLASS_NAME, methodName, "660", new Object[]{Long.valueOf(System.nanoTime())});
            comms.checkForActivity(); // check heartbeat liveness
        }
    }

2. The heartbeat check tests whether the current client is still alive

    public MqttToken checkForActivity() {
        return this.checkForActivity(null);
    }

    public MqttToken checkForActivity(IMqttActionListener pingCallback) {
        MqttToken token = null;
        try {
            token = clientState.checkForActivity(pingCallback); // check for a heartbeat timeout
        } catch (MqttException e) {
            handleRunException(e);
        } catch (Exception e) {
            handleRunException(e);
        }
        return token;
    }

3. Checking for a heartbeat timeout

    long nextPingTime = this.keepAlive;
    if (connected && this.keepAlive > 0) {
        long time = System.nanoTime();
        int delta = 100000;
        synchronized (pingOutstandingLock) {
            // A ping is outstanding and nothing has arrived within keepAlive + delta
            if (pingOutstanding > 0 && (time - lastInboundActivity >= keepAlive + delta)) {
                throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_TIMEOUT);
            }
            //...part of the code omitted
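To make the condition in that fragment easier to reason about, here is a small stand-alone model of it. It is a simplified sketch, not the client library's code: the class `PingWatchdog` and its methods are hypothetical, timestamps are passed in explicitly so the behavior is deterministic, and the assumption that a PINGRESP clears the outstanding-ping counter is mine.

```java
/** Hypothetical, simplified model of the client-side heartbeat timeout check. */
final class PingWatchdog {
    private final long keepAliveNanos;
    private final long deltaNanos;
    private long lastInboundActivity;
    private int pingOutstanding;

    PingWatchdog(long keepAliveNanos, long deltaNanos, long now) {
        this.keepAliveNanos = keepAliveNanos;
        this.deltaNanos = deltaNanos;
        this.lastInboundActivity = now;
    }

    /** Record that a PINGREQ went out and has not been answered yet. */
    synchronized void onPingSent() {
        pingOutstanding++;
    }

    /** Record a PINGRESP: the link is alive, so clear the outstanding ping (assumed behavior). */
    synchronized void onPingResponse(long now) {
        lastInboundActivity = now;
        pingOutstanding = 0;
    }

    /** True when a ping is pending and nothing has arrived for keepAlive + delta. */
    synchronized boolean isTimedOut(long now) {
        return keepAliveNanos > 0
                && pingOutstanding > 0
                && (now - lastInboundActivity) >= keepAliveNanos + deltaNanos;
    }
}
```

The point of the `pingOutstanding > 0` guard is that silence alone is not a failure; the client only gives up when it has asked for a heartbeat and the broker, for example one pinned at 100% CPU, has not answered in time.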

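(2) Why does our server stay stuck in connect() when it reconnects to EMQ?

The connection process between the MQTT client and the EMQ host is shown in the figure:

1. The client establishes a TCP connection with the broker at the network layer; the MQTT protocol is not involved yet.

2. The client sends a CONNECT packet to the broker.

3. After receiving the CONNECT packet, the broker returns a CONNACK packet to the client.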
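For readers who have not looked at the wire format, steps 2 and 3 can be sketched at the byte level. The example below assumes MQTT 3.1.1 framing and only covers the simplest case (client id only, no will, user name, or password); the class and method names are hypothetical and this is not how the client library builds its packets.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Minimal sketch of MQTT 3.1.1 CONNECT encoding and CONNACK checking; names are hypothetical. */
final class ConnectPacketSketch {
    private ConnectPacketSketch() {}

    static byte[] encodeConnect(String clientId, int keepAliveSeconds, boolean cleanSession) {
        byte[] id = clientId.getBytes(StandardCharsets.UTF_8);
        if (keepAliveSeconds < 0 || keepAliveSeconds > 0xFFFF || id.length > 0xFFFF) {
            throw new IllegalArgumentException("value does not fit in 16 bits");
        }
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        // Variable header: protocol name "MQTT" with a 2-byte length prefix
        body.write(0x00);
        body.write(0x04);
        body.write('M');
        body.write('Q');
        body.write('T');
        body.write('T');
        body.write(0x04);                          // protocol level 4 = MQTT 3.1.1
        body.write(cleanSession ? 0x02 : 0x00);    // connect flags: clean session bit only
        body.write((keepAliveSeconds >> 8) & 0xFF);
        body.write(keepAliveSeconds & 0xFF);
        // Payload: client identifier with a 2-byte length prefix
        body.write((id.length >> 8) & 0xFF);
        body.write(id.length & 0xFF);
        body.write(id, 0, id.length);

        ByteArrayOutputStream packet = new ByteArrayOutputStream();
        packet.write(0x10);                        // fixed header: packet type CONNECT
        int remaining = body.size();
        do {                                       // variable-length "remaining length" field
            int digit = remaining % 128;
            remaining /= 128;
            if (remaining > 0) {
                digit |= 0x80;
            }
            packet.write(digit);
        } while (remaining > 0);
        packet.write(body.toByteArray(), 0, body.size());
        return packet.toByteArray();
    }

    /** A CONNACK is 0x20, 0x02, acknowledge flags, return code; return code 0 means accepted. */
    static boolean isConnackAccepted(byte[] packet) {
        return packet.length == 4
                && (packet[0] & 0xFF) == 0x20
                && packet[1] == 0x02
                && packet[3] == 0x00;
    }
}
```

The connection only counts as established once that four-byte CONNACK comes back, which is exactly the reply an overloaded broker may never send.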

2.1. The client blocks after sending CONNECT to EMQ

1. By default, after the MQTT client sends the CONNECT packet to EMQ, it blocks and waits

    public void connect(MqttConnectOptions options) throws MqttSecurityException, MqttException {
        aClient.connect(options, null, null).waitForCompletion(getTimeToWait());
    }

2. By default, timeToWait = -1

    protected long timeToWait = -1; // How long each method should wait for action to complete

    public long getTimeToWait() {
        return this.timeToWait;
    }

3. Because timeout is -1 by default, waitForResponse() takes the responseLock.wait() branch and waits indefinitely

    public void waitForCompletion(long timeout) throws MqttException {
        final String methodName = "waitForCompletion";
        //@TRACE 407=key={0} wait max={1} token={2}
        log.fine(CLASS_NAME, methodName, "407", new Object[]{getKey(), Long.valueOf(timeout), this});
        MqttWireMessage resp = waitForResponse(timeout); // wait for the reply
        if (resp == null && !completed) {
            //@TRACE 406=key={0} timed out token={1}
            log.fine(CLASS_NAME, methodName, "406", new Object[]{getKey(), this});
            exception = new MqttException(MqttException.REASON_CODE_CLIENT_TIMEOUT);
            throw exception;
        }
        checkResult();
    }

    protected MqttWireMessage waitForResponse(long timeout) throws MqttException {
        final String methodName = "waitForResponse";
        synchronized (responseLock) {
            //@TRACE 400=>key={0} timeout={1} sent={2} completed={3} hasException={4} response={5} token={6}
            log.fine(CLASS_NAME, methodName, "400", new Object[]{getKey(), Long.valueOf(timeout), Boolean.valueOf(sent), Boolean.valueOf(completed), (exception == null) ? "false" : "true", response, this}, exception);
            while (!this.completed) {
                if (this.exception == null) {
                    try {
                        //@TRACE 408=key={0} wait max={1}
                        log.fine(CLASS_NAME, methodName, "408", new Object[]{getKey(), Long.valueOf(timeout)});
                        if (timeout <= 0) { // the timeout passed in is timeToWait = -1
                            responseLock.wait(); // waits indefinitely
                        } else {
                            responseLock.wait(timeout);
                        }
                    } catch (InterruptedException e) {
                        exception = new MqttException(e);
                    }
                }
                if (!this.completed) {
                    if (this.exception != null) {
                        //@TRACE 401=failed with exception
                        log.fine(CLASS_NAME, methodName, "401", null, exception);
                        throw exception;
                    }
                    if (timeout > 0) {
                        // time up and still not completed
                        break;
                    }
                }
            }
        }
        //@TRACE 402=key={0} response={1}
        log.fine(CLASS_NAME, methodName, "402", new Object[]{getKey(), this.response});
        return this.response;
    }
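The difference between the two branches is easy to reproduce with nothing but `Object.wait`. The sketch below is a reduced model under my own assumptions, not the library code: `BoundedWaitSketch` and its members are hypothetical names, the response is a plain `String`, and, unlike the original loop, it tracks a deadline so that a spurious wakeup cannot shorten the wait.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical reduced model of a response token that can wait with or without a bound. */
final class BoundedWaitSketch {
    private final Object responseLock = new Object();
    private boolean completed;
    private String response;

    /** Returns the response, or null if timeoutMillis elapsed first. timeoutMillis <= 0 waits forever. */
    String waitForResponse(long timeoutMillis) throws InterruptedException {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(Math.max(timeoutMillis, 0));
        synchronized (responseLock) {
            while (!completed) {
                if (timeoutMillis <= 0) {
                    responseLock.wait();           // unbounded: only notifyAll() can end this
                } else {
                    long remainingNanos = deadline - System.nanoTime();
                    if (remainingNanos <= 0) {
                        break;                     // time is up and still not completed
                    }
                    // Round up so the argument is never 0, because wait(0) also means "forever".
                    responseLock.wait(remainingNanos / 1_000_000L + 1);
                }
            }
            return completed ? response : null;
        }
    }

    /** Called by whichever thread receives the reply. */
    void complete(String reply) {
        synchronized (responseLock) {
            response = reply;
            completed = true;
            responseLock.notifyAll();
        }
    }

    /** Convenience for demos: true if a wait with no reply returns null after the timeout. */
    static boolean timesOutWithoutReply(long timeoutMillis) {
        try {
            return new BoundedWaitSketch().waitForResponse(timeoutMillis) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Calling `timesOutWithoutReply(50)` returns after roughly 50 ms, whereas passing -1 with no other thread calling `complete` would never return. That second case is the situation our server ended up in.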

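2.2 How the blocked wait is woken up after the client sends CONNECT to EMQ

When does the thread stop waiting?

The wait ends when the snapshot server receives the CONNACK sent by EMQ.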

1. Entering the connect method calls the connect method of connectActionListener

    public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)
            throws MqttException, MqttSecurityException {
        final String methodName = "connect";
        //...part of the code omitted
        //...
        this.connOpts = options;
        this.userContext = userContext;
        final boolean automaticReconnect = options.isAutomaticReconnect();
        comms.setNetworkModules(createNetworkModules(serverURI, options));
        comms.setReconnectCallback(new MqttReconnectCallback(automaticReconnect));
        //...part of the code omitted
        //...
        comms.setNetworkModuleIndex(0);
        connectActionListener.connect(); // connect action listener; step into this method
        return userToken;
    }

    public void connect() throws MqttPersistenceException {
        MqttToken token = new MqttToken(client.getClientId());
        token.setActionCallback(this);
        token.setUserContext(this);
        //...part of the code omitted
        try {
            comms.connect(options, token); // step into this method
        } catch (MqttException e) {
            onFailure(token, e);
        }
    }

2. The connect method of connectActionListener calls comms.connect(), which starts a ConnectBG thread to perform the CONNECT (the connect runs on a background thread)

    public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
        final String methodName = "connect";
        synchronized (conLock) {
            if (isDisconnected() && !closePending) {
                //@TRACE 214=state=CONNECTING
                log.fine(CLASS_NAME, methodName, "214");
                conState = CONNECTING;
                conOptions = options;
                //...part of the code omitted
                this.clientState.setKeepAliveSecs(conOptions.getKeepAliveInterval());
                this.clientState.setCleanSession(conOptions.isCleanSession());
                this.clientState.setMaxInflight(conOptions.getMaxInflight());
                tokenStore.open();
                // step into the run method of the object below
                ConnectBG conbg = new ConnectBG(this, token, connect, executorService);
                conbg.start();
            } else {
                //...part of the code omitted
            }
        }
    }

3. The run() method of the ConnectBG thread starts the network connection, the receiver, the sender, and the callback, and sends the CONNECT packet to the EMQ server

    public void run() {
        Thread.currentThread().setName(threadName);
        final String methodName = "connectBG:run";
        MqttException mqttEx = null;
        try {
            //...part of the code omitted
            // network connection
            NetworkModule networkModule = networkModules[networkModuleIndex];
            networkModule.start();
            // start the receiver thread; step into this object's run method
            receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
            receiver.start("MQTT Rec: " + getClient().getClientId(), executorService);
            // start the sender thread
            sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
            sender.start("MQTT Snd: " + getClient().getClientId(), executorService);
            // start the callback thread
            callback.start("MQTT Call: " + getClient().getClientId(), executorService);
            // send the CONNECT packet
            internalSend(conPacket, conToken);
        } catch (MqttException ex) {
            //@TRACE 212=connect failed: unexpected exception
            log.fine(CLASS_NAME, methodName, "212", null, ex);
            mqttEx = ex;
        } catch (Exception ex) {
            //@TRACE 209=connect failed: unexpected exception
            log.fine(CLASS_NAME, methodName, "209", null, ex);
            mqttEx = ExceptionHelper.createMqttException(ex);
        }
        if (mqttEx != null) {
            shutdownConnection(conToken, mqttEx);
        }
    }

4. In the receiver's run method, when an MqttAck from EMQ is received, notifyReceivedAck() is called

    public void run() {
        recThread = Thread.currentThread();
        recThread.setName(threadName);
        final String methodName = "run";
        MqttToken token = null;
        synchronized (lifecycle) {
            current_state = State.RUNNING;
        }
        try {
            State my_target;
            synchronized (lifecycle) {
                my_target = target_state;
            }
            while (my_target == State.RUNNING && (in != null)) {
                try {
                    // instanceof checks if message is null
                    if (message instanceof MqttAck) {
                        token = tokenStore.getToken(message);
                        if (token != null) {
                            synchronized (token) {
                                // Ensure the notify processing is done under a lock on the token
                                // This ensures that the send processing can complete before the
                                // receive processing starts! ( request and ack and ack processing
                                // can occur before request processing is complete if not!
                                // step into this method
                                clientState.notifyReceivedAck((MqttAck) message);
                            }
                        }
                        //...part of the code omitted
                    }

5. notifyReceivedAck() checks the message type; this is where the MqttConnack reply is checked.

    protected void notifyReceivedAck(MqttAck ack) throws MqttException {
        //...part of the code omitted
        else if (ack instanceof MqttConnack) {
            int rc = ((MqttConnack) ack).getReturnCode();
            if (rc == 0) {
                synchronized (queueLock) {
                    if (cleanSession) {
                        clearState();
                        // Add the connect token back in so that users can be
                        // notified when connect completes.
                        tokenStore.saveToken(token, ack);
                    }
                    inFlightPubRels = 0;
                    actualInFlight = 0;
                    restoreInflightMessages();
                    connected();
                }
            } else {
                mex = ExceptionHelper.createMqttException(rc);
                throw mex;
            }
            clientComms.connectComplete((MqttConnack) ack, mex);
            // step into this method
            notifyResult(ack, token, mex);
            tokenStore.removeToken(ack);
            // Notify the sender thread that there maybe work for it to do now
            synchronized (queueLock) {
                queueLock.notifyAll();
            }
        } else {
            notifyResult(ack, token, mex);
            releaseMessageId(ack.getMessageId());
            tokenStore.removeToken(ack);
        }
        checkQuiesceLock();
    }

6. In notifyResult, the thread waiting on responseLock is woken up

    protected void notifyResult(MqttWireMessage ack, MqttToken token, MqttException ex) {
        final String methodName = "notifyResult";
        // unblock any threads waiting on the token
        token.internalTok.markComplete(ack, ex);
        // step into this method
        token.internalTok.notifyComplete();
        //...part of the code omitted
    }

    protected void notifyComplete() {
        //...part of the code omitted
        synchronized (responseLock) {
            // If pending complete is set then normally the token can be marked
            // as complete and users notified. An abnormal error may have
            // caused the client to shutdown between pending complete being set
            // and notifying the user. In this case - the action must be failed.
            if (exception == null && pendingComplete) {
                completed = true;
                pendingComplete = false;
            } else {
                pendingComplete = false;
            }
            // wake up the waiting thread
            responseLock.notifyAll();
        }
        synchronized (sentLock) {
            sent = true;
            sentLock.notifyAll();
        }
    }
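The whole chain in 2.2 boils down to a handoff: the calling thread parks on a token, and the receiver thread looks the token up and completes it when the ack arrives. The sketch below shows that shape using `CompletableFuture` in place of the `responseLock` wait/notifyAll pair, purely to keep it short. Everything in it (the class name, the `"Con"` key, the string payload) is a hypothetical stand-in rather than the library's types.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical model of the token handoff between a caller and a receiver thread. */
final class TokenHandoffSketch {
    private final Map<String, CompletableFuture<String>> tokenStore = new ConcurrentHashMap<>();

    /** Caller side: register a token for the request that is about to be sent. */
    CompletableFuture<String> send(String key) {
        CompletableFuture<String> token = new CompletableFuture<>();
        tokenStore.put(key, token);
        return token;
    }

    /** Receiver side: find the token for an incoming ack and wake whoever waits on it. */
    void onAck(String key, String ack) {
        CompletableFuture<String> token = tokenStore.remove(key);
        if (token != null) {
            token.complete(ack);
        }
    }

    /** Runs one handoff; returns the ack, or null if it did not arrive within timeoutMillis. */
    static String demo(long ackDelayMillis, long timeoutMillis) {
        TokenHandoffSketch comms = new TokenHandoffSketch();
        CompletableFuture<String> token = comms.send("Con");
        Thread receiver = new Thread(() -> {
            try {
                Thread.sleep(ackDelayMillis);      // simulated broker latency
                comms.onAck("Con", "CONNACK");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "receiver");
        receiver.setDaemon(true);
        receiver.start();
        try {
            return token.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

If the simulated broker answers quickly, `demo` returns the ack; if the delay exceeds the timeout, it returns null instead of hanging. With an unbounded wait, a CONNACK that never arrives means the caller is never woken.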

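Solution:

1. Set timeToWait on the mqttClient so that responseLock does not wait forever; once the wait time elapses, an exception is thrown

2. Enable automatic reconnection after a disconnect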
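In the client library these two fixes correspond to the `timeToWait` field and the `isAutomaticReconnect()` option seen in the code above (set, as far as I know, through `MqttClient.setTimeToWait` and `MqttConnectOptions.setAutomaticReconnect`). The sketch below does not use the library at all. It is a stand-alone illustration of the same two ideas, a bounded connect attempt plus retry, with a hypothetical `Connector` interface standing in for the blocking connect call and an exponential backoff that is my own addition so that reconnecting clients do not hammer an already overloaded broker.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch: bounded connect attempts with retry and exponential backoff. */
final class ReconnectSketch {
    private ReconnectSketch() {}

    /** Stand-in for a blocking connect call; not a real library interface. */
    interface Connector {
        void connect() throws Exception;
    }

    /** Returns the attempt number that succeeded, or -1 if every attempt failed or timed out. */
    static int connectWithRetry(Connector connector, long timeoutMillis,
                                int maxAttempts, long initialBackoffMillis) {
        ExecutorService pool = Executors.newCachedThreadPool(r -> {
            Thread t = new Thread(r, "connect-attempt");
            t.setDaemon(true);                     // a stuck attempt must not keep the JVM alive
            return t;
        });
        long backoff = initialBackoffMillis;
        try {
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                Future<?> pending = pool.submit(() -> {
                    connector.connect();
                    return null;
                });
                try {
                    pending.get(timeoutMillis, TimeUnit.MILLISECONDS);  // bounded wait
                    return attempt;
                } catch (TimeoutException | ExecutionException e) {
                    pending.cancel(true);          // interrupt the stuck or failed attempt
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoff);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return -1;
                    }
                    backoff = Math.min(backoff * 2, 30_000L);  // cap the delay at 30 s
                }
            }
            return -1;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

A call such as `ReconnectSketch.connectWithRetry(() -> client.connect(options), 10_000L, 5, 1_000L)` would give each attempt ten seconds before moving on, where `client` and `options` are whatever objects your code already has. The exact numbers are placeholders to tune for your own deployment.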
