分布式协调服务--zookeeper

分布式环境的特点

分布性

组成分布式系统的各个软硬件都可以分布在空间的任何地方

并发性

多个节点访问同一个节点服务

无序性

进程之间的消息通信,会因为网络或者硬件等等问题,出现顺序不一致问题

分布式环境下面临的问题

网络通信

网络本身的不可靠性,因此会涉及到一些网络通信问题

网络分区(脑裂)

当网络发生异常导致分布式系统中部分节点之间的网络延时不断增大,最终导致组成分布式架构的所有节点,只有部分节点能够正常通信

三态

在分布式架构里面,除了成功、失败、超时

分布式事务

ACID(原子性、一致性、隔离性、持久性)

中心化和去中心化

中心化

所有服务都依赖于某一个服务,一旦这个服务宕机了,整个系统就会都瘫痪了

解决方式:冷备和热备

  • 热备:有一台备用的,正常情况下不工作,但是出于启动状态,一旦主的挂掉了,备用的会自动切换上去
  • 冷备:热备的不启动版,平常不懂动,需要他的时候启动它

CAP / BASE理论

CAP

CAP理论仅适用于原子读写的Nosql场景,不适用于数据库系统

  • C: 一致性(Consistency): 所有节点上的数据,时刻保持一致
  • A: 可用性(Availability):每个请求都能够收到一个响应,无论响应成功或者失败
  • P: 分区容错 (Partition-tolerance):表示系统出现脑裂以后,可能导致某些server与集群中的其他机器失去联系。这个问题是一定会存在的,因为网络问题是无法避免的。
    只能选AP / CP

BASE

基于CAP理论,CAP理论并不适用于数据库事务(因为更新一些错误的数据而导致数据出现紊乱,无论什么样的数据库高可用方案都是
徒劳) ,虽然XA事务可以保证数据库在分布式系统下的ACID特性,但是会带来性能方面的影响;

eBay尝试了一种完全不同的套路,放宽了对事务ACID的要求。提出了BASE理论:

  • Basically available:数据库采用分片模式, 把100W的用户数据分布在5个实例上。如果破坏了其中一个实例,仍然可以保证80%的用户可用
  • soft-state:在基于client-server模式的系统中,server端是否有状态,决定了系统是否具备良好的水平扩展、负载均衡、故障恢复等特性。Server端承诺会维护client端状态数据,这个状态仅仅维持一小段时间, 这段时间以后,server端就会丢弃这个状态,恢复正常状态
  • Eventually consistent:数据的最终一致性

zookeeper

分布式架构里面,很多的架构思想采用的是:当集群发生故障的时候,集群中的人群会自动“选举”出一个新的领导。
最典型的是: zookeeper / etcd
zookeeper是一个开源的分布式协调服务,是由雅虎创建的,基于google chubby。
是分布式数据一致性的解决方案

能够实现哪些功能

  • 数据的发布/订阅(配置中心:disconf)
  • 负载均衡(dubbo利用了zookeeper机制实现负载均衡)
  • 命名服务
  • master选举(kafka、hadoop、hbase)、分布式队列
  • 分布式锁

特性

  • 顺序一致性:从同一个客户端发起的事务请求,最终会严格按照顺序被应用到zookeeper中
  • 原子性:所有的事务请求的处理结果在整个集群中的所有机器上的应用情况是一致的,也就是说,要么整个集群中的所有机器都成功应用了某一事务、要么全都不应用
  • 可靠性:一旦服务器成功应用了某一个事务数据,并且对客户端做了响应,那么这个数据在整个集群中一定是同步并且保留下来的
  • 实时性:一旦一个事务被成功应用,客户端就能够立即从服务器端读取到事务变更后的最新数据状态;(zookeeper仅仅保证在一定时间内,近实时)

数据

内存数据和磁盘数据
zookeeper会定时把内存数据保存到磁盘上(可以在zoo.cfg中配置)
所以DataDir存储的是数据的快照

集群

三种角色: leader / follower / observer

  • leader: 接受所有follower的提案请求并同意协调发起提案的投票,负责与所有的follower进行内部的数据交换、同步
  1. 是zookeeper集群的核心
  2. 事务请求的唯一调度者和处理者,保证集群事务处理的顺序性
  3. 集群内部各个服务器的调度者
  • follower: 直接为客户端服务并参数提案的投票,同时与Leader进行数据交换、同步
  1. 处理客户端非事务请求,以及转发事务请求给leader服务器
  2. 参与事务请求提议(proposal)的投票(客户端的一个事务请求,需要半数服务器投票通过以后才能通知leader commit; leader会发起一个提案,要求follower投票)
  3. 参与leader选举的投票
  • observer: 直接为客户端服务但不参数提案的投票,不属于zookeeper的关键部位,同时也与Leader进行数据交换、同步,observer 是一种特殊的zookeeper节点。可以帮助解决zookeeper的扩展性(如果大量客户端访问我们zookeeper集群,需要增加zookeeper集群机器数量。从而增加zookeeper集群的性能。 导致zookeeper写性能下降, zookeeper的数据变更需要半数以上服务器投票通过。造成网络消耗增加投票成本)
  1. 观察zookeeper集群中最新状态的变化并将这些状态同步到observer服务器上
  2. 增加observer不影响集群中事务处理能力,同时还能提升集群的非事务处理能力

