打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
zookeeper编程(二)
import java.io.IOException;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.KeeperException.ConnectionLossException;import org.apache.zookeeper.KeeperException.SessionExpiredException;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.data.Stat;import org.apache.zookeeper.ZooKeeper;/** *  * @description Zookeeper Session演示类 * @author zhangchaoyang * @date 2014-6-22 */public class SessionDemo {    /**     * zoo.cfg中的配置:     *      * <pre>     * tickTime=2000     * minSessionTimeout=4000(至少是tickTime的2倍)     * maxSessionTimeout=40000(最大是tickTime的20倍)     * </pre>     *      * 如果客户端建立连接时指定的TIMEOUT不在[minSessionTimeout,maxSessionTimeout]区间内,     * 服务端会强制把它修改到该区间内     */    private static final int TIMEOUT = 40000; // Session                                                // Timeout设为40秒,因为心跳周期为2秒,所以如果server向client连续发送20个心跳都收不到回应,则Session过期失效    private static ZooKeeper zkp = null;    private static void connect() throws IOException {        zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT, null);    }    private static void createNode() throws KeeperException,            InterruptedException {        if (zkp != null) {            zkp.create("/znodename", "znodedata".getBytes(),                    Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);        }    }    private static String getData() throws KeeperException,            InterruptedException {        if (zkp != null) {            Stat stat = zkp.exists("/znodename", false);            return new String(zkp.getData("/znodename", false, stat));        }        return null;    }    private static void disconnect() throws InterruptedException {        if (zkp != null) {            zkp.close();        }    }    /**     * 休息,在此期间我们有三种选择:<br>     * <ol>     * <li>永久性断开网络连接     * <li>断开网络连接一段时间timespan后再连上,其中timespan<{@code TIMEOUT}     * <li>断开网络连接一段时间timespan后再连上,其中timespan>{@code TIMEOUT}     * </ol>     */    private static void sleepForNetworkDisturbances() {        try {            Thread.sleep(2 * TIMEOUT);        } catch (InterruptedException e) {            e.printStackTrace();        }    }    public static void main(String[] args) {        try {            connect();        } catch (IOException e) {            System.err                    .println("Can't create zookeeper client, please check the network.");        }        System.out.println("Session build.");        try {            createNode();        } catch (Exception e) {            System.err.println("Create znode failed.");        }        System.out.println("znode created.");        sleepForNetworkDisturbances();        try {            String data = getData();            if (data != null) {                // 在“休息”期间做了第2件事情,Sesion没有过期,EPHEMERAL节点依然存在                System.out.println("data=" + data);            }        } catch (KeeperException e) {            e.printStackTrace();            // 在“休息”期间做了第1件事情            if (e instanceof ConnectionLossException) {                System.err                        .println("Oops, network is disconnected. Retry getData().");                // 如果session没有失效,而仅仅是网络异常,则可以重新尝试获取数据,可能在重试时网络已经正常了                try {                    Thread.sleep(1000);                    String data = getData();                    if (data != null) {                        System.out.println("data=" + data);                    } else {                        System.out.println("can't get data.");                    }                } catch (Exception e1) {                    e1.printStackTrace();                }            }            // 在“休息”期间做了第3件事情,则session会过期            else if (e instanceof SessionExpiredException) {                System.err                        .println("Session Expired, client will reconnect and create znode again.");                // 当发再Session Expired时,必须重新建立连接,即new一个ZooKeeper                try {                    connect();                    createNode();                    String data = getData();                    if (data != null) {                        System.out.println("data=" + data);                    } else {                        System.out.println("can't get data.");                    }                } catch (Exception e1) {                    e1.printStackTrace();                }            }        } catch (InterruptedException e) {            e.printStackTrace();        }        try {            disconnect();        } catch (InterruptedException e) {            e.printStackTrace();        }        System.out.println("Client disconnected.");    }}

Watcher

可以注册watcher的方法:getData、exists、getChildren。
可以触发watcher的方法:create、delete、setData。连接断开的情况下触发的watcher会丢失。
一个Watcher实例是一个回调函数,被回调一次后就被移除了。如果还需要关注数据的变化,需要再次注册watcher。
New ZooKeeper时注册的watcher叫default watcher,它不是一次性的,只对client的连接状态变化作出反应。

什么样的操作会产生什么类型的事件:

 

event For “/path”

event For “/path/child”

create(“/path”)

EventType.NodeCreated

--

delete(“/path”)

EventType.NodeDeleted

--

setData(“/path”)

EventType.NodeDataChanged

--

create(“/path/child”)

EventType.NodeChildrenChanged

EventType.NodeCreated

delete(“/path/child”)

EventType.NodeChildrenChanged

EventType.NodeDeleted

setData(“/path/child”)

--

EventType.NodeDataChanged

 什么操作会触发EventType.None?

事件类型与watcher的对应关系:

event For “/path”

Default

Watcher

exists
(“/path”)

getData
(“/path”)

getChildren
(“/path”)

EventType.None

EventType.NodeCreated

 

 

EventType.NodeDeleted

 

 

EventType.NodeDataChanged

 

 

EventType.NodeChildrenChanged

 

 

 

操作与watcher的对应关系:

 

 

"/path"

"/path/child"

 

exists

getData

getChildren

exists

getData

getChildren

create(“/path”)

 

 

 

 

delete(“/path”)

 

 

 

setData(“/path”)

 

 

 

 

create(“/path/child”)

 

 

 

delete(“/path/child”)

 

 

setData(“/path/child”)

 

 

 

 

import java.io.IOException;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.data.Stat;/** *  * @description Zookeeper Watcher演示类 * @author zhangchaoyang * @date 2014-6-22 */public class WatcherDemo {    private static ZooKeeper zkp = null;    private static final int TIMEOUT = 6000;    private static Watcher getWatcher(final String msg) {        return new Watcher() {            @Override            public void process(WatchedEvent event) {                System.out.println(msg + "上的监听被触发\t事件类型" + event.getType()                        + "\t发生变化的节点" + event.getPath());            }        };    }    public static void main(String[] args) throws IOException, KeeperException,            InterruptedException {        System.out.println("--------------1----------------");        zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT,                getWatcher("CONNECT"));        Thread.sleep(1000);                System.out.println("--------------2----------------");        zkp.create("/znodename", "znodedata".getBytes(), Ids.OPEN_ACL_UNSAFE,                CreateMode.PERSISTENT);        zkp.create("/znodename/childnode", new byte[0], Ids.OPEN_ACL_UNSAFE,                CreateMode.PERSISTENT);        Stat stat = zkp.exists("/znodename", getWatcher("EXISTS"));        zkp.getChildren("/", getWatcher("GETCHILDREN"));        zkp.getData("/znodename", getWatcher("GETDATA"), stat);        stat = zkp.exists("/znodename/childnode", getWatcher("EXISTS"));        zkp.getChildren("/znodename", getWatcher("GETCHILDREN"));        zkp.getData("/znodename/childnode", getWatcher("GETDATA"), stat);        // zkp.close();        zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT,                getWatcher("CONNECT"));        Thread.sleep(1000);        System.out.println("--------------3----------------");        zkp.delete("/znodename/childnode", -1);        zkp.delete("/znodename", -1);        zkp.close();    }}

 

import java.io.IOException;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.KeeperException.ConnectionLossException;import org.apache.zookeeper.KeeperException.SessionExpiredException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.Watcher.Event.EventType;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;/** *  * @description 自定义持久性的zookeeper watcher * @author zhangchaoyang * @date 2014-6-22 */public class PersistWatcher {    private static final int TIMEOUT = 6000;    private static final String znode = "/globalconfnode";    private static String globalConfData = "";    private static Watcher getConnectWatcher() {        return new Watcher() {            @Override            public void process(WatchedEvent event) {                if (event.getType().equals(EventType.None)) {                    System.out.println("连接状态发生变化。");                }            }        };    }    private static Watcher getExistsWatcher(final ZooKeeper zkp) {        return new Watcher() {            @Override            public void process(WatchedEvent event) {                try {                    if (event.getType().equals(EventType.NodeDataChanged)                            || event.getType().equals(EventType.NodeCreated)) {                        // 节点被创建或修改时更新缓存中的值                        Stat stat = zkp.exists(znode, this);// 再次注册监听                        String data = new String(                                zkp.getData(znode, false, stat));                        globalConfData = data;                    } else if (event.getType().equals(EventType.NodeDeleted)) {                        // 节点被删除时报警                        System.out                                .println("global configuration node have been deleted!");                        try {                            // 再次注册监听                            zkp.exists(znode, this);                        } catch (KeeperException e) {                            if (e instanceof ConnectionLossException) {                                System.out.println("连接已断开。");                            }                        }                    }                } catch (KeeperException e) {                    e.printStackTrace();                } catch (InterruptedException e) {                    e.printStackTrace();                }            }        };    }    public static void main(String[] args) {        try {            ZooKeeper zkp = new ZooKeeper("192.168.119.96:2181/app/learn",                    TIMEOUT,                     getConnectWatcher());            zkp.exists(znode, getExistsWatcher(zkp));            zkp.create(znode, "config_value".getBytes(), Ids.OPEN_ACL_UNSAFE,                    CreateMode.EPHEMERAL);            Thread.sleep(500);// 修改节点后必须sleep,等待watcher回调完成            System.out.println(globalConfData);            for (int i = 0; i < 4; i++) {                zkp.setData(znode, ("config_value" + i).getBytes(), -1);                Thread.sleep(500);// 修改节点后必须sleep,等待watcher回调完成                System.out.println(globalConfData);            }            zkp.close();// EPHEMERAL节点会被删除,但Session并不会马上失效(只不过ConnectionLoss了),所以还是会触发watcher            try {                // 此时Session已失效                zkp.exists(znode, false);            } catch (KeeperException e) {                if (e instanceof SessionExpiredException)                    System.out.println("Session已失效。");            }        } catch (IOException e) {            e.printStackTrace();        } catch (KeeperException e) {            e.printStackTrace();        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

ACL

每个节点有单独的ACL,子节点不能继承父节点的ACL
ACL有三个维度:schema,id,permision
Schema有7种:
world: 它下面只有一个id, 叫anyone
auth: 它不需要id
digest: 它对应的id为username:BASE64(SHA1(password))
ip: 它对应的id为客户机的IP地址,设置的时候可以设置一个ip段,比如ip:192.168.1.0/16, 表示匹配前16个bit的IP段
super: 在这种scheme情况下,对应的id拥有超级权限
sasl: sasl的对应的id,是一个通过了kerberos认证的用户id
Permission有5种:
CREATE(c),DELETE(d),READ(r),WRITE(w),ADMIN(a) 
import java.io.IOException;import java.util.ArrayList;import java.util.List;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.KeeperException.NoAuthException;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.ACL;/** *  * @description Zookeeper ACL演示类 * @author zhangchaoyang * @date 2014-6-22 */public class AclDemo {    private static final int TIMEOUT = 6000;    public static void main(String[] args) throws IOException, KeeperException,            InterruptedException {        ZooKeeper zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT,                null);        String schema = "digest";// schema类型有:world,auth,digest,ip,super        String auth = "username:password";        zkp.addAuthInfo(schema, auth.getBytes());        List<ACL> acls = new ArrayList<ACL>();        for (ACL id : Ids.CREATOR_ALL_ACL) {            acls.add(id);        }        zkp.create("/znodename", "znodedata".getBytes(), acls,                CreateMode.PERSISTENT);        ZooKeeper zoo = null;        try {            zoo = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT, null);            System.out.println("采用不合法的认证方式:");            String badAuthentication = "username:wrongpass";            zoo.addAuthInfo(schema, badAuthentication.getBytes());            zoo.getData("/znodename", null, null);        } catch (KeeperException e) {            if (e instanceof NoAuthException) {                System.out.println("认证失败:" + e.getMessage());            }            System.out.println("采用合法的认证方式:");            zoo.addAuthInfo(schema, auth.getBytes());            String data = new String(zoo.getData("/znodename", null, null));            if (data != null) {                System.out.println("认证成功:data=" + data);            }        } finally {            if (zoo != null && zoo.getState().isAlive()) {                zoo.close();            }        }        zkp.delete("/znodename", -1);        zkp.close();    }}

开源工具menagerie

基于ZooKeeper实现了分布式的:
ReentrantZkLock
ReentrantZkReadWriteLock
Semaphore
CyclicBarrier
CountDownLatch
BlockingQueue
HashMap
ListSet
import java.io.IOException;import java.util.Date;import java.util.List;import java.util.concurrent.CountDownLatch;import org.apache.log4j.Logger;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.Watcher.Event.KeeperState;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.ZooKeeper.States;/** *  * @description 使用ZooKeeper实现分布式锁 * @author zhangchaoyang * @date 2014-6-22 */public class ZooKeeperLock {    private static Logger logger = Logger.getLogger(ZooKeeperLock.class);    private static ZooKeeper zk = null;    private static final int TIMEOUT = 1000 * 60;    private static String connStr = null;    public static void setServerPath(String path) {        connStr = path + "/app/bqas/lock";        logger.info("ZooKeeperLock zookeeper node:" + connStr);    }    public static boolean getLock(String lockname) throws KeeperException,            InterruptedException, IOException {        connect(connStr, TIMEOUT);        if (lockname.contains("-")) {            throw new RuntimeException("锁名称不能包含'-'");        }        boolean lock = false;        String path = zk.create("/" + lockname + "-", new byte[0],                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);        int selfIndex = getIndex(path);        List<String> children = zk.getChildren("/", false);        int min = getMinIndex(children);        if (min == selfIndex) {            lock = true;        }        return lock;    }    public static void releaseLock(String lockname)            throws InterruptedException, KeeperException {        disconnect();    }    private static int getIndex(String str) {        int index = -1;        int pos = str.lastIndexOf("-");        if (pos >= 0) {            try {                index = Integer.parseInt(str.substring(pos + 1));            } catch (NumberFormatException e) {                e.printStackTrace();            }        }        return index;    }    private static int getMinIndex(List<String> list) {        int min = Integer.MAX_VALUE;        for (String ele : list) {            int index = getIndex(ele);            if (index < 0) {                throw new RuntimeException("SEQUENTIAL节点名中不包含数字:" + ele);            }            if (index < min) {                min = index;            }        }        return min;    }    private static void waitUntilConnected(CountDownLatch connectedLatch) {        if (States.CONNECTING == zk.getState()) {            try {                connectedLatch.await();            } catch (InterruptedException e) {                throw new IllegalStateException(e);            }        }    }    public static boolean connect(String hostPath, int sessionTimeout) {        if (zk == null || zk.getState() == States.CLOSED) {            try {                CountDownLatch connectedLatch = new CountDownLatch(1);                Watcher watcher = new ConnectedWatcher(connectedLatch);                zk = new ZooKeeper(hostPath, sessionTimeout, watcher);                waitUntilConnected(connectedLatch);            } catch (Exception e) {                logger.error("Connect to Zookeeper failed:", e);                return false;            }        }        return true;    }    public static boolean disconnect() {        if (zk != null) {            if (States.CLOSED != zk.getState()) {                try {                    zk.close();                } catch (InterruptedException e) {                    logger.error("Disconnect from Zookeeper failed:", e);                    return false;                }            }        }        return true;    }    static class ConnectedWatcher implements Watcher {        private CountDownLatch connectedLatch;        ConnectedWatcher(CountDownLatch connectedLatch) {            this.connectedLatch = connectedLatch;        }        @Override        public void process(WatchedEvent event) {            // 事件状态为SyncConnected时,说明与服务端的连接已建立好            if (event.getState() == KeeperState.SyncConnected) {                connectedLatch.countDown();            }        }    }    public static void main(String[] args) {        String lockname = "writeHitCount2DBlock";        System.out.println("begin to run.");        ZooKeeperLock.setServerPath("192.168.119.96:2181");        try {            boolean havelock = ZooKeeperLock.getLock(lockname);            if (havelock) {                Date date = new Date();                System.out                        .println("I got the lock,and I will write DB!" + date);                Thread.sleep(1000);// 休息一段时间之后再释放锁            }            System.out.println("Job done, I will release the lock.");            ZooKeeperLock.releaseLock(lockname);        } catch (KeeperException e) {            e.printStackTrace();        } catch (InterruptedException e) {            e.printStackTrace();        } catch (IOException e) {            e.printStackTrace();        }    }}

 

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
ZooKeeper 分布式锁的实现
分布式ID常见解决方案
zookeeper下的分布式锁
java 使用zookeeper分布式锁(curator包临时节点)
Zookeeper分布式技术详解 (一)zookeeper 集群的 监控图形化页面
简单之美 | Kafka+Storm+HDFS整合实践
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服