赵宇博的技术博客 赵宇博的技术博客
首页
前端
后端
数据库专栏
k8s专栏
分布式专栏
Linux网络专栏
手写系列专栏
随笔
关于
GitHub (opens new window)
首页
前端
后端
数据库专栏
k8s专栏
分布式专栏
Linux网络专栏
手写系列专栏
随笔
关于
GitHub (opens new window)
  • Raft论文-中文
  • Raft论文-读后记录
  • Zookeeper-概述
    • 1、zookeeper入门
      • 1-1、Zookeeper工作机制
      • 1-2、Zookeeper特点
      • 1-3、ZooKeeper 数据模型
      • 1-4、ZooKeeper应用场景
    • 2、Zookeeper 本地安装
      • 2-1、安装
      • 2-2、配置修改
      • 2-3、操作 Zookeeper
      • 2-4、配置参数解读
    • 3、Zookeeper 集群操作
      • 3-1、 集群安装
      • 3-2、选举机制
      • 3-2-1、第一次启动选举
      • 3-2-2、非第一次启动选举
      • 3-3、客户端命令行操作
      • 3-3-1、ls 查看节点具体信息
      • 3-3-2、节点类型(持久/短暂/有序号/无序号)
      • 3-4、监听器原理
      • 3-5、客户端 API 操作
      • 3-6、客户端向服务端写数据流程
    • 4、服务器动态上下线监听案例
      • 4-1、需求分析-服务器动态上下线
      • 4-2、具体实现
    • 5、ZooKeeper 分布式锁案例
      • 5-1、原生 Zookeeper 实现分布式锁案例
      • 5-2、Curator 框架实现分布式锁案例
  • 关于架构师
  • 大型架构的演进之路
  • 架构核心之分布式缓存
  • 分布式专栏
zhaoyb
2024-01-11
目录

Zookeeper-概述

# Zookeeper-概述

# 1、zookeeper入门

Zookeeper 是一个开源的分布式的,为分布式框架提供协调服务的 Apache 项目

# 1-1、Zookeeper工作机制

Zookeeper从设计模式角度来理解:是一个基 于观察者模式设计的分布式服务管理框架,它负 责 存储和管理大家都关心的数据,然 后接受观察者的 注 册,一旦这些数据的状态发生变化,Zookeeper 就 将负责通知已经在Zookeeper上注册的那些观察 者做出相应的反应

# 1-2、Zookeeper特点

1)Zookeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群。

2)集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。所 以Zookeeper适合安装奇数台服务器。 3)全局数据一致:每个Server保存一份相同的数据副本,Client无论连接到哪个Server,数据都是一致的。 4)更新请求顺序执行,来自同一个Client的更新请求按其发送顺序依次执行。

5)数据更新原子性,一次数据更新要么成功,要么失败。 6)实时性,在一定时间范围内,Client能读到最新数据

# 1-3、ZooKeeper 数据模型

ZooKeeper 数据模型的结构与 Unix 文件系统很类似,整体上可以看作是一棵树,每个 节点称做一个 ZNode。每一个 ZNode 默认能够存储 1MB 的数据,每个 ZNode 都可以通过 其路径唯一标识。

image-20220615190434641

# 1-4、ZooKeeper应用场景

  • 统一命名服务

    在分布式环境下,经常需要对应用/服 务进行统一命名,便于识别。 例如:IP不容易记住,而域名容易记住

  • 统一配置管理

    1)分布式环境下,配置文件同步非常常见。

    (1)一般要求一个集群中,所有节点的配置信息是 一致的,比如 Kafka 集群。

    (2)对配置文件修改后,希望能够快速同步到各个 节点上。

    2)配置管理可交由ZooKeeper实现。

    (2)各个客户端服务器监听这个Znode。

    (3)一 旦Znode中的数据被修改,ZooKeeper将通知 各个客户端服务器。

  • 统一集群管理

    1)分布式环境中,实时掌握每个节点的状态是必要的。

    (1)可根据节点实时状态做出一些调整。

    2)ZooKeeper可以实现实时监控节点状态变化

    (1)可将节点信息写入ZooKeeper上的一个ZNode。

    (2)监听这个ZNode可获取它的实时状态变化。

  • 服务器动态上下线

    客户端能实时洞察到服务 器上下线的变化

  • 软负载均衡

    在Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求