1
2
zoo.cfg 配置:server.id=host:port:port
例如:server.1 = 192.168.11.129:2888:3181
1
2
3
增加observer节点
zoo.cfg中 增加 ;peerType=observer
server.3 = 192.168.111.136:2888:3181:observer

id的取值范围: 1~255; 用id来标识该机器在集群中的机器序号
2888表示follower节点与leader节点交换信息的端口号
3181表示leader选举的端口,如果leader节点挂掉了, 需要一个端口来重新选举
在在每一个服务器的dataDir目录下创建一个myid的文件,文件就一行数据,数据内容是每台机器对应的server ID的数字

zoo.cfg配置文件分析

1
2
3
4
5
6
tickTime=2000  zookeeper中最小的时间单位长度 (ms)
initLimit=10 follower节点启动后与leader节点完成数据同步的时间
syncLimit=5 leader节点和follower节点进行心跳检测的最大延时时间
dataDir=/tmp/zookeeper 表示zookeeper服务器存储快照文件的目录
dataLogDir 表示配置 zookeeper事务日志的存储路径,默认指定在dataDir目录下
clientPort 表示客户端和服务端建立连接的端口号: 2181

zookeeper中的一些概念

数据模型

zookeeper的数据模型和文件系统类似,每一个节点称为:znode. 是zookeeper中的最小数据单元。每一个znode上都可以
保存数据和挂载子节点。 从而构成一个层次化的属性结构

节点特性

持久化节点: 节点创建后会一直存在zookeeper服务器上,直到主动删除
持久化有序节点:每个节点都会为它的一级子节点维护一个顺序
临时节点: 临时节点的生命周期和客户端的会话保持一致。当客户端会话失效,该节点自动清理
临时有序节点: 在临时节点上多勒一个顺序性特性

会话

客户端和服务端建立的连接

Watcher

zookeeper提供了分布式数据发布/订阅,zookeeper允许客户端向服务器注册一个watcher监听。当服务器端的节点触发指定事件的时候
会触发watcher。服务端会向客户端发送一个事件通知
watcher的通知是一次性,一旦触发一次通知后,该watcher就失效

ACL

zookeeper提供控制节点访问权限的功能,用于有效的保证zookeeper中数据的安全性。避免误操作而导致系统出现重大事故。
CREATE /READ/WRITE/DELETE/ADMIN

zookeeper的命令操作

  • create [-s] [-e] path data acl
    -s 表示节点是否有序
    -e 表示是否为临时节点
    默认情况下,是持久化节点

  • get path [watch]
    获得指定 path的信息

  • set path data [version]
    修改节点 path对应的data
    version是一个乐观锁版本号

  • delete path [version]
    删除节点
    version是一个乐观锁版本号

节点字段stat信息

cversion = 0 子节点的版本号, 当修改子节点数据的时候,会发生改变
aclVersion = 0 表示acl(access controller)的版本号,修改节点权限时, 会发生改变
dataVersion = 1 表示的是当前节点数据的版本号
czxid 节点被创建时的事务ID
mzxid 节点最后一次被更新的事务ID
pzxid 当前节点下的子节点最后一次被修改时的事务ID
ctime 节点创建时间
mtime 节点修改时间
ephemeralOwner = 0x0 创建临时节点的时候,会有一个sessionId 。 该值存储的就是这个sessionid
dataLength = 3 数据值长度
numChildren = 0 子节点数

java API的使用

原生

1
2
3
4
5
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>

连接状态

  • KeeperStat.Expired 在一定时间内客户端没有收到服务器的通知, 则认为当前的会话已经过期了。
  • KeeperStat.Disconnected 断开连接的状态
  • KeeperStat.SyncConnected 客户端和服务器端在某一个节点上建立连接,并且完成一次version、zxid同步
  • KeeperStat.authFailed 授权失败
    事件类型
  • NodeCreated 当节点被创建的时候,触发
  • NodeChildrenChanged 表示子节点被创建、被删除、子节点数据发生变化
  • NodeDataChanged 节点数据发生变化
  • NodeDeleted 节点被删除
  • None 客户端和服务器端连接状态发生变化的时候,事件类型就是None
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


public class ApiOperatorDemo implements Watcher{
private final static String CONNECTSTRING="192.168.11.129:2181,192.168.11.134:2181," +
"192.168.11.135:2181,192.168.11.136:2181";
private static CountDownLatch countDownLatch=new CountDownLatch(1);
private static ZooKeeper zookeeper;
private static Stat stat=new Stat();
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
zookeeper=new ZooKeeper(CONNECTSTRING, 5000, new ApiOperatorDemo());
countDownLatch.await();
ACL acl=new ACL(ZooDefs.Perms.ALL,new Id("ip","192.168.11.129"));
List<ACL> acls=new ArrayList<>();
acls.add(acl);
// zookeeper.create("/authTest","111".getBytes(),acls,CreateMode.PERSISTENT);
zookeeper.getData("/authTest",true,new Stat());
/* System.out.println(zookeeper.getState());

//创建节点
String result=zookeeper.create("/node1","123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
zookeeper.getData("/node1",new ZkClientApiOperatorDemo(),stat); //增加一个
System.out.println("创建成功:"+result);

//修改数据
zookeeper.setData("/node1","mic123".getBytes(),-1);
Thread.sleep(2000);
//修改数据
zookeeper.setData("/node1","mic234".getBytes(),-1);
Thread.sleep(2000);

*//* //删除节点
zookeeper.delete("/mic/mic1",-1);
Thread.sleep(2000);*//*

//创建节点和子节点
String path="/node11";

zookeeper.create(path,"123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
TimeUnit.SECONDS.sleep(1);

Stat stat=zookeeper.exists(path+"/node1",true);
if(stat==null){//表示节点不存在
zookeeper.create(path+"/node1","123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
TimeUnit.SECONDS.sleep(1);
}
//修改子路径
zookeeper.setData(path+"/node1","mic123".getBytes(),-1);
TimeUnit.SECONDS.sleep(1);*/


//获取指定节点下的子节点
/* List<String> childrens=zookeeper.getChildren("/node",true);
System.out.println(childrens);*/


}

