zookeeper

思维导图

选型的一些思考

在进行技术选型时,当考虑使用ZooKeeper作为分布式协调服务的解决方案时,首先需要明确ZooKeeper是否能够满足特定的应用场景需求。ZooKeeper作为一种分布式系统中的协调工具,虽然功能强大且通用,但它并不一定适合所有场景。在分布式协调服务的细分领域中,存在多种替代方案。这些方案针对不同的需求和约束条件,可能提供更优的性能、更高的可扩展性或更简单的使用方式。因此,在最终决定采用ZooKeeper之前,应该对其功能特性进行全面评估,并与其他可用的替代方案进行比较,以确保选择最适合当前业务需求的解决方案。

服务发现

早期,Dubbo选择ZooKeeper作为其服务发现组件,这在一定程度上受到当时的技术环境影响。当时,微服务架构并未像现在这样广泛流行,服务发现的解决方案相对有限。重要的是要注意,ZooKeeper主要遵循CP(一致性和分区容错性)模型。然而,对于服务发现组件而言,是否真的需要强调一致性?个人认为,对于服务发现组件来说,最重要的应该是高可用性。在微服务架构中,服务发现组件的任何故障都不应导致整个系统的服务瘫痪,这种情况是不可接受的。理想情况下,服务发现组件应始终保持可用,即使数据不一致导致服务不可用也应通过负载均衡策略或服务降级等方案来确保系统的正常运行。

随着技术的发展,现在已经出现了一些更适合微服务架构的服务发现组件,它们主要遵循AP(可用性和分区容错性)模型,以确保在网络分区和其他故障情况下的高可用性。例如,Eureka(Spring Cloud 1.x)和Nacos,它们提供了更灵活和健壮的服务发现机制,更适合于微服务架构的服务发现需求。

配置中心

在对配置中心进行技术选型时,ZooKeeper的CP(一致性和分区容错性)模型确实非常适合用元数据配置服务。ZooKeeper的节点监听功能可以方便地实现配置的动态更新,这对于基本的元数据配置服务来说是一个不错的选择,早期的Kafka等相关服务也采用其作为解决方案。它的一致性保证了配置信息的准确性和可靠性。

然而,如果将ZooKeeper作为一个功能更全面的配置中心,也存在一些局限性。首先,它缺乏一个灵活且易于使用的配置UI界面,这对于管理和维护大量配置项可能会带来一定的挑战。其次,ZooKeeper本身并不提供数据解析逻辑,这意味着用户需要自行实现配置数据的解析和处理逻辑,这增加了使用的复杂性。

鉴于这些局限性,不如使用针对性更强的服务,如Apollo或Nacos。这些工具不仅提供了更友好的配置界面和数据管理功能,还内置了数据解析和处理逻辑,减少了开发者的工作量。此外,它们还可能提供更多高级功能,如配置版本管理、环境隔离、权限控制等,这些都是在复杂的应用环境中不可或缺的特性。

分布式锁

ZooKeeper构建的分布式锁不是一个适用于所有场景的通用解决方案。这主要是由于ZooKeeper底层的ZAB(ZooKeeper Atomic Broadcast)协议的工作机制。在ZAB协议中,事务的提交需要集群中大多数节点的同意,这意味着随着集群节点数量的增加,达成共识所需的时间会增加,从而导致事务操作的延迟性增高。因此,如果考虑使用ZooKeeper实现分布式锁,特别是在分布式事务场景中,就需要考量业务是否能够容忍这种写操作的性能瓶颈。

如果业务场景中对分布式锁的要求是高可用性,并且需要在服务异常时锁具有自动恢复的能力,那么ZooKeeper是一个非常合适的选择。可以使用临时节点,服务异常时自动释放锁。

然而,如果分布式锁应用于高并发的场景,ZooKeeper就不是一个理想的选择。在高并发场景下,ZooKeeper的性能瓶颈会严重影响业务,导致锁操作的响应时间变长,从而影响整体系统的性能。在这种情况下,可以考虑其他专门为高并发设计的分布式锁解决方案,例如基于Redis的RedLock算法等,这些方案能够提供更快的响应时间和更好的扩展性,更适合于高并发环境。

Leader选举

分布式服务中的Leader选举可以通过ZooKeeper来实现,优点是减少了开发工作量,因为ZooKeeper提供了一套现成的、可靠的机制来处理选举的复杂性。在整个方案中,各个服务节点尝试在ZooKeeper中创建同一个zNode(节点)。由于ZooKeeper保证同一个节点名下只能成功创建一个临时zNode,因此第一个成功创建该zNode的服务节点将成为Leader。

这个临时zNode的特性是关键:它确保了当Leader节点宕机或失去与ZooKeeper集群的连接时,该zNode会被自动删除。其他的服务节点(Follower)在ZooKeeper上对这个zNode进行监听。当它们检测到这个zNode被删除的事件时,意味着原Leader已不再可用,随即开始新一轮的Leader选举。

这种选举方案适合在Leader选举时,没有额外业务逻辑处理的选举场景。

入门

概述

文章涉及源码

Apache ZooKeeper是一个开源的分布式协调服务,主要用于构建分布式应用程序。它为分布式应用程序提供一套简单、可靠的协调和管理功能,帮助应用程序处理分布式环境中的各种服务,如 配置管理同步服务发现 等。

工作机制

ZooKeeper, 从设计模式的角度来看,可以被认为是一个实现了 观察者模式 的分布式协调服务框架。它主要负责存储和管理分布式系统中重要的配置信息和命名数据。当这些数据状态发生变化时,ZooKeeper会通知已经向其注册的观察者,从而使这些观察者能够做出相应的反应。

