zookeeper的介绍以及集群环境搭建

zookeeper是一个分布式协调服务的开源框架。主要用来解决分布式集群中应用系统的一致性问题,例如避免同时操作同一数据造成脏读的问题。

Posted by caosw on August 29, 2019

zookeeper的介绍以及集群环境搭建


1、zookeeper概述

zookeeper是一个分布式协调服务的开源框架。主要用来解决分布式集群中应用系统的一致性问题,例如避免同时操作同一数据造成脏读的问题。

zookeeper本质上是一个分布式的小文件存储系统。提供类似于文件系统的目录树方式的数据存储,并可以对树中的节点进行有效管理。从而用来维护和监控存储的数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理。

2、zookeeper架构图

Leader:
zookeeper集群工作的核心,事务请求(写操作)的唯一调度和处理者,保证集群事务处理的顺序性,集群内部各个服务器的调度者。
对于create,setData,delete等有写操作的请求,则需要统一装发给leader处理,leader需要决定编号、执行操作,这个过程称为一个事务。

Follower:
处理客户端非事务(读操作)请求,转发事务请求给Leader,参与集群Leader选举投票。

Observer:
观察者角色,观察zookeeper集群的最新状态变化并将这些状态同步过来,其对于非事务请求可以进行独立处理,对于事务请求,则会转发给Leader服务器进行处理。
不会参与任何形式的投票只提供非事务服务。实际上就是增加并发的读请求。

3、zookeeper的特性

全局数据一致:每个server保存一份相同的数据副本,client无论连接到那个server,展示的数据都是一致的。
可靠性:如果消息被其中一台服务器接收,那么将被所有的服务器接受。
顺序性:包括全局有序和偏序两种:全局有序是指如果在一台服务器上消息 a 在消息 b 前发布,则在所有 Server 上消息 a 都将在消息 b 前被发布;偏序是指如果一个消息 b 在消息 a 后被同一个发送者发布, a 必将排在 b 前面。
数据更新原子性:一次数据更新要么成功,要么失败,不存在中间状态。
实时性:zookeeper客户端将在一个时间间隔范围内获得服务器的更新信息,或者服务器失效的信息。

4、三台机器zookeeper集群环境搭建

4.1、环境说明
centos7.6
zookeeper-3.4.9
jdk1.8.0_65
4.2、配置主机名称

三台机器均需要配置

[root@dn1 /root]# vi /etc/hosts
192.168.206.202 dn1
192.168.206.203 dn2
192.168.206.204 dn3
4.3、配置jdk环境

三台机器均需要配置

[root@dn1 /opt]# tar zxvf jdk-8u65-linux-x64.tar.gz
[root@dn1 /opt]# ln -s /opt/jdk1.8.0_65/ /usr/local/java
[root@dn1 /opt]# vi /etc/profile
export JAVA_HOME=/usr/local/java
export PATH=$PATH:$JAVA_HOME/bin
[root@dn1 /opt]# source /etc/profile
4.4、安装zookeeper

创建软连接,配置zookeeper的环境变量,三台机器均需要配置

[root@dn1 /opt]# tar zxvf zookeeper-3.4.9.tar.gz
[root@dn1 /opt]#ln -s /opt/zookeeper-3.4.9 /usr/local/zookeeper
[root@dn1 /opt]# vi /etc/profile
export ZK_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZK_HOME/bin
[root@dn1 /opt]# source /etc/profile
4.5、修改配置文件

首先需要创建zoo.cfg,创建zookeeper数据目录zkdatas,再对zoo.cfg文件进行修改,三台机器均要配置

[root@dn1 /opt]# cd $ZK_HOME/conf
[root@dn1 /usr/local/zookeeper/conf]# cp zoo_sample.cfg zoo.cfg
[root@dn1 /usr/local/zookeeper/conf]# mkdir $ZK_HOME/zkdatas
dataDir=/usr/local/zookeeper/zkdatas
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=dn1:2888:3888
server.2=dn2:2888:3888
server.3=dn3:2888:3888
4.6、添加myid配置文件

在$ZK_HOME/zkdatas路径下创建myid文件,第一台机器内容为1,第二台为2,第三台为3

