打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
zk系列-c++下zookeeper使用实例

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务。分布式应用可以使用它来实现诸如:统一命名服务、配置管理、分布式锁服务、集群管理等功能。公司常用到的是Java服务集群的管理。

1.函数介绍

  1. //create a handle to used communicate with zookeeper  
  2. zhandle_t *zookeeper_init(const char *host, watcher_fn fn, int recv_timeout, const clientid_t *clientid, void *context, int flags)  
  3. //create a node synchronously  
  4. int zoo_create(zhandle_t *zh, const char *path, const char *value,int valuelen,  
  5.    const struct ACL_vector *acl, int flags,char *path_buffer, int path_buffer_len);  
  6. //lists the children of a node synchronously.  
  7. int zoo_wget_children(zhandle_t *zh, const char *path, watcher_fn watcher, void* watcherCtx, struct String_vector *strings)  
  8. //close the zookeeper handle and free up any resources.  
  9. int zookeeper_close(zhandle_t *zh)  

2.实例

用上面3个函数,就能创建一个简单的集群管理。

数据存储结构为


服务端向Zk注册服务

  1. //连接zk  
  2. void ZkRegistry::ConnectZK() {  
  3.   if (zhandle_) {  
  4.     zookeeper_close(zhandle_);  
  5.   }  
  6.   int count = 0;  
  7.   do {  
  8.     ++count;  
  9.     zhandle_ = zookeeper_init(zk_hosts_.c_str(),InitWatcher, timeout_, NULL, NULL, 0);  
  10.   } while (!connected_ && count < ZK_MAX_CONNECT_TIMES);  
  11.   
  12.   if (count >= ZK_MAX_CONNECT_TIMES) {  
  13.     SLOG_WARN("ZkRegistry::Init --> connect host " << zk_hosts_ << " over max times:" << count);  
  14.     return;  
  15.   }  
  16. }  
  17.   
  18. //发布服务,建立临时节点  
  19. void ZkRegistry::PublishService() {  
  20.   if (zhandle_ == NULL) {  
  21.     ConnectZK();  
  22.   }  
  23.   string server_path = PING_SERVER + "/" + PingConfig::instance().get_index() + "/"  
  24.     + GetIp() + ":" + PingConfig::instance().get_port();  
  25.   char res_path[128];  
  26.   int rc = zoo_create(zhandle_, server_path.c_str(), GetIp().c_str(), GetIp().size(),  
  27.       &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, res_path, 128);  
  28.   if (rc) {  
  29.     SLOG_INFO("ZkRegistry::PublishService --> zoo_create() path=" << server_path << "," << zerror(rc));  
  30.   }  
  31. }  
  32.   
  33. //每隔10s注册一次服务  
  34. void ZkRegistry::run() {  
  35.   while(true) {  
  36.     SLOG_INFO("publish service");  
  37.     ConnectZK();  
  38.     PublishService();  
  39.     sleep(10);  
  40.   }  
  41. }  
  42.   
  43. void ZkRegistry::InitWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcher_ctx) {  
  44.   if (state == ZOO_CONNECTED_STATE) {  
  45.     connected_ = true;  
  46.     SLOG_INFO("InitWatcher() ZOO_CONNECTED_STATE");  
  47.   } else if (state == ZOO_AUTH_FAILED_STATE) {  
  48.     SLOG_INFO("InitWatcher() ZOO_AUTH_FAILED_STATE");  
  49.   } else if (state == ZOO_EXPIRED_SESSION_STATE) {  
  50.     SLOG_INFO("InitWatcher() ZOO_EXPIRED_SESSION_STATE");  
  51.   } else if (state == ZOO_CONNECTING_STATE) {  
  52.     SLOG_INFO("InitWatcher() ZOO_CONNECTING_STATE");  
  53.   } else if (state == ZOO_ASSOCIATING_STATE) {  
  54.     SLOG_INFO("InitWatcher() ZOO_ASSOCIATING_STATE");  
  55.   }  
  56. }  

客户端获取服务列表

  1. //连接zk server  
  2. void ZkClient::ConnectZK() {  
  3.   cout << "ZkClient::ConnectZK" << endl;  
  4.   if (zhandle_) {  
  5.     zookeeper_close(zhandle_);  
  6.   }  
  7.   zhandle_ = NULL;  
  8.   connected_ = false;  
  9.   
  10.   int count = 0;  
  11.   do {  
  12.     ++count;  
  13.     zhandle_ = zookeeper_init(zk_hosts_.c_str(), InitWatcher, timeout_, NULL, NULL, 0);  
  14.     sleep(5 * ONE_SECONDS);  
  15.   } while (!connected_ && count < ZK_MAX_CONNECT_TIMES);  
  16.   
  17.   if (count >= ZK_MAX_CONNECT_TIMES){  
  18.     cout << "ZkClient::Init --> connecting zookeeper host: " << zk_hosts_ << " over times: " << count << endl;  
  19.   }  
  20. }  
  21. //更新服务列表,冷备和热备  
  22. void ZkClient::Update() {  
  23.   cout << "ZkClient::Update" << endl;  
  24.   if (zhandle_ == NULL || connected_ == false) {  
  25.     Init();  
  26.   }  
  27.   //获得服务份数  
  28.   struct String_vector str_vec;  
  29.   int ret = zoo_wget_children(zhandle_, PING_SERVER.c_str(), ServiceWatcher, NULL, &str_vec);  
  30.   if (ret) {  
  31.     cout << "Update --> read path:" << PING_SERVER << " wrong, " << zerror(ret) << endl;  
  32.     return;  
  33.   }  
  34.   
  35.   //获得各份服务ip:port  
  36.   for (int i = 0; i < str_vec.count; ++i) {  
  37.     struct String_vector node_vec;  
  38.     string path = PING_SERVER + "/" + str_vec.data[i];  
  39.     int ret = zoo_wget_children(zhandle_, path.c_str(), ServiceWatcher, NULL, &node_vec);  
  40.     cout << "Update --> path:" << path << ", ret:" << ret << ", node's size:" << node_vec.count << endl;  
  41.     if (ret || node_vec.count != 1) {  
  42.       continue;  
  43.     }  
  44.     ....  
  45.   }  
  46. }  
  47. //监控服务变化  
  48. void ZkClient::ServiceWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx) {  
  49.   cout << "type:" << type << endl;  
  50.   cout << "state:" << state << endl;  
  51.   cout << "path:" << path << endl;  
  52. //  cout << "watcherCtx:" << (char*)watcherCtx << endl;  
  53.   cout << "ZOO_CHILD_EVENT:" << ZOO_CHILD_EVENT << endl;  
  54.   if (ZOO_CHILD_EVENT == type) {  
  55.     cout << "ServiceWatcher ZOO_CHILD_EVENT" << endl;  
  56.     ZkClient::Instance().Update();//更新服务列表      
  57.   }  
  58. }  

服务端会每隔一段时间重新注册自己;

客户端在第一次与zk建立连接获取服务列表时,注册监听函数。zk当节点发生变化时,通知客户端,客户端重新获取服务列表,并注册事件。

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Zookeeper C API 指南五(同步 API 介绍)
178 f0603
ZooKeeper安装过程
ZooKeeper伪分布式集群安装及使用 | 粉丝日志
zookeeper技术介绍[python书籍福利]
顶级分布式开源项目,配上这款可视化工具,真香!
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服