Zookeeper = 文件系统 + 通知机制

特点

CAP定理

  1. 可靠性:即使部分节点发生故障,整个系统仍将继续运行。

  2. 顺序一致性:Zookeeper 数据操作按照请求的先后顺序排队进行。集群可能存在短暂的数据不一致窗口,但ZooKeeper保证最终一致性。

  3. 原子性:数据更新操作是原子的,要么完全成功,要么完全失败,不会有中间或部分完成的状态。

  4. 可扩展性:集群架构支持节点水平扩展。

  5. 监视和通知:可以在 znode 上设置监视。此机制允许客户端接收有关特定 znode 更改的通知,而无需轮询更新。

广义上满足CP定理

ZK是一个满足CP理论的分布式应用程序协调服务,A(可用性)不满足是因为ZK集群Leader宕机恢复选举过程中,整个集群处于不可用状态,Zab协议保证了集群间数据的最终一致性。

数据结构

ZooKeeper 数据模型的结构与 Unix 文件系统很类似,整体上可以看作是一棵树,每个节点称做一个 ZNode。每一个 ZNode 默认能够存储 1MB 的数据,每个ZNode都有一个与之关联的路径,这个路径为其在ZooKeeper中提供了唯一的标识。

节点类型

持久性

  • 持久节点(Persistent):无论客户端与服务端的会话是否失效,该节点都会持续存在,除非被显式删除。
  • 临时 (Ephemeral):当客户端与服务端的会话失效时,该节点会自动被删除。

顺序性

  • 普通节点:节点的名称与创建时客户端指定的名称完全相同。
  • 顺序节点:在节点的名称后会自动附加一个唯一的递增序列号。
节点类型说明
持久节点在Zookeeper中,持久节点会持续存在,直到被显式删除。
持久顺序节点与持久节点特性相同,但每次创建时,节点名称后会自动添加由其父节点维护的递增整型数字。
临时节点临时节点与客户端会话相关联。客户端会话失效时,其创建的所有临时节点都会被删除。
临时顺序节点与临时节点特性相同,但在节点名称后会自动添加由其父节点维护的递增整型数字。

临时节点

注意:临时节点不能添加子节点

节点元数据 ephemeralOwner 就是临时节点客户端的 session id

使用场景

Container节点

1
create -c /test

Container 节点是ZooKeeper 3.5.0及其以后版本中引入的一种特殊类型的znode。

  1. 自动清理
    • Container 节点下的所有子节点都被删除后,Container 节点会在将来某个时间点被ZooKeeper自动删除。
    • 这个特性使得 Container 节点很适合用于那些只需要短暂地作为容器存在的场景。
  2. 无法直接删除
    • 如果你尝试直接删除一个还包含子节点的 Container 节点,这个操作会失败。只有当其下没有子节点时,Container 节点才会被自动删除。

TTL节点

3.5.3版本新增,需要配置系统变量zookeeper.extendedTypesEnabled=true,3.6.3版本后默认启用

1
create -t 10 /test

十秒后自动删除

  1. 自动删除:一旦TTL时间过去,节点将被自动删除。
  2. 递归删除:如果一个TTL节点是一个父节点,当其到期被删除时,其子节点也会被删除。
  3. 不可更改的TTL:一旦设置了TTL值,该值是不可以被修改的。
  4. TTL的时间单位:在ZooKeeper中,TTL的单位是毫秒。

节点数据信息

zookeeper 中的所有存储的数据是由 znode 组成的,节点也称为 znode,并以 key/value 形式存储数据

说明描述
data存储在znode中的数据。
acl定义了用户的权限以及他们能够对znode执行的操作。
c: 允许创建子节点
w: 允许更新节点数据
r: 允许读取节点数据和获取子节点列表
d: 允许删除子节点
a: 允许设置节点的acl权限
stat包含znode的元数据,如创建和修改的时间戳、版本信息、大小等。
child列出当前znode的直接子节点。

节点元数据

字段说明
czxid创建节点时的事务ID。事务ID(zxid)表示ZooKeeper状态的每次修改。每个zxid都是唯一的,且按修改的顺序连续生成。如果zxid1小于zxid2,则zxid1在zxid2之前发生。
mzxid节点最后修改时的事务ID。
pZxid当前节点的子节点列表最后一次修改的事务ID。只有当子节点列表变动(例如,添加或删除子节点)时,此ID才会变更。修改子节点的数据内容不会影响此ID。
ctime创建节点时的时间戳(毫秒为单位,从1970年1月1日开始)。
mtime节点最后修改的时间戳(毫秒为单位,从1970年1月1日开始)。
cversion子节点版本号。每当子节点列表变动时,此版本号递增。
dataversion数据版本号。每当节点数据变动时,此版本号递增。
aclVersionACL(访问控制列表)版本号。每当节点的ACL变动时,此版本号递增。
ephemeralOwner如果是临时节点,此字段表示znode拥有者的session id。如果是持久节点,则此字段为0。
dataLength节点数据的长度(以字节为单位)。
numChildren当前节点的直接子节点数量。

