Apr 12 2013

Multicast operations are sent from one process to a set of processes, and the membership of the group is usually transparent to the sender [1]. However, a simple multicast protocol guarantees neither ordering nor message delivery. Therefore, modern distributed systems need stronger guarantees, such as reliability. Some systems [5] rely on reliable multicast, in which any transmitted message is received either by all processes or by none. In other words, a client cannot access a server just before it crashes and observe an update that no other server will process. This property is called uniform agreement. Moreover, to maintain a consistent and fault-tolerant system, total order is required in addition to reliable uniform multicast.

The simplest specification of uniform reliable total order multicast can be defined in terms of two primitives [2], TO-multicast(m) and TO-deliver(m), where m is some message. When a process issues a uniquely identified message m with TO-multicast(m), the following properties are assumed [3]:

• Validity. If a correct process TO-multicasts a message m, then it eventually TO-delivers m.

• Uniform Agreement. If a process TO-delivers a message m, then all correct processes eventually TO-deliver m.

• Uniform Integrity. For any message m, every process TO-delivers m at most once, and only if m was previously TO-multicast by the sender.

• Uniform Total Order. If two processes, p and q, both TO-deliver messages m and m’, then p TO-delivers m before m’ if and only if q TO-delivers m before m’.

If all these properties are satisfied, then reliable total order multicast takes place. Uniformity means that no process, correct or faulty, is allowed to deliver a message out of order at any time.
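
The total order and integrity properties above can be checked mechanically on recorded delivery logs. The following is a minimal sketch, assuming each process keeps a list of the message identifiers it has TO-delivered; the function names are hypothetical and only illustrate the definitions.

```python
def violates_total_order(log_p, log_q):
    """Return True if two delivery logs order some common messages differently.

    Assumes each log contains every message at most once.
    """
    common = set(log_p) & set(log_q)
    # Restrict each log to the messages that both processes delivered.
    seq_p = [m for m in log_p if m in common]
    seq_q = [m for m in log_q if m in common]
    # Same relative order of all common messages <=> identical filtered logs.
    return seq_p != seq_q


def delivered_at_most_once(log):
    """Check the 'at most once' half of uniform integrity for one log."""
    return len(log) == len(set(log))
```

Note that a process which has delivered only a subset of the messages (for example, because it crashed) does not violate total order, as long as the messages it did deliver appear in the same relative order as everywhere else.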

Internally in NOMX, multicast communication is used for most of the subsystems, as it is the only fast and reliable way to guarantee consistency and agreement among all nodes at minimal cost.

There are three main ways to maintain total order: symmetric messaging, collective agreement [Birman and Joseph 1987], and sequencer-based ordering [Kaashoek 1989]. The system that I am developing for my master's project uses the single-sequencer ordering mechanism, as it is more efficient than the consensus-based one. The simplest presentation of total ordering is illustrated in the figure below. The figure shows that no matter when the messages were issued, they are delivered in the same order to all processes. For sequencer-based mechanisms, the main problem is that the sequencer is a possible bottleneck and a critical point of failure. Moreover, the sequencer may limit the scalability of the system. This can be overcome by using a replicated standby sequencer that delivers all messages issued by the primary one and takes over in case of failure.
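
To make the sequencer idea concrete, here is a minimal single-process sketch. It is not the NOMX implementation; the `Sequencer` and `Receiver` classes and their methods are hypothetical names used only for illustration. The sequencer stamps each message with the next sequence number, and every receiver delivers strictly in sequence-number order, buffering anything that arrives early.

```python
class Sequencer:
    """Assigns consecutive sequence numbers to incoming messages."""

    def __init__(self, next_seq=1):
        self.next_seq = next_seq

    def order(self, message):
        stamped = (self.next_seq, message)
        self.next_seq += 1
        return stamped


class Receiver:
    """Delivers messages in sequence-number order, whatever the arrival order."""

    def __init__(self):
        self.expected = 1      # next sequence number to deliver
        self.pending = {}      # early arrivals, keyed by sequence number
        self.delivered = []

    def receive(self, seq, message):
        if seq < self.expected:
            return             # duplicate of an already delivered message
        self.pending[seq] = message
        # Deliver every message that is now contiguous with the log.
        while self.expected in self.pending:
            self.delivered.append(self.pending.pop(self.expected))
            self.expected += 1
```

In this sketch a standby sequencer would simply be a `Receiver` that follows the primary's output; on takeover it could continue numbering with `Sequencer(next_seq=standby.expected)`, so no sequence number is reused or skipped.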



[1] George F. Coulouris, Jean Dollimore, and Tim Kindberg. Distributed Systems: Concepts And Design. Pearson Education, 2005. ISBN 9780321263544.

[2] Xavier Défago, André Schiper, and Péter Urbán. Total order broadcast and multicast algorithms: Taxonomy and survey. ACM Comput. Surv., 36(4):372–421, December 2004. ISSN 0360-0300. URL http://doi.acm.org/10.1145/1041680.1041682.

[3] Vassos Hadzilacos and Sam Toueg. A modular approach to fault-tolerant broadcasts and related problems. Technical report, 1994.

[4] L. E. T. Rodrigues, H. Fonseca, and P. Verissimo, “Totally ordered multicast in large-scale systems,” in Proceedings of the 16th International Conference on Distributed Computing Systems, 1996, pp. 503–510.


S. K. Kasera, J. Kurose, and D. Towsley, “Scalable reliable multicast using multiple multicast groups,” SIGMETRICS Perform. Eval. Rev., vol. 25, no. 1, pp. 64–74, Jun. 1997.
Mar 08 2013

Dynamo is a highly available key-value storage system that sacrifices consistency under certain failure scenarios. Moreover, conflict resolution is placed mostly on the application side, and versioning is heavily used for it. The main contribution is that the authors developed a highly decentralized, loosely coupled, service-oriented architecture with hundreds of services by combining different techniques.

A combination of techniques is used to reach the defined level of availability and scalability: partitioning and replication are based on consistent hashing, and consistency is facilitated by object versioning. Consistency among replicas during updates is maintained by quorum-like techniques, while failure detection relies on gossip-based protocols.


Simple read and write operations are uniquely identified by a key and do not support any relational schema. Dynamo does not provide any isolation guarantees and permits only single-key updates. It supports an always-writable design, as its applications require it. This way, conflict resolution is placed on the reads. Incremental scalability, symmetry, and heterogeneity are key features of the system.

Only two operations are exposed: get and put. Get returns the object and its version, while put takes this version as one of its parameters when it is called.

Partitioning of the data relies on consistent hashing, which distributes the load across hosts. Moreover, each node is mapped to multiple points in the ring. Replication is done on multiple hosts across the ring, and the replicas are ensured to be distinct hosts; the number of replicas is configurable. A preference list is used to store replica information.
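
The partitioning scheme can be sketched in a few lines. This is only an illustration under assumptions of mine: the `Ring` class, the `vnodes` parameter and the `"node#i"` naming of ring points are hypothetical, and the real system tracks far more state. Each node is hashed to several points on the ring, and the preference list for a key is built by walking clockwise from the key's position and collecting distinct hosts.

```python
import bisect
import hashlib


def _hash(key: str) -> int:
    # MD5 is used here only to spread keys over the ring, not for security.
    return int(hashlib.md5(key.encode()).hexdigest(), 16)


class Ring:
    def __init__(self, nodes, vnodes=8):
        # Every physical node owns `vnodes` points on the ring.
        self.points = sorted(
            (_hash(f"{node}#{i}"), node) for node in nodes for i in range(vnodes)
        )
        self.hashes = [h for h, _ in self.points]

    def preference_list(self, key, n_replicas):
        """Walk clockwise from the key and collect distinct physical hosts."""
        start = bisect.bisect_right(self.hashes, _hash(key)) % len(self.points)
        result = []
        for step in range(len(self.points)):
            node = self.points[(start + step) % len(self.points)][1]
            if node not in result:
                result.append(node)
            if len(result) == n_replicas:
                break
        return result
```

Skipping ring points that belong to a host already in the list is what keeps the replicas on distinct physical machines, even though each machine appears on the ring many times.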

Concurrent updates are resolved by versioning; this way updates can be propagated to all replicas. To reconcile updates made on different sites, vector clocks are adopted, so causality between different versions can be tracked. So, each time an object is updated, the version that was obtained before should be specified.
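
A vector clock is just a map from node name to counter, and causality is decided by comparing two maps entry by entry. The helper names below are my own and the sketch ignores clock truncation; it only shows how "before", "after" and "concurrent" versions can be told apart.

```python
def increment(clock, node):
    """Return a copy of the clock with this node's counter advanced by one."""
    new = dict(clock)
    new[node] = new.get(node, 0) + 1
    return new


def descends(a, b):
    """True if clock a has seen every event that clock b has seen."""
    return all(a.get(node, 0) >= count for node, count in b.items())


def compare(a, b):
    if descends(a, b) and descends(b, a):
        return "equal"
    if descends(a, b):
        return "after"
    if descends(b, a):
        return "before"
    return "concurrent"    # neither version supersedes the other


def merge(a, b):
    """Clock for a reconciled version: entry-wise maximum of both clocks."""
    return {node: max(a.get(node, 0), b.get(node, 0)) for node in a.keys() | b.keys()}
```

Two versions that compare as "concurrent" are exactly the ones that have to be handed back to the application for reconciliation on a read.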

Consistency among replicas is maintained with a quorum-like mechanism, where W and R, the write and read quorums respectively, are configured. On an update (put), the coordinator of the put generates a vector clock and writes the new version of the data. A get works similarly: the coordinator requests all existing versions for the key. But most of the time a “sloppy quorum” is used, where all read and write operations are performed on the first N healthy nodes in the preference list.
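
The two quorum ideas can be stated as one-liners. This is a hedged sketch with hypothetical function names: the first function checks the usual overlap condition R + W > N, under which every read quorum shares at least one replica with every write quorum, and the second picks sloppy-quorum targets as the first N healthy nodes of a preference list.

```python
def quorums_overlap(n, r, w):
    """True if any read quorum must intersect any write quorum."""
    return r + w > n


def sloppy_targets(preference_list, healthy, n):
    """First n healthy nodes of the preference list, in list order."""
    return [node for node in preference_list if node in healthy][:n]
```

For example, with N = 3 the setting R = 2, W = 2 overlaps, while R = 1, W = 1 favors latency and lets a read miss the latest write.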

This mix of techniques proved to work as a highly available and scalable data store, while consistency can be sacrificed in some failure scenarios. Moreover, all parameters, such as the read and write quorums and the number of replicas, can be configured by the user.


  • Cool
  • Inspiring
  • Scalable
  • Available
  • Writable


  • Sacrifices consistency
  • Hashing for load balancing and replication
  • Nothing more than get and put
  • Quite slow with all its conflict resolution and failure detection/handling
  • Targets write-specific applications
  • No security guarantees
  • No isolation guarantees
  • Only single key update
Mar 04 2013

It is quite a preliminary version of the problem description, i.e. motivation.

Again, any comments are more than welcome 🙂

Problem description

There are many existing distributed systems (DS) that focus on optimizing various system properties, e.g. availability, robustness, consistency. Designing a distributed data storage and data processing system for a real-time stock exchange environment is quite challenging, as it should meet strict SLA requirements. Current general-purpose solutions are willing to sacrifice some properties in order to achieve great improvements in others. Moreover, none of them leverages uniform reliable total order multicast properties [] to supply fault tolerance and ACID properties for the data operations. (Here a few paragraphs with some basic classification of DSS and their solution focus).

However, despite algorithmic advancements in total order broadcast and the development of distributed database replication techniques based on it, there is limited research on applying these algorithms to large-scale data storage and data processing systems. (Here are a few sentences about total order algorithms and their application). The limited application in real-time large-scale systems might be due to earlier scalability issues of messaging systems, which were limited by the capacity of the messaging bus.

We propose a system based on the NASDAQ OMX low-latency uniform reliable totally ordered message bus, which is highly scalable (the capacity of the message bus exceeds 2 million messages per second), available, and consistent. This messaging abstraction turns an unordered incoming stream of data into an ordered sequence of operations, which are backed up by rewinders; a message gap-filling mechanism is therefore automatically supported and served by them. The ordered stream of data is published on the so-called “message stream” and is seen by everyone on the stream. Based on this message bus, optimistic delivery can be assumed. In other words, an early indication of the estimated uniform total order is preserved, and all messages are guaranteed to be committed eventually in the same order to all subscribed servers.

The main focus of this work is leveraging a reliable total order multicast protocol to build a real-time, fault-tolerant, ACID, low-latency distributed data store. The major difficulty is guaranteeing fault tolerance and availability for the system and ACID properties for the data operations. Moreover, supporting the system in real time is challenging, and maintaining distributed read queries and concurrent updates is no straightforward endeavor. To reach the performance goals, the following approach is applied:

  • Scalability: Adding extra instances on the stream is very easy. The only thing required is to declare the schemas and tables that are served by the data store.
  • Availability: The ability to serve requests at any given time is provided for both simple operations and queries. First, the capacity of the message bus can handle simple operations without extra tweaks. Second, read query responses are sent directly to the requester and are served by the fastest data replica.
  • Consistency: As the underlying message-passing abstraction produces a uniform reliable totally ordered stream of requests, each instance sees exactly the same sequence of messages. This gives every instance a consistent view at any request time. Similarly, for concurrent updates, totally ordered timestamps per update are used, hence timestamp concurrency control [] is deployed.
  • Fault-Resilience: As exactly the same stream of requests is received by every replica, the failure of any instance during simple operations is not important. Failure of the data store while serving a query is handled by a simple snapshot indication message on the message stream. This way the query can be requested again from the point where it broke off.
  • Read Query Support: In order to increase the availability level, a limit on the size of a query response is set. If a continuation of the response is required, the query should be submitted again.


Mar 02 2013

I think it is kind of time to start working on the report draft 🙂

Here is the first version of an abstract for my project report. Any comments are more than welcome!


In recent years the need for distributed, fault-tolerant, ACID and low-latency data storage and data processing systems has led the way for new systems in the area of distributed systems. The growth of unbounded streams of data and the need to process them with low latency are some of the reasons for such interest in this area. At the same time, it has been argued that total order algorithms are a fundamental building block in the construction of distributed fault-tolerant applications.

In this work, we leverage the NASDAQ OMX low-latency uniform reliable totally ordered message bus with a capacity of 2 million messages per second. The ACID properties of the data operations are easily implemented using the messaging bus, as it forwards all transactions in a reliable total order fashion. Moreover, relying on the reliable totally ordered messaging, active replication support for fault handling and load balancing is integrated. Consequently, the prototype was developed using requirements from a production environment to demonstrate its feasibility.

Experimental results show that around 250 000 operations per second can be served with 100 microseconds latency. Query response capacity is 100 Mbps. It was concluded that uniform totally ordered sequenced input data can be used in real time for large-scale distributed data storage and processing systems to provide availability, consistency and high performance.

Mar 01 2013

GENIUM Data Store (GDS) is a system I am working on for my Master's thesis at NASDAQ.

GENIUM INET messaging bus

To achieve its performance goals, GDS leverages the high performance of the GENIUM INET messaging bus. The GENIUM INET messaging bus is based on UDP multicast and is made reliable by a totally ordered sequence of backed-up messages with a gap-fill mechanism using rewinders. As is well known, total order broadcast algorithms are fundamental building blocks for constructing fault-tolerant systems. The purpose of such algorithms is to provide communication primitives that allow processes to agree on the set of messages they deliver and on the order in which they deliver them. The NASDAQ OMX implementation of this abstraction assumes a perfect failure detector, i.e. it forces a process to fail if it was considered faulty. Moreover, uniform reliable total order is preserved: a process is not allowed to deliver any message out of order, even if it is faulty.

The receivers of the ordered messages should guarantee exactly-once delivery of each message to the applications; this way uniform integrity is guaranteed. Across the cluster of applications/clients and servers connected to the message stream, messages should be delivered in the same order.

The message stream can be configured without restriction, so that it can contain any number of various clients, placed on the same or different servers, with a defined replication level. The server/sequencer is responsible for creating the sequenced/ordered stream as output from the received clients’ messages.

Failure resilience is provided by the following:

  •  All message callbacks are fully deterministic and replayable (if single-threaded), as the incoming stream is identical each time it is received.
  • Replication can be adopted by installing the same receivers at multiple servers.
  • As long as the new primary is rewound to the same point as the failed one, the message stream is sufficient to synchronize state.
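
The points above amount to state machine replication: if every callback is deterministic, replaying the same sequenced stream always rebuilds the same state. Here is a minimal sketch of that idea, assuming a toy key-value state and a made-up operation format `(kind, key, value)`; none of these names come from GDS itself.

```python
class Replica:
    """Applies a sequenced stream of operations deterministically."""

    def __init__(self):
        self.state = {}
        self.last_seq = 0

    def on_message(self, seq, op):
        # Refuse to apply anything out of order; a gap must be filled first.
        if seq != self.last_seq + 1:
            raise ValueError(f"gap: expected {self.last_seq + 1}, got {seq}")
        kind, key, value = op
        if kind == "put":
            self.state[key] = value
        elif kind == "delete":
            self.state.pop(key, None)
        self.last_seq = seq


def replay(stream, upto=None):
    """Rebuild a replica by rewinding the stream, optionally up to a sequence number."""
    replica = Replica()
    for seq, op in stream:
        if upto is not None and seq > upto:
            break
        replica.on_message(seq, op)
    return replica
```

A new primary built with `replay(stream, upto=failed.last_seq)` ends up in exactly the state the failed one had at that point, which is why the message stream alone is enough for synchronization in this model.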

GDS takes advantage of the infrastructure described above and a priori maintains a fault-tolerant real-time system. Moreover, the distributed state is made consistent by adhering to the sequence numbering implied by the message stream.


In GENIUM Data Store, transactions are submitted through MoldUDP, which ensures the lowest possible transaction latency.

MoldUDP is a networking protocol that makes transmission of data messages efficient and scalable in a scenario where one transmitter and many listeners are present. It is a lightweight protocol that is built on top of UDP where missed packets can be easily traced and detected, but retransmission is not supported.

Some optimizations can be applied to make this protocol more efficient: (a) multiple messages are aggregated into a single packet – to reduce network traffic, (b) a caching Re-request Server is placed near the remote receiver – to reduce latency and bandwidth.

MoldUDP presumes that the system consists of listeners, which are subscribed to some multicast groups, and a server, which transmits on those multicast groups. The MoldUDP server transmits downstream packets through UDP multicast to send the normal data stream addressed to the listeners. The MoldUDP server also sends heartbeats periodically to the clients, so they can detect packet loss if it takes place. Moreover, listeners should be configured with the IP address and port to which they can submit their requests.

Note: a message in this context is an atomic piece of information, from 0 to 64 KB in size, that is carried by the MoldUDP protocol.
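
Loss detection on the listener side can be sketched as simple sequence-number bookkeeping. The sketch below does not parse real MoldUDP packets; it assumes (my assumption, not stated above) that each downstream packet carries the sequence number of its first message and a message count, and that a heartbeat carries the next sequence number with a count of zero. `GapDetector` is a hypothetical name.

```python
class GapDetector:
    """Tracks the next expected message number and reports missed ranges."""

    def __init__(self, first_seq=1):
        self.next_expected = first_seq

    def on_packet(self, seq, count):
        """Return the (first, last) range of missed messages, or None."""
        gap = None
        if seq > self.next_expected:
            # Everything between what we expected and what arrived was lost.
            gap = (self.next_expected, seq - 1)
        # Never move backwards on duplicated or late packets.
        self.next_expected = max(self.next_expected, seq + count)
        return gap
```

A listener that gets a gap back would then submit a request for that range to the configured address; the sketch leaves that part out.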


In GENIUM Data Store, read queries will be supported. That is why a TCP-like protocol is intended to be used to stream the data to the client in response to a submitted query.

SoupTCP is a lightweight point-to-point protocol built on top of TCP/IP sockets. This protocol delivers a set of sequenced messages from a server to a client. It guarantees that the client will receive all messages sent from the server strictly in sequence, even when failures occur.

Server functionality with SoupTCP includes: (a) client authentication on login and (b) delivery of a logical stream of sequenced messages to a client in a real-time scenario. Clients send messages to the server, but these are not guaranteed to be delivered in case of failures. That’s why the client will need to resubmit the request to the server.

Protocol flow:

  • The client opens a TCP/IP socket to the server and sends a login request.
  • If the login information is valid, the server responds with an accept and starts sending sequenced data.
  • Both client and server compute the message number locally by simply counting messages; the first message in a session is always number 1.
  • Link failure is detected by heartbeating. Both server and client send heartbeat messages. The former are required so that a client notices a failure and can reconnect to another socket. The latter are necessary so that the server can close the existing socket of a failed client and listen for a new connection.
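
The implicit numbering in the flow above can be illustrated with a tiny client-side counter. This is a sketch, not a SoupTCP implementation: the class, its methods and the dictionary returned by `login_request` are hypothetical, and the idea of passing the next wanted sequence number on re-login is my assumption about how a client would resume after a reconnect.

```python
class SoupSessionClient:
    """Counts sequenced messages locally, starting from 1."""

    def __init__(self):
        self.next_seq = 1
        self.messages = []

    def on_sequenced_data(self, payload):
        # The wire message carries no number; its position in the stream is its number.
        self.messages.append((self.next_seq, payload))
        self.next_seq += 1

    def login_request(self, username):
        # Hypothetical login payload: ask the server to resume from the next
        # message this client has not seen yet.
        return {"username": username, "requested_sequence": self.next_seq}
```

Because both sides count the same stream, no sequence numbers need to travel with the data itself, which keeps the protocol lightweight.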
Feb 28 2013

PNUTS is a parallel and geographically distributed database system for serving web applications with per-record consistency guarantees. The main concerns of the system are availability and low latency; consistency is therefore tuned according to fault-resilience needs, and geo-replication is supported to reduce latency for multi-continent reads.


PNUTS is focused on serving web applications rather than complex queries; this way a simple relational model is exposed to the users. To keep the system robust, different levels of redundancy are used that trade consistency for high availability during failures. The pub/sub paradigm (Yahoo! Message Broker) is used for asynchronous operations.

Data is organized into tables of attributed records, where schemas are quite flexible and can be changed without halting any operations. Moreover, the query language is quite simple and limited: it is restricted to selection and simple projection from a single table. However, this provides more flexible access than hash-based or ordered data stores, as the authors claim.

System Architecture

Data tables are horizontally partitioned into groups of records (tablets) spread across different servers. To determine the tablet that holds a record and the storage unit that holds the tablet, routers are used; they contain only a cached copy of the mapping, while the tablet controller owns that mapping.

Replication is based on asynchronous messaging, which ensures low latency. A data update is considered committed only once it is published to the Yahoo! Message Broker (YMB). It is guaranteed that, after the commit, updates will be asynchronously propagated to the appropriate regions. However, this guarantees only partial ordering: messages published to one YMB instance are guaranteed to be delivered in arrival order, while messages that arrived at different YMB instances can be delivered in any order. For this reason, timeline consistency is used for such commits. This consistency is supported by a per-record master, whose order of updates is preserved as the order of delivery on every replica. Fault recovery also builds on replication: lost tablets are recovered by copying. A checkpoint mechanism is also invoked after the copy request to ensure that in-flight updates are applied.

The most interesting thing is multiget processing, which is based on a scatter-gather engine that is a component of the router. Here, the router splits the request into parts and then gathers the results as they arrive.
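
The scatter-gather step can be sketched with a thread pool. This is only an illustration of the pattern, not PNUTS code: `multiget`, `locate` (maps a key to its storage unit) and `fetch` (reads a batch of keys from one unit) are hypothetical names supplied by the caller.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def multiget(keys, locate, fetch):
    """Split keys by storage unit, fetch each batch in parallel, merge the results."""
    # Scatter: group the requested keys by the unit that owns them.
    by_unit = defaultdict(list)
    for key in keys:
        by_unit[locate(key)].append(key)

    merged = {}
    with ThreadPoolExecutor(max_workers=max(1, len(by_unit))) as pool:
        parts = pool.map(lambda item: fetch(item[0], item[1]), by_unit.items())
        # Gather: combine the per-unit answers into one response.
        for part in parts:
            merged.update(part)
    return merged
```

The overall latency of such a request is bounded by the slowest storage unit involved, which is one reason a parallel multiget can still feel slow.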

Strong points:

  • Asynchronous updates = low latency
  • Record level consistency
  • Flexible schemas
  • Multiget, which retrieves multiple records in parallel
  • Scalable
  • Various applications

Weak points:

  • No support for complex queries (joins, group by)
  • A request does not always return the most current version of the data
  • No referential integrity
  • No serializable transactions
  • Partial ordering of the transactions
  • Conflict resolution for concurrent updates is not highlighted
  • Per-record master is a mystery 🙂
  • Slow multiget parallel requests
  • Failure detection is preserved…
Feb 20 2013

I think it is a good idea to keep things not only in mind but also here, so I could come back to them.

I decided to start from the todo list for my thesis project:

  • Finish simple prototype with simple functionality (append)  — 26.02 I hope
  • Test 🙂  — 28.02
  • Add some failure handling  + Finalize Update, Delete– 01.03
  • Test some failure scenarios 🙂  — 03.03
  • Add more complex features (query)  — 10.03
  • Test 🙂  — 13.03
  • Add data store — 15.03
  • Add failure resistance functionality  —  20.03
  • Test Test Test 🙂 + Test failure scenarios 🙂   —  22.03
  • Write… … …    —   25.03
  • Think on streaming application  —  26.03
  • Do a simple prototype of aggregation application over a stream of data  — 31.03
  • Test 🙂  —  03.04
  • Do some bulshit and write an application to visualize messages in the system 🙂  —  10.04
  • Feel happy about the visualization part  —  14.04
  • Write Write Write…

For now, these dates look a bit too optimistic, but I hope that they will slip by no more than 1 week.

Gonna be a tough month 🙂

Feb 18 2013

Any suggestions on classification and extra Distributed Data Stores to review are more than welcome 🙂

Recently there has been increasing interest in NoSQL data storage to meet the highly intense demands of applications. Representative work includes Bigtable, Cassandra and Yahoo PNUTS. In these systems, scalability is achieved by sacrificing some properties, e.g. transaction support. On the other side, most prevailing data storage systems use asynchronous replication schemes with a weaker consistency model, e.g., Cassandra, HBase, CouchDB and Dynamo use an eventual consistency model. Conventional database systems provide mature and sophisticated data management features, but have difficulties in serving large-scale interactive applications. Open source database systems such as MySQL do not scale up to the required levels, while expensive commercial database systems like Oracle significantly increase the total cost of ownership in large deployments. Moreover, neither of them offers a fault-tolerant synchronous replication mechanism, which is the key piece for building robust applications.


  Review follows in the next posts… Some info here


  Review follows in the next posts…

Combining the merits of both scalable data stores and databases, Genium Data Store (GDS) provides ACID guarantees with high scalability, fault tolerance, consistency and availability. However, in the case of GDS, wide-area network semantics are not taken into account, as the applications that will use GDS do not require wide-area replication.

To guarantee consistency a few systems use Paxos to achieve synchronous replication, e.g. SCALARIS, Keyspace, Megastore.


  Review follows in the next posts…


  Review follows in the next posts…


  Review follows in the next posts…

In a chase for latency, MySQL Cluster is the one that can meet our requirements, however …. (should be something) 🙂

MySQL Cluster

  Review follows in the next posts…

Redis, ElasticSearch, Spanner, BlinkDB, God ….


Also I was thinking on the following classification:

  • Wide-Area Deployment. Those which are trying to solve wide-range synchronization
  • Short-Area Deployment. This is opposite to the above one.
  • Chase for latency

The main reason for this classification is that my project is not concerned about wide-area deployment.

Feb 13 2013

Megastore is a storage system developed by Google as an improvement on their Bigtable. The authors claim to support strong consistency and high availability guarantees. Moreover, fully serializable ACID semantics are provided within fine-grained partitions of data.

To achieve high availability and consistency, synchronous replication is applied. Replication is implemented using Paxos for every write across the data centers, i.e. synchronous replication for writes is supported.


  1. Availability and scalability are built in a priori.
  2. The Paxos replication and consensus algorithm is optimized for low latency across geo-distributed datacenters. This way high availability is supported. (Low latency = 10-1000 milliseconds)

Availability + Scalability

Availability is supported by a synchronous, fault-tolerant log replicator optimized for long distances, while scalability is supported by partitioning the database. Both concepts are well thought out and oriented toward long-distance links and active communication over them.

To scale throughput, data is partitioned into a collection of entity groups. Operations across entity groups leverage asynchronous messaging, which is also used for logically distant entities. Minimizing latency and improving throughput are left to application-level control.


Megastore is deployed through a client library to which applications are linked. This library implements Paxos and therefore deals with failure detection and replication. Durability of Paxos operations is provided by sending an operation directly to the local Bigtable instance.

Another cool thing about Megastore is its various index tuning options, e.g. secondary indexes can be declared to optimize further lookups.

Concurrency control

Each entity group is abstracted as a DB that provides serializable ACID semantics. Each row in Megastore can be stored with multiple values with different timestamps. This way multiversion concurrency control is supported. I will use a similar approach in my thesis project to support concurrent updates to the DS.
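
Storing several timestamped values per cell is easy to sketch. The `VersionedCell` class below is a hypothetical illustration of the multiversion idea, not Megastore's or Bigtable's API: writes add a value at a timestamp, and a read at timestamp t sees the latest value written at or before t.

```python
import bisect


class VersionedCell:
    """Keeps every written value, ordered by timestamp."""

    def __init__(self):
        self.timestamps = []   # sorted ascending
        self.values = []       # values[i] was written at timestamps[i]

    def write(self, ts, value):
        i = bisect.bisect_left(self.timestamps, ts)
        if i < len(self.timestamps) and self.timestamps[i] == ts:
            self.values[i] = value          # overwrite the same version
        else:
            self.timestamps.insert(i, ts)
            self.values.insert(i, value)

    def read(self, ts):
        """Latest value with timestamp <= ts, or None if nothing was written yet."""
        i = bisect.bisect_right(self.timestamps, ts)
        return self.values[i - 1] if i else None
```

Readers that pin a timestamp are never blocked by later writers, since a later write only adds a new version next to the ones already stored.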

For atomic updates across entity groups, two-phase commit can be used. However, its usage is not recommended, as it may increase the risk of contention.


Reads and writes can be initiated from any replica while preserving ACID semantics. Replication is performed per entity group: the transaction log is synchronously transmitted to a quorum of replicas. Agreement between replicas is maintained by an optimized Paxos. A coordinator tracks a set of entity groups for each replica. Fast writes are achieved by the master-based approach (where each read and write is dedicated to one master). This way the Paxos prepare stage can be skipped if the previous requests succeeded.

Strong side

  • Partitioning the data into entity groups. This way support for ACID transactions is guaranteed.
  • The consistency level can be loosened by the user, which improves latency.
  • Indexing is extremely useful for development.
  • Extension of Paxos to optimize wide-area agreement.

Weak side

  • Partitioning of the data is left to the application side.
  • Write rate is quite low
  • Joins give an extra overhead, as the data should be denormalized
  • Vulnerability to asymmetric network partitioning.
  • Chubby for failure detection, which is, most probably, not the most efficient tool.
Jan 29 2013

Designing architecture from scratch may require some tools and approaches to put all the things together.

The high-level structure of software can be represented in various ways, and one approach is to make an Architectural Blueprint with 5 different views of the system [by Philippe Kruchten].

Main views:

  • Logical View – an object model of the design.
  • Process View – concurrency and synchronization aspects.
  • Physical View – mapping of the software to the hardware.
  • Development View – static organization of the software.
  • Use Cases – various usage scenarios.

The Logical Architecture

Serves the functional requirements and decomposes the system into a form of objects and object classes. Class diagrams and class templates are usually used to illustrate this abstraction. Common mechanisms or services are defined in class utilities. Keeping a single, coherent object model across the whole system is a general advice when building a logical view.


The Process Architecture

It takes into account non-functional requirements: performance and availability, concurrency and distribution, system integrity and fault tolerance. It can be represented as a high-level view of a set of independently executing logical networks of communicating programs that are distributed across a set of hardware. A process is a group of tasks that form an executable unit and which can be (a) tactically controlled, (b) replicated, (c) partitioned into a set of independent tasks: major and minor (cyclic activities, buffering, time-outs).


The Development Architecture

It represents the software module organization in the software development environment. It consists of a representation of libraries and subsystems. The subsystems are organized into a hierarchy of layers with well-defined interfaces. Overall, this view is represented by module and subsystem diagrams showing the export/import relationships. It also takes internal requirements into account. A layered style is recommended for this view.


The Physical Architecture

It represents non-functional requirements such as availability, reliability, scalability and performance. It shows how networks, processes, tasks and objects are mapped onto the various nodes.



This fifth view (use cases) is redundant with the other ones, but it has two main purposes:

  • driver to discover the architectural elements during the architecture design
  • validation and illustration role after the architecture design is complete, also it can be used as a starting point for the tests of an architectural prototype.

It uses the components of the logical view together with connector elements from the process view for the interaction between the objects.