记一次MQTT错误使用的姿势

抛开场景谈方案都是耍流氓

项目中使用了emqx来对硬件进行管理和状态数据的更新,但由于错误的使用姿势,导致在某个项目中,硬件设备超过了500个后,后台服务器接收处理mqtt的消息达到了惊人的2分钟延迟。

而项目甲方的服务器问题,导致无法修改emqx的配置来进行调优,流程特别繁琐,而且在调整过程中需要不断尝试。

所以就尝试在后台服务代码层面解决问题。

分析

当前场景为:
有A、B、C、D 等n个设备,后台服务器订阅了所有设备的所有topic,每台设备在数秒内会上报一条状态消息,由后台服务器处理、存档,并且将设备最新的信息,再发回设备端,另外需要在 A 设备上展示 B、C、D等设备的状态,同时A设备操作B设备的开、关门的业务。

有一天发现,当B设备开关门后,或者A指令B开门后,需要长达2分钟才能响应,在跟踪代码之后,发现了项目中错误的使用姿势,例如mqtt服务不止用来做状态更新上报等,还在请求中加入了大量的数据,又或者过分依赖mqtt的消息来做业务处理。

改进

在查阅文档,以及做了很多测试后,发现一开始对emqx的订阅有一些误解,原以为它的客户端(org.eclipse.paho.client.mqttv3.MqttClient)是非阻塞的,可以同时处理很多请求,后来经过测试发现,同一个客户端虽然可以既当做发送,又可以做订阅,但都是同步的,特别是发送的Qos=2的时候,并且当订阅数据爆炸的时候,客户端会依次处理,所以在大量的状态上报堆积后,业务处理不过来。

于是尝试了2种方案

  • 由于在以前的多后台实例的情况下(共享订阅),将发送Client,和订阅Client分开,并且订阅Client调整为5个,双节点情况下就是10个订阅客户端
  • 订阅客户端不跳转,而是将 messageArrived 回调方法中全部改成异步(其他线程去执行)

最后测试下来,还是采用了方案2,因为方案1会导致大量的数据锁等待超时或者死锁,在业务代码没有做优化切割前,是不适合这么做的,而在方案2中,代码比较简单点,增加了线程池,并且在数据落库的时候加个同步锁接口,目前稳定运行中。

经验分享

  • 将订阅客户端和发送客户端分开
  • 不重要的数据,发送的Qos设置成0即可,数据丢了不可惜
  • 不要在mqtt订阅中处理大量的业务,可以用新线程去处理
  • 增加订阅实例,使用共享订阅

站内相关文章:

Comment ()
如果您有不同的看法,或者疑问,欢迎指教