集群角色

  1. 事务请求

    • 在ZooKeeper的集群(称为ensemble)中,为了保持数据一致性,修改操作都被视为事务请求。
    • 客户端发起的事务请求首先到达Leader。
    • Leader创建提议并发起投票,请求Follower节点的意见。
    • Follower根据其状态回应投票结果。
    • 若过半数Follower同意,Leader确认并执行事务。
    • 由于需过半同意,集群大小确实对写性能有所影响:节点越多,达到多数的延迟越高。
  2. 事务请求转发

    • Follower收到客户端写请求后,会转发给Leader,因为仅Leader可提议更改。
    • Leader提议更改并等待Follower的回应。
    • 一旦得到过半Follower的确认,Leader提交更改,并同步确保所有Follower与其状态一致。

    写请求需要所有活跃的节点参与进来保证数据的一致性,因此它们被视为事务请求。

    角色说明
    leader负责发起投票和做出决策,更新系统状态,以及处理事务请求。
    follower (跟随者)参与投票,接收并处理客户端的非事务请求并返回结果,同时将事务请求转发给leader进行处理。
    observer (观察者)虽然不参与投票,但同步leader的状态来扩展系统并提高读取性能。它还可以接收客户端请求,处理非事务请求并返回结果,同时将事务请求转发给leader。

observer 观察者

  • Leader 选举过程中,我们通常不讨论 Observer,这是因为 Observer 不具备投票权。尽管如此,ObserverFollower 在功能上极为相似,主要区别是其不参与投票和过半机制。Observer 可以接受客户端连接,并能够从 Leader 同步数据。
  • Observer 的设计目的之一是为了支持扩容。它可以帮助实现数据的动态迁移和扩容。最重要的是,Observer在不影响集群写性能的前提下增强了读取性能,使其成为异地数据中心数据同步的理想选择。

集群搭建

zookeeper 下载

创建节点文件夹(伪集群)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
mkdir /data/zookeeper/zk1
mkdir /data/zookeeper/zk1/data
mkdir /data/zookeeper/zk1/logs
echo 1 > /data/zookeeper/zk1/data/myid

mkdir /data/zookeeper/zk2
mkdir /data/zookeeper/zk2/data
mkdir /data/zookeeper/zk2/logs
echo 2 > /data/zookeeper/zk2/data/myid

mkdir /data/zookeeper/zk3
mkdir /data/zookeeper/zk3/data
mkdir /data/zookeeper/zk3/logs
echo 3 > /data/zookeeper/zk3/data/myid

将下载好的zk分别解压到创建的三个zk目录中

在zookeeper conf目录下分别添加zoo.cfg

zookeeper 完整配置详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#心跳时间2秒 
tickTime=2000
#Follower跟随者服务器与Leader领导者服务器之间初始化连接时能容忍的最多心跳数10*tickTime
initLimit=10
#集群中Leader与Follower之间的最大响应时间单位5*tickTime
syncLimit=5
#存储快照文件 snapshot 的目录。默认情况下,事务日志也会存储在这里。建议同时配置参数dataLogDir, 事务日志的写性能直接影响zk性能
dataDir=/data/zookeeper/zk1/data
#事务日志输出目录。尽量给事务日志的输出配置单独的磁盘或是挂载点,这将极大的提升ZK性能
dataLogDir=/data/zookeeper/zk1/logs
#zookeeper端口
clientPort=2181
#单个客户端与单台服务器之间的连接数的限制,是ip级别的,默认是60,如果设置为0,那么表明不作任何限制
maxClientCnxns=60
#server.1代表一台服务器的编号,第一个端口为集群通讯端口,第二个端口代表Leader选举的端口
server.1=127.0.0.1:2881:3881
server.2=127.0.0.1:2882:3882
server.3=127.0.0.1:2883:3883
#指定观察者
#server.3=127.0.0.1:2883:3883:observer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#心跳时间2秒 
tickTime=2000
#Follower跟随者服务器与Leader领导者服务器之间初始化连接时能容忍的最多心跳数10*tickTime
initLimit=10
#集群中Leader与Follower之间的最大响应时间单位5*tickTime
syncLimit=5
#存储快照文件 snapshot 的目录。默认情况下,事务日志也会存储在这里。建议同时配置参数dataLogDir, 事务日志的写性能直接影响zk性能
dataDir=/data/zookeeper/zk2/data
#事务日志输出目录。尽量给事务日志的输出配置单独的磁盘或是挂载点,这将极大的提升ZK性能
dataLogDir=/data/zookeeper/zk2/logs
#zookeeper端口
clientPort=2182
#单个客户端与单台服务器之间的连接数的限制,是ip级别的,默认是60,如果设置为0,那么表明不作任何限制
maxClientCnxns=60
#server.1代表一台服务器的编号,第一个端口为集群通讯端口,第二个端口代表Leader选举的端口
server.1=127.0.0.1:2881:3881
server.2=127.0.0.1:2882:3882
server.3=127.0.0.1:2883:3883
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#心跳时间2秒 
tickTime=2000
#Follower跟随者服务器与Leader领导者服务器之间初始化连接时能容忍的最多心跳数10*tickTime
initLimit=10
#集群中Leader与Follower之间的最大响应时间单位5*tickTime
syncLimit=5
#存储快照文件 snapshot 的目录。默认情况下,事务日志也会存储在这里。建议同时配置参数dataLogDir, 事务日志的写性能直接影响zk性能
dataDir=/data/zookeeper/zk3/data
#事务日志输出目录。尽量给事务日志的输出配置单独的磁盘或是挂载点,这将极大的提升ZK性能
dataLogDir=/data/zookeeper/zk3/logs
#zookeeper端口
clientPort=2183
#单个客户端与单台服务器之间的连接数的限制,是ip级别的,默认是60,如果设置为0,那么表明不作任何限制
maxClientCnxns=60
#server.1代表一台服务器的编号,第一个端口为集群通讯端口,第二个端口代表Leader选举的端口
server.1=127.0.0.1:2881:3881
server.2=127.0.0.1:2882:3882
server.3=127.0.0.1:2883:3883

分别启动zk

1
2
3
4
5
cd bin目录
./bin/zkServer.sh start

# 查看状态
./bin/zkServer.sh status

客户端命令行