# 2、Zookeeper 本地安装

官网:https://zookeeper.apache.org/

# 2-1、安装

  • 拷贝 apache-zookeeper-3.5.7-bin.tar.gz 安装包到 Linux 系统下

  • 解压到指定目录

    tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
    
    1
  • 修改名称

    mv apache-zookeeper-3.5.7 -bin/zookeeper-3.5.7
    
    1

# 2-2、配置修改

  • 将/opt/module/zookeeper-3.5.7/conf 这个路径下的 zoo_sample.cfg 修改为 zoo.cfg

    mv zoo_sample.cfg zoo.cfg
    
    1
  • 打开 zoo.cfg 文件,修改 dataDir 路径

    vim zoo.cfg
    dataDir=/opt/module/zookeeper-3.5.7/zkData
    
    1
    2
  • 在/opt/module/zookeeper-3.5.7/这个目录上创建 zkData 文件夹

     mkdir zkData
    
    1

# 2-3、操作 Zookeeper

  • 启动 Zookeeper

     bin/zkServer.sh start
    
    1
  • 查看进程是否启动

    jps
    
    1
  • 查看状态

    bin/zkServer.sh status
    
    1
  • 启动客户端

     bin/zkCli.sh
    
    1
  • 退出客户端

    quit
    
    1
  • 停止 Zookeeper

    bin/zkServer.sh stop
    
    1

# 2-4、配置参数解读

  • tickTime = 2000:通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒 image-20220615123214161

  • initLimit = 10:LF初始通信时限 image-20220614125045508

  • syncLimit = 5:LF同步通信时限 image-20220615123244366

  • dataDir:保存Zookeeper中的数据

    注意:默认的tmp目录,容易被Linux系统定期删除,所以一般不用默认的tmp目录。

  • clientPort = 2181:客户端连接端口,通常不做修改。

# 3、Zookeeper 集群操作

# 3-1、 集群安装

  • 集群规划

    在 102、103 和 104 三个节点上都部署 Zookeeper。

  • 解压安装

    tar -zxvf apache-zookeeper-3.5.7- bin.tar.gz -C /opt/module/

    mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7

  • 配置服务器编号

    1、在/opt/module/zookeeper-3.5.7/这个目录下创建 zkData mkdir zkData 2、在/opt/module/zookeeper-3.5.7/zkData 目录下创建一个 myid 的文件 vi myid 2

    3、分别在 103、 104 上修改 myid 文件中内容为 3、4

  • 配置zoo.cfg文件

    1、重命名/opt/module/zookeeper-3.5.7/conf 这个目录下的 zoo_sample.cfg 为 zoo.cfg mv zoo_sample.cfg zoo.cfg

    2、打开 zoo.cfg 文件,修改数据存储路径配置、增加如下配置

    dataDir=/opt/module/zookeeper-3.5.7/zkData

    #######################cluster##########################

    server.2=102:2888:3888

    server.3=103:2888:3888

    server.4=104:2888:3888

    3、配置参数解读

    server.A=B:C:D。

    A 是一个数字,表示这个是第几号服务器; 集群模式下配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面有一个数据 就是 A 的值,Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比 较从而判断到底是哪个 server。

    B 是这个服务器的地址;

    C 是这个服务器 Follower 与集群中的 Leader 服务器交换信息的端口;

    D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

    4、集群操作启动Zookeeper

# 3-2、选举机制

# 3-2-1、第一次启动选举

image-20220615124104587

(1)服务器1启 动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为 LOOKING;

(2)服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1) 大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持LOOKING

