Reliable Multicast

package reliablemulticast;
 
import java.io.IOException;
import java.util.*;
import java.util.logging.Level;
import org.slf4j.Logger;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.*;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
 
/**
 *
 * @author Julia Proskurnia <julia@proskurnia.in.ua>
 * @author ┼Żygimantas Bruzgys <zygimantas@bruzgys.eu>
 */
public class ReliableMulticast implements Watcher {
 
    public static final String DEFAULT_ROOT_DIR = "multicast";
    private static final Logger logger = LoggerFactory.getLogger(ReliableMulticast.class);
    private ZooKeeper zk;
    private final String rootDir, messagesDir, participantsDir, sequenceDir, leaderDir;
    private final String id;
    private final MulticastReceiver receiver;
    private final ArrayList<String> messagesSent = new ArrayList<String>();
    private boolean amLeader = false;
 
    public ReliableMulticast(String id, MulticastReceiver receiver, String zkConnectionStiring) {
        this(id, receiver, zkConnectionStiring, DEFAULT_ROOT_DIR);
    }
 
    public ReliableMulticast(String id, MulticastReceiver receiver, String zkConnectionStiring, String rootDir) {
        this.rootDir = "/" + rootDir;
        this.messagesDir = this.rootDir + "/messages";
        this.participantsDir = this.rootDir + "/participants";
        this.sequenceDir = this.rootDir + "/sequence";
        this.leaderDir = this.rootDir + "/leader";
        this.id = id;
        this.receiver = receiver;
        try {
            this.zk = new ZooKeeper(zkConnectionStiring, 4000, this);
        } catch (IOException ex) {
            logger.error("Could not connect!", ex);
        }
 
    }
 
    private void initializeZkStructure() throws KeeperException, InterruptedException {
        if (zk.exists(this.rootDir, false) == null) {
            zk.create(this.rootDir, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
 
        if (zk.exists(sequenceDir, false) == null) {
            zk.create(sequenceDir, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        if (zk.exists(participantsDir, false) == null) {
            zk.create(participantsDir, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        if (zk.exists(leaderDir, false) == null) {
            zk.create(leaderDir, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        if (zk.exists(messagesDir, false) == null) {
            zk.create(messagesDir, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        String myIdPath = participantsDir + "/" + id;
        if (zk.exists(myIdPath, false) == null) {
            zk.create(myIdPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        }
        if (!leaderExists()) {
            electLeader();
        } else {
            watchLeader();
        }
        watchMessages();
    }
 
    private void watchLeader() throws InterruptedException, KeeperException {
        List<String> leaders = zk.getChildren(leaderDir, false);
        Collections.sort(leaders);
        zk.exists(leaderDir + "/" + leaders.get(0), this);
    }
 
    private boolean leaderExists() throws InterruptedException, KeeperException {
        return !zk.getChildren(leaderDir, false).isEmpty();
    }
 
    private void electLeader() throws KeeperException, InterruptedException {
        amLeader = false;
        String ret[] = zk.create(leaderDir + "/x", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL).split("/");
        String myNode = ret[ret.length - 1];
        List<String> leaders = zk.getChildren(leaderDir, false);
        Collections.sort(leaders);
        if (leaders.get(0).equals(myNode)) {
            amLeader = true;
        } else {
            zk.delete(leaderDir + "/" + myNode, -1);
            zk.exists(leaderDir + "/" + leaders.get(0), this); // Watch Leader
        }
    }
 
    public void sendMessage(String msg) {
        try {
            List<String> nodes = zk.getChildren(participantsDir, this, null);
            String ret[] = zk.create(sequenceDir + "/msg", msg.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL).split("/");
            String msgNode = messagesDir + "/" + ret[ret.length - 1];
 
            List<Op> operations = new ArrayList<Op>();
            operations.add(Op.create(msgNode, msg.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
            for (String node : nodes) {
                operations.add(Op.create(msgNode + "/" + node, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
            }
            zk.multi(operations);
        } catch (KeeperException ex) {
            logger.error("ZooKeeper Exception!", ex);
        } catch (InterruptedException ex) {
            logger.error("Transaction interrupted!", ex);
        }
    }
 
    private void readMessages(WatchedEvent event) throws KeeperException, InterruptedException {
        List<String> participants = null;
        if (amLeader) {
            participants = zk.getChildren(participantsDir, false);
        }
 
        List<String> children = zk.getChildren(messagesDir, null);
        Collections.sort(children);
        for (String child : children) {
            String messagePath = messagesDir + "/" + child;
            List<String> messageShouldBeReadBy = zk.getChildren(messagePath, null);
            if (messageShouldBeReadBy.contains(id)) {
                byte[] message = zk.getData(messagePath, false, null);
                zk.delete(messagePath + "/" + id, -1);
                receiver.receiveMessage(new String(message));
            }
            if (amLeader) {
                messageShouldBeReadBy = zk.getChildren(messagePath, null);
                for (Iterator<String> it = messageShouldBeReadBy.iterator(); it.hasNext();) {
                    String readNode = it.next();
                    if (!participants.contains(readNode)) {
                        zk.delete(messagePath + "/" + readNode, -1);
                        it.remove();
                    }
                }
                if (messageShouldBeReadBy.isEmpty()) {
                    List<Op> ops = new ArrayList<Op>();
                    ops.add(Op.delete(messagePath, -1));
                    ops.add(Op.delete(sequenceDir + "/" + child, -1));
                    zk.multi(ops);
                }
            }
        }
    }
 
    @Override
    public void process(WatchedEvent event) {
        try {
            if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals(messagesDir)) {
                readMessages(event);
                watchMessages();
            } else if (event.getType() == EventType.None) {
                initializeZkStructure();
            } else if (event.getType() == EventType.NodeDeleted && event.getPath().startsWith(leaderDir)) {
                electLeader();
            }
        } catch (KeeperException ex) {
            logger.error("ZooKeeper Exception!", ex);
        } catch (InterruptedException ex) {
            logger.error("Transaction interrupted!", ex);
        }
    }
 
    private void watchMessages() throws InterruptedException, KeeperException {
        zk.getChildren(this.rootDir + "/messages", this);
    }
 
    public static void main(String[] args) {
        Scanner s = new Scanner(System.in);
        ReliableMulticast rm = new ReliableMulticast(s.nextLine(),
                new MulticastReceiver() {
 
                    @Override
                    public void receiveMessage(String message) {
                        System.out.println("Multicast received: " + message);
                    }
                },
                "46.137.39.99:2181");
        String next;
        while (!(next = s.nextLine()).equals("EOF")) {
            rm.sendMessage(next);
        }
    }
}
zookeeper_research/reliable_multicast.txt · Last modified: 2012/06/02 12:02 by skd
 
Except where otherwise noted, content on this wiki is licensed under the following license: CC Attribution-Share Alike 3.0 Unported
Recent changes RSS feed Donate Powered by PHP Valid XHTML 1.0 Valid CSS Driven by DokuWiki