Aug 16, 2013

Recently I was asked to become a reviewer for the book Apache Kafka, published by Packt Publishing. I was happy to accept the offer, and now I am more than happy to share my view of the technology.

Apache Kafka is a distributed publish-subscribe messaging system that is used in many top IT companies, such as LinkedIn, Twitter, Spotify, DataSift, and Square. The system guarantees the following properties: persistent messaging with constant-time access to disk structures and high performance; high throughput; distribution 🙂 as access to the system is both load balanced and partitioned to reduce the pressure on any one node; real-time properties; and Hadoop and Storm integration.

Installing and building the system is very easy and won’t take much time. Depending on your needs, various setups for the cluster are possible: single node – one broker (the core Kafka process), single node – multiple brokers, or multiple nodes – multiple brokers. Depending on the choice, the Kafka system should be configured properly; e.g., the configuration properties of each broker need to be known by the producers (message sources).

The following figure shows the simplest case, in which messages are published by producers and, through the Kafka broker, delivered to the consumers. In this scenario (single node – single broker), the producers, the broker, and the consumers run on different machines.



Among the important design decisions are the following: message caching, the opportunity to re-consume messages, message grouping (to reduce network overhead), local maintenance of the consumed-message state, a purely decentralized design that uses ZooKeeper for load balancing, and support for both asynchronous and synchronous messaging. Moreover, message compression is used to reduce the network load, e.g., gzip or Google Snappy. Additionally, the replication needs of any active system take Kafka to the next level: Apache Kafka allows mirroring an active data cluster into a passive one. It simply consumes the messages from the source cluster and republishes them on the target cluster. This feature is highly useful when reliability and fault tolerance are important concerns. A simple replication scheme is also implemented within the system, achieved mainly by partitioning messages (by hash) and having a lead replica for each partition (this way both synchronous and asynchronous replication can be arranged).
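The hash-based partitioning with a lead replica per partition can be sketched roughly as follows. This is an illustrative toy, not Kafka's actual code; the function names and the round-robin leader assignment are my assumptions:

```python
import hashlib

def choose_partition(key: str, num_partitions: int) -> int:
    """Map a message key to a partition; messages with the same key
    always land in the same partition, which preserves their order."""
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

def assign_leaders(num_partitions: int, brokers: list) -> dict:
    """Give each partition a lead replica by spreading partitions
    round-robin over the available brokers (hypothetical scheme)."""
    return {p: brokers[p % len(brokers)] for p in range(num_partitions)}
```

With four partitions and two brokers, `choose_partition("user-42", 4)` is deterministic, and `assign_leaders(4, ["broker-0", "broker-1"])` alternates leadership between the two brokers.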

The API provides sufficient support to create both producers and consumers. Additionally, a consumer can be one of two different types: one that ignores further processing of messages and just needs the messages to be delivered, and another that requires further processing of each message upon delivery. A consumer implementation can be either single-threaded or multithreaded. However, to prevent unpredictable behavior, the number of threads should correspond to the number of topics to consume; this way full utilization of the threads and the necessary message order are preserved.

Kafka can be integrated with the following technologies: Twitter Storm and Apache Hadoop, for online stream processing and offline analysis respectively. Storm integration is implemented on the consumer side: Kafka consumers are represented as regular Storm spouts that read data from the Kafka cluster. Integration with Hadoop, on the other hand, is bilateral, as Hadoop can act as both producer and consumer.

In the former case, Hadoop acts as a bridge for publishing data to the Kafka broker. A Hadoop producer extracts the data from the system in two ways: (a) using Pig scripts to write data in the binary Avro format (here, writes to multiple topics are made easier), or (b) using the Kafka “OutputFormat” class, which publishes data as bytes and provides control over the output.

In the latter case, a Hadoop consumer is a Hadoop job that pulls information from the Kafka system into HDFS. This can happen either sequentially or in parallel.

Pros: Easy to use and deploy; scalable; fast; distributed.

Cons: Might lack configurability and automatization; replication improvements required; fault-tolerance optimization.

I am actually going to test the framework and compare it with the current project I am working on at EPFL (an awesome, highly dynamic, BFT, decentralized, scalable pub/sub system). Stay tuned :)

Mar 08, 2013

Dynamo is a highly available key-value storage system that sacrifices consistency under certain failure scenarios. Conflict resolution is placed mostly on the application side, and versioning is heavily used for it. The main contribution of the system is a highly decentralized, loosely coupled, service-oriented architecture with hundreds of services, combining different techniques.

A combination of different techniques is used to reach the desired level of availability and scalability: partitioning and replication are based on consistent hashing, and consistency is leveraged by object versioning. Consistency among replicas during updates is facilitated by quorum-like techniques, while failure detection relies on gossip-based protocols.


Simple read and write operations are uniquely identified by a key, and no relational schema is supported. Dynamo does not provide any isolation guarantees and permits only single-key updates. It supports an always-writable design, as its applications require it; this way, conflict resolution is placed on the reads. Incremental scalability, symmetry, and heterogeneity are key features of the system.

Only two operations are exposed: get and put. Get returns the object and its version, while put takes this version as one of its parameters.

Partitioning of the data relies on consistent hashing, and this way the load is distributed across hosts. Moreover, each node is mapped to multiple points in the ring. Replication is done on multiple hosts across the ring, and the replicas are ensured to be distinct hosts, while the number of replicas is configurable. A preference list is used to store replica information.
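A minimal sketch of such a ring with virtual nodes and a preference list, assuming MD5 as the hash and a small fixed number of virtual nodes per host (both are my choices for illustration, not Dynamo's exact parameters):

```python
import bisect
import hashlib

class Ring:
    """Toy consistent-hashing ring: each node appears at several points
    (virtual nodes), so removing a host spreads its load over the rest."""

    def __init__(self, nodes, vnodes=3):
        self._ring = []  # sorted list of (hash, node)
        for node in nodes:
            for v in range(vnodes):
                h = self._hash(f"{node}#{v}")
                bisect.insort(self._ring, (h, node))

    @staticmethod
    def _hash(s):
        return int.from_bytes(hashlib.md5(s.encode()).digest()[:8], "big")

    def preference_list(self, key, n):
        """First n distinct nodes walking clockwise from the key's position.
        Requires n <= number of distinct physical nodes."""
        start = bisect.bisect(self._ring, (self._hash(key), ""))
        result, i = [], start
        while len(result) < n:
            _, node = self._ring[i % len(self._ring)]
            if node not in result:
                result.append(node)
            i += 1
        return result
```

Skipping duplicate hosts while walking the ring is what guarantees the replicas land on distinct physical nodes.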

Concurrent updates are resolved by versioning; this way updates can be propagated to all replicas. To resolve updates made at different sites, vector clocks are adopted, so causality between different versions can be tracked. Thus, each time an object is to be updated, the version number obtained earlier should be specified.
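The causality tracking described above can be sketched with a few lines of vector-clock logic (function names are illustrative):

```python
def vc_increment(clock, node):
    """Return a copy of the vector clock with node's counter bumped."""
    c = dict(clock)
    c[node] = c.get(node, 0) + 1
    return c

def vc_descends(a, b):
    """True if version a causally descends from (or equals) version b."""
    return all(a.get(k, 0) >= v for k, v in b.items())

def vc_concurrent(a, b):
    """Neither descends from the other: a conflict the application
    (or the read path) must reconcile."""
    return not vc_descends(a, b) and not vc_descends(b, a)
```

Two replicas that each bump their own counter starting from the same version produce concurrent clocks, which is exactly the case Dynamo pushes to the application to resolve on read.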

Consistency among replicas is maintained with a quorum-like mechanism, where W and R, the write and read quorums respectively, are configurable. On an update (put), the coordinator of the put generates a vector clock and writes the new version of the data. Similarly for a get, the coordinator requests all existing versions for the key. Most of the time, though, a “sloppy quorum” is used, where all read and write operations are performed on the first N healthy nodes in the preference list.
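The quorum intersection argument is easy to demonstrate. A sketch under simplifying assumptions (writes hit the first W replicas, reads deliberately hit the last R, so the overlap only exists when R + W > N):

```python
def quorum_is_consistent(n: int, r: int, w: int) -> bool:
    """R + W > N guarantees the read and write quorums overlap in at
    least one replica, so a read always sees the latest write."""
    return r + w > n

def write(replicas, w, key, value, version):
    """A write succeeds once w replicas hold it (here: the first w;
    the rest would be updated asynchronously)."""
    for rep in replicas[:w]:
        rep[key] = (value, version)
    return True

def read(replicas, r, key):
    """Read from r replicas (here: the last r, to make the overlap
    visible) and return the highest-versioned value seen."""
    versions = [rep[key] for rep in replicas[-r:] if key in rep]
    return max(versions, key=lambda v: v[1]) if versions else None
```

With N = 3, W = 2, R = 2 the read quorum intersects the write quorum in the middle replica; with R = 1 it can miss the write entirely.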

This mix of techniques proves to work well for a highly available and scalable data store, while consistency can be sacrificed in some failure scenarios. Moreover, all parameters, such as the read and write quorums and the number of replicas, can be configured by the user.


Strong points:

  • Cool
  • Inspiring
  • Scalable
  • Available
  • Writable


Weak points:

  • Sacrifices consistency
  • Hashing for load balancing and replication
  • Nothing more than get and put
  • Quite slow with all its conflict resolution and failure detection/handling
  • Targets write-heavy applications
  • No security guarantees
  • No isolation guarantees
  • Only single-key updates

Feb 28, 2013

PNUTS is a parallel and geographically distributed database system for serving web applications with per-record consistency guarantees. The main concerns of the system are availability and low latency; consistency is thus tuned according to fault resilience, and geo-replication is supported to reduce latency during multi-continent reads.


PNUTS is focused on serving web applications rather than complex queries, so a simple relational model is exposed to the users. To strengthen the robustness of the system, different levels of redundancy are used, trading consistency against high availability during failures. The pub/sub paradigm (Yahoo! Message Broker) is used for asynchronous operations.

Data is organized into tables of attributed records, where schemas are quite flexible and can be changed without halting any operations. The query language, however, is quite simple and limited: it supports only selection and simple projection from a single table. Still, this provides more flexible access compared to hashed and ordered data, as the authors claim.

System Architecture

Data tables are horizontally partitioned into tablets stored across different servers. To determine the location of a tablet record and its storage unit, routers are used, which contain only a cached copy of the mapping, while the tablet controller owns that mapping.

Replication is based on asynchronous messaging and ensures low latency. A data update is considered committed only when it has been published to the Yahoo! Message Broker (YMB). It is guaranteed that after commitment the updates will be asynchronously propagated to the appropriate regions. However, this guarantees only partial ordering: messages published to one YMB instance are guaranteed to be delivered in arrival order, while messages arriving at different YMB instances can be delivered in any order. For this reason, timeline consistency is used for such commits. This consistency is supported by the presence of a per-record master, whose order is preserved as the order of delivery on every replica. Fault recovery is also built on replication: copying is used to recover lost tablets, and a checkpoint mechanism is issued after the copy request to ensure the applicability of the current updates.

The most interesting part is multiget processing, which is based on a scatter-gather engine that is a component of the router. Here, the router splits the request into parts and then gathers the results as they arrive.
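The scatter-gather idea can be sketched in a few lines; this is a conceptual toy (the `locate` routing function and dict-backed storage units are assumptions, not the PNUTS API):

```python
from concurrent.futures import ThreadPoolExecutor

def multiget(keys, storage_units, locate):
    """Scatter: split the request by storage unit. Gather: fetch the
    parts in parallel and reassemble the answers in key order."""
    groups = {}
    for key in keys:
        groups.setdefault(locate(key), []).append(key)

    def fetch(item):
        unit, unit_keys = item
        return {k: storage_units[unit].get(k) for k in unit_keys}

    results = {}
    with ThreadPoolExecutor() as pool:
        for part in pool.map(fetch, groups.items()):
            results.update(part)
    return [results[k] for k in keys]
```

One sub-request per storage unit keeps the fan-out proportional to the number of units touched, not the number of keys requested.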

Strong points:

  • Asynchronous updates = low latency
  • Record-level consistency
  • Flexible schemas
  • Multiget, which retrieves multiple records in parallel
  • Scalable
  • Various applications

Weak points:

  • No support for complex queries (joins, group by)
  • Not always the most current version of the data is returned on a request
  • No referential integrity
  • No serializable transactions
  • Partial ordering of the transactions
  • Conflict resolution for concurrent updates is not highlighted
  • The per-record master is a mystery 🙂
  • Slow multiget parallel requests
  • Failure detection is preserved…
Feb 13, 2013

Megastore is a storage system developed by Google as an improvement over their Bigtable. It claims to support strong consistency and high availability guarantees. Moreover, fully serializable ACID semantics within fine-grained partitions of data are provided.

To achieve high availability and consistency, synchronous replication is applied. Replication is implemented using Paxos for every write across the data centers, i.e., synchronous replication for writes is supported.


  1. Availability and scalability are built in from the start.
  2. The Paxos replication and consensus algorithm is optimized for low latency across geo-distributed datacenters; this way high availability is supported. (Low latency = 10–1000 milliseconds.)

Availability + Scalability

Availability is supported by a synchronous, fault-tolerant log replicator optimized for long distances, while scalability is supported by partitioning the database. Both concepts are well thought out and oriented toward long-distance links and active communication over them.

To scale throughput, data is partitioned into a collection of entity groups. Operations across entity groups leverage asynchronous messaging, which is also used for logically distant entities. Minimizing latency and improving throughput are left to the application control level.


Megastore is deployed through client libraries to which applications are linked. The library implements Paxos, and therefore deals with failure detection and replication. Durability of Paxos operations is provided by directly sending each operation to the local Bigtable instance.

Another cool thing about Megastore is its various index tuning; e.g., secondary indexes can be declared, thereby optimizing further lookups.

Concurrency control

Each entity group is abstracted as a database that provides serializable ACID semantics. Each row in Megastore can be stored with multiple values with different timestamps; this way multiversion concurrency control is supported. I will use a similar approach in my thesis project to support concurrent updates to the DS.

For atomic updates across entity groups, a two-phase commit can be used. However, its usage is not recommended, as it may increase the risk of contention.


Reads and writes can be initiated from any replica while preserving ACID semantics. Replication is performed per entity group: simply put, the transaction log is synchronously transmitted to a quorum of replicas. Agreement between replicas is maintained by an optimized Paxos. A coordinator tracks the set of entity groups for each replica. Fast writes are leveraged by a master-based approach (where each read and write is dedicated to one master); this way the Paxos prepare stage can be skipped if the previous request succeeded.

Strong side

  • Partitioning the data into entity groups, which guarantees support for ACID transactions within a group.
  • The consistency level can be loosened by the user, thereby improving latency.
  • Indexation is extremely useful for development.
  • An extension of Paxos optimized for wide-area agreements.

Weak side

  • Partitioning of the data is left to the application side.
  • The write rate is quite low.
  • Joins add extra overhead, as the data must be denormalized.
  • Vulnerability to asymmetric network partitioning.
  • Chubby for failure detection, which is, most probably, not the most efficient tool.
Jan 29, 2013

Designing an architecture from scratch may require some tools and approaches to put all the pieces together.

The high-level structure of software can be illustrated/represented in various ways; one approach is to make an architectural blueprint with five different views of the system [by Philippe Kruchten].

Main views:

  • Logical View – an object model of the design.
  • Process View – concurrency and synchronization aspects.
  • Physical View – mapping of the software to the hardware.
  • Development View – static organization of the software.
  • Use Cases – various usage scenarios.

The Logical Architecture

Serves the functional requirements and decomposes the system into a form of objects and object classes. Class diagrams and class templates are usually used to illustrate this abstraction. Common mechanisms or services are defined in class utilities. Keeping a single, coherent object model across the whole system is a general advice when building a logical view.


The Process Architecture

It takes into account non-functional requirements: performance and availability, concurrency and distribution, system integrity and fault tolerance. It can be represented as a high-level view of a set of independently executing logical networks of communicating programs that are distributed across a set of hardware. A process is a group of tasks that form an executable unit and which can be (a) tactically controlled, (b) replicated, or (c) partitioned into a set of independent tasks: major and minor (cyclic activities, buffering, time-outs).


The Development Architecture

It represents the software module organization in the software development environment. It consists of libraries and subsystem representations. The subsystems are organized into a hierarchy of layers with well-defined interfaces. Overall, this view is represented by module and subsystem diagrams showing the export/import relationships. It also takes into account internal requirements. A layered style is recommended for this view.


The Physical Architecture

It represents non-functional requirements such as availability, reliability, scalability and performance. It shows how networks, processes, tasks and objects are mapped onto the various nodes.



Scenarios

This fifth view is redundant, but it has two main purposes:

  • driver to discover the architectural elements during the architecture design
  • validation and illustration role after the architecture design is complete, also it can be used as a starting point for the tests of an architectural prototype.

It uses the components of the logical view with connector elements from the process view for the interaction between the objects.



Jan 16, 2013

One of the possible directions for my thesis work is stream data processing. That’s why it is worth reviewing some possible architectures, which can at least be used afterwards to compare my future system with existing ones.

Data stream processing systems have become quite popular due to the enormous growth (in particular in Volume, Variety, Velocity) of big data analytics on the fly: starting with machine learning systems, such as Mahout and WEKA, continuing with financial trading systems, and ending with real-time analysis of world processes. Among distributed systems for data stream processing, I would like to highlight two: S4 (Simple Scalable Streaming System) and Storm.


Storm is a real-time computation system for reliable processing of unbounded streams of data. A Storm topology takes streams of data and processes them in complex ways. A good thing about this system is that it can be integrated with any queueing system and any database system. For databases, Storm simply needs to connect, and it will handle the parallelization, partitioning, and failures when necessary. Within Storm, tuples are manipulated; a tuple is a named list of values.

Storm supports three abstractions: spouts, bolts, and topologies. A spout is a source of streams in a computation. A bolt processes input streams and produces output streams. A topology is a network of spouts and bolts, where each bolt subscribes to the output of other spouts or bolts. Topologies are submitted using Apache Thrift. Storm is scalable and can be run across a cluster of machines, and at the same time fault tolerance is taken into account: Storm daemons are designed to be stateless and fail-fast, and Nimbus and the Supervisors are used to maintain the fault-tolerance properties. What if:

  • A worker dies? The Supervisor will restart it. If it still does not respond to pings from Nimbus, Nimbus will reassign the worker to another machine.
  • A node dies? The tasks assigned to that machine will time out, and Nimbus will reassign them to other machines.
  • Nimbus (N) or a Supervisor (S) dies? All their state is stored in ZooKeeper or on disk. Usually N and S are run under the supervision of daemontools or monit, so on restart they resume functioning as if nothing had happened. Additionally, no worker processes are affected by the failure of N or S.
  • Nimbus dies? Nothing critical will happen if N fails; however, the ability to reassign tasks to other machines is lost.

Each message in the system, coming out of a spout, is guaranteed to be fully processed. Storm considers a spout tuple fully processed when every message in its tuple tree has been processed.
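Storm tracks tuple trees with an XOR trick: every tuple id is XORed into a running value once when the tuple is anchored and once when it is acked, so the value returns to zero exactly when the tree is complete. A toy model of the idea (class and method names are mine; real Storm uses random 64-bit ids and a dedicated acker task):

```python
class AckerSketch:
    """Toy model of Storm-style tuple-tree tracking. Each tuple id is
    XORed in when anchored and again when acked; since x ^ x == 0, the
    running value is 0 exactly when every emitted tuple has been acked."""

    def __init__(self):
        self.pending = {}  # spout tuple root -> XOR of outstanding ids

    def anchor(self, root, tuple_id):
        """A bolt emitted a new tuple belonging to this spout tuple's tree."""
        self.pending[root] = self.pending.get(root, 0) ^ tuple_id

    def ack(self, root, tuple_id):
        """A bolt finished processing one tuple of the tree."""
        self.pending[root] ^= tuple_id

    def fully_processed(self, root):
        return self.pending.get(root, 0) == 0
```

The appeal of this design is constant memory per spout tuple, no matter how large the tree grows.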


The system has a master and workers. The master runs Nimbus, which performs code distribution, task assignment, and failure monitoring. Each worker node has a Supervisor, which listens for incoming tasks and starts and stops worker processes. Each worker executes the part of a topology assigned to it. Finally, coordination between the master and the workers is handled by ZooKeeper, and all Nimbus and Supervisor state is stored either in ZooKeeper or on local disk.

As said before, the whole computation is based on a built computational graph – the topology – over which a stream of data circulates. Each node in a Storm topology executes in parallel.

Stream grouping is one of the distinctive features of Storm. It tells the topology how to send tuples between two components in the required fashion (e.g., send them to a specific task within a component).
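Two of the common grouping strategies can be contrasted in a short sketch (the function names are mine, not Storm's API): fields grouping routes by a hash of one field, so equal values always reach the same task, while shuffle grouping just balances load.

```python
import hashlib
import itertools

def fields_grouping(tuple_values: dict, field: str, num_tasks: int) -> int:
    """Route a tuple by hashing one of its fields: tuples with equal
    field values always land on the same task (needed e.g. for
    per-word counters)."""
    digest = hashlib.md5(str(tuple_values[field]).encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_tasks

def shuffle_grouping(num_tasks: int):
    """Return a round-robin task chooser: even load, but no
    per-key locality."""
    counter = itertools.count()
    return lambda _tuple: next(counter) % num_tasks
```

A word-count topology would use fields grouping on the word field between the splitter bolt and the counter bolt, and shuffle grouping everywhere load balance alone matters.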


S4 is a simple, scalable, and flexible processing platform for data streams. Processing elements in the system can be specified and form directed graphs through which a data stream flows. The nodes in the graph are called Processing Elements (PEs), and data is sent between them in the form of events. ZooKeeper is used for managing the configuration and task assignment; this way the system can be dynamically reconfigured and can resolve failures by reassigning tasks to idle nodes.

A PE is the basic computation unit in the system and is uniquely identified by its functionality, the types of events it consumes, a keyed attribute, and a value. However, keyless PEs are also possible; they accept all events of the type they are associated with, while a keyed PE accepts only events with the corresponding key. Usually keyless PEs are the first elements in a processing path, and each node typically has one instance of any keyless PE. Each PE has a state, which can change as a reaction to a received event.

A Processing Node (PN) hosts running PEs. Through the PN, different PEs can be reached, interaction with the communication layer is established, and the emitting of output messages is supported. A listener component in the PN is used to receive events. Events are routed to PNs based on a hash function over the keyed attribute.

Processing Node and Communication Layer in S4

Cluster management and automatic failover are provided by the communication layer.

The biggest difference between Storm and S4 is that Storm guarantees messages will be processed even in the face of failures, whereas S4 may sometimes lose messages. On the other hand, state recovery is not supported by Storm.