(3)服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服 务器3的票数已经超过半数,服务器3当选Leader。服务器1,2更改状态为FOLLOWING,服务器3更改状态为LEADING;

(4)服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为 1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING

(5)服务器5启动,同4一样当小弟。

# 3-2-2、非第一次启动选举

(1)当ZooKeeper集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:

  • 服务器初始化启动。
  • 服务器运行期间无法和Leader保持连接

(2)而当一台机器进入Leader选举流程时,当前集群也可能会处于以下两种状态:

  • 集群中本来就已经存在一个Leader。 对于第一种已经存在Leader的情况,机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器来说,仅仅需要和Leader机器建立连 接,并进行状态同步即可。

  • 集群中确实不存在Leader。

    假设ZooKeeper由5台服务器组成,SID分别为1、2、3、4、5,ZXID分别为8、8、8、7、7,并且此时SID为3的服务器是Leader。某一时刻, 3和5服务器出现故障,因此开始进行Leader选举

    SID为1、2、4的机器投票情况: (1,8,1) (1,8,2) (1,7,4)

    选举Leader规则: ①EPOCH大的直接胜出 ②EPOCH相同,事务id大的胜出 ③事务id相同,服务器id大的胜出

# 3-3、客户端命令行操作

命令基本语法 功能描述
help 显示所有操作命令
ls path 使用 ls 命令来查看当前 znode 的子节点 [可监听]
-w 监听子节点变化
-s 附加次级信息
create 普通创建
-s 含有序列
-e 临时(重启或者超时消失)
get path 获得节点的值 [可监听]
-w 监听节点内容变化
-s 附加次级信息
set 设置节点的具体值
stat 查看节点状态
delete 删除节点
deleteall 递归删除节点

# 3-3-1、ls 查看节点具体信息

 ls -s /
1

image-20220615124959829

  • czxid:创建节点的事务 zxid

    每次修改 ZooKeeper 状态都会产生一个 ZooKeeper 事务 ID。事务 ID 是 ZooKeeper 中所 有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之 前发生。

  • ctime:znode 被创建的毫秒数(从 1970 年开始)

  • mzxid:znode 最后更新的事务 zxid

  • mtime:znode 最后修改的毫秒数(从 1970 年开始)

  • pZxid:znode 最后更新的子节点 zxid

  • cversion:znode 子节点变化号,znode 子节点修改次数

  • dataversion:znode 数据变化号

  • aclVersion:znode 访问控制列表的变化号

  • ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是 临时节点则是 0

  • dataLength:znode 的数据长度

  • numChildren:znode 子节点数量

# 3-3-2、节点类型(持久/短暂/有序号/无序号)

持久(Persistent):客户端和服务器端断开连接后,创建的节点不删除

短暂(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除

创建znode时设置顺序标识,znode名称 后会附加一个值,顺序号是一个单调递增的计数 器,由父节点维护 注意:在分布式系统中,顺序号可以被用于 为所有的事件进行全局排序,这样客户端可以通 过顺序号推断事件的顺序

  • 持久化目录节点 create

    客户端与Zookeeper断开连接后,该节点依旧存在

  • 持久化顺序编号目录节点 create -s

    客户端与Zookeeper断开连接后,该节点依旧存 在,只是Zookeeper给该节点名称进行顺序编号

  • 临时目录节点 create -e

    客户端与Zookeeper断开连接后,该节点被删除

  • 临时顺序编号目录节点 create -e -s

    客户端与 Zookeeper 断开连接后 , 该 节 点 被 删 除 , 只 是 Zookeeper给该节点名称进行顺序编号。

# 3-4、监听器原理

客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、节点删除、子目 录节点增加删除)时,ZooKeeper 会通知客户端。监听机制保证 ZooKeeper 保存的任何的数 据的任何改变都能快速的响应到监听了该节点的应用程序。

get -w

image-20220615125648545

