在ZooKeeper的应用场景中,有关于分布式集群配置文件同步问题的描述。设想一下,如果有100台机器同时对同一台机器上的某个文件进行修改,如何才能保证文本不会被写乱?这就是最简单的分布式锁。本文介绍如何利用zk实现分布式锁,下面是写锁的实现步骤:
分布式写锁
create一个PERSISTENT类型的znode,/Locks/write_lock
释放锁就是删除znode节点或者断开连接就行
*注意:上面步骤2中getChildren()不设置Watcher的原因是防止羊群效应。如果getChildren()设置了Watcher,那么集群一抖动,所有客户端都会收到通知。在整个分布式锁的竞争过程中,这一判断会被大量重复运行,并且绝大多数的运行结果都是判断出自己并非序号最小的节点,从而继续等待下一次通知,这显然不太合理。客户端无端地接收到过多和自己不相关的事件通知,在集群规模大的时候会对Server造成很大的性能影响;并且一旦同一时间有多个节点的客户端断开连接,服务器就会向其余客户端发送大量的事件通知——这就是所谓的羊群效应。
下面是代码实现
"""Distributed write lock built on ZooKeeper ephemeral sequential znodes."""
import logging
import socket
import sys
import threading
import time
# znode paths always use "/" separators, whatever the host OS is, so use
# posixpath instead of the platform-dependent os.path.
from posixpath import basename, join

# NOTE(review): ZKClient, zookeeper and watchmethod are used below but are not
# imported anywhere in the visible snippet. They presumably come from the
# zkpython-based ``zkclient`` helper module -- confirm and import them here.

log = logging.getLogger(__name__)


class GJZookeeper(object):
    """One contender for the write lock kept under ``/Locks/write_lock``.

    Constructing an instance connects to ZooKeeper, makes sure the lock
    directory exists, registers an ephemeral sequential child znode for this
    contender and immediately tries to take the lock.
    """

    ZK_HOST = "localhost:2181"
    ROOT = "/Locks"
    WORKERS_PATH = join(ROOT, "write_lock")
    MASTERS_NUM = 1
    TIMEOUT = 10000

    def __init__(self, verbose=True):
        """Connect, prepare the znode tree and start competing for the lock.

        Args:
            verbose: when true, ``say`` forwards its messages to the logger.
        """
        self.VERBOSE = verbose
        self.masters = []
        self.is_master = False
        self.path = None
        self.lockid = None
        self.zk = ZKClient(self.ZK_HOST, timeout=self.TIMEOUT)
        self.say("login ok!")
        # init
        self.__init_zk()
        # register
        self.register()

    def __init_zk(self):
        """Create ``/Locks`` and ``/Locks/write_lock`` when they are missing."""
        for node in (self.ROOT, self.WORKERS_PATH):
            if self.zk.exists(node):
                continue
            try:
                self.zk.create(node, "")
            except zookeeper.NodeExistsException:
                # Another contender created the node between our exists()
                # check and create(); that is the only failure we tolerate.
                pass

    def register(self):
        """Create this contender's lock znode and try to take the lock.

        The znode is ``/Locks/write_lock/lockid<sequence>`` (ephemeral and
        sequential) and stores the local hostname as its data.
        """
        hostname = socket.gethostname()
        self.path = self.zk.create(
            self.WORKERS_PATH + "/lockid",
            hostname,
            flags=zookeeper.EPHEMERAL | zookeeper.SEQUENCE,
        )
        self.lockid = basename(self.path)
        self.say("register ok! I'm %s" % self.path)
        # check who is the master
        self.get_lock()

    def get_lock(self):
        """Try to take the lock; otherwise watch the znode just ahead of ours.

        Returns:
            True when this contender held the lock (``get_lock_success`` was
            run), False when it has to wait for a later notification.
        """
        @watchmethod
        def watcher(event):
            self.say("child changed, try to get lock again.")
            self.get_lock()

        while True:
            children = self.zk.get_children(self.WORKERS_PATH)
            children.sort()
            self.say("%s's children: %s" % (self.WORKERS_PATH, children))

            if self.lockid not in children:
                # Our own znode is not listed (this also covers an empty
                # listing), so there is nothing to compare against.
                self.say("I can not get the lock this time,wait for the next time")
                return False

            index = children.index(self.lockid)
            if index == 0:
                # Ours is the smallest sequence number: the lock is ours.
                self.get_lock_success()
                return True

            # Watch only the immediate predecessor, not the whole directory,
            # so a release wakes up a single waiter.
            new_lockid_watch = join(self.WORKERS_PATH, children[index - 1])
            self.say("Add watch on %s" % new_lockid_watch)
            if self.zk.exists(new_lockid_watch, watcher):
                return False
            # The predecessor vanished between get_children() and exists().
            # That does not mean the lock is ours (smaller znodes may still be
            # queued), and no deletion event will ever arrive for a node that
            # is already gone, so re-evaluate the queue right away.

    def get_lock_success(self):
        """Run the critical section, release the lock and exit."""
        self.say("I get the lock !!!")
        try:
            self.write_file()
        finally:
            # Release the lock even when the critical section fails.
            self.zk.delete(join(self.WORKERS_PATH, self.lockid))
        self.say("I release the lock !!!")
        # NOTE(review): exit status 1 after a successful run looks unintended;
        # kept unchanged because callers may depend on it -- confirm.
        sys.exit(1)

    def write_file(self):
        """Append this contender's lock id to ``lock.log``."""
        with open("lock.log", 'a') as fd:
            fd.write("%s\n" % self.lockid)

    def say(self, msg):
        """Log ``msg`` when verbose, prefixed with our znode path and role."""
        if not self.VERBOSE:
            return
        if self.path:
            role = "master" if self.is_master else "slave"
            log.info("[ %s(%s) ] %s" % (self.path, role, msg))
        else:
            log.info(msg)


def start_get_lock():
    """Thread target: create one contender, which registers and competes."""
    GJZookeeper()


def main():
    """Run a single contender in a worker thread and wait for it."""
    th1 = threading.Thread(target=start_get_lock, name="thread_1", args=())
    th1.start()
    th1.join()


if __name__ == "__main__":
    # Make the info-level messages emitted by ``say`` visible on the console.
    logging.basicConfig(level=logging.INFO)
    main()
    # Keep the process alive so that watcher callbacks can still fire.
    time.sleep(1000)
联系客服