命令行语法

命令基本语法功能描述
help显示ZooKeeper支持的所有命令及其用法。
ls <path> [watch]列出指定path下的子节点。可选参数watch设置监听指定节点的子节点变化。
-R:递归列出所有子节点。
ls2 <path> [watch]类似于ls,但同时显示节点的状态信息,如数据版本和ACLs。
create <path> [data] [acl]在指定的路径创建一个新节点。
-s:创建一个带序列号的节点。
-e:创建一个临时节点,会话结束或超时后消失。
-c:创建一个容器节点,会自动删除没有子节点的容器。
-t <time>:创建一个具有TTL(存活时间)的节点,未修改且无子节点时将被自动删除。
get <path> [watch]获取指定路径节点的数据内容。watch参数用于设置对节点数据变化的监听。
set <path> <data> [version]更新指定路径节点的数据。
-w:更新时设置对节点数据变化的监听。
-s:更新数据同时设置附加信息。
-v <version>:指定节点的版本进行乐观锁控制。
stat <path>显示指定节点的元数据信息,如版本号和子节点数。
delete <path> [version]删除指定路径的节点。
-v <version>:用于乐观锁控制,只有版本匹配时才能删除。
deleteall <path>递归删除指定路径及其所有子节点。
rmr <path>递归删除节点,与deleteall功能相同,但rmr已被弃用。

操作节点

查看指定路径下的子节点

1
ls /

查看子节点和元数据

1
ls2 /

查看节点元数据

1
stat /

创建永久节点

1
create /yongjiu 永久节点

创建永久顺序节点

1
create -s /yongjiu/shunxu 永久顺序节点

创建成功后名称会发生变化 shunxu0000000000 后面会加上序列号


创建临时节点

1
create -e /linshi 临时节点

创建临时顺序节点

1
create -e -s /linshi 临时顺序节点

临时顺序节点的父节点不能是临时节点


设置节点的值

1
set /yongjiu 永久节点2

获取节点的值

1
get /yongjiu

删除节点

  • 普通删除

    1
    delete /yongjiu
  • 乐观锁删除

    1
    delete -v 1 /youjin

递归删除节点,包括其子节点

1
deleteall /yongjiu
1
rmr /yongjiu

ACL权限操作

  • 注册当前会话的账号和密码

    1
    addauth digest test:123456
  • 创建节点并设置权限(指定该节点的用户,以及用户所拥有的权限)

    1
    create /test-node test auth:test:123456:cdwra

如果其他会话不注册当前会话的账号密码,则没有权限操作/test-node节点

客户端操作

ZooKeeper原生Java API的不足之处:

  • 在连接zk超时的时候,不支持自动重连,需要手动操作
  • Watch注册一次就会失效,需要反复注册
  • 不支持递归创建节点

Apache curator

  • 解决Watch注册一次就会失效的问题
  • 支持直接创建多级结点
  • 提供的 API 更加简单易用
  • 提供更多解决方案并且实现简单,例如:分布式锁
  • 提供常用的ZooKeeper工具类
  • 编程风格更舒服

zkClient 节点操作

curator 节点操作

Curator使用手册

节点监听原理

在ZooKeeper中,节点监听(watch)是一种机制,允许客户端在指定的znode上注册一个watch,以便在该节点上发生特定类型的事件时得到通知。ZooKeeper的watch机制是轻量级的,它被设计为一次性触发器,即一旦被触发就会被移除,如果需要持续监听,则需要在每次接收到通知后重新设置watch。

节点监听流程

  1. 客户端初始化 当主线程中创建一个ZooKeeper客户端实例时,客户端会初始化其内部组件和线程。
  2. 线程启动 客户端启动两个关键的后台线程:
    • 发送线程(Send Thread): 负责与ZooKeeper集群建立和维持网络连接,发送请求和接收响应。
    • 事件线程(Event Thread): 负责处理来自服务器的事件通知,并触发注册的Watcher回调。
  3. 监听器注册 客户端通过发送线程向ZooKeeper服务器注册Watcher,同时指定感兴趣的事件类型。
  4. 服务器处理 ZooKeeper服务器接收到Watcher注册信息后,会把Watcher对象与指定的节点关联起来,并保存在内部的监听器映射表中。
  5. 事件触发 一旦监视的节点发生了客户端注册的事件类型变化,ZooKeeper服务器会将此变化封装为一个事件通知,并将其加入到待处理的事件队列中。
  6. 事件通知 事件线程从队列中获取通知,并通知相关客户端的Watcher进行处理。
  7. 监听器一次性特性 在ZooKeeper中,Watcher是一次性的,即一旦触发,就不再有效。如果客户端需要持续监听某个事件,它需要在每次处理完事件通知后再次注册相同的Watcher。

注意事项:

  • 为了保证线程安全,事件处理逻辑应避免直接在process()方法中执行长时间运行的操作或网络调用。如果必须进行这些操作,应在process()方法中将任务委托给其他线程执行。
  • 客户端的主线程通常用于发起ZooKeeper操作和Watcher注册,并不直接参与网络通信或事件处理。这些职责由发送线程和事件线程分别承担。

监听事件

事件说明触发条件
None连接状态事件客户端的连接状态改变时,包括以下KeeperState事件:
- Expired:会话过期
- Disconnected:断开连接
- SyncConnected:同步连接已建立
- AuthFailed:认证失败
NodeCreated节点创建事件对特定节点设置exists监听后,该节点被成功创建时触发
NodeDeleted节点删除事件被监听的节点被删除时触发
NodeDataChanged节点数据变化事件被监听的节点的数据发生变化时触发
NodeChildrenChanged子节点列表变化事件被监听节点的直接子节点列表发生变化时触发,如子节点的添加或删除

