分布式相关实践

分布式 ID

ID 在大部分场景下,都是用来区分一条记录,因此天然要求唯一性,并且根据业务需求,有的还要求趋势递增。在单节点的数据库环境下,可以利用数据库自身具备的自增特性,比如 Oracle 的 sequence,MySQL 的 auto_increment,Postgres 的 serial 等,但是在分布式集群环境下,各个数据库都有自己维护的一套自增 ID,因此可能出现多个数据库都是 ID = 1 的情况,这样就有问题,因此就有了分布式 ID,要求全局唯一,并且具备高性能高可用。下面是几种方案(不是全部)

  • 数据库自增 ID
    抛开事实不谈,如果仅仅说实现方式,这也是一种方法。还是利用数据库的自增 ID 解决,不同的是要在集群前面额外搞一个实例充当“工具库”的角色,API 先从这里拿到最新自增 ID,在插入数据的时候顺便把这个 ID 也一起提交

    缺点也是很明显,并且非常的非主流,正经人谁这么搞

  • UUID
    UUID 实现起来非常简单,并且使用得当的话基本不会重复

    1
    2
    String uuid = UUID.randomUUID().toString(); // 36
    String uuid = UUID.randomUUID().toString().replaceAll("-", ""); // 32

    但是如果有需求说 ID 里面需要反映一些有意义的东西,比如订单号、时间戳等,这种方法就不适用

    有好处吗?
    有。这种 UUID 的方式在某种程度上可以防止一些越权的操作,比如 ID = 1 是普通用户能看,ID = 2 是超管才能看,那么如果 API 权限验证没处理好,可能普通用户就能看到超管的东西

    坏处比好处多得多,比如 MySQL 特殊的数据结构,ID 在叶子节点上往往会有一个排序的操作,如果用 UUID,则每新增一条记录,都有可能会使得数据重新排序,影响性能

  • 雪花算法
    推特开源,通过多少位的时间戳 + 多少位的机器 ID + 多少位的序列号组成,详细的去看别人的文章

    GitHub 上源码 snowflake-2010 在 10 年前就不更新了,除非手动调整机器时间,正常使用基本也不会重复
    国内大厂则有其他类似的实现,比如百度的 UidGenerator、滴滴的 Tinyid、美团的 Leaf

  • 基于 Redis
    主要用了 Redis 的 INCRBY 指令

    1
    INCRBY key increment

    Redis 由于自身的单线程特点(高版本有多线程,但是是用来做其他事情的),里面的操作具备原子性,因此可以利用这条指令,每次增加 1,则可完成分布式 ID 的自增,并且不重复

    但是还是老话,凡是引入了其他中间件,都要考虑高可用问题,让它们不要挂。如果 ID 增长了,但没来得及刷盘 Redis 就挂掉了,下次起来时,可能 ID 就会重复。这个时候往往需要 RDB + AOF 配合使用,RDB 保证实例启动的速度,AOF 则确保数据的一致性;还有就是一主二从三哨兵那一套

分布式事务

单机事务的实现方式,通常是先把事务自动提交设置为 false,编写一堆逻辑,完了后手动 commit 或者 rollback
分布式事务主流的还是阿里的 Seata

Seata 里面有三个核心概念:

  • Transaction Manager(事务管理器):主要用来管理全局事务,需要单独部署一个 seata-server 的服务(支持 Nacos、Eureka、Redis、ZooKeeper 等多种注册中心,一般情况下,阿里系的东西多是配合 Nacos 使用,毕竟一家的),并且创建好全局事务和分支事务对应的表
  • Resource Manager(资源管理器):主要用来管理分支事务
  • Transaction Coordinator(事务协调者):类似警匪片中的谈判专家,维护全局事务和分支事务的状态,当事务提交或回滚的时候进行协调

