**结果:本地模拟测试了,不会丢失** 生产者 :这边作为客户端也连接进mqtt,for循环5000个发送消息至对应主题 for (int x=0;x mqttPushClient.publish("ruby", String.valueOf(finalX))).start(); } 消费者:加了Thread.sleep(1000);模拟插入到redis ExecutorService executorService = Executors.newFixedThreadPool(10); private static AtomicInteger num = new AtomicInteger(0); @Override public void messageArrived(String topic, MqttMessage message) throws Exception { log.info("接收消息主题 : " + topic); log.info("接收消息Qos : " + message.getQos()); String s = new String(message.getPayload()); log.info("接收消息内容 : " + s); executorService.execute(() -> { // 处理接收到的消息 handler(s); }); } private void handler(String s) { try { Thread.sleep(1000); num.incrementAndGet(); System.err.println(num.get()); } catch (InterruptedException e) { throw new RuntimeException(e); } } 最后一个线程走完正好5000,生产者很快就发完了,消费者睡一秒导致整体这消费速度有点慢 