# 3-5、客户端 API 操作

  • 创建一个工程:zookeeper

  • 添加pom文件

    <dependencies>
    	<dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.7</version>
    	</dependency>
    </dependencies>
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
  • 拷贝log4j.properties文件到项目根目录

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c]- %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c]- %m%n 
1
2
3
4
5
6
7
8
  • 创建包名com.zk

  • 创建类名称zkClient

    // 注意:逗号前后不能有空格
    private static String connectString ="102:2181,103:2181,104:2181";
    private static int sessionTimeout = 2000;
    private ZooKeeper zkClient = null;
    @Before
    public void init() throws Exception {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // 收到事件通知后的回调函数(用户的业务逻辑)
                System.out.println(watchedEvent.getType() + "--"
                + watchedEvent.getPath());
                // 再次启动监听
                try {
                    List<String> children = zkClient.getChildren("/",true);
                     for (String child : children) {
                     	System.out.println(child);
                     }
                } catch (Exception e) {
    				e.printStackTrace();
    			}
    		}
    	});
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
  • 创建子节点

    // 创建子节点
    @Test
    public void create() throws Exception {
    // 参数 1:要创建的节点的路径; 参数 2:节点数据 ; 参数 3:节点权限 ;参数 4:节点的类型
    String nodeCreated = zkClient.create("/zkTest","node1".getBytes(), Ids.OPEN_ACL_UNSAFE,
    	CreateMode.PERSISTENT);
    }
    
    1
    2
    3
    4
    5
    6
    7
  • 获取子节点并监听节点变化

    // 获取子节点
    @Test
    public void getChildren() throws Exception {
        List<String> children = zkClient.getChildren("/", true);
        for (String child : children) {
        	System.out.println(child);
        }
        // 延时阻塞
        Thread.sleep(Long.MAX_VALUE);
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
  • 判断 Znode 是否存在

    // 判断 znode 是否存在
    @Test
    public void exist() throws Exception {
        Stat stat = zkClient.exists("/zkTest", false);
    	System.out.println(stat == null ? "not exist" : "exist");
    }
    
    1
    2
    3
    4
    5
    6

# 3-6、客户端向服务端写数据流程

  • 写流程之写入请求直接发送给Leader节点 image-20220615130552407
  • 写流程之写入请求发送给follower节点 image-20220615130616657

# 4、服务器动态上下线监听案例

# 4-1、需求分析-服务器动态上下线

image-20220615130857072

# 4-2、具体实现

  • 先在集群上创建/servers 节点

    create /servers "servers"
    
    1
  • 在 Idea 中创建包名:com.zkcase1

  • 服务器端向 Zookeeper 注册代码

    public class DistributeServer {
        private static String connectString ="102:2181,103:2181,104:2181";
        private static int sessionTimeout = 2000;
        private ZooKeeper zk = null;
        private String parentNode = "/servers";
        // 创建到 zk 的客户端连接
        public void getConnect() throws IOException{
            zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {}
    		});
    	}
        // 注册服务器
        public void registServer(String hostname) throws Exception{
            String create = zk.create(parentNode + "/server",hostname.getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(hostname +" is online "+ create);
        }
        // 业务功能
        public void business(String hostname) throws Exception{
            System.out.println(hostname + " is working ...");
            Thread.sleep(Long.MAX_VALUE);
        }
        public static void main(String[] args) throws Exception {
            // 1 获取 zk 连接
            DistributeServer server = new DistributeServer();
            server.getConnect();
            // 2 利用 zk 连接注册服务器信息
            server.registServer(args[0]);
            // 3 启动业务功能
            server.business(args[0]);
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
  • 客户端代码

    public class DistributeClient {
        private static String connectString ="102:2181,103:2181,104:2181";
        private static int sessionTimeout = 2000;
        private ZooKeeper zk = null;
        private String parentNode = "/servers";
        // 创建到 zk 的客户端连接
        public void getConnect() throws IOException {
            zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                	// 再次启动监听
                	try {
                    	getServerList();
                    } catch (Exception e) {
                    	e.printStackTrace();
                    }
                }
            });
    	}
        // 获取服务器列表信息
        public void getServerList() throws Exception {
             // 1 获取服务器子节点信息,并且对父节点进行监听
            List<String> children = zk.getChildren(parentNode, true);
             // 2 存储服务器信息列表
            ArrayList<String> servers = new ArrayList<>();
             // 3 遍历所有节点,获取节点中的主机名称信息
            for (String child : children) {
                byte[] data = zk.getData(parentNode + "/" + child,false, null);
                servers.add(new String(data));
            }
             // 4 打印服务器列表信息
            System.out.println(servers);
         }
        // 业务功能
        public void business() throws Exception{
            System.out.println("client is working ...");
            Thread.sleep(Long.MAX_VALUE);
        }
        public static void main(String[] args) throws Exception {
            // 1 获取 zk 连接
            DistributeClient client = new DistributeClient();
            client.getConnect();
            // 2 获取 servers 的子节点信息,从中获取服务器信息列表
            client.getServerList();
            // 3 业务进程启动
            client.business();
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48

# 5、ZooKeeper 分布式锁案例

什么叫做分布式锁呢? 比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其 他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的 访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。

image-20220615132012278

# 5-1、原生 Zookeeper 实现分布式锁案例

public class DistributedLock {
    // zookeeper server 列表
    private String connectString ="102:2181,103:2181,104:2181";
    // 超时时间
    private int sessionTimeout = 2000;
    private ZooKeeper zk;
    private String rootNode = "locks";
    private String subNode = "seq-";
    // 当前 client 等待的子节点
    private String waitPath;
    //ZooKeeper 连接
    private CountDownLatch connectLatch = new CountDownLatch(1);
    //ZooKeeper 节点等待
    private CountDownLatch waitLatch = new CountDownLatch(1);
    // 当前 client 创建的子节点
    private String currentNode;
    // 和 zk 服务建立连接,并创建根节点
    public DistributedLock() throws IOException,InterruptedException, KeeperException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
                if (event.getState() ==  Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }
                // 发生了 waitPath 的删除事件
                if (event.getType() == Event.EventType.NodeDeleted && 	event.getPath().equals(waitPath)) {
                    waitLatch.countDown();
                }
            }
        });
        // 等待连接建立
        connectLatch.await();
        //获取根节点状态
        Stat stat = zk.exists("/" + rootNode, false);
        //如果根节点不存在,则创建根节点,根节点类型为永久节点
        if (stat == null) {
            System.out.println("根节点不存在");
            zk.create("/" + rootNode, new byte[0],
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }
    // 加锁方法
    public void zkLock() {
        try {
            //在根节点下创建临时顺序节点,返回值为创建的节点路径
            currentNode = zk.create("/" + rootNode + "/" + subNode,null, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            // wait 一小会, 让结果更清晰一些
            Thread.sleep(10);
            // 注意, 没有必要监听"/locks"的子节点的变化情况
            List<String> childrenNodes = zk.getChildren("/" +rootNode, false);
            // 列表中只有一个子节点, 那肯定就是 currentNode , 说明 client 获得锁
            if (childrenNodes.size() == 1) {
                return;
            } else {
                //对根节点下的所有临时顺序节点进行从小到大排序
                Collections.sort(childrenNodes);
                //当前节点名称
                String thisNode = currentNode.substring(("/" +rootNode + "/").length());
                //获取当前节点的位置
                int index = childrenNodes.indexOf(thisNode);
                if (index == -1) {
                    System.out.println("数据异常");
                } else if (index == 0) {
                    // index == 0, 说明 thisNode 在列表中最小, 当前 client 获得锁
                    return;
                } else {
                    // 获得排名比 currentNode 前 1 位的节点
                    this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
                    // 在 waitPath 上注册监听器, 当 waitPath 被删除时, zookeeper 会回调监听器的 process 方法
                    zk.getData(waitPath, true, new Stat());
                    //进入等待锁状态
                    waitLatch.await();
                    return;
                }
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    // 解锁方法
    public void zkUnlock() {
        try {
            zk.delete(this.currentNode, -1);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
  • 分布式锁测试
public class DistributedLockTest {
    public static void main(String[] args) throws
        InterruptedException, IOException, KeeperException {
        // 创建分布式锁 1
        final DistributedLock lock1 = new DistributedLock();
        // 创建分布式锁 2
        final DistributedLock lock2 = new DistributedLock();
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    lock1.zkLock();
                    System.out.println("线程 1 获取锁");
                    Thread.sleep(5 * 1000);
                    lock1.zkUnlock();
                    System.out.println("线程 1 释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                try {
                    lock2.zkLock();
                    System.out.println("线程 2 获取锁");
                    Thread.sleep(5 * 1000);
                    lock2.zkUnlock();
                    System.out.println("线程 2 释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# 5-2、Curator 框架实现分布式锁案例

1、原生的 Java API 开发存在的问题 (1)会话连接是异步的,需要自己去处理。比如使用 CountDownLatch

(2)Watch 需要重复注册,不然就不能生效

(3)开发的复杂性还是比较高的

(4)不支持多节点删除和创建。需要自己去递归 2、Curator 是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题。 详情请查看官方文档:https://curator.apache.org/index.html

  • 添加依赖

    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-client</artifactId>
        <version>4.3.0</version>
    </dependency>
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
  • 代码实现

    public class CuratorLockTest {
        private String rootNode = "/locks";
        // zookeeper server 列表
        private String connectString = "102:2181,103:2181,104:2181";
        // connection 超时时间
        private int connectionTimeout = 2000;
        // session 超时时间
        private int sessionTimeout = 2000;
        public static void main(String[] args) {
            new CuratorLockTest().test();
        }
        // 测试
        private void test() {
            // 创建分布式锁 1
            final InterProcessLock lock1 = new
                InterProcessMutex(getCuratorFramework(), rootNode);
            // 创建分布式锁 2
            final InterProcessLock lock2 = new
                InterProcessMutex(getCuratorFramework(), rootNode);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // 获取锁对象
                    try {
                        lock1.acquire();
                        System.out.println("线程 1 获取锁");
                        // 测试锁重入
                        lock1.acquire();
                        System.out.println("线程 1 再次获取锁");
                        Thread.sleep(5 * 1000);
                        lock1.release();
                        System.out.println("线程 1 释放锁");
                        lock1.release();
                        System.out.println("线程 1 再次释放锁");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // 获取锁对象
                    try {
                        lock2.acquire();
                        System.out.println("线程 2 获取锁");
                        // 测试锁重入
                        lock2.acquire();
                        System.out.println("线程 2 再次获取锁");
                        Thread.sleep(5 * 1000);
                        lock2.release();
                        System.out.println("线程 2 释放锁");
                        lock2.release();
                        System.out.println("线程 2 再次释放锁");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
        // 分布式锁初始化
        public CuratorFramework getCuratorFramework (){
            //重试策略,初试时间 3 秒,重试 3 次
            RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
            //通过工厂创建 Curator
            CuratorFramework client =
                CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .connectionTimeoutMs(connectionTimeout)
                .sessionTimeoutMs(sessionTimeout)
                .retryPolicy(policy).build();
            //开启连接
            client.start();
            System.out.println("zookeeper 初始化完成...");
            return client;
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
#Zookeeper
上次更新: 2024/01/11, 16:12:34
Raft论文-读后记录
关于架构师

← Raft论文-读后记录 关于架构师→

最近更新
01
Activiti6-业务实现
12-06
02
Activiti6-API详解
11-28
03
SpringBoot集成Activiti和UI
11-21
更多文章>
Theme by Vdoing | Copyright © 2022-2024 赵宇博 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式