Seata 里面还有四种分布式事务解决方案:

  • AT:Seata 默认模式,分阶段提交事务,但是在第一阶段就提交分支事务,那么回滚要怎么回滚呢?需要额外创建表,来记录数据修改前的状态,回滚时则进行和事务提交时相反的操作,达到最终一致;第一阶段无需锁定数据库资源,和下面的 XA 相比,性能好一点
  • XA:也是分阶段提交事务,第一阶段各个分支事务先准备,然后看结果返回就绪还是失败,如果就绪则在第二阶段提交事务,失败则回滚。这种模式,一般数据库都自带支持,所以实现起来比较简单,无需编写额外的代码或者建表;但是 XA 偏 CP,在第一阶段需要锁定数据库资源,强调强一致性,因此和上面的 AT 相比,性能较差,还牺牲了一定的可用性
  • TCC:同样是分阶段提交事务,分为 Try-Confirm-Cancel 三个阶段,需要额外的代码来处理对应业务逻辑。这样一来,在第一阶段就直接提交事务,性能比 XA 好,同时数据是通过代码来完成回滚的,无需建额外的表,比 AT 简单,并且由于事务都是由自己的代码来完成,对于不支持事务的存储引擎也可以使用。缺点就是需要自己额外写代码,并且考虑处理失败的情况,对人的要求就比较高了
  • SAGA:还是分阶段提交事务,一阶段直接提交事务,二阶段如果失败则通过代码来进行补偿操作,成功则什么都不做

分布式事务,其实用得真不多,最多就是万一发生问题,通过日志排查来进行人工手动补偿,也有可能是用户量太少了,或 CRUD 做多了

分布式锁

锁一般是用来解决多线程或者说高并发环境下共享变量的安全问题
当只有一个实例的时候,可以使用 synchronized 或者 ReentrantLock 等来进行加锁操作,但在多实例下,每个 JVM 进程都是独立的,单机锁就达不到想要的效果,因此大多都是需要借助第三方的力量,来提供一个全局加锁的功能

以最经典的抢票为例,多个人抢固定数量的票,如果处理得不得当,就会发生超卖的现象,是个人都提示抢到票,但实际上你的票并没有这么多,因为多线程下,总票数被频繁修改,没有安全保障

