Jan 16, 2013

DBMS with extremely low latency? Suitable for real-time requirements?

MySQL Cluster is an open-source, scalable, high-performance database. It is a technology that enables clustering of in-memory databases in a shared-nothing system.

According to the paper on MySQL Cluster by Mikael Ronström, MySQL Cluster combined with a Dolphin Express interconnect is highly suitable for low-latency, real-time tasks. Dolphin Express provides low latency at the hardware level, in combination with an optimized TCP/IP bypass software stack – SuperSockets. Together, these interconnects improve MySQL Cluster's latency, efficiency and bandwidth. Moreover, Dolphin Express hardware can handle 3-5 times higher bandwidth than Gigabit Ethernet.

Even though the results presented in the paper are quite old, the latency improvement is significant. Nowadays, the latency of Dolphin SuperSockets is as low as a couple of microseconds, as visualized below.

The newest version, MySQL Cluster 7.2, reaches a throughput of 72M reads/sec (it is a bit unclear what an "operation" is: a simple read/write or something more complex) on a 30-node cluster running MySQL Cluster 7.2.5. How is it possible to reach such numbers? Careful tuning and very deep performance analysis, so that any strange behavior can be identified and fixed.

I wonder what the performance of my thesis system will be 🙂 Stay tuned!

Jan 16, 2013

One of the possible directions for my thesis work is stream data processing. That's why it is worth reviewing some existing architectures, which can at least be used afterwards to compare my future system with existing ones.

Data stream processing systems have become quite popular due to the enormous growth (in particular in volume, variety and velocity) of big data analytics on the fly: from machine learning systems such as Mahout and WEKA, to financial trading systems, to real-time analysis of real-world processes. Among distributed systems for data stream processing, I would like to highlight two: S4 (Simple Scalable Streaming System) and Storm.


Storm is a real-time computation system for reliable processing of unbounded streams of data. A Storm topology takes streams of data and processes them in arbitrarily complex ways. A good thing about this system is that it can be integrated with any queueing system and any database system. Storm simply needs to connect to the database, and it will handle parallelization, partitioning and failures when necessary. Within Storm, data is manipulated as tuples; a tuple is a named list of values.

Storm supports three abstractions: spouts, bolts and topologies. A spout is a source of streams in a computation. A bolt processes input streams and produces new output streams. A topology is a network of spouts and bolts, where each edge represents a bolt subscribed to the output of another spout or bolt. Topologies are submitted using Apache Thrift. Storm is scalable and can be run across a cluster of machines, and fault tolerance is taken into account: Storm daemons are designed to be stateless and fail-fast. Nimbus and the Supervisors maintain the fault-tolerance properties. What if:

  • A worker dies? The Supervisor will restart it. If it still does not respond to Nimbus's pings, Nimbus will reassign the worker to another machine.
  • A node dies? The tasks assigned to that machine will time out and Nimbus will reassign them to other machines.
  • Nimbus (N) or a Supervisor (S) dies? All state is stored in ZooKeeper or on disk. N and S should normally run under the supervision of daemontools or monit, so on restart they resume functioning as if nothing had happened. Additionally, no worker processes are affected by a failure of N or S.
  • Nimbus dies? Nothing critical will happen if N fails. However, the ability to reassign workers to other machines is lost until N comes back.

Each message coming out of a spout is guaranteed to be fully processed. Storm considers a tuple fully processed when every message in its tuple tree has been processed.
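
The spout → bolt pipeline can be sketched conceptually in plain Python. This is not the real Storm API (which is Java-based); all class and function names here are illustrative, and the classic word-count example is used:

```python
# Conceptual sketch of Storm's abstractions (not the actual Storm API).
# A spout emits tuples; bolts transform them; a topology wires them together.

class WordSpout:
    """Source of the stream: emits sentences as one-field tuples."""
    def emit(self):
        for sentence in ["to be or not to be"]:
            yield (sentence,)

class SplitBolt:
    """Processes input tuples and produces new output tuples (one per word)."""
    def process(self, tup):
        for word in tup[0].split():
            yield (word,)

class CountBolt:
    """Terminal bolt: its state is the running count per word."""
    def __init__(self):
        self.counts = {}
    def process(self, tup):
        word = tup[0]
        self.counts[word] = self.counts.get(word, 0) + 1
        yield (word, self.counts[word])

def run_topology(spout, bolts):
    """Drive all tuples through a linear chain of bolts, stage by stage."""
    stream = list(spout.emit())
    for bolt in bolts:
        stream = [out for tup in stream for out in bolt.process(tup)]
    return stream

counter = CountBolt()
results = run_topology(WordSpout(), [SplitBolt(), counter])
```

A real topology is of course a distributed graph rather than a single-process chain, with Storm tracking each tuple tree for the completeness guarantee described above.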


The system has a master and workers. The master runs Nimbus, which performs code distribution, task assignment and failure monitoring. Each worker runs a Supervisor, which listens for incoming tasks and starts and stops worker processes. Each worker executes the part of a topology assigned to it. Finally, coordination between the master and the workers is handled by ZooKeeper; all Nimbus and Supervisor state is stored either in ZooKeeper or on local disk.

As said before, the whole computation is based on a computational graph – the topology – over which a stream of data circulates. Each node in a Storm topology executes in parallel.

Stream grouping is one of the distinctive features of Storm. It tells the topology how to send tuples between two components (for example, to a specific task of a component).
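
A fields-style grouping can be sketched as a hash of the grouping field modulo the number of downstream tasks. This is an illustrative sketch, not Storm's actual implementation; `zlib.crc32` is chosen here only because it gives a stable hash across runs:

```python
import zlib

# Sketch of a fields grouping: tuples with the same value of the grouping
# field always land on the same task of the downstream component, while
# different values spread across tasks.

def fields_grouping(field_value, num_tasks):
    """Choose the target task index from the tuple's grouping field."""
    return zlib.crc32(field_value.encode("utf-8")) % num_tasks

num_tasks = 4
targets = [fields_grouping(w, num_tasks) for w in ["aapl", "msft", "aapl"]]
```

Other groupings (shuffle, all, global) would just replace the routing function, e.g. round-robin for shuffle.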


S4 is a simple, scalable and flexible processing platform for data streams. Processing elements in the system can be specified and form directed graphs through which a data stream flows. Nodes in the graph are called Processing Elements (PEs), through which data is sent in the form of events. ZooKeeper is used for managing configuration and task assignment. This way the system can be dynamically reconfigured and can resolve failures by reassigning tasks to idle nodes.

A PE is the basic computational unit in the system, uniquely identified by its functionality, the types of events it consumes, a keyed attribute and a value. Keyless PEs are also possible: they accept all events of the type they are associated with, while keyed PEs accept only events with the corresponding key. Keyless PEs are usually the first elements in a processing path, and each node typically has one instance of every keyless PE. Each PE has a state, which can change in reaction to a received event.
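
The keyed/keyless distinction can be sketched in a few lines of plain Python. This is illustrative only, not the S4 API; the word-count example and all names are assumptions:

```python
# Conceptual sketch of S4 Processing Elements (not the actual S4 API).
# A keyless PE accepts every event of its type and usually starts the
# processing path; a keyed PE exists per distinct key and keeps state.

class SplitterPE:
    """Keyless PE: accepts all incoming events and splits them into words."""
    def on_event(self, event):
        return [{"word": w} for w in event["sentence"].split()]

class WordCountPE:
    """Keyed PE: one instance per distinct word; its state is the count."""
    def __init__(self, key):
        self.key = key
        self.count = 0                      # PE state, updated per event
    def on_event(self, event):
        if event["word"] == self.key:       # keyed PE accepts matching key only
            self.count += 1

# Dispatch: a keyed PE instance is created on demand for each new key.
splitter = SplitterPE()
pes = {}
for ev in splitter.on_event({"sentence": "buy sell buy"}):
    pes.setdefault(ev["word"], WordCountPE(ev["word"])).on_event(ev)
```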

A Processing Node (PN) hosts running PEs. Through the PN, different PEs can be reached, interaction with the communication layer is established, and emitting output messages is supported. A listener component in the PN is used to receive events. Events are routed to PNs based on a hash of their keyed attribute.

Processing Node and Communication Layer in S4

Cluster management and automatic failover are provided by the communication layer.

The biggest difference between Storm and S4 is that Storm guarantees messages will be processed even in the face of failures, whereas S4 will sometimes lose messages. On the other hand, state recovery is not supported by Storm.

Jan 15, 2013

My third day at my thesis host company – NASDAQ OMX, Stockholm.

Big eyes.

Happy face.

Hands ready to code.

This is the spirit!

Feel like I will have a crazy 5 months of work, with amazing experience and hands/head ready to handle any problem.

Finally, going down to the thing I’m working on and the thesis topic:

GENIUM DataStore: Distributed Data Store and Data Processing System.

The system must meet very strict requirements on latency, availability and scalability. When I talk about the low-latency requirement, I mean a latency of 100 microseconds per operation and 250,000 transactions per second, which for now sounds quite insane:). Two main paradigms will be supported: storing and processing the data. The storing part will be a kind of integration layer, through whose API information can be stored to the databases already in use internally, both relational and key-value. The processing part is similar to a stream processing system and will be integrated with the storage part.

Building a system with high requirements on consistency (C) and fault tolerance (FT) is supposed to be a pain in the rear part of the body, but using the internally developed libraries for C and FT support will make my life much easier. However, the performance goals can be a real challenge.

The main goal is not to build a system with a variety of features that is only an "almost stable" version, but to have fewer features in a stable release. Another side of working here is that whatever you build must be well thought through, given the critical nature of the data and the great responsibility for any mistake. Hooorayyy…

Thesis structure. Preliminary thoughts.

I have also been thinking about the structure of the final document, and it is actually quite hard to come up with one at this point. I am sure my vision will change, but here it is:

  • Abstract (Cap. Obvious :D)
  • Introduction

Structure of the Document

  • Background and Related Work

NASDAQ INET (just a few words to give at least a rough picture of the base system for GENIUM DS)
Distributed Data Storage Systems
Distributed Data Stream Processing Systems

  • GENIUM DataStore

NASDAQ OMX Domain (What is NASDAQ, what data is used, volume of the data to be processed)
Architecture (and Reasoning for such architecture)
Fault Resilience
No idea what else yet… but for sure there should be something more

  • Implementation Details (I’m sure this part will be necessary, as coding will be my main occupation these days:))

Failure Scenarios

  • Experimental Results

Set Up
Scalability and Performance

  • Discussion

Main findings
Low Latency
Positioning??? (can’t find any more suitable word… will think on it… one day:))
Comparison to other existing systems (if possible)
Future Work

  • Conclusions
  • References

Let’s see what is going to be in 1 month:)

Finally, I hope not to become like the turtle below, and to behave:)

Julia is not a turtle 🙂

Nov 08, 2012

In the field of distributed data storage it is almost impossible to come up with a universal system that satisfies all needs. That’s why various distributed storage systems have appeared recently that address different needs and use different approaches.

DynamoDB uses a key-value interface and replicates only within a region. I haven’t checked the latency range myself, but according to its website latency varies within single-digit milliseconds, which is at least 10 times more than I want to reach in the thesis system.

Megastore doesn’t reach great performance because it is based on Bigtable (with high communication cost); however, it is scalable and consistent. Synchronization for wide-area replication is done with Paxos. Prioritizing scalability, consistency and fault tolerance, latency is sacrificed and sits within 100-400 milliseconds for reads and writes.

Scatter is a DHT-based key-value store that layers transactions on top of consistent replication (it exposes a low-level interface). Even though it provides high availability and scales well, operation latencies are still within milliseconds.

VoltDB is an in-memory database that supports master-slave replication over a wide area.

Cassandra is a column-based store developed and used by Facebook, with reads within milliseconds.

Spanner provides a semi-relational data model and offers high performance, a high-level interface, general-purpose transactions and external consistency (using GPS and atomic clocks through a new time API, TrueTime). Spanner also integrates concurrency control with replication. The main contributions of the paper are that the system solves the problem of wide-area replication and implements globally synchronized timestamps (supporting strong consistency and linearizability for writes, and snapshot isolation for reads). Good: TrueTime; interleaved data; atomic schema changes; snapshot reads in the past. Weak: remaining clock uncertainty; Paxos groups are not reconfigurable; read-only transactions use a trivial strategy for executing reads (when only a few Paxos groups are involved, Spanner does not negotiate within these groups and simply applies the latest timestamp to the read). Typical reads are near 10 ms and writes average 100 ms.
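
The TrueTime idea behind external consistency can be sketched as a commit wait. This is a conceptual Python sketch under an assumed clock uncertainty, not Google's implementation; `EPSILON` and all function names are made up for illustration:

```python
import time

# Conceptual sketch of Spanner-style commit wait (not Google's code).
# TrueTime exposes an interval [earliest, latest] that bounds the true time.
# A writer picks a commit timestamp no less than `latest`, then waits until
# `earliest` has passed that timestamp, so the timestamp is guaranteed to be
# in the past for every observer, whatever their clock skew.

EPSILON = 0.004  # assumed clock uncertainty of a few milliseconds

def tt_now():
    """Return the (earliest, latest) interval around the unknown true time."""
    t = time.time()
    return (t - EPSILON, t + EPSILON)

def commit_wait():
    """Assign a commit timestamp and wait out the uncertainty."""
    _, latest = tt_now()
    commit_ts = latest                  # timestamp no earlier than true time
    while tt_now()[0] <= commit_ts:     # wait until earliest > commit_ts
        time.sleep(EPSILON / 4)
    return commit_ts

ts = commit_wait()
```

The wait costs roughly twice the uncertainty bound, which is why Spanner invests in GPS and atomic clocks to keep that bound small.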

Which characteristics can be sacrificed in order to reach specific goals? The answer: the system should be adapted as much as possible to the needs. It is another matter when you are actually chasing latencies… Most probably, rarely will an existing DB fit your requirements…

If it is not 90% well suited – let the fun part begin -> do it yourself 🙂 Like me:))))

Nov 07, 2012

Pregel is a system for processing large-scale graphs. The main contributions of the paper are: 1) a fault-tolerant distributed programming framework for executing graph algorithms; 2) an API with direct message passing among vertices.

Brief description:

Pregel is a synchronized computation process over vertices. Given a graph as input, the graph is divided into partitions (using consistent hashing), each containing a set of vertices and their outgoing edges. One of the machines is the coordinator (master). The workers then undergo a series of iterations, called supersteps. In every superstep, all vertices on each worker execute the same user-defined function, which can (a) receive messages sent during the previous superstep, (b) modify the state of the vertex and its outgoing edges (vertices and edges are kept on the machines) and (c) send messages to be delivered during the next superstep. At the end of each superstep a global synchronization point occurs. Vertices can become inactive, and the sequence of iterations terminates when all vertices are inactive and there are no messages in transit. During computation, the master also sends ping messages to check for worker failures. The network is used only for sending messages, which significantly reduces communication overhead and makes the system more efficient.
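
The superstep loop can be illustrated with a toy single-process sketch (plain Python, not the Pregel API) that propagates the maximum vertex value through a graph, a classic Pregel example:

```python
# Minimal Pregel-style superstep loop (illustrative sketch, not Google's code).
# Per superstep, every vertex runs the same function: read messages from the
# previous superstep, update its value, send messages along outgoing edges,
# and vote to halt. The loop ends when all vertices are inactive and no
# messages are in transit.

def pregel_max(edges, values):
    """Propagate the maximum vertex value through the graph."""
    inbox = {v: [values[v]] for v in values}   # superstep 0: seed with own value
    active = set(values)
    while active or any(inbox.values()):
        outbox = {v: [] for v in values}
        for v in values:
            msgs, inbox[v] = inbox[v], []
            if not msgs:
                continue                       # vertex stays inactive
            new_val = max(msgs)
            if new_val > values[v] or v in active:
                values[v] = max(values[v], new_val)
                for dst in edges.get(v, []):   # send along outgoing edges
                    outbox[dst].append(values[v])
            active.discard(v)                  # vote to halt
        inbox = outbox                         # global synchronization point
    return values

# A 4-vertex ring: the maximum value (6) eventually reaches every vertex.
result = pregel_max({0: [1], 1: [2], 2: [3], 3: [0]},
                    {0: 3, 1: 6, 2: 2, 3: 1})
```

In real Pregel the vertices are spread over partitioned workers and the messages cross the network; only the superstep structure is shown here.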

The presentation can be found here:

Good:

  • Usability
  • Scalability
  • Performance
  • Transparency of vertex-to-machine assignment

Not good/Open questions/Future improvements:

  • How to detect a master failure?
  • Why not compare with MapReduce in the evaluation part?
  • Failure detection needs to be improved towards confined recovery.
  • Improve automation for defining the user-defined functions.
  • hash(VertexID) mod Partitions – why not use a smarter vertex assignment (to reduce network usage)?