Jan 25 2013

Many of you have probably asked yourselves: to build from scratch or not to build from scratch?

  1. – To build!
  2. – Not to build!
  3. – To build!
  4. – Not to build!

Hmm… Hard to decide? Take a look at the diagram and decide!

To build from scratch or not to build from scratch

Is there any re-use in your answer? Then you are one of those lucky bastards who can relax for a while and enjoy a cocktail 🙂

Re-use in all its beauty

Distributed platforms and frameworks aka Map-Reduce:

  • Apache Hadoop is open source software for reliable, scalable, distributed computing. It allows simple programming models to be used for distributed processing of large data sets across clusters of computers. The main components are HDFS (a distributed file system), Hadoop YARN (a framework for job scheduling and resource management), and Hadoop MapReduce (a system for parallel processing of large data sets).
  • YARN is the so-called second generation of MapReduce. The main idea behind its architecture is to split the JobTracker’s resource management and job scheduling/monitoring into separate daemons, so that the system has a global ResourceManager and a per-application ApplicationMaster.
  • Disco Project is a framework based on the MapReduce paradigm. It is an open source Nokia Research Center project whose purpose is to handle massive amounts of data. According to the documentation, it distributes and replicates your data and schedules your jobs efficiently. Moreover, indexing of large numbers of records is supported, so that real-time querying is possible.

    • The master adds jobs to the job queue and runs them when nodes are available.
    • Clients submit jobs to the master.
    • Slaves are started on the nodes by the master; they spawn and monitor processes.
    • Workers do the jobs. The master is notified of the output location.
    • Data locality mechanisms are applied once results are saved to the cluster.

  • Spring Batch is an open source framework for batch processing. It supports the development of robust batch applications and is built on the Spring Framework. It supports logging, transaction management, statistics, restart, skip and resource management. Some optimization and partitioning techniques can be tuned in the system as well.

    • The infrastructure level is a low-level tool. It provides the ability to batch operations together and to retry if an error occurs.
    • The execution environment provides robust features for tracing and management of the batch lifecycle.
    • The core module is the batch-focused domain and implementation. It includes statistics, job launch and restart.

  • Gearman is a framework for delegating tasks to other machines and processes that are better suited to them. Parallel execution of work is supported, as well as load balancing and multi-language function calls.

    • A client, a worker and a job server are the parts of the system.
    • The client creates jobs and sends them to the job server.
    • The job server forwards tasks to suitable workers.
    • The worker does the work and responds to the client through the job server.
    • Communication with the server is established through TCP sockets.
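The MapReduce paradigm that Hadoop and Disco build on can be sketched in a few lines of plain Python. This is only a single-process illustration of the map, shuffle and reduce steps, assuming a word-count job; the function names are made up for the example and are not the API of any of the frameworks above.

```python
from collections import defaultdict
from itertools import chain


def map_phase(document):
    """Map: emit a (word, 1) pair for every word in one input record."""
    return [(word.lower(), 1) for word in document.split()]


def shuffle(pairs):
    """Shuffle: group all emitted values by key, as a framework would do between phases."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(key, values):
    """Reduce: combine all values of one key into a single result."""
    return key, sum(values)


def word_count(documents):
    mapped = chain.from_iterable(map_phase(doc) for doc in documents)
    return dict(reduce_phase(key, values) for key, values in shuffle(mapped).items())


counts = word_count(["to build or not to build", "build from scratch"])
```

In a real cluster the map and reduce calls run on different machines and the shuffle moves data over the network; here everything happens in one process to keep the idea visible.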

Distributed platforms and frameworks aka Directed Acyclic Graph aka Stream Processing Systems:

  • Spark is a system optimized for data analytics, designed to be fast both to run and to write. It is suited for in-memory data processing. The API is available in Java and Scala. Purpose: machine learning and data mining, though general-purpose use is also possible. It runs on Apache Mesos to share resources with Hadoop and other apps.
  • Dryad is a general-purpose runtime for executing data-parallel apps. An application is modeled as a directed acyclic graph (DAG) that defines its dataflow, with vertices representing the operations to be performed on the data. However, creating the graph is quite a tricky part. That is why some high-level language compilers were created. One of them is DryadLINQ.
  • S4 and Storm are both for real-time, reliable processing of unbounded streams of data. The main difference between Storm and S4 is that Storm guarantees messages will be processed even when failures occur, while S4 supports state recovery. More in the previous post.
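To make the DAG idea more concrete, here is a minimal sketch of running a dataflow graph in dependency order. It assumes Python 3.9+ for the standard `graphlib` module; the vertex names and operations are hypothetical, and this is not how Dryad itself is programmed.

```python
from graphlib import TopologicalSorter

# Each vertex is an operation; each entry lists the vertices whose output it consumes.
# The vertex names and operations are hypothetical.
dependencies = {
    "read": [],
    "square": ["read"],
    "double": ["read"],
    "sum": ["square", "double"],
}

operations = {
    "read": lambda: [1, 2, 3],
    "square": lambda xs: [x * x for x in xs],
    "double": lambda xs: [2 * x for x in xs],
    "sum": lambda a, b: sum(a) + sum(b),
}


def run_dag(dependencies, operations):
    """Run every vertex after all of its inputs have been computed."""
    results = {}
    for vertex in TopologicalSorter(dependencies).static_order():
        inputs = [results[dep] for dep in dependencies[vertex]]
        results[vertex] = operations[vertex](*inputs)
    return results


results = run_dag(dependencies, operations)
```

A real runtime would execute independent vertices (here `square` and `double`) in parallel on different machines; the sketch only shows the ordering constraint that the graph imposes.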

Distributed Data Storage (DSS):

  • Key-Value
  • Column Based
  • SQL
  • NewSQL
  • bla bla bla…
  • Latency
  • Consistency
  • Availability
  • bla bla bla…

The problem of choosing the most suitable distributed storage system is quite tricky and requires some reading in the field. Some information on storage systems, with an in-depth review from my previous project on Decentralized Storage Systems, can be found on my wiki. Also, a brief review of some hot systems is presented in my previous post.

Actor Model Frameworks:

The actor model is a fairly old computational model for concurrent computation. It consists of concurrent computational entities called actors, which react to received messages: they can make local decisions, spawn other actors, send messages, and designate the behavior for the next message they receive. However, this model has some issues that should be taken into account if you decide to use it: (a) scalability, (b) transparency, (c) inconsistency.

Actor programming languages include Erlang, Scala and others. This is one extra motivation to get to know those languages better.

The most popular Actor Libraries:

  • Akka
    • Language: Java and Scala
    • Purpose: Build highly concurrent, distributed, fault-tolerant event-driven applications on the JVM.
    • Actors: Very lightweight concurrent entities. They process messages asynchronously using an event-driven receive loop.
  • Pykka
    • Language: Python
    • Purpose: Build highly concurrent, distributed, fault-tolerant event-driven applications.
    • Actors: An actor is an execution unit that executes concurrently with other actors. Actors don’t share state with each other. Communication is maintained by sending and receiving messages. On receiving a message, an actor can take some actions. Only one message is processed at a time.
  • Theron
    • Language: C++
    • Purpose: Build highly concurrent, distributed event-driven applications.
    • Actors: They are specialized objects that execute in parallel natively. Each of them has a unique address. Communication is done by messaging. Each actor’s behavior is defined in message handlers, which are user-defined private member functions.
  • S4: more in the Stream Processing Systems section.
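The common idea behind these libraries (private state, a mailbox, one message at a time) can be sketched with nothing but the Python standard library. This is a toy illustration, not the API of Akka, Pykka or Theron; the class name and message strings are invented for the example.

```python
import queue
import threading


class CounterActor:
    """A toy actor: private state, a mailbox, and one message handled at a time."""

    def __init__(self):
        self._mailbox = queue.Queue()
        self._count = 0  # private state, touched only by the actor's own thread
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def tell(self, message, reply_to=None):
        """Send a message asynchronously; the sender does not wait for handling."""
        self._mailbox.put((message, reply_to))

    def _run(self):
        # The receive loop: messages are taken from the mailbox in FIFO order.
        while True:
            message, reply_to = self._mailbox.get()
            if message == "stop":
                break
            if message == "increment":
                self._count += 1
            elif message == "get" and reply_to is not None:
                reply_to.put(self._count)

    def join(self):
        self._thread.join()


actor = CounterActor()
for _ in range(5):
    actor.tell("increment")
reply = queue.Queue()
actor.tell("get", reply_to=reply)
total = reply.get(timeout=5)
actor.tell("stop")
actor.join()
```

Because only the actor's own thread ever touches `_count`, no lock is needed around it; that is the main attraction of the model.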

Scheduling and Load Balancing:

  • Luigi Scheduler
    • It is a Python module that helps to build complex pipelines of batch jobs. It also has built-in support for Hadoop. It is a scheduler open-sourced by Spotify and used within the company.
    • It is still quite immature, and anyone can try their luck contributing to this scheduler, which is written in Python.
  • Apache Oozie
    • It is a workflow scheduler system to manage Apache Hadoop jobs.
    • Its workflows are directed acyclic graphs of actions.
    • It is a scalable, reliable and extensible system.
  • Azkaban
    • It is a batch job scheduler.
    • It helps to control dependencies and the scheduling of individual pieces to run.
  • Helix
    • It is a generic cluster management framework for automatic management of partitioned, replicated and distributed resources hosted on a cluster of nodes.
    • Features: (a) automatic assignments of resources to nodes, (b) node failure detection and recovery, (c) dynamic addition of resources and nodes to a cluster, (d) pluggable distributed state machine, (e) automatic load balancing and throttling of transactions
  • Norbert
    • Provides easy cluster management and workload distribution.
    • It is implemented in Scala and wraps ZooKeeper, Netty and Protocol Buffers to make it easier to build applications.
    • Purpose: (a) Provide group management: change configuration, add/remove nodes. (b) Partition workload using software load balancing. (c) Provide asynchronous client/server RPC and notifications.
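Two of the features listed for Helix, automatic assignment of resources to nodes and recovery after a node failure, can be illustrated with a small sketch. The round-robin and least-loaded policies below are my own assumptions for the example; they are not taken from Helix or Norbert, and the function and node names are hypothetical.

```python
def assign(partitions, nodes):
    """Spread partitions over the live nodes round-robin."""
    if not nodes:
        raise ValueError("no live nodes to assign to")
    ordered = sorted(nodes)
    return {p: ordered[i % len(ordered)] for i, p in enumerate(sorted(partitions))}


def on_node_failure(assignment, failed, nodes):
    """Move only the failed node's partitions to the least-loaded survivors."""
    survivors = sorted(n for n in nodes if n != failed)
    if not survivors:
        raise ValueError("no surviving nodes")
    load = {n: 0 for n in survivors}
    for owner in assignment.values():
        if owner in load:
            load[owner] += 1
    new_assignment = dict(assignment)
    for partition, owner in sorted(assignment.items()):
        if owner == failed:
            # Pick the survivor with the fewest partitions; ties break by name.
            target = min(survivors, key=lambda n: (load[n], n))
            new_assignment[partition] = target
            load[target] += 1
    return new_assignment


nodes = ["node-a", "node-b", "node-c"]
assignment = assign(range(6), nodes)
after = on_node_failure(assignment, "node-b", nodes)
```

Partitions that were not on the failed node stay where they are, which keeps data movement small after a failure.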


  • Bucardo is an asynchronous PostgreSQL replication system
  • And whatever others…

Consensus and stuff:

Log Collection:

  • Apache Kafka
    A distributed pub/sub messaging system that supports:

    • Persistent messaging with constant time performance
    • High-throughput
    • Explicit support for message partitioning over servers and machine-consumers
    • Support for parallel data load into Hadoop

    It is a viable solution for providing logged data to offline analysis systems like Hadoop, but it might be quite limited for building real-time processing. The system is quite similar to Scribe and Apache Flume, as they all do activity stream processing, even though their architectures are different.

  • Logstash
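Message partitioning with persistent, replayable messages can be pictured as a set of append-only lists, where each consumer remembers how far it has read. The sketch below is an in-memory stand-in under that assumption; the class and method names are hypothetical and this is not the Kafka client API (in particular, the CRC32 partitioner is just a convenient stable hash).

```python
import zlib


class PartitionedLog:
    """In-memory stand-in for a topic split into append-only partitions."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def publish(self, key, value):
        """Append to the partition chosen by a stable hash of the key."""
        index = zlib.crc32(key.encode("utf-8")) % len(self.partitions)
        self.partitions[index].append((key, value))
        return index

    def read(self, partition, offset, max_messages=10):
        """Return messages starting at `offset`; the consumer tracks its own offset."""
        batch = self.partitions[partition][offset:offset + max_messages]
        return batch, offset + len(batch)


log = PartitionedLog(num_partitions=3)
p1 = log.publish("user-1", "login")
p2 = log.publish("user-1", "click")
batch, next_offset = log.read(p1, offset=0)
```

Since reading does not remove anything, an offline consumer such as a Hadoop load job can re-read a partition from any earlier offset.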

Message Passing Frameworks:

(De)Serialization for sending objects in the network safely:

  • Avro
    • It is a data serialization system.
    • Provides: (a) rich data structures, (b) compact, fast, binary data format, (c) container file to store persistent data, (d) integration with dynamic languages. The schemas are defined with JSON.
    • Used by Spotify
  • Protocol Buffers
    • Developed and used by Google.
    • It encodes structured data in an efficient extensible format.
  • Apache Thrift
    • Purpose: scalable cross-language services deployment.
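As a small taste of what a compact binary format means, here is a sketch of base-128 varint encoding, the variable-length integer encoding used in the Protocol Buffers wire format. It covers non-negative integers only, and the function names are my own; real applications should use the generated Protocol Buffers code instead.

```python
def encode_varint(value):
    """Encode a non-negative integer, 7 bits per byte, least significant group first."""
    if value < 0:
        raise ValueError("this sketch handles non-negative integers only")
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)  # high bit clear: last byte
            return bytes(out)


def decode_varint(data):
    """Decode one varint from the start of `data`; return (value, bytes consumed)."""
    result = 0
    shift = 0
    for i, byte in enumerate(data):
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, i + 1
        shift += 7
    raise ValueError("truncated varint")


encoded = encode_varint(300)
decoded, consumed = decode_varint(encoded)
```

Small numbers take a single byte while large ones grow as needed, which is why such formats are smaller than fixed-width or text encodings for typical data.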

Chasing for latency:



Jan 16 2013

One of the possible directions for my thesis work is stream data processing. That’s why it is worth reviewing some possible architectures, which can at least be used afterwards to compare my future system with existing ones.

Data stream processing systems have become quite popular due to the enormous growth (in particular in Volume, Variety, Velocity) of big data analytics on the fly. Applications range from machine learning systems, such as Mahout and WEKA, through financial trading systems, to real-time analysis of real-world processes. Among distributed systems for data stream processing, I would like to highlight two: S4 (Simple Scalable Streaming System) and Storm.


Storm is a real-time computation system for reliable processing of unbounded streams of data. A Storm topology takes streams of data and processes them in arbitrarily complex ways. A good thing about this system is that it can be integrated with any queueing system and any database system. On the one hand, Storm simply needs to be connected to the database, and in return it will handle the parallelization, partitioning and failures when necessary. Within Storm, tuples can be manipulated. A tuple is a named list of values.

Storm supports three abstractions: spouts, bolts and topologies. Spout: a source of streams in a computation. Bolt: processes input streams and produces new output streams. Topology: a network of spouts and bolts, where each edge represents a bolt subscribing to the output stream of another spout or bolt. Topologies are submitted using Apache Thrift. On the other hand, Storm is scalable and can be run across a cluster of machines. At the same time, fault tolerance is taken into account. Storm daemons are designed to be stateless and fail-fast. Nimbus and Supervisors are used to maintain fault-tolerance properties. What if:

  • Worker dies? The Supervisor will restart it. If it still does not respond with heartbeats to Nimbus, Nimbus will reassign the worker to another machine.
  • Node dies? Tasks assigned to that machine will time out and Nimbus will reassign them to other machines.
  • Nimbus (N) or Supervisor (S) dies? All state is stored in ZooKeeper or on disk. Usually N and S should be run under the supervision of daemontools or monit. That’s why on restart they will resume functioning as if nothing had happened. Additionally, no worker processes are affected by the failure of N or S.
  • Nimbus dies? Nothing critical will happen if N fails. However, the ability to reassign workers to other machines is lost while it is down.

Each message in the system, coming out of a spout, is guaranteed to be fully processed. Storm considers a tuple fully processed when every message in its tuple tree has been processed.
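The notion of a tuple tree being fully processed can be sketched with a simple counter per spout tuple. This is a deliberately simplified illustration under my own assumptions, not Storm's actual tracking mechanism; the class and method names are hypothetical.

```python
class TupleTreeTracker:
    """Counts the un-acked tuples in the tree rooted at each spout tuple."""

    def __init__(self):
        self._pending = {}  # root id -> number of tuples in its tree not yet acked
        self.completed = []

    def emit_root(self, root_id):
        """A spout emitted a new tuple; its tree starts with that single tuple."""
        self._pending[root_id] = 1

    def anchor(self, root_id, children=1):
        """A bolt emitted `children` new tuples derived from this root."""
        self._pending[root_id] += children

    def ack(self, root_id):
        """One tuple of the tree finished processing."""
        self._pending[root_id] -= 1
        if self._pending[root_id] == 0:
            del self._pending[root_id]
            self.completed.append(root_id)

    def is_fully_processed(self, root_id):
        return root_id in self.completed


tracker = TupleTreeTracker()
tracker.emit_root("sentence-1")
tracker.anchor("sentence-1", children=3)  # a bolt splits the sentence into 3 words
tracker.ack("sentence-1")                 # the sentence tuple itself is done
done_early = tracker.is_fully_processed("sentence-1")
for _ in range(3):
    tracker.ack("sentence-1")             # each word tuple is done
done_late = tracker.is_fully_processed("sentence-1")
```

Note that a bolt has to register its child tuples before acking its input, otherwise the counter would reach zero too early.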


The system has a master and workers. The master runs Nimbus, which performs code distribution, task assignment and failure monitoring. Each worker node has a Supervisor, which listens for incoming tasks and starts and stops worker processes. Each worker executes the part of a topology assigned to it. Finally, coordination between the master and workers is handled by ZooKeeper. All Nimbus and Supervisor state is stored either in ZooKeeper or on local disk.

As mentioned before, the whole computation is based on a computational graph: the topology. A stream of data flows through the topology. Each node in a Storm topology executes in parallel.
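A topology of one spout and two bolts can be imitated with Python generators, using dictionaries as tuples (named lists of values). This is only a single-process sketch with hypothetical names; it runs sequentially and has none of Storm's parallelism or fault tolerance.

```python
def sentence_spout():
    """Spout: the source of a stream (finite here, unbounded in a real system)."""
    for sentence in ["storm processes streams", "streams of tuples"]:
        yield {"sentence": sentence}


def split_bolt(stream):
    """Bolt: consumes sentence tuples and emits one tuple per word."""
    for tup in stream:
        for word in tup["sentence"].split():
            yield {"word": word}


def count_bolt(stream):
    """Bolt: keeps a running count per word and emits the updated count."""
    counts = {}
    for tup in stream:
        counts[tup["word"]] = counts.get(tup["word"], 0) + 1
        yield {"word": tup["word"], "count": counts[tup["word"]]}


# The topology wires the spout into the bolts.
topology = count_bolt(split_bolt(sentence_spout()))
final_counts = {tup["word"]: tup["count"] for tup in topology}
```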

Stream grouping is one of the distinctive features of Storm. It tells a topology how to send tuples between two components in the required fashion (for example, to a specific task in a component).
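Two groupings that Storm offers are shuffle grouping (spread tuples randomly over tasks) and fields grouping (tuples with equal values in the chosen fields go to the same task). The functions below sketch the idea only; the signatures and the CRC32 hash are my own assumptions, not Storm's implementation.

```python
import random
import zlib


def shuffle_grouping(tup, num_tasks, rng=random):
    """Pick a random task, which spreads the load evenly on average."""
    return rng.randrange(num_tasks)


def fields_grouping(tup, num_tasks, fields):
    """Pick the task from a stable hash of the selected field values."""
    key = "|".join(str(tup[name]) for name in fields)
    return zlib.crc32(key.encode("utf-8")) % num_tasks


tuples = [{"word": w} for w in ["storm", "s4", "storm", "spark", "storm"]]
tasks = [fields_grouping(t, num_tasks=4, fields=["word"]) for t in tuples]
random_task = shuffle_grouping(tuples[0], num_tasks=4, rng=random.Random(0))
```

Fields grouping is what makes a per-word counter correct when the counting bolt runs as several parallel tasks: every occurrence of a word lands on the same task.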


S4 is a simple, scalable and flexible processing platform for data streams. Processing elements in the system can be specified and composed into directed graphs through which a data stream flows. Nodes in the graph are called Processing Elements (PEs), through which data is sent in the form of events. ZooKeeper is used for managing configuration and task assignment. This way the system can be dynamically reconfigured and can resolve failures by reassigning tasks to idle nodes.

PEs are the basic computation units in the system and are uniquely identified by functionality, types of events, keyed attribute and its value. However, keyless PEs are also possible and accept all events of the type with which they are associated, while keyed PEs accept only events with the corresponding key. Usually keyless PEs are the first elements in a processing path. Each node usually has one instance of any keyless PE. Each PE has state, which can be changed in reaction to a received event.

A Processing Node (PN) hosts running PEs. Through a PN, different PEs can be reached, interaction with the communication layer is established, and emitting output messages is supported. The listener component in a PN is used to receive events. Events are routed to PNs based on a hash function of the keyed attribute.
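The interplay of keyed PEs and hash-based routing can be sketched as follows: an event is routed to a node by hashing its keyed attribute, and the node keeps one stateful PE instance per key value. All class and function names here are hypothetical and the CRC32 hash is an assumption; this is not the S4 API.

```python
import zlib


class CounterPE:
    """Keyed PE: one instance per distinct key value, with its own state."""

    def __init__(self, key):
        self.key = key
        self.count = 0

    def on_event(self, event):
        self.count += 1  # state changes in reaction to a received event


class ProcessingNode:
    """Hosts PE instances and creates one on first sight of a new key value."""

    def __init__(self, name):
        self.name = name
        self.pes = {}

    def receive(self, event, key_attr):
        key = event[key_attr]
        if key not in self.pes:
            self.pes[key] = CounterPE(key)
        self.pes[key].on_event(event)


def route(event, key_attr, nodes):
    """Choose the node from a stable hash of the keyed attribute's value."""
    digest = zlib.crc32(str(event[key_attr]).encode("utf-8"))
    node = nodes[digest % len(nodes)]
    node.receive(event, key_attr)
    return node


nodes = [ProcessingNode("pn-0"), ProcessingNode("pn-1")]
for word in ["storm", "s4", "storm", "storm"]:
    route({"word": word}, "word", nodes)
owners = [node for node in nodes if "storm" in node.pes]
```

Because the hash is stable, all events with the same key value reach the same node and therefore the same PE instance.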

Processing Node and Communication Layer in S4

Cluster management and automatic failover are provided by the communication layer.

The biggest difference between Storm and S4 is that Storm guarantees messages will be processed even in the face of failures, whereas S4 will sometimes lose messages. However, state recovery is not supported by Storm.