MQTT消费者接收的数据会出现丢失吗?-灵析社区

silennn

MQTT这边有个消费端,订阅了对应主题,messageArrived接收订阅的消息。handle方法就是不走数据库直接插入到redis,后期再消费。现在五千个设备,如果每隔1小时同时向MQTT这边上报数据,消费端这边设置了10个线程,这种写法会丢失数据吗? ExecutorService executorService = Executors.newFixedThreadPool(10); public synchronized void messageArrived (final String topic, MqttMessage message ) throws Exception { final String msg = new String(message.getPayload()); // System.err.println("【MQTT-消费端】接收消息主题 : " + topic); // System.err.println("【MQTT-消费端】接收消息内容 : " + msg); executorService.execute(new Runnable() { public void run() { handle(topic,msg); } }); }

阅读量:14

点赞量:0

问AI
结果:本地模拟测试了,不会丢失 生产者 :这边作为客户端也连接进mqtt,for循环5000个发送消息至对应主题 for (int x=0;x mqttPushClient.publish("ruby", String.valueOf(finalX))).start(); } 消费者:加了Thread.sleep(1000);模拟插入到redis ExecutorService executorService = Executors.newFixedThreadPool(10); private static AtomicInteger num = new AtomicInteger(0); @Override public void messageArrived(String topic, MqttMessage message) throws Exception { log.info("接收消息主题 : " + topic); log.info("接收消息Qos : " + message.getQos()); String s = new String(message.getPayload()); log.info("接收消息内容 : " + s); executorService.execute(() -> { // 处理接收到的消息 handler(s); }); } private void handler(String s) { try { Thread.sleep(1000); num.incrementAndGet(); System.err.println(num.get()); } catch (InterruptedException e) { throw new RuntimeException(e); } } 最后一个线程走完正好5000,生产者很快就发完了,消费者睡一秒导致整体这消费速度有点慢 "image.png" (https://wmlx-new-image.oss-cn-shanghai.aliyuncs.com/images/20241024/9c9a0c58521980629aa32494a6f1a269.png)