公司项目使用了StreamListener进行监听redis stream流消息数据;但每隔十几二十天的就会失效监听不到数据; 初步判断:应该是网络或者连接数等问题导致程序与redis服务断开连接,但问题还是无法定位。 以下是代码,有大佬知道或者遇到过这类问题嘛,还请指教如何解决! @Bean public List subscription(RedisConnectionFactory factory){ List resultList = new ArrayList(); var options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)) .build(); for (String redisStreamName : redisStreamNames) { initStream(redisStreamName,groups[0]); var listenerContainer = StreamMessageListenerContainer.create(factory,options); Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(groups[0], this.getClass().getName()), StreamOffset.create(redisStreamName, ReadOffset.lastConsumed()), streamListener); resultList.add(subscription); listenerContainer.start(); } return resultList; } public class ListenerMessage implements StreamListener> { RedisCache redisCache; public ListenerMessage(RedisCache redisCache){ this.redisCache = redisCache; } @Override public void onMessage(MapRecord entries) { try{ Map map = entries.getValue(); String private_chat = map.get("private_chat"); MessageSave messageSave = JSON.toJavaObject(JSON.parseObject(private_chat),MessageSave.class); log.info("当前正在处理:{}",messageSave.getMsgtime()); QyTagService qyTagService = SpringUtils.getBean(QyTagService.class); qyTagService.auditPrivateMessage(messageSave); //check用于验证key和对应消息是否一直 log.info("stream name :{}, body:{}, check:{}",entries.getStream(), map,(entries.getStream().equals(map.get("name")))); redisCache.ack(entries.getStream(),"group2",entries.getId().getValue()); redisCache.delField(entries.getStream(),entries.getId().getValue()); }catch (Exception e){ log.error("error message:{}",e.getMessage()); } } } redis配置: redis: expire: 60000 # 过期时间 database: 0 # Redis使用的库 host: port: 6379 #端口号 timeout: 100000 # 连接超时时间(毫秒) cache: type: redis #使用redis做缓存