下面是常见的其中几种分布式锁解决方案

  • MySQL 单条 update 语句
    如果 JVM 存在多个,但是 MySQL 只有一个,这种算分布式吗?

    当它算。由于 MySQL 只有一个,而 MySQL 在处理更新语句的时候,会默认进行一个加锁的操作,因此可以利用这种特性来充当分布式锁,比如下面的伪代码(不考虑边界问题)

    1
    updateset 票数 = 票数 - 1;

    当这个表存在多个列,并且需要充当查询条件的时候,需要注意索引的设置,比如

    1
    updateset 票数 = 票数 - 1 where 目的地 = xxx;

    此时如果目的地没有添加索引,MySQL 触发的是表锁,整个表都不可用,但是如果在目的地上添加了索引,则触发的会是行锁,仅仅锁住当前目的地的数据,这样就提高了性能

    当然要正确使用查询条件,如果使用“目的地 != xxx”,那就算添加了索引,也会触发表锁,因为索引会失效
    这种方式仅仅适用简单的业务逻辑,如果想在修改票数前后添加一些额外的处理,就不行了

  • MySQL select for update 语句
    和上面的类似,只是执行 select for update 锁住了所查记录后,还可以进行一些额外的操作,完了后再执行 update 语句,相对来说就会更加灵活

    缺点在后面跟着,复杂操作更要注意加解锁的处理,防止死锁

  • MySQL 乐观锁
    乐观锁的实现思想基本都来源于 CAS(Compare And Swap),当实际的值与预期的值一致,才将旧值更新成新值,否则就自旋,也就是循环(期间可以设置短时间的 sleep,减少 CPU 的无效操作)。最常见的落地方案就是数据库表加多一个 version 字段

    先把 version 查出来,最后执行更新操作的时候再判断现在的 version 值是否和开始查到的是否一致,如果不一致,那就是被别人改过了,那就重新再拿一遍,再修改

    虽然一般都说加锁是重量级的操作,但是这种 CAS 虽然不用加锁,但是好多循环,也会占用无谓的 CPU 资源,有时性能反而不如悲观锁,所以实际使用最好自己测试一下

  • Redis 乐观锁
    主要利用 Redis 的 WATCH + MULTI + EXEC 指令实现

    当 WATCH 了某个 key,开启 MULTI 后,其他线程如果修改了这个 key 的值,那么最后执行 EXEC 的时候就会返回 nil,成功则返回 OK

    这样就实现了类似 version 的效果,执行失败,自旋,然后重新执行即可

    redis-cli 里是这三条指令,那么对应到 Java 代码里会是简单的三条相加吗?

    1
    2
    3
    4
    String key = "";
    redisTemplate.watch(key);
    redisTemplate.multi();
    redisTemplate.exec();

    不是。如果按照上面的用法会报 ERR EXEC without MULTI,明明有 multi(),但为什么还是报这个错?因为用法不对,正确的用法是这样

    1
    2
    3
    4
    5
    6
    7
    8
    9
    String key = "";
    redisTemplate.execute(new SessionCallback<>() {
    @Override
    public Object execute(RedisOperations operations) throws DataAccessException {
    operations.watch(key);
    operations.multi();
    return operations.exec();
    }
    });
  • Redis 分布式锁
    利用 Redis 的 SETNX + DEL 指令实现,SETNX 进行 key 的设置的时候,如果这个 key 不存在,才会被设置,因此可以充当加锁操作;而解锁就仅仅需要把前面设置的 key 进行删除即可

    SETNX 设置出来的属于独占锁(排他锁),一旦加锁成功,但是由于某些原因 API 挂了,这就会导致整个服务不可用,因为要解锁就必须先经过获取锁的那一步,而现在由于已经设置了锁,那就没办法再得到锁,因此没办法释放。所以需要给锁设置一个过期自动删除的时间

    利用 Redis 的 EXPIRE 指令可以对 key 设置超时时间,完成上面的需求

    Redis 还提供了 SETNX + EXPIRE 的组合指令,比如设置一个 10 秒过期的 key

    1
    SET key value EX 10 NX

    其中 EX 表示秒,可换成 PX 表示毫秒;NX 表示不存在才设置,可换成 XX 表示存在才设置。由于是一条指令,因此具备原子性,对应到 Java 代码里则是

    1
    redisTemplate.opsForValue().set(key, value, 10, TimeUnit.SECONDS);

    此时解决了一个问题,但是出现了新的问题

    举个例子,假如 API 里设置了 10 秒过期自动释放锁,但是业务逻辑执行完需要 15 秒,此时执行到第 10 秒的时候,锁自动释放掉了,第二个线程拿到锁,但是前面的线程还没执行完,继续执行 5 秒后执行完毕,手动释放锁,这个时候的锁已经是第二个线程拿到的了,自己加的锁被别人释放了,这就有问题,因为释放锁的时候仅仅是执行一个删除 key 的操作,才不管这个锁是谁的

    1
    redisTemplate.delete(key);

    因此,需要加上一些判断,让自己的锁只有自己才能删

    可能有人会想到说,既然要确保不能删除别人的锁,那是否让 key 每个线程不同就可以了?我删除的是 set 的时候生成的 key,如果删除了一个不存在的 key,就会失败,等于没删除

    这也是正常人的思维,但是问题在于,每个 key 都不一样,起不到加锁的作用。因此不能在 key 上动手,而可以将矛盾转移到 value 上,比如

    1
    2
    3
    4
    5
    6
    7
    8
    String key = "";
    String value = UUID.randomUUID().toString();

    redisTemplate.opsForValue().set(key, value, 10, TimeUnit.SECONDS);

    if (value.equals(redisTemplate.opsForValue().get(key))) {
    redisTemplate.delete(key);
    }

    这样就解决了自己加的锁只能自己删的问题

    但是又引出了新的问题

    当第一个线程进入 if 代码块,准备执行 delete(key) 的时候,刚好到了过期时间,key 被删了,刚好此时切到第二个线程拿到锁准备执行业务代码,刚好又切回第一个线程继续执行删除操作,就又造成了第二个线程的锁被第一个线程删掉。尽管通过 UUID 来确保了唯一性,但是进入 if 后,delete 操作还是只看 key,不管 UUID

    原因在于,if 和 delete 操作不具备原子性,要解决这个问题,就需要借助 Redis 的 EVAL 指令来执行 lua 脚本

    Redis 里面的操作具备原子性,因此 EVAL 指令具备原子性,因此执行 lua 脚本的操作也具备原子性,因此就算在脚本里写 N 行代码,还是具备原子性,因此可以在里面进行一些较为复杂的业务逻辑编写,因此可以把上面的判断搬进 lua 脚本里

    1
    2
    3
    4
    5
    6
    7
    8
    9
    String lua = """
    if redis.call('GET', KEYS[1]) == ARGV[1]
    then
    return redis.call('DEL', KEYS[1])
    else
    return 0
    end
    """;
    redisTemplate.execute(new DefaultRedisScript<>(lua, Boolean.class), List.of(key), value);

    这样就确保了自己的 key 只有自己能删

    但是还是有其他问题在

    如果一个请求里面执行的业务逻辑获取了锁,但是需要调用别的方法执行另外的逻辑,这个方法里面也要先获得锁才能执行,但是前面的操作已经获取过一次锁,既然在前面已经获取了锁,方法还没执行完就不会释放,这个时候调用另一个方法,这个方法又需要先获取锁才能执行,就出现了死锁。因此,对于同一个请求,要允许它重复获得锁,需要做一个可重入的处理,类似 ReentrantLock

    要实现可重入,就需要记下来获取锁的次数,每次获取到锁,次数 +1,随着方法的调用完毕出栈 -1,最后减到 0 删除 key,达到释放锁的效果。如果用上面的数据结构,因为 value 是 UUID,因此可以在后面拼一个次数,比如 UUID:1,这样也是一种方法。但是 Redis 其实提供了一种更加合适的数据结构 Hash,用 HSET + HEXISTS + HINCRBY 指令可完成类似于对象的存储(不是那种文件的对象存储)

    因为需要提前判断 key 是否存在,不存在则进行设置,存在则需要判断当前的锁是不是自己的,如果是则更新获取锁次数,逻辑较为复杂,因此可以借助 lua 脚本来完成

    加锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    String lua = """
    if redis.call('EXISTS', KEYS[1]) == 0 or redis.call('HEXISTS', KEYS[1], ARGV[1]) == 1
    then
    redis.call('HINCRBY', KEYS[1], ARGV[1], 1)
    return 1
    else
    return 0
    end
    """;
    redisTemplate.execute(new DefaultRedisScript<>(lua, Boolean.class), List.of(key), value);

    解锁则进行相反的操作,当锁次数为 0 后,表示全部方法出栈,删除 key,否则就只是次数变化,加负数就是减法操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    String lua = """
    if redis.call('HEXISTS', KEYS[1], ARGV[1]) == 0
    then
    return nil
    elseif redis.call('HINCRBY', KEYS[1], ARGV[1], -1) == 0
    then
    return redis.call('DEL', KEYS[1])
    else
    return 0
    end
    """;
    redisTemplate.execute(new DefaultRedisScript<>(lua, Boolean.class), List.of(key), value);

    这样就完成了可重入锁的设置

    但是还有问题

    由于设置了过期自动删除 key,因此可能出现业务还没执行完,就自动释放了锁,因此需要确保在代码的执行过程中,通过一些额外的操作来使这个 key 自动续期,达到永不过期的效果。在这个条件下,一旦业务没完成,就会一直自动续期,那么还需要设置过期时间吗?

    需要。设置过期时间只是一个兜底的方案,理想情况下会是代码里进行加锁、解锁的操作,万一有故障发生,才会走超时自动释放锁的流程,防止死锁,所以两者并不冲突,反而相辅相成

    为了完成这个操作,需要定期去判断 Redis 中是否还存在这个 key,存在则更新 key 的 TTL,最简单的就是利用 JDK 的 Timer,比如每隔 5 秒去更新下过期时间

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    private void updateTTL() {
    String lua = """
    if redis.call('HEXISTS', KEYS[1], ARGV[1]) == 1
    then
    return redis.call('EXPIRE', KEYS[1], 10)
    else
    return 0
    end
    """;
    new Timer().schedule(new TimerTask() {
    @Override
    public void run() {
    boolean success = Boolean.TRUE.equals(redisTemplate.execute(new DefaultRedisScript<>(lua, Boolean.class), List.of(key), value));
    if (success) {
    updateTTL();
    }
    }
    }, 5 * 1000); // 只设置 delay,没设置 period,因此只是一次性任务,无需再额外进行 cancel() 操作
    }

    获取锁成功时,调用这个方法开启定时任务,后面就只需要递归调用,当锁释放时,由于定时器里面返回 false,因此也会自动结束

    可重入锁,则也需要加上更新过期时间的逻辑处理,比如 10 秒

    1
    2
    redis.call('HINCRBY', KEYS[1], ARGV[1], 1)
    redis.call('EXPIRE', KEYS[1], 10)

    这样就完成了自动续期,基本上满足大部分业务需求

  • Redlock
    也直译成红锁,官方的一种 Redis 集群下的实现方式,文档:Distributed Locks with Redis

    在集群的情况下,Redis 会分为主从节点,因为主从节点之间同步数据需要花费一定的时间,因此如果 API 在主节点中获取了锁,数据还没来得及同步到从节点,主节点挂了,哨兵机制会将从节点升级为主节点,如果 API 也是集群部署,则可能导致另一个实例能重新获取到新的锁,导致分布式锁失效

    红锁的实现原理,简单来说就是,当获取锁的时候,从每个节点上都利用相同的 key 去获取,当超过 3 个以上节点获取锁成功(建议 5 个节点),才算成功。如果某个节点获取失败,则跳过,防止阻塞。另外再对在获取锁的过程中,和锁自动释放的时间进行一个处理,详细的话自己去看文档

  • Redisson
    别人封装好的,用起来比较简单

    其实去看里面的代码,核心部分也是通过 lua 脚本,和上面的差不多,只不过会做了一些不太容易看懂的优化,性能会更好,还有一些公平锁非公平锁的处理,比如

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
    "if (redis.call('exists', KEYS[1]) == 0) then " +
    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
    "return nil; " +
    "end; " +
    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
    "return nil; " +
    "end; " +
    "return redis.call('pttl', KEYS[1]);",
    Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
    "return nil;" +
    "end; " +
    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
    "if (counter > 0) then " +
    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
    "return 0; " +
    "else " +
    "redis.call('del', KEYS[1]); " +
    "redis.call('publish', KEYS[2], ARGV[1]); " +
    "return 1; " +
    "end; " +
    "return nil;",
    Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

    有比较丰富的中文文档:目录

  • ZooKeeper
    ZooKeeper 的 ZNodes 在设计上来说就是不可重复的,并且 ZooKeeper 偏向 CP,强调数据的一致性; ZooKeeper 提供了监听节点的机制;ZooKeeper 可以创建临时顺序节点;上面种种特性加起来,也是很适合用来做分布式锁

    当有请求要获取锁的时候,给它创建一个临时顺序节点,并且监听上一个节点,如果上一个节点没了,表明自己是最前上面的,表示获取锁成功,业务执行完毕后删除节点释放锁,这样下一个节点监听到,它就能获取锁,以此类推,直到所有请求处理完成

    由于用了顺序节点,每个请求都会创建不同的节点,删除的时候就不会出现误删的情况

    可重入锁的实现思想和其他的也类似,也是记录下线程(也就是当前请求)相关的东西,进行 +1、-1 的操作。线程隔离的东西就可以考虑使用 ThreadLocal

    而临时节点天然防死锁,当创建一个临时节点时,如果 API 挂了,API 与 ZooKeeper 之间的连接就会断开,而 ZooKeeper 检测到连接断开,就会自动将临时节点删除

  • Curator
    别人封装好的 ZooKeeper 客户端

未完待续


相关链接:分布式相关理论