提笔写架构
深入浅出RPC---5、网络IO模型
网络IO模型有哪些网络IO模型主要包含以下几种:同步阻塞 IO(BIO)同步非阻塞 IO(NIO)IO 多路复用信号驱动IO异步非阻塞 IO(AIO)常用的是同步阻塞 IO 和 IO 多路复用模型。什么是阻塞IO模型IO多路复用IO多路复用的实现主要有select,poll和epoll模式。文件描述符:在Linux系统中一切皆可以看成是文件,文件又可分为:普通文件、目录文件、链接文件和设备文件。 三者的区别:基于select的 I/O 复用模型:在基于select的 I/O 复用模型中,会用到 Select,这个函数也会使进程阻塞,但是和阻塞 I/O 所不同的是这两 个函数可以同时对多个 I/O 操作。而且可以同时对多个读操作,多个写操作的 I/O 函数进行检测,直到有数据 可读或可写时,才真正调用 I/O 操作函数。select/poll处理流程:epoll的处理流程:epoll 是线程安全的。 epoll 不仅告诉你sock组里面的数据,还会告诉你具体哪个sock连接有数据,不用进程 独自轮询查找。为什么阻塞 IO 和 IO 多路复用最为常用?在实际的网络 IO 的应用中,需要的是系统内核的支持以及编程语言的支持。现在大多数系统内核都会支持阻 塞 IO、非阻塞 IO 和 IO 多路复用,但像信号驱动 IO、异步 IO,只有高版本的 Linux 系统内核才会支持。RPC 框架应该采用哪种网络 IO 模型?IO 多路复用应用特点:IO 多路复用更适合高并发的场景,可以用较少的进程(线程)处理较多的 socket 的 IO 请求,但使用难度比较高。阻塞 IO应用特点:与 IO 多路复用相比,阻塞 IO 每处理一个 socket 的 IO 请求都会阻塞进程(线程),但使用难度较低。RPC框架应用:RPC 调用在大多数的情况下,是一个高并发调用的场景, 在 RPC 框架的实现中,一般会选择 IO 多路复用的方式。
提笔写架构
深入浅出RPC---1、RPC架构
RPC是什么RPC概述在单体架构体系时期,我们写一个函数都是在本地注入调用就行了。但是在互联网公司,服务都是部署在不同服务器上的分布式系统,如何调用呢?RPC 全称 Remote Procedure Call——远程过程调用。RPC技术简单说就是为了解决远程调用服务的一种技术,在提供强大的远程调用能力时不损失本地调用的语义简洁性。RPC框架Dubbo:国内最早开源的 RPC 框架,由阿里巴巴公司开发并于 2011 年末对外开源,仅支持 Java 语言。Motan:微博内部使用的 RPC 框架,于 2016 年对外开源,仅支持 Java 语言。 Tars:腾讯内部使用的 RPC 框架,于2017 年对外开源,仅支持 C++ 语言。Spring Cloud:国外 Pivotal 公司 2014 年对外开源的 RPC框架,提供了丰富的生态组件。gRPC:Google 于 2015 年对外开源的跨语言 RPC 框架,支持多种语言。Thrift:最初是由 Facebook 开发的内部系统跨语言的 RPC 框架,2007 年贡献给了 Apache 基金,成为Apache 开源项目之一,支持多种语言。RPC框架优点RPC框架一般使用长链接,不必每次通信都要3次握手,减少网络开销。RPC框架一般都有注册中心,有丰富的监控管理发布、下线接口、动态扩展等,对调用方来说是无感知、统一化的操作 协议私密,安全性较高 RPC协议更简单内容更小,效率更高,服务化架构、服务化治理,RPC框架是一个强力的支撑。应用场景特征:长链接通讯;注册发布机制;安全性,没有暴露资源操作;微服务支持;应用举例:分布式操作系统的进程间通讯;构造分布式设计的软件环境;远程数据库服务;分布式应用程序设计;分布式程序的调试;RPC原理RPC架构调用流程具体调用过程如下:服务消费者(client客户端)通过本地调用的方式调用服务。客户端存根(client stub)接收到请求后负责将方法、入参等信息序列化(组装)成能够进行网络传输的消息 体。客户端存根(client stub)找到远程的服务地址,并且将消息通过网络发送给服务端。服务端存根(server stub)收到消息后进行解码(反序列化操作)。服务端存根(server stub)根据解码结果调用本地的服务进行相关处理。本地服务执行具体业务逻辑并将处理结果返回给服务端存根(server stub)。服务端存根(server stub)将返回结果重新打包成消息(序列化)并通过网络发送至消费方。客户端存根(client stub)接收到消息,并进行解码(反序列化)。服务消费方得到最终结果。涉及技术动态代理 生成Client Stub(客户端存根)和Server Stub(服务端存根)的时候需要用到java动态代理技术。序列化 在网络中,所有的数据都将会被转化为字节进行传送,需要对这些参数进行序列化和反序列化操作。 目前主流高效的开源序列化框架有Kryo、fastjson、Hessian、Protobuf等。NIO通信 Java 提供了 NIO 的解决方案,Java 7 也提供了更优秀的 NIO.2 支持。可以采用Netty或者mina框架来解决 NIO数据传输的问题。开源的RPC框架Dubbo就是采用NIO通信,集成支持netty、mina、grizzly。服务注册中心 通过注册中心,让客户端连接调用服务端所发布的服务。主流的注册中心组件:Redis、Zookeeper、Consul 、Etcd。Dubbo采用的是ZooKeeper提供服务注册与发现功能。负载均衡 在高并发的场景下,需要多个节点或集群来提升整体吞吐能力。健康检查 健康检查包括,客户端心跳和服务端主动探测两种方式。
提笔写架构
红包雨架构设计---3、核心编码
核心编码核心编码分为两个部分:活动信息预热和抽奖活动信息预热取出活动种子定时器从数据库取出活动种子,判断时间 @Scheduled(cron = "0 * * * * ?")
public void execute() {
// 当前时间
Date now = new Date();
// 查询将来1分钟内要开始的活动
CardGameExample example = new CardGameExample();
CardGameExample.Criteria criteria = example.createCriteria();
// 开始时间大于当前时间
criteria.andStarttimeGreaterThan(now);
// 小于等于(当前时间+1分钟)
criteria.andStarttimeLessThanOrEqualTo(DateUtils.addMinutes(now,1));
List<CardGame> list = gameMapper.selectByExample(example);
if(list.size() == 0){
// 没有查到要开始的活动
log.info("game list scan : size = 0");
return;
}
log.info("game list scan : size = {}",list.size());
}活动基本信息预热如果存在有相关活动种子,则将活动数据预热,放进缓存 list.forEach(game ->{
//活动开始时间
long start = game.getStarttime().getTime();
//活动结束时间
long end = game.getEndtime().getTime();
//计算活动结束时间到现在还有多少秒,作为redis key过期时间
long expire = (end - now.getTime())/1000;
//活动持续时间(ms)
long duration = end - start;
//活动基本信息
game.setStatus(1);
redisUtil.set(RedisKeys.INFO+game.getId(),game,-1);
log.info("load game info:{},{},{},{}",
game.getId(),game.getTitle(),game.getStarttime(),game.getEndtime());
}活动奖品信息预热获取活动奖品信息和奖品数量等配置信息,并生成令牌桶,进行奖品信息预热 //活动奖品信息
List<CardProductDto> products = gameLoadMapper.getByGameId(game.getId());
Map<Integer,CardProduct> productMap = new HashMap<>(products.size());
products.forEach(p -> productMap.put(p.getId(),p));
log.info("load product type:{}",productMap.size());
//奖品数量等配置信息
CardGameProductExample productExample = new CardGameProductExample();
productExample.createCriteria().andGameidEqualTo(game.getId());
List<CardGameProduct> gameProducts = gameProductMapper.selectByExample(productExample);
log.info("load bind product:{}",gameProducts.size());
//令牌桶
List<Long> tokenList = new ArrayList();
gameProducts.forEach(cgp ->{
//生成amount个start到end之间的随机时间戳做令牌
for (int i = 0; i < cgp.getAmount(); i++) {
long rnd = start + new Random().nextInt((int)duration);
//为什么乘1000,再额外加一个随机数呢? - 防止时间段奖品多时重复
//记得取令牌判断时间时,除以1000,还原真正的时间戳
long token = rnd * 1000 + new Random().nextInt(999);
//将令牌放入令牌桶
tokenList.add(token);
//以令牌做key,对应的商品为value,创建redis缓存
log.info("token -> game : {} -> {}",token/1000 ,productMap.get(cgp.getProductid()).getName());
//token到实际奖品之间建立映射关系
redisUtil.set(RedisKeys.TOKEN + game.getId() +"_"+token,productMap.get(cgp.getProductid()),expire);
}
});
//排序后放入redis队列
Collections.sort(tokenList);
log.info("load tokens:{}",tokenList);
//从右侧压入队列,从左到右,时间戳逐个增大
redisUtil.rightPushAll(RedisKeys.TOKENS + game.getId(),tokenList);
redisUtil.expire(RedisKeys.TOKENS + game.getId(),expire);
奖品策略预热获取奖品策略信息,进行策略预热CardGameRulesExample rulesExample = new CardGameRulesExample();
rulesExample.createCriteria().andGameidEqualTo(game.getId());
List<CardGameRules> rules = gameRulesMapper.selectByExample(rulesExample);
//遍历策略,存入redis hset
rules.forEach(r -> {
redisUtil.hset(RedisKeys.MAXGOAL +game.getId(),r.getUserlevel()+"",r.getGoalTimes());
redisUtil.hset(RedisKeys.MAXENTER +game.getId(),r.getUserlevel()+"",r.getEnterTimes());
redisUtil.hset(RedisKeys.RANDOMRATE +game.getId(),r.getUserlevel()+"",r.getRandomRate());
log.info("load rules:level={},enter={},goal={},rate={}",
r.getUserlevel(),r.getEnterTimes(),r.getGoalTimes(),r.getRandomRate());
});
redisUtil.expire(RedisKeys.MAXGOAL +game.getId(),expire);
redisUtil.expire(RedisKeys.MAXENTER +game.getId(),expire);
redisUtil.expire(RedisKeys.RANDOMRATE +game.getId(),expire);
活动状态更新活动状态变更为已预热,禁止管理后台再随便变动 game.setStatus(1);
gameMapper.updateByPrimaryKey(game);
抽奖获取活动信息、校验获取活动信息,进行活动和用户信息的基础校验 Date now = new Date();
//获取活动基本信息
CardGame game = (CardGame) redisUtil.get(RedisKeys.INFO+gameid);
//判断活动是否开始
//如果活动信息还没加载进redis,无效
//如果活动已经加载,预热完成,但是开始时间 > 当前时间,也无效
if (game == null || game.getStarttime().after(now)){
return new ApiResult(-1,"活动未开始",null);
}
//判断活动是否已结束
if (now.after(game.getEndtime())){
return new ApiResult(-1,"活动已结束",null);
}
//获取当前用户
HttpSession session = request.getSession();
CardUser user = (CardUser) redisUtil.get(RedisKeys.SESSIONID+session.getId());
if (user == null){
return new ApiResult(-1,"未登陆",null);
}else{
//第一次抽奖,发送消息队列,用于记录参与的活动(redis分布式锁)
if (!redisUtil.hasKey(RedisKeys.USERGAME+user.getId()+"_"+gameid)){
redisUtil.set(RedisKeys.USERGAME+user.getId()+"_"+gameid,1,(game.getEndtime().getTime() - now.getTime())/1000);
//持久化抽奖记录,扔给消息队列处理
CardUserGame userGame = new CardUserGame();
userGame.setUserid(user.getId());
userGame.setGameid(gameid);
userGame.setCreatetime(new Date());
rabbitTemplate.convertAndSend(RabbitKeys.QUEUE_PLAY,userGame);
}
}
//用户可抽奖次数
Integer enter = (Integer) redisUtil.get(RedisKeys.USERENTER+gameid+"_"+user.getId());
if (enter == null){
enter = 0;
redisUtil.set(RedisKeys.USERENTER+gameid+"_"+user.getId(),enter,(game.getEndtime().getTime() - now.getTime())/1000);
}
//根据会员等级,获取本活动允许的最大抽奖次数
Integer maxenter = (Integer) redisUtil.hget(RedisKeys.MAXENTER+gameid,user.getLevel()+"");
//如果没设置,默认为0,即:不限制次数
maxenter = maxenter==null ? 0 : maxenter;
//次数对比
if (maxenter > 0 && enter >= maxenter){
//如果达到最大次数,不允许抽奖
return new ApiResult(-1,"您的抽奖次数已用完",null);
}else{
redisUtil.incr(RedisKeys.USERENTER+gameid+"_"+user.getId(),1);
}
//用户已中奖次数
Integer count = (Integer) redisUtil.get(RedisKeys.USERHIT+gameid+"_"+user.getId());
if (count == null){
count = 0;
redisUtil.set(RedisKeys.USERHIT+gameid+"_"+user.getId(),count,(game.getEndtime().getTime() - now.getTime())/1000);
}
//根据会员等级,获取本活动允许的最大中奖数
Integer maxcount = (Integer) redisUtil.hget(RedisKeys.MAXGOAL+gameid,user.getLevel()+"");
//如果没设置,默认为0,即:不限制次数
maxcount = maxcount==null ? 0 : maxcount;
//次数对比
if (maxcount > 0 && count >= maxcount){
//如果达到最大次数,不允许抽奖
return new ApiResult(-1,"您已达到最大中奖数",null);
}
抢令牌以上校验全部过关,进入下一步:拿令牌。拿到合法令牌就中奖。 Long token;
switch (game.getType()) {
//时间随机
case 1:
//随即类比较麻烦,按设计时序图走
//java调redis,有原子性问题!
token = (Long) redisUtil.leftPop(RedisKeys.TOKENS+gameid);
if (token == null){
//令牌已用光,说明奖品抽光了
return new ApiResult(-1,"奖品已抽光",null);
}
//判断令牌时间戳大小,即是否中奖
//这里记住,取出的令牌要除以1000,参考job项目,令牌生成部分
if (now.getTime() < token/1000){
//当前时间小于令牌时间戳,说明奖品未到发放时间点,放回令牌,返回未中奖
redisUtil.leftPush(RedisKeys.TOKENS+gameid,token);
return new ApiResult(0,"未中奖",null);
}
break;
case 2:
//瞬间秒杀类简单,直接获取令牌,有就中,没有就说明抢光了
token = (Long) redisUtil.leftPop(RedisKeys.TOKENS+gameid);
if (token == null){
//令牌已用光,说明奖品抽光了
return new ApiResult(-1,"奖品已抽光",null);
}
break;
case 3:
//幸运转盘类,先给用户随机剔除,再获取令牌,有就中,没有就说明抢光了
//一般这种情况会设置足够的商品,卡在随机上
Integer randomRate = (Integer) redisUtil.hget(RedisKeys.RANDOMRATE+gameid,user.getLevel()+"");
if (randomRate == null){
randomRate = 100;
}
//注意这里的概率设计思路:
//每次请求取一个0-100之间的随机数,如果这个数没有落在范围内,直接返回未中奖
if( new Random().nextInt(100) > randomRate ){
return new ApiResult(0,"未中奖",null);
}
token = (Long) redisUtil.leftPop(RedisKeys.TOKENS+gameid);
if (token == null){
//令牌已用光,说明奖品抽光了
return new ApiResult(-1,"奖品已抽光",null);
}
break;
default:
return new ApiResult(0,"不支持的活动类型",null);
}//end switch
中奖处理拿到合法令牌说明已经中奖,剩下的就是中奖通知,和中奖信息持久化 //以上逻辑走完,拿到了合法的token,说明很幸运,中奖了!
//抽中的奖品:
CardProduct product = (CardProduct) redisUtil.get(RedisKeys.TOKEN + gameid +"_"+token);
//中奖次数加1
redisUtil.incr(RedisKeys.USERHIT+gameid+"_"+user.getId(),1);
//投放消息给队列,中奖后的耗时业务,交给消息模块处理
CardUserHit hit = new CardUserHit();
hit.setGameid(gameid);
hit.setHittime(now);
hit.setProductid(product.getId());
hit.setUserid(user.getId());
rabbitTemplate.convertAndSend(RabbitKeys.EXCHANGE_DIRECT,RabbitKeys.QUEUE_HIT, hit);
//返回给前台中奖信息
return new ApiResult(1,"恭喜中奖",product);
提笔写架构
深入浅出RPC---4、服务注册发现和健康监测
服务注册发现服务注册发现的作用感知服务端的变化,获取最新服务节点的连接信息。服务注册发现的处理流程**服务注册:**服务提供方将对外暴露的接口发布到注册中心内,注册中心为了检测服务的有效状态,一般会建 立双向心跳机制。**服务订阅:**服务调用方去注册中心查找并订阅服务提供方的 IP,并缓存到本地用于后续调用。如何实现服务的注册发现基于 ZooKeeper 的服务发现方式:在 ZooKeeper 中创建一个服务根路径,可以根据接口名命名(例如:/micro/service/com.test.orderService),在这个路径再创建服务提供方与调用方目录(server、client),分别用来存储服务提供方和调用方的节点信息。服务端发起注册时,会在服务提供方目录中创建一个临时节点,节点中存储注册信息。客户端发起订阅时,会在服务调用方目录中创建一个临时节点,节点中存储调用方的信息,同时watch 服务提供方的目录(/micro/service/com.test.orderService/server)中所有的服务节点数据。当服务端产生变化时ZK就会通知给订阅的客户端。ZooKeeper方案的特点:强一致性,ZooKeeper 集群的每个节点的数据每次发生更新操作,都会通知其它 ZooKeeper 节点同时执行更新。健康监测为什么需要健康监测比如网络中的波动,硬件设施的老化等等。可能造成集群当中的某个节点存在问题,无法正常调用。健康监测实现分析心跳检测的过程总共包含以下状态:健康状态波动状态失败状态完善的解决方案阈值: 健康监测增加失败阈值记录。成功率: 可以再追加调用成功率的记录(成功次数/总次数)。探针:对服务节点有一个主动的存活检测机制。
提笔写架构
红包雨架构设计---1、技术架构
概述京东、淘宝的红包雨相信大家都参与过,其实业务并不复杂,在某段时间内随机发放不同的红包,用于进行抢单抽奖,直到奖品抽完。应用场景时间随机在一段时间内,设置一批礼品,这些礼品不定时的出现,尽量在这段时间内均匀抛出,一旦出现,就可以被抓走。类似抓红包。瞬间秒杀用于抢单或者秒杀场景,到点后,用户一起抽奖,机会均等,谁抢的快算谁的。这个并发比较高。但是活动时间相对较短。机会随机常见于转盘类活动。不同等级的用户,设定不同的中奖概率,一般配合设置用户最大可抽奖次数,比如5次机会, 能不能中奖,根据概率判定。一般活动时间设置的较长,比如几天。系统要求并发性抽奖系统比如涉及到访问量大的问题。系统涉及所面临的第一关,即活动开始的瞬间,大批用户点击的涌入。怎样 设计系统以达到如此高并发情况下的及时响应是本项目的重中之重。库存控制抽奖面临的必然是奖品。数量控制是必须要做到精准吻合。不允许出现设置了5个奖品,最终6人中奖这种类似的问题出现。其中的本质是奖品库存的控制。投放策略在活动时间段内,管理员设置好的一堆奖品如何投放?红包何时出现?什么时候可以被抽中?这些都涉及到投放策略。边界控制活动何时开始?何时结束?倒计时如何控制。这涉及到活动的边界。开始前要提防用户提前进入抽奖。结束后要即使反馈结果给用户,告知活动已结束。活动自由配置活动的配置由后台管理员完成,可以自由配置活动的开始结束时间,主题、活动简介、有哪些奖品、不同等级的用户中奖的策略。这就要求系统必须具备足够的业务灵活度。中奖策略每个用户参与抽奖后,要遵从后台管理员所设定的中奖策略,典型的场景是最大抽奖次数和最大中奖次数。技术架构技术架构拆分为调度服务、消息服务、鉴权服务、红包服务、商品服务等多个微服务,并搭建API网关、注册中心、配置中心等基础设施。 设计原则动静分离后台springboot启动微服务模块;前台静态文件分离,nginx或者cdn直接响应,不要再绕后台应用机器;CDN考虑并发量较高情况下,为减少带宽压力,可临时租借CDN进行挂载;微服务化将模块细粒度拆分,如:调度服务、消息服务、鉴权服务、红包服务等,平台微服务化;借助k8s等容器管理功能,实现不同服务的副本部署,滚动更新;负载均衡多个实例之间通过nginx做负载均衡,提升并发性能;红包服务负载较高,可以考虑部署多个节点,进行负载均衡;异步消息中奖后,中奖人及奖品信息要持久化到数据库。引入rabbitmq,将抽奖操作与数据库操作异步隔离。抽奖中奖后,只需要将中奖信息放入rabbitmq,并立即返回中奖信息给前端用户。后端msg模块消费rabbitmq消息,缓慢处理。缓存预热每隔1分钟扫描一次活动表,查询未来1分钟内将要开始的活动。将扫到的活动加载进redis,包括活动详细信息,中奖策略信息,奖品信息,抽奖令牌。活动正式开始后,基于redis数据做查询,不必再与数据库打交道。
提笔写架构
深入浅出RPC---2、序列化技术
RPC深入解析上文说到RPC架构,那RPC底层涉及哪些技术呢,下面我们来详细解析下。序列化技术序列化流程序列化作用在网络传输中,数据必须采用二进制形式, 所以在RPC调用过程中, 需要采用序列化技术,对入参对象和返 回值对象进行序列化与反序列化。序列化处理要素解析效率:序列化协议应该首要考虑的因素,像xml/json解析起来比较耗时,需要解析doom树,二进 制自定义协议解析起来效率要快很多。压缩率:同样一个对象,xml/json传输起来有大量的标签冗余信息,信息有效性低,二进制自定义协议 占用的空间相对来说会小很多。扩展性与兼容性:是否能够利于信息的扩展,并且增加字段后旧版客户端是否需要强制升级,这都是需 要考虑的问题,在自定义二进制协议时候,要做好充分考虑设计。可读性与可调试性:xml/json的可读性会比二进制协议好很多,并且通过网络抓包是可以直接读取,二 进制则需要反序列化才能查看其内容。跨语言:有些序列化协议是与开发语言紧密相关的,例如dubbo的Hessian序列化协议就只能支持Java 的RPC调用。通用性:xml/json非常通用,都有很好的第三方解析库,各个语言解析起来都十分方便,二进制数据的 处理方面也有Protobuf和Hessian等插件,在做设计的时候尽量做到较好的通用性。序列化方式自定义的二进制协议来实现序列化:下面以User对象例举讲解:User对象:package com.test;
public class User {
/**
* 用户编号
*/
private String userNo = "0001";
/**
* 用户名称
*/
private String name = "test";
}包体的数据组成:业务指令为0x00000001占1个字节,类的包名com.test占10个字节, 类名User占4个字节; 属性UserNo名称占6个字节,属性类型string占2个字节表示,属性值为0001占4个字节; 属性name名称占4个字节,属性类型string占2个字节表示,属性值为zhangsan占8个字节; 包体共计占有1+10+4+6+2+4+4+2+8 = 41字节。包头的数据组成:版本号v1.0占4个字节,消息包体实际长度为41占4个字节表示,序列号0001占4个字节,校验码32位表示占4 个字节。包头共计占有4+4+4+4 = 16字节。包尾的数据组成: 通过回车符标记结束\r\n,占用1个字节。整个包的序列化二进制字节流共41+16+1 = 58字节。这里讲解的是整个序列化的处理思路, 在实际的序列化处理中还要考虑更多细节,比如说方法和属性的区分,方法权限的标记,嵌套类型的处理等等;JDK原生序列化public static void main(String[] args) throws IOException, ClassNotFoundException {
String basePath = "D:/TestCode";
FileOutputStream fos = new FileOutputStream(basePath + "tradeUser.clazz");
TradeUser tradeUser = new TradeUser();
tradeUser.setName("Mirson");
ObjectOutputStream oos = new ObjectOutputStream(fos);
oos.writeObject(tradeUser);
oos.flush();
oos.close();
FileInputStream fis = new FileInputStream(basePath + "tradeUser.clazz");
ObjectInputStream ois = new ObjectInputStream(fis);
TradeUser deStudent = (TradeUser) ois.readObject();
ois.close();
System.out.println(deStudent);
}
在Java中,序列化必须要实现java.io.Serializable接口。通过ObjectOutputStream和ObjectInputStream对象进行序列化及反序列化操作。虚拟机是否允许反序列化,不仅取决于类路径和功能代码是否一致,一个非常重要的一点是两个类的 序列化 ID 是否一致(也就是在代码中定义的序列ID private static final long serialVersionUID)序列化并不会保存静态变量。要想将父类对象也序列化,就需要让父类也实现Serializable 接口。Transient关键字的作用是控制变量的序列化,在变量声明前加上该关键字,可以阻止该变量被序列 化到文件中,在被反序列化后,transient变量的值被设为初始值,如基本类型 int为 0,封装对象型 Integer则为null。服务器端给客户端发送序列化对象数据并非加密的,如果对象中有一些敏感数据比如密码等,那么在 对密码字段序列化之前,最好做加密处理,这样可以一定程度保证序列化对象的数据安全。JSON序列化一般在HTTP协议的RPC框架通信中,会选择JSON方式。优点:JSON具有较好的扩展性、可读性和通用性。缺点:JSON序列化占用空间开销较大,没有JAVA的强类型区分,需要通过反射解决,解析效率和压缩率都 较差。如果对并发和性能要求较高,或者是传输数据量较大的场景,不建议采用JSON序列化方式。Hessian2序列化Hessian 是一个动态类型,二进制序列化,并且支持跨语言特性的序列化框架。 Hessian 性能上要比 JDK、JSON 序列化高效很多,并且生成的字节数也更小。有非常好的兼容性和稳定 性,所以 Hessian 更加适合作为 RPC 框架远程通信的序列化协议。代码示例: TradeUser tradeUser = new TradeUser();
tradeUser.setName("Mirson");
//tradeUser对象序列化处理
ByteArrayOutputStream bos = new ByteArrayOutputStream();
Hessian2Output output = new Hessian2Output(bos);
output.writeObject(tradeUser);
output.flushBuffer();
byte[] data = bos.toByteArray();
bos.close();
//tradeUser对象反序列化处理
ByteArrayInputStream bis = new ByteArrayInputStream(data);
Hessian2Input input = new Hessian2Input(bis);
TradeUser deTradeUser = (TradeUser) input.readObject();
input.close();
System.out.println(deTradeUser);
Hessian自身也存在一些缺陷,大家在使用过程中要注意:对Linked系列对象不支持,比如LinkedHashMap、LinkedHashSet 等,但可以通过CollectionSerializer类修复。Locale 类不支持,可以通过扩展 ContextSerializerFactory类修复。Byte/Short在反序列化的时候会转成 Integer。Protobuf序列化Protobuf 是 Google 推出的开源序列库,它是一种轻便、高效的结构化数据存储格式,可以用于结构化 数据序列化,支持 Java、Python、C++、Go 等多种语言。Protobuf 使用的时候需要定义 IDL(Interface description language),然后使用不同语言的 IDL 编译 器,生成序列化工具类,它具备以下优点:压缩比高,体积小,序列化后体积相比 JSON、Hessian 小很多;IDL能清晰地描述语义,可以帮助并保证应用程序之间的类型不会丢失,无需类似 XML 解析器;序列化反序列化速度很快,不需要通过反射获取类型;消息格式的扩展、升级和兼容性都不错,可以做到向后兼容。代码示例:Protobuf脚本定义:// 定义Proto版本
syntax = "proto3";
// 是否允许生成多个JAVA文件
option java_multiple_files = false;
// 生成的包路径
option java_package = "com.itcast.bulls.stock.struct.netty.trade";
// 生成的JAVA类名
option java_outer_classname = "TradeUserProto";
// 预警通知消息体
message TradeUser {
/**
* 用户ID
*/
int64 userId = 1 ;
/**
* 用户名称
*/
string userName = 2 ;
}
代码操作:// 创建TradeUser的Protobuf对象
TradeUserProto.TradeUser.Builder builder = TradeUserProto.TradeUser.newBuilder(); builder.setUserId(101);
builder.setUserName("Mirson");
//将TradeUser做序列化处理
TradeUserProto.TradeUser msg = builder.build();
byte[] data = msg.toByteArray();
//反序列化处理, 将刚才序列化的byte数组转化为TradeUser对象
TradeUserProto.TradeUser deTradeUser = TradeUserProto.TradeUser.parseFrom(data); System.out.println(deTradeUser);
提笔写架构
深入浅出RPC---3、动态代理
动态代理流程内部接口如何调用实现RPC的调用内部核心技术采用的就是动态代理。JDK动态代理的如何实现?示例代码:package com.rpc.sample.proxy;
import sun.misc.ClassLoaderUtil;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
public class JDKDynamicProxy {
/**
* 定义用户的接口
*/
public interface User {
String job();
}
/**
* 实际的调用对象
*/
public static class Teacher {
public String invoke(){
return "i'm a Teacher";
}
}
/**
* 创建JDK动态代理类
*/
public static class JDKProxy implements InvocationHandler {
private Object target;
JDKProxy(Object target) {
this.target = target;
}
@Override
public Object invoke(Object proxy, Method method, Object[] paramValues) {
return ((Teacher)target).invoke();
}
}
public static void main(String[] args){
// 构建代理器
JDKProxy proxy = new JDKProxy(new Teacher());
ClassLoader classLoader = JDKDynamicProxy.class.getClassLoader();
// 生成代理类
User user = (User) Proxy.newProxyInstance(classLoader, new Class[]{User.class}, proxy);
// 接口调用
System.out.println(user.job());
}
}
JDK动态代理的实现原理:JDK内部如何处理?代理类 $Proxy里面会定义相同签名的接口,然后内部会定义一个变量绑定JDKProxy代理对象,当调用 User.job接口方法,实质上调用的是JDKProxy.invoke()方法。为什么要加入动态代理?第一, 缺点: 不便于管理,不利于扩展维护。第二, 优点: 可以做到拦截,添加其他额外功能。动态代理开源技术1.Cglib 动态代理Cglib是一个强大的、高性能的代码生成包。Javassist 动态代理 一个开源的分析、编辑和创建Java字节码的类库,dubbo内部动态代理采用Javassist 。引入maven库: <dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
<version>3.21.0-GA</version>
</dependency>示例代码:public interface ProxyFactory {
<T> T getProxy(Object target, InvocationHandler handler) throws Throwable;
}import com.bytebeats.codelab.javassist.proxy.ProxyFactory;
import java.lang.reflect.InvocationHandler;
public class JavassistProxyFactory implements ProxyFactory {
@Override
public <T> T getProxy(Object target, InvocationHandler handler) throws Throwable {
return (T) ProxyGenerator.newProxyInstance(Thread.currentThread().getContextClassLoader(),
target.getClass(), handler);
}
}
package com.bytebeats.codelab.javassist.proxy.javassist;
import com.bytebeats.codelab.javassist.util.ClassUtils;
import javassist.*;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class ProxyGenerator {
private static final AtomicInteger counter = new AtomicInteger(1);
private static ConcurrentHashMap<Class<?>, Object> proxyInstanceCache = new ConcurrentHashMap<>();
public static Object newProxyInstance(ClassLoader classLoader, Class<?> targetClass, InvocationHandler invocationHandler)
throws Exception {
if(proxyInstanceCache.containsKey(targetClass)){
return proxyInstanceCache.get(targetClass);
}
ClassPool pool = ClassPool.getDefault();
//生成代理类的全限定名
String qualifiedName = generateClassName(targetClass);
// 创建代理类
CtClass proxy = pool.makeClass(qualifiedName);
//接口方法列表
CtField mf = CtField.make("public static java.lang.reflect.Method[] methods;", proxy);
proxy.addField(mf);
CtField hf = CtField.make("private " + InvocationHandler.class.getName() + " handler;", proxy);
proxy.addField(hf);
CtConstructor constructor = new CtConstructor(new CtClass[]{pool.get(InvocationHandler.class.getName())}, proxy);
constructor.setBody("this.handler=$1;");
constructor.setModifiers(Modifier.PUBLIC);
proxy.addConstructor(constructor);
proxy.addConstructor(CtNewConstructor.defaultConstructor(proxy));
// 获取被代理类的所有接口
List<Class<?>> interfaces = ClassUtils.getAllInterfaces(targetClass);
List<Method> methods = new ArrayList<>();
for (Class cls : interfaces) {
CtClass ctClass = pool.get(cls.getName());
proxy.addInterface(ctClass);
Method[] arr = cls.getDeclaredMethods();
for (Method method : arr) {
int ix = methods.size();
Class<?> rt = method.getReturnType();
Class<?>[] pts = method.getParameterTypes();
StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
for(int j=0;j<pts.length;j++) {
code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
}
code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
if(!Void.TYPE.equals(rt) )
code.append(" return ").append(asArgument(rt, "ret")).append(";");
StringBuilder sb = new StringBuilder(1024);
sb.append(modifier(method.getModifiers())).append(' ').append(getParameterType(rt)).append(' ').append(method.getName());
sb.append('(');
for(int i=0;i<pts.length;i++)
{
if( i > 0 )
sb.append(',');
sb.append(getParameterType(pts[i]));
sb.append(" arg").append(i);
}
sb.append(')');
Class<?>[] ets = method.getExceptionTypes(); //方法抛出异常
if( ets != null && ets.length > 0 )
{
sb.append(" throws ");
for(int i=0;i<ets.length;i++)
{
if( i > 0 )
sb.append(',');
sb.append(getParameterType(ets[i]));
}
}
sb.append('{').append(code.toString()).append('}');
CtMethod ctMethod = CtMethod.make(sb.toString(), proxy);
proxy.addMethod(ctMethod);
methods.add(method);
}
}
proxy.setModifiers(Modifier.PUBLIC);
Class<?> proxyClass = proxy.toClass(classLoader, null);
proxyClass.getField("methods").set(null, methods.toArray(new Method[0]));
Object instance = proxyClass.getConstructor(InvocationHandler.class).newInstance(invocationHandler);
Object old = proxyInstanceCache.putIfAbsent(targetClass, instance);
if(old!=null){
instance = old;
}
return instance;
}
private static String modifier(int mod) {
if( Modifier.isPublic(mod) ) return "public";
if( Modifier.isProtected(mod) ) return "protected";
if( Modifier.isPrivate(mod) ) return "private";
return "";
}
/**
* 数组类型返回 String[]
* @param c
* @return
*/
public static String getParameterType(Class<?> c) {
if(c.isArray()) { //数组类型
StringBuilder sb = new StringBuilder();
do {
sb.append("[]");
c = c.getComponentType();
} while( c.isArray() );
return c.getName() + sb.toString();
}
return c.getName();
}
private static String asArgument(Class<?> cl, String name) {
if( cl.isPrimitive() ) {
if( Boolean.TYPE == cl )
return name + "==null?false:((Boolean)" + name + ").booleanValue()";
if( Byte.TYPE == cl )
return name + "==null?(byte)0:((Byte)" + name + ").byteValue()";
if( Character.TYPE == cl )
return name + "==null?(char)0:((Character)" + name + ").charValue()";
if( Double.TYPE == cl )
return name + "==null?(double)0:((Double)" + name + ").doubleValue()";
if( Float.TYPE == cl )
return name + "==null?(float)0:((Float)" + name + ").floatValue()";
if( Integer.TYPE == cl )
return name + "==null?(int)0:((Integer)" + name + ").intValue()";
if( Long.TYPE == cl )
return name + "==null?(long)0:((Long)" + name + ").longValue()";
if( Short.TYPE == cl )
return name + "==null?(short)0:((Short)" + name + ").shortValue()";
throw new RuntimeException(name+" is unknown primitive type.");
}
return "(" + getParameterType(cl) + ")"+name;
}
private static String generateClassName(Class<?> type){
return String.format("%s$Proxy%d", type.getName(), counter.getAndIncrement());
}
}
Byte Buddy 字节码增强库 Byte Buddy是致力于解决字节码操作和 简化操作复杂性的开源框架。几种动态代理性能比较:Byte Buddy > Javassist > CGLIB > JDK
提笔写架构
深入浅出RPC---6、零拷贝和时间轮
什么是零拷贝系统内核处理 IO 操作分为两个阶段:等待数据和拷贝数据。等待数据,就是系统内核在等待网卡接收到数据后,把数据写到内核中。拷贝数据,就是系统内核在获取到数据后,将数据拷贝到用户进程的空间中。具体流程:所谓的零拷贝,就是取消用户空间与内核空间之间的数据拷贝操作,应用进程每一次的读写操作,都可以通 过一种方式,让应用进程向用户空间写入或者读取数据,就如同直接向内核空间写入或者读取数据一样,再 通过 DMA 将内核中的数据拷贝到网卡,或将网卡中的数据 copy 到内核。RPC框架中的零拷贝应用Netty 框架是否也有零拷贝机制?Netty 的零拷贝则有些不一样,他完全站在了用户空间上,也就是基于 JVM 之上。Netty当中的零拷贝是如何实现的?RPC 并不会把请求参数作为一个整体数据包发送到对端机器上,中间可能会拆分,也可能会合并其他请求, 所以消息都需要有边界。接收到消息之后,需要对数据包进行处理,根据边界对数据包进行分割和合并,最终获得完整的消息。Netty零拷贝主要体现在三个方面:Netty的接收和发送ByteBuffer是采用DIRECT BUFFERS,使用堆外的直接内存(内存对象分配在JVM中 堆以外的内存)进行Socket读写,不需要进行字节缓冲区的二次拷贝。如果采用传统堆内存(HEAP BUFFERS)进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后写入Socket中。Netty提供了组合Buffer对象,也就是CompositeByteBuf 类,可以将 ByteBuf 分解为多个共享同一个 存储区域的 ByteBuf,避免了内存的拷贝。零拷贝带来的作用就是避免没必要的 CPU 拷贝,减少了 CPU 在用户空间与内核空间之间的上下文切换,从 而提升了网络通信效率与应用程序的整体性能。时间轮为什么需要时间轮?在Dubbo中,为增强系统的容错能力,会有相应的监听判断处理机制。比如RPC调用的超时机制的实现,消 费者判断RPC调用是否超时,如果超时会将超时结果返回给应用层。在Dubbo最开始的实现中,是将所有的 返回结果(DefaultFuture)都放入一个集合中,并且通过一个定时任务,每隔一定时间间隔就扫描所有的 future,逐个判断是否超时。这样的实现方式虽然比较简单,但是存在一个问题就是会有很多无意义的遍历操作开销。比如一个RPC调用的 超时时间是10秒,而设置的超时判定的定时任务是2秒执行一次,那么可能会有4次左右无意义的循环检测判 断操作。为了解决上述场景中的类似问题,Dubbo借鉴Netty,引入了时间轮算法,减少无意义的轮询判断操作。时间轮原理对于以上问题, 目的是要减少额外的扫描操作就可以了。比如说一个定时任务是在5 秒之后执行,那么在 4.9 秒之后才扫描这个定时任务,这样就可以极大减少 CPU开销。这时我们就可以利用时钟轮的机制了。时钟轮的实质上是参考了生活中的时钟跳动的原理,那么具体是如何实现呢?在时钟轮机制中,有时间槽和时钟轮的概念,时间槽就相当于时钟的刻度;而时钟轮就相当于指针跳动的一 个周期,我们可以将每个任务放到对应的时间槽位上。如果时钟轮有 10 个槽位,而时钟轮一轮的周期是 10 秒,那么我们每个槽位的单位时间就是 1 秒,而下一层时间 轮的周期就是 100 秒,每个槽位的单位时间也就是 10 秒,这就好比秒针与分针, 在秒针周期下, 刻度单位为 秒, 在分针周期下, 刻度为分。假设现在我们有 3 个任务,分别是任务 A(0.9秒之后执行)、任务 B(2.1秒后执行)与任务 C(12.1秒之后执 行),我们将这 3 个任务添加到时钟轮中,任务 A 被放到第 0 槽位,任务 B 被放到第 2槽位,任务 C 被放到下一 层时间轮的第2个槽位,如下图所示:通过这个场景我们可以了解到,时钟轮的扫描周期仍是最小单位1秒,但是放置其中的任务并没有反复扫描,每个 任务会按要求只扫描执行一次, 这样就能够很好的解决CPU 浪费的问题。Dubbo中的时间轮原理是如何实现?主要是通过Timer,Timeout,TimerTask几个接口定义了一个定时器的模型,再通过HashedWheelTimer这个类 实现了一个时间轮定时器(默认的时间槽的数量是512,可以自定义这个值)。它对外提供了简单易用的接口,只需要调用newTimeout接口,就可以实现对只需执行一次任务的调度。通过该定时器,Dubbo在响应的场景中实现了高效的任务调度。时间轮在RPC的应用调用超时上面所讲的客户端调用超时的处理,就可以应用到时钟轮,我们每发一次请求,都创建一个处理请求超时的定时任务放到时钟轮里,在高并发、高访问量的情况下,时钟轮每次只轮询一个时间槽位中的任务,这样会节省大量的 CPU。启动加载调用端与服务端启动也可以应用到时钟轮,比如说在服务启动完成之后要去加载缓存,执行定时任务等, 都可以放在时钟轮里。定时心跳检测RPC 框架调用端定时向服务端发送的心跳检测,来维护连接状态,我们可以将心跳的逻辑封装为一个心跳任务,放到时钟轮里。心跳是要定时重复执行的,而时钟轮中的任务执行一遍就被移除了,对于这种需要重复执行的定时任务我们该如何处理呢?我们在定时任务逻辑结束的最后,再加上 一段逻辑, 重设这个任务的执行时间,把它重新丢回到时钟轮里。这样就可以实现循环执行。
提笔写架构
红包雨架构设计---2、流程设计
业务流程会员登录APP或者小程序;调度器查询要进行的活动种子;数据反馈活动种子信息;活动预热,根据奖品数据,生成奖品池令牌桶;活动时间到,前台弹出红包雨,会员参与抽奖;前端获取活动信息;缓存返回活动信息给前端;进行用户登录、资格等基础校验;基础校验通过则获取令牌;缓存返回获取到的令牌;校验令牌有效性,是否在活动时间;令牌检验失败则令牌放回队列;令牌有效则说明中奖,组装中奖信息;提示用户中奖;异步通知消息服务中奖信息;执行持久化操作,入库中奖信息;缓存体系活动基本信息k-v,以活动id为key,活动对象为value,永不超时redisUtil.set(RedisKeys.INFO+game.getId(),game,‐1);活动策略信息hset,以活动id为group,用户等级为key,策略值为valueredisUtil.hset(RedisKeys.MAXGOAL + game.getId(),r.getUserlevel()+"",r.getGoalTimes()); redisUtil.hset(RedisKeys.MAXENTER + game.getId(),r.getUserlevel()+"",r.getEnterTimes());抽奖令牌桶双端队列(不清楚的自行百度),以活动id为key,在活动时间段内,随机生成时间戳做令牌,有多少个奖品就生成多少个令牌。令牌即奖品发放的时间点。从小到大排序后从右侧入队。redisUtil.rightPushAll(RedisKeys.TOKENS + game.getId(),tokenList);奖品映射信息k-v , 以活动id_令牌为key,奖品信息为value,会员获取到令牌后,如果令牌有效,则用令牌token值,来这里获取奖品详细信息redisUtil.set(RedisKeys.TOKEN + game.getId() +"_"+token,productMap.get(cgp.getProductid()),expire);令牌设计技巧假设活动时间间隔太短,奖品数量太多。那么极有可能产生的时间戳发生重复。解决技巧:额外再附加一个随机因子。将 (时间戳 * 1000 + 3位随机数)作为令牌。抽奖时,将抽中的令牌/1000 ,还原真实的时间戳。//活动持续时间(ms)
long duration = end ‐ start;
long rnd = start + new Random().nextInt((int)duration);
//为什么乘1000,再额外加一个随机数呢? ‐ 防止时间段奖品多时重复
long token = rnd * 1000 + new Random().nextInt(999);中奖计数k-v,以活动id_用户id作为key,中奖数为value,利用redis原子性,中奖后incr增加计数。抽奖次数计数也是同样的道理redisUtil.incr(RedisKeys.USERHIT+gameid+"_"+user.getId(),1);中奖逻辑判断抽奖时,从令牌桶左侧出队和当前时间比较,如果令牌时间戳小于等于当前时间,令牌有效,表示中奖。大于当前时间,则令牌无效,将令牌还回,从左侧压入队列。
提笔写架构
深入浅出RPC技术精解
通过"RPC架构"、"序列化技术"、"动态代理"、"网络IO模型"、"零拷贝和时间轮"五大小节,全面揭示RPC技术的核心
提笔写架构
红包雨架构设计
红包雨架构设计
提笔写架构
深入浅出消息队列---3、RabbitMQ延迟队列
延迟队列当用户秒杀成功以后,就需要引导用户去订单页面进行支付。如果用户在规定的时间之内(30分钟),没有完成订单的支付,此时我们就需要进行库存的回退操作。库存回退架构回退库存的架构如下图所示:过期时间目前有两种方法可以设置消息的TTL: 1、通过队列的属性设置,队列中所有的消息都有相同的过期时间。 2、是对消息本身进行单独的设置,每条消息的TTL可以不同。 如果两种方法一起使用,则消息的TTL以两者之间较小的那个数值为准。消息在队列中的生存时间一旦 超过设置的TTL值时,就会变成"死信"。设置消息的TTL在发送消息的时候可以直接通过消息属性来设置消息的过期时间,代码如下所示:// 测试发送消息时设置发送消息的过期时间
private static void sendPreMessageTTL() {
// void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor)
rabbitTemplate.convertAndSend("direct.exchange" , "create" , "测试延迟消息发送,为每一个消息去设置过期时间" , (message) -> {
MessageProperties messageProperties =
message.getMessageProperties(); // 获取消息属性对象
messageProperties.setExpiration("30000");
// 设置消息的过期时间为30秒
return message ;
});
}
如果消息没有及时消费,那么经过30秒以后,消息变成死信,Rabbitmq会将这个消息直接丢弃。设置队列的TTL我们也可以直接通过队列的属性设置消息的过期时间,队列中所有的消息都有相同的过期时间,代码如下所示:// 声明队列
@Bean(name = "direct.queue_02")
public Queue commonQueue02() {
QueueBuilder queueBuilder = QueueBuilder.durable("direct.queue_02"); queueBuilder.ttl(30000) ; // 设置队列消息的过期时间,为30
秒
return queueBuilder.build() ;
}
死信队列当一个消息变成死信了以后,默认情况下这个消息会被mq删除。如果我们给队列指定了"死信交换机"(DLX: Dead-Letter-Exchange),那么此时这个消息就会转发到死信交换机,进而被与死信交换机绑定的队列(死信队列)进行消费。从而实现了延迟消息发送的效果。原理介绍具体的原理如下图所示: 一个消息成为死信常见情况: 1、消息过期 2、队列达到最大长度(可以通过x-max-length参数来指定队列的长度,如果不指定,可以认为是无限长)具体实现接下来我们就来演示一下死信队列的使用,使用死信队列来实现消息的延迟发送,具体的代码实现如下所示: 1、声明死信交换机、死信队列、死信交换机和死信队列的绑定// 声明死信交换机
@Bean(name = "dlx.exchange")
public Exchange dlxExchange() {
return
ExchangeBuilder.directExchange("dlx.exchange").durable(true).build() ;
}
// 声明死信队列
@Bean(name = "dlx.queue")
public Queue dlxQueue() {
return QueueBuilder.durable("dlx.queue").build() ;
}
// 完成死信队列和死信交换机的绑定
@Bean
public Binding dlxQueueBindToDlxExchange(@Qualifier(value =
"dlx.exchange") Exchange exchange , @Qualifier(value = "dlx.queue") Queue queue) {
return
BindingBuilder.bind(queue).to(exchange).with("delete").noargs() ;
}
2、将死信队列作为普通队列的属性设置过去// 声明队列
@Bean(name = "direct.queue_02")
public Queue commonQueue02() {
QueueBuilder queueBuilder =
QueueBuilder.durable("direct.queue_02");
queueBuilder.deadLetterExchange("dlx.exchange") ; // 将死信交换机作 为普通队列的属性设置过去
queueBuilder.deadLetterRoutingKey("delete") ; // 设置消息的 routingKey
queueBuilder.ttl(30000) ; // 设置队列消息的 过期时间,为30秒
queueBuilder.maxLength(2) ; // 设置队列的最大 长度
return queueBuilder.build() ;
}
3、消费端进行同样的设置,并且指定消费死信队列@Component
public class RabbitmqDlxQueueConsumer {
// 创建日志记录器
private static final Logger LOGGER =
LoggerFactory.getLogger(RabbitmqDlxQueueConsumer.class) ; @RabbitListener(queues = "dlx.queue")
public void dlxQueueConsumer(String msg) {
LOGGER.info("dlx queue msg is : {} " , msg );
}
}
另外还可以基于RabbitMQ 插件实现的延时队列。
提笔写架构
消息通知系统详解3---Netty
为什么使用Netty我们已经有了NIO能够提高程序效率了,为什么还要使用Netty?简单的说:Netty封装了JDK的NIO,让你用得更爽,你不用再写一大堆复杂的代码了。官方术语:Netty是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能服务器和客户端。下面是使用Netty不使用JDK原生NIO的一些原因:使用JDK自带的NIO需要了解太多的概念,编程复杂Netty底层IO模型随意切换,而这一切只需要做微小的改动,就可以直接从NIO模型变身为IO模型Netty自带的拆包解包,异常检测等机制,可以从NIO的繁重细节中脱离出来,只需要关心业务逻辑Netty解决了JDK的很多包括空轮询在内的bugNetty底层对线程,selector做了很多细小的优化,精心设计的线程模型做到非常高效的并发处理自带各种协议栈让你处理任何一种通用协议都几乎不用亲自动手Netty社区活跃,遇到问题随时邮件列表或者issueNetty已经历各大rpc框架,消息中间件,分布式通信中间件线上的广泛验证,健壮性无比强大和IO编程一样的案例:添加Netty依赖<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.5.Final</version>
</dependency>服务端:public class NettyServer {
public static void main(String[] args) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
NioEventLoopGroup boos = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap
.group(boos, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println(msg);
}
});
}
})
.bind(8000);
}
}客户端:public class NettyClient {
public static void main(String[] args) throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
NioEventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new StringEncoder());
}
});
Channel channel = bootstrap.connect("127.0.0.1", 8000).channel();
while (true) {
channel.writeAndFlush("测试数据");
Thread.sleep(2000);
}
}
}Netty的事件驱动例如很多系统都会提供 onClick() 事件,这个事件就代表鼠标按下事件。事件驱动模型的大体思路如下:有一个事件队列;鼠标按下时,往事件队列中增加一个点击事件;有个事件泵,不断循环从队列取出事件,根据不同的事件,调用不同的函数;事件一般都各自保存各自的处理方法的引用。这样,每个事件都能找到对应的处理方法;为什么使用事件驱动?程序中的任务可以并行执行任务之间高度独立,彼此之间不需要互相等待在等待的事件到来之前,任务不会阻塞Netty使用事件驱动的方式作为底层架构,包括:事件队列(event queue):接收事件的入口。分发器(event mediator):将不同的事件分发到不同的业务逻辑单元。事件通道(event channel):分发器与处理器之间的联系渠道。事件处理器(event processor):实现业务逻辑,处理完成后会发出事件,触发下一步操作。核心组件Netty 的功能特性图:Netty 功能特性:传输服务,支持 BIO 和 NIO。容器集成:支持 OSGI、JBossMC、Spring、Guice 容器。协议支持:HTTP、Protobuf、二进制、文本、WebSocket 等,支持自定义协议。BIO和NIO的区别:Netty框架包含如下的组件:ServerBootstrap :用于接受客户端的连接以及为已接受的连接创建子通道,一般用于服务端。Bootstrap:不接受新的连接,并且是在父通道类完成一些操作,一般用于客户端的。Channel:对网络套接字的I/O操作,例如读、写、连接、绑定等操作进行适配和封装的组件。EventLoop:处理所有注册其上的channel的I/O操作。通常情况一个EventLoop可为多个channel提供服务。EventLoopGroup:包含有多个EventLoop的实例,用来管理 event Loop的组件,可以理解为一个线程池,内部维护了一组线程。ChannelHandler和ChannelPipeline:例如一个流水线车间,当组件从流水线头部进入,穿越流水线,流水线上的工人按顺序对组件进行加工,到达流水线尾部时商品组装完成。流水线相当于ChannelPipeline,流水线工人相当于ChannelHandler,源头的组件当做event。ChannelInitializer:用于对刚创建的channel进行初始化,将ChannelHandler添加到channel的ChannelPipeline处理链路中。ChannelFuture:与jdk中线程的Future接口类似,即实现并行处理的效果。可以在操作执行成功或失败时自动触发监听器中的事件处理方法。上面的Netty框架包含如下的组件大概看的有点蒙,我们对之前编写的代码加上注释:服务端:public class NettyServer {
public static void main(String[] args) {
// 用于接受客户端的连接以及为已接受的连接创建子通道,一般用于服务端。
ServerBootstrap serverBootstrap = new ServerBootstrap();
// EventLoopGroup包含有多个EventLoop的实例,用来管理event Loop的组件
// 接受新连接线程
NioEventLoopGroup boos = new NioEventLoopGroup();
// 读取数据的线程
NioEventLoopGroup worker = new NioEventLoopGroup();
//服务端执行
serverBootstrap
.group(boos, worker)
// Channel对网络套接字的I/O操作,
// 例如读、写、连接、绑定等操作进行适配和封装的组件。
.channel(NioServerSocketChannel.class)
// ChannelInitializer用于对刚创建的channel进行初始化
// 将ChannelHandler添加到channel的ChannelPipeline处理链路中。
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
// 组件从流水线头部进入,流水线上的工人按顺序对组件进行加工
// 流水线相当于ChannelPipeline
// 流水线工人相当于ChannelHandler
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
//这个工人有点麻烦,需要我们告诉他干啥事
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.out.println(msg);
}
});
}
})
.bind(8000);
}
}
客户端:public class NettyClient {
public static void main(String[] args) throws InterruptedException {
// 不接受新的连接,并且是在父通道类完成一些操作,一般用于客户端的。
Bootstrap bootstrap = new Bootstrap();
// EventLoopGroup包含有多个EventLoop的实例,用来管理event Loop的组件
NioEventLoopGroup group = new NioEventLoopGroup();
//客户端执行
bootstrap.group(group)
// Channel对网络套接字的I/O操作,
// 例如读、写、连接、绑定等操作进行适配和封装的组件。
.channel(NioSocketChannel.class)
// 用于对刚创建的channel进行初始化,
// 将ChannelHandler添加到channel的ChannelPipeline处理链路中。
.handler(new ChannelInitializer<Channel>() {
提笔写架构
深入浅出消息队列---8、Kafka消息可靠性
Kafka消息可靠性第一种情况消费端消息丢失 场景描述: 位移提交:对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中的位置。对于消费者而言,它也有一个offset的概念,消费者使用offset来表示消费到分区中某个消息所在的位置。单词"offset"可以编译为"偏移量",也可以翻译为"位移",在很多的中文资料中都会交叉使用"偏移量"和"位移"这两个词,对于消息在分区中的位置,我们将offset称之为"偏移量";对于消费者消费到的位置,将offset称为"位移",有时候也会更明确地称之为"消费位移"("偏移量"是在讲分区存储层面的内容,"位移"是在讲消费层面的内容)当然对于一条消息而言,它的偏移量和消费者消费它时的消费位移是相等的。当每一次调用poll()方法时,它返回的是还没有消费过的消息集(当然这个前提是消息以及存储在Kafka中了,并且暂不考虑异常情况的发生),要做到这一点,就需要记录上一次消费时的消费位移。并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。消费位移存储在Kafka内部的主题__consumer_offsets中。这里把将消费位移存储起来的动作称之为"提交",消费者在消费完消息之后需要执行消费位移的提交。默认情况下Kafka的消费位移提交是自动提交,这个由消费者客户端参数enable.auto.commit配置,默认值为true。当然这个默认的自动提交并不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数auto.commit.interval.ms配置,默认值为5秒。自动位移提交的动作是在poll()方法的逻辑里面完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。自动位移提交带来为问题: 1、重复消费 2、消息丢失问题描述重复消费原理如下图所示: 消息丢失如下图所示:解决方案解决方案:将自动位移提交更改为手动位移提交注意: 1、在单独使用Kafka的java客户端将位移提交的模式更改为手动位移提交,那么我们就需要显示的调用consumer的方法完成位移提交。// 通过Kafka的方式消费消息,如果将位移提交更改为手动位移提交。那么我们就需要主动调用consumer的方法完成位移提交
private static void consumerFromKafka() {
// 创建属性集合,设置初始化参数
Properties properties = new Properties();
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("bootstrap.servers", "192.168.23.131:9092");
properties.put("group.id", "dd.demo");
properties.put("enable.auto.commit" , "false") ;
// 创建KafkaConsumer对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("dd"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
// consumer.commitSync(); // 进行同步位移提交,会阻塞当前线程
consumer.commitAsync(new OffsetCommitCallback(){
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception == null) {
System.out.println(offsets);
} else {
LOGGER.error("fail to commit offsets {}", offsets, exception);
}
}
});
}
}
2、 在使用的是spring boot和Kafka进行整合,当我们将spring.kafka.consumer.enable-auto-commit的值设置为false以后,只有在特定的提交模式下我们可以手动进行提交。在一些提交模式下由spring根据约定的条件控制提交。常见的提交模式是在ContainerProperties.AckMode这个枚举类中定义。AckMode针对ENABLE_AUTO_COMMIT_CONFIG=false时生效,有以下几种:RECORD : 每处理一条commit一次BATCH : 每次poll的时候批量提交一次,频率取决于每次poll的调用频率TIME : 每次间隔ackTime的时间去commitCOUNT : 累积达到ackCount次的ack去commitCOUNT_TIME: ackTime或ackCount哪个条件先满足,就commitMANUAL : listener负责ack,但是背后也是批量上去MANUAL_IMMEDIATE : listner负责ack,每调用一次,就立即commitspring配置如下:# kafka配置
spring:
kafka:
consumer:
bootstrap-servers: 192.168.23.131:9092
enable-auto-commit: false # 设置手动位移提交
listener:
ack-mode: manual_immediate
代码实现:@KafkaListener(topics = "dd" , groupId = "dd.demo")
public void consumerHandler(String msg , KafkaConsumer consumer) {
LOGGER.info("consumer topic is : {} , msg is ----> {} " , "itheima"
, msg);
// consumer.commitSync(); // 同步位移提交
consumer.commitAsync((offsets , e) -> {
if(e == null){
LOGGER.info("位移提交成功....");
}else {
Set<Map.Entry<TopicPartition, OffsetAndMetadata>> entries =offsets.entrySet();
for(Map.Entry<TopicPartition , OffsetAndMetadata> entry :entries) {
TopicPartition topicPartition = entry.getKey();
OffsetAndMetadata offsetAndMetadata = entry.getValue();
LOGGER.info("位移提交失败....topicPartition is {} ,offsets is {} " , topicPartition.partition() ,offsetAndMetadata.offset());
}
}
});
}
第二种情况生产者将消息发送到Broker中以后,消息还没有被及时消费,此时Broker宕机了,这样就会导致消息丢失。问题描述Kafka消息的发送流程: 在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程会根据指定的条件,不断从RecordAccumulator中拉取消息发送到Kafka broker。如下图所示: Sender线程拉取消息的条件:缓冲区大小达到一定的阈值(默认是16384byte),可以通过spring.kafka.producer.batch-size进行设定缓冲区等待的时间达到设置的阈值(默认是0), 可以通过linger.ms属性进行设定发送消息的三种方式:1、发后即忘 2、同步消息发送 3、异步消息发送发后即忘 只管往kafka发送消息(消息只发送到缓冲区)而并不关心消息是否正确到达。正常情况没什么问题,不过有些时候(比如不可重试异常)会造成消息的丢失。 这种发送方式性能最高,可靠性最差。如下所示:// 演示消息发送: 发送即忘
public static void sendMessageMethod01() {
for(int x = 0 ; x < 5 ; x++) {
String msg = "Kakfa环境测试...." + x ;
kafkaTemplate.send("itheima" , msg) ; // 发后即
忘
LOGGER.info("send msg success ---> {} " , msg);
}
}
同步消息发送 其实kafkaTemplate.send方法并不是返回void,而是ListenableFuture<SendResult<K, V>>,该类继承了jdk concurrent包的Future。如下:@Override
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V
data) {
ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic,
data);
return doSend(producerRecord);
} 12345
要实现同步发送的,可以利用返回的ListenableFuture实现,如下:// 演示消息发送: 同步发送
public static void sendMessageMethod02() throws ExecutionException,
InterruptedException {
for(int x = 0 ; x < 5 ; x++) {
String msg = "Kakfa环境测试...." + x ;
kafkaTemplate.send("itheima" , msg).get() ; // 同步消息发
送
LOGGER.info("send msg success ---> {} " , msg);
}
}
实际上send()方法本身就是异步的,send()方法返回的Future对象可以使调用方稍后获得发送的结果。在执行send()方法之后直接链式调用了get()方法可以阻塞等待Kafka的响应,直到消息发送成功,或者发生异常。如果发生异常,那么就需要捕获异常并交由外层逻辑处理。这种方式性能最差,可靠性较好。异步发送 在send方法里指定一个Callback的回调函数,Kafka在返回响应时调用该函数来实现异步的发送确认。异步发送方式的示例如下:// 演示消息发送: 异步发送
public static void sendMessageMethod03() throws ExecutionException,
InterruptedException {
for(int x = 0 ; x < 5 ; x++) {
// Kafka消息的异步发送
String msg = "Kakfa环境测试...." + x ;
kafkaTemplate.send("itheima" , msg).addCallback((obj) -> {
LOGGER.info("send msg to kafka broker success ---> {} " ,
((SendResult)obj).getProducerRecord().value());
} , (t) -> {
t.printStackTrace();
});
LOGGER.info("send msg to local cache success ---> {} " , msg);
}
}
这种方式的特点:性能较好,可靠性也有保障解决方案针对上述情况所产生的消息丢失,可以的解决方案有如下几种:给topic设置replication.factor参数大于1,要求每个partition必须最少有两个副本搭建Kafka集群,让各个分区副本均衡的分配到不同的broker上在producer端设置acks=all,要求每条数据写入replicas后,才认为写入成功:ack=0, 生产者在成功写入消息之前不会等待任何来自服务器的响应。如果出现问题生产者是感知不到的,消息就丢失了。不过因为生产者不需要等待服务器响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。ack=1,默认值为1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法达到首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息(Kafka生产者内部机制)。但是,这样还有可能会导致数据丢失,如果收到写成功通知,此时首领节点还没来 的及同步数据到follower节点,首领节点崩溃,就会导致数据丢失。ack=-1/all, 只有当所有参与复制的节点都收到消息时,生产者会收到一个来自服务器的成功响应,这种模式是最安全的,它可以保证不止一个服务器收到消息。当生产者发送消息完毕以后,没有收到Broker返回的ack,此时就会触发重试机制或者抛出异常。我们可以通过retries参数设置重试次数(spring boot和Kafka整合默认的重试次数为0),发送客户端会进行重试直到broker返回ack。
提笔写架构
深入浅出Zookeeper---4、ZK一致性与同步原理
CAP定理 CAP定理,指的是在一个分布式系统中,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性)这三个基本需求,最多只能同时满足其中的2个。Consistency(一致性): 多个副本节点保持数据的一致性。Availability(可用性): 指系统一直处于可用的状态。Partition Tolerance(分区容错性): 遇到任何网络分区故障, 仍然能够保证对外提供满足一致性和可用性的服务。ZK为什么不能满足可用性呢?集群中网络出现分割的故障,ZK将他们从自己的管理范围内踢出去, 外界就不能访问这个节点.ZK在什么情况下是不能保证可用呢?ZK在进行leade选举时, 集群都是不可用的状态。选举leader的期间, 持续的时间短的话是数百毫秒, 长的话持续数秒。客户端加入重试机制来做补偿。ZAB协议解析概述ZAB (Atomic Broadcast Protocol)协议是为分布式协调服务 ZooKeeper 专门设计的一种支持崩溃恢复的原子广播协议。基于该协议,ZooKeeper 实现了一种主备模式的系统架构来保持集群中各个副本之间数据一致性。ZAB与PAXOS的联系与区别Paxos算法的目的在于设计分布式的一致性状态机系统。 ZAB协议的设计目的在于分布式的高可用数据主备系统。 ZAB借鉴了Paxos算法,做了相应的改进,ZAB协议除了遵从Paxos算法中的读阶段和写阶段,还有加入了同步阶段。ZAB协议两个过程ZAB 协议主要包括两个过程: 消息广播和崩溃恢复。和三个阶段: 分为Discovery 发现,Synchronization 同步,Broadcast 广播。消息广播过程 :Leader 节点接受事务提交,并且将新的 Proposal 请求广播给 Follower 节点,收集各个节点的反馈,决定是否进行 Commit。崩溃恢复过程 如果在同步过程中出现 Leader 节点宕机,会进入崩溃恢复阶段,重新进行 Leader选举,崩溃恢复阶段还 包含数据同步操作,同步集群中最新的数据,保持集群的数据一致性。ZAB协议三个阶段ZAB存在三个阶段:发现阶段、同步阶段和广播阶段。其中发现阶段等同于Paxos的读阶段,广播阶段等同于Paxos的写阶段。 涉及专业术语:CEpoch:Follower 发送自己处理过的最后一个事务 Proposal 的 epoch 值。NewEpoch:Leader根据接收 Follower 的 epoch,来生成新一轮 epoch 值。Ack-E:Follower 确认接收 Leader 的新epoch。NewLeader:确立领导地位,向其他 Follower 发送 NewLeader 消息。Ack-LD:Follower确认接收 Leader 的 NewLeader 消息。 Commit-LD:提交新 Leader 的 proposal。Propose:Leader 开启一个新的事务。Ack:Follower 确认接收 Leader 的 Proposal。Commit:Leader 发送给 Follower,要求所有 Follower 提交事务 Proposal。Discovery(发现阶段) 处理过程: 1. Follower 将自己最后处理的事务 Proposal 的 epoch 值发送给 Leader,消息 CEpoch(F.p) , F.p 可以提取出 zxid。
2. 当 Leader 接收到过半的 Follower 的 CEpoch 消息后,Leader 生成 NewEpoch(e') 发送给这些 过半的 Follower, e' 是比任何从 CEpoch 消息中收到的 epoch 值都要大。
3. Follower 一旦从 Leader 处收到 NewEpoch(e') 消息,会先做判断,如果 e'<F.acceptedEpoch ,并且F.state = election 也就是Looking状态,那么会重新回到Leader选举 阶段。
4. Leader 一旦收到了过半的 Follower 的确认消息。它会从这些过半的 Follower 中选取一个 F,并 使用它作为初始化事务的集合(用于同步集群数据),然后结束发现阶段。既然要选择需要同步 的事务集合,必然要选择事务最全的,所以,须满足epoch 是最大的且 zxid 也是最大的条件。
Synchronization(同步阶段)第一个过程 NewLeader:Leader 将新 epoch 和 S’ 以 NewLeader(e’, S’) 的消息形式发送给所有过半 (Quorum) 的 Follower。在上一阶段 L.history = F.history ,所以 S’ 就是流程图中的L.history 。第二过程ACK:当 Follower 接收到NewLeader(e’, S’) 消息后,做相应判断: 1. 如果 Follower 的 epoch 等于 e’ ,也就是确认是不是该Follower的信息,因为前一阶段已经存储了最新的 e’ 。Follower 将会执行事务应用操作,将接收 S’ 中的所有事务 Proposal,只是接收不作其他处理。 2. 如果 Follower 的 epoch 不等于 e’ ,即不是这一轮的 Follower信息,直接回退至选举阶段。 3. Leader 在接收到过半的 Follower 的 Ack 消息后,发送 Commit 消息至所有的 Follower进行同 步,之后进入下一阶段即 Broadcast(消息广播)。第三个过程 Commit:在收到 Leader 的 Commit 消息后,按顺序依次调用 abdeliver() 处理 S’ 的 每一个事务,随后结束这一阶段的处理。Broadcast(广播阶段)第一个过程 Propose:Leader 收到来自客户端新的事务请求后,会生成对应的事务 Proposal,并根据 zxid 的顺序(递增)向追随自己的所有 Follower 发送 P<e’, <v, z>> ,其中 epoch(z) == e’ 。第二个过程 Ack:Follower 根据收到消息的次序来处理这些 Proposal,并追加到 H 中去,然后通知给 Leader。第三个过程 Commit:一旦 Follower 收到来自 Leader 的 Commit 消息,将调用 abdeliver() 提交事务 。 这里是按照zxid的顺序来提交事务。 广播请求处理流程:4.Leader(主)服务器接收 Client(客户端) 的请求,为其生成 Proposal(提案)。5.然后将 Proposal 发送给所有 Follower (从)。主会为每一个从分配一个单独的队列,以保证消息的有 序性。6.主节点等待所有从服务器反馈 Ack,当有过半的从服务器 Ack 之后,主节点会提交本地事务。然后广播 Commit 给所有,从节点接收到 Commit 之后完成提交。
提笔写架构
深入浅出业务幂等性---1、幂等性介绍与接口幂等
微服务与幂等性随着应用架构由单体架构到微服务架构进行演变,现如今市面上超过50%的应用都会基于分布式或微服务完成系统架构设计。在微服务架构体系内,就会存在若干个微服务,这些服务可能基于RPC或者HTTPS等协议进行通讯。那么既然服务之间存在相互调用,那么必然存在服务调用延迟或者失败的情况,当出现这种问题,服务端会进行重试等操作或客户端有可能会进行多次点击提交。如果这样请求多次的话,那最终处理的数据结果就一定要保证统一,如支付场景。此时就需要通过保证业务幂等性方案来完成。幂等性简介幂等本身是一个数学概念。即 f(n) = 1^n ,无论n为多少,f(n)的值永远为1。在编程开发中,对于幂等的定义为:无论对某一个资源操作了多少次,其影响都应是相同的。 换句话说就是:在接口重复调用的情况下,对系统产生的影响是一样的,但是返回值允许不同,如查询。幂等性包括数据幂等、接口幂等、服务幂等、消息幂等。以SQL为例:select * from table where id=1。此SQL无论执行多少次,虽然结果有可能出现不同,都不会对数据产生 改变,具备幂等性。insert into table(id,name) values(1,'小莫') 。此SQL如果id或name有唯一性约束,多次操作只允许插 入一条记录,则具备幂等性。如果不是,则不具备幂等性,多次操作会产生多条数据。update table set score=100 where id = 1 。此SQL无论执行多少次,对数据产生的影响都是相同的。具备幂等性。update table set score=50+score where id = 1 。此SQL涉及到了计算,每次操作对数据都会产生影响。 不具备幂等性。delete from table where id = 1 。此SQL多次操作,产生的结果相同,具备幂等性。幂等性设计主要从两个维度进行考虑:空间、时间。空间:定义了幂等的范围,如生成订单的话,不允许出现重复下单。时间:定义幂等的有效期。有些业务需要永久性保证幂等,如下单、支付等。而部分业务只要保证一段时间幂等即可。同时对于幂等的使用一般都会伴随着出现锁的概念,用于解决并发安全问题。业务与幂等性在业务开发与分布式系统设计中,幂等性是一个非常重要的概念,有非常多的场景需要考虑幂等性的问题,尤其对于现在的分布式系统,经常性的考虑重试、重发等操作,一旦产生这些操作,则必须要考虑幂等性问题。以交易系统、支付系统等尤其明显,如:当用户购物进行下单操作,用户操作多次,但订单系统对于本次操作只能产生一个订单。当用户对订单进行付款,支付系统不管出现什么问题,应该只对用户扣一次款。当支付成功对库存扣减时,库存系统对订单中商品的库存数量也只能扣减一次。当对商品进行发货时,也需保证物流系统有且只能发一次货。在电商系统中还有非常多的场景需要保证幂等性。但是一旦考虑幂等后,服务逻辑务必会变的更加复杂。因此是否要考虑幂等,需要根据具体业务场景具体分析。而且在实现幂等时,还会把并行执行的功能改为串行化,降低了执行效率。此处以下单减库存为例,当用户生成订单成功后,会对订单中商品进行扣减库存。 订单服务会调用库存服务 进行库存扣减。库存服务会完成具体扣减实现。现在对于功能调用的设计,有可能出现调用超时,因为出现如网络抖动,虽然库存服务执行成功了,但结果并没有在超时时间内返回,则订单服务也会进行重试。那就会出现问题,stock对于之前的执行已经成功了, 只是结果没有按时返回。而订单服务又重新发起请求对商品进行库存扣减。 此时出现库存扣减两次的问题。 对于这种问题,就需要通过幂等性进行结果。接口幂等对于幂等的考虑,主要解决两点前后端交互与服务间交互。这两点有时都要考虑幂等性的实现。从前端的思路解决 的话,主要有三种:前端防重、PRG模式、Token机制。前端防重通过前端防重保证幂等是最简单的实现方式,前端相关属性和JS代码即可完成设置。可靠性并不好,有经验的人员 可以通过工具跳过页面仍能重复提交。主要适用于表单重复提交或按钮重复点击。PRG模式PRG模式即POST-REDIRECT-GET。当用户进行表单提交时,会重定向到另外一个提交成功页面,而不是停留在原先的表单页面。这样就避免了用户刷新导致重复提交。同时防止了通过浏览器按钮前进/后退导致表单重复提交。 是一种比较常见的前端防重策略。Token机制方案介绍通过token机制来保证幂等是一种非常常见的解决方案,同时也适合绝大部分场景。该方案需要前后端进行一定程 度的交互来完成。1)服务端提供获取token接口,供客户端进行使用。服务端生成token后,如果当前为分布式架构,将token存放 于redis中,如果是单体架构,可以保存在jvm缓存中。2)当客户端获取到token后,会携带着token发起请求。3)服务端接收到客户端请求后,首先会判断该token在redis中是否存在。如果存在,则完成进行业务处理,业务 处理完成后,再删除token。如果不存在,代表当前请求是重复请求,直接向客户端返回对应标识。但是现在有一个问题,当前是先执行业务再删除token。在高并发下,很有可能出现第一次访问时token存在,完 成具体业务操作。但在还没有删除token时,客户端又携带token发起请求,此时,因为token还存在,第二次请求 也会验证通过,执行具体业务操作。对于这个问题的解决方案的思想就是并行变串行。会造成一定性能损耗与吞吐量降低。第一种方案:对于业务代码执行和删除token整体加线程锁。当后续线程再来访问时,则阻塞排队。第二种方案:借助redis单线程和incr是原子性的特点。当第一次获取token时,以token作为key,对其进行自增。 然后将token进行返回,当客户端携带token访问执行业务代码时,对于判断token是否存在不用删除,而是对其继续incr。如果incr后的返回值为2。则是一个合法请求允许执行,如果是其他值,则代表是非法请求,直接返回。那如果先删除token再执行业务呢?其实也会存在问题,假设具体业务代码执行超时或失败,没有向客户端返回明确结果,那客户端就很有可能会进行重试,但此时之前的token已经被删除了,则会被认为是重复请求,不再进行业务处理。这种方案无需进行额外处理,一个token只能代表一次请求。一旦业务执行出现异常,则让客户端重新获取令牌, 重新发起一次访问即可。推荐使用先删除token方案但是无论先删token还是后删token,都会有一个相同的问题。每次业务请求都回产生一个额外的请求去获取 token。但是,业务失败或超时,在生产环境下,一万个里最多也就十个左右会失败,那为了这十来个请求,让其他九千九百多个请求都产生额外请求,就有一些得不偿失了。虽然redis性能好,但是这也是一种资源的浪费。实现基于自定义业务流程实现这种实现方式省略,与传统实现无异。基于自定义注解实现直接把token实现嵌入到方法中会造成大量重复代码的出现。因此可以通过自定义注解将上述代码进行改造。在需 要保证幂等的方法上,添加自定义注解即可。1. 在token_common中新建自定义注解Idemptentpublic class IdemptentInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (!(handler instanceof HandlerMethod)) {
return true;
}
HandlerMethod handlerMethod = (HandlerMethod) handler;
Method method = handlerMethod.getMethod();
Idemptent annotation = method.getAnnotation(Idemptent.class);
if (annotation != null){
//进行幂等性校验
checkToken(request);
}
return true;
}
@Autowired
private RedisTemplate redisTemplate;
//幂等性校验
private void checkToken(HttpServletRequest request) {
String token = request.getHeader("token");
if (StringUtils.isEmpty(token)){
throw new RuntimeException("非法参数");
}
boolean delResult = redisTemplate.delete(token);
if (!delResult){
//删除失败
throw new RuntimeException("重复请求");
}
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
}
}2. 修改token_service_order启动类,让其继承WebMvcConfigurerAdapter@Bean
public IdemptentInterceptor idemptentInterceptor() {
return new IdemptentInterceptor();
}
@Override
public void addInterceptors(InterceptorRegistry registry) {
//幂等拦截器
registry.addInterceptor(idemptentInterceptor());
super.addInterceptors(registry);
}3. 更新token_service_order与token_service_order_api,新增添加订单方法,并且方法添加自定义幂等注解@Idemptent
@PostMapping("/genOrder2")
public String genOrder2(@RequestBody Order order){
order.setId(String.valueOf(idWorker.nextId()));
order.setCreateTime(new Date());
order.setUpdateTime(new Date());
int result = orderService.addOrder(order);
if (result == 1){
System.out.println("success");
return "success";
}else {
System.out.println("fail");
return "fail";
}
}
提笔写架构
深入浅出消息队列---2、RabbitMQ介绍
RabbitMQ架构回顾RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的。它的实现架构如下图所示: 各个组件介绍: Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列 Queue:消息的载体,每个消息都会被投到一个或多个队列 Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来 Routing Key:路由关键字,exchange根据这个关键字进行消息投递 Producer:消息生产者,就是投递消息的程序 Consumer:消息消费者,就是接受消息的程序 Channel:消息通道,在客户端的每个连接里,可建立多个channelAMQP协议介绍目前Rabbitmq最新的版本默认支持的是AMQP 0-9-1,该协议总共包含了3部分:Module Layer: 位于协议的最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自定义的业务逻辑。 例如,客户端可以是使用Queue.Declare命令声明一个队列或者使用Basic.Consume订阅消费一个队列中的消息。Session Layer: 位于中间层,主要负责将客户端的命令发送给服务端,在将服务端的应答返回给客户端,主要为客户端与服务器之间的通信提供可靠性的同步机制和错误处理。Transport Layer: 位于最底层,主要传输二进制数据流,提供帧的处理、信道的复用、错误检查和数据表示等。AMQP生产者流转过程接下来我们就来了解一些AMQP协议中生产者发送消息时所涉及到的相关命令,以及在发送数据的时候命令的传输的整体过程。 命令的传输流程(通过抓包工具进行抓取,选择虚拟机网卡)如下图所示:当客户端与Broker建立连接的时候,客户端会向Broker发送一个Protocol Header 0-9-1的报文头,以此通知Broker本次交互才采用的是AMQP 0-9-1协议;紧接着Broker返回Connection.Start来建立连接,在连接的过程中涉及Connection.Start/.Start-OK、Connection.Tune/.Tune-OK、Connection.Open/.Open-OK这6个命令的交互;连接建立以后需要创建通道,会使用到Channel.Open , Channel.Open-OK命令,在进行交换机声明的时候需要使用到Exchange.Declare以及Exchange.Declare-OK的命令。以此类推,在声明队列以及完成队列和交换机的绑定的时候都会使用到指定的命令来完成。在进行消息发送的时候会使用到Basic.Publish命令完成,这个命令还包含了Conetent-Header和Content-Body。Content Header里面包含的是消息体的属性,Content-Body包含了消息体本身。 由于我们本次演示的代码是spring boot和rabbitmq进行整合以后的代码,没有涉及到通道的关闭以及连接的释放,因此看不到关闭通道以及连接的相关命令传输过程。AMQP消费者流转过程接下来我们就来了解一些AMQP协议中消费消费时所涉及到的相关命令,以及在消费消息的时候命令的传输的整体过程。 命令的传输流程如下图所示: 我们发现消费者消费消息的时候,所涉及到的命令和生成者大部分都是相同的。在原有的基础之上,多个几 个命令:Basic.Qos/.Qos-OK以及Basic.Consume和Basic.Consume-OK。其中Basic.Qos/.Qos-OK这两个命令主要用来确认消费者最大能保持的未确认的消息数时使用。Basic.Consume和Basic.Consume-OK这两个命令主要用来进行消息消费确认。AMQP命令概览交换机的类型fanout扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果 N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的 N 个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。因为扇型交换机投递消息的拷贝到所有绑定到它的队列,所以他的应用案例都极其相似:大规模多用户在线游戏可以使用它来处理排行榜更新等全局事件体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端分发系统使用它来广播各种状态和配置更新在群聊的时候,它被用来分发消息给参与群聊的用户 扇型交换机图例:direct直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应绑定键的队 列。直连交换机主要用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)。下边介绍它是如何工作的:topic前面提到的 direct 规则是严格意义上的匹配,换言之 Routing Key 必须与 Binding Key 相匹配的时候才将消息传送给 Queue.而Topic 的路由规则是一种模糊匹配,可以通过通配符满足一部分规则就可以传送。 它的约定是: 1)binding key 中可以存在两种特殊字符 * 与 # ,用于做模糊匹配,其中 * 用于匹配一个单词, # 用于匹配多个单词(可以是零个) 2)routing key 为一个句点号 “.” 分隔的字符串(我们将被句点号 “. ” 分隔开的每一段独立的字符串称为一个单词),如“topic.email”、“topic.sms"、 binding key 与 routing key 一样也是句点号 “.” 分隔的字符串。下面介绍它是如何工作的:headersheaders 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换机的时候指定一组键值对,当发生消息到交换机时,Rabbitmq会获取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换机绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers类型的交换机性能会很差,所以在生产环境中基本不会使用。下面介绍它是如何工作的: “x-match”的匹配规则(就是上面介绍的all、any的规则)小结:Rabbitmq中的这四种交换机,使用的较多的是direct。主要是基于两个原因:direct 交换机可以实现fanout的功能性能排序:fanout > direct >> topic。
提笔写架构
深入浅出单点登录---4、基于OAuth实现的统一认证
概述OAuth2 实质是为第三方应用颁发一个具有时效性的Token令牌,使其他服务或第三方应用能够通过令牌获取相关 资源。 常见的场景: 比如进入某个网站没有账号信息, 但可以通过QQ、微信、支付宝等账号进行登陆, 在这个 登陆过程中采用的就是Oauth2协议; OAUTH2不仅支持认证,还具备授权功能, 比如通过QQ登录获取用户头 像,基本资料等。OAuth2角色resource owner : 资源所有者,具备访问该资源的实体, 如果是某个人, 被称为end-user。resources server: 资源服务器,受保护的资源服务器, 具备提供资源能力, 如订单服务, 商品服务等。client: 客户端,这并不是指用户,而是对资源服务器发起请求的应用程序,比如前后分离项目, 前端服务访问管理接口, 访问后台业务功能接口。authorization server: 授权服务器, 能够给客户端颁发令牌, 这个就是我们上面所讲的统一认证授权服务器。user-agent: 用户代理,作为资源所有者与客户端沟通的工具, 比如APP, 浏览器等。OAuth2 协议流程OAuth2包含四种授权模式:授权码模式;隐式/简化授权模式;密码模式;客户端模式。5. Resource Owner 与 Client 之间 , 资源所有者向Client发起认证请求, Client再返回认证授权信息。6. Client 收到 Resource Owner 的认证请求后, 会去Authorization Server 申请访问令牌, Authorization Server会让Client 进行认证, 通过之后会返回Access Token。7. Client 拿到 Authorization Server 的 Acceess Token , 访问Resource Server,Resource Server 验证之后, 返回被保护的资源信息。8. Resource Server 可以通过JWT在本地进行验证, 也可以访问 Authorization Server, 对Client 的请求的合法性进行验证。OAuth2 授权码模式客户端携带 client_id, scope, redirect_uri, state 等信息引导用户请求授权服务器的授权端点下发 code。授权服务器验证客户端身份,验证通过则询问用户是否同意授权(此时会跳转到用户能够直观看到的授权页面,等待用户点击确认授权)。假设用户同意授权,此时授权服务器会将 code 和 state(如果客户端传递了该参数)拼接在 redirect_uri 后 面,以302(重定向)形式下发 code。客户端携带 code, redirect_uri, 以及 client_secret 请求授权服务器的令牌端点下发 access_token。授权服务器验证客户端身份,同时验证 code,以及 redirect_uri 是否与请求 code 时相同,验证通过后下发 access_token,并选择性下发 refresh_token,支持令牌的刷新。示例:1、授权请求:response_type=code // 必选项
&client_id={客户端的ID} // 必选项
&redirect_uri={重定向URI} // 可选项
&scope={申请的权限范围} // 可选项
&state={任意值} // 可选项2、授权响应参数:code={授权码} // 必填
&state={任意文字} // 如果授权请求中包含 state的话那就是必填3、令牌请求:grant_type=authorization_code // 必填
&code={授权码} // 必填 必须是认证服务器响应给的授权码
&redirect_uri={重定向URI} // 如果授权请求中包含 redirect_uri 那就是必填
&code_verifier={验证码} // 如果授权请求中包含 code_challenge 那就是必填4、令牌响应:"access_token":"{访问令牌}", // 必填
"token_type":"{令牌类型}", // 必填
"expires_in":{过期时间}, // 任意
"refresh_token":"{刷新令牌}", // 任意
"scope":"{授权范围}" // 如果请求和响应的授权范围不一致就必填OAuth2 隐式/简化模式资源拥有者(用户)通过代理(WEB浏览器)访问客户端程序,发起简化模式认证。客户端(Client)向认证服务器(Auth Server)发起请求, 此时客户端携带了客户端标识(client_id)和重定向地址(redirect_uri)。客户端(Client)拿到令牌 token 后就可以向第三方的资源服务器请求资源了。示例:1、授权请求:response_type=token // 必选项
&client_id={客户端的ID} // 必选项
&redirect_uri={重定向URI} // 可选项
&scope={申请的权限范围} // 可选项
&state={任意值} // 可选项2、授权响应参数:&access_token={令牌信息} // 必填
&expires_in={过期时间} // 任意
&state={任意文字} // 如果授权请求中包含 state 那就是必填
&scope={授权范围} // 如果请求和响应的授权范围不一致就必填思考:为什么要有授权码和简化模式?看完这两种模式, 可能会有些疑问, 为什么要这么麻烦, 直接一次请求 返回TOKEN不就可以吗?我们可以看出, 两者主要差别, 是少了code验证环节, 直接返回token了, code验证是客户端与认证服务器在后台进行请求获取, 代理是获取不到TOKEN的, 如果缺少这个环节, 直接返回TOKEN, 相当于直接暴露给所有参 与者, 存在安全隐患, 所以简化模式,一般用于信赖度较高的环境中使用。OAuth2 密码模式资源拥有者直接通过客户端发起认证请求。客户端提供用户名和密码, 向认证服务器发起请求认证。认证服务器通过之后, 客户端(Client)拿到令牌 token 后就可以向第三方的资源服务器请求资源了。示例:1、令牌请求:grant_type=password // 必填
&username={用户ID} // 必填
&password={密码} // 必填
&scope={授权范围} // 任意2、令牌响应:"access_token":"{访问令牌}", // 必填
"token_type":"{令牌类型}", // 必填
"expires_in":"{过期时间}", // 任意
"refresh_token":"{刷新令牌}", // 任意
"scope":"{授权范围}" // 如果请求和响应的授权范围不一致就必填此模式简化相关步骤, 直接通过用户和密码等隐私信息进行请求认证, 认证服务器直接返回token, 这需要整个 环境具有较高的安全性。OAuth2 客户端模式此模式最为简单直接, 由客户端直接发起请求。客户端与服务器信赖度较高, 服务端根据请求直接认证返回token信息。客户端(Client)拿到令牌 token 后就可以向第三方的资源服务器请求资源了。这种模式一般在内部服务之间应用, 授权一次, 长期可用, 不用刷新token。示例:1、令牌请求:grant_type=client_credentials // 必填
client_id={客户端的ID} // 必填
client_secret={客户端的密钥} // 必填
&scope={授权范围} // 任意2、令牌响应:"access_token":"{访问令牌}", // 必填
"token_type":"{令牌类型}", // 必填
"expires_in":"{过期时间}", // 任意
"scope":"{授权范围}" // 如果请求和响应的授权范围不一致就必填增强Token技术解决方案优势与应用场景基于Token的鉴权方案,实现方式有多种,增强Token属于其中一种,为什么要采用增强Token方式,它能够解决怎样的问题?普通Token认证方式,没有附带必要的用户信息,如果要查询,需要再次调用OAuth2的用户资料认证接口,会增加传输开销;JWT虽然能够附带一定用户信息,但受限于长度,存储空间有限; 如果既要保障性能, 又要求能够存储一定的信息,就可以采用增强Token方案,它是将信息存储至Redis缓存中,作为资源服务,接收到Token之后, 可以直接从Redis中获取信息。 它可以适用于微服务架构下,有一定用户信息要求的场景,比如订单服务、资金服务需要获取用户的基本资料,但如果是跨IDC,跨区域,需要暴露外网的情况下,不推荐采用此方案,因为需要保障数据的安全性。
提笔写架构
深入浅出单点登录---3、基于SAML实现的统一认证
概述SAML 2.0 用来在安全域中交换身份验证(Authentication)数据和 授权(Authorization)数据。SAML 2.0基于 XML协议,使用包含断言(Assertions)的安全令牌在SAML授权方(即身份提供者IdP)和SAML消费方(即服务 提供者SP)之间传递委托人(终端用户)的信息。SAML 2.0 可以实现基于网络跨域的单点登录(SSO), 以便于减少向一个用户分发多个身份验证令牌的管理开销。什么是断言断言是一个包含了由SAML授权方提供的0到多个声明(statement)的信息包。SAML断言通常围绕一个主题生 成。该主题使用声明。SAML 2.0规范定义了三种断言声明,详细信息如下:身份验证(Authentication):该断言的主题是在某个时间通过某种方式被认证。属性(Attribute):该言的主题和某种属性相关联。授权决策(Authorization Decision):该断言的主题被允许或者被禁止访问某个资源。断言举例:<?xml version="1.0" encoding="utf-8"?>
<saml:Assertion xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion" xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema‐instance" ID="b07b804c‐7c29‐ea16‐7300‐4f3d6f7928ac" Version="2.0" IssueInstant="2004‐12‐05T09:22:05Z">
<saml:Issuer>https://idp.example.org/SAML2</saml:Issuer>
<ds:Signature xmlns:ds="http://www.w3.org/2000/09/xmldsig#">...</ds:Signature>
<saml:Subject>
<saml:NameID Format="urn:oasis:names:tc:SAML:2.0:nameid‐format:transient">3f7b3dcf‐1674‐4ecd‐92c8‐1544f346baf8</saml:NameID>
<saml:SubjectConfirmation Method="urn:oasis:names:tc:SAML:2.0:cm:bearer">
<saml:SubjectConfirmationData InResponseTo="aaf23196‐1773‐2113‐474a‐fe114412ab72" Recipient="https://sp.example.com/SAML2/SSO/POST" NotOnOrAfter="2004‐12‐05T09:27:05Z"/>
</saml:SubjectConfirmation>
</saml:Subject>
<saml:Conditions NotBefore="2004‐12‐05T09:17:05Z" NotOnOrAfter="2004‐12‐05T09:27:05Z">
<saml:AudienceRestriction>
<saml:Audience>https://sp.example.com/SAML2</saml:Audience>
</saml:AudienceRestriction>
</saml:Conditions>
<saml:AuthnStatement AuthnInstant="2004‐12‐05T09:22:00Z" SessionIndex="b07b804c‐7c29‐ea16‐7300‐4f3d6f7928ac">
<saml:AuthnContext>
<saml:AuthnContextClassRef>urn:oasis:names:tc:SAML:2.0:ac:classes:PasswordProtectedTransport</saml:AuthnContextClassRef>
</saml:AuthnContext>
</saml:AuthnStatement>
<saml:AttributeStatement>
<saml:Attribute xmlns:x500="urn:oasis:names:tc:SAML:2.0:profiles:attribute:X500" x500:Encoding="LDAP" NameFormat="urn:oasis:names:tc:SAML:2.0:attrname‐format:uri" Name="urn:oid:1.3.6.1.4.1.5923.1.1.1.1" FriendlyName="eduPersonAffiliation">
<saml:AttributeValue xsi:type="xs:string">member</saml:AttributeValue>
<saml:AttributeValue xsi:type="xs:string">staff</saml:AttributeValue>
</saml:Attribute>
</saml:AttributeStatement>
</saml:Assertion>
它主要是用来实现Web浏览器的单点登录。该断言包括一个身份验证断言 saml:AuthnStatement 和一个属性断言 saml:AttributeStatement ,SP将使用该属性断言实现访问控制。工作流程SAML 认证流程一般都会牵涉到两方:服务提供方(SP)和身份提供方(IdP),典型的 SP 有阿里云、腾讯云以及 很多很多的 SaaS 服务;IdP 其实就是我们企业自己,因为用户目录在我们这里。 访问 SP 服务的时候,SP 会向 IdP 发送一个 SAML Request(具体是什么我们暂时不关心),请求 IdP 判断用户身 份。IdP 收到 SAML Request 后,可以通过某种手段对用户身份进行认证,如果已登录,可以直接返回用户身份信 息给 SP;如果未登录,可以弹出一个登录框,用户登录之后再将用户身份返回给 SP。SP 收到用户信息之后,再在 自己的数据库里面找出对应的用户,然后以这个用户的身份访问 SP 服务。用户通过浏览器访问网站(SP),网站提供服务但是并不负责用户认证。SP 向 IDP 发送了一个 SAML 认证请求,同时 SP将用户浏览器 重定向到 IDP。IDP 在验证完来自 SP 的合法请求,在浏览器中呈现登陆表单让用户填写用户名与密码信息,进行登陆。用户登陆成功, IDP 会生成一个包含用户信息的 SAMLtoken(SAML token 又称为 SAML Assertion,本质 上是 XML 节点)。IDP 向 SP 返回token,并且将 用户重定向 到 SP。SP 对拿到的 token进行验证,并从中解析出用户信息,例如用户是谁以及用户的权限有哪些。此时可以根据 这些授权信息允许用户访问我们网站的内容。授权机制SAML 只是认证协议,自身并不提供授权功能, 可以通过XACML实现授权。XACML 是可扩展访问控制标记语言,以XML的形式描述策略语言和授权决策请求/响应,提供管理授权决策的语法。SAML 和 XACML 结合实现权限访问控制,映射关系:SAML 和 XACML 结合控制应用模型:该模型是一个完整的访问控制体系结构,包含身份验证和授权两部分。身份验证可 以接受来自其它系统的各种安全 令牌,包括 SAML 断言,对请求主体进行验证并产生 SAML 身份验证断言。只要合作的第三方服务联合信任,就可 以实现 服务的安全交互以及用户 的单点登录。模型的授权基于 PMI 统一授权管理体系,授权系统向 AA(属性权威机构)请 求关于 Web 服务请求主体的属性信 息,AA 实现 SAML 接口,返回 SAML 属性断言。模型使用统一的策略语言 XACML,由 SAML 为其提供底层传输机制,适用于各种类型的访问 控制系统。策略可以 被不同的应用使用,使策略的管理更加容易。应用场景目前SAML广泛应用于云服务的认证,比如阿里云、AWS和腾讯云等,在云服务上面维护统一的用户信息进行身份 认证。SAML认证一般分为两部分,用户池与角色身份池。用户池可以让应用程序接入,也可以通过第三方身份提供商 (IdP) ,对用户身份进行认证。角色身份池可以通过凭证来控制访问云服务资源,比如阿里云推送服务,Amazon S3 和 DynamoDB等。以AWS的Amazon Cognito为例,简单介绍下它的应用:通过SAML协议验证用户身份,然后授予用户访问其他 AWS 服务的权限。在第一步中,您的应用程序用户通过用户池登录,并在成功进行身份验证后收到用户池令牌。接下来,您的应用程序通过用户池令牌交换 AWS 凭证。最后,您的应用程序用户可以使用这些 AWS 凭证来访问其他 AWS 服务(如 Amazon S3 或 DynamoDB)。AWS云服务接入方案用户池进行身份验证用户使用用户池进行身份验证。应用程序用户可以通过用户池直接登录,也可以通过第三方身份提供商 (IdP) 联合。用户池管理从通过 Facebook、Google、Amazon 和 Apple 进行的社交登录返回的以及从 OpenID Connect (OIDC) 和 SAML IdP 返回的令牌的处理开销。成功进行身份验证后, Web 或移动应用程序将收到来自 Amazon Cognito 的用户池令牌。可以使用这些令牌 检索允许的应用程序访问其他 AWS 服务的 AWS 凭证,也可以选择使用它们来控制对您的服务器端资源或 Amazon API Gateway 的访问。用户池访问服务器端资源用户池登录后,Web 或移动应用程序将收到用户池令牌。可以使用这些令牌控制对服务器端资源的访问。可 以创建用户池组来管理权限以及表示不同类型的用户。用户池和身份池访问云服务用户池登录认证成功之后,获取返回的令牌,再通过令牌换取身份池的信息, 拿去身份池信息就可以访问其他的云服务资源。支持第三方进行身份验证并使用身份池访问云服务身份池需来自第三方身份提供商,进行身份验证之后, 返回用户的 IdP 令牌。再通过令牌交换获取云服务的 身份池信息,身份池将授予可用来访问其他云服务的临时凭证。阿里云接入模式阿里云支持基于SAML 2.0的SSO(Single Sign On,单点登录),也称为身份联合登录。阿里云提供以下两种基于SAML 2.0协议的SSO方式:用户SSO:阿里云通过IdP颁发的SAML断言确定企业用户与阿里云RAM用户的对应关系 。企业用户登录后,使用 该RAM用户访问阿里云。 角色SSO:阿里云通过IdP颁发的SAML断言确定企业用户在阿里云上可以使用的RAM角 色。企业用户登录后,使用SAML断言中指定的RAM角色访问阿里云。请参见进行角色SSO。用户SSO当管理员在完成用户SSO的相关配置后,可以通过以下流程来实现用户SSO。Alice使用浏览器登录阿里云,阿里云将SAML认证请求返回给浏览器。浏览器向IdP转发SAML认证请求。IdP提示Alice登录,并在Alice登录成功后生成SAML响应返回给浏览器。浏览器将SAML响应转发给SSO服务。SSO服务通过SAML互信配置,验证SAML响应的数字签名来判断SAML断言的真伪,并通过SAML断言的 NameID元素值,匹配到对应阿里云账号中的RAM用户身份。SSO服务向浏览器返回控制台的URL。浏览器重定向到阿里云控制台。角色SSO企业员工Alice可登录到阿里云,使用浏览器在IdP的登录页面中选择阿里云作为目标服务。IdP生成一个SAML响应并返回给浏览器。浏览器重定向到SSO服务页面,并转发SAML响应给SSO服务。SSO服务使用SAML响应向阿里云STS服务请求临时安全凭证,并生成一个可以使用临时安全凭证登录阿里云 控制台的URL。SSO服务将URL返回给浏览器。浏览器重定向到该URL,以指定角色身份登录到阿里云控制台。
提笔写架构
消息通知系统详解4---整合Netty和WebSocket
修改配置修改消息通知微服务模块tensquare_notice的pom文件,添加下面的dependency依赖<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.5.Final</version>
</dependency>2. 修改application.yml文件,添加下面的配置 rabbitmq:
host: 192.168.200.128这样消息通知微服务就引入了netty框架,并且具有了和Rabbitmq交互的能力实现Netty的整合整合分析现在的通讯模式如下:因为使用到了WebSocket和Netty,整合方式和以前有所不同,整合步骤:编写NettyServer,启动Netty服务。使用配置Bean创建Netty服务。编写NettyConfig。编写和WebSocket进行通讯处理类MyWebSocketHandler,进行MQ和WebSocket的消息处理。使用配置Bean创建Rabbit监听器容器,使用监听器。编写RabbitConfig。编写Rabbit监听器SysNoticeListener,用来获取MQ消息并进行处理。五个类的关系如下图:实现整合1. 编写ApplicationContextProvider.java,这个类是工具类,作用是获取Spring容器中的实例@Component
public class ApplicationContextProvider implements ApplicationContextAware {
/**
* 上下文对象实例
*/
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
/**
* 获取applicationContext
*
* @return
*/
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
/**
* 通过name获取 Bean.
*
* @param name
* @return
*/
public Object getBean(String name) {
return getApplicationContext().getBean(name);
}
/**
* 通过class获取Bean.
*
* @param clazz
* @param <T>
* @return
*/
public <T> T getBean(Class<T> clazz) {
return getApplicationContext().getBean(clazz);
}
/**
* 通过name,以及Clazz返回指定的Bean
*
* @param name
* @param clazz
* @param <T>
* @return
*/
public <T> T getBean(String name, Class<T> clazz) {
return getApplicationContext().getBean(name, clazz);
}
}
2. 编写NettyServerpublic class NettyServer {
public void start(int port) {
System.out.println("准备启动Netty。。。");
ServerBootstrap serverBootstrap = new ServerBootstrap();
//用来处理新连接的
EventLoopGroup boos = new NioEventLoopGroup();
//用来处理业务逻辑的,读写。。。
EventLoopGroup worker = new NioEventLoopGroup();
serverBootstrap.group(boos, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
//请求消息解码器
ch.pipeline().addLast(new HttpServerCodec());
// 将多个消息转换为单一的request或者response对象
ch.pipeline().addLast(new HttpObjectAggregator(65536));
//处理WebSocket的消息事件
ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));
//创建自己的webSocket处理器,就是用来编写业务逻辑的
MyWebSocketHandler myWebSocketHandler = new MyWebSocketHandler();
ch.pipeline().addLast(myWebSocketHandler);
}
}).bind(port);
}
}
3. 编写NettyConfig@Configuration
public class NettyConfig {
@Bean
public NettyServer createNettyServer() {
NettyServer nettyServer = new NettyServer();
//启动Netty服务,使用新的线程启动
new Thread(){
@Override
public void run() {
nettyServer.start(1234);
}
}.start();
return nettyServer;
}
}4. 编写MyWebSocketHandlerpublic class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static ObjectMapper MAPPER = new ObjectMapper();
// 送Spring容器中获取消息监听器容器,处理订阅消息sysNotice
SimpleMessageListenerContainer sysNoticeContainer = (SimpleMessageListenerContainer) ApplicationContextProvider.getApplicationContext()
.getBean("sysNoticeContainer");
//从Spring容器中获取RabbitTemplate
RabbitTemplate rabbitTemplate = ApplicationContextProvider.getApplicationContext()
.getBean(RabbitTemplate.class);
//存放WebSocket连接Map,根据用户id存放
public static ConcurrentHashMap<String, Channel> userChannelMap = new ConcurrentHashMap();
//用户请求WebSocket服务端,执行的方法
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//约定用户第一次请求携带的数据:{"userId":"1"}
//获取用户请求数据并解析
String json = msg.text();
//解析json数据,获取用户id
String userId = MAPPER.readTree(json).get("userId").asText();
//第一次请求的时候,需要建立WebSocket连接
Channel channel = userChannelMap.get(userId);
if (channel == null) {
//获取WebSocket的连接
channel = ctx.channel();
//把连接放到容器中
userChannelMap.put(userId, channel);
}
//只用完成新消息的提醒即可,只需要获取消息的数量
//获取RabbitMQ的消息内容,并发送给用户
RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate);
//拼接获取队列名称
String queueName = "article_subscribe_" + userId;
//获取Rabbit的Properties容器
Properties queueProperties = rabbitAdmin.getQueueProperties(queueName);
//获取消息数量
int noticeCount = 0;
//判断Properties是否不为空
if (queueProperties != null) {
// 如果不为空,获取消息的数量
noticeCount = (int) queueProperties.get("QUEUE_MESSAGE_COUNT");
}
//封装返回的数据
HashMap countMap = new HashMap();
countMap.put("sysNoticeCount", noticeCount);
Result result = new Result(true, StatusCode.OK, "查询成功", countMap);
//把数据发送给用户
channel.writeAndFlush(new TextWebSocketFrame(MAPPER.writeValueAsString(result)));
//把消息从队列里面清空,否则MQ消息监听器会再次消费一次
if (noticeCount > 0) {
rabbitAdmin.purgeQueue(queueName, true);
}
//为用户的消息通知队列注册监听器,便于用户在线的时候,
//一旦有消息,可以主动推送给用户,不需要用户请求服务器获取数据
sysNoticeContainer.addQueueNames(queueName);
}
}5. 编写RabbitConfig@Configuration
public class RabbitConfig {
@Bean("sysNoticeContainer")
public SimpleMessageListenerContainer create(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
//使用Channel
container.setExposeListenerChannel(true);
//设置自己编写的监听器
container.setMessageListener(new SysNoticeListener());
return container;
}
}6. 编写SysNoticeListenerpublic class SysNoticeListener implements ChannelAwareMessageListener {
private static ObjectMapper MAPPER = new ObjectMapper();
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//获取用户id,可以通过队列名称获取
String queueName = message.getMessageProperties().getConsumerQueue();
String userId = queueName.substring(queueName.lastIndexOf("_") + 1);
io.netty.channel.Channel wsChannel = MyWebSocketHandler.userChannelMap.get(userId);
//判断用户是否在线
if (wsChannel != null) {
//如果连接不为空,表示用户在线
//封装返回数据
HashMap countMap = new HashMap();
countMap.put("sysNoticeCount", 1);
Result result = new Result(true, StatusCode.OK, "查询成功", countMap);
// 把数据通过WebSocket连接主动推送用户
wsChannel.writeAndFlush(new TextWebSocketFrame(MAPPER.writeValueAsString(result)));
}
}
}
7. 修改启动类,添加Netty服务的启动public static void main(String[] args) {
SpringApplication.run(NoticeApplication.class, args);
NettyServer server = ApplicationContextProvider.getApplicationContext().getBean(NettyServer.class);
try {
server.start(12345);
} catch (Exception e) {
e.printStackTrace();
}
}至此,基本代码已经完成,可以搞个前端页面测试下。
提笔写架构
深入浅出消息队列---7、Kafka介绍
kafka介绍Kafka是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统,可作为大数据量的消息中间件。kafka的具体架构如下所示:Producer 生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。 生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。Comsumer 消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。Topic 在Kafka中,使用一个类别属性来划分数据的所属类,划分数据的这个类称为topic。如果把Kafka看做为一个数据库,topic可以理解为数据库中的一张表,topic的名字即为表名。Partition topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。Partition offset 每条消息都有一个当前Partition下唯一的64字节的offset,它指明了这条消息的位置。Replicas of partition 副本是一个分区的备份。副本不会被消费者消费,副本只用于防止数据丢失,即消费者不从为follower的 partition中消费数据,而是从为leader的partition中读取数据。副本之间是一主多从的关系。Broker Kafka 集群包含一个或多个服务器,服务器节点称为broker。broker存储topic的数据。 如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。 如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。 如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。Leader 每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。Follower Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与 Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。 当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中 删除,重新创建一个Follower。Zookeeper Zookeeper负责维护和协调broker。当Kafka系统中新增了broker或者某个broker发生故障失效时,由ZooKeeper通知生产者和消费者。生产者和消费者依据Zookeeper的broker状态信息与broker协调数据的发布和订阅任务。AR(Assigned Replicas) 分区中所有的副本统称为AR。ISR(In-Sync Replicas) 所有与Leader部分保持一致的副(包括Leader副本在内)本组成ISR。OSR(Out-of-Sync-Replicas) 与Leader副本同步滞后过多的副本。
提笔写架构
深入浅出单点登录---2、解决方案
设计方案-Cookie概述用户登录之后, 将认证信息存储至Cookie,当再次访问本服务或者访问其他应用服务时,直接从Cookie中传递 认证信息,进行鉴权处理。问题1. 如何保障Cookie内用户认证信息的安全性?第一, Cookie内不能存放用户名和密码等敏感信息, 可以生成一串Token进行替代;第二, 通过加密方式存储Cookie信息,并且采用https加密方式传输,设定Cookie有效期,在服务端设定 Token的有效 期,避免攻击者伪造用户身份。2. 如何解决跨域问题?在实际应用中, 经常会存在各种服务需要鉴权处理, 但受浏览器同源策略限制,无法去正常操作 Cookie数据, 解决方式有两种:第一种,采用iframe方式解决跨域问题, 实现Cookie共享,但要注意,父窗口获取子窗口在跨域下可以 正常获取,子窗口后去父窗口仍会存在跨域问题, 这点在实现的时候要注意。第二种,采用JSONP方式实现跨域传输,这需要在服务端设置允许跨域请求, response.setHeader(“Access-Control-Allow-Origin”, “*”); 设置允许任何域名跨域访问,服务端返回数 据时,再设置callback,才能完成跨域请求。跨域Cookie设计实现方案设计方案-分布式Session概述大型应用服务无论是整体拆分,还是集群部署,都会涉及到统一会话问题,如何保障各服务节点都能够统一 有效鉴权? 某个服务节点宕机,重启后如何恢复登录状态? 在Cookie禁用的情况下如何实现SSO? 由此产生了分布式Session设计方案。 分布式Session方案,实质是通过自定义的Session机制来处理用户的登录鉴权信息,实现单点登录。实现流程技术框架Spring Session : 它是目前主流的Session 管理解决方案,Spring Session 并非特定应用于HTTP, 它是一种 广义的分布式统一Session,支持WebSocket和WebSession等,并且可以基于Redis、MongoDB等多种高性 能缓存来实现。XXL-SSO: 它是一个分布式单点登录框架。只需要登录一次就可以访问所有相互信任的应用系统。拥有”轻量 级、分布式、跨域、Cookie+Token均支持、Web+APP均支持”等特性。现已开放源代码,开箱即用。架构图:设计方案-客户端令牌Token概述根据客户端身份信息由认证服务生成签名令牌,令牌中会包含基本的用户信息,客户端在请求资源服务时会 附带令牌,资源服务根据加密协议在本地进行验证, 或者发送给认证服务端进行校验。它可以解决分布式会 话的安全性问题,比如会话劫持,同时不需要集中统一维护session,能够做到无状态化处理。OAuth2和JWT 都是基于令牌Token实现的认证方案。适用场景JWT (JSON Web Token) 是一个开放安全的行业标准,用于多个系统之间传递安全可靠的信息。它由三部分组 成,头部(Header)、载荷(playload)与签名(Signature)。Token实质是一个无意义的UUID,需要服务端做记录与认证, 但JWT则赋予了用户的身份信息,可以采用自定义算法进行加密与解密,直接实现信息的传输交 换。那具体适用于哪些场景?可以适用于微服务应用, 无论是内部服务节点的认证与授权, 或是令牌与API网关结合的认证。可以适用于开放式的API接口访问,比如前后分离API对接,第三方API接口对接等。实现流程设计方案-CAS认证概述CAS(Central Authentication Service)是耶鲁大学的开源项目,宗旨是为web应用系统提供一种可靠的单点登录解决方案。CAS从安全性角度来考虑设计,用户在CAS输入用户名和密码之后通过ticket进行认证,能够 有效防止密码泄露。CAS广泛使用于传统应用场景中,比如企业内部的OA,ERP等应用,不适用于微服务架构。设计实现流程CAS代理认证有两个应用App1和App2,它们都是受Cas Server保护,请求它们时都需要通过Cas Server的认证。现需要在 App1中以Http方式请求访问App2,显然该请求将会被App2配置的Cas的AuthenticationFilter拦截并转向Cas Server,Cas Server将引导用户进行登录认证,这样我们也就不能真正的访问到App2了。针对这种应用场 景,Cas也提供了对应的支持。代理认证具体流程:App1先通过Cas Server的认证,然后向Cas Server申请一个针对于App2的proxy ticket,之后在访问App2时 把申请到的针对于App2的 proxy ticket 以参数 ticket 传递过去。App2的 AuthenticationFilter 将拦截 到该请求,发现该请求携带了 ticket 参数后将放行交由后续的Ticket Validation Filter处理。Ticket Validation Filter将会传递该ticket到Cas Server进行认证,显然该ticket是由Cas Server针对于App2发行的, App2在申请校验时是可以校验通过的,这样我们就可以正常的访问App2了。OpenID认证概述OIDC( OpenID Connect) 是属于是OAuth 2.0协议之上的简单身份层,用API进行身份交互,允许客户端根据 授权服务的认证结果确认用户的最终身份,它支持包括Web、移动、JavaScript在内的所有客户端类型。它与 OAuth的主要区别是在于, OpenID 只用于身份认证,例如允许一个账户登录多个网站;而OAuth可以用于授 权,允许授权的客户端访问指定的资源服务。应用场景如果有独立账号体系,需要为外部提供统一认证服务, 可以采用OIDC,OIDC目前有很多企业在使用,比如 Google的账号认证体系,Microsoft的账号体系也采用了OIDC。如何工作OAuth2提供了Access Token来解决授权第三方客户端访问受保护资源的问题;OIDC在这个基础上提供了ID Token来解决第三方客户端标识用户身份认证的问题。OIDC的核心在于在OAuth2的授权流程中,一并提供用 户的身份认证信息(ID Token)给到第三方客户端,ID Token使用JWT格式来包装,得益于JWT(JSON Web Token)的自包含性,紧凑性以及防篡改机制,使得ID Token可以安全的传递给第三方客户端程序并且容易被 验证。此外还提供了UserInfo的接口,用户获取用户的更完整的信息。工作流程1.当用户第一次登录时,将用户名 密码发送给用户服务2.验证用户将用户标识OpenId返回到客户端3.客户端进行存储4.访问子系统 将OpenId发送到子系统5子系统将OpenId转发到验证服务6.验证服务将用户认证信息返回给子系统7.子系统构建用户验证信息后将授权后的内容返回给客户端工作模式默认模式/简化模式(Implicit Flow):如果是Web应用服务,其所有的代码都有可能被加载到浏览器暴露出来,无法保证终端client_secret的安全性,则采用默认模式。授权码模式(Authentication Flow):如果是传统的客户端应用,后端服务和用户信息是隔离的,能 保证client_secret的不被泄露,就可以使用授权码模式流程。混合模式(Hybrid Flow): 实质上是以上两种模式的融合,混合模式下ID Token通过浏览器的前端通道传递,而AccessToken和Refresh Token通过后端获取,混合使用, 可以弥补两种模式的缺点,一般推荐使用混合模式。SAML2.0认证概述SAML 全称是 Security Assertion Markup Language。SAML是支持身份认证的协议,它可以通过支持 XACML协议进行权限控制。SAML是基于XML实现的协议,较OAUTH来说较复杂些,但功能也十分强大,支 持认证,权限控制和用户属性识别等。目前在云服务的接入使用比较广泛,作为重点内容, 在下面的章节做详细讲解。OAuth2认证概述OAuth 2.0 是一个行业的标准授权协议,它的最终目的是为第三方应用颁发一个有时效性的令牌 token,使得 第三方应用能够通过该令牌获取相关的资源。它的主要作用可以实现登录认证与授权,常见的场景:比如第 三方登录,当你要登录某个论坛,但没有账号,通过QQ 登录的过程就是采用 OAuth 2.0 协议, 通过OAuth2 的授权,可以获取QQ头像等资源信息。OAuth2是目前应用最为广泛的认证授权协议,这是重点内容,在下 面的章节做详细深入讲解。
提笔写架构
深入浅出业务幂等性---4、消息幂等
消息幂等在系统中当使用消息队列时,无论做哪种技术选型,有很多问题是无论如何也不能忽视的,如:消息必达、消息幂 等等。本章节以典型的RabbitMQ为例,讲解如何保证消息幂等的可实施解决方案,其他MQ选型均可参考。消息重试演示消息队列的消息幂等性,主要是由MQ重试机制引起的。因为消息生产者将消息发送到MQ-Server后,MQ-Server 会将消息推送到具体的消息消费者。假设由于网络抖动或出现异常时,MQ-Server根据重试机制就会将消息重新向消息消费者推送,造成消息消费者多次收到相同消息,造成数据不一致。在RabbitMQ中,消息重试机制是默认开启的,但只会在consumer出现异常时,才会重复推送。在使用中,异常的出现有可能是由于消费方又去调用第三方接口,由于网络抖动而造成异常,但是这个异常有可能是暂时的。所以当消费者出现异常,可以让其重试几次,如果重试几次后,仍然有异常,则需要进行数据补偿。数据补偿方案:当重试多次后仍然出现异常,则让此条消息进入死信队列,最终进入到数据库中,接着设置定时job查询这些数据,进行手动补偿。1. 修改consumer一方的配置文件# 消费者监听相关配置
listener:
simple:
retry:
# 开启消费者(程序出现异常)重试机制,默认开启并一直重试
enabled: true
# 最大重试次数
max‐attempts: 5
# 重试间隔时间(毫秒)
initial‐interval: 30002. 设置消费异常当consumer消息监听类中添加异常,最终接受消息时,可以发现,消息在接收五次后,最终出现异常。消息幂等解决要保证消息幂等性的话,其实最终要解决的就是保证多次操作,造成的影响是相同的。那么其解决方案的思路与服务间幂等的思路其实基本都是一致的。消息防重表,解决思路与服务间幂等的防重表一致。redis。利用redis防重。 这两种方案是最常见的解决方案。其实现思路其实都是一致的。消息缓冲区对于RabbitMQ的使用,默认情况下,每条消息都会进行分别的ack通知,消费完一条后,再来消费下一条。但是 这样就会造成大量消息的阻塞情况。所以为了提升消费者对于消息的消费速度,可以增加consumer数据或者对消 息进行批量消费。MQ接收到producer发送的消息后,不会直接推送给consumer。而是积攒到一定数量后,再进 行消息的发送。 这种方式的实现,可以理解为是一种缓冲区的实现,提升了消息的消费速度,但是会在一定程度上 舍弃结果返回的实时性。对于批量消费来说,也是需要考虑幂等的。对于幂等性的解决方案,沿用刚才的思路即可解决。
提笔写架构
消息通知系统详解1---通讯方式
什么是消息通知系统消息通知系统,顾名思义即通知消息的传达处理系统。目的是为了让用户获得需要得到的消息及提醒并进行处理。消息通知微服务的定位是“平台内”的“消息”功能,分为全员消息,订阅类消息,点对点消息。例如系统通知,私信,@类消息全员消息系统通知,活动通知,管理员公告等全部用户都会收到的消息订阅类消息关注某一类数据的用户,该类数据有更新时向用户发送的消息。例如关注某位大v的微博,公众号,订阅某位知名作家的专栏点对点消息某位用户对另外一位用户进行操作后,系统向被操作的用户发送的消息。例如点赞,发红包。系统特性通讯方式这里先不考虑后端整体实现,前端与后端之间通讯方式如何选型,如何实现实时/准实时数据交互:需要介绍下三种通讯方式:短连接客户端和服务器每进行一次通讯,就建立一次连接,通讯结束就中断连接。HTTP是一个简单的请求-响应协议,它通常运行在TCP之上。HTTP/1.0使用的TCP默认是短连接。长连接是指在建立连接后可以连续多次发送数据,直到双方断开连接。HTTP从1.1版本起,底层的TCP使用的长连接。使用长连接的HTTP协议,会在响应头加入代码:Connection:keep-alive短连接和长连接的区别通讯流程短连接:创建连接 -> 传输数据 -> 关闭连接 长连接:创建连接 -> 传输数据 -> 保持连接 -> 传输数据 -> …… -> 关闭连接适用场景短连接:并发量大,数据交互不频繁情况长连接:数据交互频繁,点对点的通讯websocket协议什么是websocket协议WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。何谓全双工:全双工(FullDuplex)是通讯传输的一个术语。双方在通信时允许数据在两个方向上同时传输,它在能力上相当于两个单工通信方式的结合。全双工指可以同时进行信号的双向传输。指A→B的同时B→A,就像是双向车道。单工就就像是汽车的单行道,是在只允许甲方向乙方传送信息,而乙方不能向甲方传送 。服务器向客户端发送数据的功能是websocket协议的典型使用场景三种通信方式的优缺点优缺点如下:在 WebSocket中,浏览器和服务器只需要完成一次握手,就可以创建持久性的连接,并进行双向数据传输。在推送功能的实现技术上,相比使用Ajax 定时轮询的方式(setInterval),WebSocket 更节省服务器资源和带宽。出于服务器性能和实时性考虑,前后端通讯方式采用WebSocket协议实现。
提笔写架构
深入浅出Zookeeper---2、应用场景
ZK的应用场景服务注册发现 分布式服务架构中,服务的注册与发现是最核心的基础服务之一,注册中心可以看做是分布式服务架构的通信中心。 Zookeeper集群,通过监听机制,实现服务信息的订阅: Zookeeper是采用ZAB协议保证了数据的强一致性。ZAB协议的实现原理是怎样?ZK是如何实现选举, Paxos算法又是如何运用的?这个在后面算法章节会讲到。分布式锁分布式锁的实现需要注意的: 同步访问共享资源。锁的可重入性:递归调用不会被阻塞、不会发生死锁(synchronized methodA(), ReentrantLock)锁的超时:避免死锁、死循环等意外情况锁的阻塞:保证原子性等 ->下单(范围广, 包含商品, 账户资金, 下单信息生成,将粒度细化。)锁的特性支持:阻塞锁、可重入锁、公平锁、联锁、信号量、读写锁分布式锁的使用需要注意:分布式锁的开销, 能不能用就不用加锁的粒度加锁的方式(并发量大, 缓存做实现)基于ZK的分布式锁实现: 整体思想:每个客户端发起加锁时,生成一个唯一的临时有序节点。 如何判断锁是否创建呢?只需要判断是否存 在临时节点(若存在, 则根据序号判断)。ZK的分布式锁的优缺点: 优点: 可以有效的解决单点问题,不可重入问题,非阻塞问题以及锁无法释放的问题。 缺点: 性能上不如基于缓存实现的分布式锁。ZK的强一致性, 一个事务的操作在所有节点都同步完成。ZK是如何实现分布式锁?排它锁定义: 如果某个事务(Transaction)对数据对象(Object)加上了排他锁,那么在整个加锁期间,只允许该事务对数据进行读取或更新操作,其他任何事务不能对该数据对象做任何操作,直到该事务释放了排他锁… 排它锁实现流程: Zookeeper 的强一致性特性,能够很好地保证在分布式高并发情况下节点的创建一定能够保证全局唯一性, 即Zookeeper将会保证客户端无法重复创建一个已经存在的数据节点。可以利用Zookeeper这个特性,实现排他锁。 实现步骤:定义锁:通过Zookeeper上的数据节点来表示一个锁2.获取锁:客户端通过调用 create方法创建锁的临时节点,创建成功的客户端代表获取锁,没有获得锁 的客户端在该节点上注册Watcher监听,以便实时获取lock节点的变更情况3.释放锁 符合以下两种情况都可以让锁释放 当前获得锁的客户端发生宕机或异常,那么Zookeeper上这个临时节点就会被删除正常执行完业务逻辑,客户端主动删除自己创建的临时节点共享锁定义: 如果事务Transaction对数据对象Object加上了共享锁,在不是写操作事务下, 其他事务仍可以对Object进行读取操作。 共享锁实现流程:定义锁2.获取锁 如果是读请求,则创建 /lockpath/[hostname]-R-序号 节点,如果是写请求则创建 /lockpath/[hostname]-W- 序号节点3.读写顺序判断处理创建完节点后,获取 /lockpath 节点下的所有子节点,并对下面所有注册的子节点变更进行 Watcher监听;确定自己的节点序号在所有子节点中的顺序, 针对顺序的大小, 处理读写请求流程;对于读请求:1. 如果没有比自己序号更小的子节点,或者比自己序号小的子节点都是读请求,那么表明可以成功获取到了共享锁;2. 如果有比自己序号小的子节点有写请求,那么先进行等待 。对于写请求,如果存在比自己更小的节点,那么进行等待;接收到Watcher通知后, 再次处理上面的读写请求流程4.释放锁 与排它锁逻辑一致。 基于ZK的共享锁实现流程:共享锁产生的羊群效应解决方案羊群效应是什么? 羊群是一种很散乱的组织,平时在一起也是盲目地左冲右撞,但一旦有一只头羊动起来,其他的羊也会不假思索地一哄而上,全然不顾前面可能有狼或者不远处有更好的草。因此,“羊群效应”就是比喻人都有一种从众心理,从众心理很容易导致盲从,而盲从往往会陷入骗局或遭到失败。羊群效应该如何解决呢? 其实只需要改进Watch的监听处理流程:利用ZK实现公平选举ZK是如何实现集群选举呢?基于ZK的Watch机制,ZK的所有节点的读取操作,都可以附带一个Watch,一旦数据有变化,Watch 就会被触发,通知客户端数据发生变动。基于ZK实现的分布式锁,这里的锁是指排它锁,任意时刻,最多只有一个进程可以获取锁。什么是公平选举? 公平选举是要遵循公平性,大家都遵循规则,依照请求先后顺序,参与选举。选举处理流程 三台节点向ZK集群创建Sequence类型节点,每个节点所创建的序号不一样, 他们会判断自己所创建的节点 序号是否为最小,这个与顺序有关, 如果是最小, 则选取为Leader,否则为Follower角色。如果Leader出现问题如何处理? Leader 所在进程如果意外宕机,其与 ZooKeeper 间的 Session 结束,由于其创建的节点为Ephemeral类 型,故该节点会自动被删除。Follower角色节点是如何感知的? 在公平模式下, 每个Follower都会 Watch 序号刚好比自己序号小的节点。在上图中,调用方节点2会Watch 节点/Master/Leader1,调用方节点3会Watch节点/Master/Leader2。如果Leader宕机,/Master/Leader1 删除,调用方节点2就能得到通知。节点2先判断自己的序号 2 是不是当前最小的序号,在该场景下,其序号 为最小,所以节点2成为新的Leader。利用ZK实现非公平选举什么是非公平选举? 非公平选举就是没有遵循选举的公平性,仍然沿用上面的例子: 村子里要选举村长,领导通知大家在明早7点前排队在前十的人就可以参与选举,这个时候有人晚到,但借关系插队排在前面,这个就是非公平选举。选举处理流程 三台调用节点向ZK集群创建Non-sequence节点,但只会有一个调用节点创建成功,谁能够抢占资源在ZK集 群创建成功,与顺序无关,则竞选为Leader,其他客户端则创建失败,成为Follower角色。
提笔写架构
深入浅出消息队列---12、Kafka存储
Kafka存储Kafka作为一个支持大数据量写入写出的消息队列,由于是基于Scala和Java实现的,而Scala和Java均需要在JVM上运行,所以如果是基于内存的方式,即JVM的堆来进行数据存储则需要开辟很大的堆来支持数据读写,从而会导致GC频繁影响性能。考虑到这些因素,kafka是使用磁盘而不是kafka服务器broker进程内存来进行数据存储,并且基于磁盘顺序读写和MMAP技术来实现高性能。存储结构介绍Kafka一个topic下可以存在很多个分区,不考虑分区副本的情况下。一个分区对应一个日志(Log) 。为了防止Log过大,Kafka又引入了日志分段(LogSegment)的概念,将Log切分为多个LogSegment,相当于一个巨型文件被平均分配为多个相对较小的文件,这样也便于消息的维护和清理。事实上,Log和LogSegment对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如:以".txnindex"为后缀的事务索引文件)。如下图所示: Log对应了一个命名形式为-的文件夹。举个例子,假设有一个名为"itheima"的主题,此主题中具有4个分 区,那么在实际的物理存储上表现为"itheima-0" ,“itheima-1” , “itheima-2” , “itheima-3"这4个文件夹(/tmp/kafka-logs)。向Log中追加消息时是顺序写入的,**只有最后一个LogSegment才能执行写入操作,在此之前所有的LogSegment都不能写入数据。**为了方便描述,我们将最后一个LogSegment为"activeSegment”,即表示当前活跃的日志分段。每当segment文件达到一定的大小,则会创建一个新的segment文件(具体大小在server.properties中通过配置log.segment.bytes=1073741824:默认为1G)。之后追加的消息将写入新的activeSegment。文件名是以该文件的第一个数据相对于该分区的全局offset命名的。名称固定为20位数字,没有达到的位数则用0填充。比如第一个LogSegment的基准偏移量为0,对应的日志文件的名称为00000000000000000000.log。消息写入存储器的两种输入输出方式: 1、随机读写 2、顺序读写顺序读写 磁盘顺序读或写的速度400M/s,能够发挥磁盘最大的速度。 随机读写,磁盘速度慢的时候十几到几百K/s。这就看出了差距。随机读写:存储的数据在磁盘中占据空间,对于一个新磁盘,操作系统会将数据文件依次写入磁盘,当有些数据被删除时,就会空出该数据原来占有的存储空间,时间长了,不断的写入、删除数据,就会产生很多零零散散的存储空间,就会造成一个较大的数据文件放在许多不连续的存贮空间上,读写些这部分数据时,就是随机读写,磁头要不断的调整磁道的位置,以在不同位置上的读写数据,相对于连续空间上的顺序读写,要耗时很多。如下图所示: 当删除某些数据的时候,就会产生很多不连续的磁盘存储空间(磁盘碎片) 新文件的存储: **顺序读写:**就是在文件的末尾以追加的方式来写入消息,在Kafka中只能在日志文件的末尾追加新的消息,并且也不允许修改和删除已写入的消息。 如下图所示: 这种方法有一个缺陷—— 没有办法删除数据 ,所以Kafka是不会删除数据的,它会把所有的数据都保留下来,消费者在消费消息的时候是通过"消费位移"(offset)来决定从哪个位置开始进行消息的消费。页缓存顺序写入是Kafka高吞吐量的一个原因,当然即使采用的是磁盘的顺序写入,那么也是没有办法和内存相比的。因为为了再一次提高Kakfa的吞吐量,Kafka采用了Memory Mapped Files(后面简称 mmap)也被翻译成内存映射文件 ,它的工作原理是直接利用操作系统的 page cache 来实现文件到物理内存的直接映射,完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。操作系统本身有一层缓存,叫做page cache,是在内存里的缓存,我们也可以称之为os cache,意思就是操作系统自己管理的缓存。你在写入磁盘文件的时候,可以直接写入这个os cache里,也就是仅仅写入内存中,接下来由操作系统自己决定什么时候把os cache里的数据真的刷入磁盘文件中(每5秒检查一次是否需要将页缓存数据同步到磁盘文件)。仅仅这一个步骤,就可以将磁盘文件写性能提升很多了,因为其实这里相当于是在写内存,不是在写磁盘,大家看下图: 所以大家就知道了,上面那个图里,Kafka在写数据的时候,一方面基于了os层面的page cache来写数据,所以性能很高,本质就是在写内存罢了。另外一个,他是采用磁盘顺序写的方式,所以即使数据刷入磁盘的时候,性能也是极高的,也跟写内存是差不多的。基于上面两点,kafka就实现了写入数据的超高性能。日志清理Kafka将消息存储在磁盘中,为了控制磁盘占用空间的不断增加需要对消息做一定的清理操作。Kafka中每一个分区副本都对应了一个Log,而Log又可以分为多个日志分段,这样也便于日志的清理操作。Kafka提供了两种日志清理的策略: 1、日志删除(Log Retention):按照一定的保留策略直接删除不符合条件的日志分段。 2、日志压缩(Log Compaction):针对每个消息的Key进行整合,对于有相同key的不同value值,只保留最后一个版本。我们可以通过broker端参数:log.cleanup.policy来设置日志清理策略,此参数的默认值为"delete"(可以在kafka启动时打印的日志中查看到),即采用日志删除的清理策略。如果要采用日志压缩的清理策略,就需要将log.cleanup.policy设置为"compact",并且还需要将log.cleaner.enable的值设置为true。通过将log.cleanup.policy参数设置为"delete,compact",还可以同时支持日志删除和日志压缩两种策略。最常见的日志清理操作是日志删除。在Kafka的日志管理器中会有一个专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端参数log.retention.check.interval.ms来配置,默认值为300000,即5分钟。当前日志分段的常见的保留策略有2种: 1、基于时间的保留策略 2、基于日志大小的保留策略基于时间: 日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值来寻找可删除的日志分段文件集合。日志分段保留时间的阈值可以通过如下参数进行设置:log.retention.hours = 168 # 单位为小时,默认的取值为7天 优先级最低
log.retention.minutes=null # 单位为分钟 优先级次之
log.retention.ms=null # 单位为毫秒 优先级最高
删除日志分段文件时,首先会从Log对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作。然后将日志分段所对应的所有文件添加上".delete"的后缀(当然也包含对应的索引文件)。最后交由一个以"delete-file"的命令的延迟任务来删除这些以".delete"为后缀的文件,这个任务的延迟时间可以通过file.delete.delay.ms参数来调配,此 参数的默认值为60000,即1分钟。基于大小: 日志删除任务会检查当前日志的大小是否超过设定的阈值来查找可删除的日志分段的文件集合,可以通过如下参数来设定日志大小的阈值:log.retention.bytes = -1 # 默认为-1,表示无穷大
注:该参数配置的是Log中所有日志文件的总大小,而不是单个日志分段的大小(.log日志文件)零拷贝技术说完了写入这块,再来谈谈消费这块。 大家应该都知道,从Kafka里我们经常要消费数据,那么消费的时候实际上就是要从kafka的磁盘文件里读取某条数据然后发送给下游的消费者,如下图所示。 那么这里如果频繁的从磁盘读数据然后发给消费者,性能瓶颈在哪里呢? 假设要是kafka什么优化都不做,就是很简单的从磁盘读数据发送给下游的消费者,那么大概过程如下所示: 1、先看看要读的数据在不在os cache里,如果不在的话就从磁盘文件里读取数据后放入os cache。 2、接着从操作系统的os cache里拷贝数据到应用程序进程的缓存里。 3、再从应用程序进程的缓存里拷贝数据到操作系统层面的Socket缓存里。 4、最后从Socket缓存里提取数据后发送到网卡,最后发送出去给下游消费。 完成上述的操作,进行了两次copy: 1、一次是从操作系统的cache里拷贝到应用进程的缓存里。 2、从应用程序缓存里拷贝回操作系统的Socket缓存里。而且为了进行这两次拷贝,中间还发生了好几次上下文切换,一会儿是应用程序在执行,一会儿上下文切换到操作系统来执行。所以这种方式来读取数据是比较消耗性能的。Kafka为了解决这个问题,在读数据的时候是引入零拷贝技术。也就是说,直接让操作系统的cache中的数据发送到网卡后传输给下游的消费者,中间跳过了两次拷贝数据的步骤,Socket缓存中仅仅会拷贝一个描述符过去,不会拷贝数据到Socket缓存。大家看下图,体会一下这个精妙的过程: 通过零拷贝技术,就不需要把os cache里的数据拷贝到应用缓存,再从应用缓存拷贝到Socket缓存了,两次拷贝都省略了,所以叫做零拷贝。对Socket缓存仅仅就是拷贝数据的描述符过去,然后数据就直接从os cache中发送到网卡上去了,这个过程大大的提升了数据消费时读取文件数据的性能。
提笔写架构
深入浅出业务幂等性---3、服务幂等和分布式锁
实现思想对于分布式锁的实现,zookeeper天然携带的一些特性能够很完美的实现分布式锁。其内部主要是利用znode节点 特性和watch机制完成。znode节点在zookeeper中节点会分为四类,分别是:持久节点:一旦创建,则永久存在于zookeeper中,除非手动删除。持久有序节点:一旦创建,则永久存在于zookeeper中,除非手动删除。同时每个节点都会默认存在节点序 号,每个节点的序号都是有序递增的。如demo000001、demo000002…demo00000N。临时节点:当节点创建后,一旦服务器重启或宕机,则被自动删除。临时有序节点:当节点创建后,一旦服务器重启或宕机,则被自动删除。同时每个节点都会默认存在节点序 号,每个节点的序号都是有序递增的。如demo000001、demo000002…demo00000N。watch监听机制watch监听机制主要用于监听节点状态变更,用于后续事件触发,假设当B节点监听A节点时,一旦A节点发生修改、删除、子节点列表发生变更等事件,B节点则会收到A节点改变的通知,接着完成其他额外事情。实现原理其实现思想是当某个线程要对方法加锁时,首先会在zookeeper中创建一个与当前方法对应的父节点,接着每个要获取当前方法的锁的线程,都会在父节点下创建一个临时有序节点,因为节点序号是递增的,所以后续要获取锁的线程在zookeeper中的序号也是逐次递增的。根据这个特性,当前序号最小的节点一定是首先要获取锁的线程,因此可以规定序号最小的节点获得锁。所以,每个线程再要获取锁时,可以判断自己的节点序号是否是最小的,如果是则获取到锁。当释放锁时,只需将自己的临时有序节点删除即可。根据上图,在并发下,每个线程都会在对应方法节点下创建属于自己的临时节点,且每个节点都是临时且有序的。 那么zookeeper又是如何有序的将锁分配给不同线程呢? 这里就应用到了watch监听机制。每当添加一个新的临时节点时,其都会基于watcher机制监听着它本身的前一个节点等待前一个节点的通知,当前一个节点删除时,就轮到它来持有锁了。然后依次类推。zookeeper是基于cp模式,能够保证数据强一致性。基于watch机制实现锁释放的自动监听,锁操作性能较好。频繁创建节点,对于zk服务器压力较大,吞吐量没有redis强。原理剖析&实现低效锁思想&实现在通过zookeeper实现分布式锁时,有另外一种实现的写法,这种也是非常常见的,但是它的效率并不高,此处可以先对这种实现方式进行探讨。此种实现方式,只会存在一个锁节点。当创建锁节点时,如果锁节点不存在,则创建成功,代表当前线程获取到锁,如果创建锁节点失败,代表已经有其他线程获取到锁,则该线程会监听锁节点的释放。当锁节点释放后,则继续尝试创建锁节点加锁。羊群效应这种方案的低效点就在于,只有一个锁节点,其他线程都会监听同一个锁节点,一旦锁节点释放后,其他线程都会收到通知,然后竞争获取锁节点。这种大量的通知操作会严重降低zookeeper性能,对于这种由于一个被watch的 znode节点的变化,而造成大量的通知操作,叫做羊群效应。高效锁思想&实现为了避免羊群效应的出现,业界内普遍的解决方案就是,让获取锁的线程产生排队,后一个监听前一个,依次排序。推荐使用这种方式实现分布式锁按照上述流程会在根节点下为每一个等待获取锁的线程创建一个对应的临时有序节点,序号最小的节点会持有锁, 并且后一个节点只监听其前面的一个节点,从而可以让获取锁的过程有序且高效。具体流程生成操作标识是为了防止feign调用超时出现重试,如果没有操作标识的话,库存服务无法判定是一次操作还是 多次操作,通过标识可以用于区分重试时当前是哪次操作。从而避免多次扣减库存情况的出现。库存服务先检查redis再检查Mysql,出于两点考虑:避免服务间重试时,库存服务无法区分是否为同一个操作,导致相同操作被执行多次。同时缓存结合关系型 数据库,可以起到减轻数据库压力的作用。库存流水表不仅用于区分操作,同时每一次扣减库存时信息都会被记录,可以用于后期的库存信息统计等操作。 总的来说,就是通过操作标识结合zookeeper分布式锁,完成mysql乐观锁的操作,思想上都是相同的。redis分布式锁单节点Redis实现分布式锁原理&实现分布式锁的一个很重要的特性就是互斥性,同一时间内多个调用方加锁竞争,只能有一个调用方加锁成功。而redis是基于单线程模型的,可以利用这个特性让调用方的请求排队,对于并发请求,只会有一个请求能获取到锁。redis实现分布式锁也很简单,基于客户端的几个API就可以完成,主要涉及三个核心API:setNx():向redis中存key-value,只有当key不存在时才会设置成功,否则返回0。用于体现互斥性。expire():设置key的过期时间,用于避免死锁出现。delete():删除key,用于释放锁。单节点问题锁续期当对业务进行加锁时,锁的过期时间,绝对不能想当然的设置一个值。假设线程A在执行某个业务时加锁成功 并设置锁过期时间。但该业务执行时间过长,业务的执行时间超过了锁过期时间,那么在业务还没执行完 时,锁就自动释放了。接着后续线程就可以获取到锁,又来执行该业务。就会造成线程A还没执行完,后续线 程又来执行,导致同一个业务逻辑被重复执行。因此对于锁的超时时间,需要结合着业务执行时间来判断, 让锁的过期时间大于业务执行时间。上面的方案是一个基础解决方案,但是仍然是有问题的。业务执行时间的影响因素太多了,无法确定一个准确值,只能是一个估值。无法百分百保证业务执行期间, 锁只能被一个线程占有。如想保证的话,可以在创建锁的同时创建一个守护线程,同时定义一个定时任务每隔一段时间去为未释放的锁增加过期时间。当业务执行完,释放锁后,再关闭守护线程。 这种实现思想可以用来解决锁续期。服务单点&集群问题在单点redis虽然可以完成锁操作,可一旦redis服务节点挂掉了,则无法提供锁操作。 在生产环境下,为了保证redis高可用,会采用异步复制方法进行主从部署。当主节点写入数据成功,会异步的将 数据复制给从节点,并且当主节点宕机,从节点会被提升为主节点继续工作。假设主节点写入数据成功,在没有将数据复制给从节点时,主节点宕机。则会造成提升为主节点的从节点中是没有锁信息的,其他线程则又可以继续加 锁,导致互斥失效。Redisson实现分布式锁redisson是redis官网推荐实现分布式锁的一个第三方类库。其内部完成的功能非常强大,对各种锁都有实现,同 时对于使用者来说非常简单,让使用者能够将更多的关注点放在业务逻辑上。此处重点利用Redisson解决单机 Redis锁产生的两个问题。单机Redisson实现分布式锁实现基于redisson实现分布式锁很简单,直接基于lock()&unlock()方法操作即可。1. 添加依赖<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!--Redis分布式锁-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.1</version>
</dependency>2. 修改配置文件server:
redis:
host: 192.168.200.150
port: 6379
database: 0
jedis:
pool:
max-active: 500
max-idle: 1000
min-idle: 43. 修改springboot启动类@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Bean
public RedissonClient redissonClient(){
RedissonClient redissonClient;
Config config = new Config();
String url = "redis://" + host + ":" + port;
config.useSingleServer().setAddress(url);
try {
redissonClient = Redisson.create(config);
return redissonClient;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}4. 定义锁工具类@Component
public class RedissonLock {
@Autowired
private RedissonClient redissonClient;
/**
* 加锁
* @param lockKey
* @return
*/
public boolean addLock(String lockKey){
try {
if (redissonClient == null){
System.out.println("redisson client is null");
return false;
}
RLock lock = redissonClient.getLock(lockKey);
//设置锁超时时间为5秒,到期自动释放
lock.lock(10, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName()+": 获取到锁");
//加锁成功
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public boolean releaseLock(String lockKey){
try{
if (redissonClient == null){
System.out.println("redisson client is null");
return false;
}
RLock lock = redissonClient.getLock(lockKey);
lock.unlock();
System.out.println(Thread.currentThread().getName()+": 释放锁");
return true;
}catch (Exception e){
e.printStackTrace();
return false;
}
}
}5. 测试@SpringBootTest
@RunWith(SpringRunner.class)
public class RedissonLockTest {
@Autowired
private RedissonLock redissonLock;
@Test
public void easyLock(){
//模拟多个10个客户端
for (int i=0;i<10;i++) {
Thread thread = new Thread(new LockRunnable());
thread.start();
}
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
private class LockRunnable implements Runnable {
@Override
public void run() {
redissonLock.addLock("demo");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
redissonLock.releaseLock("demo");
}
}
}6. 执行效果根据执行效果可知,多线程并发获取所时,当一个线程获取到锁,其他线程则获取不到,并且其内部会不断尝试获取锁,当持有锁的线程将锁释放后,其他线程则会继续去竞争锁。lock()源码分析在上述加锁方法实现中,最核心就是getLock()和lock()。get()源码非常简单,根据当前传入的锁名称创建并返回 一个RLock对象。当获取到RLock对象后,调用其内部的lock()执行加锁操作。根据源码描述,当线程获取锁时,如果没有获取到锁,则会让其进入自旋,直到获取到锁。 如果获取到锁,则会一直保留到调用unLock()手动释放或根据传入的 leaseTime时间自动释放。当前传入两个参数值:锁超时时间,时间单位。主要用于避免死锁的出现,假设持有锁的redis节点宕机,到期后锁可以自动释放。lock()方法中还会调用lock()的另外一个重载方法,需要传入三个参数:过期时间、时间单位、是否中断。在三个参数的lock()重载方法中,首先会获取当前线程id,接着调用tryAcquire()方法尝试获取锁,如果返回值为 null,代表获取到锁。 如果返回值不是null,则根据当前线程id创建异步任务并放入线程池中,接着进入自旋,在 自旋过程中,尝试调用tryAcquire()获取锁,如果获取到则退出自旋。否则会不断的尝试获取锁。在lock()方法中,最核心的是tryAcquire()。其内部核心实现会调用tryAcquireAsync(),并传入过期时间、时间单位和当前线程id,进行锁的获取。如果leaseTime不为-1,代表设置了有效时间,接着调用tryAcquireAsync()去获取锁。如果是-1的话,则默认把永不过期改为30秒过期,并且创建异步任务,如果没有获取到锁,则什么都不做。如果获取到了锁,则调用scheduleExpirationRenewal()对当前线程id的锁进行延时。最终的tryLockInnerAsync()则是获取锁的具体实现。可以看到,其内部是基于lua脚本语言完成锁获取的。因为获取锁的过程涉及到了多步,为了保证执行过程的原子性,所以使用了lua,最核心的就是要理解这段lua脚本的执行过程。对于这款lua脚本来说,KEYS[1]代表需要加锁的key,ARGV[1]代表锁的超时时间,ARGV[2]代表锁的唯一标识。 对于这段lua脚本,简单来说:检查锁key是否被占用了,如果没有则设置锁key和唯一标识,初始值为1,并且设置锁key的过期时间。如果锁key存在,并且value也匹配,表示是当前线程持有的锁,那么重入次数加1,并且设置失效时间。返回锁key的失效时间毫秒数。unLock()源码分析在释放锁时,unlock()内部会调用unlockAsync()对当前线程持有的锁进行释放。其内部最终会执行 unlockInnerAsync()方法完成锁释放并返回结果。在unlockInnerAsync()中仍然是结合lua脚本完成释放锁操作。相关参数:KEYS[1]:当前锁key。KEYS[2]:redis消息的ChannelName,每个锁对应唯一的一个 channelName。ARGV[1]:redis消息体,用于标记redis的key已经解锁,用于通知其他线程申请锁。ARGV[2]:锁超时时间。ARGV[3]:锁的唯一标识。判断锁key和锁的唯一标识是否匹配,如果不匹配,表示锁已经被占用,那么直接返回。如果是当前线程持有锁,则value值-1,用于重入操作。如果-1后的值大于0,则对锁设置过期时间。如果-1后的值为0,则删除锁key,并发布消息,该锁已被释放。用于通知其他线程申请锁。锁续期对于锁续期问题,在单点redis实现分布式锁时已经介绍过了,用于防止业务执行超时或宕机而引起的业务被重复执行。根据对lock方法的解析,可以发现,当设置完过期时间后,当前锁的过期时间就已经被设定了,不会发生改变, 到期后则会被自动释放,因此在业务执行中,通过lock()方法加锁会造成隐患。看门狗所谓的看门狗是redisson用于自动延长锁有效期的实现机制。其本质是一个后台线程,用于不断延长锁key的生存时间。改造锁示例代码,让锁超时时间为1秒,但是业务执行时,需要耗时3秒,此时执行可以发现,多线程间在上一个锁没有释放的情况下,后续线程又获取到了锁。但是解锁的时候,出现异常,因为加锁时的唯一标识与解锁时的唯 一标识发生了改变,造成死锁。因为业务执行多久无法确定一个准确值,所以在看门狗的实现中,不需要对锁key设置过期时间,当过期时间为-1 时,这时会启动一个定时任务,在业务释放锁之前,会一直不停的增加这个锁的有效时间,从而保证在业务执行完 毕前,这把锁不会被提前释放掉。要开启看门狗机制也很简单,只需要将加锁时使用lock()改为tryLock()即可。并且根据之前lock的源码分析,如果没有设置锁超时,默认过期时间为30秒即watchdog每隔30秒来进行一次续期,该值可以修改。config.setLockWatchdogTimeout(3000L);进行测试,当加锁后,线程睡眠10秒钟,然后释放锁,可以看到在这段时间内,当前线程会一直持有锁,直到锁释放。在多线程环境下,也是阻塞等待进行锁的获取。红锁当在单点redis中实现redis锁时,一旦redis服务器宕机,则无法进行锁操作。因此会考虑将redis配置为主从结 构,但在主从结构中,数据复制是异步实现的。假设在主从结构中,master会异步将数据复制到slave中,一旦某 个线程持有了锁,在还没有将数据复制到slave时,master宕机。则slave会被提升为master,但被提升为slave的 master中并没有之前线程的锁信息,那么其他线程则又可以重新加锁。redlock算法redlock是一种基于多节点redis实现分布式锁的算法,可以有效解决redis单点故障的问题。官方建议搭建五台redis服务器对redlock算法进行实现。在redis官网中,对于redlock算法的实现思想也做了详细的介绍。地址:https://redis.io/topics/distlock。整个实现过程分为五步:记录获取锁前的当前时间。使用相同的key,value获取所有redis实例中的锁,并且设置获取锁的时间要远远小于锁自动释放的时间。假设锁自动释放时间是10秒,则获取时间应在5-50毫秒之间。通过这种方式避免客户端长时间等待一个已经关闭的实例,如果一个实例不可用了,则尝试获取下一个实例。客户端通过获取所有实例的锁后的时间减去第一步的时间,得到的差值要小于锁自动释放时间,避免拿到一个已经过期的锁。并且要有超过半数的redis实例成功获取到锁,才算最终获取锁成功。如果不是超过半数,有可能 出现多个客户端重复获取到锁,导致锁失效。当已经获取到锁,那么它的真正失效时间应该为:过期时间-第三步的差值。如果客户端获取锁失败,则在所有redis实例中释放掉锁。为了保证更高效的获取锁,还可以设置重试策略,在一定时间后重新尝试获取锁,但不能是无休止的,要设置重试次数。虽然通过redlock能够更加有效的防止redis单点问题,但是仍然是存在隐患的。假设redis没有开启持久化, clientA获取锁后,所有redis故障重启,则会导致clientA锁记录消失,clientB仍然能够获取到锁。这种情况虽然发生几率极低,但并不能保证肯定不会发生。保证的方案就是开始AOF持久化,但是要注意同步的策略,使用每秒同步,如果在一秒内重启,仍然数据丢失。使用always又会造成性能急剧下降。官方推荐使用默认的AOF策略即每秒同步,且在redis停掉后,要在ttl时间后再重启。 缺点就是ttl时间内redis无法对外提供服务。红锁实现redisson对于红锁的实现已经非常完善,通过其内部提供的api既可以完成红锁的操作。1. 新建配置类@Configuration
public class RedissonRedLockConfig {
public RedissonRedLock initRedissonClient(String lockKey){
Config config1 = new Config();
config1.useSingleServer().setAddress("redis://192.168.200.150:7000").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);
Config config2 = new Config();
config2.useSingleServer().setAddress("redis://192.168.200.150:7001").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);
Config config3 = new Config();
config3.useSingleServer().setAddress("redis://192.168.200.150:7002").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);
Config config4 = new Config();
config4.useSingleServer().setAddress("redis://192.168.200.150:7003").setDatabase(0);
RedissonClient redissonClient4 = Redisson.create(config4);
Config config5 = new Config();
config5.useSingleServer().setAddress("redis://192.168.200.150:7004").setDatabase(0);
RedissonClient redissonClient5 = Redisson.create(config5);
RLock rLock1 = redissonClient1.getLock(lockKey);
RLock rLock2 = redissonClient2.getLock(lockKey);
RLock rLock3 = redissonClient3.getLock(lockKey);
RLock rLock4 = redissonClient4.getLock(lockKey);
RLock rLock5 = redissonClient5.getLock(lockKey);
RedissonRedLock redissonRedLock = new RedissonRedLock(rLock1,rLock2,rLock3,rLock4,rLock5);
return redissonRedLock;
}
}2. 新建测试类,完成加锁与解锁操作@SpringBootTest
@RunWith(SpringRunner.class)
public class RedLockTest {
@Autowired
private RedissonRedLockConfig redissonRedLockConfig;
@Test
public void easyLock(){
//模拟多个10个客户端
for (int i=0;i<10;i++) {
Thread thread = new Thread(new RedLockTest.RedLockRunnable());
thread.start();
}
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
private class RedLockRunnable implements Runnable {
@Override
public void run() {
RedissonRedLock redissonRedLock = redissonRedLockConfig.initRedissonClient("demo");
try {
boolean lockResult = redissonRedLock.tryLock(100, 10, TimeUnit.SECONDS);
if (lockResult){
System.out.println("获取锁成功");
TimeUnit.SECONDS.sleep(3);
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
redissonRedLock.unlock();
System.out.println("释放锁");
}
}
}
}redissonRedLock加锁源码分析public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
if (leaseTime != -1) {
newLeaseTime = unit.toMillis(waitTime)*2;
}
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
long lockWaitTime = calcLockWaitTime(remainTime);
/**
* 1. 允许加锁失败节点个数限制(N-(N/2+1)),当前假设五个节点,则允许失败节点数为2
*/
int failedLocksLimit = failedLocksLimit();
/**
* 2. 遍历所有节点执行lua加锁,用于保证原子性
*/
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
/**
* 3.对节点尝试加锁
*/
try {
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
// 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁所有节点
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
// 抛出异常表示获取锁失败
lockAcquired = false;
}
if (lockAcquired) {
/**
*4. 如果获取到锁则添加到已获取锁集合中
*/
acquiredLocks.add(lock);
} else {
/**
* 5. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N-(N/2+1))
* 如果已经到达, 就认定最终申请锁失败,则没有必要继续从后面的节点申请了
* 因为 Redlock 算法要求至少N/2+1 个节点都加锁成功,才算最终的锁申请成功
*/
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}
if (failedLocksLimit == 0) {
unlockInner(acquiredLocks);
if (waitTime == -1 && leaseTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit--;
}
}
/**
* 6.计算从各个节点获取锁已经消耗的总时间,如果已经等于最大等待时间,则申请锁失败,返回false
*/
if (remainTime != -1) {
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}
if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
/**
* 7.如果逻辑正常执行完则认为最终申请锁成功,返回true
*/
return true;
}
提笔写架构
深入浅出单点登录---1、什么是单点登录
什么是单点登录随着互联网大数据不断发展,应用服务的不断增多,单点登录越来越能够凸显其作用。单点登录 SSO(Single Sign On),顾名思义就是单个节点登录,全局使用。是目前最为流行的统一登录解决方案。为什么使用目的就是为了快速实现用户认证,统一管理用户信息, 避免重复维护用户数据; 分离用户与业务数据,让业务 服务专注于业务功能的实现,让用户中心服务统一认证,减少频繁认证次数, 同时保障数据的安全性。应用场景内部的服务统一认证与授权,比如电商网站, 内部的用户服务、订单服务、库存服务、资金服务等,以 用户服务作为认证服务中心, 实现统一认证与授权。外部的第三方登录认证与授权,比如登录某个论坛网站, 可以采用FaceBook或者Google账号进行登录。云服务应用,比如使用阿里云的消息推送服务,但不想创建和管理用户,就可以采用基于SAML协议实现 SSO单点登录。淘宝天猫登录场景解析访问淘宝网站, 登录之后, 再访问天猫网站, 你会发现, 天猫也是处于登录状态,那么具体是如何实现的?登录技术方案分析淘宝登录:天猫登录:目前整个登录体系是以淘宝作为中心,天猫通过淘宝作鉴权登录。整个鉴权体系是采用跨域cookie + 分布式 session作为解决方案:淘宝是如何解决Cookie跨域问题,目前淘宝是采用如下方案做处理:通过内嵌iframe,访问统一域名,实现Cookie信息共享,如果禁用Cookie,你会发现无法正常登录;同时利用静态资源不受同源策略的限制,通过JSONP跨域方式来获取用户的登录状态。Response会返回Token信息:var userCookie= {dnk:'',_nk_:'',_l_g_:'',ck1:'',tracknick:'',mt:'ci=0_0',l:'eBMMyMa4QmFJBq7pBO5aourza77T3Id b4sPzaNbMiInca6BPO3JuhNQqw5H95dtjgtC3xetzm21B9dLHR3fRwxDDBTJbWMu‐ exvO.',uc1:'',t:'aa749f01717bd2e29ccacc35701ebef7',unb:'',cna:'y4PeFr/mbEoCAXQZX0Z2u8bq',_t b_token_:'e6163b18b5154',version:'4.0.0'};window.TB && TB.Global && TB.Global.run && TB.Global.run();淘宝是如何解决分布式Session管理问题呢? 为了解决此问题,淘宝专门推出两个重要产品: 第一个是tbsession, 基于Tair缓存体系实现的共享Session; 另一个是passcookie,解决不同域名之间Cookie 同步的问题,上述的登录鉴权Cookie信息就是通过passcookie实现的统一管理。 淘宝是如何防范Session劫持?CSRF/XSRF 攻击的原理,就是利用浏览器对嵌入资源不做限制的行为进行跨站请求伪造攻击, 比如SSO登录架构设计SSO登录实现流程解析用户进入淘宝登录页面,调用地址: https://login.taobao.com/newlogin/login.do调用成功之后,同步Cookie,保存Token认证信息。3. 访问天猫网站,从Cookie里面拿取Token信息,采用jsonp方式,获取淘宝的登录状态:4. 如果不是从淘宝登录, 由天猫发起登录,会请求至淘宝登录页面, 登录完成之后写入Cookie信息, 再返 回至天猫网站。
提笔写架构
深入浅出消息队列---9、Kafka消息顺序性
Kafka消息顺序性消息顺序性Kafka保证消息顺序性的特点如下所示: topic中的数据分割为一个或多个partition。每个topic至少有一个partition。在单个partition中的数据是有序的,如果消息分散在不同的 partition,Kafka 无法保证其顺序性。但只需要确保要求顺序性的若干消息发送到同一个 partiton,即可满足其顺序性。并且在进行消息消费的时候,需要确保消费者是进行单线程消费。要保证若干消息发送到同一个partiton中,那么我们就需要在发送消息的时候指定一个分区的id,那么这样的话消息就被发送到同一个分区中。// 发送消息到指定的分区,保证分区的消息顺序性
public static void sendMessageToDestPartition() {
for(int x = 0 ; x < 5 ; x++) {
// Kafka消息的异步发送
String msg = "Kakfa环境测试...." + x ;
kafkaTemplate.send("dd" ,0 , "order" ,
msg).addCallback((obj) -> {
LOGGER.info("send msg to kafka broker success ---> {} " ,
((SendResult)obj).getProducerRecord().value());
} , (t) -> {
t.printStackTrace();
});
LOGGER.info("send msg to local cache success ---> {} " , msg);
}
}
消费者进行指定分区的消费:@KafkaListener(topicPartitions = {@org.springframework.kafka.annotation.TopicPartition(topic = "dd" ,
partitions = "0")} , groupId = "dd.demo")
public void consumerOrderMessageHandler(String msg , KafkaConsumer
consumer) {
LOGGER.info("consumer topic is : {} , msg is ----> {} " , "itcast" ,
msg);
consumer.commitAsync();
}
消费顺序性当消费端报错,消息重试了指定次数以后,还没有收到服务端的ack,此时就会抛出异常。 如果我们还需要保证消息的顺序性,那么我们就需要将max.in.flight.requests.per.connection设置为1,该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。如何保证顺序性:如果把 retries 设为非零整数,同时把max.in.flight.requests.per.connection 设为比 1 大的数,那么,如果第一个批次消息写入失败,而第二个批次写入成功,broker 会重试写入第一个批次。如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了。 一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把retries 设为 0。可以把 max.in.flight.requests.per.connection 设为 1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给broker。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。配置如下所示:spring:
kafka:
producer:
bootstrap-servers: 192.168.23.131:9092
acks: all
retries: 2
properties: {'max.in.flight.requests.per.connection' : 1}
提笔写架构
深入浅出Zookeeper---1、特性与架构
概述ZooKeeper是一种集中式服务,用于维护配置信息,命名,提供分布式同步和提供组服务。所有这些类型的服务都以分布式应用程序的某种形式使用。每次实施它们都需要做很多工作来修复不可避免的错误和竞争条件。由于难以实现这些类型的服务,应用程序最初通常会吝啬它们,这使得它们在变化的情况下变得脆弱并且难以管理。即使正确完成,这些服务的不同实现也会在部署应用程序时导致管理复杂性。ZK集群架构设计 ZK主要分为三种角色:Leader(领导者):一个Zookeeper集群同一时间只会有一个实际工作的Leader,它会发起并维护与各 Follwer及Observer间的心跳。所有的写操作必须要通过Leader完成再由Leader将写操作广播给其它服务器。Follower(跟随者):一个Zookeeper集群可能同时存在多个Follower,它会响应Leader的心跳。Follower 可以处理客户端的读请求,但写请求转发给Leader处理,并且负责参与新 leader的选举、响应 leader 的提议。Observer(观察者):无投票权,不参加选举, 也不响应提议。Observer不需要将事务持久化到磁盘,如 果重启需要从 leader 重新同步整个名字空间。ZK特性:顺序一致性实时性原子性ZK数据结构与存储ZK数据结构模型 节点存储容量不能超过1M。ZNode节点类型: Znode的分为四类:持久节点(persistent node):节点会被持久化处理,执行命令: create /apps/app1 “order” 临时节点(ephemeral node):客户端断开连接后,ZooKeeper会自动删除临时节点, 执行命令: create -e /apps/app1 “order” 。顺序节点(sequential node):每次创建顺序节点时,ZooKeeper都会在路径后面自动添加上10位的数 字,从1开始,最大值为2147483647 (2^32-1),每个顺序节点都有一个单独的计数器,并且单调递增 的,由 leader 实例维护,执行命令: create -s /apps/app1 “order” 。临时顺序节点(EPHEMERAL_SEQUENTIAL):基本特性与临时节点一致,创建节点的过程中,zookeeper 会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名。ZNode节点属性:[zk: localhost:2181(CONNECTED) 1] get /testNode test
cZxid = 0x2
ctime = Fri Aug 06 22:28:23 CST 2020
mZxid = 0x2
mtime = Fri Aug 06 22:28:23 CST 2020
pZxid = 0x2
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0
ZK数据存储方式数据存储方式分为三类:1.内存数据 内存数据结构分为三类:DataTree: 内存数据存储的核心DataNode: 数据存储的最小单元ZKDatabase: ZK的内存数据库2. 事务日志 事务日志处理流程: 事务日志文件内容示例: 查看日志命令:java ‐classpath .:./lib/slf4j‐api‐1.7.25.jar:./zookeeper‐3.4.14.jar org.apache.zookeeper.server.LogFormatter /data/zookeeper/version‐2/log.100000001 > log1.log
产生日志内容:3/14/22 8:55:19 PM EDT session 0x20003a778cc0012 cxid 0xa9 zxid 0x1000001bc delete '/lock‐ namespace/shared_lock/order/W‐0000000016
3/14/22 8:55:29 PM EDT session 0x20003a778cc0012 cxid 0xb0 zxid 0x1000001bd delete '/lock‐ namespace/shared_lock/order/W‐0000000017
3/14/22 9:46:18 PM EDT session 0x20003a778cc0012 cxid 0xb1 zxid 0x1000001be create '/lock‐ namespace/shared_lock/order/W-0000000018,#3139322e3136382e3132332e313033,v{s{31,s{'world,'anyone}}},T,19
3/14/22 9:46:38 PM EDT session 0x20003a778cc0012 cxid 0xb4 zxid 0x1000001bf delete '/lock‐ namespace/shared_lock/order/W‐0000000018
3. 数据快照(snapshot) 数据快照用来记录Zookeeper服务器上某一时刻的全量内存数据内容,并将其写入指定的磁盘文件中。 Zookeeper在进行若干次事务日志记录后,将内存数据库的全量数据Dump到本地文件中,这个就是数据快照 快照查看命令:java ‐classpath .:./lib/slf4j‐api‐1.7.25.jar:./zookeeper‐3.4.14.jar org.apache.zookeeper.server.SnapshotFormatter /data/zookeeper/version‐2/snapshot.100000000 > snap1.log
其处理步骤如下:检查是否需要进行数据快照,每一次事务日志记录之后,Zookeeper都会检测是否需要进行数据快照, 考虑到数据快照对于Zookeeper机器的影响,需要尽量避免ZK集群中的所有机器在同一时刻进行数据快照。 采用过半随机策略进行数据快照操作。切换事务日志文件,表示当前的事务日志已经写满,需要重新创建一个新的事务日志。创建数据快照的异步线程,创建单独的异步线程来进行数据快照以避免影响Zookeeper主线程的运行状态。获取全量数据和会话信息,从ZKDatabase数据库中获取到DataTree和会话信息。生成快照数据文件名,ZK根据当前已经提交的最大ZXID来生成数据快照文件名。数据序列化,首先序列化文件头信息,然后再对会话信息和DataTree分别进行序列化,同时生成一个 Checksum校验文件,一并写入快照文件中。
提笔写架构
深入浅出消息队列---4、RabbitMQ消息可靠性传输
RabbitMQ消息可靠性传输在我们的业务系统中,一旦使用到了消息队列,我们就必须考虑消息的丢失问题。比如在秒杀业务中,一旦消息丢失了对我们用户而言就是不公平的。第一种情况场景描述 生产者已经将消息发送给了队列,但是此时消费者还没以及时对消息进行消费,这个时候指定的队列主机宕机了,这样存储在队列的消息也会丢失。解决方案 对消息进行持久化操作。当对消息进行持久化操作以后,这个消息一旦被发送到mq中的某一个队列,那么此时Rabbitmq会立马将消息进行持久化。注意 spring boot和rabbitmq进行整合以后,默认消息的存储就是持久化方式。我们可以将所有的消息都设置为持久化,但是这样会影响Rabbitmq的性能。因为我们需要将消息写入到内存的同时还需要将消息写入到磁盘。对于可靠性不是那么高的消息可以不采用持久化处理,以提高整体系统的吞吐量。消息默认是持久化模式: 设置消息为非持久化模式:// 测试Direct类型的交换机
private static void directExchangeMessageTransport() {
rabbitTemplate.convertAndSend("direct.exchange" , "create" , "direct exchange 测试数据" , (message) -> {
// spring boot和rabbitmq整合以后,默认消息是会被持久化的,我们可以将消息 的持久化方式设置为不进行持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_ PERSISTENT);
return message ;
});
}
消息存储机制不管是持久化的消息还是非持久化的消息都可以被写入到磁盘。 1、持久化的消息在到达队列时就被写入到磁盘,并且如果可以,持久化的消息也会在内存中保存一份备份,这样可以提高一定的性能,当内存吃紧的时候会从内存中清除。 2、非持久化的消息一般只保存在内存中,在内存吃紧的时候会被写入到磁盘中,以节省内存空间。 这两种类型的消息的落盘处理都在RabbitmqMQ的"持久层"中完成。持久层的组成如下所示: rabbit_queue_index:负责维护队列中的落盘消息的信息,包括消息的存储地点、是否已被交付给消费者、 是否已被消费者ack。每一个队列都有与之对应的一个rabbitmq_queue_index。 rabbit_msg_store: 负责消息的存储,它被所有的队列共享,在每个节点中有且只有一个。 rabbit_msg_store可以在进行细分: 在容器中默认这些信息是通过/var/lib/rabbitmq/mnesia/rabbit@977b5f791952这个路径下的3个文件夹进 行存储: 消息可以存储在rabbit_queue_index中也可以存储在rabbit_msg_store中。最佳的配置是较小的消息存储在rabbit_queue_index中而较大的消息存储在rabbit_msg_store中。这个消息的界定可以通过queue_index_embed_msgs_below来配置,默认大小为4096,单位为B。注意这里的消息大小是 指消息体、属性以及headers整体的大小。当一个消息小于设定的大小阈值时就可以存储在rabbit_queue_index中,这样可以得到性能上的优化。这种存储机制是在Rabbitmq3.5 版本以后引入 的,该优化提高了系统性能10%左右。那么我们是不是把queue_index_embed_msgs_below参数的值调节的越大越好呢? 肯定不是的rabbit_queue_index中以顺序(文件名从0开始累加)的段文件来进行存储,后缀为".idx",每个段文件中包含固定的SEGMENT_ENTRY_COUNT条记录,SEGMENT_ENTRY_COUNT默认值为 16384。每个rabbit_queue_index从磁盘中读取消息的时候至少在内存中维护一个段文件,所以设置 queue_index_embed_msgs_below值的时候需要格外谨慎,一点点增大也可能会引起内存爆炸式增长。队列的结构队列的结构以及消息的状态 Rabbitmq中队列的是由两部分组成:rabbit_amqpqueue_process和backing_queue组成: rabbit_amqpqueue_process: 负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的confirm和消费端的ack)等。 backing_queue: 是消息存储的具体形式和引擎,并向rabbit_amqpqueue_process提供相关的接口以供调用。 如果消息发送的队列是空的且队列有消费者,该消息不会经过该队列而是直接发往消费者,如果无法直接被消费,则需要将消息暂存入队列,以便重新投递。消息在存入队列后,主要有以下几种状态:alpha:消息内容(包括消息体、属性和headers)和消息索引都存在内存中(消耗内存最多,CPU消耗最少)beta:消息内容保存在磁盘中,消息索引都存在内存中(只需要一次IO操作就可以读取到消息)gamma:消息内容保存在磁盘中,消息索引在磁盘和内存中都存在(只需要一次IO操作就可以读取到消息)delta:消息内容和消息索引都在磁盘中(消耗内存最小,但是会消耗更多的CPU和磁盘的IO操作)持久化的消息,消息内容和消息索引必须先保存在磁盘中,才会处于上面状态中的一种,gamma状态只有持久化的消息才有这种状态。Rabbitmq在运行时会根据统计的消息传送速度。 定期计算一个当前内存中能够保存的最大消息数量(target_ram_count), 如果alpha状态的消息数量大于此值时,就会引起消息的状态转换,多余的消息可能会转换到beta状态、gamma状态或者delta状态。 区分这4种状态的主要作用是满足不同的内存和CPU 的需求。 对于普通队列而言,backing_queue内部的实现是通过5个子队列来体现消息的状态的:Q1:只包含alpha状态的消息Q2:包含beta和gamma的消息Delta:包含delta的消息Q3:包含beta和gamma的消息Q4:只包含alpha状态的消息 一般情况下,消息按照Q1->Q2->Delta->Q3->Q4这样的顺序进行流动,但并不是每一条消息都会经历所有状态,这取决于当前系统的负载情况(比如非持久化的消息在内存负载不高时, 就不会经历delta)。如此设计的好处:可以在队列负载很高的情况下,能够通过将一部分消息由磁盘保存来 节省内存空间,而在负载降低的时候,这部分消息又渐渐回到内存被消费者获取, 使得整个队列具有良好的弹性。消费消息时的状态转换消费者消费消息也会引起消息状态的转换,状态转换的过程如下所示:消费者消费时先从Q4获取消息,如果获取成功则返回。如果Q4为空,则从Q3中获取消息,首先判断Q3是否为空,如果为空返回队列为空,即此时队列中无消息如果Q3不为空,取出Q3的消息,然后判断Q3和Delta中的长度,如果都为空,那么Q2、Delta、Q3、Q4都为空,直接将Q1中的消息转移至Q4,下次直接从Q4中读取消息如果Q3为空,Delta不为空,则将Delta中的消息转移至Q3中,下次直接从Q3中读取。在将消息从Delta转移至Q3的过程中,是按照索引分段读取,首先读取某一段,然后判断读取的消息个数和Delta消息的个数,如果相等,判定Delta已无消息,直接将读取 Q2和读取到消息一并放入Q3,如果不相等,仅将此次读取的消息转移到Q3。 通常在负载正常时,如果消息被消费的速度不小于接收新消息的速度,对于不需要保证可靠性的消息来说,极有可能只会处于alpha状态。对于durable属性设置为true的消息,它一定会进入gamma状态,并且在开启publisher confirm机制时,只有到了gamma状态时才会确认该消息己被接收,若消息消费速度足够快、内存也充足,这些消息也不会继续走到下一个状态。消息堆积处理在系统负载较高中,已经收到的消息若不能很快被消费掉,就是这些消息就是在队列中"堆积", 那么此时 Rabbitmq就需要花更多的时间和资源处理"堆积"的消息,如此用来处理新流入的消息的能力就会降低,使得流入的消息又被"堆积"继续增大处理每个消息的平均开销,继而情况变得越来越恶化,使得系统的处理能力大大降低。 减少消息堆积的常见解决方案: 1、增加prefetch_count的值,设置消费者存储未确认的消息的最大值,消息达到prefetch_count最大值,直到确认了,生产者才会可能推送新的消息 2、消费者进行multiple ack,降低ack带来的开销惰性队列默认情况下,当生产者将消息发送到Rabbitmq的时候,队列中的消息会尽可能地存储在内存中,这样可以更快地将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。这样的机制无形会占用更多系统资源,毕竟内存应该留给更多有需要的地方。如果发送端过快或消费端宕机,导致消息大量积压,此时消息还是在内存和磁盘各存储一份,在消息大爆发的时候,MQ服务器会撑不住,影响其他队列的消息收发,能不能有效的处理这种情况呢。答案 惰性队列。 RabbitMQ从3.6.0版本开始引入了惰性队列(Lazy Queue)的概念。惰性队列会将接收到的消息直接存入文件系统中,而不管是持久化的或者是非持久化的,这样可以减少了内存的消耗,但是会增加I/0的使用,如果消息是持久化的,那么这样的I/0操作不可避免,惰性队列和持久化的消息可谓是"最佳拍档"。注意如果惰性队列中存储的是非持久化的消息,内存的使用率会一直很稳定,但是重启之后消息一样会丢失。 把一个队列设置成惰性队列的方式:// 声明队列
@Bean(name = "direct.queue_03")
public Queue commonQueue03() {
QueueBuilder queueBuilder = QueueBuilder.durable("direct.queue_03");
queueBuilder.lazy(); // 把队列设置成惰性队列
return queueBuilder.build();
}
第二种情况场景描述 消费者消费到这个消息但是还没有及时处理,消费者宕机了。解决方案 默认情况下,消费者消费到这个消息以后会自动给服务端发送一个Basic.Ack指令,告知这个消息已经被消费了,此时服务端会将这个消息从内存(磁盘)删除掉。 针对上述消息丢失的场景:我们只需要将自动应答更改为手动应答即可。 具体的实现如下所示: 1、更改消费端应答模式为:手动应答listener:
simple:
acknowledge-mode: manual # 更改消息的应答模式为手动应答
2、当消费者消费完消息以后,进行消息应答@RabbitListener(queues = "direct.queue_01")
public void directExchangeQueue01(String messageBody , Channel channel
, Message message) {
try {
// 对消息进行消费
LOGGER.info("direct exchange queue 01 message is : {}" ,
messageBody);
// 进行手动应答,第二个参数表示是否需要将该消息之前的所有的消息都进行应答
channel.basicAck(message.getMessageProperties().getDeliveryTag() ,
false);
} catch (IOException e) {
e.printStackTrace();
LOGGER.error("direct exchange queue 01 处理失败, message is :
{}" , messageBody);
}
}
消息分发机制默认的分发机制:当Rabbitmq队列拥有多个消费者时,队列收到的消息将以轮询的方式发给消费者。每条消息只会发给订阅列表里的一个消费者。这种方式非常适合扩展,而且它是专门为并发程序设计的。如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。 很多时候轮询的分发机制也不是那么的优雅。默认情况下,如果有n个消费者,那么Rabbitmq会将第m条消息分发给第m%n(取余的方式)个消费者,Rabbitmq不管消费者是否消费并已经确认了消息。试想一下,如果某些消费者任务繁重,来不及消费那么多的消息,而某些其他的消费者由于某种原因(比如业务逻辑简单、机器性能卓越等)很快地处理完了分配到的消息,进而进程空闲,这样就会造成整体应用吞吐量的下降。 那么该如何处理呢?这里就要用到channel.basicQos(int prefetchCount)这个方法。channel.basicQos方法允许限制通道上的消费者所能够保持的最大未确认消息的数量。spring.rabbitmq.listener.simple.prefetch: 1 # 设置队列中最大的未确认的消息数量
第三种情况场景描述 生成者将消息发送给交换机以后,正当交换机将这个消息发送给指定队列的时候,该队列所在的主机宕机了,那么这一则消息就会丢失。解决方案 要想解决这种情况下消息的丢失,我们就需要知道生产者针对该消息的投递结果。默认情况下发送消息的操作,服务端是不会返回任何信息给生产者的,也就是说默认情况下生产者是不知道消息有没有正确地到达服务器端。那么要想知道生产者针对该消息的投递结果,我们有两种解决方案: 1、通过事务机制实现 2、通过发送方确认(publisher confirm)机制实现事务机制的实现方案在配置类中配置Rabbitmq的事务管理器:// 配置事务管理器
@Bean(name = "rabbitTransactionManager")
public RabbitTransactionManager
rabbitTransactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory) ;
}
定义发送消息的类:@Component
public class RabbitmqProducer {
// 定义日志记录器
private static final Logger LOGGER =
LoggerFactory.getLogger(RabbitmqProducer.class) ;
@Autowired
private RabbitTemplate rabbitTemplate ;
@Transactional(rollbackFor = Exception.class , transactionManager =
"rabbitTransactionManager")
public void sendMessage() {
rabbitTemplate.setChannelTransacted(true); // 将消息通道设置为事务机制
String msg = "测试生产者事务消息" ;
rabbitTemplate.convertAndSend("direct.exchange" , "create" ,
msg.getBytes());
int a = 1 / 0 ;
LOGGER.info("transactionManager message send success ----> " +
msg);
}
}
在调用方进行try…catch处理(可以重新尝试发送消息)// 测试事务消息
private static void
sendTransactionManagerMsg(ConfigurableApplicationContext
applicationContext) {
RabbitmqProducer rabbitmqProducer =
applicationContext.getBean(RabbitmqProducer.class);
try {
rabbitmqProducer.sendMessage();
}catch (Exception e) {
e.printStackTrace();
System.out.println("事务消息回滚了");
}
}
我们也可以通过Wireshark捕获到命令的传递过程,如下所示: 事务机制会影响Rabbitmq的性能,事务机制在一条消息发送之后会使发送端阻塞,以等待RabbitmqMQ的 回应,之后才能继续发送下一条消息。因此在真实开发过程中很少的使用。 Rabbitmq提供了一个改进方案,即发送方确认机制(publisher confirm)。发送方确认(publisher confirm)机制原理生产者将信道设置成confirm(确认)模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,Rabbitmq就会发送一个确认(Basic.ACK)给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达目的地了。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。 与事务机制相比,发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该确认消息。如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条Basic.Nack命令,生产者应用程序同样可以在回调方法中处理该nack命令。发送方确认(publisher confirm)机制实现1、在配置文件中配置开启生产者确认机制spring.rabbitmq.publisher-confirm-type: correlated # 开启生产者确认机制
2、定义发送消息的类:@Component
public class RabbitmqSendMsgConfirm implements
RabbitTemplate.ConfirmCallback {
// 定义日志记录对象
private static final Logger LOGGER =
LoggerFactory.getLogger(RabbitmqSendMsgConfirm.class) ;
@Autowired
private RabbitTemplate rabbitTemplate ;
// 发送消息
public void sendMsg(String msg , String exchangeName , String
routingKey) {
// 设置消息发送是否成功的回调
rabbitTemplate.setConfirmCallback(this);
// void convertAndSend(String exchange, String routingKey,
Object message, MessagePostProcessor messagePostProcessor,
CorrelationData correlationData)
CorrelationData correlationData = new CorrelationData(msg) ;
rabbitTemplate.convertAndSend(exchangeName , routingKey , msg ,
message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PER
SISTENT); // 进行消息持久化操作
return message ;
} , correlationData);
}
/**
* CorrelationData: 每个发送的消息都需要配备一个 CorrelationData 相关数据
对象,CorrelationData 对象内部只有一个 id 属性,用来表示当前消息唯一性。
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack,
String cause) {
if(ack) {
LOGGER.info("消息发送成功");
}else {
LOGGER.info("消息发送失败");
}
}
}
上述这种确认机制,是确保消息是否已经发送到正确的交换机上。 消息一旦被发送到正确到交换机上以后,ack的值就是true。那么这个消息是否被投递到了指定的队列,如果 消息没有被投递到指定的队列。那么作为生产者如何获知到呢? 我们需要在次进行确认,具体的实现如下所示: 1、在配置文件中进行如下配置spring.rabbitmq.template.mandatory: true (表示的意思是交换机无法根据自身的类型和路由键找到一个符号条件的队列,那么RabbitmqMQ会调用Basic.Return返回消息给生产者)
spring.rabbitmq.publisher-returns: true 开启return确认机制
2、添加return确认机制的回调函数// 发送消息
public void sendMsg(String msg , String exchangeName , String
routingKey) {
...
// 添加return确认机制的回调函数
rabbitTemplate.setReturnCallback(this);
...
}
// 当消息没有被交换机投递到指定的队列的回调函数
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
LOGGER.info("消息没有被投递到指定的队列 ---> " + new String(message.getBody()));
}
提笔写架构
消息通知系统详解2---后端设计
整体设计用户获取新的消息通知有两种模式上线登录后向系统主动索取在线时系统向接收者主动推送新消息设想下,用户的通知消息和新通知提醒数据都放在数据库中,数据库的读写操作频繁。如果消息量大,DB压力较大,可能出现数据瓶颈。这边按照上述两种模式,拆分下设计:上线登录后向系统索取此模式是接受者请求系统,系统将新的消息通知返回给接收者的模式,流程如下:接收者向服务端netty请求WebSocket连接Netty服务把连接放到自己的连接池中Netty根据接受者信息向RabbitMQ查询消息如果有新消息,返回新消息通知使用WebSocket连接向,接收者返回新消息的数量在线时系统向接收者主动推送此模式是系统将新的消息通知返回给接收者的模式,流程如下:RabbitMQ将新消息数据推送给NettyNetty从连接池中取出接收者的WebSocket连接Netty通过接收者的WebSocket连接返回新消息的数量Rabbitmq搭建在虚拟机中启动RabbitMQdocker run -id --name=tensquare_rabbit -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 15672:15672 -p 25672:25672 rabbitmq:management访问地址:http://192.168.200.128:15672登录账号: guest登录密码: guestIO编程上面提到了Netty,在开始了解Netty之前,先来实现一个客户端与服务端通信的程序,使用传统的IO编程和使用NIO编程有什么不一样。传统IO编程每个客户端连接过来后,服务端都会启动一个线程去处理该客户端的请求。阻塞I/O的通信模型示意图如下:业务场景:客户端每隔两秒发送字符串给服务端,服务端收到之后打印到控制台。public class IOServer {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(8000);
while (true) {
// (1) 阻塞方法获取新的连接
Socket socket = serverSocket.accept();
new Thread() {
@Override
public void run() {
String name = Thread.currentThread().getName();
try {
// (2) 每一个新的连接都创建一个线程,负责读取数据
byte[] data = new byte[1024];
InputStream inputStream = socket.getInputStream();
while (true) {
int len;
// (3) 按字节流方式读取数据
while ((len = inputStream.read(data)) != -1) {
System.out.println("线程" + name + ":" + new String(data, 0, len));
}
}
} catch (Exception e) {
}
}
}.start();
}
}
}客户端实现:public class MyClient {
public static void main(String[] args) {
//测试使用不同的线程数进行访问
for (int i = 0; i < 5; i++) {
new ClientDemo().start();
}
}
static class ClientDemo extends Thread {
@Override
public void run() {
try {
Socket socket = new Socket("127.0.0.1", 8000);
while (true) {
socket.getOutputStream().write(("测试数据").getBytes());
socket.getOutputStream().flush();
Thread.sleep(2000);
}
} catch (Exception e) {
}
}
}
}从服务端代码中我们可以看到,在传统的IO模型中,每个连接创建成功之后都需要一个线程来维护,每个线程包含一个while死循环。如果在用户数量较少的情况下运行是没有问题的,但是对于用户数量比较多的业务来说,服务端可能需要支撑成千上万的连接,IO模型可能就不太合适了。如果有1万个连接就对应1万个线程,继而1万个while死循环,这种模型存在以下问题:当客户端越多,就会创建越多的处理线程。线程是操作系统中非常宝贵的资源,同一时刻有大量的线程处于阻塞状态是非常严重的资源浪费。并且如果务器遭遇洪峰流量冲击,例如双十一活动,线程池会瞬间被耗尽,导致服务器瘫痪。因为是阻塞式通信,线程爆炸之后操作系统频繁进行线程切换,应用性能急剧下降。IO编程中数据读写是以字节流为单位,效率不高。NIO编程NIO,也叫做new-IO或者non-blocking-IO,可理解为非阻塞IO。NIO编程模型中,新来一个连接不再创建一个新的线程,而是可以把这条连接直接绑定到某个固定的线程,然后这条连接所有的读写都由这个线程来负责,我们用一幅图来对比一下IO与NIO:如上图所示,IO模型中,一个连接都会创建一个线程,对应一个while死循环,死循环的目的就是不断监测这条连接上是否有数据可以读。但是在大多数情况下,1万个连接里面同一时刻只有少量的连接有数据可读,因此,很多个while死循环都白白浪费掉了,因为没有数据。而在NIO模型中,可以把这么多的while死循环变成一个死循环,这个死循环由一个线程控制。这就是NIO模型中选择器(Selector)的作用,一条连接来了之后,现在不创建一个while死循环去监听是否有数据可读了,而是直接把这条连接注册到选择器上,通过检查这个选择器,就可以批量监测出有数据可读的连接,进而读取数据。NIO的三大核心组件:通道(Channel)、缓冲(Buffer)、选择器(Selector)通道(Channel)是传统IO中的Stream(流)的升级版。Stream是单向的、读写分离(inputstream和outputstream),Channel是双向的,既可以进行读操作,又可以进行写操作。缓冲(Buffer)Buffer可以理解为一块内存区域,可以写入数据,并且在之后读取它。选择器(Selector)选择器(Selector)可以实现一个单独的线程来监控多个注册在她上面的信道(Channel),通过一定的选择机制,实现多路复用的效果。NIO相对于IO的优势:IO是面向流的,每次都是从操作系统底层一个字节一个字节地读取数据,并且数据只能从一端读取到另一端,不能前后移动流中的数据。NIO则是面向缓冲区的,每次可以从这个缓冲区里面读取一块的数据,并且可以在需要时在缓冲区中前后移动。IO是阻塞的,这意味着,当一个线程读取数据或写数据时,该线程被阻塞,直到有一些数据被读取,或数据完全写入,在此期间该线程不能干其他任何事情。而NIO是非阻塞的,不需要一直等待操作完成才能干其他事情,而是在等待的过程中可以同时去做别的事情,所以能最大限度地使用服务器的资源。NIO引入了IO多路复用器selector。selector是一个提供channel注册服务的线程,可以同时对接多个Channel,并在线程池中为channel适配、选择合适的线程来处理channel。由于NIO模型中线程数量大大降低,线程切换效率因此也大幅度提高。和前面一样的场景,使用NIO实现(复制代码演示效果即可):public class NIOServer {
public static void main(String[] args) throws IOException {
// 负责轮询是否有新的连接
Selector serverSelector = Selector.open();
// 负责轮询处理连接中的数据
Selector clientSelector = Selector.open();
new Thread() {
@Override
public void run() {
try {
// 对应IO编程中服务端启动
ServerSocketChannel listenerChannel = ServerSocketChannel.open();
listenerChannel.socket().bind(new InetSocketAddress(8000));
listenerChannel.configureBlocking(false);
// OP_ACCEPT表示服务器监听到了客户连接,服务器可以接收这个连接了
listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
while (true) {
// 监测是否有新的连接,这里的1指的是阻塞的时间为1ms
if (serverSelector.select(1) > 0) {
Set<SelectionKey> set = serverSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = set.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
try {
// (1) 每来一个新连接,不需要创建一个线程,而是直接注册到clientSelector
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
clientChannel.configureBlocking(false);
// OP_READ表示通道中已经有了可读的数据,可以执行读操作了(通道目前有数据,可以进行读操作了)
clientChannel.register(clientSelector, SelectionKey.OP_READ);
} finally {
keyIterator.remove();
}
}
}
}
}
} catch (IOException ignored) {
}
}
}.start();
new Thread() {
@Override
public void run() {
String name = Thread.currentThread().getName();
try {
while (true) {
// (2) 批量轮询是否有哪些连接有数据可读,这里的1指的是阻塞的时间为1ms
if (clientSelector.select(1) > 0) {
Set<SelectionKey> set = clientSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = set.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isReadable()) {
try {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// (3) 读取数据以块为单位批量读取
clientChannel.read(byteBuffer);
byteBuffer.flip();
System.out.println("线程" + name + ":" + Charset.defaultCharset().newDecoder().decode(byteBuffer)
.toString());
} finally {
keyIterator.remove();
key.interestOps(SelectionKey.OP_READ);
}
}
}
}
}
} catch (IOException ignored) {
}
}
}.start();
}
}
提笔写架构
深入浅出Zookeeper---3、Leader选举
PAXOS选举算法Paxos算法概述背景主流分布式一致性算法包括Paxos,Raft和ZAB,它们之间有怎样的区别与关系? Google Chubby的作者Mike Burrows说过,世上只有一种一致性算法,那就是Paxos,所有其他一致性算法 都是Paxos算法的不完整或衍生版。什么是PaxosPaxos算法是基于消息传递且具有高度容错特性的一致性算法,是目前公认的解决分布式一致性问题最有效的算法之一,其解决的问题就是在分布式系统中如何就某个值(决议)达成一致。Paxos的作用常见的分布式系统中,总会发生机器宕机或网络异常(包括消息的延迟、丢失、重复、乱序)等情况。 Paxos 算法是分布式一致性算法用来解决一个分布式系统如何就某个值(决议)达成一致性的问题。拜占庭问题什么是拜占庭问题拜占庭将军问题是一个共识问题:一群将军想要实现某一个目标,必须是全体一致的决定,一致进攻或者一 致撤退;但由于叛徒的存在,将军们不知道应该如何达成一致。 举例来说: 假设有三个拜占庭将军,分别为A、B、C,他们要讨论的只有一件事情:明天是进攻还是撤退。为此,将军 们需要依据“少数服从多数”原则投票表决,只要有两个人意见达成一致就可以。 如果A和B投进攻,C投撤退,那么传递结果:那么A的信使传递给B和C的消息都是进攻;B的信使传递给A和C的消息都是进攻;而C的信使传给A和B的消息都是撤退。 通过以上决策,三个将军就都知道进攻和撤退占比是 2 : 1 。显而易见,进攻方胜出,第二天大家都要进攻, 三者行动最终达成一致。 但是,如果稍微做一个改动:三个将军中出了一个叛徒呢?叛徒的目的是破坏忠诚将军间一致性的达成,让拜占庭的军队遭受损失。 假设A和B是忠诚将军,A投进攻,B投撤退,如果你是C这个叛徒将军,那么你该做些什么,才能在第二天让两个忠诚的将军做出相反的决定呢? 进攻方和撤退方现在是 1 : 1 ,无论C投哪一方,都会变成 2 : 1 ,一致性还是会达成。但是,作为叛徒的C, 你必然不会按照常规出牌,于是你让一个信使告诉A的内容是你“要进攻”,让另一个信使告诉B的则是你“要 撤退”。 至此,A将军看到的投票结果是:进攻方 :撤退方 = 2 : 1 ,而B将军看到的是 1 : 2 。第二天,忠诚的A冲上 了战场,却发现只有自己一支军队发起了进攻,而同样忠诚的B,却早已撤退。最终,A的军队败给了敌人, 拜占庭的军队遭受损失。 拜占庭问题情形在计算机世界中也会出现,如果三个节点中有一个异常节点,那么最坏情况下两个正常节点之间是无法保证一致性的。那么你之前听说过的 etcd 这样的系统可以保证三个节点有一个宕机的情况下依然可以对外提供一致性服务是为什么呢?因为这类系统并没有考虑拜占庭故障,在他们的假设里故障节点可能会停止服务,可能会超时,但是不会发送异常消息。尽管拜占庭的“幽灵”很难处理,但在实际工作应用中, 却并不需要过分去考虑它,因为对于大多数系统来说,内部环境里,硬件故障导致消息错误的概率本身就很 低,还要按照拜占庭叛徒的策略来处理故障就更为困难了。Paxos角色Paxos将系统中的角色分为三种:Proposer:提议者,提出提案Proposal(未经批准的决议)信息,它包括提案编号 (Proposal ID) 和提议的值 (Value)。Acceptor:决策者,参与决策,回应Proposers的提案。收到Proposal后可以接受提案,若Proposal获得多数Acceptors的接受,则标识该Proposal为批准。Learner:最终决策学习者,不参与决策,从Proposers/Acceptors学习最新达成一致的决议 (Value)。在具体的实现中,一个进程可能同时充当多种角色。比如一个进程可能既是Proposer又是Acceptor或 Learner。Proposer负责提出提案,Acceptor负责对提案作出裁决(accept与否),Learner负责学习提案结 果,Acceptor告诉Learner哪个value被选定,Learner则无条件认为相应的value被选定。 为了避免单点故障,会有一个Acceptor集合群,Proposer向Acceptor集合群发送提案,Acceptor集合群中, 只有一半以上的成员同意了这个提案(Proposal),就认为该提案被接受选定了。Paxos算法详解Paxos包含三种算法: Basic Paxos,Multi Paxos和Fast Paxos。1. Basic Paxos Basic Paxos 执行过程分为两个阶段: 阶段一(Prepare阶段):
1. Proposer选择一个提案Proposal编号为N,然后向半数以上的Acceptor发送编号为N的 Prepare请求。
2. 如果某个Acceptor收到一个编号为N的Prepare请求,如果小于它已经响应过的请求,则拒绝。
3. 如果N大于该Acceptor已经响应过的所有请求的编号,那么它就会将它已经接受过(已经经 过第二阶段accept的提案)的编号最大的提案作为响应反馈给Proposer,如果还没有的 accept提案的话返回{pok,null,null}空信息,同时该Acceptor承诺不再接受任何编号小于 N的提案。
阶段二(accept阶段):
1. 如果一个Proposer收到半数以上Acceptor对其发出的提案响应,那么它就会发送一个针对 [N,V]提案的Accept请求给半数以上的Acceptor。注意:N是提案的编号,V就是该提案的决议,该决议是响应编号中最大提案的value。如果响应中不包含任何提案,那么V值就由 Proposer自己决定。
2. 如果Acceptor收到一个针对编号为N的提案的Accept请求,只要该Acceptor没有对编号大于 N的Prepare请求做出过响应,那么它就接受该提案。如果N小于Acceptor以及响应的 prepare请求,则拒绝,不回应或回复error(当proposer没有收到过半的回应,那么它会重 新进入第一阶段,递增提案号,重新提出prepare请求)。
整体流程: 上述介绍了Basic Paxos算法, 但在算法运行过程中,可能还会存在一种极端情况,当有两个proposer依次提出一系列编号递增的议案,那么会陷入死循环,无法完成第二阶段,也就是无法选定一个提案。由此产生了活锁问题,如何解决?再看下面的Multi Paxos算法。2. Multi Paxos 原始的基础Paxos算法只能对一个值进行决议,而且每次决议至少需要两次网络来回,在实际应用中可能会产生各种各样的问题,所以不适用在实际工程中。因此,Multi Paxos基于Basix Paxos做了改进,可以连续确定多个值并提高效率:1. 针对每一个要提议的值,运行一次Paxos算法实例(Instance),形成决议。每一个Paxos实例使用唯一的Instance ID标识。
2. 在所有Proposers中选主,选举一个Leader,由Leader唯一提交Proposal给Acceptors进行表决。 这样没有Proposer竞争,解决了活锁问题。在系统中仅有一个Leader进行Value提交的情况下, Prepare阶段就可以跳过,从而将两阶段变为一阶段,提高效率。
示例: 在Basic Paxos协议中,每一次执行过程都需要经历Prepare->Promise->Accept->Accepted 这四个步骤,这样就会导致消息太多,影响性能。 在Multi Paxos的实际过程中:如果Leader足够稳定的话,Phase 1 里面的Prepare->Promise 完全可以省略掉,从而使用同一个Leader去发送Accept消息。 实质上,Multi Paxos模式下首先对所有Proposers进行Leader选举,再利用Basic Paxos来实现。 选出Leader后,只由Leader来提交Proposal,如果Leader出现宕机,则重新选举Leader,在系统中仅有一 个Leader可以提交Proposal。 Multi Paxos通过改变Prepare阶段的作用范围,从而使得Leader的连续提交只需要执行一次Prepare阶段,后续只需要执行Accept阶段,将两阶段变为一阶段,提高了效率。3. Fast Paxos 上述Paxos协议中,消息最后到达Learner一般都要经历 Client–>Proposer–>Acceptor–>Learner 多个步骤,实质上由Learner是真正执行任务,为了更快的让消息到达Learner,如果Proposer本身没有数据需要被确认的话,那么可以跳过Proposer这一步,直接将请求发送给Accepter,由leader先进行检查, 这样的操作叫做Fast Paxos。选主过程剖析第一步(初始化投票) 初始化投票,所有服务器的logicClock都为1,zxid都为0, 每个节点都会投票给自己,投票信息(1,1,0) 主要分为三部分:第一位数代表服务器的logicalClock(自增的整数),即选举轮次,它表示这是该服务器发起的第多少轮投票;
第二位数代表被推荐的服务器的myid,它是ZooKeeper集群唯一的ID;
第三位数代表被推荐服务器的最大zxid,类似于RDBMS中的事务ID,实质上用于标识一次更新操作的事务Proposal(提议)ID。
ZXID组成: zxid(Zookeeper Transaction Id)是 ZAB 协议的事务编号,其是一个 64 位的整数:低 32 位是一个单调递增的计数器,每当 Leader 服务器产生一个新的事务 Proposal 时,递增 1。
高 32 位代表 Leader 的 epoch 编号,有点类似于 Raft 的任期(改朝换代),每当选举一个新 Leader 时,就会从新 Leader 取出本地日志中的最大事务 Proposal 的 zxid,解析出 epoch 然后加 1,并将低 32 位置 0 来开始新的 zxid。
第二步更新投票信息服务器1收到服务器2的选票(1, 2, 0)和服务器3的选票(1, 3, 0)后,由于所有的logicClock都相等, 所有的zxid都相等,因此根据myid判断应该将自己的选票按照服务器3的选票更新为(1, 3, 0)。投票规则:优先检查ZXID。ZXID比较大的服务器优先作为Leader。如果ZXID相同,那么就比较myid。myid较大的服务器作为Leader服务器。服务器1将自己的票箱全部清空,再将服务器3的选票与自己的选票存入自己的票箱,接着将自己更新后的选 票信息广播出去。此时服务器1票箱内的选票为1号选举3号(1, 3),3号选举3号(3, 3)。服务器2收到服务器3的选票后,与服务器1的处理逻辑一样, 也将自己的选票更新为(1, 3)并存入票箱然后 广播。此时服务器2票箱内的选票为(2, 3),(3, 3)。服务器3根据上述规则,无须更新选票,自身的票箱内选票仍为(3, 3)。服务器1与服务器2更新后的选票广播出去后,由于三个服务器最新选票都相同,最后三者的票箱内都包含三张投给服务器3的选票。第三步确定集群角色 根据上述选票结果,三个服务器一致认为此时服务器3应该是Leader。因此服务器1和2都进入FOLLOWING状 态,而服务器3进入LEADING状态。之后由Leader发起并维护与Follower间的心跳。FOLLOW重启选举剖析如果FOLLOW跟随者节点出现问题重启之后, 是如何实现重新选举? 第一步: 第二步:服务器3收到服务器1的投票后,将自己的状态LEADING以及选票返回给服务器1。服务器2收到服务器1的投票后,将自己的状态FOLLOWING及3号主节点的选票信息返回给服务器1。服务器1知道服务器3是Leader,并且通过服务器2与服务器3的选票可以证实服务器3确实得到了超过半数的选票。因此服务器1进入FOLLOWING状态。LEADER重启选举剖析Leader如果重启之后该如何选举? 第一步: Leader主节点(服务器3)宕机后,Follower(服务器1和2)发现Leader不工作了,因此进入LOOKING状态并发起新的一轮投票,并且都将投票选举为自己。 第二步: 服务器1和2根据外部投票确定是否要更新自身的选票。这里就会出现两种情况: 1. 服务器1和2的zxid相同。例如在服务器3宕机前服务器1与2完全同步,zxid一致。此时选票的更新主要取决于myid的大小,根据选举规则,myid越大的优先级越高。
2. 服务器1和2的zxid不同。在旧Leader宕机之前,其所主导的写操作,只需过半服务器确认即可,而不需所有服务器确认。这个时候,服务器1和2可能其中有一个与旧Leader同步(即zxid与之相同),而另 一个则没有同步(即zxid比之小)。这时选票的更新主要取决于谁的zxid较大,优先选取为主节点。
3.
在第一步的图中所示,服务器1的zxid为11,而服务器2的zxid为10,因此服务器2将自身选票更新为(3, 1, 11)。第三步: 经过第二步的选票更新后,服务器1与服务器2均将选票投给服务器1,因此服务器2成为Follower,而服务器1成为 新的Leader并维护与服务器2的心跳。 第四步: 旧的Leader节点3恢复后,进入LOOKING状态并发起新一轮领导选举,并将选票投给自己。此时服务器1会将自己的LEADING状态及选票(3, 1, 11)返回给服务器3,而服务器2将自己的FOLLOWING状态及选票(3, 1, 11)返回给服务器3, 旧的Leader节点3已经被新的Leader节点1取代, 这就类似于上面所讲的FOLLOW重启选举的过程。
提笔写架构
深入浅出消息队列---1、消息队列概述
消息队列介绍消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。消息队列应用场景:异步处理: 异步处理是将很多串行进行的步骤转成异步处理,还是已订单系统为例,下单订单需要创建订单和锁定库存,确定本次请求后马上给用户返回响应,然后把后续请求的数据的都在消息队列,由消息队列异步处理。应用解耦: 举例一下电商系统的中的订单系统。当创建一个订单时:发起支付、扣减库存、发消息告知用户、更新统计数据,这些订单下游的系统都需要实时获得订单数据,随着业务量的增大和业务的变更,有一段时间不需要发消息给客户,或者需要添加功能,每次都需要不断的调式订单系统和下游系统。 引入消息队列后,订单服务在创建订单时发送一条信息到消息队列主题 Order 中,所有的下游都订阅主题Order,这样无论增加、减少下游系统还是下游系统的功能如何变化,订单服务都不需要做更改了,实现了订单服务和下游服务的解耦。流量控制 在购物网站的做一个秒杀活动,平时网站能支撑每秒1000次并发请求,但是电商秒杀一下请求猛增到每秒3000次请求,多出来的请求,可能直接让系统宕机。所以我们就需要使用消息队列来控制流量,当系统短时间接收到大量请求时,会先将请求堆积到消息队列上,后端服务从消息队列上消费数据,消息队列相对于给后端服务做了一次缓冲。常见消息队列MQ相关的产品有:ActiveMQ,RabbitMQ,RocketMQ,Kafka,Zeromq。目前市面上常用的消息队列产品:RabbitMQ, Kafka,RocketMQ。因此我们首先来比对一下这几个MQ之前的差别: 总结: 1、一般的业务系统要引入MQ,最早大家都用ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃。 2、后来大家开始用RabbitMQ,但是erlang语言阻止了大量的java工程师去深入研究和掌控他,对公司而言,几乎处于不可控的状态。但由于是开源的,并且也比较稳定,活跃度也高,所以使用很广泛。 3、现在很多的公司也开始使用RocketMQ,由于社区活跃度不高,资料比较少。所以学习成本比较大,因此要使用RocketMQ要考虑社区万一突然黄掉的风险。所以中小型公司,技术实力较为一般,技术挑战不是特别高,用RabbitMQ是不错的选择;大型公司,基础架构研发实力较强,用RocketMQ是很好的选择。 4、如果是大数据领域的实时计算、日志采集等场景,用Kafka是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。 下面章节,我们会主要来说下RabbitMQ和Kafka。
提笔写架构
深入浅出业务幂等性---2、服务幂等和乐观锁
防重表对于防止数据重复提交,还有一种解决方案就是通过防重表实现。防重表的实现思路也非常简单。首先创建一张表作为防重表,同时在该表中建立一个或多个字段的唯一索引作为防重字段,用于保证并发情况下,数据只有一条。在向业务表中插入数据之前先向防重表插入,如果插入失败则表示是重复数据。对于防重表的解决方案,可能有人会说为什么不使用悲观锁。悲观锁在使用的过程中也是会发生死锁的。悲观锁是通过锁表的方式实现的。 假设现在一个用户A访问表A(锁住了表A),然后试图访问表B; 另一个用户B访问表 B(锁住了表B),然后试图访问表A。 这时对于用户A来说,由于表B已经被用户B锁住了,所以用户A必须等到用户B释放表B才能访问。 同时对于用户B来说,由于表A已经被用户A锁住了,所以用户B必须等到用户A释放表A才能访问。此时死锁就已经产生了。select+insert防重提交对于一些后台系统,并发量并不高的情况下,对于幂等的实现非常简单,通过select+insert思想即可完成幂等控制。在业务执行前,先判断是否已经操作过,如果没有则执行,否则判断为重复操作。在并发下,并不能完成幂等性控制。通过jemeter测试,可以发现,插入了重复数据。产生了脏数据。要解决这个问题,非常简单,在数据库层面添加唯一索引即可,将id设置为唯一索引,也是最容易想到的方式,一旦id出现重复,就会出现异常,避免了脏数据的发生也可以解决永久性幂等。但该方案无法用于分库分表情况,其只适用于单表情况。Mysql悲观锁悲观锁,正如其名,它指的是对数据被外界(包括本系统当前的其他事务,以及来自外部系统的事务处理)修改持保守态度,因此,在整个数据处理过程中,将数据处于锁定状态。悲观锁的实现,往往依靠数据库提供的锁机制(也只有数据库层提供的锁机制才能真正保证数据访问的排他性,否则,即使在本系统中实现了加锁机制,也无法保证外部系统不会修改数据)。悲观锁一般使用select…for update的方式;例如:select * from table where id=1 for update;此时在table表中,id为1的 那条数据就被我们锁定了,其它的事务必须等本次事务提交之后才能执行。这样我们可以保证当前的数据不会被其它事务修改。优点是,悲观并发控制采取的是保守策略:“先取锁,成功了才访问数据”,这保证了数据获取和修改都是有序进行的,因此适合在写多读少的环境中使用。当然使用悲观锁无法维持非常高的性能,但是在乐观锁也无法提供更好的性能前提下,悲观锁却可以做到保证数据的安全性。缺点是,由于需要加锁,而且可能面临锁冲突甚至死锁的问题,悲观并发控制增加了系统的额外开销,降低了系统的效率,同时也会降低了系统的并行性。MySQL乐观锁假设现在订单已经生成成功,那么就会涉及到扣减库存的操作。当高并发下同时扣减库存时,非常容易出现数据错误问题。扣减库存数据错误编写一个扣除库存服务,通过jemeter进行测试,可以发现。当模拟一万并发时,最终的库存数量是错 误的。这主要是因为当多线程访问时,一个线程读取到了另外线程未提交的数据造成。synchronized失效问题对于现在的问题,暂不考虑秒杀设计、队列请求串行化等,只考虑如何通过锁进行解决,要通过锁解决的话,那最 先想到的可能是synchronized。根据synchronized定义,当多线程并发访问时,会对当前加锁的方法产生阻塞, 从而保证线程安全,避免脏数据。但是,真的能如预期的一样吗?@Service
public class StockServiceImpl implements StockService {
@Autowired
private StockMapper stockMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public synchronized int lessInventory(String goodsId, int num) {
return stockMapper.lessInventory(goodsId, num);
}
}当前已经在在方法上添加了synchronized,对当前方法对象进行了锁定。 通过Jemeter,模拟一万并发对其进行访问。可以发现,仍然出现了脏数据。该问题的产生原因,就在于在方法上synchronized搭配使用了@Transactional。首先synchronized锁定的是当前方法对象,而@Transactional会对当前方法进行AOP增强,动态代理出一个代理对象,在方法执行前开启事务,执行后提交事务。 所以synchronized和@Transactional其实操作的是两个不同的对象,换句话说就是 @Transactional的事务操作并不在synchronized锁定范围之内。假设A线程执行完扣减库存方法,会释放锁并提交事务。但A线程释放锁但还没提交事务前,B线程执行扣减库存方法,B线程执行后,和A线程一起提交事务,就出现了线程安全问题,造成脏数据的出现。MySQL乐观锁保证幂等基于版本号实现MySQL乐观锁是基于数据库完成分布式锁的一种实现,实现的方式有两种:基于版本号、基于条件。但是实现思想都是基于MySQL的行锁思想来实现的。1)修改数据表,添加version字段,默认值为02)修改StockMapper添加基于版本修改数据方法@Update("update tb_stock set amount=amount‐#{num},version=version+1 where goods_id=#{goodsId} and version=#{version}")
int lessInventoryByVersion(@Param("goodsId") String goodsId,@Param("num") int num,@Param("version") int version);3)测试模拟一万并发进行数据修改,此时可以发现当前版本号从0变为1,且库存量正确。基于条件实现通过版本号控制是一种非常常见的方式,适合于大多数场景。但现在库存扣减的场景来说,通过版本号控制就是多 人并发访问购买时,查询时显示可以购买,但最终只有一个人能成功,这也是不可以的。其实最终只要商品库存不 发生超卖就可以。那此时就可以通过条件来进行控制。1. 修改StockMapper:@Update("update tb_stock set amount=amount‐#{num} where goods_id=#{goodsId} and amount‐# {num}>=0")
int lessInventoryByVersionOut(@Param("goodsId") String goodsId,@Param("num") int num);2. 修改StockController:@PutMapping("/lessInv
entoryByVersionOut/{goodsId}/{num}")
public String lessInventoryByVersionOut(@PathVariable("goodsId") String goodsId,@PathVariable("num") int num){
int result = stockService.lessInventoryByVersionOut(goodsId, num);
if (result == 1){
System.out.println("购买成功");
return "success";
}
System.out.println("购买失败");
return "fail";
}3. 通过jemeter进行测试,可以发现当多人并发扣减库存时,控制住了商品超卖的问题。乐观锁控制服务间幂等在系统中,不光要保证客户端访问的幂等性,同时还要保证服务间幂等。比较常见的情况,当服务间进行调用时, 因为网络抖动等原因出现超时,则很有可能出现数据错误。此时在分布式环境下,就需要通过分布式事务或分布式锁来保证数据的一致性。分布式锁的解决方案中MySQL乐观锁就是其中一种实现。feign超时重试效果演示以上图为例,当客户端要生成订单时,可以基于token机制保证生成订单的幂等性,接着订单生成成功后,还会基 于feign调用库存服务进行库存扣减,此时则很有可能出现,库存服务执行扣减库存成功,但是当结果返回时,出现网络抖动超时了,那么上游的订单服务则很有可能会发起重试,此时如果不进行扣减库存的幂等性保证的话,则出现扣减库存执行多次。那可以先来演示当下游服务出现延迟,上游服务基于feign进行重试的效果。1. 当前是order调用feign,所以在order中会存在feignConfigure配置类,用于配置超时时间与重试次数。/**
1. 自定义feign超时时间、重试次数
2. 默认超时为10秒,不会进行重试。
*/
@Configuration
public class FeignConfigure {
//超时时间,时间单位毫秒
public static int connectTimeOutMillis = 5000;
public static int readTimeOutMillis = 5000;
@Bean
public Request.Options options() {
return new Request.Options(connectTimeOutMillis, readTimeOutMillis);
}
//自定义重试次数
@Bean
public Retryer feignRetryer(){
Retryer retryer = new Retryer.Default(100, 1000, 4);
return retryer;
}
}2. stock服务的StockController中demo方法会延迟六秒。通过这种方式模拟超时效果。此时在order中调用stock 服务,可以发现,order服务会对stock服务调用四次。这里就演示了服务间调用超时的效果,当下游服务超时,上游服务会进行重试。服务调用超时库存多次扣减根据上述演示,当下游服务超时,上游服务就会进行重试。那么结合当前的业务场景,当用户下单成功去调用库存服务扣减库存时, 如果库存服务执行扣减库存成功但返回结果超时,则上游订单服务就会重试,再次进行扣减库存,此时就会出现同一订单商品库存被多次扣减。1. 在订单服务中生成订单,并调用库存服务扣减库存@Idemptent
@PostMapping("/genOrder")
public String genOrder(@RequestBody Order order){
String orderId = String.valueOf(idWorker.nextId());
order.setId(orderId);
order.setCreateTime(new Date());
order.setUpdateTime(new Date());
int result = orderService.addOrder(order);
if (result != 1){
System.out.println("fail");
return "fail";
}
//生成订单详情信息
List<String> goodsIdArray = JSON.parseArray(order.getGoodsIds(), String.class);
goodsIdArray.stream().forEach(goodsId->{
//插入订单详情
OrderDetail orderDetail = new OrderDetail();
orderDetail.setId(String.valueOf(idWorker.nextId()));
orderDetail.setGoodsId(goodsId);
orderDetail.setOrderId(orderId);
orderDetail.setGoodsName("heima");
orderDetail.setGoodsNum(1);
orderDetail.setGoodsPrice(1);
orderDetailService.addOrderDetail(orderDetail);
//扣减库存(不考虑锁)
stockFeign.reduceStockNoLock(goodsId, orderDetail.getGoodsNum());
});
return "success";
}2. 库存服务直接基于商品信息进行库存扣减@Update("update tb_stock set amount=amount‐#{num} where goods_id=#{goodsId}")
int reduceStockNoLock(@Param("goodsId") String goodsId,@Param("num") Integer num);
1
2
@PutMapping("/reduceStockNoLock/{goodsId}/{num}")
public String reduceStockNoLock(@PathVariable("goodsId") String goodsId,
@PathVariable("num") Integer num) throws InterruptedException {
System.out.println("reduce stock");
int result = stockService.reduceStockNoLock(goodsId, num);
if (result != 1){
return "reduce stock fail";
}
//延迟
TimeUnit.SECONDS.sleep(6000);
return "reduce stock success";
}3. 执行生成订单扣减库存,此时可以发现扣减库存方法被执行多次,并且库存数量也被扣减了多次{"totalNum":1,"payMoney":1,"goodsIds":"['1271700536000909313']"}乐观锁解决服务间重试保证幂等1. 修改StockMapper,添加乐观锁控制控制库存@Update("update tb_stock set version=version+1,amount=amount‐#{num} where goods_id=#{goodsId} and version=#{version} and amount‐#{num}>=0")
int reduceStock(@Param("goodsId") String goodsId,@Param("num") Integer num,@Param("version") Integer version);2. 修改StockController,添加乐观锁扣减库存方法/**
* 乐观锁扣减库存
* @param goodsId
* @param num
* @param version
* @return
*/
@PutMapping("/reduceStock/{goodsId}/{num}/{version}")
public int reduceStock(@PathVariable("goodsId") String goodsId,
@PathVariable("num") Integer num,
@PathVariable("version") Integer version) throws InterruptedException {
System.out.println("exec reduce stock");
int result = stockService.reduceStock(goodsId, num, version);
if (result != 1){
//扣减失败
return result;
}
//延迟
TimeUnit.SECONDS.sleep(6000);
return result;
}3. 测试,可以发现虽然发生多次重试,但是库存只会被扣减成功一次。保证了服务间的幂等性。ps:order服务出现异常,是因为order服务会超时重试四次,但stock服务的延迟每一次都是超过超时时间的,所以最终在 order服务才会出现read timeout异常提示。乐观锁使用场景MySQL乐观锁更适用于一些需要计数的表上,而且在竞争不激烈,出现并发冲突几率较小时,推荐使用乐观锁。虽然通过MySQL乐观锁可以完成并发控制,但锁的操作是直接作用于数据库上,这样就会在一定程度上对数据库性能产生影响。并且MySQL的连接数量是有限的,如果出现大量锁操作占用连接时,也会造成MySQL的性能瓶颈。
提笔写架构
深入浅出消息队列---6、RabbitMQ高可用
RabbitMQ高可用Rabbitmq常见的部署模式:单机,普通集群,镜像集群RabbitMQ普通集群集群原理这里先思考两个问题:搭建集群的好处?提供整体消息队列服务的可靠性可以通过水平扩容提高整体服务的吞吐量2.有了集群以后是否可以保证消息不丢失?不可以基于存储空间和性能的考虑,在Rabbitmq集群中创建队列,集群只会在单个节点而不是在所有节点上创建队列的进程并包含完整的队列消息(元数据,状态,内容)。 这样只有队列的宿主节点,即所有者节点知道队列的所有信息,所有其他非所有者节点只知道队列的元数据和指向该队列存在那个节点的指针。 因此当集群节点崩溃时,该节点的队列进程和关联的绑定都会消失。附加在哪些队列上的消费者也会丢失其所订阅的消息,并且任何匹配该队列绑定信息的新消息也都会消失。3.集群中的交换机有哪些特点呢?交换机其实只是一个名称和绑定列表,没有自己独立的进程,并且交换机的元数据信息会在多个rabbitmq节点上进行共享当消息发布到交换机时,实际上是由所连接信道将消息上的路由键同交换机的绑定列表进行比较,然后再路由消息。当创建一个新的交换机时,Rabbitmq所要做的就是将绑定列表添加到集群中的所有节点上,这样每个节点上的每条信道都可以访问到新的交互机了。4.客户端与集群建立连接的时候,是否需要与集群中所有的节点建立连接?只需要连接集群中任意一个节点就可以了集群搭建创建容器接下来我们就来搭建一个rabbitmq的集群,本次我们搭建一个具有3个节点的rabbitmq集群。拉取镜像docker pull rabbitmq:3.6.10-management
2.创建容器docker run -di --network=docker-network --ip=172.19.0.50 -- hostname=rabbitmq-node01 --name=rabbitmq_01 -p 15673:15672 -p 5673:5672 --privileged=true -e RABBITMQ_ERLANG_COOKIE='rabbitcookie'
rabbitmq:3.6.10-management /bin/bash
docker run -di --network=docker-network --ip=172.19.0.51 -- hostname=rabbitmq-node02 --name=rabbitmq_02 -p 15674:15672 -p 5674:5672 --privileged=true -e RABBITMQ_ERLANG_COOKIE='rabbitcookie'
rabbitmq:3.6.10-management /bin/bash
docker run -di --network=docker-network --ip=172.19.0.52 -- hostname=rabbitmq-node03 --name=rabbitmq_03 -p 15675:15672 -p 5675:5672 --privileged=true -e RABBITMQ_ERLANG_COOKIE='rabbitcookie'
rabbitmq:3.6.10-management /bin/bash
参数说明: Erlang Cookie值必须相同,也就是RABBITMQ_ERLANG_COOKIE参数的值必须相同。因为RabbitMQ是用Erlang实现的,Erlang Cookie相当于不同节点之间相互通讯的秘钥,Erlang节点通过交换Erlang Cookie获得认证。3.进入到rabbitmq的容器中docker exec -it rabbitmq_01 /bin/bash
4.配置hosts文件,让各个节点都能互相识别对方的存在。在系统中编辑 /etc/hosts文件,添加ip地址和节点名称的映射信息(apt-get update , apt-get install vim):172.19.0.50 rabbitmq-node01
172.19.0.51 rabbitmq-node02
172.19.0.52 rabbitmq-node03
5.启动rabbitmq,并且查看状态root@014faa4cba72:/# rabbitmq-server -detached # 启动
rabbitmq服务,该命令可以启动erlang虚拟机和rabbitmq服务
root@014faa4cba72:/# rabbitmqctl status # 查看节点
信息
Status of node rabbit@014faa4cba72
[{pid,270},
{running_applications,
[{rabbitmq_management,"RabbitMQ Management Console","3.6.10"},
{rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.10"},
{rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.10"},
.............
root@014faa4cba72:/# rabbitmqctl cluster_status # 查看集群节点状态
Cluster status of node rabbit@014faa4cba72
[{nodes,[{disc,[rabbit@014faa4cba72]}]},
{running_nodes,[rabbit@014faa4cba72]}, # 正在运行的只有一个节点
{cluster_name,<<"rabbit@014faa4cba72">>},
{partitions,[]},
{alarms,[{rabbit@014faa4cba72,[]}]}]
注意:此时我们可以通过浏览器访问rabbitmq的后端管理系统,但是rabbitmq默认提供的guest用户不支持远程访问。因此我们需要创建用户,并且对其进行授权root@014faa4cba72:/# rabbitmqctl add_user admin admin # 添加用户,用户名为admin,密码为admin
root@014faa4cba72:/# rabbitmqctl list_users # 查看rabbitmq的用户列表
Listing users
admin [] # admin用户已经添加成功,但是没有角色
guest [administrator]
root@014faa4cba72:/# rabbitmqctl set_user_tags admin administrator #给admin用户设置管理员权限
# rabbitmqctl delete_user admin # 删除admin用户
# rabbitmqctl stop_app # 停止rabbitmq服务
# rabbitmqctl stop # 会将rabbitmq的服务和erlang虚拟机一同关闭
再次使用admin用户就可以登录web管理系统了。在其他的rabbitmq中也创建用户,以便后期可以访问后端管理系统。为admin用户设置可以访问的虚拟机:配置集群1.同步cookie 集群中的Rabbitmq节点需要通过交换密钥令牌以获得相互认证,如果节点的密钥令牌不一致,那么在配置节点时就会报错。获取某一个节点上的/var/lib/rabbitmq/.erlang.cookie文件,然后将其复制到其他的节点上。我们以node01节点为基准,进行此操作。docker cp rabbitmq_01:/var/lib/rabbitmq/.erlang.cookie .
docker cp .erlang.cookie rabbitmq_02:/var/lib/rabbitmq
docker cp .erlang.cookie rabbitmq_03:/var/lib/rabbitmq
2.建立集群关系 目前3个节点都是独立的运行,之间并没有任何的关联关系。接下来我们就来建立3者之间的关联关系,我们以rabbitmq-node01为基准,将其他的两个节点加入进来。# 进入到rabbitmq-node02中
rabbitmqctl stop_app # 关闭rabbitmq服务
rabbitmqctl reset # 进行重置
rabbitmqctl join_cluster rabbit@rabbitmq-node01 # rabbitmq-node01为
节点1的主机名称
rabbitmqctl start_app # 启动rabbitmq节点
把rabbitmq-node03加入到节点1中# 进入到rabbitmq-node03中
rabbitmqctl stop_app # 关闭rabbitmq服务
rabbitmqctl reset # 清空节点的状态,并将其恢复都空白状态,当设置的节点时集群
中的一部分,该命令也会和集群中的磁盘节点进行通讯,告诉他们该节点正在离开集群。不然集群
会认为该节点处理故障,并期望其最终能够恢复过来
rabbitmqctl join_cluster rabbit@rabbitmq-node01 # rabbitmq-node01为
节点1的主机名称
rabbitmqctl start_app # 启动rabbitmq节点
进入后台管理系统查看集群概述: 至此也就证明我们的rabbitmq集群就已经搭建好了。节点类型节点类型介绍在使用rabbitmqctl cluster_status命令来查看集群状态时会有[{nodes,[{disc,[‘rabbit@rabbitmqnode01’,‘rabbit@rabbitmq-node02’,‘rabbit@rabbitmq-node03’]}这一项信息,其中的disc标注了Rabbitmq节点类型。Rabbitmq中的每一个节点,不管是单一节点系统或者是集群中的一部分要么是内存节点,要么是磁盘节点。内存节点将所有的队列,交换机,绑定关系、用户、权限和vhost的元数据定义都存储在内存中,而磁盘节点则将这些信息存储到磁盘中。单节点的集群中必然只有磁盘类型的节点,否则当重启Rabbitmq之后,所有关于系统配置信息都会丢失。不过在集群中,可以选择配置部分节点为内存节点,这样可以获得更高的性能。节点类型变更如果我们没有指定节点类型,那么默认就是磁盘节点。我们在添加节点的时候,可以使用如下的命令来指定节点的类型为内存节点:rabbitmqctl join_cluster rabbit@rabbitmq-node01 --ram
我们也可以使用如下的命令将某一个磁盘节点设置为内存节点:rabbitmqctl change_cluster_node_type {disc , ram}
如下所示:root@rabbitmq-node02:/# rabbitmqctl stop_app
# 关闭rabbitmq服务
Stopping rabbit application on node 'rabbit@rabbitmq-node02'
root@rabbitmq-node02:/# rabbitmqctl change_cluster_node_type ram
# 将root@rabbitmq-node02节点类型切换为内存节点
Turning 'rabbit@rabbitmq-node02' into a ram node
root@rabbitmq-node02:/# rabbitmqctl start_app
# 启动rabbitmq服务
Starting node 'rabbit@rabbitmq-node02'
root@rabbitmq-node02:/# rabbitmqctl cluster_status
# 查看集群状态
Cluster status of node 'rabbit@rabbitmq-node02'
[{nodes,[{disc,['rabbit@rabbitmq-node03','rabbit@rabbitmq-node01']},
{ram,['rabbit@rabbitmq-node02']}]},
{running_nodes,['rabbit@rabbitmq-node01','rabbit@rabbitmq-node03',
'rabbit@rabbitmq-node02']},
{cluster_name,<<"rabbit@rabbitmq-node01">>},
{partitions,[]},
{alarms,[{'rabbit@rabbitmq-node01',[]},
{'rabbit@rabbitmq-node03',[]},
{'rabbit@rabbitmq-node02',[]}]}]
root@rabbitmq-node02:/#
节点选择Rabbitmq只要求在集群中至少有一个磁盘节点,其他所有的节点可以是内存节点。当节点加入或者离开集群时,它们必须将变更通知到至少一个磁盘节点。如果只有一个磁盘节点,而且不凑巧它刚好崩溃了,那么集群可以继续接收和发送消息。但是不能执行创建队列,交换机,绑定关系、用户已经更改权限、添加和删除集群节点操作了。也就是说、如果集群中唯一的磁盘节点崩溃了,集群仍然可以保持运行,但是知道将该节点恢复到集群前,你无法更改任何东西,所以在创建集群的时候应该保证至少有两个或者多个磁盘节点。当内存节点重启后,它会连接到预先配置的磁盘节点,下载当前集群元数据的副本。当在集群中添加内存节点的时候,确保告知所有的磁盘节点(内存节点唯一存储到磁盘中的元数据信息是磁盘节点的地址)。只要内存节点可以找到集群中至少一个磁盘节点,那么它就能在重启后重新加入集群中。集群优化之前搭建的集群存在的问题:不具有负载均衡能力优化架构思考的问题:为什么要进行负载均衡? 不具有负载均衡的连接情况: 具有负载均衡的连接情况: 本次我们所选择的负载均衡层的软件是HAProxy。为了保证负载均衡层的高可用,我们需要使用使用到 keepalived软件,使用vrrp协议产生虚拟ip实现动态的ip飘逸。 keepalived是以VRRP协议为实现基础的,VRRP全称Virtual Router Redundancy Protocol,即虚拟路由冗余协议。虚拟路由冗余协议,可以认为是实现路由器高可用的协议,即将N 台提供相同功能的路由器组成一个路由器组,这个组里面有一个master和多个backup,master上面有 一个对外提供服务的vip(该路由器所在局域网内其他机器的默认路由为该vip) ,master会定义向backup发送vrrp协议数据包,当backup收不到vrrp包时就认为master宕掉了,这 时就需要根据VRRP的优先级来选举一个backup当master。这样的话就可以保证 路由器的高可用了。由于我们的虚拟ip是在docker中创建的,而我们的应用程序是基于windows平台开发的额,因此是无法直接 使用docker容器中的虚拟ip。因此我们需要在宿主机上安装keepalived,在宿主机上安装keepalived的主要目的是为了让keepalived产生虚拟服务,后期我们在访问HAProxy的时候, 直接通过宿主机上的虚拟服务去进行访问即可。优化实现HAProxy环境搭建拉取镜像docker pull haproxy:1.7
2.创建一个HAProxy的配置文件haproxy.cfgglobal
#工作目录
chroot /usr/local/etc/haproxy
#日志文件,使用rsyslog服务中local5日志设备(/var/log/local5),等级info
log 127.0.0.1 local5 info
#守护进程运行
daemon
defaults
log 127.0.0.1 local0 err #[err warning info debug]
mode http #默认的模式mode { tcp|http|health },tcp是4
层,http是7层,health只会返回OK
retries 2 #两次连接失败就认为是服务器不可用
option redispatch #当serverId对应的服务器挂掉后,强制定向到其他健康
的服务器
option abortonclose #当服务器负载很高的时候,自动结束掉当前队列处理比
较久的链接
option dontlognull #日志中不记录负载均衡的心跳检测记录
maxconn 4096 #默认的最大连接数
timeout connect 50000ms #连接超时
timeout client 300000ms #客户端超时
timeout server 300000ms #服务器超时
#timeout check 2000 #=心跳检测超时
######## 监控界面配置 #################
listen admin_stats
#监控界面的访问的IP和端口
bind 0.0.0.0:8888
#访问协议
mode http
#URI相对地址
stats uri /dbs
#统计报告格式
stats realm Global\ statistics
#登陆帐户信息
stats auth admin:admin
# rabbitmq管理界面配置
listen proxy_rabbitmq_web
#访问的IP和端口
bind 0.0.0.0:5000
#网络协议
mode tcp
#负载均衡算法(轮询算法)
#轮询算法:roundrobin
#权重算法:static-rr
#最少连接算法:leastconn
#请求源IP算法:source
balance roundrobin
# 这里是容器中的IP地址,由于配置的是轮询roundrobin,weight 权重其实没有生效
server rabbitmq_01 172.19.0.50:15672 check weight 1 maxconn 2000
server rabbitmq_02 172.19.0.51:15672 check weight 1 maxconn 2000
server rabbitmq_03 172.19.0.52:15672 check weight 1 maxconn 2000
# 使用keepalive检测死链
option tcpka
# rabbitmq服务代理,负载均衡配置
listen proxy_rabbitmq
#访问的IP和端口
bind 0.0.0.0:5010
#网络协议
mode tcp
#负载均衡算法(轮询算法)
#轮询算法:roundrobin
#权重算法:static-rr
#最少连接算法:leastconn
#请求源IP算法:source
balance roundrobin
# 这里是容器中的IP地址,由于配置的是轮询roundrobin,weight 权重其实没有生效
server rabbitmq_01 172.19.0.50:5672 check weight 1 maxconn 2000
server rabbitmq_02 172.19.0.51:5672 check weight 1 maxconn 2000
server rabbitmq_03 172.19.0.52:5672 check weight 1 maxconn 2000
# 使用keepalive检测死链
option tcpka
3.创建haproxy容器docker run -di --network=docker-network --ip=172.19.0.40 -p 4001:8888 -p 5001:5000 -p 5010:5010 -v /usr/local/haproxy/haproxy-01/config:/usr/local/etc/haproxy --name=haproxy_01 --privileged=true haproxy:1.7 /bin/bash
docker run -di --network=docker-network --ip=172.19.0.41 -p 4002:8888 -p 5002:5000 -p 5011:5010 -v /usr/local/haproxy/haproxy-02/config:/usr/local/etc/haproxy --name=haproxy_02 --privileged=true haproxy:1.7 /bin/bash
4.进入到容器中,启动HAProxydocker exec -it haproxy_01 /bin/bash # 进入容器
haproxy -f /usr/local/etc/haproxy/haproxy.cfg # 启动容器
5.通过浏览器就可以访问haproxy的后端管理界面了: http://192.168.23.131:4002/dbs,然后输入配置的 用户名和密码就可以看到如下界面HAProxy容器中安装keepalived进入haproxy_01服务docker exec -it haproxy_01 /bin/bash
2.安装keepalived软件apt-get update # 更新软件列表,apt-get源不太稳定,建议多执行几次
apt-get install keepalived # 安装keepalived软件
3.安装其他软件,供后期使用apt-get install net-tools # 安装ifconfig命令
4.创建一个keepalived.conf文件 vim /etc/keepalived/keepalived.conf,内容如下所示:! Configuration File for keepalived
vrrp_instance VI_1 {
state MASTER # 标示状态为MASTER 备份机为BACKUP
interface eth0 # 定义虚拟网卡
virtual_router_id 100 # 定义组vriid, 同一组中
virtual_router_id必须相同
priority 100 # MASTER权重要高于BACKUP 比如
BACKUP为99
advert_int 1 # MASTER 与 BACKUP 负载均衡器之间
同步检查的时间间隔,单位是秒
authentication { # 定义组用户密码
auth_type PASS
auth_pass 123456
}
virtual_ipaddress { #定义docker内ip地址,必须要在和
haproxy同一个网段
172.19.0.119
}
}
5.启动keepalived服务service keepalived start
6.查看eth0上是否已经绑定了虚拟ipip add show eth0
haproxy_02容器中的keepalived软件的配置文件,如下所示:! Configuration File for keepalived
vrrp_instance VI_1 {
state BACKUP # 标示状态为MASTER 备份机为BACKUP
interface eth0 # 定义虚拟网卡
virtual_router_id 100 # 定义组vriid, 同一组中
virtual_router_id必须相同
priority 99 # MASTER权重要高于BACKUP 比如
BACKUP为99
advert_int 1 # MASTER 与 BACKUP 负载均衡器之间
同步检查的时间间隔,单位是秒
authentication { # 定义组用户密码
auth_type PASS
auth_pass 123456
}
virtual_ipaddress { #定义docker内ip地址,必须要在和
haproxy同一个网段
172.19.0.119
}
}
宿主机中keepalived安装keepalivedyum install -y keepalived
2.更改keepalived的配置文件的内容> /etc/keepalived/keepalived.conf # 清空原有配置文件的内容
vim /etc/keepalived/keepalived.conf # 编辑配置文件,配置文件中的内容
如下所示
! Configuration File for keepalived
vrrp_instance VI_1 {
state MASTER
interface ens33 # 这里是宿主机的网卡,可以通过ip a查看当前自己电脑上用的网卡名是哪个
virtual_router_id 50
priority 100
advert_int 1
authentication {
auth_type PASS
auth_pass 1111
}
virtual_ipaddress { # 可以不用指定虚拟ip
}
}
# 虚拟服务器地址 IP 对外提供服务的端口
virtual_server 192.168.23.131 4000 {
delay_loop 3 # 健康检查时长 单位秒
lb_algo rr # 负载均衡算法
rr|wrr|lc|wlc|lblc|sh|dh:LVS调度算法
lb_kind NAT # 负载均衡转发规则
persistence_timeout 50 # http服务会话时长 单位秒
protocol TCP # 健康检查用的是TCP还是UDP
# 对应后端真实的docker服务的ip地址和端口号
real_server 172.19.0.119 8888 { # 对应
HAProxy的虚拟ip地址和后端管理系统端口
weight 1
}
}
# rabbitmq的web管理端的虚拟服务器
virtual_server 192.168.23.131 15672 {
delay_loop 3 # 健康检查时长 单位秒
lb_algo rr # 负载均衡算法rr|wrr|lc|wlc|lblc|sh|dh:LVS调度算法
lb_kind NAT # 负载均衡转发规则
persistence_timeout 50 # http服务会话时长 单位秒
protocol TCP # 健康检查用的是TCP还是UDP
# 对应后端真实的docker服务的ip地址和端口号
real_server 172.19.0.119 5000 { # 对应HAProxy的虚
拟ip地址和后端管理系统端口
weight 1
}
}
# rabbitmq的虚拟服务器
virtual_server 192.168.23.131 5672 {
delay_loop 3
lb_algo rr
lb_kind NAT
persistence_timeout 50
protocol TCP
# 对应后端真实的docker服务的ip地址和端口号
real_server 172.19.0.119 5010 { # 对应HAProxy的虚拟ip地址和后端rabbitmq的监听端口
weight 1
}
}
3.启动keepalived服务,关闭防火墙systemctl start keepalived
systemctl status firewalld.service
RabbitMQ镜像集群镜像队列原理思考的问题:为什么要存在镜像队列 为了保证队列和消息的高可用什么是镜像队列,镜像队列是如何进行选取主节点的?引入镜像队列的机制,可以将队列镜像到集群中的其他的Broker节点之上,如果集群中的一个节点失效了,队列能自动的切换到镜像中的另一个节点之上以保证服务的可用性。在通常的用法中,针对每一个配置镜像的队列(一下称之为镜像队列)都包含一个主节点(master)和若干个从节点(slave),如下图所示:slave会准确地按照master执行命令顺序进行动作,故slave和master上维护的状态应该也是相同的。如果master由于某种原因宕机了,那么"资源最老"的slave会被提升为新的master。根据slave加入的时间排序,时间最长的slave即为"资历最老"。发送到镜像队列的所有的消息会被同时发往master和所有的slave,如果此时master挂掉了,消息还会在slave上,这样slave提升为master的时候消息也不会丢失。3.镜像队列的工作模式是什么? 如下图所示: 除发送消息(Basic.Publish)外的所有动作都只会向master发送,然后在由master将命令执行的结果广播给各个slave。如果消费者与slave建立连接并进行订阅消息,其实质上都是从master 上获取消息,只不过看似是从slave上消费而已。比如:消费者与slave建立了TCP连接之后执行一个Basic.GET的操作,那么首先是有slave将Basic.GET请求发往master,再由master准备好数据返回给slave,最后又slave投递给消费者。镜像策略介绍针对某一个队列去配置其对应的镜像其实比较简单,我们只需要去添加一个镜像策略即可:rabbitmqctl set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition>
指令参数详情: definition参数详情: 实例代码: 例如:对队列名称为 hello 开头的所有队列镜像镜像,并且在集群的节点 rabbit@10.18.195.57上进行镜 像,队列消息自动同步,policy 的设置命令:rabbitmqctl set_policy --apply-to queues hello-ha "^hello" '{"ha-mode":"nodes","ha-params":["rabbit@10.18.195.57"],"ha-sync-mode":"automatic"}'
我们也可以通过rabbitmq的后台管理系统来添加镜像策略,如下图所示: 添加完毕以后,我们再次查看队列的定义信息,如下所示: 此时也就证明我们的镜像队列配置成功了。
提笔写架构
深入浅出消息队列---5、RabbitMQ公平性保证
RabbitMQ公平性保证消息的可靠性传输可以保证秒杀业务的公平性。关于秒杀业务的公平性,我们还需要考虑一点:消息的顺序性(先进入队列的消息先进行处理)RabbitMQ消息顺序性顺序性:消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。举个例子,不考虑消息重复的情况,如果生产者发布的消息分别为msgl、msg2、msg3,那么消费者必然也是按照msgl、msg2、msg3的顺序进行消费的。目前很多资料显示RabbitMQ的消息能够保障顺序性,这是不正确的,或者说这个观点有很大的局限性。在不使用任何RabbitMQ的高级特性,也没有消息丢失、网络故障之类异常的情况发生,并且只有一个消费者的情况下,也只有一个生产者的情况下可以保证消息的顺序性。如果有多个生产者同时发送消息,无法确定消息到达Broker 的前后顺序,也就无法验证消息的顺序性,因为每一次消息的发送都是在各自的线程中进行的。RabbitMQ消息顺序错乱演示生产者发送消息: 1、不使用生产者确认机制,单生产者单消费者可以保证消息的顺序性 2、使用了生产者确认机制,那么就无法保证消息到达Broker的前后顺序,因为消息的发送是异步发送的,每一个线程的执行时间不同 3、生产端使用事务机制,保证消息的顺序性 消费端消费消息: 1、单消费者可以保证消息的顺序性 2、多消费者不能保证消息的顺序,因为每一个消息的消费都是在各自的线程中进行,每一个线程的执行时间不同RabbitMQ消息顺序性保障生产端启用事务机制,单生产者单消费者。如果我们不考虑消息到达MQ的顺序,只是考虑对已经到达到MQ中的消息顺序消费,那么需要保证消费者是单消费者即可。RabbitMQ重复消费重复消费原因首先我们可以确认的是,触发消息重复执行的条件会是很苛刻的! 也就说 在大多数场景下不会触发该条 件!!! 一般出在任务超时,或者没有及时返回状态,引起任务重新入队列,重新消费! 在rabbtimq里连接的断开也会触发消息重新入队列。 如下图所示: 由于服务端没有收到消费端的ack应答,因此该条消息还会重新进行投递。幂等性保障方案重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。所谓幂等性,就是对接口的多次调用所产生的结果和调用一次是一致的。通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。怎么保证消息队列消费的幂等性?这一点需要结合的实际的业务来进行处理: 1、比如我们消费的数据需要写数据库,你先根据主键查一下,如果这数据都有了,你就别插入了,执行以下update操作 2、比如我们消费的数据需要写Redis,那没问题了,反正每次都是 set,天然幂等性。 3、比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。 4、比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。
提笔写架构
深入浅出Zookeeper技术解析
通过"特性与架构"、"应用场景"、"Leader选举"、"ZK一致性与同步原理"四大小节,全面剖析Zookeeper的关键特性与内部工作原理,你将将深入了解Zookeeper在分布式系统中的重要角色,掌握其应用场景、Leader选举机制以及一致性与同步原理,为构建稳健可靠的分布式系统提供实际指南
提笔写架构
深入浅出单点登录
探讨单点登录(SSO)的精髓,解析身份认证、授权机制,帮助学员理解SSO的实现原理与安全机制。适合想深入了解用户认证体系、提升系统安全性的开发者和系统架构师
提笔写架构
消息通知系统深度解析
从通讯方式、后端设计、Netty到整合WebSocket,全面剖析消息通知系统的关键组成部分。通过这一系列课程,深入了解通知系统的核心原理,培养构建高效、可靠消息通讯系统的实际技能
提笔写架构
深入浅出消息队列
带您探索消息队列的核心概念和应用,适合想深挖消息队列技术背后原理的开发者和架构师
提笔写架构
深入浅出业务幂等性
深度解析幂等性在业务设计中的关键作用,探讨实现幂等性的策略与技术