Oct 20 2013

Distributed video streaming still has a large gap when it comes to handling churn and adversaries together. The cool guys from the University of Texas at Austin attempted to eliminate adversarial influence in their BAR Gossip work. However, the authors do not address churn.

In our work, we try to handle both dynamicity and adversarial behavior, with possibly fewer than 30% of the nodes being adversarial.
The video streaming system is built on top of a framework developed by the LPD lab at EPFL. This framework runs highly expensive Byzantine agreement protocols over small subsets of the whole system, called clusters. This approach guarantees both resilience to the adversary and consensus among the honest majority of nodes. Additionally, a certain level of randomization is added to handle adversarial attacks, e.g., nodes constantly change their location in the topology, jumping from one cluster to another.

This constant movement makes maintaining the streaming application a challenge.


Previously, streaming systems were classified into two categories: tree-based (push) and mesh-based (pull) [link]. Each has its own advantages and disadvantages. The former is not suitable for handling churn, while the latter is robust to churn but cannot guarantee efficient content distribution.

Even though the mesh-based topology is designed with churn in mind, it is not robust against adversarial influence and is exposed to the outside world. In contrast to mesh systems, our framework supports dynamic node placement and thus maintains better fault tolerance.

At the same time, two main strategies for content dissemination exist: pull [link] and push. Pull strategies have been shown to produce less overhead on the system [link]; however, in a fully dynamic and partially corrupted network, the pull strategy cannot provide the required fault tolerance under constant node replacement. Since our main requirement is to sustain performance in a dynamic system with faulty nodes, we sacrifice network overhead and adopt the push strategy.

Another class of systems can actually handle a Byzantine adversary while trying to sustain dynamicity, e.g., BarGossip [link] and FlightPath [link].

The former, BarGossip, is based on the simple gossip model and is easy to implement. The main differences from simple gossip are verifiable pseudo-random peer selection and the use of credible threats instead of reputation. The credible-threat mechanism relies on the fact that any node can send a POM (proof of misbehavior) message upon suspicion; therefore, if a rational node thinks it might be suspected, it may decide to actually forward some messages. The system operates in rounds and uses two protocols: balanced exchange (a node exchanges information with others only if it received something it did not already have in the current round) and optimistic push.
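To make the idea of verifiable pseudo-random peer selection a bit more concrete, here is a minimal sketch. It is not BarGossip's actual protocol: I assume that a plain SHA-256 hash of the node id and round number stands in for the seed, and the function names are made up for illustration. The point is only that anyone who knows the membership list can recompute the choice and check it.

```python
import hashlib

def select_partner(node_id, round_no, members):
    """Deterministically pick the gossip partner of node_id for this round.

    Assumption: a hash of (node_id, round_no) plays the role of the
    verifiable seed; a real system would use something unforgeable.
    """
    candidates = sorted(m for m in members if m != node_id)
    digest = hashlib.sha256(f"{node_id}:{round_no}".encode()).digest()
    index = int.from_bytes(digest[:8], "big") % len(candidates)
    return candidates[index]

def verify_partner(node_id, round_no, members, claimed_partner):
    """Any peer can recompute the selection and detect a cheating node."""
    return select_partner(node_id, round_no, members) == claimed_partner
```

A node that contacts a peer other than the one returned by `select_partner` fails `verify_partner`, which is the kind of evidence a proof of misbehavior could be built on.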

The latter, FlightPath, sustains dynamicity in the infrastructure in addition to fault tolerance, and relies on an epsilon-Nash equilibrium. In such an equilibrium, rational nodes deviate from the protocol only if they expect to benefit by more than a factor of epsilon from doing so. In this system the source sends two types of messages: stream updates (the actual content) and linear digests (authentication of the updates). The system also relies on a verifiable pseudo-random algorithm and uses history update messages.

System Model and Problem Definition

We consider a network consisting of a dynamic collection of nodes with an adversary model similar to [link]. Nodes have equal roles and are connected to each other in the [link] fashion.

The underlying system architecture looks as follows:



Basically, all nodes are grouped into small clusters, within which a heavy consensus protocol can run relatively fast. Nodes change their places in the system constantly. Each connection currently uses TCP, which will be replaced with UDP in the future. When the source starts streaming, it broadcasts the data to everyone in its cluster and everyone in the neighboring clusters. Clusters are organized into Hamiltonian cycles, and for the sake of good expansion properties several redundant cycles are used (usually 2 or 3). As you can see, this is quite a preliminary description.
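The cluster overlay can be pictured with a small sketch. This is only an illustration under my own assumptions (each cycle is a random permutation of the cluster ids, and all names are hypothetical), not the framework's actual code: every cycle visits each cluster exactly once, and a cluster's neighbors are its predecessors and successors over all cycles.

```python
import random

def build_cycles(clusters, num_cycles=2, seed=0):
    """Return num_cycles random orderings; each ordering is read as a cycle."""
    rng = random.Random(seed)
    cycles = []
    for _ in range(num_cycles):
        order = list(clusters)
        rng.shuffle(order)
        cycles.append(order)
    return cycles

def neighbours(cluster, cycles):
    """Union of the predecessor and successor of a cluster over all cycles."""
    result = set()
    for order in cycles:
        i = order.index(cluster)
        result.add(order[i - 1])                 # predecessor (wraps around)
        result.add(order[(i + 1) % len(order)])  # successor (wraps around)
    result.discard(cluster)
    return result
```

With 2 or 3 cycles every cluster ends up with a handful of neighbors, so a broadcast from the source's cluster reaches the rest of the system in a few hops even when some links are lost.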

I plan to add more details on the implementation, the performance evaluation, and a comparison with other existing systems in upcoming posts.

Aug 16 2013

Recently I was asked to become a reviewer for the book Apache Kafka, published by Packt Publishing. I was happy to accept the offer, and now I am more than happy to share my view of the technology.

Apache Kafka is a distributed publish-subscribe messaging system that is used in many top IT companies, such as LinkedIn, Twitter, Spotify, DataSift, Square, etc. The system guarantees the following properties: persistent messaging with constant-time access to disk structures and high performance; high throughput; distributiveness 🙂, as access to the system is both load balanced and partitioned to reduce the pressure on any single node; real-time properties; and Hadoop and Storm integration.

Installing and building the system is very easy and won’t take much time. Depending on your needs, various cluster setups are possible: single node – single broker (the core Kafka process), single node – multiple brokers, multiple nodes – multiple brokers. Depending on the choice, Kafka should be configured properly, e.g., the producers (message sources) need to know the configuration properties of each broker.

The following figure shows the simplest case, in which messages are published by producers and delivered to consumers through the Kafka broker. In this scenario (single node – single broker) the producers, the broker, and the consumers all run on different machines.



Important design solutions include the following: message caching; the ability to re-consume messages; message grouping (to reduce network overhead); local bookkeeping of consumed messages; a purely decentralized design that uses ZooKeeper for load balancing; and support for both asynchronous and synchronous messaging. Moreover, message compression (e.g., gzip or Google Snappy) is used to reduce the network load. Additionally, the need for replication in any active system takes Kafka to a new level. Apache Kafka allows mirroring an active data cluster into a passive one: it simply consumes the messages from the source cluster and republishes them on the target cluster. This feature is highly useful if reliability and fault tolerance are important concerns. A simple replication scheme is also implemented within the system. It is mainly achieved by partitioning messages (by hash) and keeping a lead replica for each partition (this way both synchronous and asynchronous replication can be arranged).
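Hash partitioning with a lead replica per partition can be sketched in a few lines. This is a toy illustration, not Kafka's own partitioner: I assume a CRC32 hash of the message key, and the broker names and function names are hypothetical.

```python
import zlib

def partition_for(key, num_partitions):
    """Map a message key (bytes) to a partition using a stable hash."""
    return zlib.crc32(key) % num_partitions

def leader_for(key, leaders):
    """leaders[i] is the (hypothetical) broker holding the lead replica of partition i."""
    return leaders[partition_for(key, len(leaders))]

# Hypothetical three-partition topic, one lead replica per partition.
LEADERS = ["broker-1", "broker-2", "broker-3"]
```

All messages with the same key land in the same partition, so the lead replica of that partition sees them in one order and can pass them on to the follower replicas either synchronously or asynchronously.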

The API provides sufficient support for creating both producers and consumers. A consumer can be one of two types: one that just needs the message to be delivered and ignores further processing, and one that requires further processing of the message upon delivery. A consumer implementation can be either single-threaded or multithreaded. However, to prevent unpredictable behavior, the number of threads should correspond to the number of topics to consume. This way the threads are fully utilized and the necessary message order is preserved.

Kafka can be integrated with the following technologies: Twitter Storm for online stream processing and Apache Hadoop for offline analysis. Storm integration is implemented on the consumer side. That is, Kafka consumers are represented as regular Storm spouts that read data from the Kafka cluster. Integration with Hadoop, on the other hand, is bilateral: Hadoop can act as both a producer and a consumer.

In the former case, Hadoop acts as a bridge for publishing data to the Kafka broker. The Hadoop producer extracts data from the system in one of two ways: (a) using Pig scripts to write data in the binary Avro format (which makes writes to multiple topics easier), or (b) using the Kafka “OutputFormat” class, which publishes data as bytes and provides control over the output.

In the latter case, the Hadoop consumer is a Hadoop job that pulls data from the Kafka system into HDFS. This can happen either sequentially or in parallel.

Pros: Easy to use and deploy; scalable; fast; distributed.

Cons: Might lack configurability and automation; replication improvements required; fault-tolerance optimization needed.

I am going to actually test the framework and compare it with the current project I am working on at EPFL (awesome highly dynamic bft decentralized scalable pub/sub system). Stay tuned:)

May 24 2013

A small break from thesis related posts 🙂

Finally I found time to describe the project we (me and Zygimantas) were working on during the last semester. And here is some motivation for it:

There is an increasing interest in distributed machine learning algorithms. A gossip learning algorithm was proposed that works on a random graph with fully distributed data. The goal of our research is to analyse the behaviour of this algorithm on clustered graphs. Experiments show that the algorithm needs to be modified in order to work on clustered graphs. We introduce a triangulation technique that modifies the original peer sampling algorithm and is used to limit model exchange between different clusters. Results show that with such an algorithm it is possible to find models of local objective functions.

In other words, let’s imagine a social network where people don’t want to share their private information, but they agree to locally fit their information to some kind of model generation function and then share only the parameters of the obtained model. However, the model parameters from only one person are not enough to draw any conclusions about the network. So why not just randomly exchange these models between friends and merge them locally? This is where our algorithm appears with its awesome model merging. As peers are likely to exchange their models with their friends, the resulting model might characterise some clusters. And voilà!!! If we have models for some clusters, we can make various assumptions about them. For example, you are living in Ukraine and want to move to Sweden. You are searching for a job and have no idea what salary you can approximately expect. With our approach, you can put in information about yourself (as an input) and our merged resulting function will give you an answer for the Stockholm cluster 🙂

Obviously, all of the above is a very simplified version of what we’ve done. Now a slightly more serious explanation:

Peer-to-peer (P2P) is a system model in which every participant (peer) is equal. Peers are connected to each other in such a way that they form a graph. Moreover, P2P communication and the peers themselves are unreliable, i.e. peers may fail, and messages may get delayed, may get lost and may not be delivered at any time. Systems designed for this environment are usually robust and scalable, since no central servers are needed. Adding more computation resources to such a system is the same as adding more peers. These systems usually consist of a large number of peers that communicate by passing messages to each other.

Furthermore, such P2P systems can offer security to some extent. They could be used to protect sensitive data such as personal data, preferences, ratings, history, etc. by not disclosing it to other participants. For example, in P2P the data could be completely distributed, so that each peer knows only about its own data. In such a case, an algorithm could be run locally and only the result of the algorithm would be shared among peers. This may ensure that there is no way for peers to learn about the data kept at other peers.

This security characteristic of P2P networks can be used to build machine learning algorithms on fully distributed data. Mark Jelasity et al. in their work [1] present such an algorithm, which uses gossiping to share predictive models with neighbour peers. We can think of this algorithm as performing a random walk in the P2P network. During this random walk, ensemble learning is performed, that is, the model built during the random walk is merged with the local model stored at each peer. After merging two models, the merged model is updated with the local data (which is stored at each peer) and then used in the next step of the random walk. Mark Jelasity et al. conclude that this algorithm converges in P2P networks that form a random graph. Moreover, they state that this algorithm is more effective than one that gathers the data before building the prediction model, because peers exchange only models, which may be considerably smaller than the data.
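One step of this random walk can be sketched as follows. This is a simplified illustration rather than the exact procedure from [1]: I assume that merging means averaging two linear models and that the local update is a single Adaline (least-mean-squares) gradient step; the function names and the learning rate are my own choices.

```python
def merge(w1, w2):
    """Merge two linear models by averaging their weights (an assumption)."""
    return [(a + b) / 2 for a, b in zip(w1, w2)]

def adaline_update(w, x, y, lr=0.1):
    """One Adaline gradient step on a single local example (x, y)."""
    prediction = sum(wi * xi for wi, xi in zip(w, x))
    error = y - prediction
    return [wi + lr * error * xi for wi, xi in zip(w, x)]

def on_receive(local_model, incoming_model, x, y, lr=0.1):
    """What a peer does when a model arrives: merge, then train on local data."""
    merged = merge(local_model, incoming_model)
    return adaline_update(merged, x, y, lr)
```

The peer keeps the returned model as its new local model and forwards it to a randomly sampled neighbour, so only weight vectors ever leave the peer, never the example `(x, y)` itself.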

Although Mark Jelasity et al. showed that this gossip learning algorithm converges in random graphs, it is still unclear whether such convergence can be achieved in clustered graphs. Moreover, such convergence may lead to one of these outcomes:

  • every peer after a number of iterations will have a model that represents the data on a local cluster;
  • after more iterations every peer will have a model that represents the data on every peer.


  • Our gossip learning algorithm uses a framework described in [1] with Adaline gradient descent learning algorithm;
  • We analysed gossip learning algorithm’s convergence properties on random and clustered graphs;
  • We designed and implemented a graph generating tool that generates random and clustered graphs.

[1] R. Ormándi, I. Hegedűs, and M. Jelasity. Gossip learning with linear models on fully distributed data. Concurrency and Computation: Practice and Experience, 2012.

Apr 12 2013

Multicast operations are operations in which a message is sent from one process to a set of processes, and the membership of the group is usually transparent to the sender [1]. However, a simple multicast protocol does not guarantee any ordering or message delivery. Therefore, today's distributed systems need stronger guarantees, such as reliability. Some systems [5] rely on reliable multicast, in which any transmitted message is received either by all processes or by none. In other words, there cannot be a situation where a client accesses a server just before it crashes and observes an update that no other server will process. This property is called uniform agreement. Moreover, to maintain a consistent and fault-tolerant system, a total order guarantee should be added to reliable uniform multicast.

The simplest specification of uniform reliable total order multicast can be defined in terms of two primitives [2], TO-multicast(m) and TO-deliver(m), where m is some message. When a process issues a uniquely identified message m via TO-multicast(m), the following properties are assumed [3]:

• Validity. If a correct process TO-multicasts a message m, then it eventually TO-delivers m.

• Uniform Agreement. If a process TO-delivers a message m, then all correct processes eventually TO-deliver m.

• Uniform Integrity. For any message m, every process TO-delivers m at most once, and only if m was previously TO-multicast by the sender.

• Uniform Total Order. If two processes, p and q, both TO-deliver messages m and m’, then p TO-delivers m before m’ if and only if q TO-delivers m before m’.

If all these properties are satisfied, reliable total order multicast takes place. Uniformity means that no process is allowed to deliver a message out of order at any time.

Internally in NOMX, multicast communication is used by most of the subsystems, as it is the only fast and reliable way to guarantee consistency and agreement among all nodes at minimal cost.

There are three main ways to maintain total order: symmetric messaging, collective agreement [Birman and Joseph 1987], and sequencer-based ordering [Kaashoek 1989]. The system that I am developing for my master's project uses the single-sequencer ordering mechanism, as it is more efficient than the consensus-based one. The simplest illustration of total ordering is shown in the figure below: no matter when the messages were issued, they are delivered in the same order to all processes. For sequencer-based mechanisms, the main problem is that the sequencer is a possible bottleneck and a critical point of failure. Moreover, the sequencer may limit the scalability of the system. This can be overcome by using a replicated standby sequencer that delivers all messages issued by the primary one and takes over in case of failure.
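Here is a minimal sketch of single-sequencer ordering. It is a toy, in-memory illustration with made-up class names, not the NOMX implementation: the sequencer stamps every message with a consecutive number, and each receiver holds back messages that arrive early until the gap before them is filled.

```python
class Sequencer:
    """Assigns consecutive sequence numbers to incoming messages."""

    def __init__(self):
        self.next_seq = 1

    def order(self, msg):
        stamped = (self.next_seq, msg)
        self.next_seq += 1
        return stamped


class Receiver:
    """Delivers messages strictly in sequence order, at most once each."""

    def __init__(self):
        self.expected = 1
        self.pending = {}    # hold-back buffer: seq -> message
        self.delivered = []

    def on_message(self, seq, msg):
        if seq < self.expected or seq in self.pending:
            return  # duplicate, ignore
        self.pending[seq] = msg
        # Deliver everything that is now contiguous.
        while self.expected in self.pending:
            self.delivered.append(self.pending.pop(self.expected))
            self.expected += 1
```

Two receivers that see the same stamped messages in different network orders still end up with identical `delivered` lists, which is exactly the total order property.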



[1] George F. Coulouris, Jean Dollimore, and Tim Kindberg. Distributed Systems: Concepts And Design. Pearson Education, 2005. ISBN 9780321263544.

[2] Xavier Défago, André Schiper, and Péter Urbán. Total order broadcast and multicast algorithms: Taxonomy and survey. ACM Comput. Surv., 36(4):372–421, December 2004. ISSN 0360-0300. URL http://doi.acm.org/10.1145/1041680.1041682.

[3] Vassos Hadzilacos and Sam Toueg. A modular approach to fault-tolerant broadcasts and related problems. Technical report, 1994.

[4] L. E. T. Rodrigues, H. Fonseca, and P. Verissimo, “Totally ordered multicast in large-scale systems,” in Proceedings of the 16th International Conference on Distributed Computing Systems, 1996, pp. 503–510.

[5] S. K. Kasera, J. Kurose, and D. Towsley, “Scalable reliable multicast using multiple multicast groups,” SIGMETRICS Perform. Eval. Rev., vol. 25, no. 1, pp. 64–74, Jun. 1997.

Mar 01 2013

GENIUM Data Store (GDS) is a system I am working on for my Master's thesis at NASDAQ.

GENIUM INET messaging bus

To achieve its performance goals, GDS leverages the high performance of the GENIUM INET messaging bus. The bus is based on UDP multicast and is made reliable by a totally ordered sequence of backed-up messages with a gap-fill mechanism using rewinders. As is well known, total order broadcast algorithms are fundamental building blocks for constructing fault-tolerant systems. The purpose of such algorithms is to provide a communication primitive that allows processes to agree on the set of messages they deliver and on the delivery order. The NASDAQ OMX implementation of this abstraction assumes a perfect failure detector, i.e., it forces a process to fail if it is considered faulty. Moreover, uniform reliable total order is preserved: a process is not allowed to deliver any message out of order, even if it is faulty.

The receivers of the ordered messages should guarantee exactly-once delivery of each message to the applications; this way uniform integrity is guaranteed. Across the cluster of applications/clients and servers connected to the message stream, messages should be delivered in the same order.

The message stream can be configured without restriction, so that it can contain any number of clients, placed on the same or different servers, with a defined replication level. The server/sequencer is responsible for creating the sequenced (ordered) stream as an output from the messages received from clients.

Failure resilience is provided by the following:

  • All message callbacks are fully deterministic and replayable (if single-threaded), as the incoming stream is identical each time it is received.
  • Replication can be achieved by installing the same receivers on multiple servers.
  • As long as the new primary has rewound to the same point as the failed one, the message stream is sufficient to synchronize state.

GDS takes advantage of the infrastructure described above and thus a priori maintains a fault-tolerant real-time system. Moreover, the distributed state is made consistent by adhering to the sequence numbering implied by the message stream.


In the GENIUM Data Store, transactions are submitted through MoldUDP, which ensures the lowest possible transaction latency.

MoldUDP is a networking protocol that makes transmission of data messages efficient and scalable in a scenario where one transmitter and many listeners are present. It is a lightweight protocol that is built on top of UDP where missed packets can be easily traced and detected, but retransmission is not supported.

Some optimizations can be applied to make this protocol more efficient: (a) multiple messages are aggregated into a single packet to reduce network traffic, and (b) a caching Re-request Server is placed near the remote receivers to reduce latency and bandwidth usage.

MoldUDP presumes that the system consists of listeners, which are subscribed to some multicast groups, and a server, which transmits on those multicast groups. The MoldUDP server transmits downstream packets over UDP multicast to send the normal data stream to the listeners. It also sends heartbeats to the clients periodically, so that they can detect packet loss if it takes place. Moreover, listeners should be configured with the IP address and port to which they can submit re-requests.
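Gap detection on the listener side can be sketched like this. It is a simplified illustration, not the wire protocol: I assume every downstream packet carries the sequence number of its first message plus a message count, and that a heartbeat is a packet with count 0 whose sequence number is the next one the server will send. The function name is made up.

```python
def process_packet(expected, first_seq, count):
    """Return (missing sequence numbers, new expected sequence number).

    expected  -- next sequence number this listener is waiting for
    first_seq -- sequence number of the first message in the packet
    count     -- number of messages in the packet (0 for a heartbeat)
    """
    if first_seq > expected:
        missing = list(range(expected, first_seq))  # these need a re-request
    else:
        missing = []
    new_expected = max(expected, first_seq + count)
    return missing, new_expected
```

A listener would hand the `missing` list to whatever re-request mechanism it is configured with; an old or duplicated packet simply yields no gap and leaves the expected number unchanged.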

Note: a message in this context is an atomic piece of information, from 0 to 64 KB in size, that is carried by the MoldUDP protocol.


The GENIUM Data Store will support read queries. That is why a TCP-like protocol is intended to be used to stream data to the client in response to a submitted query.

SoupTCP is a lightweight point-to-point protocol built on top of TCP/IP sockets. This protocol delivers a set of sequenced messages from a server to a client. It guarantees that the client will receive all messages sent from the server strictly in sequence, even when failures occur.

Server functionality with SoupTCP includes: (a) client authentication on login and (b) delivery of a logical stream of sequenced messages to a client in real time. Messages that clients send to the server are not guaranteed to be delivered in case of failures, so the client will need to resubmit the request to the server.

Protocol flow:

  • The client opens a TCP/IP socket to the server and sends a login request.
  • If the login information is valid, the server responds with an accept message and starts to send sequenced data.
  • Both client and server compute the message number locally by simply counting messages; the first message in a session is always number 1.
  • Link failures are detected by heartbeating. Both server and client send heartbeat messages. The former are required to notify the client of a failure, so that it can reconnect to another socket. The latter are necessary for the server to close the existing socket of a failed client and listen for a new connection.
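The counting rule in the flow above can be sketched from the client's point of view. This is only an illustration: the dictionary-based login message and the class name are hypothetical and not the real wire format. The key idea is that the client never receives sequence numbers on the wire, it counts, and after a reconnect it tells the server where to resume.

```python
class SoupClientSession:
    """Tracks the implicit sequence number of a sequenced message stream."""

    def __init__(self):
        self.next_seq = 1  # the first message in a session is always 1

    def on_sequenced_data(self, payload):
        """Number an incoming message by simple local counting."""
        seq = self.next_seq
        self.next_seq += 1
        return seq, payload

    def login_request(self, user):
        """Hypothetical login message: ask the server to resume at next_seq."""
        return {"user": user, "requested_seq": self.next_seq}
```

After a link failure the client logs in again with `requested_seq` set to the first message it has not seen, which is how the stream stays strictly in sequence across reconnects.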
Feb 28 2013

PNUTS is a parallel and geographically distributed database system for serving web applications with per-record consistency guarantees. The main concerns in the system are availability and low latency; consistency is therefore tuned according to fault resilience, and geo-replication is supported to reduce latency for multi-continent reads.


PNUTS is focused on serving web applications rather than complex queries; this way a simple relational model is exposed to the users. To make the system robust, different levels of redundancy are used, which trade consistency against high availability during failures. The pub/sub paradigm (Yahoo! Message Broker) is used for asynchronous operations.

Data is organized into tables of attributed records, where schemas are quite flexible and can be changed without halting any operations. The query language is quite simple: it is limited to selection and simple projection from a single table. However, the authors claim this provides more flexible access than hashed or ordered data.

System Architecture

Data tables are horizontally partitioned into groups of records (tablets) spread across different servers. To determine which tablet and storage unit hold a record, routers are used; they contain only a cached copy of the mapping, while the tablet controller owns that mapping.

Replication is based on asynchronous messaging, which ensures low latency. A data update is considered committed only once it is published to the Yahoo! Message Broker (YMB). It is guaranteed that after the commit, updates will be asynchronously propagated to the appropriate regions. However, this guarantees only partial ordering: messages published to one YMB instance are delivered in arrival order, while messages that arrive at different YMB instances can be delivered in any order. For this reason, timeline consistency is used for such commits. This consistency is supported by a per-record master, whose update order is preserved as the order of delivery on every replica. Fault recovery also builds on replication: lost tablets are recovered by copying, and a checkpoint mechanism is triggered after the copy request to ensure that current updates are applied.

The most interesting thing is multiget processing, which is based on a scatter-gather engine that is a component of the router. Here, the router splits the request into parts and then gathers the results as they arrive.
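The scatter-gather idea is easy to sketch. This is a toy illustration, not PNUTS code: the two-tablet store, `toy_tablet_of`, and `toy_fetch` are hypothetical stand-ins for the router's mapping and the storage units, and a thread pool plays the role of the parallel requests.

```python
from concurrent.futures import ThreadPoolExecutor

def multiget(keys, tablet_of, fetch):
    """Split a multi-key read by tablet, fetch in parallel, merge the results."""
    # Scatter: group the requested keys by the tablet that holds them.
    groups = {}
    for key in keys:
        groups.setdefault(tablet_of(key), []).append(key)
    # One request per tablet, issued in parallel.
    with ThreadPoolExecutor(max_workers=max(1, len(groups))) as pool:
        parts = list(pool.map(lambda item: fetch(*item), groups.items()))
    # Gather: merge the partial answers into one response.
    merged = {}
    for part in parts:
        merged.update(part)
    return merged

# Hypothetical two-tablet store used only for the example.
STORE = {0: {"a": 1, "c": 3}, 1: {"b": 2}}

def toy_tablet_of(key):
    return 0 if key in STORE[0] else 1

def toy_fetch(tablet, keys):
    return {k: STORE[tablet][k] for k in keys}
```

Calling `multiget(["a", "b", "c"], toy_tablet_of, toy_fetch)` sends one request to each of the two tablets and returns a single merged dictionary.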

Strong points:

  • Asynchronous updates = low latency
  • Record level consistency
  • Flexible schemas
  • Multiget, which retrieves multiple records in parallel
  • Scalable
  • Suits various applications

Weak points:

  • No complex queries support, joins, group by
  • A request does not always return the most current version of the data
  • No referential integrity
  • No serializable transactions
  • Partial ordering of the transactions
  • Concurrent updates conflicts resolution is not highlighted
  • Per-record master is a mystery 🙂
  • Slow multiget parallel requests
  • Failure detection is preserved…
Feb 13 2013

Megastore is a storage system developed by Google as an improvement over their Bigtable. The authors claim to support strong consistency and high availability guarantees. Moreover, fully serializable ACID semantics are provided within fine-grained partitions of data.

To achieve high availability and consistency, synchronous replication is applied. Replication is implemented using Paxos for every write across the data centers, i.e., writes are replicated synchronously.


  1. Availability and scalability are built in a priori.
  2. The Paxos replication and consensus algorithm is optimized for low latency across geo-distributed datacentres. This way high availability is supported. (Low latency = 10-1000 milliseconds)

Availability + Scalability

Availability is supported by a synchronous, fault-tolerant log replicator optimized for long distances, while scalability is supported by partitioning the database. These two concepts are well thought out and oriented towards long-distance links and active communication over them.

To scale throughput, data is partitioned into a collection of entity groups. Operations across entity groups rely on asynchronous messaging, which is also used for logically distant entities. Minimizing latency and improving throughput are left to the application level.


Megastore is deployed through a client library to which applications are linked. This library implements Paxos and therefore deals with failure detection and replication. Durability of Paxos operations is provided by sending each operation directly to the local Bigtable instance.

Another cool thing about Megastore is its index tuning, e.g., secondary indexes can be declared to optimise further lookups.

Concurrency control

Each entity group is abstracted as a DB that provides serializable ACID semantics. Each row in Megastore can be stored with multiple values with different timestamps. This way multiversion concurrency control is supported. I will use a similar approach in my thesis project to support concurrent updates to the data store.

For atomic updates across entity groups, two-phase commit can be used. However, its usage is not recommended as it may increase the risk of contention.
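Storing several timestamped values per cell is the core of multiversion concurrency control, and a tiny sketch shows why it helps. This is a generic illustration with a made-up class name, not Megastore's or Bigtable's API: a reader picks a timestamp and always sees the latest value written at or before it, no matter what writers do afterwards.

```python
import bisect

class VersionedCell:
    """A single cell that keeps every written value together with its timestamp."""

    def __init__(self):
        self._timestamps = []  # kept sorted
        self._values = []

    def write(self, ts, value):
        """Add a new version; older versions stay readable."""
        i = bisect.bisect_right(self._timestamps, ts)
        self._timestamps.insert(i, ts)
        self._values.insert(i, value)

    def read(self, ts):
        """Return the latest value with timestamp <= ts, or None if there is none."""
        i = bisect.bisect_right(self._timestamps, ts)
        return self._values[i - 1] if i else None
```

A read at timestamp 15 keeps returning the version written at 10 even after a writer adds a version at 20, so readers and writers do not block each other.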


Reads and writes can be initiated from any replica while preserving ACID semantics. Replication is performed per entity group: simply put, the transaction log is synchronously transmitted to a quorum of replicas. Agreement between replicas is maintained by an optimized Paxos. A coordinator tracks a set of entity groups for each replica. Fast writes are achieved with the master-based approach (where reads and writes are dedicated to one master). This way the Paxos prepare stage can be skipped if the previous requests succeeded.
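The quorum rule behind this can be written down in two lines. I assume a simple majority quorum here, which is the usual Paxos setting; the function names are my own.

```python
def quorum_size(num_replicas):
    """Smallest number of replicas that forms a majority."""
    return num_replicas // 2 + 1

def is_committed(acks, num_replicas):
    """A log entry counts as committed once a majority has acknowledged it."""
    return acks >= quorum_size(num_replicas)
```

With 5 replicas a write needs 3 acknowledgements, so it survives the loss of any 2 replicas, and any two quorums always overlap in at least one replica.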

Strong side

  • Partitioning the data into entity groups. This way support for ACID transactions is guaranteed.
  • The consistency level can be loosened by the user, which improves latency.
  • Indexing is extremely useful for development.
  • Extension of Paxos to optimize wide-area agreement.

Weak side

  • Partitioning of the data is left to the application side.
  • The write rate is quite low.
  • Joins add extra overhead, as the data has to be denormalized.
  • Vulnerability to asymmetric network partitions.
  • Chubby is used for failure detection, which is, most probably, not the most efficient tool.
Jan 29 2013

Designing an architecture from scratch may require some tools and approaches to put all the things together.

The high-level structure of software can be represented in various ways; one approach is to make an Architectural Blueprint with 5 different views of the system [by Philippe Kruchten].

Main views:

  • Logical View – an object model of the design.
  • Process View – concurrency and synchronization aspects.
  • Physical View – mapping of the software to the hardware.
  • Development View – static organization of the software.
  • Use Cases – various usage scenarios.

The Logical Architecture

It serves the functional requirements and decomposes the system into a set of objects and object classes. Class diagrams and class templates are usually used to illustrate this abstraction. Common mechanisms or services are defined in class utilities. The general advice when building a logical view is to keep a single, coherent object model across the whole system.


The Process Architecture

It takes into account non-functional requirements: performance and availability, concurrency and distribution, system integrity and fault tolerance. It can be represented as a high-level view of a set of independently executing logical networks of communicating programs that are distributed across a set of hardware. A process is a group of tasks that form an executable unit and which can be (a) tactically controlled, (b) replicated, (c) partitioned into a set of independent tasks: major and minor (cyclic activities, buffering, time-outs).


The Development Architecture

It represents the organization of software modules in the software development environment. It consists of libraries and subsystems. The subsystems are organized into a hierarchy of layers with well-defined interfaces. Overall, this view is represented by module and subsystem diagrams showing the export/import relationships. It also takes internal requirements into account. A layered style is recommended for this view.


The Physical Architecture

It represents non-functional requirements such as availability, reliability, scalability and performance. It shows how networks, processes, tasks and objects are mapped onto the various nodes.



The fifth view, Use Cases, is redundant with the other four, but it has two main purposes:

  • driver to discover the architectural elements during the architecture design
  • validation and illustration role after the architecture design is complete, also it can be used as a starting point for the tests of an architectural prototype.

It uses the components of the logical view together with connector elements from the process view for the interactions between objects.



Jan 28 2013

The system should provide functionality to insert, modify, and read data.


  • One instance of the data store should handle 250 000 messages per second, where each message is no more than 200 bytes.
  • A query for a record, with a unique index, should be returned in no more than 10 milliseconds with a 10 billion records database.
  • The capacity of a given response stream should be 100 Mbps.
  • The response streaming capacity of a datastore server should be 300 Mbps (divided across several clients), while receiving 100 000 messages per second.
  • A transaction should be acknowledged on the message bus within 100 microseconds.
  • Fail over within 30 seconds on the same site.
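
To get a feeling for these numbers, here is a back-of-the-envelope calculation in Python. It is only a sketch: it assumes every message has the maximum size of 200 bytes and that 1 Mbps is 1 000 000 bits per second, and the function name is made up for illustration.

```python
# Rough bandwidth estimate for the ingest requirements listed above.
# Assumption: every message is at the 200-byte upper bound.

def ingest_mbps(messages_per_second, message_bytes=200):
    """Return the ingest bandwidth in megabits per second."""
    bits_per_second = messages_per_second * message_bytes * 8
    return bits_per_second / 1_000_000

# Peak ingest for one data store instance: 250 000 messages per second.
peak_ingest = ingest_mbps(250_000)

# Mixed load: 100 000 messages per second in, 300 Mbps of responses out.
mixed_ingest = ingest_mbps(100_000)
mixed_total = mixed_ingest + 300

print(f"peak ingest:  {peak_ingest} Mbps")
print(f"mixed ingest: {mixed_ingest} Mbps, plus responses: {mixed_total} Mbps")
```

So at peak a single instance has to absorb roughly 50 MB of messages per second, before any protocol overhead is counted.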

Consistency and fault tolerance are an important part of building distributed systems; however, leveraging the internal NASDAQ OMX messaging system INET gives built-in support for this functionality. In further posts (at least I really hope 🙂 lol) I will explore how the load will be handled and which techniques will be used to provide a high level of availability and recovery from specific faults.

Jan 25 2013

Most probably many of you have asked yourselves: to build from scratch or not to build from scratch?

  1. – To build!
  2. – Not to build!
  3. – To build!
  4. – Not to build!

Hmm… Hard to decide? Take a look at the diagram and decide!

To build from scratch or not to build from scratch

Is there any re-use in your answer? Then you are one of those lucky bastards who can relax for a while and enjoy a cocktail 🙂

Re-use in all its beauty

Distributed platforms and frameworks aka Map-Reduce:

  • Apache Hadoop is open source software for reliable, scalable, distributed computing. It allows the use of simple programming models for distributed processing of large data sets across clusters of computers. The main components are HDFS (distributed file system), Hadoop YARN (framework for job scheduling and resource management) and Hadoop MapReduce (system for parallel processing of large data sets).
  • YARN is, so to speak, the second generation of MapReduce. The main idea behind its architecture is to split the JobTracker’s resource management and job scheduling/monitoring into separate daemons, so that a global ResourceManager and a per-application ApplicationMaster are present in the system.
  • Disco Project is a framework based on the MapReduce paradigm. It is an open source Nokia Research Centre project whose purpose is to handle massive amounts of data. According to the documentation, it distributes and replicates your data and schedules your jobs efficiently. Moreover, indexing of large numbers of records is supported, so that real-time querying is possible.

    • The master adds jobs to the job queue and runs them when nodes are available.
    • Clients submit jobs to the master.
    • Slaves are started on the nodes by the master; they spawn and monitor processes.
    • Workers do the jobs. The master is notified of the output location.
    • Data locality mechanisms are applied once results are saved to the cluster.

  • Spring Batch is an open source framework for batch processing. It supports the development of robust batch applications and is built on the Spring Framework. It supports logging, transaction management, statistics, restart, skip and resource management. Some optimization and partitioning techniques can be tuned in the system as well.

    • Infrastructure level is a low-level tool. It provides the ability to batch operations together and to retry if an error occurs.
    • Execution environment provides robust features for tracing and management of the batch lifecycle.
    • Core module is the batch-focused domain and implementation. It includes statistics, job launch, restart.

  • Gearman is a framework for delegating tasks to other machines and processes that are better suited to them. Parallel execution of work is supported, as well as load balancing and multi-language function calls.

    • A client, a worker and a job server are the parts of the system.
    • The client creates jobs and sends them to the job server.
    • The job server forwards tasks to suitable workers.
    • The worker does the work and responds to the client through the job server.
    • Communication with the server is established through TCP sockets.
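
The MapReduce paradigm that Hadoop and Disco above are built around can be sketched in a few lines of plain Python. This is only a single-process illustration of the map, shuffle and reduce steps using word count; it does not use the API of any of the frameworks listed, and all function names are made up.

```python
from collections import defaultdict

def map_phase(line):
    """Map step: emit a (word, 1) pair for every word in one input line."""
    for word in line.split():
        yield word.lower(), 1

def shuffle(pairs):
    """Shuffle step: group all emitted values by their key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(key, values):
    """Reduce step: collapse the values of one key into a single result."""
    return key, sum(values)

def run_job(lines):
    """Run the three steps in sequence inside one process."""
    pairs = (pair for line in lines for pair in map_phase(line))
    grouped = shuffle(pairs)
    return dict(reduce_phase(key, values) for key, values in grouped.items())

if __name__ == "__main__":
    print(run_job(["to build or not to build", "Build from scratch"]))
```

In a real framework the map and reduce calls run on different nodes and the shuffle moves data over the network, which is exactly the part you do not want to write from scratch.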

Distributed platforms and frameworks aka Directed Acyclic Graph aka Stream Processing Systems:

  • Spark is a system optimized for data analytics, making it fast both to run and to write. It is suited for in-memory data processing. The API is in Java and Scala. Purpose: machine learning and data mining, though general-purpose use is also possible. It runs on Apache Mesos to share resources with Hadoop and other apps.
  • Dryad is a general-purpose runtime for executing data-parallel apps. An application is modeled as a directed acyclic graph (DAG) that defines its dataflow, with vertices representing the operations to be performed on the data. However, creating the graph is quite a tricky part, which is why some high-level language compilers were created. One of them is DryadLINQ.
  • S4 and Storm
    are both for real-time, reliable processing of unbounded streams of data. The main difference between Storm and S4 is that Storm guarantees that messages will be processed even when failures occur, while S4 supports state recovery. More in the previous post.
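
The DAG idea described for Dryad above can be illustrated with a tiny dataflow runner. This is a sketch in plain Python (3.9+ for `graphlib`), not the Dryad or DryadLINQ API; the vertex names and the `run_dataflow` helper are made up for illustration.

```python
from graphlib import TopologicalSorter

def run_dataflow(operations, dependencies):
    """Run every vertex after all of its upstream vertices have finished.

    operations:   vertex name -> function taking a list of upstream outputs
    dependencies: vertex name -> list of upstream vertex names
    """
    results = {}
    # static_order() yields each vertex only after all of its predecessors.
    for name in TopologicalSorter(dependencies).static_order():
        inputs = [results[upstream] for upstream in dependencies.get(name, [])]
        results[name] = operations[name](inputs)
    return results

# A small graph: one source, two parallel branches, one join.
operations = {
    "read": lambda inputs: [1, 2, 3, 4],
    "evens": lambda inputs: [x for x in inputs[0] if x % 2 == 0],
    "squares": lambda inputs: [x * x for x in inputs[0]],
    "total": lambda inputs: sum(inputs[0]) + sum(inputs[1]),
}
dependencies = {
    "read": [],
    "evens": ["read"],
    "squares": ["read"],
    "total": ["evens", "squares"],
}

results = run_dataflow(operations, dependencies)
print(results["total"])
```

Here everything runs sequentially in one process; a real runtime would place independent vertices such as `evens` and `squares` on different machines.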

Distributed Data Storage (DSS)

  • Key-Value
  • Column Based
  • SQL
  • NewSQL
  • bla bla bla…
  • Latency
  • Consistency
  • Availability
  • bla bla bla…

Choosing the most suitable distributed storage system is quite tricky and requires some reading in the field. Some information on storage systems, with an in-depth review from my previous project on Decentralized Storage Systems, can be found on my wiki. A brief review of some hot systems is also presented in my previous post.

Actor Model Frameworks:

The actor model is a fairly old model of concurrent computation built from units called actors, which react to the messages they receive: they make local decisions, spawn other actors, send messages and decide how to behave for the next message received. However, this model has some issues that should be taken into account if you decide to use it: (a) scalability, (b) transparency, (c) inconsistency.

Actor programming languages include Erlang, Scala and others. This is one extra motivation to get to know those languages better.

The most popular Actor Libraries:

  • Akka
    • Language: Java and Scala
    • Purpose: Build highly concurrent, distributed, fault-tolerant event-driven applications on the JVM.
    • Actors: Very lightweight concurrent entities. They process messages asynchronously using an event-driven receive loop.
  • Pykka
    • Language: Python
    • Purpose: Build highly concurrent, distributed, fault-tolerant event-driven applications.
    • Actors: Execution units that run concurrently with other actors. They don’t share state with each other. Communication is maintained by sending/receiving messages. On receiving a message, an actor can perform some actions. Only one message is processed at a time.
  • Theron
    • Language: C++
    • Purpose: Build highly concurrent, distributed event-driven applications.
    • Actors: They are specialized objects that execute in parallel natively. Each of them has a unique address. Communication is done by messaging. Each actor’s behavior is defined in message handlers, which are user-defined private member functions.
  • S4 – more in the Stream Processing Systems section.
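
The common idea behind these libraries (a private mailbox, no shared state, one message at a time) can be sketched with nothing but the Python standard library. This is a minimal illustration, not the API of Akka, Pykka or Theron; the `Actor` and `Counter` classes and their method names are made up.

```python
import queue
import threading

_STOP = object()  # sentinel message that ends an actor's receive loop

class Actor:
    """A tiny actor: one thread, one mailbox, one message at a time."""

    def __init__(self):
        self._mailbox = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def tell(self, message):
        """Send a message asynchronously; the caller does not wait."""
        self._mailbox.put(message)

    def stop(self):
        """Ask the actor to finish the queued messages, then wait for it."""
        self._mailbox.put(_STOP)
        self._thread.join()

    def _run(self):
        while True:
            message = self._mailbox.get()
            if message is _STOP:
                break
            self.on_receive(message)

    def on_receive(self, message):
        raise NotImplementedError

class Counter(Actor):
    """Keeps private state that only its own thread ever modifies."""

    def __init__(self):
        self.total = 0
        super().__init__()

    def on_receive(self, message):
        self.total += message

if __name__ == "__main__":
    counter = Counter()
    for number in range(5):
        counter.tell(number)
    counter.stop()
    print(counter.total)
```

Because the state is touched only by the actor's own thread, no locks are needed around `total`; all coordination happens through the mailbox.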

Scheduling and Load Balancing:

  • Luigi Scheduler
    • It is a Python module that helps to build complex pipelines of batch jobs. It also has built-in support for Hadoop. The scheduler was open-sourced by Spotify and is used within the company.
    • It is still quite immature, and anyone can try their luck contributing to it.
  • Apache Oozie
    • It is a workflow scheduler system to manage Apache Hadoop jobs.
    • A workflow is a directed acyclic graph of actions.
    • It is a scalable, reliable and extensible system.
  • Azkaban
    • It is a batch job scheduler.
    • It helps to control dependencies and scheduling of individual pieces to run.
  • Helix
    • It is a generic cluster management framework for automatic management of partitioned, replicated and distributed resources hosted on a cluster of nodes.
    • Features: (a) automatic assignments of resources to nodes, (b) node failure detection and recovery, (c) dynamic addition of resources and nodes to a cluster, (d) pluggable distributed state machine, (e) automatic load balancing and throttling of transactions
  • Norbert
    • Provides easy cluster management and workload distribution.
    • It is implemented in Scala and wraps ZooKeeper, Netty and Protocol Buffers to make it easier to build applications.
    • Purpose: (a) Provide group management: change configuration, add/remove nodes. (b) Partition workload by using software load balancing. (c) Provide asynchronous client/server RPC and notifications.


Consensus and stuff:

Log Collection:

  • Apache Kafka
    A distributed pub/sub messaging system that supports:

    • Persistent messaging with constant time performance
    • High-throughput
    • Explicit support for message partitioning over servers and machine-consumers
    • Support for parallel data load into Hadoop

    It is a viable solution for providing logged data to offline analysis systems like Hadoop, but it might be quite limited for building real-time processing. The system is quite similar to Scribe and Apache Flume, as they all do activity stream processing, even though their architectures are different.

  • Logstash
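
The message partitioning mentioned for Kafka above can be illustrated with a simple key-hash scheme. This is only a sketch of the general idea in plain Python; it is not Kafka's own partitioner or client API, and the function names and the choice of CRC32 are assumptions made for the example.

```python
import zlib

def partition_for(key, partition_count):
    """Pick a partition from a stable hash of the message key."""
    return zlib.crc32(key.encode("utf-8")) % partition_count

def append(log, key, value):
    """Append a message to the partition that owns its key."""
    log[partition_for(key, len(log))].append((key, value))

if __name__ == "__main__":
    log = [[] for _ in range(4)]  # four in-memory partitions
    events = ["alice", "bob", "alice", "carol", "bob", "alice"]
    for offset, user in enumerate(events):
        append(log, user, offset)
    for number, partition in enumerate(log):
        print(number, partition)
```

All messages with the same key land in the same partition, so their relative order is kept while different partitions can be consumed in parallel.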

Message Passing Frameworks:

(De)Serialization for sending objects in the network safely:

  • Avro
    • It is a data serialization system.
    • Provides: (a) rich data structures, (b) compact, fast, binary data format, (c) container file to store persistent data, (d) integration with dynamic languages. The schemas are defined with JSON.
    • Used by Spotify
  • Protocol Buffers
    • Developed and used by Google.
    • It encodes structured data in an efficient extensible format.
  • Apache Thrift
    • Purpose: scalable cross-language services deployment.
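
One reason such binary formats are compact is variable-length integer encoding; Protocol Buffers, for example, encodes integers as base-128 varints. Below is a minimal sketch of that encoding in plain Python. It is not the protobuf library API, the function names are made up, and it handles only non-negative integers.

```python
def encode_varint(value):
    """Encode a non-negative integer as a base-128 varint."""
    if value < 0:
        raise ValueError("this sketch handles only non-negative integers")
    out = bytearray()
    while True:
        low_bits = value & 0x7F       # take the lowest 7 bits
        value >>= 7
        if value:
            out.append(low_bits | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low_bits)         # high bit clear: last byte
            return bytes(out)

def decode_varint(data):
    """Decode one varint from the start of a bytes object."""
    result = 0
    shift = 0
    for byte in data:
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result
        shift += 7
    raise ValueError("truncated varint")

if __name__ == "__main__":
    for number in (1, 300, 250_000):
        encoded = encode_varint(number)
        print(number, encoded.hex(), decode_varint(encoded))
```

Small numbers take a single byte instead of a fixed four or eight, which adds up quickly when you push hundreds of thousands of messages per second.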

Chasing for latency: