通过java代码怎么和ZK进行交互,这里主要使用apache提供的org.apache.zookeeper来进行和zk server进行连接的。
一、client : zookeeper
1.1 说明
现在我们要使用代码的方式进行连接到zk server,那么我们一个zookeeper java客户端的依赖包,那这里我们使用的就是apache提供的zookeeper.jar。
在zookeeper提供了和zk server交互的核心类就是ZooKeeper,接下来我们看下的步骤:
(1)创建项目;
(2)在pom.xml文件中添加zookeeper的依赖包;
(3)编写一个测试类,实现创建一个节点和获取一个节点;
1.2 开发环境
(1)操作系统:Mac OS;
(2)ZK Server : 3.6.2;
(3)开发工具:idea;
(4)JDK:1.8
1.3 hello小栗子
1.3.1 创建一个项目
使用idea创建一个maven project,取名为:zookeeper-java。
1.3.2 添加依赖
在pom.xml文件中添加依赖,zookeeper(zk client包)和junit(单元测试包):
<!-- zookeeper java客户端,选择的版本号最好是和zk的服务端是相同的版本,避免引发奇奇怪怪的问题 -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
说明:zookeeper的版本最好是和服务端的版本是一样的。
1.3.3 编写例子
(1)和zk server建立连接:
String connectString = "127.0.0.1:2181";
int timeout = 4000;
/*
* 连接过程是异步的
* 底层是使用守护线程,当没有业务线程执行的话,那么守护线程也就结束了。
* 这里也就是main线程结束守护线程也就结束了。
*/
ZooKeeper zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() {
public void process(WatchedEvent event) {
if(event.getType() == Event.EventType.None && event.getState() == Event.KeeperState.SyncConnected){
System.out.println("连接已经建立");
}
}
});
说明:
① 连接过程是异步的;
② 底层是使用守护线程,当没有业务线程执行的话,那么守护线程也就结束了。这里也就是main线程结束守护线程也就结束了。
③ 如何避免main线程不结束呢?使用线程的sleep,休眠一下。
集群的情况下,多个地址使用逗号分隔:
String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";
(2)创建节点
有了zooKeeper对象之后,就可以使用这个对象的create方法创建节点了:
String path = "/myconfig";
byte[] bytes = new String("hello").getBytes();
//节点不能多次创建,多次创建会报错:KeeperErrorCode = NodeExists for /myconfig
String rs = zooKeeper.create(path,bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//result:/myconfig
System.out.println("result:"+rs);
说明:
① 创建节点调用的方法是:zooKeeper.create(),第一个参数是节点的path;第二个参数是byte[] data;第三个参数是:ACl权限信息;第四个参数是:节点的类型;
② 节点不能多次创建,否则会报错:KeeperErrorCode = NodeExistsfor /myconfig。
(3)获取节点的数据:
获取节点的数据是getData,这里有一个Watcher可以监听到数据的变化:
//通过create创建数据,通过get获取数据
//这种方式只能监听一次
byte[] dataBytes = zooKeeper.getData(path,new Watcher(){
public void process(WatchedEvent event) {
if(event.getType() == Event.EventType.NodeDataChanged && event.getPath() != null && event.getPath().equals(path)){
System.out.println("数据改变了:"+event.getPath());
}
}
},null);
System.out.println("获取到的数据是:"+ new String(dataBytes));
整个类的代码如下:
package com.kfit;
import org.apache.zookeeper.*;
import static org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE;
/**
* TODO
*
* @author 悟纤「公众号SpringBoot」
* @date 2021-03-16
* @slogan 大道至简 悟在天成
*/
public class ZkJavaClientDemo {
public static void main(String[] args) throws Exception {
String connectString = "127.0.0.1:2181";
int timeout = 4000;
/*
* 连接过程是异步的
* 底层是使用守护线程,当没有业务线程执行的话,那么守护线程也就结束了。
* 这里也就是main线程结束守护线程也就结束了。
*/
ZooKeeper zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() {
public void process(WatchedEvent event) {
if(event.getType() == Event.EventType.None && event.getState() == Event.KeeperState.SyncConnected){
System.out.println("连接已经建立");
}
}
});
System.out.println("开始创建节点...");
String path = "/myconfig";
byte[] bytes = new String("hello").getBytes();
//节点不能多次创建,多次创建会报错:KeeperErrorCode = NodeExists for /myconfig
String rs = zooKeeper.create(path,bytes, OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//result:/myconfig
System.out.println("result:"+rs);
//通过create创建数据,通过get获取数据
//这种方式只能监听一次
byte[] dataBytes = zooKeeper.getData(path,new Watcher(){
public void process(WatchedEvent event) {
if(event.getType() == Event.EventType.NodeDataChanged && event.getPath() != null && event.getPath().equals(path)){
System.out.println("数据改变了:"+event.getPath());
}
}
},null);
System.out.println("获取到的数据是:"+ new String(dataBytes));
/*
* 通过sleep 避免主线程结束
*/
Thread.sleep(Integer.MAX_VALUE);
System.out.println("main end.");
}
}
运行main方法:
这时候我们通过zkCli.sh进行访问zkserver,然后修改节点数据/myconfig:
set /myconfig hello1
1.3.4 永久监听
我们发现现在我们的编码只能监听一次,如何实现永久监听呢?很简单,只要在监听的代码里再次监听就可以了,很简单:
Watcher watcher = new Watcher(){
public void process(WatchedEvent event) {
if(event.getType() == Event.EventType.NodeDataChanged && event.getPath() != null && event.getPath().equals(path)){
System.out.println("数据改变了:"+event.getPath());
try {
byte[] dataBytes = zooKeeper.getData(path,this,null);
System.out.println("获取到的数据是:"+ new String(dataBytes));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
byte[] dataBytes = zooKeeper.getData(path,watcher,null);
System.out.println("获取到的数据是:"+ new String(dataBytes));
多次执行set 都是能够监听到的。
1.3.5 永久监听方式二
上面的方式不管是之前的版本还是最新的版本都是通用的,如果是3.6.x版本的是支持永久递归监听的,那么怎么玩呐?也很简单:
zooKeeper.addWatch(path, event -> {
System.out.println("获取到的数据是:"+ event);
},AddWatchMode.PERSISTENT);
event这里使用了Lambda表达式,非Lambda的写法是:
zooKeeper.addWatch(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("获取到的数据是:"+ event);
}
}, AddWatchMode.PERSISTENT);
这里的AddWatchMode可以是PERSISTENT和PERSISTENT_RECURSIVE。
二、进阶的使用
2.1 修改节点
修改节点使用的是set的操作,我们看下简单的例子:
//path = "/myconfig";
byte[] bytes = new String("hello-update").getBytes();
Stat stat = zooKeeper.setData(path,bytes,-1);
说明:
(1)执行set的时候,如果node不存在,会抛出异常:KeeperErrorCode = NoNode for/myconfig。在执行zooKeeper.setData()就会抛出异常,就不会有返回值Stat了
(2)这里的第三个参数是version:
①如果不考虑并发修改的问题的话,那么version=-1;
② 如果填写的version和我们节点的version对不上的话,那么执行setData会报错:KeeperErrorCode = BadVersion for/myconfig。
2.2 并发修改节点
对于节点的修改,会出现多个线程进行并发修改的问题,那么我们控制并发节点的修改问题呐,很简单,在前面的例子中的第三个参数version就是用来解决节点的并发修改问题的。具体的一个修改思路:
(1)、通过getData获取到节点的版本信息;
(2)、在执行setData的时候,传递当前获取到的版本号;
具体的代码如下:
Stat nodeStat = new Stat();
zooKeeper.getData(path, false,nodeStat);
byte[] dataBytes = new String("hello-update").getBytes();
zooKeeper.setData(path,dataBytes,nodeStat.getVersion());
2.3 删除节点
删除节点很简单,我们直接来看下代码:
zooKeeper.delete(path, -1);
版本号的说明:-1 代表匹配所有版本号,直接删除。任意大于-1的代表可以指定数据版本删除。
2.4 异步获取数据
我们之前getData的方式是同步的,那么如何异步获取呢?对于zookeeper也提供了相应的方式:
zooKeeper.getData(path, false, new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
System.out.println(Thread.currentThread().getName());//main-EventThread
System.out.println("ctx="+ctx);//ctx=1000
System.out.println("data="+new String(data));//data=hello
}
}, "1000");
三、小结
(1)使用java client的核心思路:引jar包,使用zookeeper类连接上ZK Server,然后就可以调用create/get/set/delete等操作节点的方法。
(2)对于节点的监听watcher操作,只监听一次,可以使用循环监听的方式进行永久监听;在3.6.x的版本可以使用addWatch方法永久递归监听。
(3)不能递归创建和删除节点。
注意:本文归作者所有,未经作者允许,不得转载