
基于 ZooKeeper 的分布式锁和队列( 下 )

public interface ZooKeeperOperation {



     * Performs the operation - which may be involved multiple times if the connection

     * to ZooKeeper closes during this operation


     * @return the result of the operation or null

     * @throws KeeperException

     * @throws InterruptedException


    public boolean execute() throws KeeperException, InterruptedException;





package org.apache.zookeeper.recipes.lock;


import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.ZooDefs;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.data.ACL;

import org.apache.zookeeper.data.Stat;

import org.apache.zookeeper.recipes.lock.ZooKeeperOperation;


import java.util.List;

import java.util.concurrent.atomic.AtomicBoolean;



/**
 * A base class for protocol implementations which provides a number of higher
 * level helper methods for working with ZooKeeper, along with retrying synchronous
 * operations if the connection to ZooKeeper closes, such as
 * {@link #retryOperation(ZooKeeperOperation)}.
 */



class ProtocolSupport {

    private static final Logger LOG = LoggerFactory.getLogger(ProtocolSupport.class);


    protected final ZooKeeper zookeeper;

    private AtomicBoolean closed = new AtomicBoolean(false);

    private long retryDelay = 500L;

    private int retryCount = 10;

    private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;


    public ProtocolSupport(ZooKeeper zookeeper) {

        this.zookeeper = zookeeper;




     * Closes this strategy and releases any ZooKeeper resources; but keeps the

     *  ZooKeeper instance open


    public void close() {

        if (closed.compareAndSet(false, true)) {






     * return zookeeper client instance

     * @return zookeeper client instance


    public ZooKeeper getZookeeper() {

        return zookeeper;




     * return the acl its using

     * @return the acl.


    public List<ACL> getAcl() {

        return acl;




     * set the acl 

     * @param acl the acl to set to


    public void setAcl(List<ACL> acl) {

        this.acl = acl;




     * get the retry delay in milliseconds

     * @return the retry delay


    public long getRetryDelay() {

        return retryDelay;




     * Sets the time waited between retry delays

     * @param retryDelay the retry delay


    public void setRetryDelay(long retryDelay) {

        this.retryDelay = retryDelay;




     * Allow derived classes to perform 

     * some custom closing operations to release resources


    protected void doClose() {





     * Perform the given operation, retrying if the connection fails

     * @return object. it needs to be cast to the callee's expected 

     * return type.


    protected Object retryOperation(ZooKeeperOperation operation) 

        throws KeeperException, InterruptedException {

        KeeperException exception = null;

        for (int i = 0; i < retryCount; i++) {

            try {

                return operation.execute();

            } catch (KeeperException.SessionExpiredException e) {

                LOG.warn("Session expired for: " + zookeeper + " so reconnecting due to: " + e, e);

                throw e;

            } catch (KeeperException.ConnectionLossException e) {

                if (exception == null) {

                    exception = e;


                LOG.debug("Attempt " + i + " failed with connection loss so " +

                        "attempting to reconnect: " + e, e);




        throw exception;




     * Ensures that the given path exists with no data, the current

     * ACL and no flags

     * @param path


    protected void ensurePathExists(String path) {

        ensureExists(path, null, acl, CreateMode.PERSISTENT);




     * Ensures that the given path exists with the given data, ACL and flags

     * @param path

     * @param acl

     * @param flags


    protected void ensureExists(final String path, final byte[] data,

            final List<ACL> acl, final CreateMode flags) {

        try {

            retryOperation(new ZooKeeperOperation() {

                public boolean execute() throws KeeperException, InterruptedException {

                    Stat stat = zookeeper.exists(path, false);

                    if (stat != null) {

                        return true;


                    zookeeper.create(path, data, acl, flags);

                    return true;



        } catch (KeeperException e) {

            LOG.warn("Caught: " + e, e);

        } catch (InterruptedException e) {

            LOG.warn("Caught: " + e, e);





     * Returns true if this protocol has been closed

     * @return true if this protocol is closed


    protected boolean isClosed() {

        return closed.get();




     * Performs a retry delay if this is not the first attempt

     * @param attemptCount the number of the attempts performed so far


    protected void retryDelay(int attemptCount) {

        if (attemptCount > 0) {

            try {

                Thread.sleep(attemptCount * retryDelay);

            } catch (InterruptedException e) {

                LOG.debug("Failed to sleep: " + e, e);






/**
 * Callback interface through which a lock implementation reports
 * acquisition and release of the lock.
 */
public interface LockListener {

    /**
     * Call back called when the lock
     * is acquired.
     */
    public void lockAcquired();

    /**
     * Call back called when the lock is
     * released.
     */
    public void lockReleased();
}


分布式队列是一种通用的数据结构。为了在 ZooKeeper 中实现分布式队列,首先需要指定一个 Znode 节点作为队列节点(queue node)。各个分布式客户端通过调用 create() 函数向队列中放入数据:调用 create() 时节点路径名以 “qn-” 结尾,并设置顺序(sequence)节点标志。由于设置了节点的顺序标志,新的路径名具有以下字符串模式:“_path-to-queue-node_/qn-X”,其中 X 是唯一的自增序号。需要从队列中获取或移除数据的客户端首先调用 getChildren() 函数:有数据则获取(获取数据后可以删除,也可以不删);没有数据则在队列节点(queue node)上将 watch 设置为 true,等待触发后再处理最小序号的节点(即从序号最小的节点中取数据)。








package org.apache.zookeeper.recipes.queue;


import java.util.List;

import java.util.NoSuchElementException;

import java.util.TreeMap;

import java.util.concurrent.CountDownLatch;


import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooDefs;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.data.ACL;

import org.apache.zookeeper.data.Stat;




/**
 * A <a href="package.html">protocol to implement a distributed queue</a>.
 */



public class DistributedQueue {

    private static final Logger LOG = LoggerFactory.getLogger(DistributedQueue.class);


    private final String dir;


    private ZooKeeper zookeeper;

    private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;


    private final String prefix = "qn-";


    public DistributedQueue(ZooKeeper zookeeper, String dir, List<ACL> acl){

        this.dir = dir;


        if(acl != null){

            this.acl = acl;


        this.zookeeper = zookeeper;


        //Add root dir first if not exists

        if (zookeeper != null) {

            try {

                Stat s = zookeeper.exists(dir, false);

                if (s == null) {

                    zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);


            } catch (KeeperException e) {


            } catch (InterruptedException e) {







     * Returns a Map of the children, ordered by id.

     * @param watcher optional watcher on getChildren() operation.

     * @return map from id to child name for all children


    private TreeMap<Long,String> orderedChildren(Watcher watcher) throws KeeperException, InterruptedException {

        TreeMap<Long,String> orderedChildren = new TreeMap<Long,String>();


        List<String> childNames = null;


            childNames = zookeeper.getChildren(dir, watcher);

        }catch (KeeperException.NoNodeException e){

            throw e;



        for(String childName : childNames){


                //Check format

                if(!childName.regionMatches(0, prefix, 0, prefix.length())){

                    LOG.warn("Found child node with improper name: " + childName);



                String suffix = childName.substring(prefix.length());

                Long childId = new Long(suffix);


            }catch(NumberFormatException e){

                LOG.warn("Found child node with improper format : " + childName + " " + e,e);




        return orderedChildren;




     * Find the smallest child node.

     * @return The name of the smallest child node.


    private String smallestChildName() throws KeeperException, InterruptedException {

        long minId = Long.MAX_VALUE;

        String minName = "";


        List<String> childNames = null;



            childNames = zookeeper.getChildren(dir, false);

        }catch(KeeperException.NoNodeException e){

            LOG.warn("Caught: " +e,e);

            return null;



        for(String childName : childNames){


                //Check format

                if(!childName.regionMatches(0, prefix, 0, prefix.length())){

                    LOG.warn("Found child node with improper name: " + childName);



                String suffix = childName.substring(prefix.length());

                long childId = Long.parseLong(suffix);

                if(childId < minId){

                    minId = childId;

                    minName = childName;


            }catch(NumberFormatException e){

                LOG.warn("Found child node with improper format : " + childName + " " + e,e);




        if(minId < Long.MAX_VALUE){

            return minName;


            return null;





     * Return the head of the queue without modifying the queue.

     * @return the data at the head of the queue.

     * @throws NoSuchElementException

     * @throws KeeperException

     * @throws InterruptedException


    public byte[] element() throws NoSuchElementException, KeeperException, InterruptedException {

        TreeMap<Long,String> orderedChildren;


        // element, take, and remove follow the same pattern.

        // We want to return the child node with the smallest sequence number.

        // Since other clients are remove()ing and take()ing nodes concurrently, 

        // the child with the smallest sequence number in orderedChildren might be gone by the time we check.

        // We don't call getChildren again until we have tried the rest of the nodes in sequence order.



                orderedChildren = orderedChildren(null);

            }catch(KeeperException.NoNodeException e){

                throw new NoSuchElementException();


            if(orderedChildren.size() == 0 ) throw new NoSuchElementException();


            for(String headNode : orderedChildren.values()){

                if(headNode != null){


                        return zookeeper.getData(dir+"/"+headNode, false, null);

                    }catch(KeeperException.NoNodeException e){

                        //Another client removed the node first, try next










     * Attempts to remove the head of the queue and return it.

     * @return The former head of the queue

     * @throws NoSuchElementException

     * @throws KeeperException

     * @throws InterruptedException


    public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {

        TreeMap<Long,String> orderedChildren;

        // Same as for element.  Should refactor this.



                orderedChildren = orderedChildren(null);

            }catch(KeeperException.NoNodeException e){

                throw new NoSuchElementException();


            if(orderedChildren.size() == 0) throw new NoSuchElementException();


            for(String headNode : orderedChildren.values()){

                String path = dir +"/"+headNode;


                    byte[] data = zookeeper.getData(path, false, null);

                    zookeeper.delete(path, -1);

                    return data;

                }catch(KeeperException.NoNodeException e){

                    // Another client deleted the node first.






    private class LatchChildWatcher implements Watcher {


        CountDownLatch latch;


        public LatchChildWatcher(){

            latch = new CountDownLatch(1);



        public void process(WatchedEvent event){

            LOG.debug("Watcher fired on path: " + event.getPath() + " state: " + 

                    event.getState() + " type " + event.getType());



        public void await() throws InterruptedException {






     * Removes the head of the queue and returns it, blocks until it succeeds.

     * @return The former head of the queue

     * @throws NoSuchElementException

     * @throws KeeperException

     * @throws InterruptedException


    public byte[] take() throws KeeperException, InterruptedException {

        TreeMap<Long,String> orderedChildren;

        // Same as for element.  Should refactor this.


            LatchChildWatcher childWatcher = new LatchChildWatcher();


                orderedChildren = orderedChildren(childWatcher);

            }catch(KeeperException.NoNodeException e){

                zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);



            if(orderedChildren.size() == 0){





            for(String headNode : orderedChildren.values()){

                String path = dir +"/"+headNode;


                    byte[] data = zookeeper.getData(path, false, null);

                    zookeeper.delete(path, -1);

                    return data;

                }catch(KeeperException.NoNodeException e){

                    // Another client deleted the node first.







     * Inserts data into queue.

     * @param data

     * @return true if data was successfully added


    public boolean offer(byte[] data) throws KeeperException, InterruptedException{



                zookeeper.create(dir+"/"+prefix, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);

                return true;

            }catch(KeeperException.NoNodeException e){

                zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);






     * Returns the data at the first element of the queue, or null if the queue is empty.

     * @return data at the first element of the queue, or null.

     * @throws KeeperException

     * @throws InterruptedException


    public byte[] peek() throws KeeperException, InterruptedException{


            return element();

        }catch(NoSuchElementException e){

            return null;





     * Attempts to remove the head of the queue and return it. Returns null if the queue is empty.

     * @return Head of the queue or null.

     * @throws KeeperException

     * @throws InterruptedException


    public byte[] poll() throws KeeperException, InterruptedException {


            return remove();

        }catch(NoSuchElementException e){

            return null;




Apache Curator


RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)

CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);



Distributed Lock

InterProcessMutex lock = new InterProcessMutex(client, lockPath);
// acquire() returns false if the lock was not obtained within the wait time.
if ( lock.acquire(maxWait, waitUnit) )
{
    try
    {
        // do some work inside of the critical section here
    }
    finally
    {
        // Always release, even if the critical section throws.
        lock.release();
    }
}



Leader Election

LeaderSelectorListener listener = new LeaderSelectorListenerAdapter()
{
    public void takeLeadership(CuratorFramework client) throws Exception
    {
        // this callback will get called when you are the leader
        // do whatever leader work you need to and only exit
        // this method when you want to relinquish leadership
    }
};

LeaderSelector selector = new LeaderSelector(client, path, listener);
selector.autoRequeue();  // not required, but this is behavior that you will probably expect
selector.start();


