深入浅出业务幂等性---2、服务幂等和乐观锁-灵析社区

提笔写架构

防重表

对于防止数据重复提交,还有一种解决方案就是通过防重表实现。防重表的实现思路也非常简单。首先创建一张表作为防重表,同时在该表中建立一个或多个字段的唯一索引作为防重字段,用于保证并发情况下,数据只有一条。在向业务表中插入数据之前先向防重表插入,如果插入失败则表示是重复数据。

对于防重表的解决方案,可能有人会说为什么不使用悲观锁。悲观锁在使用的过程中也是会发生死锁的。悲观锁是通过锁表的方式实现的。 假设现在一个用户A访问表A(锁住了表A),然后试图访问表B; 另一个用户B访问表 B(锁住了表B),然后试图访问表A。 这时对于用户A来说,由于表B已经被用户B锁住了,所以用户A必须等到用户B释放表B才能访问。 同时对于用户B来说,由于表A已经被用户A锁住了,所以用户B必须等到用户A释放表A才能访问。此时死锁就已经产生了。

select+insert防重提交

对于一些后台系统,并发量并不高的情况下,对于幂等的实现非常简单,通过select+insert思想即可完成幂等控制。

在业务执行前,先判断是否已经操作过,如果没有则执行,否则判断为重复操作。

在并发下,并不能完成幂等性控制。通过jemeter测试,可以发现,插入了重复数据。产生了脏数据。

要解决这个问题,非常简单,在数据库层面添加唯一索引即可,将id设置为唯一索引,也是最容易想到的方式,一旦id出现重复,就会出现异常,避免了脏数据的发生也可以解决永久性幂等。但该方案无法用于分库分表情况,其只适用于单表情况。

Mysql悲观锁

悲观锁,正如其名,它指的是对数据被外界(包括本系统当前的其他事务,以及来自外部系统的事务处理)修改持保守态度,因此,在整个数据处理过程中,将数据处于锁定状态。悲观锁的实现,往往依靠数据库提供的锁机制(也只有数据库层提供的锁机制才能真正保证数据访问的排他性,否则,即使在本系统中实现了加锁机制,也无法保证外部系统不会修改数据)。

悲观锁一般使用select…for update的方式;

例如:select * from table where id=1 for update;此时在table表中,id为1的 那条数据就被我们锁定了,其它的事务必须等本次事务提交之后才能执行。这样我们可以保证当前的数据不会被其它事务修改。

优点是,悲观并发控制采取的是保守策略:“先取锁,成功了才访问数据”,这保证了数据获取和修改都是有序进行的,因此适合在写多读少的环境中使用。当然使用悲观锁无法维持非常高的性能,但是在乐观锁也无法提供更好的性能前提下,悲观锁却可以做到保证数据的安全性。

缺点是,由于需要加锁,而且可能面临锁冲突甚至死锁的问题,悲观并发控制增加了系统的额外开销,降低了系统的效率,同时也会降低了系统的并行性。

MySQL乐观锁

假设现在订单已经生成成功,那么就会涉及到扣减库存的操作。当高并发下同时扣减库存时,非常容易出现数据错误问题。

扣减库存数据错误

编写一个扣除库存服务,通过jemeter进行测试,可以发现。当模拟一万并发时,最终的库存数量是错 误的。这主要是因为当多线程访问时,一个线程读取到了另外线程未提交的数据造成。

synchronized失效问题

对于现在的问题,暂不考虑秒杀设计、队列请求串行化等,只考虑如何通过锁进行解决,要通过锁解决的话,那最 先想到的可能是synchronized。根据synchronized定义,当多线程并发访问时,会对当前加锁的方法产生阻塞, 从而保证线程安全,避免脏数据。但是,真的能如预期的一样吗?

@Service
public class StockServiceImpl implements StockService {

    @Autowired
    private StockMapper stockMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public synchronized int lessInventory(String goodsId, int num) {
        return stockMapper.lessInventory(goodsId, num);
    }
}

当前已经在在方法上添加了synchronized,对当前方法对象进行了锁定。 通过Jemeter,模拟一万并发对其进行访问。可以发现,仍然出现了脏数据。

