Configuration Management with ZooKeeper

package lbzkconnector;
 
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.util.Scanner;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
/**
 *
 * @author ┼Żygimantas Bruzgys <zygimantas@bruzgys.eu>
 * @author Julia Proskurnia <julia@proskurnia.in.ua>
 */
public class LbZkConnector implements Closeable, Watcher {
 
    public static final String DEFAULT_ROOT_DIR = "logConfig";
    private static final Logger logger = LoggerFactory.getLogger(LbZkConnector.class);
    private ZooKeeper zk;
    private String rootDir;
    private LoggerContext loggerContext;
 
    public LbZkConnector(String zkConnectionString, LoggerContext loggerContext) {
        this(zkConnectionString, loggerContext, DEFAULT_ROOT_DIR);
    }
 
    public LbZkConnector(String zkConnectionString, LoggerContext loggerContext, String rootDir) {
        try {
            this.loggerContext = loggerContext;
            this.rootDir = "/" + rootDir;
            zk = new ZooKeeper(zkConnectionString, 3000, this);
        } catch (IOException ex) {
            logger.error("Could not connect to ZooKeeper", ex);
        }
    }
 
    private void initializeZkStructure() {
        try {
            if (zk.exists(this.rootDir, false) == null) {
                zk.create(this.rootDir, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } else {
                Stat stat = new Stat();
                byte[] newConf = zk.getData(rootDir, true, stat);
                updateLocalConfiguration(newConf);
            }
            zk.exists(this.rootDir, true);
        } catch (KeeperException ex) {
            logger.error("ZooKeeper error!", ex);
        } catch (InterruptedException ex) {
            logger.error("Transaction interrupted!", ex);
        }
    }
 
    public void updateConfiguration(String conf) {
        updateConfiguration(conf.getBytes());
    }
 
    public void updateConfiguration(byte[] conf) {
        try {
            zk.setData(rootDir, conf, -1);
            updateLocalConfiguration(conf);
        } catch (KeeperException ex) {
            logger.error("ZooKeeper error!", ex);
        } catch (InterruptedException ex) {
            logger.error("Transaction interrupted!", ex);
        }
    }
 
    private void updateLocalConfiguration(byte[] conf) {
        try {
            JoranConfigurator configurator = new JoranConfigurator();
            configurator.setContext(loggerContext);
            loggerContext.reset();
            configurator.doConfigure(new ByteArrayInputStream(conf));
        } catch (JoranException ex) {
            logger.error("LogBack configuration error!", ex);
        }
    }
 
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == EventType.None) {
            initializeZkStructure();
        } else if (event.getType() == EventType.NodeDataChanged
                && event.getPath().startsWith(rootDir)) {
            try {
                Stat stat = new Stat();
                byte[] newConf = zk.getData(rootDir, true, stat);
                updateLocalConfiguration(newConf);
            } catch (KeeperException ex) {
                logger.error("ZooKeeper error!", ex);
            } catch (InterruptedException ex) {
                logger.error("Transaction interrupted!", ex);
            }
        }
    }
 
    @Override
    public void close() throws IOException {
        if (zk != null) {
            try {
                zk.close();
            } catch (InterruptedException ex) {
                throw new IOException(ex);
            }
            zk = null;
        }
    }
 
    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
        LbZkConnector connector = new LbZkConnector("46.137.39.99:2181", context);
        (new Scanner(System.in)).next();
    }
}
zookeeper_research/configuration_management_source_code.txt · Last modified: 2012/05/31 09:58 by julia
 
Except where otherwise noted, content on this wiki is licensed under the following license: CC Attribution-Share Alike 3.0 Unported
Recent changes RSS feed Donate Powered by PHP Valid XHTML 1.0 Valid CSS Driven by DokuWiki