来一套通用分布式幂等组件不咯!

    科技2026-01-13  10

    通用分布式幂等组件

    之前一篇文章中提到了幂等性的一些解决办法,但是基本上是单机上的,今天我们来弄一个支持分布式的通用幂等性组件

    【以下有不对的地方,一定要指出哦,共同成长。】

    一、背景

    分布式系统由众多微服务组成,微服务之间必然存在大量的网络调用。比如一个服务间调用异常的例子,用户提交表单之后,请求到A服务,A服务落单之后,开始调用B服务,但是在A调用B的过程中,存在很多不确定性,例如B服务执行超时了,RPC直接返回A请求超时了,然后A返回给用户一些错误提示,但实际情况是B有可能执行是成功的,只是执行时间过长而已。

    用户看到错误提示之后,往往会选择在界面上重复点击,导致重复调用,如果B是个支付服务的话,用户重复点击可能导致同一个订单被扣多次钱。不仅仅是用户可能触发重复调用,定时任务、消息投递和机器重新启动都可能会出现重复执行的情况。在分布式系统里,服务调用出现各种异常的情况是很常见的,这些异常情况往往会使得系统间的状态不一致,所以需要容错补偿设计,最常见的方法就是调用方实现合理的重试策略,被调用方实现应对重试的幂等策略。

    相信大家都知道,并且也都遇到过类似的问题以及有自己的一套解决方案。

    基本上所有业务系统中的幂等都是各自进行处理,也不是说不能统一处理,统一处理的话需要考虑的内容会比较多。

    个人认为核心的业务还是业务方自己去处理,比如订单支付,会有个支付记录表,一个订单只能被支付一次,通过支付记录表就可以达到幂等的效果。

    而那些非核心的业务,也有幂等的需求。比如网络问题,多次重试。用户点击多次等场景。这种场景下可以用一个通用的幂等框架来处理,会让业务开发更加简单。

    二、回顾一下:什么是幂等?

    对于幂等,有一个很常见的描述是:对于相同的请求应该返回相同的结果,所以查询类接口是天然的幂等性接口。举个例子:如果有一个查询接口是查询订单的状态,状态是会随着时间发生变化的,那么在两次不同时间的查询请求中,可能返回不一样的订单状态,这个查询接口还是幂等接口吗?

    幂等的定义直接决定了我们如何去设计幂等方案,如果幂等的含义是相同请求返回相同结果,那实际上只需要缓存第一次的返回结果,即可在后续重复请求时实现幂等了。但问题真的有这么简单吗?

    而我更赞同这种定义:幂等指的是相同请求(identical request)执行一次或者多次所带来的副作用(side-effects)是一样的。

    这就主要考虑三个方向:什么是相同的请求?哪些情况会有副作用?该如何避免副作用?

    三、设计

    参考一位大佬的文章,并添加自己的完善:

    相同的请求:通过分布式id生成器生成唯一id作为标识同一请求的key。通用存储:通过Redis与Mysql做统一存储,Redis做资源锁组件。使用简单:支持代码与注解两种形式,注入对应的类即可实现幂等,屏蔽加锁,记录判断等逻辑。多级存储:采用Redis作为一级存储,优点是性能高,通过设置一定的失效时间,让 Key 自动失效。Mysql或者Mongo 作为二级存储,适用于时间长或者永久存储的场景。封装Runnable与Supplier,以及自定义处理注解@IdempotentHandler标识为幂等失败的处理函数。并发读写:因为多级存储,必会涉及到并发读写的场景,主要支持两种方式,顺序和并发。顺序就是先写一级存储,再写二级存储,读也是一样。这样的问题在于性能会有点损耗。并发就是多线程同时写入,同时读取,提高性能。幂等执行流程:

    四、实现

    主要还是以那位大佬的为主,添加自己的特色。

    1. 封装Runnable与Supplier

    /** * @program: idempotence * @description: Runnable包装 * @author: Mr.Liu * @create: 2020-10-06 21:06 **/ public interface IdempotentRunnable extends Runnable{ /** * 包装 * @param key 幂等键 * @param ex 异常 */ default void run(String key, IdempotentException ex){ runTask(key,ex); } /** * 任务体 * @param key 幂等键 * @param ex 异常 */ public abstract void runTask(String key, IdempotentException ex); /** * 实现父类的 * 不用的方法 */ @Override @Deprecated default void run() { } } /** * @program: idempotence * @description: Supplier包装 * @author: Mr.Liu * @create: 2020-10-06 21:12 **/ public interface IdempotentSupplier<T> extends Supplier<T> { /** * 具体需要被调用的 * @param key 幂等键 * @param ex 异常 * @return */ default T get(String key, IdempotentException ex){ return runTask(key,ex); } /** * 任务体 * @param key 幂等键 * @param ex 异常 * @return */ public abstract T runTask(String key, IdempotentException ex); /** * 实现父类的 * 不用的方法 * @return */ @Override @Deprecated default T get() { return null; } }

    2. 基于Redisson的Redis锁和MySQL的锁。

    接口如下,有两个实现类DistributedLockMysql.class和DistributedLockRedis.class,以Redis为首要锁中间件,MySQL为备用锁组件。但是我MySQL的还没有去实现,主要是使用Redis。

    /** * @program: idempotence * @description: 分布式锁接口,锁的释放时间一定要考虑好,不然业务处理时间太长了,导致锁被释放了,然后又调用了unLock()方法,就会出现错误 * @author: Mr.Liu * @create: 2020-10-04 19:42 **/ public interface DistributedLock { /** * 加锁 * @param key 锁key * @param waitTime 尝试加锁,等待时间(ms) * @param leaseTime 上锁后的失效时间(ms) * @param success 锁成功执行的逻辑 * @param fail 锁失败执行的逻辑 * @param <T> * @return */ <T> T lock(String key, int waitTime, int leaseTime, IdempotentSupplier<T> success, IdempotentSupplier<T> fail); /** * 加锁,加锁失败立即返回 * @param key 锁key * @param leaseTime 上锁后的失效时间(ms) * @param success 锁成功执行的逻辑 * @param fail 锁失败执行的逻辑 * @param <T> * @return */ <T> T lock(String key, int leaseTime, IdempotentSupplier<T> success, IdempotentSupplier<T> fail); /** * 加锁,加锁失败立即返回 * @param key 锁key * @param leaseTime 上锁后的失效时间(ms) * @param timeUnit 时间单位 * @param success 锁成功执行的逻辑 * @param fail 锁失败执行的逻辑 * @param <T> * @return */ <T> T lock(String key, int leaseTime, TimeUnit timeUnit, IdempotentSupplier<T> success, IdempotentSupplier<T> fail); /** * 加锁 * @param key 锁key * @param waitTime 尝试加锁,等待时间(ms) * @param leaseTime 上锁后的失效时间(ms) * @param success 锁成功执行的逻辑 * @param fail 锁失败执行的逻辑 */ void lock(String key, int waitTime, int leaseTime, IdempotentRunnable success, IdempotentRunnable fail); /** * 加锁,加锁失败立即返回 * @param key 锁key * @param leaseTime 上锁后的失效时间(ms) * @param success 锁成功执行的逻辑 * @param fail 锁失败执行的逻辑 */ void lock(String key, int leaseTime, IdempotentRunnable success, IdempotentRunnable fail); /** * 加锁,加锁失败立即返回 * @param key 锁key * @param leaseTime 上锁后的失效时间 * @param timeUnit 时间单位 * @param success 锁成功执行的逻辑 * @param fail 锁失败执行的逻辑 */ void lock(String key, int leaseTime, TimeUnit timeUnit, IdempotentRunnable success, IdempotentRunnable fail); }

    3. 代码与注解使用事例

    @RestController @RequestMapping("/") public class Test { @Autowired private DistributedLock distributedLock; @RequestMapping(value = "test",method = RequestMethod.GET) public ResposeData test(@RequestParam("key") String key){ //锁的释放时间一定要考虑好,不然业务处理时间太长了,导致锁被释放了,然后又调用了unLock()方法,就会出现错误 distributedLock.lock(key,300,(k,ex)->{ System.out.println("成功:"+k); // try { // Thread.sleep(100); // } catch (InterruptedException e) { // e.printStackTrace(); // } },(k,ex)->{ System.out.println("失败"+k); }); return ResposeData.success("欧克"); } /** * 这个key应该用一个分布式id生成器来生成,不能由用户自己提供 * @param key * @return */ @RequestMapping(value = "an",method = RequestMethod.GET) @Idempotent(lockName = "test",spelKey = "#key", idempotentHandler = "idempotentHandler",readWriteType = ReadWriteTypeEnum.ORDER) public ResposeData an(@RequestParam("key") String key){ System.out.println("我已经进来了"); return ResposeData.success("success"); } /** * 固定参数格式 * @param request 请求 * @param e 异常,需要判断是否为null */ @IdempotentHandler public ResposeData idempotentHandler(IdempotentRequest request, IdempotentException e){ System.out.print(request.getKey() + ": idempotentHandler已经执行过了,"); if (e != null){ System.out.println(e.toString()); }else { System.out.println(); } return ResposeData.success("error"); } }

    4. 切面编程

    @Around(value = "@annotation(idempotent)") public Object around(ProceedingJoinPoint joinPoint, Idempotent idempotent) throws Throwable{ Object[] args = joinPoint.getArgs(); Method method = ((MethodSignature) joinPoint.getSignature()).getMethod(); String key = ""; if (StringUtils.hasText(idempotent.spelKey())){ // 这个key应该用一个分布式id生成器来生成,不能由用户自己提供 key = parseKey(idempotent.spelKey(), method, args); }else { key = ContextHolder.getRequestContext().get("globalIdempotentId"); } String userInputKey = idempotent.lockName(); if (!StringUtils.hasText(userInputKey)){ userInputKey = method.getName(); } String idempotentKey = userInputKey + ":" + key; IdempotentRequest request = IdempotentRequest.builder().key(idempotentKey) .firstLevelExpireTime(idempotent.firstLevelExpireTime()) .secondLevelExpireTime(idempotent.secondLevelExpireTime()) .timeUnit(idempotent.timeUnit()) .lockExpireTime(idempotent.lockExpireTime()) .readWriteType(idempotent.readWriteType()) .build(); if (key == null){ return tis(joinPoint, idempotent, method, request,new IdempotentException("未获取到key")); } try { return distributedIdempotent.execute(request,(k,ex)->{ try { return joinPoint.proceed(); }catch (Throwable e){ log.error("幂等执行异常"); //throw new IdempotentException(e); return tis(joinPoint, idempotent, method, request,new IdempotentException(e)); } },(k,ex)->{ //throw new IdempotentException("重复请求"); log.error("重复请求,执行幂等处理"); return tis(joinPoint, idempotent, method, request,ex); }); }catch (IdempotentException ex){ return handleIdempotentException(joinPoint, idempotent, ex); } }

    5. 注解失败逻辑处理

    处理逻辑函数限制了参数类型为IdempotentRequest request, IdempotentException e。

    /** * 执行幂等处理函数 * @param joinPoint 切点 * @param idempotent 主解 * @param method 方法 * @param request 请求 * @return */ private Object tis(ProceedingJoinPoint joinPoint, Idempotent idempotent, Method method, IdempotentRequest request, IdempotentException ex){ /**获取当前被切的那个类**/ Class targetClass = ReflectionUtils.getDeclaringType(joinPoint.getTarget().getClass(), method.getName(), method.getParameterTypes()); // 获取该类(不包括父类)的public,private, protected, default (package)方法 Method[] methods = targetClass.getDeclaredMethods(); for (Method m : methods){ if (m.getName().equals(idempotent.idempotentHandler())){ // 执行幂等处理 Class<?>[] classes = m.getParameterTypes(); // 参数类型限制为两种:IdempotentRequest request, IdempotentException e List s = Arrays.asList(classes); if (s.size() == 2 && s.contains(IdempotentRequest.class) && s.contains(IdempotentException.class)){ log.debug("执行幂等处理函数"); try { return m.invoke(joinPoint.getTarget(),request,ex); } catch (IllegalAccessException | InvocationTargetException e) { throw new IdempotentException(e); } } } } log.error("未获取到key,并且处理函数格式错误[{},{}]",IdempotentRequest.class, IdempotentException.class); throw new IdempotentException("未获取到key,并且处理函数格式错误"); }

    五、源码

    通用幂等组件:https://gitee.com/Ljolan/idempotent

    Processed: 0.020, SQL: 9