该问题的产生原因,就在于在方法上synchronized搭配使用了@Transactional。首先synchronized锁定的是当前方法对象,而@Transactional会对当前方法进行AOP增强,动态代理出一个代理对象,在方法执行前开启事务,执行后提交事务。 所以synchronized和@Transactional其实操作的是两个不同的对象,换句话说就是 @Transactional的事务操作并不在synchronized锁定范围之内。

假设A线程执行完扣减库存方法,会释放锁并提交事务。但A线程释放锁但还没提交事务前,B线程执行扣减库存方法,B线程执行后,和A线程一起提交事务,就出现了线程安全问题,造成脏数据的出现。

MySQL乐观锁保证幂等

基于版本号实现

MySQL乐观锁是基于数据库完成分布式锁的一种实现,实现的方式有两种:基于版本号、基于条件。但是实现思想都是基于MySQL的行锁思想来实现的。

1)修改数据表,添加version字段,默认值为0

2)修改StockMapper添加基于版本修改数据方法

@Update("update tb_stock set amount=amount‐#{num},version=version+1 where goods_id=#{goodsId} and version=#{version}")
int lessInventoryByVersion(@Param("goodsId") String goodsId,@Param("num") int num,@Param("version") int version);

3)测试模拟一万并发进行数据修改,此时可以发现当前版本号从0变为1,且库存量正确。

基于条件实现

通过版本号控制是一种非常常见的方式,适合于大多数场景。但现在库存扣减的场景来说,通过版本号控制就是多 人并发访问购买时,查询时显示可以购买,但最终只有一个人能成功,这也是不可以的。其实最终只要商品库存不 发生超卖就可以。那此时就可以通过条件来进行控制。

1. 修改StockMapper:

@Update("update tb_stock set amount=amount‐#{num} where goods_id=#{goodsId} and amount‐# {num}>=0") 
int lessInventoryByVersionOut(@Param("goodsId") String goodsId,@Param("num") int num);

2. 修改StockController:

