入门
概述
ZooKeeper 是 Apache 软件基金会的一个软件项目,它为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册。
ZooKeeper 是一个开源的分布式的,为分布式应用提供协调服务的 Apache 项目。
特点
-
ZooKeeper 是由 1个 leader,多个 follower 组成的集群
-
集群中只要有 半数以上 节点存活,ZooKeeper 就能正常服务
-
全局数据一致,每个 Server 上保存一份相同的数据副本,Client 无论连接到哪个 Server,数据都是一致的
-
更新请求顺序执行,来自同一个 Client 的更新请求按期发送顺序依次执行
-
数据更新具有 原子性,一次更新要么成功,要么失败
-
实时性,在一定时间范围内,Client 能读到最新数据
数据结构
ZooKeeper 数据模型的结构与 Unix文件系统 很类似,整体上可以看作一棵树,每个节点称作一个 ZNode,每个 ZNode 默认能够存储 1MB 的数据,每个 ZNode 都可以通过其路径唯一标识。
应用场景
ZooKeeper 提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。
统一命名服务
在分布式环境下,经常需要对应用/服务进行统一命名,便于识别,例如:IP不好记,但是域名很好记
统一配置管理
分布式环境下,配置文件同步非常常见
- 一般要求一个集群中,所有节点的配置信息是一致的,比如 Kafka 集群
- 对配置文件修改后,希望能够快速同步到各个节点上
配置管理可交由 ZooKeeper 来实现
- 将配置信息写入 ZooKeeper 的一个 ZNode
- 各个客户端服务器监听这个 Znode
- 一旦 Znode 数据修改,ZooKeeper 将通知各个客户端服务器
统一集群管理
分布式环境中,实时掌握每个节点的状态是表要的,可根据节点实时状态做一些调整,ZooKeeper 可实现实时监控节点状态变化:
- 可将节点信息写入 ZooKeeper 的一个 ZNode
- 监听这个 ZNode 可获取它的实时状态变化
服务器动态上下线
软负载均衡
在 ZooKeeper 中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求。
下载安装
ZooKeeper 集群有多种搭建方式,比如 多台虚拟机,本机搭建,docker搭建 等等,这里使用 docker-compose 来搭建 ZooKeeper 3.7.0 集群。
目录及配置
目录树如下:
➜ zookeeper tree
.
├── docker-compose.yaml
├── zoo1
│ ├── data
│ │ ├── myid
│ │ └── version-2
│ │ ├── acceptedEpoch
│ │ ├── currentEpoch
│ │ ├── snapshot.0
│ │ └── snapshot.100000000
│ ├── datalog
│ │ └── version-2
│ └── logs
│ └── zookeeper_audit.log
├── zoo2
│ ├── data
│ │ ├── myid
│ │ └── version-2
│ │ ├── acceptedEpoch
│ │ ├── currentEpoch
│ │ └── snapshot.0
│ ├── datalog
│ │ └── version-2
│ └── logs
│ └── zookeeper_audit.log
└── zoo3
├── data
│ ├── myid
│ └── version-2
│ ├── acceptedEpoch
│ ├── currentEpoch
│ └── snapshot.0
├── datalog
│ └── version-2
└── logs
└── zookeeper_audit.log
docker-compose.yaml 的配置文件如下:
version: '3.8'
services:
zoo1:
image: zookeeper:3.7.0
restart: always
hostname: zoo1
ports:
- 2181:2181
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
volumes:
- ./zoo1/data:/data
- ./zoo1/datalog:/datalog
- ./zoo1/logs:/logs
zoo2:
image: zookeeper:3.7.0
restart: always
hostname: zoo2
ports:
- 2182:2181
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181
volumes:
- ./zoo2/data:/data
- ./zoo2/datalog:/datalog
- ./zoo2/logs:/logs
zoo3:
image: zookeeper:3.7.0
restart: always
hostname: zoo3
ports:
- 2183:2181
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
volumes:
- ./zoo3/data:/data
- ./zoo3/datalog:/datalog
- ./zoo3/logs:/logs
可执行文件
这里已经将每个节点的 /data, /datalog, /logs 三个目录全部映射到宿主机上,随便进入一个节点容器,查看 /apache-zookeeper-3.7.0-bin/bin 下的可执行文件:
root@zoo1:/apache-zookeeper-3.7.0-bin/bin# ls -l
total 72
-rwxr-xr-x 1 zookeeper zookeeper 232 Mar 17 09:45 README.txt
-rwxr-xr-x 1 zookeeper zookeeper 2066 Mar 17 09:45 zkCleanup.sh
-rwxr-xr-x 1 zookeeper zookeeper 1158 Mar 17 09:45 zkCli.cmd
-rwxr-xr-x 1 zookeeper zookeeper 1620 Mar 17 09:45 zkCli.sh
-rwxr-xr-x 1 zookeeper zookeeper 1843 Mar 17 09:45 zkEnv.cmd
-rwxr-xr-x 1 zookeeper zookeeper 3690 Mar 17 09:45 zkEnv.sh
-rwxr-xr-x 1 zookeeper zookeeper 4559 Mar 17 09:45 zkServer-initialize.sh
-rwxr-xr-x 1 zookeeper zookeeper 1286 Mar 17 09:45 zkServer.cmd
-rwxr-xr-x 1 zookeeper zookeeper 11561 Mar 17 09:45 zkServer.sh
-rwxr-xr-x 1 zookeeper zookeeper 988 Mar 17 09:45 zkSnapShotToolkit.cmd
-rwxr-xr-x 1 zookeeper zookeeper 1377 Mar 17 09:45 zkSnapShotToolkit.sh
-rwxr-xr-x 1 zookeeper zookeeper 987 Mar 17 09:45 zkSnapshotComparer.cmd
-rwxr-xr-x 1 zookeeper zookeeper 1374 Mar 17 09:45 zkSnapshotComparer.sh
-rwxr-xr-x 1 zookeeper zookeeper 996 Mar 17 09:45 zkTxnLogToolkit.cmd
-rwxr-xr-x 1 zookeeper zookeeper 1385 Mar 17 09:45 zkTxnLogToolkit.sh
配置参数
先进入节点容器,打印配置信息:
root@zoo1:/conf# cat /conf/zoo.cfg
dataDir=/data
dataLogDir=/datalog
tickTime=2000
initLimit=5
syncLimit=2
autopurge.snapRetainCount=3
autopurge.purgeInterval=0
maxClientCnxns=60
standaloneEnabled=true
admin.enableServer=true
server.1=0.0.0.0:2888:3888;2181
server.2=zoo2:2888:3888;2181
server.3=zoo3:2888:3888;2181
-
tickTime=2000:通信心跳数
ZooKeeper 服务端之间以及服务端与客户端心跳时间,单位为毫秒,它是 ZooKeeper 使用的基本时间,ZooKeeper session 的超时时间为该数值的两倍,即 2*tickTime
对应 docker-compose 的配置:
ZOO_TICK_TIME
,默认 2000 毫秒 -
initLimit=5:LF 初始通信时限
集群中的 Follower 与 Leader 之间初始连接时能容忍的最长时间
对应 docker-compose 的配置:
ZOO_INIT_LIMIT
,默认 5*tickTime -
syncLimit=2:LF 同步通信时限
Leader 与 Follower 之间的最大响应时限,如果超时,Leader 会 认为该 Follower 已经挂掉,会从服务端列表踢除该 Follower
对应 docker-compose 的配置:
ZOO_SYNC_LIMIT
,默认 2*tickTime -
maxClientCnxns=60
单个客户端与单台服务器之间的连接数的限制,是ip级别的,默认是60,如果设置为0,那么表明不作任何限制。请注意这个限制的使用范围,仅仅是单台客户端机器与单台ZK服务器之间的连接数限制,不是针对指定客户端IP,也不是ZK集群的连接数限制,也不是单台ZK对所有客户端的连接数限制。
对应 docker-compose 的配置:
ZOO_MAX_CLIENT_CNXNS
,默认 60 -
server.1=0.0.0.0:2888:3888;2181
1: 节点id
0.0.0.0: 节点IP
2888: 节点与Leader通信的端口
3888: 选举用的端口
2181: 客户端访问端口
内部原理
选举机制
半数机制:集群中半数以上机器存活,则集群可用,所以 ZooKeeper 适合安装奇数台服务器。
ZooKeeper 虽然没有在配置文件中指定 master 和 slave,但是 ZooKeeper 工作时只会有一个 Leader,其余都是 Follower,Leader 是通过内部选举机制推选出来的。
假设有五台 ZooKeeper 服务器组成了集群,id分别是 1-5,都是新启动且没有历史数据,每台机器的数据量都一致,按照 id 顺序启动,如下图所示:
选举过程如下:
-
服务器1启动,此时只有它一台机器在运行,它发出去的报文没有任何响应,所以它的选举状态一直是 LOOKING 状态。
-
服务器2启动,它与服务器1通信,互相交换自己的选举结果,由于两者都没有历史数据,所以 id值较大的服务器2 胜出,但是由于没有超过半数以上,选举无法完成,服务器1和服务器2的选举状态保持在 LOOKING。
-
服务器3启动,发起一次选举,此时服务器1和服务器2都会选举3,服务器3的票数为2,超过一半,当选为 Leader,服务器1和2 的状态更改为 FOLLOWING,服务器3的状态更改为 LEADING。
-
服务器4启动,发起一次选举,由于 1,2,3 的状态都不是 LOOKING,不会更改选票信息,交换选票结果为服务器3获得3票继续当选 Leader,服务器4的状态更改为 FOLLOWING。
-
服务器5启动,选举流程和4一致。
节点类型
持久(Persistent)
客户端和服务端断开连接后,创建的节点不会删除
持久类节点分为两类:
-
持久化目录节点
-
持久化顺序编号目录节点
客户端与 ZooKeeper 断开连接后,该节点依旧存在,只是ZooKeeper 会给该节点进行顺序编号
创建 ZNode 时设置顺序标识,ZNode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护。
在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以根据顺序号推断事件的顺序。
临时(Ephemeral)
客户端和服务端断开连接后,创建的节点自动删除。
临时节点不能拥有子节点
临时类节点分为两类:
- 临时目录节点
- 临时顺序编号目录节wqeqwewq
监听器原理
- 首先客户端开启一个 main 线程,在 main 线程中创建 ZooKeeper 客户端
- main 线程会创建两个线程,一个负责网络连接通信(connect),另一个负责监听(listener)
- 通过 connect 线程将注册的监听事件发送给 ZooKeeper
- 通过 ZooKeeper 的注册监听列表将注册的监听事件添加到列表中
- ZooKeeper 监听到有数据或路径变化,就会将消息发送给客户端的 listener 线程
- 客户端 listener 线程内部处理监听事件
常见的监听有:
- config [-c] [-w] [-s]
- get [-s] [-w] path
- ls [-s] [-w] [-R] path
- stat [-w] path
写数据流程
- client 向 server 发送写请求,这个 server 不一定是 leader
- 如果 server 不是 leader,那么会把请求转发给 leader,leader 再把请求转发给所有 follower,各个 follower 写成功后会通知 leader
- 当 leader 收到大多数 server 写成功了,那么就说明数据写成功了,写成功后,leader 会告诉接收客户端请求的 server 数据写成功了
- 收到请求的 server 会将写入成功的消息再告诉对应的 client
客户端操作
命令基本语法 | 功能描述 |
---|---|
help | 显示所有操作命令,**实际使用会报 Command not found help,不过也会提示所有的命令 ** |
ls [-s] [-w] [-R] path | 使用 ls 命令查看当前 znode 中的内容 |
create [-s] [-e] [-c] [-t ttl] path [data] [acl] | 创建,-s:含有序列 -e:临时(重启或者超时会消失) |
get [-s] [-w] path | 获得节点的值 |
set [-s] [-v version] path data | 设置节点的具体值 |
stat [-w] path | 查看节点状态 |
delete [-v version] path | 删除节点 |
deleteall path [-b batch size] | 递归删除节点 |
启动客户端
进入容器内部执行 /apache-zookeeper-3.7.0-bin/bin/zkCli.sh
➜ ~ docker exec -it 894493ffc262 bash
root@zoo1:/apache-zookeeper-3.7.0-bin# cd bin/
root@zoo1:/apache-zookeeper-3.7.0-bin/bin# ls -l
total 72
-rwxr-xr-x 1 zookeeper zookeeper 232 Mar 17 09:45 README.txt
-rwxr-xr-x 1 zookeeper zookeeper 2066 Mar 17 09:45 zkCleanup.sh
-rwxr-xr-x 1 zookeeper zookeeper 1158 Mar 17 09:45 zkCli.cmd
-rwxr-xr-x 1 zookeeper zookeeper 1620 Mar 17 09:45 zkCli.sh
-rwxr-xr-x 1 zookeeper zookeeper 1843 Mar 17 09:45 zkEnv.cmd
-rwxr-xr-x 1 zookeeper zookeeper 3690 Mar 17 09:45 zkEnv.sh
-rwxr-xr-x 1 zookeeper zookeeper 4559 Mar 17 09:45 zkServer-initialize.sh
-rwxr-xr-x 1 zookeeper zookeeper 1286 Mar 17 09:45 zkServer.cmd
-rwxr-xr-x 1 zookeeper zookeeper 11561 Mar 17 09:45 zkServer.sh
-rwxr-xr-x 1 zookeeper zookeeper 988 Mar 17 09:45 zkSnapShotToolkit.cmd
-rwxr-xr-x 1 zookeeper zookeeper 1377 Mar 17 09:45 zkSnapShotToolkit.sh
-rwxr-xr-x 1 zookeeper zookeeper 987 Mar 17 09:45 zkSnapshotComparer.cmd
-rwxr-xr-x 1 zookeeper zookeeper 1374 Mar 17 09:45 zkSnapshotComparer.sh
-rwxr-xr-x 1 zookeeper zookeeper 996 Mar 17 09:45 zkTxnLogToolkit.cmd
-rwxr-xr-x 1 zookeeper zookeeper 1385 Mar 17 09:45 zkTxnLogToolkit.sh
root@zoo1:/apache-zookeeper-3.7.0-bin/bin# zkCli.sh
2021-05-30 16:41:18,597 [myid:] - INFO [main:Environment@98] - Client environment:os.memory.max=256MB
2021-05-30 16:41:18,597 [myid:] - INFO [main:Environment@98] - Client environment:os.memory.total=64MB
2021-05-30 16:41:18,601 [myid:] - INFO [main:ZooKeeper@637] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@7946e1f4
2021-05-30 16:41:18,604 [myid:] - INFO [main:X509Util@77] - Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation
2021-05-30 16:41:18,609 [myid:] - INFO [main:ClientCnxnSocket@239] - jute.maxbuffer value is 1048575 Bytes
2021-05-30 16:41:18,616 [myid:] - INFO [main:ClientCnxn@1726] - zookeeper.request.timeout value is 0. feature enabled=false
Welcome to ZooKeeper!
2021-05-30 16:41:18,638 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1171] - Opening socket connection to server localhost/127.0.0.1:2181.
2021-05-30 16:41:18,640 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1173] - SASL config status: Will not attempt to authenticate using SASL (unknown error)
JLine support is enabled
2021-05-30 16:41:18,652 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1005] - Socket connection established, initiating session, client: /127.0.0.1:42454, server: localhost/127.0.0.1:2181
2021-05-30 16:41:18,666 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1438] - Session establishment complete on server localhost/127.0.0.1:2181, session id = 0x100000b1e710001, negotiated timeout = 30000
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0]
help
实际使用会报 Command not found help,不过也会提示所有的命令
[zk: localhost:2181(CONNECTED) 0] help
ZooKeeper -server host:port -client-configuration properties-file cmd args
addWatch [-m mode] path # optional mode is one of [PERSISTENT, PERSISTENT_RECURSIVE] - default is PERSISTENT_RECURSIVE
addauth scheme auth
close
config [-c] [-w] [-s]
connect host:port
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
delete [-v version] path
deleteall path [-b batch size]
delquota [-n|-b|-N|-B] path
get [-s] [-w] path
getAcl [-s] path
getAllChildrenNumber path
getEphemerals path
history
listquota path
ls [-s] [-w] [-R] path
printwatches on|off
quit
reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
redo cmdno
removewatches path [-c|-d|-a] [-l]
set [-s] [-v version] path data
setAcl [-s] [-v version] [-R] path acl
setquota -n|-b|-N|-B val path
stat [-w] path
sync path
version
whoami
Command not found: Command not found help
ls
[zk: localhost:2181(CONNECTED) 1] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 4] ls -s /
[zookeeper]
cZxid = 0x0
ctime = Thu Jan 01 00:00:00 UTC 1970
mZxid = 0x0
mtime = Thu Jan 01 00:00:00 UTC 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
create
[zk: localhost:2181(CONNECTED) 5] create /wcxst
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/
Created /wcxst
[zk: localhost:2181(CONNECTED) 7] create /test "this is test"
Created /test
[zk: localhost:2181(CONNECTED) 8] get /test
this is test
[zk: localhost:2181(CONNECTED) 9] create -s /serial
Created /serial0000000002
[zk: localhost:2181(CONNECTED) 10] create -e /temp
Created /temp
[zk: localhost:2181(CONNECTED) 15] create /wcxst/blog "this is my blog"
Created /wcxst/blog
[zk: localhost:2181(CONNECTED) 15] create /wcxst/blog "this is my blog"
Created /wcxst/blog
[zk: localhost:2181(CONNECTED) 16] ls /wcxst
[blog]
[zk: localhost:2181(CONNECTED) 17] ls /wcxst/
Path must not end with / character
[zk: localhost:2181(CONNECTED) 18] get /wcxst/blog
this is my blog
get
[zk: localhost:2181(CONNECTED) 18] get /wcxst/blog
this is my blog
[zk: localhost:2181(CONNECTED) 19] get -s /wcxst/blog
this is my blog
cZxid = 0x10000000c
ctime = Sun May 30 16:54:30 UTC 2021
mZxid = 0x10000000c
mtime = Sun May 30 16:54:30 UTC 2021
pZxid = 0x10000000c
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 15
numChildren = 0
set
[zk: localhost:2181(CONNECTED) 20] set /wcxst/blog "change blog"
[zk: localhost:2181(CONNECTED) 21] get /wcxst/blog
change blog
stat
[zk: localhost:2181(CONNECTED) 22] stat /wcxst
cZxid = 0x100000008
ctime = Sun May 30 16:50:15 UTC 2021
mZxid = 0x100000008
mtime = Sun May 30 16:50:15 UTC 2021
pZxid = 0x10000000c
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
[zk: localhost:2181(CONNECTED) 23] stat /wcxst/blog
cZxid = 0x10000000c
ctime = Sun May 30 16:54:30 UTC 2021
mZxid = 0x10000000d
mtime = Sun May 30 16:56:47 UTC 2021
pZxid = 0x10000000c
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 11
numChildren = 0
字段 | 说明 |
---|---|
cZxid | 创建znode的zxid |
ctime | 创建znode的时间,单位毫秒 |
mZxid | 最近一次修改znode的zxid(创建、删除、set直系子节点、set自身节点都会计数) |
mtime | 最近一次修改znode的时间,单位毫秒 |
pZxid | 最近一次修改子节点的zxid(创建、删除直系子节点都会计数,set子节点不会计数) |
cversion | 修改子节点的次数(创建、删除直系子节点都会计数,set子节点不会计数) |
dataVersion | 表示对该znode的数据所做的更改次数 |
aclVersion | 表示对此znode的ACL进行更改的次数 |
ephemeralOwner | 如果znode是ephemeral类型节点,则这是znode所有者的 session ID。 如果znode不是ephemeral节点,则该字段设置为零 |
dataLength | znode数据字段的长度 |
numChildren | 直系子节点的数量(不会递归计算孙节点) |
delete
[zk: localhost:2181(CONNECTED) 24] create /wcxst/uid "this is my uid"
Created /wcxst/uid
[zk: localhost:2181(CONNECTED) 25] ls /wcxst
[blog, uid]
[zk: localhost:2181(CONNECTED) 26] delete /wcxst/uid
[zk: localhost:2181(CONNECTED) 27] ls /wcxst
[blog]
[zk: localhost:2181(CONNECTED) 28] delete /wcxst
Node not empty: /wcxst
deleteall
[zk: localhost:2181(CONNECTED) 27] ls /wcxst
[blog]
[zk: localhost:2181(CONNECTED) 28] delete /wcxst
Node not empty: /wcxst
[zk: localhost:2181(CONNECTED) 29] deleteall /wcxst
[zk: localhost:2181(CONNECTED) 31] ls /
[serial0000000002, temp, test, zookeeper]
[zk: localhost:2181(CONNECTED) 33] ls /wcxst
Node does not exist: /wcxst
Api 使用(Golang)
使用的库: github.com/samuel/go-zookeeper
文档:https://pkg.go.dev/github.com/samuel/go-zookeeper/zk#pkg-overview
常规功能
测试了以下功能:
- 连接 ZooKeeper
- 全局监听
- 查看子节点列表
- 判断节点是否存在以及监听事件
- 创建四种节点
- 获取节点详情
- 删除节点
package main
import (
"encoding/json"
"fmt"
"time"
"github.com/samuel/go-zookeeper/zk"
)
const (
ZKFlagPersistent = 0 // 创建ZNode时的Flag:永久存储
ZKFlagEphemeral = 1 // 创建ZNode时的Flag:临时存储
ZKFlagPersistentSequence = 2 // 创建ZNode时的Flag:永久存储+序列编号
ZKFlagEphemeralSequence = 3 // 创建ZNode时的Flag:临时存储+序列编号
)
func main() {
// 连接 ZooKeeper 集群
hosts := []string{"127.0.0.1:2181", "127.0.0.1:2182", "127.0.0.1:2183"}
option := zk.WithEventCallback(
func(event zk.Event) {
b, _ := json.Marshal(event)
fmt.Printf("【all event】%s\n", b)
},
)
conn, _, err := zk.Connect(hosts, time.Second*5, option)
if err != nil {
fmt.Printf("【connect zookeeper server error】 %v\n", err)
return
}
defer conn.Close()
parentPath := "/golang"
persistentPath := fmt.Sprintf("%s/persistent", parentPath)
ephemeralPath := fmt.Sprintf("%s/ephemeral", parentPath)
persistentSequencePath := fmt.Sprintf("%s/persistent_sequence", parentPath)
ephemeralSequencePath := fmt.Sprintf("%s/ephemeral_sequence", parentPath)
// 查看根节点下子节点
nodes, stat, err := conn.Children("/")
fmt.Printf("【root children】 %+v, %+v, %v\n", nodes, stat, err)
acl := zk.WorldACL(zk.PermAll)
// 添加golang节点
_, _ = Create(conn, parentPath, "this is a golang znode", ZKFlagPersistent, acl)
// 添加永久存储节点
_, _ = Create(conn, persistentPath, "this is a persistent znode", ZKFlagPersistent, acl)
// 添加临时存储节点
_, _ = Create(conn, ephemeralPath, "this is a ephemeral znode", ZKFlagEphemeral, acl)
// 添加永久存储节点-序列化编号
_, _ = Create(
conn, persistentSequencePath, "this is a persistent sequence znode", ZKFlagPersistentSequence, acl,
)
// 添加临时存储节点-序列化编号
_, _ = Create(
conn, ephemeralSequencePath, "this is a ephemeral sequence znode", ZKFlagEphemeralSequence, acl,
)
// 查看golang节点下子节点
nodes, stat, err = conn.Children(parentPath)
fmt.Printf("【golang children】 %+v, %+v, %v\n", nodes, stat, err)
// 判断golang是否存在
res, stat, err := conn.Exists(parentPath)
fmt.Printf("【golang Exists】 %+v, %+v, %v\n", res, stat, err)
// 查看节点【/golang/persistent】详情
b, stat, err := conn.Get(persistentPath)
fmt.Printf("【get path [%s] info result】 %s, %+v, %v\n", persistentPath, b, stat, err)
// 设置节点【/golang/persistent】数据
stat, err = conn.Set(persistentPath, []byte("change data"), stat.Version)
fmt.Printf("【change path [%s] data result】 %+v, %v\n", persistentPath, stat, err)
// 查看节点【/golang/persistent】详情
b, stat, err = conn.Get(persistentPath)
fmt.Printf("【get path [%s] info result】 %s, %+v, %v\n", persistentPath, b, stat, err)
// 删除结点【/golang/persistent】
err = conn.Delete(persistentPath, stat.Version)
fmt.Printf("【delete path [%s] result】 %v\n", persistentPath, err)
// 查看golang节点下子节点
nodes, stat, err = conn.Children(parentPath)
fmt.Printf("【golang children】 %+v, %+v, %v\n", nodes, stat, err)
// 查看节点【/golang】详情
b, stat, err = conn.Get(parentPath)
fmt.Printf("【get path [%s] info result】 %s, %+v, %v\n", parentPath, b, stat, err)
// 删除golang节点及其子节点
err = conn.Delete("/golang", stat.Version)
fmt.Printf("【delete path [%s] result】 %v\n", parentPath, err)
// 查看根节点下子节点
nodes, stat, err = conn.Children("/")
fmt.Printf("【root children】 %+v, %+v, %v\n", nodes, stat, err)
// exists watch
existsPath := fmt.Sprintf("%s/exists", parentPath)
res, stat, cha, err := conn.ExistsW(existsPath)
go func(<-chan zk.Event) {
for v := range cha {
b, _ := json.Marshal(v)
fmt.Printf("【exists watch event】%s\n", b)
}
}(cha)
_, _ = Create(conn, existsPath, "this is exists data", ZKFlagPersistent, acl)
}
// ZooKeeper 连接
type ZKConn struct {
*zk.Conn
}
// 创建
func Create(conn *zk.Conn, path string, data string, flag int32, acl []zk.ACL) (string, error) {
res, err := conn.Create(path, []byte(data), flag, acl)
if err != nil {
fmt.Printf("【create znode [%s] error】 %v\n", path, err)
return "", err
}
fmt.Printf("【create znode [%s] success】 %v\n", path, res)
return res, err
}
执行结果:
【all event】{"Type":-1,"State":1,"Path":"","Err":null,"Server":"127.0.0.1:2181"}
【all event】{"Type":-1,"State":100,"Path":"","Err":null,"Server":"127.0.0.1:2181"}
2021/05/31 13:25:17 Connected to 127.0.0.1:2181
【all event】{"Type":-1,"State":101,"Path":"","Err":null,"Server":"127.0.0.1:2181"}
2021/05/31 13:25:17 authenticated: id=72057641793290252, timeout=5000
2021/05/31 13:25:17 re-submitting `0` credentials after reconnect
【root children】 [zookeeper], &{Czxid:0 Mzxid:0 Ctime:0 Mtime:0 Version:0 Cversion:25 Aversion:0 EphemeralOwner:0 DataLength:0 NumChildren:1 Pzxid:4294967385}, <nil>
【create znode [/golang] success】 /golang
【create znode [/golang/persistent] success】 /golang/persistent
【create znode [/golang/ephemeral] success】 /golang/ephemeral
【create znode [/golang/persistent_sequence] success】 /golang/persistent_sequence0000000002
【create znode [/golang/ephemeral_sequence] success】 /golang/ephemeral_sequence0000000003
【golang children】 [ephemeral persistent_sequence0000000002 persistent ephemeral_sequence0000000003], &{Czxid:4294967387 Mzxid:4294967387 Ctime:1622438717117 Mtime:1622438717117 Version:0 Cversion:4 Aversion:0 EemeralOwner:0 DataLength:22 NumChildren:4 Pzxid:4294967391}, <nil>
【golang Exists】 true, &{Czxid:4294967387 Mzxid:4294967387 Ctime:1622438717117 Mtime:1622438717117 Version:0 Cversion:4 Aversion:0 EphemeralOwner:0 DataLength:22 NumChildren:4 Pzxid:4294967391}, <nil>
【get path [/golang/persistent] info result】 this is a persistent znode, &{Czxid:4294967388 Mzxid:4294967388 Ctime:1622438717126 Mtime:1622438717126 Version:0 Cversion:0 Aversion:0 EphemeralOwner:0 DataLength:26umChildren:0 Pzxid:4294967388}, <nil>
【change path [/golang/persistent] data result】 &{Czxid:4294967388 Mzxid:4294967392 Ctime:1622438717126 Mtime:1622438717211 Version:1 Cversion:0 Aversion:0 EphemeralOwner:0 DataLength:11 NumChildren:0 Pzxid:42947388}, <nil>
【get path [/golang/persistent] info result】 change data, &{Czxid:4294967388 Mzxid:4294967392 Ctime:1622438717126 Mtime:1622438717211 Version:1 Cversion:0 Aversion:0 EphemeralOwner:0 DataLength:11 NumChildren:0 xid:4294967388}, <nil>
【delete path [/golang/persistent] result】 <nil>
【golang children】 [ephemeral persistent_sequence0000000002 ephemeral_sequence0000000003], &{Czxid:4294967387 Mzxid:4294967387 Ctime:1622438717117 Mtime:1622438717117 Version:0 Cversion:5 Aversion:0 EphemeralOwn:0 DataLength:22 NumChildren:3 Pzxid:4294967393}, <nil>
【get path [/golang] info result】 this is a golang znode, &{Czxid:4294967387 Mzxid:4294967387 Ctime:1622438717117 Mtime:1622438717117 Version:0 Cversion:5 Aversion:0 EphemeralOwner:0 DataLength:22 NumChildren:3 xid:4294967393}, <nil>
【delete path [/golang] result】 zk: node has children
【root children】 [zookeeper golang], &{Czxid:0 Mzxid:0 Ctime:0 Mtime:0 Version:0 Cversion:26 Aversion:0 EphemeralOwner:0 DataLength:0 NumChildren:2 Pzxid:4294967387}, <nil>
【all event】{"Type":1,"State":3,"Path":"/golang/exists","Err":null,"Server":""}
【create znode [/golang/exists] success】 /golang/exists
【exists watch event】{"Type":1,"State":3,"Path":"/golang/exists","Err":null,"Server":""}
2021/05/31 13:25:17 recv loop terminated: err=EOF
2021/05/31 13:25:17 send loop terminated: err=<nil>
模拟监听服务器动态上下线
这个是模拟业务服务端动态上下线,客户端来做动态的监听响应,这里的服务端与客户端,本质上都是 ZooKeeper 的客户端。
-
ZooKeeper 集群:三台机器,访问地址分别为 127.0.0.1:2181、127.0.0.1:2182、127.0.0.1:2183
-
业务服务端:负责维护 ZooKeeper 该业务特定的 ZNode 数据,创建的节点数据为临时序列类型,存储的数据为服务的IP
三个服务端的IP:172.10.01.11、172.10.01.12、172.10.01.13
-
客户服务端:启动时会连接 ZooKeeper, 之后获取服务端列表信息,并启动监听,每次事件通知过来时都会重新获取服务端信息
三个客户端的IP:172.10.02.11、172.10.02.12、172.10.02.13
模拟流程
-
检测业务服务端的根结点是否存在
启动业务服务端节点之前,先检测业务服务的根结点是否存在,不存在需要创建业务服务的根结点,需要注意,根结点需要是永久存储类型,因为临时节点不能拥有子节点,并且这里的业务服务端根结点也不能有序列号
-
使用 goroutine 启动业务服务端
启动时设置 host 为 172.10.02.11 的服务端不是持续运行,而是在运行 10 秒后自动停止,用于模拟所有客户端启动成功后,某一台服务宕机
-
休息3秒,保证业务服务端都启动成功
-
使用 goroutine 启动业务客户端
-
业务服务端 172.10.01.11 启动 10 秒后会停止运行,此时可以观察客户端是否监听到并正确处理监听事件
-
休息20秒,保证业务客户端都正确处理了监听事件后,再启动 服务端 172.10.01.11,模拟服务端上线
代码结构
➜ distribute tree
.
├── client
│ └── client.go
├── common
│ └── common.go
├── main.go
└── server
└── server.go
总进程代码:
package main
import (
"time"
"github.com/samuel/go-zookeeper/zk"
"gitlab.wcxst.com/jormin/gohelper"
"gitlab.wcxst.com/jormin/study/zookeeper/distribute/client"
"gitlab.wcxst.com/jormin/study/zookeeper/distribute/common"
"gitlab.wcxst.com/jormin/study/zookeeper/distribute/server"
)
func main() {
// 模拟监听服务器动态上下线
// ZooKeeper 集群:三台机器,访问地址分别为 127.0.0.1:2181、127.0.0.1:2182、127.0.0.1:2183
// 服务端与客户端:本质上都是 ZooKeeper 的客户端
// 三个以业务服务端身份运行,负责维护 ZooKeeper 该业务特定的 ZNode 数据,创建的节点数据为临时序列类型,存储的数据为服务的IP
// 三个以业务客户端身份运行,负责获取并监听 ZooKeeper 该业务特定的 ZNode 数据
// 客户端启动时会连接 ZooKeeper, 之后获取服务端列表信息,并启动监听,每次事件通知过来时都会重新获取服务端信息
// 假定三个模拟服务的IP分别是 172.10.01.11、172.10.01.12、172.10.01.13
// 假定三个模拟客户端的IP分别是 172.10.02.11、172.10.02.12、172.10.02.13
// 启动业务服务端节点之前,先检测业务服务的根结点是否存在,不存在需要创建业务服务的根结点,需要注意,根结点需要是永久存储类型,因为临时节点不能拥有子节点,并且不能有序列号
conn, _, err := common.GetConn()
gohelper.Must(err)
exists, _, err := conn.Exists(common.ServerRootPath)
gohelper.Must(err)
if !exists {
_, err = conn.Create(
common.ServerRootPath, []byte("server root znode"), common.ZKFlagPersistent, zk.WorldACL(zk.PermAll),
)
gohelper.Must(err)
}
// 使用 goroutine 启动业务服务端
serverHosts := []string{"172.10.01.11", "172.10.01.12", "172.10.01.13"}
for _, host := range serverHosts {
// 服务端 172.10.01.11 第一次启动时设置不持续在线,用于模拟服务器宕机
alwaysOnline := true
if host == "172.10.01.11" {
alwaysOnline = false
}
go server.Run(host, alwaysOnline)
}
// 休息3秒,保证业务服务端都启动成功
time.Sleep(time.Second * 3)
// 使用 goroutine 启动业务客户端
clientHosts := []string{"172.10.02.11", "172.10.02.12", "172.10.02.13"}
for _, host := range clientHosts {
go client.Run(host)
}
// 业务服务端 172.10.01.11 启动 10 秒后会停止运行,此时可以观察客户端是否监听到并正确处理监听事件
// 休息20秒,保证业务客户端都正确处理了监听事件后,再启动 服务端 172.10.01.11,模拟服务端上线
time.Sleep(time.Second * 20)
go server.Run("172.10.01.11", true)
// 保持进程一直运行
select {}
}
公共库代码:
package common
import (
"time"
"github.com/samuel/go-zookeeper/zk"
)
const (
ZKFlagPersistent = 0 // 创建ZNode时的Flag:永久存储
ZKFlagEphemeral = 1 // 创建ZNode时的Flag:临时存储
ZKFlagPersistentSequence = 2 // 创建ZNode时的Flag:永久存储+序列编号
ZKFlagEphemeralSequence = 3 // 创建ZNode时的Flag:临时存储+序列编号
)
const ServerRootPath = "/server" // 业务服务根节点
// 获取连接
func GetConn() (*zk.Conn, <-chan zk.Event, error) {
// 连接 ZooKeeper 集群
hosts := []string{"127.0.0.1:2181", "127.0.0.1:2182", "127.0.0.1:2183"}
return zk.Connect(hosts, time.Second*5)
}
服务端代码:
package server
import (
"fmt"
"time"
"github.com/samuel/go-zookeeper/zk"
"gitlab.wcxst.com/jormin/study/zookeeper/distribute/common"
)
var conn *zk.Conn
// 运行
func Run(host string, alwaysOnline bool) {
var err error
// 连接 ZooKeeper
conn, _, err = common.GetConn()
if err != nil {
fmt.Printf("【server %s】get zookeeper connection error: %v", host, err)
return
}
defer conn.Close()
// 注册服务
_, _ = RegisterServer(host)
// 根据参数判断进程服务一直在线
if alwaysOnline {
// 保持进程在线,模拟业务运行
select {}
} else {
// 不是一直在线,则运行 10 秒后自动停止,用于模拟所有客户端启动成功后,某一台服务宕机
time.Sleep(time.Second * 10)
fmt.Printf("【server %s】server off line\n", host)
}
}
// 注册服务,需要注意,该类型服务创建的节点是临时类型,这样当服务宕机的时候会自动删除该服务节点信息
func RegisterServer(host string) (string, error) {
res, err := conn.Create(
fmt.Sprintf("%s/server", common.ServerRootPath), []byte(host), common.ZKFlagEphemeralSequence,
zk.WorldACL(zk.PermAll),
)
if err == nil {
fmt.Printf("【server %s】register server success, znode path: %v\n", host, res)
} else {
fmt.Printf("【server %s】register server error: %v\n", host, err)
}
return res, err
}
客户端代码:
package client
import (
"fmt"
"github.com/samuel/go-zookeeper/zk"
"gitlab.wcxst.com/jormin/study/zookeeper/distribute/common"
)
var conn *zk.Conn
// 运行
func Run(host string) {
var err error
// 连接 ZooKeeper
conn, _, err = common.GetConn()
if err != nil {
fmt.Printf("【client %s】get zookeeper connection error: %v", host, err)
return
}
defer conn.Close()
// 获取服务列表
_, _ = GetServerList(host)
// 保持进程在线,模拟业务运行
select {}
}
// 获取服务列表并注册监听
func GetServerList(host string) ([]string, error) {
res, _, event, err := conn.ChildrenW(common.ServerRootPath)
if err != nil {
fmt.Printf("【client %s】get server list error: %v\n", host, err)
return nil, err
}
var servers []string
for _, s := range res {
b, _, _ := conn.Get(fmt.Sprintf("%s/%s", common.ServerRootPath, s))
servers = append(servers, fmt.Sprintf("%s", b))
}
fmt.Printf("【client %s】get server list success: %v\n", host, servers)
// 监听
go func(<-chan zk.Event) {
for _ = range event {
fmt.Printf("【client %s】server changed!\n", host)
_, _ = GetServerList(host)
}
}(event)
return servers, nil
}
测试结果
2021/05/31 15:56:46 Connected to 127.0.0.1:2181
2021/05/31 15:56:46 authenticated: id=72057641793290322, timeout=5000
2021/05/31 15:56:46 re-submitting `0` credentials after reconnect
2021/05/31 15:56:46 Connected to 127.0.0.1:2182
2021/05/31 15:56:46 Connected to 127.0.0.1:2182
2021/05/31 15:56:46 Connected to 127.0.0.1:2183
2021/05/31 15:56:46 authenticated: id=216172829808263224, timeout=5000
2021/05/31 15:56:46 re-submitting `0` credentials after reconnect
2021/05/31 15:56:46 authenticated: id=144115235768434755, timeout=5000
2021/05/31 15:56:46 re-submitting `0` credentials after reconnect
2021/05/31 15:56:46 authenticated: id=144115235768434756, timeout=5000
2021/05/31 15:56:46 re-submitting `0` credentials after reconnect
【server 172.10.01.13】register server success, znode path: /server/server0000000091
【server 172.10.01.11】register server success, znode path: /server/server0000000092
【server 172.10.01.12】register server success, znode path: /server/server0000000093
2021/05/31 15:56:49 Connected to 127.0.0.1:2183
2021/05/31 15:56:49 Connected to 127.0.0.1:2181
2021/05/31 15:56:49 Connected to 127.0.0.1:2181
2021/05/31 15:56:49 authenticated: id=216172829808263225, timeout=5000
2021/05/31 15:56:49 re-submitting `0` credentials after reconnect
2021/05/31 15:56:49 authenticated: id=72057641793290323, timeout=5000
2021/05/31 15:56:49 re-submitting `0` credentials after reconnect
2021/05/31 15:56:49 authenticated: id=72057641793290324, timeout=5000
2021/05/31 15:56:49 re-submitting `0` credentials after reconnect
【client 172.10.02.11】get server list success: [172.10.01.12 172.10.01.11 172.10.01.13]
【client 172.10.02.13】get server list success: [172.10.01.12 172.10.01.11 172.10.01.13]
【client 172.10.02.12】get server list success: [172.10.01.12 172.10.01.11 172.10.01.13]
【server 172.10.01.11】server off line
【client 172.10.02.11】server changed!
【client 172.10.02.13】server changed!
【client 172.10.02.12】server changed!
2021/05/31 15:56:56 recv loop terminated: err=EOF
2021/05/31 15:56:56 send loop terminated: err=<nil>
【client 172.10.02.13】get server list success: [172.10.01.12 172.10.01.13]
【client 172.10.02.11】get server list success: [172.10.01.12 172.10.01.13]
【client 172.10.02.12】get server list success: [172.10.01.12 172.10.01.13]
2021/05/31 15:57:09 Connected to 127.0.0.1:2181
2021/05/31 15:57:09 authenticated: id=72057641793290325, timeout=5000
2021/05/31 15:57:09 re-submitting `0` credentials after reconnect
【server 172.10.01.11】register server success, znode path: /server/server0000000094
【client 172.10.02.11】server changed!
【client 172.10.02.12】server changed!
【client 172.10.02.13】server changed!
【client 172.10.02.13】get server list success: [172.10.01.11 172.10.01.12 172.10.01.13]
【client 172.10.02.11】get server list success: [172.10.01.11 172.10.01.12 172.10.01.13]
【client 172.10.02.12】get server list success: [172.10.01.11 172.10.01.12 172.10.01.13]