客户端实现 Watch

Curator事件监听

Curator 实现Watch

zkClient 实现Watch

Curator提供了五种监听方式

Curator框架为ZooKeeper客户端操作提供了强化的监听器功能,包括以下几种类型:

  1. Watcher监听:类似于ZooKeeper原生API的Watcher,用于对节点的变更进行一次性监听。它与ZooKeeper的原生实现在使用上差别不大,但Curator提供了更易用的接口。
  2. CuratorListener监听:专门用于处理后台操作的通知,如使用inBackground方法提交的异步任务。这类监听关注的是任务执行的结果及错误通知,而非节点内容的改变。
  3. NodeCache监听:针对单个节点的监听器,它会监控节点本身的创建、更新和删除事件。一旦注册,无需再次设置监听器,Curator会自动管理监听的生命周期。
  4. PathChildrenCache监听:用于监听一个节点的所有子节点的状态变化,如子节点的增加、更新和删除。与NodeCache一样,它提供了自动再监听机制,简化了开发者的操作。
  5. TreeCache监听:结合了NodeCachePathChildrenCache的特性,监听指定的起始节点及其所有子节点的变化。这意味着无论是节点本身还是其任意层次的子节点发生变化,TreeCache都能捕捉到事件,并自动重新注册监听,方便用户追踪整个树形结构的状态。

节点的值变化监听

1
2
客户端1
get -w /yongjiu
1
2
客户端2
set /yongjiu 永久节点2
1
2
3
4
客户端1
WATCHER::

WatchedEvent state:SyncConnected type:NodeDataChanged path:/yongjiu

节点的子节点变化监听

只对一级子节点有效

当监听节点的子节点发生变化就会触发(新增和删除)

1
2
客户端1
ls -w /yongjiu
1
2
客户端2
create -s /yongjiu/shunxu
1
2
3
4
客户端1
WATCHER::

WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/yonjiu

节点的后辈节点变化监听

1
2
客户端1
ls -R -w /test
1
2
客户端2
create /test/test2/test3/test4
1
2
3
4
客户端1
WATCHER::

WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/test/test2/test3

判断Znode是否存在

1
exists /test

存在则返回节点信息,不存在则返回null

事务操作

zkClient 使用事务

curator 使用事务

在分布式系统中,我们经常需要确保一组操作要么全部成功执行,要么全部不执行,以此来保持系统的一致性。这是事务的基本特性,也称为原子性。在多线程或者分布式环境中,操作原子性尤为重要,因为它可以避免由于操作部分完成而引起的数据不一致问题。

从版本3.4.0起,ZooKeeper引入了 multi 操作,它允许客户端原子性地执行一批操作。这意味着这批操作要么都成功,要么都不会对ZooKeeper的状态产生任何影响。在Java客户端中,这一特性被封装在 Transaction 类中,提供了一种便捷的方法来组合多个操作并一次性提交。如果事务中的任何操作失败,整个事务会回滚,保证数据的一致性。

multiop

删除 /a /b /c 节点

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
@SneakyThrows
public void multiOpTest() {
List<OpResult> results = zooKeeper.multi(Arrays.asList(
Op.delete("/a", -1),
Op.delete("/b", -1),
Op.delete("/c", -1)
));

for (OpResult result : results) {
System.out.println(result.getType());
}
}

假如只存在 /a /b,执行时会抛出 NoNodeException 因为不存在 /c,执行完之后发生 /a /b 还在

异步处理

跟 ZooKeeper 的其他节点操作一样,multiop也提供了异步的版本,通过返回码判断执行结果,不用去捕获处理异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* multi 事务异步操作
*/
@Test
public void asyncMultiOpTest() throws InterruptedException {
// 回调函数
// ctx 是multi 方法传入的
AsyncCallback.MultiCallback callback = (rc, path, ctx, opResults) -> {
switch (KeeperException.Code.get(rc)) {
case OK:
System.out.println(String.format("节点%s删除成功", path));
case CONNECTIONLOSS:
System.out.println("连接丢失");
break;
case NONODE:
System.out.println("NoNode Error!");
break;
default:
System.out.println("Error when trying to delete node: " + KeeperException.create(KeeperException.Code.get(rc), path));
}
};

zkClient.multi(Arrays.asList(
Op.delete("/a", -1),
Op.delete("/b", -1),
Op.delete("/c", -1),
),
callback,
null
);

TimeUnit.SECONDS.sleep(2);
}

Transaction