@PutMapping("/lessInv

entoryByVersionOut/{goodsId}/{num}")
public String lessInventoryByVersionOut(@PathVariable("goodsId") String goodsId,@PathVariable("num") int num){

    int result = stockService.lessInventoryByVersionOut(goodsId, num);
    if (result == 1){
        System.out.println("购买成功");
        return "success";
    }

    System.out.println("购买失败");
    return "fail";
}

3. 通过jemeter进行测试,可以发现当多人并发扣减库存时,控制住了商品超卖的问题。

乐观锁控制服务间幂等

在系统中,不光要保证客户端访问的幂等性,同时还要保证服务间幂等。比较常见的情况,当服务间进行调用时, 因为网络抖动等原因出现超时,则很有可能出现数据错误。此时在分布式环境下,就需要通过分布式事务或分布式锁来保证数据的一致性。分布式锁的解决方案中MySQL乐观锁就是其中一种实现。

feign超时重试效果演示

以上图为例,当客户端要生成订单时,可以基于token机制保证生成订单的幂等性,接着订单生成成功后,还会基 于feign调用库存服务进行库存扣减,此时则很有可能出现,库存服务执行扣减库存成功,但是当结果返回时,出现网络抖动超时了,那么上游的订单服务则很有可能会发起重试,此时如果不进行扣减库存的幂等性保证的话,则出现扣减库存执行多次。

那可以先来演示当下游服务出现延迟,上游服务基于feign进行重试的效果。

1. 当前是order调用feign,所以在order中会存在feignConfigure配置类,用于配置超时时间与重试次数。


/**
 1. 自定义feign超时时间、重试次数
 2. 默认超时为10秒,不会进行重试。
 */
@Configuration
public class FeignConfigure {

    //超时时间,时间单位毫秒
    public static int connectTimeOutMillis = 5000;
    public static int readTimeOutMillis = 5000;

    @Bean
    public Request.Options options() {
        return new Request.Options(connectTimeOutMillis, readTimeOutMillis);
    }

    //自定义重试次数
    @Bean
    public Retryer feignRetryer(){
        Retryer retryer = new Retryer.Default(100, 1000, 4);
        return retryer;
    }
}

2. stock服务的StockController中demo方法会延迟六秒。通过这种方式模拟超时效果。此时在order中调用stock 服务,可以发现,order服务会对stock服务调用四次。

这里就演示了服务间调用超时的效果,当下游服务超时,上游服务会进行重试。

服务调用超时库存多次扣减

根据上述演示,当下游服务超时,上游服务就会进行重试。那么结合当前的业务场景,当用户下单成功去调用库存服务扣减库存时, 如果库存服务执行扣减库存成功但返回结果超时,则上游订单服务就会重试,再次进行扣减库存,此时就会出现同一订单商品库存被多次扣减。

1. 在订单服务中生成订单,并调用库存服务扣减库存


@Idemptent
@PostMapping("/genOrder")
public String genOrder(@RequestBody Order order){

    String orderId = String.valueOf(idWorker.nextId());
    order.setId(orderId);
    order.setCreateTime(new Date());
    order.setUpdateTime(new Date());
    int result = orderService.addOrder(order);

    if (result != 1){
        System.out.println("fail");
        return "fail";
    }

    //生成订单详情信息
    List<String> goodsIdArray = JSON.parseArray(order.getGoodsIds(), String.class);

    goodsIdArray.stream().forEach(goodsId->{
        //插入订单详情
        OrderDetail orderDetail = new OrderDetail();
        orderDetail.setId(String.valueOf(idWorker.nextId()));
        orderDetail.setGoodsId(goodsId);
        orderDetail.setOrderId(orderId);
        orderDetail.setGoodsName("heima");
        orderDetail.setGoodsNum(1);
        orderDetail.setGoodsPrice(1);
        orderDetailService.addOrderDetail(orderDetail);

        //扣减库存(不考虑锁)
        stockFeign.reduceStockNoLock(goodsId, orderDetail.getGoodsNum());

    });


    return "success";
}

2. 库存服务直接基于商品信息进行库存扣减

@Update("update tb_stock set amount=amount‐#{num} where goods_id=#{goodsId}") 
int reduceStockNoLock(@Param("goodsId") String goodsId,@Param("num") Integer num);
1
2
@PutMapping("/reduceStockNoLock/{goodsId}/{num}")
public String reduceStockNoLock(@PathVariable("goodsId") String goodsId,
                                    @PathVariable("num") Integer num) throws InterruptedException {

        System.out.println("reduce stock");
        int result = stockService.reduceStockNoLock(goodsId, num);

        if (result != 1){
            return "reduce stock fail";
        }

        //延迟
        TimeUnit.SECONDS.sleep(6000);
        return "reduce stock success";
    }

3. 执行生成订单扣减库存,此时可以发现扣减库存方法被执行多次,并且库存数量也被扣减了多次

{"totalNum":1,"payMoney":1,"goodsIds":"['1271700536000909313']"}

乐观锁解决服务间重试保证幂等

1. 修改StockMapper,添加乐观锁控制控制库存

@Update("update tb_stock set version=version+1,amount=amount‐#{num} where goods_id=#{goodsId} and version=#{version} and amount‐#{num}>=0")
int reduceStock(@Param("goodsId") String goodsId,@Param("num") Integer num,@Param("version") Integer version);

2. 修改StockController,添加乐观锁扣减库存方法


/**
     * 乐观锁扣减库存
     * @param goodsId
     * @param num
     * @param version
     * @return
     */
@PutMapping("/reduceStock/{goodsId}/{num}/{version}")
public int reduceStock(@PathVariable("goodsId") String goodsId,
                       @PathVariable("num") Integer num,
                       @PathVariable("version") Integer version) throws InterruptedException {

    System.out.println("exec reduce stock");
    int result = stockService.reduceStock(goodsId, num, version);
    if (result != 1){
        //扣减失败
        return result;
    }
    //延迟
    TimeUnit.SECONDS.sleep(6000);
    return result;
}

3. 测试,可以发现虽然发生多次重试,但是库存只会被扣减成功一次。保证了服务间的幂等性。

ps:order服务出现异常,是因为order服务会超时重试四次,但stock服务的延迟每一次都是超过超时时间的,所以最终在 order服务才会出现read timeout异常提示。

乐观锁使用场景

MySQL乐观锁更适用于一些需要计数的表上,而且在竞争不激烈,出现并发冲突几率较小时,推荐使用乐观锁。虽然通过MySQL乐观锁可以完成并发控制,但锁的操作是直接作用于数据库上,这样就会在一定程度上对数据库性能产生影响。并且MySQL的连接数量是有限的,如果出现大量锁操作占用连接时,也会造成MySQL的性能瓶颈。

阅读量:2013

点赞量:0

收藏量:0