[root@dn1 /usr/local/zookeeper/zkdatas]# vi myid
1
4.7、三台机器启动zookeeper服务
[root@dn1 /root]# zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@dn1 /root]# zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower
4.8、通过jps查看启动的Java进程

会启动一个QuorumPeerMain的进程

[root@dn1 /root]# jps
6870 Jps
6347 QuorumPeerMain

5、zookeeper的shell操作

5.1、客户端连接

运行zkCli.sh进入命令行工具输入help,输出zh shell提示

[root@dn1 /root]# zkCli.sh
[zk: localhost:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
    stat path [watch]
    set path data [version]
    ls path [watch]
    delquota [-n|-b] path
    ls2 path [watch]
    setAcl path acl
    setquota -n|-b val path
    history
    redo cmdno
    printwatches on|off
    delete path [version]
    sync path
    listquota path
    rmr path
    get path [watch]
    create [-s] [-e] path data acl
    addauth scheme auth
    quit
    getAcl path
    close
    connect host:port
5.2、shell操作

创建节点
create [-s][-e] path data acl
其中,-s表示创建顺序节点,-e表示创建临时节点(临时节点表示只在当前会话中显示,退出当前会话即删除),若不指定,则表示创建永久节点,acl用来进行权限控制。

-- 创建顺序节点
[zk: localhost:2181(CONNECTED) 2] create -s /caosw helloworld
Created /caosw0000000002
[zk: localhost:2181(CONNECTED) 3] ls /
[test0000000000, zookeeper, test, caosw0000000002]
-- 创建临时节点
[zk: localhost:2181(CONNECTED) 5] ls /
[test0000000000, caosw_temp, zookeeper, test, caosw0000000002]
-- 创建永久节点
[zk: localhost:2181(CONNECTED) 6] create /caosw_2 helloworld_2
Created /caosw_2
[zk: localhost:2181(CONNECTED) 7] ls /
[caosw_2, test0000000000, caosw_temp, zookeeper, test, caosw0000000002]

读取节点
ls path
get path
与读取命令相关的ls命令和get命令,ls命令可以列出zookeeper指定节点下的所有子节点,只能查看指定节点下的第一级的所有子节点;get命令可以获取zookeeper指定节点的数据内容和属性信息

-- 列出根节点下的所有子节点
[zk: localhost:2181(CONNECTED) 8] ls /
[caosw_2, test0000000000, caosw_temp, zookeeper, test, caosw0000000002]
-- 查看节点caosw_2的数据内容和属性信息
[zk: localhost:2181(CONNECTED) 9] get /caosw_2
helloworld_2
cZxid = 0x20000000e
ctime = Sat Aug 31 00:00:37 CST 2019
mZxid = 0x20000000e
mtime = Sat Aug 31 00:00:37 CST 2019
pZxid = 0x20000000e
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 12
numChildren = 0

更新节点
set path data [version]
data就是要更新的新内容,version表示数据版本(可选参数)

[zk: localhost:2181(CONNECTED) 11] set /caosw_2 helloworld_update 0
cZxid = 0x20000000e
ctime = Sat Aug 31 00:00:37 CST 2019
mZxid = 0x200000010
mtime = Sat Aug 31 00:08:12 CST 2019
pZxid = 0x20000000e
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 17
numChildren = 0

dataVersion已经变为1了,表示进行了更新。

删除节点
delete path [version]
若删除节点存在子节点,那么无法删除该节点,必须先删除子节点,再删除父节点

[zk: localhost:2181(CONNECTED) 14] delete /caosw_2
[zk: localhost:2181(CONNECTED) 15] ls /
[test0000000000, caosw_temp, zookeeper, test, caosw0000000002] `rmr path 可以实现递归删除`

6、zookeeper的数据模型

zookeeper的数据模型在结构上和标准文件系统类似,拥有一个层次的命名空间,采用树形层次结构,zookeeper树中的每个节点被称为一个znode。和文件系统的目录树一样,zookeeper树中的每个节点都可以拥有子节点。
不同之处:

  • znode兼具文件和目录两种特点。既像文件一样维护者数据、元信息、ACL、时间戳等数据结构,又像目录一样可以作为路径标识的一部分,并可以具有子znode。
  • znode具有原子性操作,读操作将获取与节点相关的所有数据,写操作也将替换掉节点的所有数据。
  • znode存储数据大小有限制,zookeeper用来管理调度数据,比如分布式应用中的配置文件信息、状态信息、汇集位置等等。这些数据的共同特性就是它们都是很小的数据,通常以KB为大小单位。zookeeper的服务器和客户端都被设计为严格检查并限制每个znode的数据大小至多1M。
  • znode通过路径引用。路径必须是绝对的,因此必须由斜杠字符开头。

znode有两种类型:临时节点和永久节点

  • 临时节点:该节点的生命周期依赖于创建它们的会话。一旦会话结束,临时节点将被自动删除,当然也可以手动删除,临时节点不允许拥有子节点。
  • 永久节点:该节点的生命周期不依赖于会话,并且只有在客户端显示执行删除操作的时候,它们才会被删除。

znode还有序列化的特性。因此znode分为4种:

  • PERSISTENT:永久节点
  • EPHEMERAL:临时节点
  • PERSISTENT_SEQUENTIAL:永久序列化节点
  • EPHEMERAL_SEQUENTIAL:临时序列化节点

7、zookeeper的watch机制

zookeeper提供了分布式数据发布和订阅的功能,zookeeper中引入了watch机制来实现这种分布式的通知功能。
zookeeper允许客户端向服务端注册一个watch监听,当服务端的一些事件触发了这个watch,那么就会向指定客户端发送一个事件通知来实现分布式的通知功能。
触发事件种类很多,如:节点创建、节点删除、节点改变、子节点改变等。
watch分为三个过程:客户端向服务端注册watch、服务端事件触发watch、客户端回调watch得到触发事件情况

7.1、watch机制特点

一次性触发
事件触发监听,一个watch event就会被发送到设置监听的客户端,这种效果是一次性的,后续再次发生同样的事件,不会再次触发。
事件封装
zookeeper使用watchedEvent对象来封装服务端事件并传递。watchedEvent包含了每一个事件的三个基本属性:通知状态(keeperState)、事件类型(EventType)、节点路径(path)。
event异步发送
watch的通知事件从服务端发送到客户端是异步的。
先注册再触发
zookeeper中的watch机制,必须客户端先从服务端注册监听,这样事件发送才会被触发监听,通知给客户端。

7.2、通知状态和事件类型

| KeeperState | EventType | 触发条件 | 说明 | | —- | —- | —- | —- | —- | | | None(-1) | 客户端与服务端成功建立连接 | | | SyncConnected(0) | NodeCreated(1) | Watcher 监听的对应数据节点被创建 | | | | NodeDeleted(2) | Watcher 监听的对应数据节点被删除 | 此时客户端和服务器处于连接状态 | | | NodeDataChanged(3) | Watcher 监听的对应数据节点的数据内容 发生变更 | | | | NodeChildChanged(4) | Wather 监听的对应节点的子节点数据列表发生变更 | | | Disconnected(0) | None(-1) | 客户端与zookeeper服务器断开连接 | 此时客户端和服务器处于断开连接状态 | | Expired(-112) | None(-1) | 会话超时 | 此时客户端会话失效,通常同时也会收到SessionExpiredException 异常 | | AuthFailed(4) | None(-1) | 通常有两种情况
1:使用错误的schema 进行权限检查
2:SASL 权限检查失败 | 通常同时也会收到AuthFailedException 异常 |

7.3、shell客户端设置watch机制

设置节点数据变动监听

[zk: localhost:2181(CONNECTED) 17] get /test watch
hello
cZxid = 0x200000004
ctime = Fri Aug 30 17:11:15 CST 2019
mZxid = 0x200000004
mtime = Fri Aug 30 17:11:15 CST 2019
pZxid = 0x200000004
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0

通过另一个客户端更改节点数据

[zk: localhost:2181(CONNECTED) 1] set /test hello_test

此时设置监听的节点收到通知

[zk: localhost:2181(CONNECTED) 18]
WATCHER::


WatchedEvent state:SyncConnected type:NodeDataChanged path:/test

8、Java操作zookeeper

8.1、创建maven工程,导入jar包
<dependencies>
     <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
    </dependency>
    <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
    </dependency>
    <dependency>
            <groupId>com.google.collections</groupId>
            <artifactId>google-collections</artifactId>
            <version>1.0</version>
    </dependency>
</dependencies>
8.2、节点测试操作

创建永久节点

/**
 * 创建永久节点
 * @throws Exception
 */
@Test
public void createNode() throws Exception {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
    // 获取客户端对象
    CuratorFramework client =  CuratorFrameworkFactory.newClient("192.168.206.202:2181,192.168.206.203:2181,192.168.206.204:2181", 1000, 1000, retryPolicy);
    // 调用start开启客户端操作
    client.start();
    // 通过create来进行创建节点,并且需要指定节点类型
    client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/java");
    // 关闭客户端
    client.close();
}

在服务端进行通过zkCli.sh进行查看

[zk: localhost:2181(CONNECTED) 0] ls /
[test0000000000, java, zookeeper, test, caosw0000000002]

创建临时节点

/**
 * 创建永久节点
 * @throws Exception
 */
@Test
public void createNode2() throws Exception {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
    CuratorFramework client =  CuratorFrameworkFactory.newClient("192.168.206.202:2181,192.168.206.203:2181,192.168.206.204:2181", 1000, 1000, retryPolicy);
    client.start();
    client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/java_temp");
    Thread.sleep(5000);
    client.close();
}

修改节点信息

/**
 * 修改数据节点信息
 * @throws Exception
 */
@Test
public void nodeData() throws Exception {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
    CuratorFramework client =  CuratorFrameworkFactory.newClient("192.168.206.202:2181,192.168.206.203:2181,192.168.206.204:2181", 1000, 1000, retryPolicy);
    client.start();
    client.setData().forPath("/java", "caosw".getBytes());
    client.close();
}

在服务端进行通过zkCli.sh进行查看

[zk: localhost:2181(CONNECTED) 6] get /java
caosw
cZxid = 0x200000016
ctime = Sat Aug 31 01:00:22 CST 2019
mZxid = 0x200000024
mtime = Sat Aug 31 01:08:15 CST 2019
pZxid = 0x200000016
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0

节点数据查询

/**
 * 节点数据查询
 * @throws Exception
 */
@Test
public void findnodeData() throws Exception {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
    CuratorFramework client =  CuratorFrameworkFactory.newClient("192.168.206.202:2181,192.168.206.203:2181,192.168.206.204:2181", 1000, 1000, retryPolicy);
    client.start();
    byte[] data = client.getData().forPath("/java");
    System.out.println(new String(data));
    client.close();
}

节点watch机制

/**
 * zookeeper的watch机制
 * @throws Exception
 */
@Test
public void watchNode() throws Exception {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
    CuratorFramework client =  CuratorFrameworkFactory.newClient("192.168.206.202:2181,192.168.206.203:2181,192.168.206.204:2181", 1000, 1000, retryPolicy);
    client.start();
    // 设置节点的cache
    TreeCache treeCache = new TreeCache(client, "/java");
    treeCache.getListenable().addListener(new TreeCacheListener() {
                
            @Override
            public void childEvent(CuratorFramework client,  TreeCacheEvent event) throws Exception {
                ChildData data = event.getData();
                if(data != null) {
                    switch(event.getType()) {
                    case NODE_ADDED:
                        System.out.println("NODE_ADDED : "+  data.getPath() +"  数据:"+ new String(data.getData()));
                        break;
                    case NODE_REMOVED:
                        System.out.println("NODE_REMOVED : "+  data.getPath() +"  数据:"+ new String(data.getData()));  
                        break;
                    case NODE_UPDATED:  
                        System.out.println("NODE_UPDATED : "+ data.getPath() +"   数据:"+ new String(data.getData()));  
                        break;  
                    default:
                        break;
                    }
                } else {
                    System.out.println( "data is null : "+  event.getType());  
                }
            }
         });
    // 开始监听
    treeCache.start();
    Thread.sleep(50000000);
    client.close();
}