ZooKeeper 的 Transaction 功能建立在 multi 操作之上,提供了一种灵活的方式来组织一组操作。在这个机制中,你可以在事务提交前随时添加新的操作到一个原子操作序列中。此外,你可以在不同的方法中构建事务,不受限于单一代码块。当准备好提交时,Transaction 支持同步(commit)和异步(commitAsync)两种提交方式,确保事务的一致性执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
@SneakyThrows
public void transactionTest() {
Transaction t = zooKeeper.transaction();
t.create("/t1", "t1".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
t.create("/t2", "t2".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
t.create("/t3", "t3".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
List<OpResult> results = t.commit();

for (OpResult result : results) {
if (result instanceof OpResult.CreateResult) {
System.out.println("create success");
}
}
}

异步处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Test
@SneakyThrows
public void asyncTransactionTest() {
// 回调函数
AsyncCallback.MultiCallback callback = (rc, path, ctx, opResults) -> {
switch (KeeperException.Code.get(rc)) {
case OK:
System.out.println("事务执行成功");
case CONNECTIONLOSS:
System.out.println("连接丢失");
break;
case NONODE:
System.out.println("NoNode Error!");
break;
default:
System.out.println("Error when trying to delete node: " + KeeperException.create(KeeperException.Code.get(rc), path));
}
};

Transaction t = zooKeeper.transaction();
t.delete("/t1", -1)
.delete("/t2", -1)
.delete("/t3", -1);

t.commit(callback, null);
TimeUnit.SECONDS.sleep(1);
}

服务上下线

在分布式系统中,一个服务的节点可以有多台(比如Dubbo),可以动态上下线,任意一台客户端都能实时感知到服务节点的上下线

服务端实现

  • 首先建立一个服务的永久节点
  • 当节点上线则在服务节点下创建一个临时节点,节点的值存储服务器连接信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Slf4j
public class ProviderTest {
// 集群地址
private static final String ROOT = "/server";
private String connectString = "127.0.0.1:2181";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;

@SneakyThrows
@BeforeEach
public void init() throws IOException {
zkClient = new ZooKeeper(connectString, 2000, new Watcher() {
@Override
public void process(WatchedEvent event) {

}
});

// 创建 /server 节点
Stat exists = zkClient.exists(ROOT, false);
if (exists == null) {
try {
zkClient.create(ROOT, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (Exception e) {
}
}

// 启动注册服务
this.register("provider", "127.0.0.1", 8080);
}


/**
* 注册服务
* @param serverName
* @param ip
* @param port
*/
@SneakyThrows
private void register(String serverName, String ip, int port) {
String path = String.format("%s/%s", ROOT, serverName);
String value = String.format("%s:%s", ip, port);

String create = zkClient.create(path, value.getBytes(StandardCharsets.UTF_8),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);

log.info("server {} is online", serverName);
}

@Test
public void test() throws InterruptedException {
log.info("启动服务...");
TimeUnit.SECONDS.sleep(1000);
}
}

服务端实现

  • 获取服务节点下的所有临时节点,并保存到本地服务列表
  • 监听服务节点子节点变化,当服务节点的子节点发生变化时刷新本地服务列表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@Slf4j
public class ConsumerTest {
// 集群地址
private static final String ROOT = "/server";
private String connectString = "127.0.0.1:2181";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;
private List<String> serverList = null;
private AtomicLong sequence = new AtomicLong(0);

@BeforeEach
public void init() throws IOException {
zkClient = new ZooKeeper(connectString, 2000, new Watcher() {
@Override
public void process(WatchedEvent event) {
log.info("path: {}", event.getPath());

// 服务变更再次获取服务列表
getServerList();
}
});

// 服务启动获取服务列表
getServerList();
}


/**
* 获取远程服务
*/
@SneakyThrows
private void getServerList() {
// 1 获取服务器子节点信息,并且对父节点进行监听
List<String> children = zkClient.getChildren(ROOT, true);

// 2 存储服务器信息列表
serverList = new CopyOnWriteArrayList<>(children);

log.info("serverList: {}", serverList);
}


/**
* 获取服务
* @return
*/
private String getServer() {
if (CollectionUtils.isEmpty(serverList)) {
throw new RuntimeException("没有可用的服务");
}

int len = serverList.size();
return serverList.get((int) (sequence.getAndIncrement() % len));
}


@Test
public void test() throws InterruptedException {
for (;;) {
try {
TimeUnit.SECONDS.sleep(10);
log.info("执行业务");
String server = getServer();
log.info("跨服务调用:调用服务:{}", server);
log.info("服务调用结束...");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

分布式锁实现

ZooKeeper 利用其临时顺序节点实现分布式锁,这种机制的优势在于其高可靠性:一旦持锁的服务意外宕机,ZooKeeper会自动删除那些与会话关联的临时节点,因此锁会被迅速释放。相比之下,基于Redis的分布式锁通常依赖于键的过期时间(TTL)来释放锁,如果服务在锁未释放时宕机,则需要等待TTL到期,这可能导致锁被持有超过必要的时间。

然而,ZooKeeper并不总是适用于高并发场景下的分布式锁。因为ZooKeeper的写操作需要经过集群中唯一的领导者(Leader)进行处理和复制到其他跟随者(Follower)节点,这限制了其在高写负载条件下的性能。如果写请求过于频繁,可能会成为系统性能的瓶颈。

分布式锁实现

  1. 创建根节点:在ZooKeeper中创建一个持久的根节点/locks,作为所有分布式锁的父节点。
  2. 请求锁:当一个客户端希望获得锁时,它在/locks节点下创建一个临时顺序节点,例如/locks/lock_
  3. 节点排序:客户端获取/locks下所有子节点,并按照节点编号排序。
  4. 检查排名:客户端检查自己创建的临时顺序节点在所有子节点中的排序位置:
    • 如果是最小的节点,那么客户端获得锁。
    • 如果不是,客户端找到比自己小的最近的一个节点,并在这个节点上设置监听。
  5. 锁等待:如果没有获得锁,客户端等待监听的节点变更(例如被删除),在变更发生时重新进行排名判断。
  6. 锁的释放:一旦客户端完成其业务逻辑,它会删除自己的临时顺序节点,从而释放锁。
  7. 监听触发:其他客户端的监听器会在它们监听的节点被删除时收到通知,然后这些客户端将重复步骤4和5来尝试获取锁。

原生 Zookeeper 实现

1
2
3
4
5
6
7
8
/**
* 分布式锁接口
*/
public interface DistributedLock {
void lock();

void unLock();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
@Slf4j
public class ZookeeperLock implements DistributedLock {
// 根节点
private static final String ROOT = "/lock";

// 使用ThreadLocal绑定当前线程与其所创建的节点信息
private ThreadLocal<String> nodeInfo = new ThreadLocal<>();

// 连接地址
private String connectString = "127.0.0.1:2181";

// 过期事件
private int sessionTimeout = 2000;

// 当前线程锁的路径
private String lockPath;

// zk客户端独享
private ZooKeeper zkClient;

private ZookeeperLock(String lockName) {
this.lockPath = String.format("%s/%s", ROOT, lockName);

try {
// 用于同步等待zk客户端连接服务端
CountDownLatch connectedSignal = new CountDownLatch(1);
zkClient = new ZooKeeper(connectString, sessionTimeout, (Watcher) watchedEvent -> {
// 连接事件
if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
connectedSignal.countDown();
}
});

// 确保连接建立
connectedSignal.await();

// 是否需要创建根节点
Stat stat = zkClient.exists(ROOT, false);

if (stat == null) {
// 创建持久节点
zkClient.create(ROOT, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
e.printStackTrace();
}
}


@Override
public void lock() {
try {
// 创建临时顺序节点
String nodeName = zkClient.create(lockPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);

// 取出所有子节点
List<String> subNodeList = zkClient.getChildren(ROOT, false);

TreeSet<String> subNodeSet = new TreeSet<>();

subNodeList.forEach(node -> subNodeSet.add(String.format("%s/%s", ROOT, node)));

// 获取最小节点
String smallNode = subNodeSet.first();

// 获取当前节点的上一个节点
String preNode = subNodeSet.lower(nodeName);

// 如果当前线程节点为最小节点,则表示获取锁
if (nodeName.equals(smallNode)) {
nodeInfo.set(nodeName);
return;
}

CountDownLatch waitLockLatch = new CountDownLatch(1);

// 注册当前节点的上个节点删除事件监听
Stat stat = zkClient.exists(preNode, watchedEvent -> {
if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted) {
waitLockLatch.countDown();
}
});

// 判断比当前节点小的节点是否存在,不存在则获得锁
if (stat != null) {
// 阻塞主线程,等待上一个节点删除后唤醒线程
waitLockLatch.await();
nodeInfo.set(nodeName);
}
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void unLock() {
String nodeName = nodeInfo.get();

try {
// 判断当前线程是否已上锁(在等待锁时不能解锁)
if (nodeName != null) {
zkClient.delete(nodeName, -1);
nodeInfo.remove();
}
} catch (Exception e) {
e.printStackTrace();
}
}


static int NUM = 0;
public static void main(String[] args) throws InterruptedException {
Executor executor = Executors.newFixedThreadPool(100);

ZookeeperLock zookeeperLock = new ZookeeperLock("count");
CountDownLatch latch = new CountDownLatch(1000);

for (int i = 0; i < 1000; i++) {
executor.execute(() -> {
try {
zookeeperLock.lock();
NUM++;
} finally {
zookeeperLock.unLock();
latch.countDown();
}
});
}

latch.await();
log.info("NUM = {}", NUM);
}
}

Curator 框架实现

Curator使用手册

Curator是Netflix公司开源的一套zookeeper客户端框架,Curator是对Zookeeper支持最好的客户端框架。Curator封装了大部分Zookeeper的功能,比如Leader选举、分布式锁等,减少了技术人员在使用Zookeeper时的底层细节开发工作

使用原生API存在的问题

  • 会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
  • Watch 需要重复注册,不然就不能生效
  • 开发的复杂性比较高
  • 不支持多节点删除和创建。需要自己去递归

Curator主要实现了下面四种锁

  • InterProcessMutex:分布式可重入排它锁
  • InterProcessSemaphoreMutex:分布式排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器

分布式可重入排他锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Slf4j
@SpringBootTest
public class InterProcessMutexTest {
@Autowired
private CuratorFramework curatorFramework;

@Test
@SneakyThrows
public void test() {
CountDownLatch countDownLatch = new CountDownLatch(100);
ExecutorService executorService = Executors.newFixedThreadPool(100);

String lockNode = "/lock";
InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockNode);

// 多线程竞争锁
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
try {
// 获取锁
lock.acquire();
log.info("线程:{} 获得分布式锁", Thread.currentThread().getName());
TimeUnit.MILLISECONDS.sleep(100);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
try {
lock.release();
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
countDownLatch.await();
}
}

分布式排他锁

用法和 InterProcessMutex 一样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Slf4j
@SpringBootTest
public class InterProcessSemaphoreMutexTest {
@Autowired
private CuratorFramework curatorFramework;

@Test
@SneakyThrows
public void test() {
CountDownLatch countDownLatch = new CountDownLatch(100);
ExecutorService executorService = Executors.newFixedThreadPool(100);

String lockNode = "/lock";
InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(curatorFramework, lockNode);

// 多线程竞争锁
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
try {
// 获取锁
lock.acquire();
log.info("线程:{} 获得分布式锁", Thread.currentThread().getName());
// TimeUnit.MILLISECONDS.sleep(100);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放锁
try {
lock.release();
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
countDownLatch.await();
}
}

分布式读写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Slf4j
@SpringBootTest
public class InterProcessReadWriteLockTest {
@Autowired
private CuratorFramework curatorFramework;

@Test
@SneakyThrows
public void test() {
CountDownLatch countDownLatch = new CountDownLatch(22);
ExecutorService executorService = Executors.newFixedThreadPool(30);

String lockNode = "/lock";
InterProcessReadWriteLock lock = new InterProcessReadWriteLock(curatorFramework, lockNode);
InterProcessMutex readLock = lock.readLock();
InterProcessMutex writeLock = lock.writeLock();

for (int i = 0; i < 10; i++) {
executorService.execute(() -> tryLock(readLock, "获取读锁", countDownLatch));
}

for (int i = 0; i < 2; i++) {
executorService.execute(() -> tryLock(writeLock, "获取写锁", countDownLatch));
}

for (int i = 0; i < 10; i++) {
executorService.execute(() -> tryLock(readLock, "获取读锁", countDownLatch));
}

countDownLatch.await();
}

public void tryLock(InterProcessMutex lock, String msg, CountDownLatch countDownLatch) {
try {
lock.acquire();
log.info("线程:{} 获取{}", Thread.currentThread().getName(), msg);
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
log.error(e.getMessage());
} finally {
try {
lock.release();
} catch (Exception e) {
}
}
}
}

分布式锁容器

curator实现了一个类似容器的锁InterProcessMultiLock,它可以把多个锁包含起来像一个锁一样进行操作,简单来说就是对多个锁进行一组操作。当acquire的时候就获得多个锁资源,否则失败。当release时候释放所有锁资源,不过如果其中一把锁释放失败将会被忽略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Slf4j
@SpringBootTest
public class InterProcessMultiLockTest {
@Autowired
private CuratorFramework curatorFramework;

@Test
@SneakyThrows
public void test() {
CountDownLatch countDownLatch = new CountDownLatch(10);
ExecutorService executorService = Executors.newFixedThreadPool(5);

String lockNode1 = "/lock1";
String lockNode2 = "/lock2";
InterProcessMutex lock1 = new InterProcessMutex(curatorFramework, lockNode1);
InterProcessSemaphoreMutex lock2 = new InterProcessSemaphoreMutex(curatorFramework, lockNode2);

// 锁容器,合并多个锁
InterProcessMultiLock multiLock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
try {
multiLock.acquire();
log.info("线程:{} 获取锁", Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
multiLock.release();
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}

countDownLatch.await();
}
}

底层原理

持久化机制

ZooKeeper将所有的数据存储在一个内存中的数据结构称为 Data Tree 中。这允许快速的读取操作,因为所有的数据都在内存中。但为了持久化和恢复,ZooKeeper使用了以下机制来保存数据到磁盘。

事务日志

每当ZooKeeper的数据发生变化时(例如,创建、删除或更新znodes),这些变化都会作为一个事务写入到事务日志中。事务日志是ZooKeeper保持数据一致性的关键,允许它在崩溃后重新构建数据状态。

数据快照

为了减少磁盘I/O,ZooKeeper周期性地将整个Data Tree的状态保存到磁盘上的快照文件中。快照包含了在特定时间点上所有znodes的数据和状态信息。

zk通过两种形式的持久化,在恢复时先全量恢复快照文件中的数据到内存中,再用日志文件中的数据做增量恢复,这样的恢复速度更快。

读流程

集群中任意节点都是可读的

遵循 BASE 理论

  • 请求发送:客户端可以向集群中的任何服务器( LeaderFollowerObserver )发送读取请求。
  • 本地读取:接收请求的ZooKeeper服务器将直接在本地数据副本上执行读取操作。因为ZooKeeper集群中的数据是一致的,没有分片,每个服务器都包含完整的数据集。
  • 最终一致性:尽管每个服务器都保存有全量数据,但由于Zab协议的事务处理方式,集群可能存在一个时间窗口的数据不一致,但经过一段时间数据同步或,保证最终一致性。

Zab协议

ZooKeeper 是一个为分布式应用提供协调服务的关键系统,通常部署多节点集群以确保高可用性。集群中的节点可以扮演领导者(Leader)、跟随者(Follower)或观察者(Observer)的角色。领导者处理所有的写请求,确保数据的一致性,而跟随者和观察者则参与读请求和状态的复制。

ZooKeeper 通过实现 ZAB(ZooKeeper Atomic Broadcast)协议来维护集群数据的一致性。ZAB 协议是特别为ZooKeeper设计的,以确保即使在领导者更换或系统崩溃后也能够保持数据的一致性。ZAB协议负责:

  • 消息广播:ZAB 保证所有的写操作,如创建、更新或删除数据节点等事务性请求,都按照全局一致的顺序被应用到每一个服务器上,从而保持整个集群的一致性。

  • 状态恢复:当一个新的服务器节点加入集群,或者一个已有的服务器节点在崩溃后重新加入集群时,ZAB 保证这个节点能够与当前领导者同步,获得最新的系统状态。

  • Zab 协议简介

  • 原子广播

  • 崩溃恢复

为什么集群节点推荐奇数

  1. 最大化容错能力

    奇数节点集群能够在不增加冗余节点的同时,最大化容错能力。例如,有3个节点的集群可以容忍1个节点失败。一个有4个节点的集群也只能容忍1个节点失败。

  2. 避免脑裂

    脑裂是指集群在网络分区发生时分成两个独立的部分,每部分都可能独自做出决策。奇数节点集群在分区发生时,不可能有两个等大的部分,这样就总有一个部分能够维持多数派,从而保持集群的正常运作。

  3. 决策效率

    在进行领导者选举或其他需要多数派同意的操作时,奇数节点集群能更快地达成决策,因为不会出现票数相等的情况,从而避免了决策的延迟。

ZK中的NIO

ZK追求的模型

遵循 BASE理论

ZooKeeper选择优先保证一致性(C)和分区容错性(P),在网络分区发生时,它会牺牲一部分可用性以保持一致性。这里的一致性是指线性一致性或顺序一致性,意味着系统保证一旦更新操作完成,所有后续的读操作都将返回该更新的值,形成一个全局一致的操作顺序。


zookeeper
https://wugengfeng.cn/2022/08/18/zookeeper/
作者
wugengfeng
发布于
2022年8月18日
许可协议