打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
zookeeper适用场景:分布式锁实现







zookeeper应用场景有关于分布式集群配置文件同步问题的描述,设想一下如果有100台机器同时对同一台机器上某个文件进行修改,如何才能保证文本不会被写乱,这就是最简单的分布式锁,本文介绍利用zk实现分布式锁。下面是写锁的实现步骤

分布式写锁
create一个PERSISTENT类型的znode,/Locks/write_lock

  • 客户端创建SEQUENCE|EPHEMERAL类型的znode,名字是lockid开头,创建的znode是/Locks/write_lock/lockid0000000001
  • 调用getChildren()不要设置Watcher获取/Locks/write_lock下的znode列表
  • 判断自己步骤2创建znode是不是znode列表中最小的一个,如果是就代表获得了锁,如果不是往下走
  • 调用exists()判断步骤2自己创建的节点编号小1的znode节点(也就是获取的znode节点列表中最小的znode),并且设置Watcher,如果exists()返回false,执行步骤3
  • 如果exists()返回true,那么等待zk通知,从而在回掉函数里返回执行步骤3


释放锁就是删除znode节点或者断开连接就行

*注意:上面步骤2中getChildren()不设置Watcher的原因是,防止羊群效应,如果getChildren()设置了Watcher,那么集群一抖动都会收到通知。在整个分布式锁的竞争过程中,大量重复运行,并且绝大多数的运行结果都是判断出自己并非是序号最小的节点,从而继续等待下一次通知—,这个显然看起来不怎么科学。客户端无端的接受到过多的和自己不相关的事件通知,这如果在集群规模大的时候,会对Server造成很大的性能影响,并且如果一旦同一时间有多个节点的客户端断开连接,这个时候,服务器就会像其余客户端发送大量的事件通知——这就是所谓的羊群效应。
下面是代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import sys
class GJZookeeper(object):
    ZK_HOST = "localhost:2181"
    ROOT = "/Locks"
    WORKERS_PATH = join(ROOT,"write_lock")
    MASTERS_NUM = 1
    TIMEOUT = 10000
    def __init__(self, verbose = True):
        self.VERBOSE = verbose
        self.masters = []
        self.is_master = False
        self.path = None
        self.zk = ZKClient(self.ZK_HOST, timeout = self.TIMEOUT)
        self.say("login ok!")
        # init
        self.__init_zk()
        # register
        self.register()
    def __init_zk(self):
        """
        create the zookeeper node if not exist
        |--Locks
                |--write_lock
        """
        nodes = (self.ROOT, self.WORKERS_PATH)
        for node in nodes:
            if not self.zk.exists(node):
                try:
                    self.zk.create(node, "")
                except:
                    pass
    def register(self):
        """
        register a node for this worker
        |--Locks
                |--write_lock
                            |--lockid000000000x ==> hostname
        """
        import socket
        hostname = socket.gethostname()
        self.path = self.zk.create(self.WORKERS_PATH + "/lockid", hostname, flags=zookeeper.EPHEMERAL | zookeeper.SEQUENCE)
        self.lockid = basename(self.path)
        self.say("register ok! I'm %s" % self.path)
        # check who is the master
        self.get_lock()
    def get_lock(self):
        """
        get children znode try to get lock
        """
        @watchmethod
        def watcher(event):
            self.say("child changed, try to get lock again.")
            self.get_lock()
        children = self.zk.get_children(self.WORKERS_PATH)
        children.sort()
        min_lock_id = children[0]
        self.say("%s's children: %s" % (self.WORKERS_PATH, children))
        if cmp(self.lockid,min_lock_id) == 0:
            self.get_lock_success()
            return True
        elif cmp(self.lockid,min_lock_id) > 0:
            index = children.index(self.lockid)
            new_lockid_watch = join(self.WORKERS_PATH,children[index-1])
            self.say("Add watch on %s"%new_lockid_watch)
            res = self.zk.exists(new_lockid_watch,watcher)
            if not res :
                """代表没有存在之前小的锁,但是这并不意味着能拿到锁了,因为还可能有比当前还小的锁,还没轮到,要重新执行一遍"""
#               self.get_lock_success()
                return False
            else :
                """现在的锁有人在使用,等他释放了再抢"""
                self.say("I can not get the lock this time,wait for the next time")
                return False
    def get_lock_success(self):
        self.say("I get the lock !!!")
        self.write_file()
        self.zk.delete(join(self.WORKERS_PATH,self.lockid))
        self.say("I release the lock !!!")
        sys.exit(1)
    def write_file(self):
        fd = open("lock.log",'a')
        fd.write("%s\n"%self.lockid)
        fd.close()
    def say(self, msg):
        """
        print messages to screen
        """
        if self.VERBOSE:
            if self.path:
                log.info("[ %s(%s) ] %s" % (self.path, "master" if self.is_master else "slave", msg))
            else:
                log.info(msg)
def start_get_lock():
    gj_zookeeper = GJZookeeper()
def main():
    th1 = threading.Thread(target = start_get_lock, name = "thread_1", args = ())
    th1.start()
    th1.join()
     
if __name__ == "__main__":
    main()
    time.sleep(1000)


本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
ZooKeeper 分布式锁的实现
java 使用zookeeper分布式锁(curator包临时节点)
ZooKeeper原理及使用
ZooKeeper的十二连问,你顶得了嘛?
这几种常见的“分布式锁”写法,搞懂再也不怕面试官,安排
分布式锁与实现(二)——基于ZooKeeper实现
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服