生产者有什么办法感知到队列中,积压消息得数量吗? 需要自己扩展逻辑吗
微服务与幂等性随着应用架构由单体架构到微服务架构进行演变,现如今市面上超过50%的应用都会基于分布式或微服务完成系统架构设计。在微服务架构体系内,就会存在若干个微服务,这些服务可能基于RPC或者HTTPS等协议进行通讯。那么既然服务之间存在相互调用,那么必然存在服务调用延迟或者失败的情况,当出现这种问题,服务端会进行重试等操作或客户端有可能会进行多次点击提交。如果这样请求多次的话,那最终处理的数据结果就一定要保证统一,如支付场景。此时就需要通过保证业务幂等性方案来完成。幂等性简介幂等本身是一个数学概念。即 f(n) = 1^n ,无论n为多少,f(n)的值永远为1。在编程开发中,对于幂等的定义为:无论对某一个资源操作了多少次,其影响都应是相同的。 换句话说就是:在接口重复调用的情况下,对系统产生的影响是一样的,但是返回值允许不同,如查询。幂等性包括数据幂等、接口幂等、服务幂等、消息幂等。以SQL为例:select * from table where id=1。此SQL无论执行多少次,虽然结果有可能出现不同,都不会对数据产生 改变,具备幂等性。insert into table(id,name) values(1,'小莫') 。此SQL如果id或name有唯一性约束,多次操作只允许插 入一条记录,则具备幂等性。如果不是,则不具备幂等性,多次操作会产生多条数据。update table set score=100 where id = 1 。此SQL无论执行多少次,对数据产生的影响都是相同的。具备幂等性。update table set score=50+score where id = 1 。此SQL涉及到了计算,每次操作对数据都会产生影响。 不具备幂等性。delete from table where id = 1 。此SQL多次操作,产生的结果相同,具备幂等性。幂等性设计主要从两个维度进行考虑:空间、时间。空间:定义了幂等的范围,如生成订单的话,不允许出现重复下单。时间:定义幂等的有效期。有些业务需要永久性保证幂等,如下单、支付等。而部分业务只要保证一段时间幂等即可。同时对于幂等的使用一般都会伴随着出现锁的概念,用于解决并发安全问题。业务与幂等性在业务开发与分布式系统设计中,幂等性是一个非常重要的概念,有非常多的场景需要考虑幂等性的问题,尤其对于现在的分布式系统,经常性的考虑重试、重发等操作,一旦产生这些操作,则必须要考虑幂等性问题。以交易系统、支付系统等尤其明显,如:当用户购物进行下单操作,用户操作多次,但订单系统对于本次操作只能产生一个订单。当用户对订单进行付款,支付系统不管出现什么问题,应该只对用户扣一次款。当支付成功对库存扣减时,库存系统对订单中商品的库存数量也只能扣减一次。当对商品进行发货时,也需保证物流系统有且只能发一次货。在电商系统中还有非常多的场景需要保证幂等性。但是一旦考虑幂等后,服务逻辑务必会变的更加复杂。因此是否要考虑幂等,需要根据具体业务场景具体分析。而且在实现幂等时,还会把并行执行的功能改为串行化,降低了执行效率。此处以下单减库存为例,当用户生成订单成功后,会对订单中商品进行扣减库存。 订单服务会调用库存服务 进行库存扣减。库存服务会完成具体扣减实现。现在对于功能调用的设计,有可能出现调用超时,因为出现如网络抖动,虽然库存服务执行成功了,但结果并没有在超时时间内返回,则订单服务也会进行重试。那就会出现问题,stock对于之前的执行已经成功了, 只是结果没有按时返回。而订单服务又重新发起请求对商品进行库存扣减。 此时出现库存扣减两次的问题。 对于这种问题,就需要通过幂等性进行结果。接口幂等对于幂等的考虑,主要解决两点前后端交互与服务间交互。这两点有时都要考虑幂等性的实现。从前端的思路解决 的话,主要有三种:前端防重、PRG模式、Token机制。前端防重通过前端防重保证幂等是最简单的实现方式,前端相关属性和JS代码即可完成设置。可靠性并不好,有经验的人员 可以通过工具跳过页面仍能重复提交。主要适用于表单重复提交或按钮重复点击。PRG模式PRG模式即POST-REDIRECT-GET。当用户进行表单提交时,会重定向到另外一个提交成功页面,而不是停留在原先的表单页面。这样就避免了用户刷新导致重复提交。同时防止了通过浏览器按钮前进/后退导致表单重复提交。 是一种比较常见的前端防重策略。Token机制方案介绍通过token机制来保证幂等是一种非常常见的解决方案,同时也适合绝大部分场景。该方案需要前后端进行一定程 度的交互来完成。1)服务端提供获取token接口,供客户端进行使用。服务端生成token后,如果当前为分布式架构,将token存放 于redis中,如果是单体架构,可以保存在jvm缓存中。2)当客户端获取到token后,会携带着token发起请求。3)服务端接收到客户端请求后,首先会判断该token在redis中是否存在。如果存在,则完成进行业务处理,业务 处理完成后,再删除token。如果不存在,代表当前请求是重复请求,直接向客户端返回对应标识。但是现在有一个问题,当前是先执行业务再删除token。在高并发下,很有可能出现第一次访问时token存在,完 成具体业务操作。但在还没有删除token时,客户端又携带token发起请求,此时,因为token还存在,第二次请求 也会验证通过,执行具体业务操作。对于这个问题的解决方案的思想就是并行变串行。会造成一定性能损耗与吞吐量降低。第一种方案:对于业务代码执行和删除token整体加线程锁。当后续线程再来访问时,则阻塞排队。第二种方案:借助redis单线程和incr是原子性的特点。当第一次获取token时,以token作为key,对其进行自增。 然后将token进行返回,当客户端携带token访问执行业务代码时,对于判断token是否存在不用删除,而是对其继续incr。如果incr后的返回值为2。则是一个合法请求允许执行,如果是其他值,则代表是非法请求,直接返回。那如果先删除token再执行业务呢?其实也会存在问题,假设具体业务代码执行超时或失败,没有向客户端返回明确结果,那客户端就很有可能会进行重试,但此时之前的token已经被删除了,则会被认为是重复请求,不再进行业务处理。这种方案无需进行额外处理,一个token只能代表一次请求。一旦业务执行出现异常,则让客户端重新获取令牌, 重新发起一次访问即可。推荐使用先删除token方案但是无论先删token还是后删token,都会有一个相同的问题。每次业务请求都回产生一个额外的请求去获取 token。但是,业务失败或超时,在生产环境下,一万个里最多也就十个左右会失败,那为了这十来个请求,让其他九千九百多个请求都产生额外请求,就有一些得不偿失了。虽然redis性能好,但是这也是一种资源的浪费。实现基于自定义业务流程实现这种实现方式省略,与传统实现无异。基于自定义注解实现直接把token实现嵌入到方法中会造成大量重复代码的出现。因此可以通过自定义注解将上述代码进行改造。在需 要保证幂等的方法上,添加自定义注解即可。1. 在token_common中新建自定义注解Idemptentpublic class IdemptentInterceptor implements HandlerInterceptor { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { if (!(handler instanceof HandlerMethod)) { return true; } HandlerMethod handlerMethod = (HandlerMethod) handler; Method method = handlerMethod.getMethod(); Idemptent annotation = method.getAnnotation(Idemptent.class); if (annotation != null){ //进行幂等性校验 checkToken(request); } return true; } @Autowired private RedisTemplate redisTemplate; //幂等性校验 private void checkToken(HttpServletRequest request) { String token = request.getHeader("token"); if (StringUtils.isEmpty(token)){ throw new RuntimeException("非法参数"); } boolean delResult = redisTemplate.delete(token); if (!delResult){ //删除失败 throw new RuntimeException("重复请求"); } } @Override public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception { } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { } }2. 修改token_service_order启动类,让其继承WebMvcConfigurerAdapter@Bean public IdemptentInterceptor idemptentInterceptor() { return new IdemptentInterceptor(); } @Override public void addInterceptors(InterceptorRegistry registry) { //幂等拦截器 registry.addInterceptor(idemptentInterceptor()); super.addInterceptors(registry); }3. 更新token_service_order与token_service_order_api,新增添加订单方法,并且方法添加自定义幂等注解@Idemptent @PostMapping("/genOrder2") public String genOrder2(@RequestBody Order order){ order.setId(String.valueOf(idWorker.nextId())); order.setCreateTime(new Date()); order.setUpdateTime(new Date()); int result = orderService.addOrder(order); if (result == 1){ System.out.println("success"); return "success"; }else { System.out.println("fail"); return "fail"; } }
什么是消息通知系统消息通知系统,顾名思义即通知消息的传达处理系统。目的是为了让用户获得需要得到的消息及提醒并进行处理。消息通知微服务的定位是“平台内”的“消息”功能,分为全员消息,订阅类消息,点对点消息。例如系统通知,私信,@类消息全员消息系统通知,活动通知,管理员公告等全部用户都会收到的消息订阅类消息关注某一类数据的用户,该类数据有更新时向用户发送的消息。例如关注某位大v的微博,公众号,订阅某位知名作家的专栏点对点消息某位用户对另外一位用户进行操作后,系统向被操作的用户发送的消息。例如点赞,发红包。系统特性通讯方式这里先不考虑后端整体实现,前端与后端之间通讯方式如何选型,如何实现实时/准实时数据交互:需要介绍下三种通讯方式:短连接客户端和服务器每进行一次通讯,就建立一次连接,通讯结束就中断连接。HTTP是一个简单的请求-响应协议,它通常运行在TCP之上。HTTP/1.0使用的TCP默认是短连接。长连接是指在建立连接后可以连续多次发送数据,直到双方断开连接。HTTP从1.1版本起,底层的TCP使用的长连接。使用长连接的HTTP协议,会在响应头加入代码:Connection:keep-alive短连接和长连接的区别通讯流程短连接:创建连接 -> 传输数据 -> 关闭连接 长连接:创建连接 -> 传输数据 -> 保持连接 -> 传输数据 -> …… -> 关闭连接适用场景短连接:并发量大,数据交互不频繁情况长连接:数据交互频繁,点对点的通讯websocket协议什么是websocket协议WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。何谓全双工:全双工(FullDuplex)是通讯传输的一个术语。双方在通信时允许数据在两个方向上同时传输,它在能力上相当于两个单工通信方式的结合。全双工指可以同时进行信号的双向传输。指A→B的同时B→A,就像是双向车道。单工就就像是汽车的单行道,是在只允许甲方向乙方传送信息,而乙方不能向甲方传送 。服务器向客户端发送数据的功能是websocket协议的典型使用场景三种通信方式的优缺点优缺点如下:在 WebSocket中,浏览器和服务器只需要完成一次握手,就可以创建持久性的连接,并进行双向数据传输。在推送功能的实现技术上,相比使用Ajax 定时轮询的方式(setInterval),WebSocket 更节省服务器资源和带宽。出于服务器性能和实时性考虑,前后端通讯方式采用WebSocket协议实现。
为什么使用Netty我们已经有了NIO能够提高程序效率了,为什么还要使用Netty?简单的说:Netty封装了JDK的NIO,让你用得更爽,你不用再写一大堆复杂的代码了。官方术语:Netty是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能服务器和客户端。下面是使用Netty不使用JDK原生NIO的一些原因:使用JDK自带的NIO需要了解太多的概念,编程复杂Netty底层IO模型随意切换,而这一切只需要做微小的改动,就可以直接从NIO模型变身为IO模型Netty自带的拆包解包,异常检测等机制,可以从NIO的繁重细节中脱离出来,只需要关心业务逻辑Netty解决了JDK的很多包括空轮询在内的bugNetty底层对线程,selector做了很多细小的优化,精心设计的线程模型做到非常高效的并发处理自带各种协议栈让你处理任何一种通用协议都几乎不用亲自动手Netty社区活跃,遇到问题随时邮件列表或者issueNetty已经历各大rpc框架,消息中间件,分布式通信中间件线上的广泛验证,健壮性无比强大和IO编程一样的案例:添加Netty依赖<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.5.Final</version> </dependency>服务端:public class NettyServer { public static void main(String[] args) { ServerBootstrap serverBootstrap = new ServerBootstrap(); NioEventLoopGroup boos = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); serverBootstrap .group(boos, worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.out.println(msg); } }); } }) .bind(8000); } }客户端:public class NettyClient { public static void main(String[] args) throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); NioEventLoopGroup group = new NioEventLoopGroup(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new StringEncoder()); } }); Channel channel = bootstrap.connect("127.0.0.1", 8000).channel(); while (true) { channel.writeAndFlush("测试数据"); Thread.sleep(2000); } } }Netty的事件驱动例如很多系统都会提供 onClick() 事件,这个事件就代表鼠标按下事件。事件驱动模型的大体思路如下:有一个事件队列;鼠标按下时,往事件队列中增加一个点击事件;有个事件泵,不断循环从队列取出事件,根据不同的事件,调用不同的函数;事件一般都各自保存各自的处理方法的引用。这样,每个事件都能找到对应的处理方法;为什么使用事件驱动?程序中的任务可以并行执行任务之间高度独立,彼此之间不需要互相等待在等待的事件到来之前,任务不会阻塞Netty使用事件驱动的方式作为底层架构,包括:事件队列(event queue):接收事件的入口。分发器(event mediator):将不同的事件分发到不同的业务逻辑单元。事件通道(event channel):分发器与处理器之间的联系渠道。事件处理器(event processor):实现业务逻辑,处理完成后会发出事件,触发下一步操作。核心组件Netty 的功能特性图:Netty 功能特性:传输服务,支持 BIO 和 NIO。容器集成:支持 OSGI、JBossMC、Spring、Guice 容器。协议支持:HTTP、Protobuf、二进制、文本、WebSocket 等,支持自定义协议。BIO和NIO的区别:Netty框架包含如下的组件:ServerBootstrap :用于接受客户端的连接以及为已接受的连接创建子通道,一般用于服务端。Bootstrap:不接受新的连接,并且是在父通道类完成一些操作,一般用于客户端的。Channel:对网络套接字的I/O操作,例如读、写、连接、绑定等操作进行适配和封装的组件。EventLoop:处理所有注册其上的channel的I/O操作。通常情况一个EventLoop可为多个channel提供服务。EventLoopGroup:包含有多个EventLoop的实例,用来管理 event Loop的组件,可以理解为一个线程池,内部维护了一组线程。ChannelHandler和ChannelPipeline:例如一个流水线车间,当组件从流水线头部进入,穿越流水线,流水线上的工人按顺序对组件进行加工,到达流水线尾部时商品组装完成。流水线相当于ChannelPipeline,流水线工人相当于ChannelHandler,源头的组件当做event。ChannelInitializer:用于对刚创建的channel进行初始化,将ChannelHandler添加到channel的ChannelPipeline处理链路中。ChannelFuture:与jdk中线程的Future接口类似,即实现并行处理的效果。可以在操作执行成功或失败时自动触发监听器中的事件处理方法。上面的Netty框架包含如下的组件大概看的有点蒙,我们对之前编写的代码加上注释:服务端:public class NettyServer { public static void main(String[] args) { // 用于接受客户端的连接以及为已接受的连接创建子通道,一般用于服务端。 ServerBootstrap serverBootstrap = new ServerBootstrap(); // EventLoopGroup包含有多个EventLoop的实例,用来管理event Loop的组件 // 接受新连接线程 NioEventLoopGroup boos = new NioEventLoopGroup(); // 读取数据的线程 NioEventLoopGroup worker = new NioEventLoopGroup(); //服务端执行 serverBootstrap .group(boos, worker) // Channel对网络套接字的I/O操作, // 例如读、写、连接、绑定等操作进行适配和封装的组件。 .channel(NioServerSocketChannel.class) // ChannelInitializer用于对刚创建的channel进行初始化 // 将ChannelHandler添加到channel的ChannelPipeline处理链路中。 .childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { // 组件从流水线头部进入,流水线上的工人按顺序对组件进行加工 // 流水线相当于ChannelPipeline // 流水线工人相当于ChannelHandler ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { //这个工人有点麻烦,需要我们告诉他干啥事 @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.out.println(msg); } }); } }) .bind(8000); } } 客户端:public class NettyClient { public static void main(String[] args) throws InterruptedException { // 不接受新的连接,并且是在父通道类完成一些操作,一般用于客户端的。 Bootstrap bootstrap = new Bootstrap(); // EventLoopGroup包含有多个EventLoop的实例,用来管理event Loop的组件 NioEventLoopGroup group = new NioEventLoopGroup(); //客户端执行 bootstrap.group(group) // Channel对网络套接字的I/O操作, // 例如读、写、连接、绑定等操作进行适配和封装的组件。 .channel(NioSocketChannel.class) // 用于对刚创建的channel进行初始化, // 将ChannelHandler添加到channel的ChannelPipeline处理链路中。 .handler(new ChannelInitializer<Channel>() {
Kafka消息可靠性第一种情况消费端消息丢失 场景描述: 位移提交:对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中的位置。对于消费者而言,它也有一个offset的概念,消费者使用offset来表示消费到分区中某个消息所在的位置。单词"offset"可以编译为"偏移量",也可以翻译为"位移",在很多的中文资料中都会交叉使用"偏移量"和"位移"这两个词,对于消息在分区中的位置,我们将offset称之为"偏移量";对于消费者消费到的位置,将offset称为"位移",有时候也会更明确地称之为"消费位移"("偏移量"是在讲分区存储层面的内容,"位移"是在讲消费层面的内容)当然对于一条消息而言,它的偏移量和消费者消费它时的消费位移是相等的。当每一次调用poll()方法时,它返回的是还没有消费过的消息集(当然这个前提是消息以及存储在Kafka中了,并且暂不考虑异常情况的发生),要做到这一点,就需要记录上一次消费时的消费位移。并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。消费位移存储在Kafka内部的主题__consumer_offsets中。这里把将消费位移存储起来的动作称之为"提交",消费者在消费完消息之后需要执行消费位移的提交。默认情况下Kafka的消费位移提交是自动提交,这个由消费者客户端参数enable.auto.commit配置,默认值为true。当然这个默认的自动提交并不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数auto.commit.interval.ms配置,默认值为5秒。自动位移提交的动作是在poll()方法的逻辑里面完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。自动位移提交带来为问题: 1、重复消费 2、消息丢失问题描述重复消费原理如下图所示: 消息丢失如下图所示:解决方案解决方案:将自动位移提交更改为手动位移提交注意: 1、在单独使用Kafka的java客户端将位移提交的模式更改为手动位移提交,那么我们就需要显示的调用consumer的方法完成位移提交。// 通过Kafka的方式消费消息,如果将位移提交更改为手动位移提交。那么我们就需要主动调用consumer的方法完成位移提交 private static void consumerFromKafka() { // 创建属性集合,设置初始化参数 Properties properties = new Properties(); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("bootstrap.servers", "192.168.23.131:9092"); properties.put("group.id", "dd.demo"); properties.put("enable.auto.commit" , "false") ; // 创建KafkaConsumer对象 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Collections.singletonList("dd")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.println(record.value()); } // consumer.commitSync(); // 进行同步位移提交,会阻塞当前线程 consumer.commitAsync(new OffsetCommitCallback(){ @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception == null) { System.out.println(offsets); } else { LOGGER.error("fail to commit offsets {}", offsets, exception); } } }); } } 2、 在使用的是spring boot和Kafka进行整合,当我们将spring.kafka.consumer.enable-auto-commit的值设置为false以后,只有在特定的提交模式下我们可以手动进行提交。在一些提交模式下由spring根据约定的条件控制提交。常见的提交模式是在ContainerProperties.AckMode这个枚举类中定义。AckMode针对ENABLE_AUTO_COMMIT_CONFIG=false时生效,有以下几种:RECORD : 每处理一条commit一次BATCH : 每次poll的时候批量提交一次,频率取决于每次poll的调用频率TIME : 每次间隔ackTime的时间去commitCOUNT : 累积达到ackCount次的ack去commitCOUNT_TIME: ackTime或ackCount哪个条件先满足,就commitMANUAL : listener负责ack,但是背后也是批量上去MANUAL_IMMEDIATE : listner负责ack,每调用一次,就立即commitspring配置如下:# kafka配置 spring: kafka: consumer: bootstrap-servers: 192.168.23.131:9092 enable-auto-commit: false # 设置手动位移提交 listener: ack-mode: manual_immediate 代码实现:@KafkaListener(topics = "dd" , groupId = "dd.demo") public void consumerHandler(String msg , KafkaConsumer consumer) { LOGGER.info("consumer topic is : {} , msg is ----> {} " , "itheima" , msg); // consumer.commitSync(); // 同步位移提交 consumer.commitAsync((offsets , e) -> { if(e == null){ LOGGER.info("位移提交成功...."); }else { Set<Map.Entry<TopicPartition, OffsetAndMetadata>> entries =offsets.entrySet(); for(Map.Entry<TopicPartition , OffsetAndMetadata> entry :entries) { TopicPartition topicPartition = entry.getKey(); OffsetAndMetadata offsetAndMetadata = entry.getValue(); LOGGER.info("位移提交失败....topicPartition is {} ,offsets is {} " , topicPartition.partition() ,offsetAndMetadata.offset()); } } }); } 第二种情况生产者将消息发送到Broker中以后,消息还没有被及时消费,此时Broker宕机了,这样就会导致消息丢失。问题描述Kafka消息的发送流程: 在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程会根据指定的条件,不断从RecordAccumulator中拉取消息发送到Kafka broker。如下图所示: Sender线程拉取消息的条件:缓冲区大小达到一定的阈值(默认是16384byte),可以通过spring.kafka.producer.batch-size进行设定缓冲区等待的时间达到设置的阈值(默认是0), 可以通过linger.ms属性进行设定发送消息的三种方式:1、发后即忘 2、同步消息发送 3、异步消息发送发后即忘 只管往kafka发送消息(消息只发送到缓冲区)而并不关心消息是否正确到达。正常情况没什么问题,不过有些时候(比如不可重试异常)会造成消息的丢失。 这种发送方式性能最高,可靠性最差。如下所示:// 演示消息发送: 发送即忘 public static void sendMessageMethod01() { for(int x = 0 ; x < 5 ; x++) { String msg = "Kakfa环境测试...." + x ; kafkaTemplate.send("itheima" , msg) ; // 发后即 忘 LOGGER.info("send msg success ---> {} " , msg); } } 同步消息发送 其实kafkaTemplate.send方法并不是返回void,而是ListenableFuture<SendResult<K, V>>,该类继承了jdk concurrent包的Future。如下:@Override public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) { ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data); return doSend(producerRecord); } 12345 要实现同步发送的,可以利用返回的ListenableFuture实现,如下:// 演示消息发送: 同步发送 public static void sendMessageMethod02() throws ExecutionException, InterruptedException { for(int x = 0 ; x < 5 ; x++) { String msg = "Kakfa环境测试...." + x ; kafkaTemplate.send("itheima" , msg).get() ; // 同步消息发 送 LOGGER.info("send msg success ---> {} " , msg); } } 实际上send()方法本身就是异步的,send()方法返回的Future对象可以使调用方稍后获得发送的结果。在执行send()方法之后直接链式调用了get()方法可以阻塞等待Kafka的响应,直到消息发送成功,或者发生异常。如果发生异常,那么就需要捕获异常并交由外层逻辑处理。这种方式性能最差,可靠性较好。异步发送 在send方法里指定一个Callback的回调函数,Kafka在返回响应时调用该函数来实现异步的发送确认。异步发送方式的示例如下:// 演示消息发送: 异步发送 public static void sendMessageMethod03() throws ExecutionException, InterruptedException { for(int x = 0 ; x < 5 ; x++) { // Kafka消息的异步发送 String msg = "Kakfa环境测试...." + x ; kafkaTemplate.send("itheima" , msg).addCallback((obj) -> { LOGGER.info("send msg to kafka broker success ---> {} " , ((SendResult)obj).getProducerRecord().value()); } , (t) -> { t.printStackTrace(); }); LOGGER.info("send msg to local cache success ---> {} " , msg); } } 这种方式的特点:性能较好,可靠性也有保障解决方案针对上述情况所产生的消息丢失,可以的解决方案有如下几种:给topic设置replication.factor参数大于1,要求每个partition必须最少有两个副本搭建Kafka集群,让各个分区副本均衡的分配到不同的broker上在producer端设置acks=all,要求每条数据写入replicas后,才认为写入成功:ack=0, 生产者在成功写入消息之前不会等待任何来自服务器的响应。如果出现问题生产者是感知不到的,消息就丢失了。不过因为生产者不需要等待服务器响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。ack=1,默认值为1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法达到首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息(Kafka生产者内部机制)。但是,这样还有可能会导致数据丢失,如果收到写成功通知,此时首领节点还没来 的及同步数据到follower节点,首领节点崩溃,就会导致数据丢失。ack=-1/all, 只有当所有参与复制的节点都收到消息时,生产者会收到一个来自服务器的成功响应,这种模式是最安全的,它可以保证不止一个服务器收到消息。当生产者发送消息完毕以后,没有收到Broker返回的ack,此时就会触发重试机制或者抛出异常。我们可以通过retries参数设置重试次数(spring boot和Kafka整合默认的重试次数为0),发送客户端会进行重试直到broker返回ack。
修改配置修改消息通知微服务模块tensquare_notice的pom文件,添加下面的dependency依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.5.Final</version> </dependency>2. 修改application.yml文件,添加下面的配置 rabbitmq: host: 192.168.200.128这样消息通知微服务就引入了netty框架,并且具有了和Rabbitmq交互的能力实现Netty的整合整合分析现在的通讯模式如下:因为使用到了WebSocket和Netty,整合方式和以前有所不同,整合步骤:编写NettyServer,启动Netty服务。使用配置Bean创建Netty服务。编写NettyConfig。编写和WebSocket进行通讯处理类MyWebSocketHandler,进行MQ和WebSocket的消息处理。使用配置Bean创建Rabbit监听器容器,使用监听器。编写RabbitConfig。编写Rabbit监听器SysNoticeListener,用来获取MQ消息并进行处理。五个类的关系如下图:实现整合1. 编写ApplicationContextProvider.java,这个类是工具类,作用是获取Spring容器中的实例@Component public class ApplicationContextProvider implements ApplicationContextAware { /** * 上下文对象实例 */ private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } /** * 获取applicationContext * * @return */ public static ApplicationContext getApplicationContext() { return applicationContext; } /** * 通过name获取 Bean. * * @param name * @return */ public Object getBean(String name) { return getApplicationContext().getBean(name); } /** * 通过class获取Bean. * * @param clazz * @param <T> * @return */ public <T> T getBean(Class<T> clazz) { return getApplicationContext().getBean(clazz); } /** * 通过name,以及Clazz返回指定的Bean * * @param name * @param clazz * @param <T> * @return */ public <T> T getBean(String name, Class<T> clazz) { return getApplicationContext().getBean(name, clazz); } } 2. 编写NettyServerpublic class NettyServer { public void start(int port) { System.out.println("准备启动Netty。。。"); ServerBootstrap serverBootstrap = new ServerBootstrap(); //用来处理新连接的 EventLoopGroup boos = new NioEventLoopGroup(); //用来处理业务逻辑的,读写。。。 EventLoopGroup worker = new NioEventLoopGroup(); serverBootstrap.group(boos, worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { //请求消息解码器 ch.pipeline().addLast(new HttpServerCodec()); // 将多个消息转换为单一的request或者response对象 ch.pipeline().addLast(new HttpObjectAggregator(65536)); //处理WebSocket的消息事件 ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws")); //创建自己的webSocket处理器,就是用来编写业务逻辑的 MyWebSocketHandler myWebSocketHandler = new MyWebSocketHandler(); ch.pipeline().addLast(myWebSocketHandler); } }).bind(port); } } 3. 编写NettyConfig@Configuration public class NettyConfig { @Bean public NettyServer createNettyServer() { NettyServer nettyServer = new NettyServer(); //启动Netty服务,使用新的线程启动 new Thread(){ @Override public void run() { nettyServer.start(1234); } }.start(); return nettyServer; } }4. 编写MyWebSocketHandlerpublic class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { private static ObjectMapper MAPPER = new ObjectMapper(); // 送Spring容器中获取消息监听器容器,处理订阅消息sysNotice SimpleMessageListenerContainer sysNoticeContainer = (SimpleMessageListenerContainer) ApplicationContextProvider.getApplicationContext() .getBean("sysNoticeContainer"); //从Spring容器中获取RabbitTemplate RabbitTemplate rabbitTemplate = ApplicationContextProvider.getApplicationContext() .getBean(RabbitTemplate.class); //存放WebSocket连接Map,根据用户id存放 public static ConcurrentHashMap<String, Channel> userChannelMap = new ConcurrentHashMap(); //用户请求WebSocket服务端,执行的方法 @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { //约定用户第一次请求携带的数据:{"userId":"1"} //获取用户请求数据并解析 String json = msg.text(); //解析json数据,获取用户id String userId = MAPPER.readTree(json).get("userId").asText(); //第一次请求的时候,需要建立WebSocket连接 Channel channel = userChannelMap.get(userId); if (channel == null) { //获取WebSocket的连接 channel = ctx.channel(); //把连接放到容器中 userChannelMap.put(userId, channel); } //只用完成新消息的提醒即可,只需要获取消息的数量 //获取RabbitMQ的消息内容,并发送给用户 RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate); //拼接获取队列名称 String queueName = "article_subscribe_" + userId; //获取Rabbit的Properties容器 Properties queueProperties = rabbitAdmin.getQueueProperties(queueName); //获取消息数量 int noticeCount = 0; //判断Properties是否不为空 if (queueProperties != null) { // 如果不为空,获取消息的数量 noticeCount = (int) queueProperties.get("QUEUE_MESSAGE_COUNT"); } //封装返回的数据 HashMap countMap = new HashMap(); countMap.put("sysNoticeCount", noticeCount); Result result = new Result(true, StatusCode.OK, "查询成功", countMap); //把数据发送给用户 channel.writeAndFlush(new TextWebSocketFrame(MAPPER.writeValueAsString(result))); //把消息从队列里面清空,否则MQ消息监听器会再次消费一次 if (noticeCount > 0) { rabbitAdmin.purgeQueue(queueName, true); } //为用户的消息通知队列注册监听器,便于用户在线的时候, //一旦有消息,可以主动推送给用户,不需要用户请求服务器获取数据 sysNoticeContainer.addQueueNames(queueName); } }5. 编写RabbitConfig@Configuration public class RabbitConfig { @Bean("sysNoticeContainer") public SimpleMessageListenerContainer create(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); //使用Channel container.setExposeListenerChannel(true); //设置自己编写的监听器 container.setMessageListener(new SysNoticeListener()); return container; } }6. 编写SysNoticeListenerpublic class SysNoticeListener implements ChannelAwareMessageListener { private static ObjectMapper MAPPER = new ObjectMapper(); @Override public void onMessage(Message message, Channel channel) throws Exception { //获取用户id,可以通过队列名称获取 String queueName = message.getMessageProperties().getConsumerQueue(); String userId = queueName.substring(queueName.lastIndexOf("_") + 1); io.netty.channel.Channel wsChannel = MyWebSocketHandler.userChannelMap.get(userId); //判断用户是否在线 if (wsChannel != null) { //如果连接不为空,表示用户在线 //封装返回数据 HashMap countMap = new HashMap(); countMap.put("sysNoticeCount", 1); Result result = new Result(true, StatusCode.OK, "查询成功", countMap); // 把数据通过WebSocket连接主动推送用户 wsChannel.writeAndFlush(new TextWebSocketFrame(MAPPER.writeValueAsString(result))); } } } 7. 修改启动类,添加Netty服务的启动public static void main(String[] args) { SpringApplication.run(NoticeApplication.class, args); NettyServer server = ApplicationContextProvider.getApplicationContext().getBean(NettyServer.class); try { server.start(12345); } catch (Exception e) { e.printStackTrace(); } }至此,基本代码已经完成,可以搞个前端页面测试下。
kafka介绍Kafka是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统,可作为大数据量的消息中间件。kafka的具体架构如下所示:Producer 生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。 生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。Comsumer 消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。Topic 在Kafka中,使用一个类别属性来划分数据的所属类,划分数据的这个类称为topic。如果把Kafka看做为一个数据库,topic可以理解为数据库中的一张表,topic的名字即为表名。Partition topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。Partition offset 每条消息都有一个当前Partition下唯一的64字节的offset,它指明了这条消息的位置。Replicas of partition 副本是一个分区的备份。副本不会被消费者消费,副本只用于防止数据丢失,即消费者不从为follower的 partition中消费数据,而是从为leader的partition中读取数据。副本之间是一主多从的关系。Broker Kafka 集群包含一个或多个服务器,服务器节点称为broker。broker存储topic的数据。 如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。 如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。 如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。Leader 每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。Follower Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与 Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。 当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中 删除,重新创建一个Follower。Zookeeper Zookeeper负责维护和协调broker。当Kafka系统中新增了broker或者某个broker发生故障失效时,由ZooKeeper通知生产者和消费者。生产者和消费者依据Zookeeper的broker状态信息与broker协调数据的发布和订阅任务。AR(Assigned Replicas) 分区中所有的副本统称为AR。ISR(In-Sync Replicas) 所有与Leader部分保持一致的副(包括Leader副本在内)本组成ISR。OSR(Out-of-Sync-Replicas) 与Leader副本同步滞后过多的副本。
整体设计用户获取新的消息通知有两种模式上线登录后向系统主动索取在线时系统向接收者主动推送新消息设想下,用户的通知消息和新通知提醒数据都放在数据库中,数据库的读写操作频繁。如果消息量大,DB压力较大,可能出现数据瓶颈。这边按照上述两种模式,拆分下设计:上线登录后向系统索取此模式是接受者请求系统,系统将新的消息通知返回给接收者的模式,流程如下:接收者向服务端netty请求WebSocket连接Netty服务把连接放到自己的连接池中Netty根据接受者信息向RabbitMQ查询消息如果有新消息,返回新消息通知使用WebSocket连接向,接收者返回新消息的数量在线时系统向接收者主动推送此模式是系统将新的消息通知返回给接收者的模式,流程如下:RabbitMQ将新消息数据推送给NettyNetty从连接池中取出接收者的WebSocket连接Netty通过接收者的WebSocket连接返回新消息的数量Rabbitmq搭建在虚拟机中启动RabbitMQdocker run -id --name=tensquare_rabbit -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 15672:15672 -p 25672:25672 rabbitmq:management访问地址:http://192.168.200.128:15672登录账号: guest登录密码: guestIO编程上面提到了Netty,在开始了解Netty之前,先来实现一个客户端与服务端通信的程序,使用传统的IO编程和使用NIO编程有什么不一样。传统IO编程每个客户端连接过来后,服务端都会启动一个线程去处理该客户端的请求。阻塞I/O的通信模型示意图如下:业务场景:客户端每隔两秒发送字符串给服务端,服务端收到之后打印到控制台。public class IOServer { public static void main(String[] args) throws Exception { ServerSocket serverSocket = new ServerSocket(8000); while (true) { // (1) 阻塞方法获取新的连接 Socket socket = serverSocket.accept(); new Thread() { @Override public void run() { String name = Thread.currentThread().getName(); try { // (2) 每一个新的连接都创建一个线程,负责读取数据 byte[] data = new byte[1024]; InputStream inputStream = socket.getInputStream(); while (true) { int len; // (3) 按字节流方式读取数据 while ((len = inputStream.read(data)) != -1) { System.out.println("线程" + name + ":" + new String(data, 0, len)); } } } catch (Exception e) { } } }.start(); } } }客户端实现:public class MyClient { public static void main(String[] args) { //测试使用不同的线程数进行访问 for (int i = 0; i < 5; i++) { new ClientDemo().start(); } } static class ClientDemo extends Thread { @Override public void run() { try { Socket socket = new Socket("127.0.0.1", 8000); while (true) { socket.getOutputStream().write(("测试数据").getBytes()); socket.getOutputStream().flush(); Thread.sleep(2000); } } catch (Exception e) { } } } }从服务端代码中我们可以看到,在传统的IO模型中,每个连接创建成功之后都需要一个线程来维护,每个线程包含一个while死循环。如果在用户数量较少的情况下运行是没有问题的,但是对于用户数量比较多的业务来说,服务端可能需要支撑成千上万的连接,IO模型可能就不太合适了。如果有1万个连接就对应1万个线程,继而1万个while死循环,这种模型存在以下问题:当客户端越多,就会创建越多的处理线程。线程是操作系统中非常宝贵的资源,同一时刻有大量的线程处于阻塞状态是非常严重的资源浪费。并且如果务器遭遇洪峰流量冲击,例如双十一活动,线程池会瞬间被耗尽,导致服务器瘫痪。因为是阻塞式通信,线程爆炸之后操作系统频繁进行线程切换,应用性能急剧下降。IO编程中数据读写是以字节流为单位,效率不高。NIO编程NIO,也叫做new-IO或者non-blocking-IO,可理解为非阻塞IO。NIO编程模型中,新来一个连接不再创建一个新的线程,而是可以把这条连接直接绑定到某个固定的线程,然后这条连接所有的读写都由这个线程来负责,我们用一幅图来对比一下IO与NIO:如上图所示,IO模型中,一个连接都会创建一个线程,对应一个while死循环,死循环的目的就是不断监测这条连接上是否有数据可以读。但是在大多数情况下,1万个连接里面同一时刻只有少量的连接有数据可读,因此,很多个while死循环都白白浪费掉了,因为没有数据。而在NIO模型中,可以把这么多的while死循环变成一个死循环,这个死循环由一个线程控制。这就是NIO模型中选择器(Selector)的作用,一条连接来了之后,现在不创建一个while死循环去监听是否有数据可读了,而是直接把这条连接注册到选择器上,通过检查这个选择器,就可以批量监测出有数据可读的连接,进而读取数据。NIO的三大核心组件:通道(Channel)、缓冲(Buffer)、选择器(Selector)通道(Channel)是传统IO中的Stream(流)的升级版。Stream是单向的、读写分离(inputstream和outputstream),Channel是双向的,既可以进行读操作,又可以进行写操作。缓冲(Buffer)Buffer可以理解为一块内存区域,可以写入数据,并且在之后读取它。选择器(Selector)选择器(Selector)可以实现一个单独的线程来监控多个注册在她上面的信道(Channel),通过一定的选择机制,实现多路复用的效果。NIO相对于IO的优势:IO是面向流的,每次都是从操作系统底层一个字节一个字节地读取数据,并且数据只能从一端读取到另一端,不能前后移动流中的数据。NIO则是面向缓冲区的,每次可以从这个缓冲区里面读取一块的数据,并且可以在需要时在缓冲区中前后移动。IO是阻塞的,这意味着,当一个线程读取数据或写数据时,该线程被阻塞,直到有一些数据被读取,或数据完全写入,在此期间该线程不能干其他任何事情。而NIO是非阻塞的,不需要一直等待操作完成才能干其他事情,而是在等待的过程中可以同时去做别的事情,所以能最大限度地使用服务器的资源。NIO引入了IO多路复用器selector。selector是一个提供channel注册服务的线程,可以同时对接多个Channel,并在线程池中为channel适配、选择合适的线程来处理channel。由于NIO模型中线程数量大大降低,线程切换效率因此也大幅度提高。和前面一样的场景,使用NIO实现(复制代码演示效果即可):public class NIOServer { public static void main(String[] args) throws IOException { // 负责轮询是否有新的连接 Selector serverSelector = Selector.open(); // 负责轮询处理连接中的数据 Selector clientSelector = Selector.open(); new Thread() { @Override public void run() { try { // 对应IO编程中服务端启动 ServerSocketChannel listenerChannel = ServerSocketChannel.open(); listenerChannel.socket().bind(new InetSocketAddress(8000)); listenerChannel.configureBlocking(false); // OP_ACCEPT表示服务器监听到了客户连接,服务器可以接收这个连接了 listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT); while (true) { // 监测是否有新的连接,这里的1指的是阻塞的时间为1ms if (serverSelector.select(1) > 0) { Set<SelectionKey> set = serverSelector.selectedKeys(); Iterator<SelectionKey> keyIterator = set.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if (key.isAcceptable()) { try { // (1) 每来一个新连接,不需要创建一个线程,而是直接注册到clientSelector SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept(); clientChannel.configureBlocking(false); // OP_READ表示通道中已经有了可读的数据,可以执行读操作了(通道目前有数据,可以进行读操作了) clientChannel.register(clientSelector, SelectionKey.OP_READ); } finally { keyIterator.remove(); } } } } } } catch (IOException ignored) { } } }.start(); new Thread() { @Override public void run() { String name = Thread.currentThread().getName(); try { while (true) { // (2) 批量轮询是否有哪些连接有数据可读,这里的1指的是阻塞的时间为1ms if (clientSelector.select(1) > 0) { Set<SelectionKey> set = clientSelector.selectedKeys(); Iterator<SelectionKey> keyIterator = set.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if (key.isReadable()) { try { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // (3) 读取数据以块为单位批量读取 clientChannel.read(byteBuffer); byteBuffer.flip(); System.out.println("线程" + name + ":" + Charset.defaultCharset().newDecoder().decode(byteBuffer) .toString()); } finally { keyIterator.remove(); key.interestOps(SelectionKey.OP_READ); } } } } } } catch (IOException ignored) { } } }.start(); } }
消息幂等在系统中当使用消息队列时,无论做哪种技术选型,有很多问题是无论如何也不能忽视的,如:消息必达、消息幂 等等。本章节以典型的RabbitMQ为例,讲解如何保证消息幂等的可实施解决方案,其他MQ选型均可参考。消息重试演示消息队列的消息幂等性,主要是由MQ重试机制引起的。因为消息生产者将消息发送到MQ-Server后,MQ-Server 会将消息推送到具体的消息消费者。假设由于网络抖动或出现异常时,MQ-Server根据重试机制就会将消息重新向消息消费者推送,造成消息消费者多次收到相同消息,造成数据不一致。在RabbitMQ中,消息重试机制是默认开启的,但只会在consumer出现异常时,才会重复推送。在使用中,异常的出现有可能是由于消费方又去调用第三方接口,由于网络抖动而造成异常,但是这个异常有可能是暂时的。所以当消费者出现异常,可以让其重试几次,如果重试几次后,仍然有异常,则需要进行数据补偿。数据补偿方案:当重试多次后仍然出现异常,则让此条消息进入死信队列,最终进入到数据库中,接着设置定时job查询这些数据,进行手动补偿。1. 修改consumer一方的配置文件# 消费者监听相关配置 listener: simple: retry: # 开启消费者(程序出现异常)重试机制,默认开启并一直重试 enabled: true # 最大重试次数 max‐attempts: 5 # 重试间隔时间(毫秒) initial‐interval: 30002. 设置消费异常当consumer消息监听类中添加异常,最终接受消息时,可以发现,消息在接收五次后,最终出现异常。消息幂等解决要保证消息幂等性的话,其实最终要解决的就是保证多次操作,造成的影响是相同的。那么其解决方案的思路与服务间幂等的思路其实基本都是一致的。消息防重表,解决思路与服务间幂等的防重表一致。redis。利用redis防重。 这两种方案是最常见的解决方案。其实现思路其实都是一致的。消息缓冲区对于RabbitMQ的使用,默认情况下,每条消息都会进行分别的ack通知,消费完一条后,再来消费下一条。但是 这样就会造成大量消息的阻塞情况。所以为了提升消费者对于消息的消费速度,可以增加consumer数据或者对消 息进行批量消费。MQ接收到producer发送的消息后,不会直接推送给consumer。而是积攒到一定数量后,再进 行消息的发送。 这种方式的实现,可以理解为是一种缓冲区的实现,提升了消息的消费速度,但是会在一定程度上 舍弃结果返回的实时性。对于批量消费来说,也是需要考虑幂等的。对于幂等性的解决方案,沿用刚才的思路即可解决。
实现思想对于分布式锁的实现,zookeeper天然携带的一些特性能够很完美的实现分布式锁。其内部主要是利用znode节点 特性和watch机制完成。znode节点在zookeeper中节点会分为四类,分别是:持久节点:一旦创建,则永久存在于zookeeper中,除非手动删除。持久有序节点:一旦创建,则永久存在于zookeeper中,除非手动删除。同时每个节点都会默认存在节点序 号,每个节点的序号都是有序递增的。如demo000001、demo000002…demo00000N。临时节点:当节点创建后,一旦服务器重启或宕机,则被自动删除。临时有序节点:当节点创建后,一旦服务器重启或宕机,则被自动删除。同时每个节点都会默认存在节点序 号,每个节点的序号都是有序递增的。如demo000001、demo000002…demo00000N。watch监听机制watch监听机制主要用于监听节点状态变更,用于后续事件触发,假设当B节点监听A节点时,一旦A节点发生修改、删除、子节点列表发生变更等事件,B节点则会收到A节点改变的通知,接着完成其他额外事情。实现原理其实现思想是当某个线程要对方法加锁时,首先会在zookeeper中创建一个与当前方法对应的父节点,接着每个要获取当前方法的锁的线程,都会在父节点下创建一个临时有序节点,因为节点序号是递增的,所以后续要获取锁的线程在zookeeper中的序号也是逐次递增的。根据这个特性,当前序号最小的节点一定是首先要获取锁的线程,因此可以规定序号最小的节点获得锁。所以,每个线程再要获取锁时,可以判断自己的节点序号是否是最小的,如果是则获取到锁。当释放锁时,只需将自己的临时有序节点删除即可。根据上图,在并发下,每个线程都会在对应方法节点下创建属于自己的临时节点,且每个节点都是临时且有序的。 那么zookeeper又是如何有序的将锁分配给不同线程呢? 这里就应用到了watch监听机制。每当添加一个新的临时节点时,其都会基于watcher机制监听着它本身的前一个节点等待前一个节点的通知,当前一个节点删除时,就轮到它来持有锁了。然后依次类推。zookeeper是基于cp模式,能够保证数据强一致性。基于watch机制实现锁释放的自动监听,锁操作性能较好。频繁创建节点,对于zk服务器压力较大,吞吐量没有redis强。原理剖析&实现低效锁思想&实现在通过zookeeper实现分布式锁时,有另外一种实现的写法,这种也是非常常见的,但是它的效率并不高,此处可以先对这种实现方式进行探讨。此种实现方式,只会存在一个锁节点。当创建锁节点时,如果锁节点不存在,则创建成功,代表当前线程获取到锁,如果创建锁节点失败,代表已经有其他线程获取到锁,则该线程会监听锁节点的释放。当锁节点释放后,则继续尝试创建锁节点加锁。羊群效应这种方案的低效点就在于,只有一个锁节点,其他线程都会监听同一个锁节点,一旦锁节点释放后,其他线程都会收到通知,然后竞争获取锁节点。这种大量的通知操作会严重降低zookeeper性能,对于这种由于一个被watch的 znode节点的变化,而造成大量的通知操作,叫做羊群效应。高效锁思想&实现为了避免羊群效应的出现,业界内普遍的解决方案就是,让获取锁的线程产生排队,后一个监听前一个,依次排序。推荐使用这种方式实现分布式锁按照上述流程会在根节点下为每一个等待获取锁的线程创建一个对应的临时有序节点,序号最小的节点会持有锁, 并且后一个节点只监听其前面的一个节点,从而可以让获取锁的过程有序且高效。具体流程生成操作标识是为了防止feign调用超时出现重试,如果没有操作标识的话,库存服务无法判定是一次操作还是 多次操作,通过标识可以用于区分重试时当前是哪次操作。从而避免多次扣减库存情况的出现。库存服务先检查redis再检查Mysql,出于两点考虑:避免服务间重试时,库存服务无法区分是否为同一个操作,导致相同操作被执行多次。同时缓存结合关系型 数据库,可以起到减轻数据库压力的作用。库存流水表不仅用于区分操作,同时每一次扣减库存时信息都会被记录,可以用于后期的库存信息统计等操作。 总的来说,就是通过操作标识结合zookeeper分布式锁,完成mysql乐观锁的操作,思想上都是相同的。redis分布式锁单节点Redis实现分布式锁原理&实现分布式锁的一个很重要的特性就是互斥性,同一时间内多个调用方加锁竞争,只能有一个调用方加锁成功。而redis是基于单线程模型的,可以利用这个特性让调用方的请求排队,对于并发请求,只会有一个请求能获取到锁。redis实现分布式锁也很简单,基于客户端的几个API就可以完成,主要涉及三个核心API:setNx():向redis中存key-value,只有当key不存在时才会设置成功,否则返回0。用于体现互斥性。expire():设置key的过期时间,用于避免死锁出现。delete():删除key,用于释放锁。单节点问题锁续期当对业务进行加锁时,锁的过期时间,绝对不能想当然的设置一个值。假设线程A在执行某个业务时加锁成功 并设置锁过期时间。但该业务执行时间过长,业务的执行时间超过了锁过期时间,那么在业务还没执行完 时,锁就自动释放了。接着后续线程就可以获取到锁,又来执行该业务。就会造成线程A还没执行完,后续线 程又来执行,导致同一个业务逻辑被重复执行。因此对于锁的超时时间,需要结合着业务执行时间来判断, 让锁的过期时间大于业务执行时间。上面的方案是一个基础解决方案,但是仍然是有问题的。业务执行时间的影响因素太多了,无法确定一个准确值,只能是一个估值。无法百分百保证业务执行期间, 锁只能被一个线程占有。如想保证的话,可以在创建锁的同时创建一个守护线程,同时定义一个定时任务每隔一段时间去为未释放的锁增加过期时间。当业务执行完,释放锁后,再关闭守护线程。 这种实现思想可以用来解决锁续期。服务单点&集群问题在单点redis虽然可以完成锁操作,可一旦redis服务节点挂掉了,则无法提供锁操作。 在生产环境下,为了保证redis高可用,会采用异步复制方法进行主从部署。当主节点写入数据成功,会异步的将 数据复制给从节点,并且当主节点宕机,从节点会被提升为主节点继续工作。假设主节点写入数据成功,在没有将数据复制给从节点时,主节点宕机。则会造成提升为主节点的从节点中是没有锁信息的,其他线程则又可以继续加 锁,导致互斥失效。Redisson实现分布式锁redisson是redis官网推荐实现分布式锁的一个第三方类库。其内部完成的功能非常强大,对各种锁都有实现,同 时对于使用者来说非常简单,让使用者能够将更多的关注点放在业务逻辑上。此处重点利用Redisson解决单机 Redis锁产生的两个问题。单机Redisson实现分布式锁实现基于redisson实现分布式锁很简单,直接基于lock()&unlock()方法操作即可。1. 添加依赖<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> <!--Redis分布式锁--> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.13.1</version> </dependency>2. 修改配置文件server: redis: host: 192.168.200.150 port: 6379 database: 0 jedis: pool: max-active: 500 max-idle: 1000 min-idle: 43. 修改springboot启动类@Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private String port; @Bean public RedissonClient redissonClient(){ RedissonClient redissonClient; Config config = new Config(); String url = "redis://" + host + ":" + port; config.useSingleServer().setAddress(url); try { redissonClient = Redisson.create(config); return redissonClient; } catch (Exception e) { e.printStackTrace(); return null; } }4. 定义锁工具类@Component public class RedissonLock { @Autowired private RedissonClient redissonClient; /** * 加锁 * @param lockKey * @return */ public boolean addLock(String lockKey){ try { if (redissonClient == null){ System.out.println("redisson client is null"); return false; } RLock lock = redissonClient.getLock(lockKey); //设置锁超时时间为5秒,到期自动释放 lock.lock(10, TimeUnit.SECONDS); System.out.println(Thread.currentThread().getName()+": 获取到锁"); //加锁成功 return true; } catch (Exception e) { e.printStackTrace(); return false; } } public boolean releaseLock(String lockKey){ try{ if (redissonClient == null){ System.out.println("redisson client is null"); return false; } RLock lock = redissonClient.getLock(lockKey); lock.unlock(); System.out.println(Thread.currentThread().getName()+": 释放锁"); return true; }catch (Exception e){ e.printStackTrace(); return false; } } }5. 测试@SpringBootTest @RunWith(SpringRunner.class) public class RedissonLockTest { @Autowired private RedissonLock redissonLock; @Test public void easyLock(){ //模拟多个10个客户端 for (int i=0;i<10;i++) { Thread thread = new Thread(new LockRunnable()); thread.start(); } try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } } private class LockRunnable implements Runnable { @Override public void run() { redissonLock.addLock("demo"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } redissonLock.releaseLock("demo"); } } }6. 执行效果根据执行效果可知,多线程并发获取所时,当一个线程获取到锁,其他线程则获取不到,并且其内部会不断尝试获取锁,当持有锁的线程将锁释放后,其他线程则会继续去竞争锁。lock()源码分析在上述加锁方法实现中,最核心就是getLock()和lock()。get()源码非常简单,根据当前传入的锁名称创建并返回 一个RLock对象。当获取到RLock对象后,调用其内部的lock()执行加锁操作。根据源码描述,当线程获取锁时,如果没有获取到锁,则会让其进入自旋,直到获取到锁。 如果获取到锁,则会一直保留到调用unLock()手动释放或根据传入的 leaseTime时间自动释放。当前传入两个参数值:锁超时时间,时间单位。主要用于避免死锁的出现,假设持有锁的redis节点宕机,到期后锁可以自动释放。lock()方法中还会调用lock()的另外一个重载方法,需要传入三个参数:过期时间、时间单位、是否中断。在三个参数的lock()重载方法中,首先会获取当前线程id,接着调用tryAcquire()方法尝试获取锁,如果返回值为 null,代表获取到锁。 如果返回值不是null,则根据当前线程id创建异步任务并放入线程池中,接着进入自旋,在 自旋过程中,尝试调用tryAcquire()获取锁,如果获取到则退出自旋。否则会不断的尝试获取锁。在lock()方法中,最核心的是tryAcquire()。其内部核心实现会调用tryAcquireAsync(),并传入过期时间、时间单位和当前线程id,进行锁的获取。如果leaseTime不为-1,代表设置了有效时间,接着调用tryAcquireAsync()去获取锁。如果是-1的话,则默认把永不过期改为30秒过期,并且创建异步任务,如果没有获取到锁,则什么都不做。如果获取到了锁,则调用scheduleExpirationRenewal()对当前线程id的锁进行延时。最终的tryLockInnerAsync()则是获取锁的具体实现。可以看到,其内部是基于lua脚本语言完成锁获取的。因为获取锁的过程涉及到了多步,为了保证执行过程的原子性,所以使用了lua,最核心的就是要理解这段lua脚本的执行过程。对于这款lua脚本来说,KEYS[1]代表需要加锁的key,ARGV[1]代表锁的超时时间,ARGV[2]代表锁的唯一标识。 对于这段lua脚本,简单来说:检查锁key是否被占用了,如果没有则设置锁key和唯一标识,初始值为1,并且设置锁key的过期时间。如果锁key存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间。返回锁key的失效时间毫秒数。unLock()源码分析在释放锁时,unlock()内部会调用unlockAsync()对当前线程持有的锁进行释放。其内部最终会执行 unlockInnerAsync()方法完成锁释放并返回结果。在unlockInnerAsync()中仍然是结合lua脚本完成释放锁操作。相关参数:KEYS[1]:当前锁key。KEYS[2]:redis消息的ChannelName,每个锁对应唯一的一个 channelName。ARGV[1]:redis消息体,用于标记redis的key已经解锁,用于通知其他线程申请锁。ARGV[2]:锁超时时间。ARGV[3]:锁的唯一标识。判断锁key和锁的唯一标识是否匹配,如果不匹配,表示锁已经被占用,那么直接返回。如果是当前线程持有锁,则value值-1,用于重入操作。如果-1后的值大于0,则对锁设置过期时间。如果-1后的值为0,则删除锁key,并发布消息,该锁已被释放。用于通知其他线程申请锁。锁续期对于锁续期问题,在单点redis实现分布式锁时已经介绍过了,用于防止业务执行超时或宕机而引起的业务被重复执行。根据对lock方法的解析,可以发现,当设置完过期时间后,当前锁的过期时间就已经被设定了,不会发生改变, 到期后则会被自动释放,因此在业务执行中,通过lock()方法加锁会造成隐患。看门狗所谓的看门狗是redisson用于自动延长锁有效期的实现机制。其本质是一个后台线程,用于不断延长锁key的生存时间。改造锁示例代码,让锁超时时间为1秒,但是业务执行时,需要耗时3秒,此时执行可以发现,多线程间在上一个锁没有释放的情况下,后续线程又获取到了锁。但是解锁的时候,出现异常,因为加锁时的唯一标识与解锁时的唯 一标识发生了改变,造成死锁。因为业务执行多久无法确定一个准确值,所以在看门狗的实现中,不需要对锁key设置过期时间,当过期时间为-1 时,这时会启动一个定时任务,在业务释放锁之前,会一直不停的增加这个锁的有效时间,从而保证在业务执行完 毕前,这把锁不会被提前释放掉。要开启看门狗机制也很简单,只需要将加锁时使用lock()改为tryLock()即可。并且根据之前lock的源码分析,如果没有设置锁超时,默认过期时间为30秒即watchdog每隔30秒来进行一次续期,该值可以修改。config.setLockWatchdogTimeout(3000L);进行测试,当加锁后,线程睡眠10秒钟,然后释放锁,可以看到在这段时间内,当前线程会一直持有锁,直到锁释放。在多线程环境下,也是阻塞等待进行锁的获取。红锁当在单点redis中实现redis锁时,一旦redis服务器宕机,则无法进行锁操作。因此会考虑将redis配置为主从结 构,但在主从结构中,数据复制是异步实现的。假设在主从结构中,master会异步将数据复制到slave中,一旦某 个线程持有了锁,在还没有将数据复制到slave时,master宕机。则slave会被提升为master,但被提升为slave的 master中并没有之前线程的锁信息,那么其他线程则又可以重新加锁。redlock算法redlock是一种基于多节点redis实现分布式锁的算法,可以有效解决redis单点故障的问题。官方建议搭建五台redis服务器对redlock算法进行实现。在redis官网中,对于redlock算法的实现思想也做了详细的介绍。地址:https://redis.io/topics/distlock。整个实现过程分为五步:记录获取锁前的当前时间。使用相同的key,value获取所有redis实例中的锁,并且设置获取锁的时间要远远小于锁自动释放的时间。假设锁自动释放时间是10秒,则获取时间应在5-50毫秒之间。通过这种方式避免客户端长时间等待一个已经关闭的实例,如果一个实例不可用了,则尝试获取下一个实例。客户端通过获取所有实例的锁后的时间减去第一步的时间,得到的差值要小于锁自动释放时间,避免拿到一个已经过期的锁。并且要有超过半数的redis实例成功获取到锁,才算最终获取锁成功。如果不是超过半数,有可能 出现多个客户端重复获取到锁,导致锁失效。当已经获取到锁,那么它的真正失效时间应该为:过期时间-第三步的差值。如果客户端获取锁失败,则在所有redis实例中释放掉锁。为了保证更高效的获取锁,还可以设置重试策略,在一定时间后重新尝试获取锁,但不能是无休止的,要设置重试次数。虽然通过redlock能够更加有效的防止redis单点问题,但是仍然是存在隐患的。假设redis没有开启持久化, clientA获取锁后,所有redis故障重启,则会导致clientA锁记录消失,clientB仍然能够获取到锁。这种情况虽然发生几率极低,但并不能保证肯定不会发生。保证的方案就是开始AOF持久化,但是要注意同步的策略,使用每秒同步,如果在一秒内重启,仍然数据丢失。使用always又会造成性能急剧下降。官方推荐使用默认的AOF策略即每秒同步,且在redis停掉后,要在ttl时间后再重启。 缺点就是ttl时间内redis无法对外提供服务。红锁实现redisson对于红锁的实现已经非常完善,通过其内部提供的api既可以完成红锁的操作。1. 新建配置类@Configuration public class RedissonRedLockConfig { public RedissonRedLock initRedissonClient(String lockKey){ Config config1 = new Config(); config1.useSingleServer().setAddress("redis://192.168.200.150:7000").setDatabase(0); RedissonClient redissonClient1 = Redisson.create(config1); Config config2 = new Config(); config2.useSingleServer().setAddress("redis://192.168.200.150:7001").setDatabase(0); RedissonClient redissonClient2 = Redisson.create(config2); Config config3 = new Config(); config3.useSingleServer().setAddress("redis://192.168.200.150:7002").setDatabase(0); RedissonClient redissonClient3 = Redisson.create(config3); Config config4 = new Config(); config4.useSingleServer().setAddress("redis://192.168.200.150:7003").setDatabase(0); RedissonClient redissonClient4 = Redisson.create(config4); Config config5 = new Config(); config5.useSingleServer().setAddress("redis://192.168.200.150:7004").setDatabase(0); RedissonClient redissonClient5 = Redisson.create(config5); RLock rLock1 = redissonClient1.getLock(lockKey); RLock rLock2 = redissonClient2.getLock(lockKey); RLock rLock3 = redissonClient3.getLock(lockKey); RLock rLock4 = redissonClient4.getLock(lockKey); RLock rLock5 = redissonClient5.getLock(lockKey); RedissonRedLock redissonRedLock = new RedissonRedLock(rLock1,rLock2,rLock3,rLock4,rLock5); return redissonRedLock; } }2. 新建测试类,完成加锁与解锁操作@SpringBootTest @RunWith(SpringRunner.class) public class RedLockTest { @Autowired private RedissonRedLockConfig redissonRedLockConfig; @Test public void easyLock(){ //模拟多个10个客户端 for (int i=0;i<10;i++) { Thread thread = new Thread(new RedLockTest.RedLockRunnable()); thread.start(); } try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } } private class RedLockRunnable implements Runnable { @Override public void run() { RedissonRedLock redissonRedLock = redissonRedLockConfig.initRedissonClient("demo"); try { boolean lockResult = redissonRedLock.tryLock(100, 10, TimeUnit.SECONDS); if (lockResult){ System.out.println("获取锁成功"); TimeUnit.SECONDS.sleep(3); } } catch (InterruptedException e) { e.printStackTrace(); }finally { redissonRedLock.unlock(); System.out.println("释放锁"); } } } }redissonRedLock加锁源码分析public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long newLeaseTime = -1; if (leaseTime != -1) { newLeaseTime = unit.toMillis(waitTime)*2; } long time = System.currentTimeMillis(); long remainTime = -1; if (waitTime != -1) { remainTime = unit.toMillis(waitTime); } long lockWaitTime = calcLockWaitTime(remainTime); /** * 1. 允许加锁失败节点个数限制(N-(N/2+1)),当前假设五个节点,则允许失败节点数为2 */ int failedLocksLimit = failedLocksLimit(); /** * 2. 遍历所有节点执行lua加锁,用于保证原子性 */ List<RLock> acquiredLocks = new ArrayList<>(locks.size()); for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) { RLock lock = iterator.next(); boolean lockAcquired; /** * 3.对节点尝试加锁 */ try { if (waitTime == -1 && leaseTime == -1) { lockAcquired = lock.tryLock(); } else { long awaitTime = Math.min(lockWaitTime, remainTime); lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS); } } catch (RedisResponseTimeoutException e) { // 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁所有节点 unlockInner(Arrays.asList(lock)); lockAcquired = false; } catch (Exception e) { // 抛出异常表示获取锁失败 lockAcquired = false; } if (lockAcquired) { /** *4. 如果获取到锁则添加到已获取锁集合中 */ acquiredLocks.add(lock); } else { /** * 5. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N-(N/2+1)) * 如果已经到达, 就认定最终申请锁失败,则没有必要继续从后面的节点申请了 * 因为 Redlock 算法要求至少N/2+1 个节点都加锁成功,才算最终的锁申请成功 */ if (locks.size() - acquiredLocks.size() == failedLocksLimit()) { break; } if (failedLocksLimit == 0) { unlockInner(acquiredLocks); if (waitTime == -1 && leaseTime == -1) { return false; } failedLocksLimit = failedLocksLimit(); acquiredLocks.clear(); // reset iterator while (iterator.hasPrevious()) { iterator.previous(); } } else { failedLocksLimit--; } } /** * 6.计算从各个节点获取锁已经消耗的总时间,如果已经等于最大等待时间,则申请锁失败,返回false */ if (remainTime != -1) { remainTime -= System.currentTimeMillis() - time; time = System.currentTimeMillis(); if (remainTime <= 0) { unlockInner(acquiredLocks); return false; } } } if (leaseTime != -1) { List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size()); for (RLock rLock : acquiredLocks) { RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS); futures.add(future); } for (RFuture<Boolean> rFuture : futures) { rFuture.syncUninterruptibly(); } } /** * 7.如果逻辑正常执行完则认为最终申请锁成功,返回true */ return true; }