public void process(WatchedEvent watchedEvent) {
//如果当前的连接状态是连接成功的,那么通过计数器去控制
if(watchedEvent.getState()==Event.KeeperState.SyncConnected){
if(Event.EventType.None==watchedEvent.getType()&&null==watchedEvent.getPath()){
countDownLatch.countDown();
System.out.println(watchedEvent.getState()+"-->"+watchedEvent.getType());
}else if(watchedEvent.getType()== Event.EventType.NodeDataChanged){
try {
System.out.println("数据变更触发路径:"+watchedEvent.getPath()+"->改变后的值:"+
zookeeper.getData(watchedEvent.getPath(),true,stat));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}else if(watchedEvent.getType()== Event.EventType.NodeChildrenChanged){//子节点的数据变化会触发
try {
System.out.println("子节点数据变更路径:"+watchedEvent.getPath()+"->节点的值:"+
zookeeper.getData(watchedEvent.getPath(),true,stat));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}else if(watchedEvent.getType()== Event.EventType.NodeCreated){//创建子节点的时候会触发
try {
System.out.println("节点创建路径:"+watchedEvent.getPath()+"->节点的值:"+
zookeeper.getData(watchedEvent.getPath(),true,stat));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}else if(watchedEvent.getType()== Event.EventType.NodeDeleted){//子节点删除会触发
System.out.println("节点删除路径:"+watchedEvent.getPath());
}
System.out.println(watchedEvent.getType());
}

}
}

权限控制模式

  • schema: 授权对象
  • ip: 192.168.1.1
  • Digest: username:password
  • world: 开放式的权限控制模式,数据节点的访问权限对所有用户开放。 world:anyone
  • super: 超级用户,可以对zookeeper上的数据节点进行操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.gupao.vip.michael.javaapi;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AuthControlDemo implements Watcher{
private final static String CONNECTSTRING="192.168.11.129:2181,192.168.11.134:2181," +
"192.168.11.135:2181,192.168.11.136:2181";
private static CountDownLatch countDownLatch=new CountDownLatch(1);
private static CountDownLatch countDownLatch2=new CountDownLatch(1);

private static ZooKeeper zookeeper;
private static Stat stat=new Stat();
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
zookeeper=new ZooKeeper(CONNECTSTRING, 5000, new AuthControlDemo());
countDownLatch.await();

ACL acl=new ACL(ZooDefs.Perms.CREATE, new Id("digest","root:root"));
ACL acl2=new ACL(ZooDefs.Perms.CREATE, new Id("ip","192.168.1.1"));

List<ACL> acls=new ArrayList<>();
acls.add(acl);
acls.add(acl2);
zookeeper.create("/auth1","123".getBytes(),acls,CreateMode.PERSISTENT);
zookeeper.addAuthInfo("digest","root:root".getBytes());
zookeeper.create("/auth1","123".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
zookeeper.create("/auth1/auth1-1","123".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL,CreateMode.EPHEMERAL);


ZooKeeper zooKeeper1=new ZooKeeper(CONNECTSTRING, 5000, new AuthControlDemo());
countDownLatch.await();
zooKeeper1.delete("/auth1",-1);

// acl (create /delete /admin /read/write)
//权限模式: ip/Digest(username:password)/world/super

}
public void process(WatchedEvent watchedEvent) {
//如果当前的连接状态是连接成功的,那么通过计数器去控制
if(watchedEvent.getState()==Event.KeeperState.SyncConnected){
if(Event.EventType.None==watchedEvent.getType()&&null==watchedEvent.getPath()){
countDownLatch.countDown();
System.out.println(watchedEvent.getState()+"-->"+watchedEvent.getType());
}
}

}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class CreateSessionDemo {
private final static String CONNECTSTRING="192.168.11.129:2181,192.168.11.134:2181," +
"192.168.11.135:2181,192.168.11.136:2181";
private static CountDownLatch countDownLatch=new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException {
ZooKeeper zooKeeper=new ZooKeeper(CONNECTSTRING, 5000, new Watcher() {
public void process(WatchedEvent watchedEvent) {
//如果当前的连接状态是连接成功的,那么通过计数器去控制
if(watchedEvent.getState()==Event.KeeperState.SyncConnected){
countDownLatch.countDown();
System.out.println(watchedEvent.getState());
}
}
});
countDownLatch.await();
System.out.println(zooKeeper.getState());
}
}

zkClient

1
2
3
4
5
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
import org.I0Itec.zkclient.ZkClient;

public class SessionDemo {

private final static String CONNECTSTRING="192.168.11.129:2181,192.168.11.134:2181," +
"192.168.11.135:2181,192.168.11.136:2181";

public static void main(String[] args) {
ZkClient zkClient=new ZkClient(CONNECTSTRING,4000);

System.out.println(zkClient+" - > success");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ZkClientApiOperatorDemo {

private final static String CONNECTSTRING="192.168.11.129:2181,192.168.11.134:2181," +
"192.168.11.135:2181,192.168.11.136:2181";

private static ZkClient getInstance(){
return new ZkClient(CONNECTSTRING,10000);
}

public static void main(String[] args) throws InterruptedException {
ZkClient zkClient=getInstance();
//zkclient 提供递归创建父节点的功能
/* zkClient.createPersistent("/zkclient/zkclient1/zkclient1-1/zkclient1-1-1",true);
System.out.println("success");*/

//删除节点
// zkClient.deleteRecursive("/zkclient");


//获取子节点
List<String> list=zkClient.getChildren("/node");
System.out.println(list);

//watcher

zkClient.subscribeDataChanges("/node", new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
System.out.println("节点名称:"+s+"->节点修改后的值"+o);
}

@Override
public void handleDataDeleted(String s) throws Exception {

}
});

zkClient.writeData("/node","node");
TimeUnit.SECONDS.sleep(2);

zkClient.subscribeChildChanges("/node", new IZkChildListener() {
@Override
public void handleChildChange(String s, List<String> list) throws Exception {

}
});

}
}

curator

Curator本身是Netflix公司开源的zookeeper客户端;
curator提供了各种应用场景的实现封装
curator-framework 提供了fluent风格api
curator-replice 提供了实现封装

curator连接的重试策略
ExponentialBackoffRetry() 衰减重试
RetryNTimes 指定最大重试次数
RetryOneTime 仅重试一次
RetryUnitilElapsed 一直重试知道规定的时间

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.11.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorClientUtils {

private static CuratorFramework curatorFramework;
private final static String CONNECTSTRING="192.168.11.129:2181,192.168.11.134:2181," +
"192.168.11.135:2181,192.168.11.136:2181";


public static CuratorFramework getInstance(){
curatorFramework= CuratorFrameworkFactory.
newClient(CONNECTSTRING,5000,5000,
new ExponentialBackoffRetry(1000,3));
curatorFramework.start();
return curatorFramework;
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorCreateSessionDemo {
private final static String CONNECTSTRING="192.168.11.129:2181,192.168.11.134:2181," +
"192.168.11.135:2181,192.168.11.136:2181";
public static void main(String[] args) {
//创建会话的两种方式 normal
CuratorFramework curatorFramework= CuratorFrameworkFactory.
newClient(CONNECTSTRING,5000,5000,
new ExponentialBackoffRetry(1000,3));
curatorFramework.start(); //start方法启动连接

//fluent风格
CuratorFramework curatorFramework1=CuratorFrameworkFactory.builder().connectString(CONNECTSTRING).sessionTimeoutMs(5000).
retryPolicy(new ExponentialBackoffRetry(1000,3)).
namespace("/curator").build();

curatorFramework1.start();
System.out.println("success");
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.zookeeper.CreateMode;

import java.util.concurrent.TimeUnit;

public class CuratorEventDemo {

/**
* 三种watcher来做节点的监听
* pathcache 监视一个路径下子节点的创建、删除、节点数据更新
* NodeCache 监视一个节点的创建、更新、删除
* TreeCache pathcaceh+nodecache 的合体(监视路径下的创建、更新、删除事件),
* 缓存路径下的所有子节点的数据
*/

public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework=CuratorClientUtils.getInstance();

/**
* 节点变化NodeCache
*/
/* NodeCache cache=new NodeCache(curatorFramework,"/curator",false);
cache.start(true);

cache.getListenable().addListener(()-> System.out.println("节点数据发生变化,变化后的结果" +
":"+new String(cache.getCurrentData().getData())));

curatorFramework.setData().forPath("/curator","菲菲".getBytes());*/


/**
* PatchChildrenCache
*/

PathChildrenCache cache=new PathChildrenCache(curatorFramework,"/event",true);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
// Normal / BUILD_INITIAL_CACHE /POST_INITIALIZED_EVENT

cache.getListenable().addListener((curatorFramework1,pathChildrenCacheEvent)->{
switch (pathChildrenCacheEvent.getType()){
case CHILD_ADDED:
System.out.println("增加子节点");
break;
case CHILD_REMOVED:
System.out.println("删除子节点");
break;
case CHILD_UPDATED:
System.out.println("更新子节点");
break;
default:break;
}
});

curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event","event".getBytes());
TimeUnit.SECONDS.sleep(1);
System.out.println("1");
curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/event/event1","1".getBytes());
TimeUnit.SECONDS.sleep(1);
System.out.println("2");

curatorFramework.setData().forPath("/event/event1","222".getBytes());
TimeUnit.SECONDS.sleep(1);
System.out.println("3");

curatorFramework.delete().forPath("/event/event1");
System.out.println("4");

System.in.read();

}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CuratorOperatorDemo {

public static void main(String[] args) throws InterruptedException {
CuratorFramework curatorFramework=CuratorClientUtils.getInstance();
System.out.println("连接成功.........");

//fluent风格

/**
* 创建节点
*/

/* try {
String result=curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).
forPath("/curator/curator1/curator11","123".getBytes());

System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}*/

/**
* 删除节点
*/
/*try {
//默认情况下,version为-1
curatorFramework.delete().deletingChildrenIfNeeded().forPath("/node11");

} catch (Exception e) {
e.printStackTrace();
}*/

/**
* 查询
*/
/*Stat stat=new Stat();
try {
byte[] bytes=curatorFramework.getData().storingStatIn(stat).forPath("/curator");
System.out.println(new String(bytes)+"-->stat:"+stat);
} catch (Exception e) {
e.printStackTrace();
}*/

/**
* 更新
*/

/* try {
Stat stat=curatorFramework.setData().forPath("/curator","123".getBytes());
System.out.println(stat);
} catch (Exception e) {
e.printStackTrace();
}*/


/**
* 异步操作
*/
/*ExecutorService service= Executors.newFixedThreadPool(1);
CountDownLatch countDownLatch=new CountDownLatch(1);
try {
curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).
inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println(Thread.currentThread().getName()+"->resultCode:"+curatorEvent.getResultCode()+"->"
+curatorEvent.getType());
countDownLatch.countDown();
}
},service).forPath("/mic","123".getBytes());
} catch (Exception e) {
e.printStackTrace();
}
countDownLatch.await();
service.shutdown();*/

/**
* 事务操作(curator独有的)
*/

try {
Collection<CuratorTransactionResult> resultCollections=curatorFramework.inTransaction().create().forPath("/trans","111".getBytes()).and().
setData().forPath("/curator","111".getBytes()).and().commit();
for (CuratorTransactionResult result:resultCollections){
System.out.println(result.getForPath()+"->"+result.getType());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;

import java.io.Closeable;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExampleClient extends LeaderSelectorListenerAdapter implements Closeable{

private final String name;
private final LeaderSelector leaderSelector;
private final AtomicInteger leaderCount=new AtomicInteger();

public ExampleClient(CuratorFramework client,String path,String name) {
this.name = name;
this.leaderSelector = new LeaderSelector(client,path,this);
leaderSelector.autoRequeue(); //自动抢
}

public void start(){
leaderSelector.start();
}

@Override
public void close() throws IOException {
leaderSelector.close();
}

@Override
public void takeLeadership(CuratorFramework client) throws Exception {
final int waitSeconds=new Random().nextInt(50);
System.out.println(name+"->我现在是leader,等待时间:"+waitSeconds+", 抢到领导的次数:"+leaderCount.getAndIncrement());

TimeUnit.SECONDS.toMillis(1000);
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class TreeCacheDemo {
private final static String CONNECTSTRING="192.168.11.129:2181,192.168.11.134:2181," +
"192.168.11.135:2181,192.168.11.136:2181";

private static final String MASTER_PATH="/curator_master_path1";

private static final int CLIENT_QTY=10; //客户端数量

public static void main(String[] args) throws Exception {
System.out.println("创建"+CLIENT_QTY+"个客户端,");
List<CuratorFramework> clients = Lists.newArrayList();
List<ExampleClient> examples = Lists.newArrayList();
try {
for (int i = 0; i < CLIENT_QTY; i++) {
CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECTSTRING,
new ExponentialBackoffRetry(1000, 3));
clients.add(client);
ExampleClient exampleClient = new ExampleClient(client, MASTER_PATH, "Client:" + i);
examples.add(exampleClient);
client.start();
exampleClient.start();
}
System.in.read();
}finally {
for ( ExampleClient exampleClient : examples ){
CloseableUtils.closeQuietly(exampleClient);
}
for ( CuratorFramework client : clients ){
CloseableUtils.closeQuietly(client);
}
}
}
}

zookeeper的实际应用场景

其实在 corator-reciples 中已经实现在大部分公布,无需手动去实现

订阅发布(配置中心)

基于watcher机制
实现配置中心有两种模式:push 、pull。
zookeeper采用的是推拉相结合的方式。 客户端向服务器端注册自己需要关注的节点。一旦节点数据发生变化,那么服务器端就会向客户端
发送watcher事件通知。客户端收到通知后,主动到服务器端获取更新后的数据

配置数据要求:

  1. 数据量比较小
  2. 数据内容在运行时会发生动态变更
  3. 集群中的各个机器共享配置

分布式锁

通常实现分布式锁有几种方式

  • redis
    setNX 存在则会返回0, 不存在
    getAndSet 方法,获取之前的值并赋新的值
  • 数据库
    创建一个表, 通过索引唯一的方式
    create table (id , methodname …) methodname增加唯一索引
    获得锁的进程在表中insert 一条数据XXX
    放弃锁的时候在删除这条数据delete 语句删除这条记录
    for update (mysql) 获得纪录的汗锁
  • zookeeper
    排它锁
    都创建临时节点,因为zookeeper不能有相同名字的节点,所以之后的节点都无法注册节点

共享锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// 原生api实现
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

public class DistributeLock {


private static final String ROOT_LOCKS="/LOCKS";//根节点

private ZooKeeper zooKeeper;

private int sessionTimeout; //会话超时时间

private String lockID; //记录锁节点id

private final static byte[] data={1,2}; //节点的数据

private CountDownLatch countDownLatch=new CountDownLatch(1);

public DistributeLock() throws IOException, InterruptedException {
this.zooKeeper=ZookeeperClient.getInstance();
this.sessionTimeout=ZookeeperClient.getSessionTimeout();
}

//获取锁的方法
public boolean lock(){
try {
//LOCKS/00000001
lockID=zooKeeper.create(ROOT_LOCKS+"/",data, ZooDefs.Ids.
OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

System.out.println(Thread.currentThread().getName()+"->成功创建了lock节点["+lockID+"], 开始去竞争锁");

List<String> childrenNodes=zooKeeper.getChildren(ROOT_LOCKS,true);//获取根节点下的所有子节点
//排序,从小到大
SortedSet<String> sortedSet=new TreeSet<String>();
for(String children:childrenNodes){
sortedSet.add(ROOT_LOCKS+"/"+children);
}
String first=sortedSet.first(); //拿到最小的节点
if(lockID.equals(first)){
//表示当前就是最小的节点
System.out.println(Thread.currentThread().getName()+"->成功获得锁,lock节点为:["+lockID+"]");
return true;
}
SortedSet<String> lessThanLockId=sortedSet.headSet(lockID);
if(!lessThanLockId.isEmpty()){
String prevLockID=lessThanLockId.last();//拿到比当前LOCKID这个几点更小的上一个节点
zooKeeper.exists(prevLockID,new LockWatcher(countDownLatch));
countDownLatch.await(sessionTimeout, TimeUnit.MILLISECONDS);
//上面这段代码意味着如果会话超时或者节点被删除(释放)了
System.out.println(Thread.currentThread().getName()+" 成功获取锁:["+lockID+"]");
}
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}

public boolean unlock(){
System.out.println(Thread.currentThread().getName()+"->开始释放锁:["+lockID+"]");
try {
zooKeeper.delete(lockID,-1);
System.out.println("节点["+lockID+"]成功被删除");
return true;
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
return false;
}


public static void main(String[] args) {
final CountDownLatch latch=new CountDownLatch(10);
Random random=new Random();
for(int i=0;i<10;i++){
new Thread(()->{
DistributeLock lock=null;
try {
lock=new DistributeLock();
latch.countDown();
latch.await();
lock.lock();
Thread.sleep(random.nextInt(500));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
if(lock!=null){
lock.unlock();
}
}
}).start();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import java.util.concurrent.CountDownLatch;

public class LockWatcher implements Watcher{

private CountDownLatch latch;

public LockWatcher(CountDownLatch latch) {
this.latch = latch;
}

public void process(WatchedEvent event) {
if(event.getType()== Event.EventType.NodeDeleted){
latch.countDown();
}
}
}

负载均衡

所有机器在zookeeper某个节点上创建临时目录节点,然后监听父节点的节点变化事件,一旦有机器挂掉了,该机器会与zookeeper断开连接,所创建的临时节点也会被删除,同时其他所有机器也会受到相应的通知事件,加减机器都是基于这样的机制。
而负载均衡也是找到所有机器节点,然后通过一定的算法命中到某台机器上

统一命名服务

将服务的元数据:名字,ip,端口等等数据保存在节点数据上

master选举

master-slave模式:保证高可用,也就是主从,同时从业兼顾主备的功能,随时准备“上位”

当因为网络问题,导致master暂时不可访问时,slave会选举出另外一个master, 那么这个时候会出现双master的情况,那么这个时候就会产生master数据的重复处理的问题
这个时候使用zookeeper去解决

和锁概念一样,只有一个节点能争抢到master节点,如果master挂掉了,那么这个master临时节点也会被删除,那么其他节点就可以争抢master节点了。
corator实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

public class MasterSelector {

private final static String CONNECTSTRING="192.168.11.129:2181,192.168.11.134:2181," +
"192.168.11.135:2181,192.168.11.136:2181";

private final static String MASTER_PATH="/curator_master_path";
public static void main(String[] args) {

CuratorFramework curatorFramework= CuratorFrameworkFactory.builder().connectString(CONNECTSTRING).
retryPolicy(new ExponentialBackoffRetry(1000,3)).build();

LeaderSelector leaderSelector=new LeaderSelector(curatorFramework, MASTER_PATH, new LeaderSelectorListenerAdapter() {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
System.out.println("获得leader成功");
TimeUnit.SECONDS.sleep(2);
}
});
leaderSelector.autoRequeue();
leaderSelector.start();//开始选举
}
}

zkclient实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import java.io.Serializable;

/**
* 腾讯课堂搜索 咕泡学院
* 加群获取视频:608583947
* 风骚的Michael 老师
*/
@data
public class UserCenter implements Serializable{

private static final long serialVersionUID = -1776114173857775665L;
private int mc_id; //机器信息
private String mc_name;//机器名称
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.omg.CORBA.PRIVATE_MEMBER;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MasterSelector {

private ZkClient zkClient;

private final static String MASTER_PATH="/master"; //需要争抢的节点

private IZkDataListener dataListener; //注册节点内容变化

private UserCenter server; //其他服务器

private UserCenter master; //master节点

private boolean isRunning=false;

ScheduledExecutorService scheduledExecutorService= Executors.newScheduledThreadPool(1);

public MasterSelector(UserCenter server,ZkClient zkClient) {
System.out.println("["+server+"] 去争抢master权限");
this.server = server;
this.zkClient=zkClient;

this.dataListener= new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {

}

@Override
public void handleDataDeleted(String s) throws Exception {
//节点如果被删除, 发起选主操作
chooseMaster();
}
};
}

public void start(){
//开始选举
if(!isRunning){
isRunning=true;
zkClient.subscribeDataChanges(MASTER_PATH,dataListener); //注册节点事件
chooseMaster();
}
}


public void stop(){
//停止
if(isRunning){
isRunning=false;
scheduledExecutorService.shutdown();
zkClient.unsubscribeDataChanges(MASTER_PATH,dataListener);
releaseMaster();
}
}


//具体选master的实现逻辑
private void chooseMaster(){
if(!isRunning){
System.out.println("当前服务没有启动");
return ;
}
try {
zkClient.createEphemeral(MASTER_PATH, server);
master=server; //把server节点赋值给master
System.out.println(master+"->我现在已经是master,你们要听我的");

//定时器
//master释放(master 出现故障),没5秒钟释放一次
scheduledExecutorService.schedule(()->{
releaseMaster();//释放锁
},2, TimeUnit.SECONDS);
}catch (ZkNodeExistsException e){
//表示master已经存在
UserCenter userCenter=zkClient.readData(MASTER_PATH,true);
if(userCenter==null) {
System.out.println("启动操作:");
chooseMaster(); //再次获取master
}else{
master=userCenter;
}
}
}

private void releaseMaster(){
//释放锁(故障模拟过程)
//判断当前是不是master,只有master才需要释放
if(checkIsMaster()){
zkClient.delete(MASTER_PATH); //删除
}
}


private boolean checkIsMaster(){
//判断当前的server是不是master
UserCenter userCenter=zkClient.readData(MASTER_PATH);
if(userCenter.getMc_name().equals(server.getMc_name())){
master=userCenter;
return true;
}
return false;
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class MasterChooseTest {

private final static String CONNECTSTRING="192.168.11.129:2181,192.168.11.134:2181," +
"192.168.11.135:2181,192.168.11.136:2181";


public static void main(String[] args) throws IOException {
List<MasterSelector> selectorLists=new ArrayList<>();
try {
for(int i=0;i<10;i++) {
ZkClient zkClient = new ZkClient(CONNECTSTRING, 5000,
5000,
new SerializableSerializer());
UserCenter userCenter = new UserCenter();
userCenter.setMc_id(i);
userCenter.setMc_name("客户端:" + i);

MasterSelector selector = new MasterSelector(userCenter,zkClient);
selectorLists.add(selector);
selector.start();//触发选举操作
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
for(MasterSelector selector:selectorLists){
selector.stop();
}
}
}
}

分布式队列

  • 先进先出:利用有序队列实现,后一个监听前一个队列的删除事件
  1. 通过getChildren获取指定根节点下的所有子节点,子节点就是任务
  2. 确定自己节点在子节点中的顺序
  3. 如果自己不是最小的子节点,那么监控比自己小的上一个子节点,否则处于等待接收watcher通知,重复流程
  • barrier模式(阻碍模式、围栏模式):监听父节点事件,当子节点数量达到阈值时执行逻辑

原理分析

leader 选举

选举算法:

  • FastLeaderElection 默认, 具体可见源码,QuorumPeer的startLeaderElection方法
  • leaderElection
  • AuthFastLeaderElection

源码地址 https://github.com/apache/zookeeper.git

FastLeaderElection

serverid : 在配置server集群的时候,给定服务器的标识id(myid)
zxid : 服务器在运行时产生的数据ID, zxid的值越大,表示数据越新
Epoch: 选举的轮数
server的状态:Looking、 Following、Observering、Leading

第一次初始化启动的时候: LOOKING

  1. 所有在集群中的server都会推荐自己为leader,然后把(myid、zxid、epoch)作为广播信息,广播给集群中的其他server, 然后等待其他服务器返回
  2. 每个服务器都会接收来自集群中的其他服务器的投票。集群中的每个服务器在接受到投票后,开始判断投票的有效性
    a. 判断逻辑时钟(Epoch) ,如果Epoch大于自己当前的Epoch,说明自己保存的Epoch是过期。更新Epoch,同时clear其他服务器发送过来的选举数据。判断是否需要更新当前自己的选举情况
    b. 如果Epoch小于目前的Epoch,说明对方的epoch过期了,也就意味着对方服务器的选举轮数是过期的。这个时候,只需要讲自己的信息发送给对方
    c. 如果Epoch等于目前的Epoch, 根据条件判断是否有资格获得leader
  3. 最终选择zxid最新的那个作为leader

Zookeeper集群节点数量为什么要是奇数个

首先需要明确zookeeper选举的规则:leader选举,要求 可用节点数量 > 总节点数量/2 。注意 是 > , 不是 ≥。
如果不这样限制,在集群出现脑裂的时候,可能会出现多个子集群同时服务的情况(即子集群各组选举出自己的leader), 这样对整个zookeeper集群来说是紊乱的。
换句话说,如果遵守上述规则进行选举,即使出现脑裂,集群最多也只能回出现一个子集群可以提供服务的情况(能满足节点数量> 总结点数量/2 的子集群最多只会有一个)。所以要限制 可用节点数量 > 集群总结点数量/2 。

  1. 防止由脑裂造成的集群不可用。
    首先,什么是脑裂?集群的脑裂通常是发生在节点之间通信不可达的情况下,集群会分裂成不同的小集群,小集群各自选出自己的master节点,导致原有的集群出现多个master节点的情况,这就是脑裂。

下面举例说一下为什么采用奇数台节点,就可以防止由于脑裂造成的服务不可用:

(1) 假如zookeeper集群有 5 个节点,发生了脑裂,脑裂成了A、B两个小集群:

 (a) A : 1个节点 ,B :4个节点 , 或 A、B互换

 (b) A : 2个节点, B :3个节点  , 或 A、B互换

可以看出,上面这两种情况下,A、B中总会有一个小集群满足 可用节点数量 > 总节点数量/2 。所以zookeeper集群仍然能够选举出leader , 仍然能对外提供服务,只不过是有一部分节点失效了而已。

(2) 假如zookeeper集群有4个节点,同样发生脑裂,脑裂成了A、B两个小集群:

(a) A:1个节点 ,  B:3个节点,   或 A、B互换 

(b) A:2个节点 , B:2个节点

可以看出,情况(a) 是满足选举条件的,与(1)中的例子相同。 但是情况(b) 就不同了,因为A和B都是2个节点,都不满足 可用节点数量 > 总节点数量/2 的选举条件, 所以此时zookeeper就彻底不能提供服务了。

综合上面两个例子可以看出: 在节点数量是奇数个的情况下, zookeeper集群总能对外提供服务(即使损失了一部分节点);如果节点数量是偶数个,会存在zookeeper集群不能用的可能性(脑裂成两个均等的子集群的时候)。

在生产环境中,如果zookeeper集群不能提供服务,那将是致命的 , 所以zookeeper集群的节点数一般采用奇数个。

  1. 在容错能力相同的情况下,奇数台更节省资源。
    leader选举,要求 可用节点数量 > 总节点数量/2 。注意 是 > , 不是 ≥。

举两个例子:

(1) 假如zookeeper集群1 ,有3个节点,3/2=1.5 , 即zookeeper想要正常对外提供服务(即leader选举成功),至少需要2个节点是正常的。换句话说,3个节点的zookeeper集群,允许有一个节点宕机。

(2) 假如zookeeper集群2,有4个节点,4/2=2 , 即zookeeper想要正常对外提供服务(即leader选举成功),至少需要3个节点是正常的。换句话说,4个节点的zookeeper集群,也允许有一个节点宕机。

那么问题就来了, 集群1与集群2都有 允许1个节点宕机 的容错能力,但是集群2比集群1多了1个节点。在相同容错能力的情况下,本着节约资源的原则,zookeeper集群的节点数维持奇数个更好一些。

ZAB协议

拜占庭问题
paxos协议主要就是如何保证在分布式环网络环境下,各个服务器如何达成一致最终保证数据的一致性问题

ZAB协议,基于paxos协议的一个改进。
zab协议为分布式协调服务zookeeper专门设计的一种支持崩溃恢复的原子广播协议
zookeeper并没有完全采用paxos算法, 而是采用zab Zookeeper atomic broadcast

zab协议的原理

  1. 在zookeeper 的主备模式下,通过zab协议来保证集群中各个副本数据的一致性
  2. zookeeper使用的是单一的主进程来接收并处理所有的事务请求,并采用zab协议,把数据的状态变更以事务请求的形式广播到其他的节点
  3. zab协议在主备模型架构中,保证了同一时刻只能有一个主进程来广播服务器的状态变更
  4. 所有的事务请求必须由全局唯一的服务器来协调处理,这个的服务器叫leader,其他的叫followerleader节点主要负责把客户端的事务请求转化成一个事务提议(proposal),并分发给集群中的所有follower节点再等待所有follower节点的反馈。一旦超过半数服务器进行了正确的反馈,那么leader就会commit这条消息

崩溃恢复
原子广播

1.什么情况下zab协议会进入崩溃恢复模式
a.当服务器启动时
b.当leader服务器出现网络中断、崩溃或者重启的情况
c.集群中已经不存在过半的服务器与该leader保持正常通信
2.zab协议进入崩溃恢复模式会做什么
a.当leader出现问题,zab协议进入崩溃恢复模式,并且选举出新的leader。当新的leader选举出来以后,如果集群中已经有过半机器完成了leader服务器的状态同(数据同步),退出崩溃恢复,进入消息广播模式
b.当新的机器加入到集群中的时候,如果已经存在leader服务器,那么新加入的服务器就会自觉进入数据恢复模式,找到leader进行数据同步

问题

假设一个事务在leader服务器被提交了,并且已经有过半的follower返回了ack。 在leader节点把commit消息发送给folower机器之前leader服务器挂了怎么办?

  1. 选举新的leader(zxid最新的)
  2. 把该数据重新同步给其他follower
    zab协议,一定需要保证已经被leader提交的事务也能够被所有follower提交
    zab协议需要保证,在崩溃恢复过程中跳过哪些已经被丢弃的事务

watcher原理

EventType

  1. None 客户端与服务器端成功建立会话
  2. NodeCreated 节点创建
  3. NodeDeleted 节点删除
  4. NodeDataChanged 数据变更:数据内容
  5. NodeChildrenChanged 子节点发生变更: 子节点删除、新增的时候,才会触发

特性

一次性触发: 事件被处理一次后,会被移除,如果需要永久监听,则需要反复注册
zkClient 和 curator 帮我们封装好了,无需我们手动反复注册监听

原理

源码位置:Zookeeper.java -> getData(final String path, Watcher watcher, Stat stat) 方法
zookeeper中使用的序列化技术是jute

WatcheRegistration.java
WatchManager.java