Oct 20, 2013

The field of distributed video streaming still has a large gap when it comes to handling churn and adversaries together. The cool guys from the University of Texas at Austin tried to eliminate adversary influence in their BAR Gossip work. However, the authors do not address churn.

In our work, we are trying to embrace both dynamicity and adversarial behavior, with possibly just under 30% of the nodes being adversarial.
The video streaming system is built on top of the framework developed by the LPD lab at EPFL. This framework runs highly expensive Byzantine agreement protocols over small subsets of the whole system, called clusters. This approach guarantees both resilience to the adversary and consensus within the honest majority of nodes. Additionally, a certain level of randomization is added to handle adversarial attacks: nodes constantly change their location in the topology, jumping from one cluster to another.

This constant movement makes maintaining the streaming application a challenge.


Previously, streaming systems were classified into two categories: tree-based (push) and mesh-based (pull) [link]. Both have their own advantages and disadvantages. The former is not suited to handling churn, while the latter is robust to churn but cannot guarantee efficient content distribution.

Even though the mesh-based topology is designed with churn in mind, it is not robust against adversarial influence and is exposed to the outside world. In contrast to mesh systems, our framework supports dynamic node placement and thus maintains better fault tolerance.

At the same time, two main strategies for content dissemination exist: pull [link] and push. Pull strategies have been shown to produce less overhead on the system [link]. However, in a fully dynamic and partially corrupted network, the pull strategy cannot provide the required fault tolerance under constant node replacement. Since our main requirement is to sustain performance in a dynamic, fault-tolerant system, we sacrifice low network overhead and adopt the push strategy.

Another group of systems are those that can actually handle a Byzantine adversary and try to sustain dynamicity, e.g., BarGossip [link] and FlightPath [link].

The former, BarGossip, is based on the simple gossip model and is easy to implement. The main differences from simple gossip are verifiable pseudo-random peer selection and the use of credible threats instead of peer reputation. The credible-threat mechanism relies on the fact that, upon suspicion, any node can send a POM (proof of misbehavior) message. So if a rational node thinks it could be suspected, it may decide to actually forward some messages. The system runs in rounds and uses two protocols: balanced exchange (a node exchanges some information with others if it has received something in the current round that it did not have before) and optimistic push.
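To make the "verifiable pseudo-random peer selection" idea a bit more concrete, here is a minimal Python sketch. It is not BarGossip's actual algorithm: the function names are made up for illustration, and a plain SHA-256 hash of the node id and round number stands in for whatever seed the real protocol uses. The only point is that the choice is deterministic, so any other node can recompute it and check that the partner was not hand-picked.

```python
import hashlib


def select_partner(node_id: str, round_no: int, members: list[str]) -> str:
    """Deterministically pick a gossip partner for (node_id, round_no).

    Hypothetical helper: the seed here is a plain hash, which is an
    assumption made for illustration only.
    """
    # Sort so that every node derives the same candidate order.
    candidates = sorted(m for m in members if m != node_id)
    if not candidates:
        raise ValueError("need at least one other member")
    seed = hashlib.sha256(f"{node_id}:{round_no}".encode()).digest()
    index = int.from_bytes(seed, "big") % len(candidates)
    return candidates[index]


def verify_partner(node_id: str, round_no: int, members: list[str], claimed: str) -> bool:
    """Recompute the selection and compare it with the claimed partner."""
    return select_partner(node_id, round_no, members) == claimed
```

A node that contacts a peer other than the one returned by `select_partner` for the current round can be caught by anyone running `verify_partner`, which is the kind of evidence a proof-of-misbehavior message could carry.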

The latter, FlightPath, sustains dynamicity in the infrastructure in addition to fault tolerance, and relies on an epsilon-Nash equilibrium. In such an equilibrium, rational nodes deviate from the protocol only if they expect to benefit by more than a factor of epsilon from doing so. In the system, the source sends two types of messages: stream updates (the actual content) and linear digests (authentication of the updates). This system also relies on a verifiable pseudo-random algorithm and uses history update messages.

System Model and Problem Definition

We consider a network consisting of a dynamic collection of nodes, with an adversary model similar to [link]. Nodes have equal roles and are connected to each other in the [link] fashion.

The underlying system architecture looks as follows:



Basically, all nodes are grouped into small clusters, within which a heavy consensus protocol can run relatively fast. Nodes constantly change their places in the system. Each connection is TCP, which will be switched to UDP in the future. When the source starts streaming data, it broadcasts the data to everyone in its cluster and everyone in the neighboring clusters. Clusters are organized into Hamiltonian cycles, and for the sake of good expansion several redundant cycles are used (usually 2 or 3). As you can see, this is quite a preliminary description.
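The cluster overlay described above can be sketched in a few lines of Python. This is only an illustration under my own assumptions: each Hamiltonian cycle is modelled as a random permutation of the cluster ids, and the names `build_cycles` and `neighbours` are hypothetical, not taken from the framework.

```python
import random


def build_cycles(clusters, num_cycles, seed=0):
    """Build `num_cycles` random Hamiltonian cycles over the clusters.

    Each cycle is a permutation of the cluster ids; consecutive entries
    (with wrap-around) are neighbours in that cycle.
    """
    rng = random.Random(seed)
    cycles = []
    for _ in range(num_cycles):
        order = list(clusters)
        rng.shuffle(order)
        cycles.append(order)
    return cycles


def neighbours(cluster, cycles):
    """Union of the predecessor and successor of `cluster` in every cycle."""
    result = set()
    for order in cycles:
        i = order.index(cluster)
        result.add(order[i - 1])                 # predecessor (wraps around)
        result.add(order[(i + 1) % len(order)])  # successor (wraps around)
    result.discard(cluster)
    return result
```

With 2 cycles every cluster gets between 2 and 4 neighbouring clusters, so a push from the source cluster has several independent routes through the overlay.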

I am planning to add more details on the implementation, the performance evaluation and a comparison with other existing systems in the next posts.

Jan 25, 2013

Most probably many of you have asked yourselves: to build from scratch or not to build from scratch?

  1. – To build!
  2. – Not to build!
  3. – To build!
  4. – Not to build!

Hmm… Hard to decide? Take a look at the diagram and decide!

To build from scratch or not to build from scratch

Is there any re-use in your answer? Then you are one of those lucky bastards who can relax for a while and enjoy a cocktail 🙂

Re-use in all its beauty

Distributed platforms and frameworks aka Map-Reduce:

  • Apache Hadoop is open source software for reliable, scalable, distributed computing. It allows simple programming models to be used for distributed processing of large data sets across clusters of computers. The main components are HDFS (distributed file system), Hadoop YARN (framework for job scheduling and resource management) and Hadoop MapReduce (system for parallel processing of large data sets).
  • YARN is the so-called second generation of MapReduce. The main idea behind its architecture is to split the JobTracker’s resource management and job scheduling/monitoring into separate daemons, so the system has a global ResourceManager and a per-application ApplicationMaster.
  • Disco Project is a framework based on the MapReduce paradigm. It is an open source project from Nokia Research Center whose purpose is handling massive amounts of data. According to the documentation, it distributes and replicates your data and schedules your jobs efficiently. Moreover, indexing of large numbers of records is supported, so that real-time querying is possible.

    • The master adds jobs to the job queue and runs them when nodes are available.
    • Clients submit jobs to the master.
    • Slaves are started on the nodes by the master; they spawn and monitor processes.
    • Workers do the jobs. The master is notified of the output location.
    • Data locality mechanisms are applied once results are saved to the cluster.

  • Spring Batch is an open source framework for batch processing. It supports the development of robust batch applications and is built on the Spring Framework. It supports logging, transaction management, statistics, restart, skip and resource management. Some optimization and partitioning techniques can be tuned in the system as well.

    • The infrastructure level is a low-level tool. It provides a way to batch operations together and retry if an error occurs.
    • Execution environment provides robust features for tracing and management of the batch lifecycle.
    • Core module is the batch-focused domain and implementation. It includes statistics, job launch, restart.

  • Gearman is a framework for delegating tasks to other machines and processes that are better suited to them. Parallel execution of work is supported, as well as load balancing and multi-language function calls.

    • A client, a worker and a job server are the parts of the system.
    • The client creates jobs and sends them to the job server.
    • The job server forwards tasks to suitable workers.
    • The worker does the work and responds to the client through the job server.
    • Communication with the server is established through TCP sockets.
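For readers who have never touched the MapReduce paradigm that Hadoop and Disco are built around, here is a toy, single-process sketch in plain Python. It uses no Hadoop or Disco API at all; the function names are my own, and in a real framework the map and reduce calls would be spread across workers while the shuffle moves data between nodes.

```python
from collections import defaultdict


def map_phase(document):
    """Map: emit a (word, 1) pair for every word in one document."""
    for word in document.lower().split():
        yield word, 1


def shuffle(pairs):
    """Shuffle: group all emitted values by key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(key, values):
    """Reduce: collapse the values of one key into a single result."""
    return key, sum(values)


def word_count(documents):
    """Run map, shuffle and reduce sequentially over all documents."""
    pairs = (pair for doc in documents for pair in map_phase(doc))
    return dict(reduce_phase(k, v) for k, v in shuffle(pairs).items())
```

For example, `word_count(["to build", "not to build"])` counts "to" and "build" twice and "not" once.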

Distributed platforms and frameworks aka Directed Acyclic Graph aka Stream Processing Systems:

  • Spark is a system optimized for data analytics, making it fast both to run and to write. It is suited for in-memory data processing. The API is in Java and Scala. Purpose: machine learning and data mining, although general-purpose use is also possible. It runs on Apache Mesos to share resources with Hadoop and other apps.
  • Dryad is a general-purpose runtime for executing data-parallel apps. An application is modeled as a directed acyclic graph (DAG), which defines the dataflow of the application, with vertices that represent the operations to be performed on the data. However, creating the graph is quite a tricky part. That is why some high-level language compilers were created. One of them is DryadLINQ.
  • S4 and Storm are both built for real-time, reliable processing of unbounded streams of data. The main difference between them is that Storm guarantees that messages will be processed even when failures occur, while S4 supports state recovery. More in the previous post.

Distributed Data Storage (DSS)

  • Key-Value
  • Column Based
  • SQL
  • NewSQL
  • bla bla bla…
  • Latency
  • Consistency
  • Availability
  • bla bla bla…

Choosing the most suitable distributed storage system is quite tricky and requires some reading in the field. Some information on storage systems, with an in-depth review from my previous project on Decentralized Storage Systems, can be found on my wiki. A brief review of some hot systems is also presented in my previous post.

Actor Model Frameworks:

The actor model is a fairly old computational model for concurrent computation. It consists of concurrent computational entities called actors that react to the messages they receive: they make local decisions, spawn other actors, send messages and decide how to behave for the next message they receive. However, this model has some issues that should be taken into account if you decide to use it: (a) scalability, (b) transparency, (c) inconsistency.
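To show what "react to received messages" means in practice, here is a hand-rolled actor in plain Python, using only `threading` and `queue` from the standard library. It is a sketch of the idea, not the API of Akka, Pykka or Theron; the class and the message names are invented for the example.

```python
import queue
import threading


class CounterActor:
    """A tiny actor: private state, a mailbox, one message at a time."""

    _STOP = object()  # sentinel message used to shut the actor down

    def __init__(self):
        self._mailbox = queue.Queue()
        self._count = 0  # state owned by the actor thread only
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def tell(self, message):
        """Fire-and-forget: enqueue a message and return immediately."""
        self._mailbox.put((message, None))

    def ask(self, message):
        """Enqueue a message and block until the actor replies."""
        reply = queue.Queue(maxsize=1)
        self._mailbox.put((message, reply))
        return reply.get()

    def stop(self):
        self._mailbox.put((self._STOP, None))
        self._thread.join()

    def _handle(self, message):
        if message == "increment":
            self._count += 1
            return None
        if message == "get":
            return self._count
        return None  # unknown messages are ignored

    def _run(self):
        # Event loop: messages are processed strictly in mailbox order.
        while True:
            message, reply = self._mailbox.get()
            if message is self._STOP:
                break
            result = self._handle(message)
            if reply is not None:
                reply.put(result)
```

Because only the actor's own thread touches `_count`, no lock is needed: sending three "increment" messages followed by `ask("get")` returns 3.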

Actor programming languages include Erlang, Scala and others. This is one extra motivation to get to know those languages better.

The most popular Actor Libraries:

  • Akka
    • Language: Java and Scala
    • Purpose: Build highly concurrent, distributed, fault-tolerant event-driven applications on the JVM.
    • Actors: Very lightweight concurrent entities. They process messages asynchronously using an event-driven receive loop.
  • Pykka
    • Language: Python
    • Purpose: Build highly concurrent, distributed, fault-tolerant event-driven applications.
    • Actors: An actor is an execution unit that runs concurrently with other actors. Actors don’t share state with each other. Communication happens by sending and receiving messages. On receiving a message, an actor can perform some actions. Only one message is processed at a time.
  • Theron
    • Language: C++
    • Purpose: Build highly concurrent, distributed event-driven applications.
    • Actors: They are specialized objects that execute in parallel natively. Each of them has a unique address. Communication is done by messaging. Each actor’s behavior is defined in message handlers, which are user-defined private member functions.
  • S4 – more in the Stream Processing Systems section.

Scheduling and Load Balancing:

  • Luigi Scheduler
    • It is a Python module that helps to build complex pipelines of batch jobs. It also has built-in support for Hadoop. It is a scheduler that was open-sourced by Spotify and is used within the company.
    • It is still quite immature, and anyone can try their luck contributing to it.
  • Apache Oozie
    • It is a workflow scheduler system to manage Apache Hadoop jobs.
    • A workflow is a directed acyclic graph of actions.
    • It is a scalable, reliable and extensible system.
  • Azkaban
    • It is a batch job scheduler.
    • It helps to control dependencies and scheduling of individual pieces to run.
  • Helix
    • It is a generic cluster management framework for automatic management of partitioned, replicated and distributed resources hosted on a cluster of nodes.
    • Features: (a) automatic assignments of resources to nodes, (b) node failure detection and recovery, (c) dynamic addition of resources and nodes to a cluster, (d) pluggable distributed state machine, (e) automatic load balancing and throttling of transactions
  • Norbert
    • Provides easy cluster management and workload distribution.
    • It is implemented in Scala and wraps ZooKeeper, Netty and Protocol Buffers to make it easier to build applications.
    • Purpose: (a) Provide group management, configuration changes and adding/removing nodes. (b) Partition the workload using software load balancing. (c) Provide asynchronous client/server RPC and notifications.


Consensus and stuff:

Log Collection:

  • Apache Kafka
    A distributed pub/sub messaging system that supports:

    • Persistent messaging with constant time performance
    • High-throughput
    • Explicit support for message partitioning over servers and machine-consumers
    • Support for parallel data load into Hadoop

    It is a viable solution for providing logged data to offline analysis systems like Hadoop, but it might be quite limited for building real-time processing. The system is quite similar to Scribe and Apache Flume, as they all do activity stream processing, even though their architectures are different.

  • Logstash

Message Passing Frameworks:

(De)Serialization for sending objects in the network safely:

  • Avro
    • It is a data serialization system.
    • Provides: (a) rich data structures, (b) compact, fast, binary data format, (c) container file to store persistent data, (d) integration with dynamic languages. The schemas are defined with JSON.
    • Used by Spotify
  • Protocol Buffers
    • Developed and Used by Google.
    • It encodes structured data in an efficient extensible format.
  • Apache Thrift
    • Purpose: scalable cross-language services deployment.

Chasing for latency:



Oct 25, 2012

Zygis and I recently had a project related to Last.fm and the sampling of online social networks.

What to do?

  1. Crawl Last.fm with breadth-first search (BFS) and track the average node degree over the exploration time.
  2. Start the BFS from the RJ node
  3. Use multithreading

What was done?

  1. BFS over Last.fm was implemented
  2. Multithreading was implemented to speed up the BFS
  3. Output is in .gdf format
  4. Gephi was used to visualize the results
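The core of such a crawl, BFS plus a running average of node degree, can be sketched as below. This is not the project's code: it walks an in-memory adjacency dict instead of calling the Last.fm API, it is single-threaded, and the function name is hypothetical.

```python
from collections import deque


def bfs_average_degree(graph, start, max_nodes):
    """Explore `graph` breadth-first from `start`.

    `graph` maps a user to the list of their friends (a stand-in for an
    API call). Returns the running average degree after each explored
    node, so the estimate can be plotted over exploration time.
    """
    visited = {start}
    frontier = deque([start])
    explored = 0
    degree_sum = 0
    history = []
    while frontier and explored < max_nodes:
        node = frontier.popleft()
        friends = graph.get(node, [])
        explored += 1
        degree_sum += len(friends)
        history.append(degree_sum / explored)
        for friend in friends:
            if friend not in visited:
                visited.add(friend)
                frontier.append(friend)
    return history
```

On a small star graph started from the hub, the first estimate equals the hub's degree and only drops towards the true average as the leaves are explored. BFS reaches well-connected nodes first, so a partial crawl tends to overestimate the average degree.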


  1. Multithreading speeds up the execution of BFS 🙂 (obviously)
  2. The average node degree after exploring 60 000 nodes is 125. (The real one is around 13)
  3. Gephi is a very nice and easy to use tool to visualize the graph (if the number of nodes is less than 10 000 :))
  4. RJ is a friend of many hubs in the Last.fm graph
  5. Depth of the exploration – 3,5


is here.

Jun 28, 2012

Recently I had some free space in my schedule, so why not do some extra stuff on Scalable Distributed Systems?

My choice fell on NoSQL DBs, to be precise Redis, MongoDB and Cassandra. Of course I could have done more, but I did not have enough time. So I’m planning to do it later.. one day..maybe 🙂

Hmm… What actually did I do?

First! Install

Second! Take a look at the performance. But how to do that efficiently and, obviously, skip reinventing the wheel? Why not use the Yahoo! Cloud Serving Benchmark to evaluate the most commonly used NoSQL databases? The main metrics to compare were:

  • Run Time (ms)
  • Throughput (ops/sec)

Third! Install the next one. Go to step 2.

All details about the DB installations and their performance are here.

Conclusions and observations:

The main challenge was to set up all the parameters properly and connect each DB to the client side, the Yahoo! Cloud Serving Benchmark. I used an update-heavy workload with two execution phases (load and transaction) and two metrics: run time and throughput.
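The two metrics are tied together: throughput is just the number of operations divided by the run time. Here is a small Python sketch that pulls both numbers out of a benchmark report and recomputes one from the other. It assumes the report contains comma-separated summary lines of the form `[OVERALL], RunTime(ms), <value>`; treat that layout as an assumption and adapt the parser to whatever your benchmark version prints. The function names are my own.

```python
def parse_overall(report: str) -> dict[str, float]:
    """Collect the '[OVERALL], <metric>, <value>' lines of a report.

    The three-column, comma-separated layout is an assumption about the
    benchmark output, not something this post documents.
    """
    metrics = {}
    for line in report.splitlines():
        parts = [part.strip() for part in line.split(",")]
        if len(parts) == 3 and parts[0] == "[OVERALL]":
            metrics[parts[1]] = float(parts[2])
    return metrics


def throughput(operations: int, run_time_ms: float) -> float:
    """Operations per second for a phase that took `run_time_ms`."""
    return operations / (run_time_ms / 1000.0)
```

So a phase that executes 1000 operations in 2000 ms has a throughput of 500 ops/sec. These numbers are made up for illustration and are not results from the evaluation.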

During the evaluation, all three DBs performed quite similarly, with a slight difference for MongoDB: the run time of its transaction phase was slightly longer than for the others, and its throughput was lower.

Obviously, because these NoSQL DBs target different application areas (in-memory DB, DB for small structured data), it’s impossible to say which one of them is the best. That’s why the DB should be chosen depending on your needs 🙂


May 18, 2012

Currently, I’m quite inspired by one of the projects I have at UPC.

It’s a practical evaluation of ZooKeeper, a process coordination system. The main goal is to fully understand its working principles and to try to find some bottlenecks by building a sophisticated system on top of ZooKeeper.

Obviously, it’s quite hard to work with a system without knowing the basic principles and ideas behind it. To work clearly and consciously, I decided to read the paper on the topic written by the authors of the system.

Here is what I came up with:

ZooKeeper is a service for coordinating the processes of distributed applications. It is essentially a key/value table with hierarchical keys. It is not designed for general-purpose data storage, but some client information can still be stored there. How does it achieve all this? By incorporating elements from group messaging, shared registers and distributed lock services into a replicated, centralized service. To combine all these features, ZooKeeper uses wait-free shared registers and FIFO execution guarantees with an event-driven mechanism, so it stays simple and powerful at the same time. The idea of ZooKeeper was mainly inspired by Chubby, a locking service with strong synchronization guarantees. ZooKeeper, however, was developed without blocking primitives such as locks. The system implements an API that manipulates simple wait-free data objects, organized hierarchically as in a file system.

Which guarantees does ZooKeeper provide?

  • Linearizable writes.
  • FIFO client order.
  • Handling shared configuration updates.
  • Liveness and durability


  • Different node types (regular, ephemeral)
  • Watches
  • Completely Replicated System

What for?

  • Configuration Management
  • Rendezvous
  • Group Membership
  • Simple Locks
  • Simple Locks without Herd Effect
  • Read/Write Locks
  • Double Barrier

Want to read more? Check my wiki.


  Configuration Management

  Reliable Multicast

Both programs were implemented with ZooKeeper. The code can be found by following the links above; the explanation and activity diagrams of the applications are fully covered in the presentation below.


Summary and Critique from my side

ZooKeeper uses wait-free protocols to implement process coordination in distributed systems. The main feature of the system is its lack of locks, which can improve performance significantly. Thanks to some features (like watches and different node types), ZooKeeper supports the following applications:

  • Configuration Management
  • Leader Election
  • Group Membership

All of these were implemented within the following applications:

  • Dynamic LogBack Configuration Management
  • Reliable Multicast with handling of group membership and leader election to provide total order and reliability

The weakness of ZooKeeper is that intermediate changes can be dropped. Watches are one-time triggers, so between receiving an event and setting a new watch, the object may change multiple times. As a result, not all changes are tracked, only the latest state. This means that ZooKeeper is a state-based system, not an event-based system.

Also, complete replication within ZooKeeper limits the total size of data that it can manage. In addition, serializing all updates through a single leader is a possible performance bottleneck.
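The state-based behaviour is easy to reproduce with a toy model. The Python sketch below does not use any ZooKeeper client library; `Znode` is a made-up in-memory class that mimics only one property of the real thing, namely that a watch fires once and then has to be registered again.

```python
class Znode:
    """In-memory stand-in for a znode with one-shot watches."""

    def __init__(self, value):
        self.value = value
        self._watchers = []

    def get(self, watch=None):
        """Read the value and optionally register a one-time watch."""
        if watch is not None:
            self._watchers.append(watch)
        return self.value

    def set(self, value):
        """Write the value and fire (then forget) all pending watches."""
        self.value = value
        fired, self._watchers = self._watchers, []
        for callback in fired:
            callback()


def missed_updates_demo():
    """Three writes land before the client re-registers its watch."""
    node = Znode("v0")
    notifications = []
    seen = [node.get(watch=lambda: notifications.append("changed"))]
    node.set("v1")  # fires the watch
    node.set("v2")  # nobody is watching
    node.set("v3")  # nobody is watching
    seen.append(node.get(watch=lambda: notifications.append("changed")))
    return notifications, seen
```

The client gets a single notification and then reads "v3": the values "v1" and "v2" were never observed. That is fine if you only care about the current state, and a problem if you need every event.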

May 17, 2012

This is one of the various projects I’m currently into.

Currently, deploying translucent optical networks is the most promising short-term solution for reducing cost and energy consumption in optical backbone networks. But at the same time some problems can appear, like inflexibility and coarse granularity in the Wavelength Switched Optical Network (WSON). So how to deal with it? Apply Optical Burst Switching (OBS), or even better (as not every variant of this switching is good enough), Translucent OBS (T-OBS).

What is actually T-OBS?

It’s an architecture in which core nodes switch incoming data bursts to their output ports either in an all-optical fashion or through regenerators when signal regeneration is required. It is, in a way, a bridge between the transparent and opaque OBS solutions.

The background information for the project is taken from the paper by Oscar Pedrola.

Project itself

In this kind of network with OBS switching technology, the need to solve the RRPD problem can appear.

What is RRPD?

RRPD is the Routing and Regenerator Placement and Dimensioning problem, which consists of two parts. The first part minimizes the average number of paths that go through a single link. The second part decides where to place the necessary regenerators so as to reduce their number in the network while providing an acceptable signal-to-noise ratio.

As the problem is very complicated and needs a lot of resources, it is a good idea to split the work:

  • starting with the MILP for the routing problem, which was already implemented in CPLEX (code is available on the wiki). As the solution of this problem is supposed to be exact, in the paper it was solved only through ILP. In this project we went a bit further and suggest a heuristic for this part as well. The main idea is to compare the ILP and the heuristic and perform an analysis based on the comparison.
  • the next part of the project deals with regenerator placement and dimensioning. After a few hours of researching the most efficient heuristics for this problem, the Biased Random-Key Genetic Algorithm (BRKGA) was chosen. Even though in the paper this problem was solved only by means of a heuristic, we are planning to implement an ILP for it in CPLEX and compare the results and performance of the two approaches.

All further details on the project can be found on my wiki.

May 07, 2012

The survey paper on Decentralized Storage Systems is finally finished. The final version of the paper can be found here.


Volunteer Computing (VC) has recently become quite popular as a source of resources for large-scale computational problems in scientific research. Centralized control of VC systems is the main bottleneck, since it introduces asymmetry into the system. Several systems already benefit from VC in terms of CPU sharing, e.g. SETI@home. Another side of the problem is storage sharing, where volunteer nodes dedicate their free storage space to the system.

Introducing a Decentralized Storage System suitable for VC is appropriate mainly to reduce the possible overhead from the centralized part of the VC system and to provide an available, eventually consistent decentralized storage system at minimal cost.

The focus of this paper is to thoroughly survey existing decentralized storage systems, evaluate them according to their suitability for a VC system, and provide a state-of-the-art solution for this issue.

Apr 24, 2012

Here is a recent presentation that I had the chance to complete successfully. The topic was more than far away from me, but I still managed to do it.

The main part of the research was to find and analyze possible bottlenecks in an application from a chosen benchmark.

I chose the PARSEC benchmark and the fluidanimate application within it.

Steps to achieve the goal of the research: 

  • Install PARSEC and analyze the application both on my laptop (MacBook Air) and on the Boada server (12 physical cores, Intel Xeon, 2.4 GHz, 24 GB RAM). -> Strange behavior was found
  • Extract traces from the application using the Extrae tool.
  • Analyze the extracted traces in Paraver.

More details you could find on the

Apr 19, 2012

The last project for the Measurement Tools and Techniques subject was about simulation. Two tools were used for this purpose: Dimemas and Pin.

Analysis of high-performance parallel applications is very important for designing and tuning both the application and the hardware. Simulation tools help to analyze how an application works in a different environment or how hardware would perform with different configurations. In this project two simulation tools are described and used for performance analysis. The first one simulates different environments for a NAS benchmark application in order to find the environment configuration on which the application would run comparably to an environment similar to the MareNostrum supercomputer. The second one simulates three levels of cache in order to find the optimal cache configuration for a matrix multiplication application.


Dimemas, a tool for performance analysis of message-passing programs, was used. The main reason to use this tool is that it lets you develop and tune parallel applications on a workstation while providing an accurate prediction of their performance on the parallel target machine. Dimemas generates trace files that are suitable for further analysis and visualization with the Paraver tool, so an even more detailed examination of the application can be done. Analyses of Paraver traces are presented in this work as the result of the Dimemas simulation.

The analysis was performed with Dimemas for different numbers of processors (2, 4, 8, 16, …) with various values of latency, network bandwidth, number of buses and relative CPU performance. The main goal was to find:

  • the max acceptable latency at which the performance of the system does not drop significantly in comparison to the "ideal"
  • the min acceptable bandwidth at which the performance of the system does not drop significantly
  • the min acceptable number of buses (connectivity) such that the application still performs comparably to the "ideal" application
  • the min relative CPU performance such that the application is still comparable to the "ideal" version of the application

Performing the analysis and the simulations requires choosing the right tool. For the simulation, the initial configuration of the system was taken from the MareNostrum supercomputer, with the following parameters:

  • Number of buses: 0 (that is, infinite)
  • Network bandwidth: 250 Mbyte/sec
  • Startup on remote communication (latency): 0.000008 ms
  • Relative CPU speed: 1.0
  • 128 cores, 1 core per processor

To discover the best tuning for the system, the environment parameters were varied. The ranges of the parameters were the following:

  • Number of buses: (0 .. 128], exponential step size 2
  • Network Bandwidth: (250 .. 100] MBytes/sec, step size 10
  • Startup on remote communication (latency): (0.000008..0.524288] s, exponential step size 2
  • Relative CPU performance: [3.0 .. 0.2], step size 0.2
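For reference, the parameter ranges above can be generated with a few lines of Python. This is just a sketch of the sweep values, not the scripts used in the project. It assumes that both endpoints of each range are included and that the bus sweep starts at 1 (0 meaning "infinite" is the baseline, not a sweep value). The helper names are my own.

```python
def exponential_range(start, stop, factor=2):
    """Values start, start*factor, ... up to and including `stop`."""
    values = []
    value = start
    # Small tolerance so the last value survives floating-point rounding.
    while value <= stop * (1 + 1e-9):
        values.append(value)
        value *= factor
    return values


def linear_range(start, stop, step):
    """Evenly spaced values from `start` to `stop` inclusive (step may be negative)."""
    count = int(round((stop - start) / step)) + 1
    return [round(start + i * step, 10) for i in range(count)]


# Sweep values; the endpoint handling is an assumption, see the text above.
buses = exponential_range(1, 128)                # 1, 2, 4, ..., 128
bandwidth = linear_range(250, 100, -10)          # 250, 240, ..., 100
latency = exponential_range(0.000008, 0.524288)  # doubles 16 times
cpu_speed = linear_range(3.0, 0.2, -0.2)         # 3.0, 2.8, ..., 0.2
```

Under these assumptions the sweep has 8 bus settings, 16 bandwidth settings, 17 latency settings and 15 CPU settings.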


Pin is a dynamic binary instrumentation framework that enables the creation of dynamic program analysis tools. The tools created with Pin are called Pintools and are used to perform program analysis on user-space applications.

This part of the project contains the results of the multilevel cache simulation with different per-processor L1 data cache, cluster-shared L2 data cache and globally-shared L3 data cache. To analyze the application, the following parameters were varied:

  • number of application threads (or CPUs)
  • sizes of the L1, L2 and L3 caches
  • number of processors per cluster that share the same cluster-shared cache (L2)

For the simulation a Pintool was created that simulates multilevel cache with different per-processor L1 data cache, cluster-shared L2 data cache and globally-shared L3 data cache.
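The real Pintool is written against Pin, but the idea of a multilevel cache simulator fits in a short Python sketch. Everything here is an assumption made for illustration: fully associative caches with LRU replacement, 64-byte lines, and a synthetic address trace for a naive matrix multiplication instead of instrumented memory accesses. The actual Pintool may differ in all of these.

```python
from collections import OrderedDict


class LRUCache:
    """One cache level: fully associative, LRU replacement (assumed)."""

    def __init__(self, size_bytes, line_bytes=64, next_level=None):
        self.capacity = size_bytes // line_bytes  # number of cache lines
        self.line_bytes = line_bytes
        self.next_level = next_level              # e.g. L2 behind L1
        self.lines = OrderedDict()                # oldest line first
        self.hits = 0
        self.misses = 0

    def access(self, address):
        line = address // self.line_bytes
        if line in self.lines:
            self.lines.move_to_end(line)          # mark as most recent
            self.hits += 1
            return
        self.misses += 1
        if self.next_level is not None:
            self.next_level.access(address)       # miss goes one level down
        self.lines[line] = True
        if len(self.lines) > self.capacity:
            self.lines.popitem(last=False)        # evict least recently used

    @property
    def miss_rate(self):
        total = self.hits + self.misses
        return self.misses / total if total else 0.0


def matmul_trace(n, elem_bytes=8):
    """Addresses touched by a naive n x n matrix multiply C = A * B."""
    a, b, c = 0, n * n * elem_bytes, 2 * n * n * elem_bytes
    for i in range(n):
        for j in range(n):
            for k in range(n):
                yield a + (i * n + k) * elem_bytes  # read A[i][k]
                yield b + (k * n + j) * elem_bytes  # read B[k][j]
            yield c + (i * n + j) * elem_bytes      # write C[i][j]
```

Feeding `matmul_trace(n)` into an L1 instance chained to an L2 (and optionally an L3) and then reading each level's `miss_rate` gives the kind of numbers that can be compared across cache sizes.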


The first simulation ran the NAS benchmark’s Integer Sort application on an environment similar to that of the MareNostrum supercomputer. The goal was to find the minimum environment configuration for which execution would still be comparable to the original simulated execution on the MareNostrum environment. The simulation was done with Dimemas. The results show that such a configuration is: 64 buses, latency up to 1 ms and bandwidth of 250 MB/s; choosing a faster CPU won’t make much difference.

The second simulation was a multilevel cache simulation. A matrix multiplication application was executed with different configurations of L1, L2 and L3 caches in order to find the optimal cache configuration for this execution. For the simulation, a Pintool was created that gathers the miss rates of the caches. The gathered results show that the optimal configuration is: 64 kB for L1, 2 MB for L2, no L3 cache (but if you choose to use one, a 16 MB L3 cache is better) and 8 processors sharing one L2 cache.