May 24, 2013

A small break from thesis-related posts 🙂

Finally I found time to describe the project that Zygimantas and I worked on during the last semester. Here is some motivation for it:

There is an increasing interest in distributed machine learning algorithms. A gossip learning algorithm was proposed that works on a random graph with fully distributed data. The goal of our research is to analyse the behaviour of this algorithm on clustered graphs. Experiments show that the algorithm needs to be modified in order to work on clustered graphs. We introduce a triangulation technique that modifies the original peer sampling algorithm and is used to limit model exchange between different clusters. Results show that with such an algorithm it is possible to find models of local objective functions.

In other words, let’s imagine a social network where people don’t want to share their private information, but they agree to fit their information locally to some kind of model-generating function and then share only the parameters of the resulting model. However, the model parameters from only one person are not enough to draw any conclusions about the network. So why not just randomly exchange these models between friends and merge them locally? This is where our algorithm appears with its awesome model merging. Since peers are likely to exchange models with their friends, the resulting model might characterise some clusters. And voilà!!! If we have models for some clusters, we can make various assumptions about them. For example, say you are living in Ukraine and want to move to Sweden. You are searching for a job and have no idea what salary you can roughly expect. With our approach, you can provide information about yourself as input, and our merged function will give you an answer for the Stockholm cluster 🙂

Obviously, all of the above is a very simplified version of what we’ve done. Now a slightly more serious explanation:

Peer-to-peer (P2P) is a system model in which every participant (peer) is equal. Peers are connected to each other in such a way that they form a graph. Moreover, P2P communication and the peers themselves are unreliable, i.e. peers may fail, and messages may be delayed, lost or never delivered. Systems designed for this environment are usually robust and scalable, since no central servers are needed. Adding more computational resources to such a system simply means adding more peers. These systems usually consist of a large number of peers that communicate by passing messages to each other.

Furthermore, such P2P systems can offer security to some extent. They could be used to protect sensitive data such as personal data, preferences, ratings, history, etc. by not disclosing it to other participants. For example, in P2P the data could be completely distributed, so that each peer knows only about its own data. In such a case, an algorithm could be run locally and only its result shared among peers. This may ensure that there is no way for peers to learn about the data kept at other peers.

This security characteristic of P2P networks can be used to build machine learning algorithms on fully distributed data. Mark Jelasity et al. in their work [1] present such an algorithm, which uses gossiping to share predictive models with neighbouring peers. We can think of this algorithm as performing a random walk in the P2P network. During this random walk, ensemble learning is performed: the model built during the random walk is merged with the local model stored at each peer. After two models are merged, the merged model is updated with the local data (which is stored at each peer) and then used in the next step of the random walk. Mark Jelasity et al. conclude that in P2P networks that form a random graph such an algorithm converges. Moreover, they state that this algorithm is more effective than one that gathers the data before building the prediction model, because peers exchange only models, which may be considerably smaller than the data.
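The merge-then-update step described above can be sketched in a few lines. This is only a toy illustration under stated assumptions: merging is done by averaging the weights, the local update is a single Adaline (least-mean-squares) step, every peer holds exactly one example, and all names (`adaline_update`, `merge`, `gossip_round`) as well as the learning rate and the data are made up for the example rather than taken from [1] or from our implementation.

```python
import random

def adaline_update(weights, x, y, learning_rate=0.1):
    """One Adaline (least-mean-squares) gradient step on a single example."""
    prediction = sum(w * xi for w, xi in zip(weights, x))
    error = y - prediction
    return [w + learning_rate * error * xi for w, xi in zip(weights, x)]

def merge(model_a, model_b):
    """Merge two linear models by averaging their weights (an assumption)."""
    return [(a + b) / 2.0 for a, b in zip(model_a, model_b)]

def gossip_round(models, data, neighbours, rng):
    """Every peer sends its model to one random neighbour, which merges it
    with its own model and then updates the result on its local example."""
    for sender in list(models):
        receiver = rng.choice(neighbours[sender])
        merged = merge(models[receiver], models[sender])
        x, y = data[receiver]
        models[receiver] = adaline_update(merged, x, y)

# Toy run: 4 fully connected peers, each holding one example of y = 2 * x.
rng = random.Random(0)
data = {0: ([1.0], 2.0), 1: ([2.0], 4.0), 2: ([0.5], 1.0), 3: ([1.5], 3.0)}
neighbours = {p: [q for q in data if q != p] for p in data}
models = {p: [0.0] for p in data}
for _ in range(200):
    gossip_round(models, data, neighbours, rng)
```

Note that only the weight vectors travel between peers; the `(x, y)` examples never leave the peer that owns them, which is the whole point of the approach.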

Although Mark Jelasity et al. proved that this gossip learning algorithm converges in random graphs, it is still unclear whether such convergence can be achieved in clustered graphs. Moreover, the convergence may behave in one of these ways:

  • every peer after a number of iterations will have a model that represents the data on a local cluster;
  • after more iterations every peer will have a model that represents the data on every peer.


  • Our gossip learning algorithm uses the framework described in [1] with the Adaline gradient descent learning algorithm;
  • We analysed the gossip learning algorithm's convergence properties on random and clustered graphs;
  • We designed and implemented a graph generating tool that generates random and clustered graphs.

[1] R. Ormándi, I. Hegedűs, and M. Jelasity. Gossip learning with linear models on fully distributed data. Concurrency and Computation: Practice and Experience, 2012.

Mar 01, 2013

GENIUM Data Store (GDS) is a system I am working on for my Master's thesis at NASDAQ.

GENIUM INET messaging bus

To achieve its performance goals, GDS leverages the high performance of the GENIUM INET messaging bus. The bus is based on UDP multicast and is made reliable by a totally ordered sequence of backed-up messages with a gap-fill mechanism using rewinders. As is well known, total order broadcast algorithms are fundamental building blocks for constructing fault-tolerant systems. The purpose of such algorithms is to provide a communication primitive that allows processes to agree on the set of messages they deliver and on the order in which they deliver them. The NASDAQ OMX implementation of this abstraction assumes a perfect failure detector, i.e. it forces a process to fail if it was considered faulty. Moreover, uniform reliable total order is preserved: a process is not allowed to deliver any message out of order, even if it is faulty.

The receivers of the ordered messages should guarantee exactly-once delivery of each message to the applications; this way uniform integrity is guaranteed. Across the cluster of applications/clients and servers connected to the message stream, messages should be delivered in the same order.

The message stream can be configured without restriction, so that it can contain any number of clients, placed on the same or on different servers, with a defined replication level. The server/sequencer is responsible for creating the sequenced/ordered stream as output from the messages received from clients.

Failure resilience is provided by the following:

  • All message callbacks are fully deterministic and replayable (if single-threaded), as the incoming stream is identical each time it is received.
  • Replication can be achieved by installing the same receivers on multiple servers.
  • As long as the new primary has rewound to the same point as the failed one, the message stream is sufficient to synchronize state.
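Why a sequenced stream plus deterministic callbacks is enough for recovery can be shown with a toy state machine. The message layout `(sequence number, account, amount)` and the names `apply_message` and `replay` are hypothetical and have nothing to do with the real GENIUM INET API; this is only a sketch of the idea.

```python
def apply_message(state, message):
    """Deterministic callback: the new state depends only on the old state
    and on the message, never on clocks, threads or randomness."""
    seq, account, amount = message
    new_state = dict(state)
    new_state[account] = new_state.get(account, 0) + amount
    new_state["last_seq"] = seq
    return new_state

def replay(stream, from_seq=1, state=None):
    """Apply every message with a sequence number >= from_seq, in order."""
    state = dict(state or {})
    for message in stream:
        if message[0] >= from_seq:
            state = apply_message(state, message)
    return state

stream = [(1, "A", 100), (2, "B", 50), (3, "A", -30), (4, "B", 20)]

# The primary processed the whole stream.
primary = replay(stream)

# A standby that only saw messages 1..2 rewinds to message 3 and catches up.
checkpoint = replay(stream[:2])
standby = replay(stream, from_seq=3, state=checkpoint)
```

Both replicas end up in exactly the same state, because they applied the same messages in the same order.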

GDS takes advantage of the infrastructure described above and is therefore a fault-tolerant real-time system a priori. Moreover, the distributed state is kept consistent by adhering to the sequence numbering implied by the message stream.


In GENIUM Data Store, transactions are submitted through MoldUDP, which ensures the lowest possible transaction latency.

MoldUDP is a networking protocol that makes transmission of data messages efficient and scalable in a scenario where one transmitter and many listeners are present. It is a lightweight protocol that is built on top of UDP where missed packets can be easily traced and detected, but retransmission is not supported.

Some optimizations can be applied to make this protocol more efficient: (a) multiple messages are aggregated into a single packet to reduce network traffic, (b) a caching Re-request Server is placed near a remote receiver to reduce latency and bandwidth usage.

MoldUDP presumes that the system consists of listeners, which are subscribed to some multicast groups, and a server, which transmits on those multicast groups. The MoldUDP server transmits downstream packets over UDP multicast to send the normal data stream to the listeners. It also sends heartbeats periodically, so that listeners can find out about packet loss if it takes place. Moreover, listeners should be configured with the IP address and port to which they can submit re-requests.

Note: a message in this context is an atomic piece of information, from 0 to 64 KB in size, carried by the MoldUDP protocol.
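How a listener can detect loss on such a stream can be sketched as follows. This is not the MoldUDP wire format: a packet is modelled as a hypothetical pair (sequence number of its first message, list of messages), a heartbeat is assumed to carry the server's next sequence number, and the class name `GapDetectingListener` is invented for the example. Actually fetching the missing messages from a re-request server is left out.

```python
class GapDetectingListener:
    """Tracks the next expected sequence number of a sequenced stream."""

    def __init__(self):
        self.next_seq = 1
        self.delivered = []  # (sequence number, message) pairs, in order
        self.missing = []    # (first_missing, last_missing) ranges

    def _note_gap(self, upcoming_seq):
        """Record a gap if the stream has moved past what we expected."""
        if upcoming_seq > self.next_seq:
            self.missing.append((self.next_seq, upcoming_seq - 1))
            self.next_seq = upcoming_seq

    def on_packet(self, first_seq, messages):
        """Handle a data packet with consecutive messages from first_seq."""
        self._note_gap(first_seq)
        for offset, message in enumerate(messages):
            seq = first_seq + offset
            if seq >= self.next_seq:  # ignore duplicates already seen
                self.delivered.append((seq, message))
                self.next_seq = seq + 1

    def on_heartbeat(self, server_next_seq):
        """A heartbeat reveals loss even when no further data arrives."""
        self._note_gap(server_next_seq)

listener = GapDetectingListener()
listener.on_packet(1, ["a", "b"])  # messages 1 and 2 arrive
listener.on_packet(5, ["e"])       # messages 3 and 4 were lost
listener.on_heartbeat(8)           # messages 6 and 7 were lost as well
```

The ranges collected in `listener.missing` are what a listener would then ask the re-request server for.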


In GENIUM Data Store, read queries will be supported. That is why a TCP-like protocol is intended to be used to stream the data to the client in response to a submitted query.

SoupTCP is a lightweight point-to-point protocol built on top of TCP/IP sockets. This protocol delivers a set of sequenced messages from a server to a client. It guarantees that the client will receive all messages sent from the server strictly in sequence, even when failures occur.

Server functionality with SoupTCP includes: (a) client authentication on login and (b) delivery of a logical stream of sequenced messages to a client in real time. Clients send messages to the server, but these are not guaranteed to be delivered in case of failures. That’s why a client may need to resubmit its request to the server.

Protocol flow:

  • The client opens a TCP/IP socket to the server and sends a login request.
  • If the login information is valid, the server responds with an accept and starts to send sequenced data.
  • Both client and server compute the message number locally by simply counting messages; the first message in a session is always number 1.
  • Link failure is detected by heartbeating. Both the server and the client send heartbeat messages. The former are required so that the client notices a failure and can reconnect to another socket. The latter are necessary so that the server can close the existing socket of a failed client and listen for a new connection.
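The implicit numbering in the flow above can be sketched without any networking. Everything here is hypothetical: the class names, the in-memory "server", and in particular the assumption that a reconnecting client asks the server to resume from its own counter (the flow above only says that the client reconnects).

```python
class SequencedServer:
    """Holds the logical stream; message numbers are positions, not fields."""

    def __init__(self, messages):
        self.messages = list(messages)

    def stream_from(self, seq):
        """Return the payloads starting at message number seq (1-based)."""
        return self.messages[seq - 1:]

class SequencedClient:
    """Counts received messages locally; the first one is always number 1."""

    def __init__(self):
        self.next_seq = 1
        self.received = []

    def on_sequenced_message(self, payload):
        self.received.append((self.next_seq, payload))
        self.next_seq += 1

server = SequencedServer(["a", "b", "c", "d"])
client = SequencedClient()

# First connection: the link fails after two messages.
for payload in server.stream_from(1)[:2]:
    client.on_sequenced_message(payload)

# Reconnect (assumption): resume from the client's own counter.
for payload in server.stream_from(client.next_seq):
    client.on_sequenced_message(payload)
```

Since both sides count in the same way, no sequence number has to travel with each message, and the client still ends up with a gap-free, duplicate-free stream.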

Feb 13, 2013

Megastore is a storage system developed by Google as an improvement on their Bigtable. It claims to support strong consistency and high availability guarantees. Moreover, fully serializable ACID semantics are provided within fine-grained partitions of data.

To achieve high availability and consistency, synchronous replication is applied. Replication is implemented using Paxos for every write across the data centers, i.e. writes are replicated synchronously.


  1. Availability and scalability are built in a priori.
  2. The Paxos replication and consensus algorithm is optimized for low latency across geo-distributed datacentres. This way high availability is supported. (Low latency = 10-1000 milliseconds)

Availability + Scalability

Availability is supported by a synchronous, fault-tolerant log replicator optimized for long distances, while scalability is supported by partitioning the database. Both concepts are well thought out and oriented towards long-distance links and active communication over them.

To scale throughput, data is partitioned into a collection of entity groups. Operations across entity groups rely on asynchronous messaging, which is also used for logically distant entities. Minimizing latency and improving throughput are left to the application level.


Megastore is deployed through a client library to which applications are linked. This library implements Paxos, so it deals with failure detection and replication. Durability of Paxos operations is provided by sending each operation directly to the local Bigtable instance.

Another cool thing about Megastore is its index tuning, e.g. secondary indexes can be declared to optimise later lookups.

Concurrency control

Each entity group is abstracted as a database that provides serializable ACID semantics. Each row in Megastore can be stored with multiple values with different timestamps. This is how multiversion concurrency control is supported. I will use a similar approach in my thesis project to support concurrent updates to the data store.
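The idea of keeping several timestamped values per row can be sketched like this. It is a minimal single-process illustration, not how Megastore or Bigtable store versions; the class `VersionedCell` and its methods are invented for the example.

```python
import bisect

class VersionedCell:
    """Stores several values for one cell, each tagged with a timestamp."""

    def __init__(self):
        self._timestamps = []  # kept sorted in ascending order
        self._values = []      # _values[i] was written at _timestamps[i]

    def write(self, timestamp, value):
        """Add a new version without overwriting the older ones."""
        index = bisect.bisect_right(self._timestamps, timestamp)
        self._timestamps.insert(index, timestamp)
        self._values.insert(index, value)

    def read(self, timestamp):
        """Return the newest value written at or before timestamp, or None."""
        index = bisect.bisect_right(self._timestamps, timestamp)
        return self._values[index - 1] if index else None

cell = VersionedCell()
cell.write(10, "first")
cell.write(20, "second")
```

A reader that started at timestamp 15 keeps seeing `"first"` even after the write at timestamp 20, so readers and writers do not block each other.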

For atomic updates across entity groups, a two-phase commit can be used. However, its usage is not recommended, as it may increase the risk of contention.


Reads and writes can be initiated from any replica while preserving ACID semantics. Replication is performed per entity group: the transaction log is synchronously transmitted to a quorum of replicas. Agreement between replicas is maintained by an optimized Paxos. A coordinator tracks a set of entity groups for each replica. Fast writes are achieved with a master-based approach (where each read and write is dedicated to one master). This way the Paxos prepare stage can be skipped if the previous requests succeeded.

Strong side

  • Partitioning the data into entity groups. This way support for ACID transactions is guaranteed.
  • The consistency level can be loosened by the user, which improves latency.
  • Indexing is extremely useful for development.
  • Extension of Paxos to optimize wide-area agreement.

Weak side

  • Partitioning of the data is left to the application side.
  • The write rate is quite low.
  • Joins add extra overhead, as the data has to be denormalized.
  • Vulnerability to asymmetric network partitioning.
  • Chubby is used for failure detection, which is most probably not the most efficient tool.

Jan 29, 2013

Designing an architecture from scratch may require some tools and approaches to put all the things together.

The high-level structure of software can be represented in various ways, and one of the approaches is to make an architectural blueprint with 5 different views of the system [by Philippe Kruchten].

Main views:

  • Logical View – an object model of the design.
  • Process View – concurrency and synchronization aspects.
  • Physical View – mapping of the software to the hardware.
  • Development View – static organization of the software.
  • Use Cases – various usage scenarios.

The Logical Architecture

It serves the functional requirements and decomposes the system into a set of objects and object classes. Class diagrams and class templates are usually used to illustrate this abstraction. Common mechanisms or services are defined in class utilities. Keeping a single, coherent object model across the whole system is general advice when building a logical view.


The Process Architecture

It takes into account non-functional requirements: performance and availability, concurrency and distribution, system integrity and fault tolerance. It can be represented as a high-level view of a set of independently executing logical networks of communicating programs that are distributed across a set of hardware. A process is a group of tasks that form an executable unit and which can be (a) tactically controlled, (b) replicated, (c) partitioned into a set of independent tasks: major and minor (cyclic activities, buffering, time-outs).


The Development Architecture

It represents the organization of software modules in the software development environment. It consists of libraries and subsystems. The subsystems are organized into a hierarchy of layers with well-defined interfaces. Overall, this view is represented by module and subsystem diagrams showing the export/import relationships. It also takes internal requirements into account. A layered style is recommended for this view.


The Physical Architecture

It represents non-functional requirements such as availability, reliability, scalability and performance. It shows how networks, processes, tasks and objects are mapped onto the various nodes.



This fifth view is redundant, but it has two main purposes:

  • driver to discover the architectural elements during the architecture design
  • validation and illustration role after the architecture design is complete, also it can be used as a starting point for the tests of an architectural prototype.

It uses the components of the logical view, together with connector elements from the process view, for the interaction between the objects.



Jan 28, 2013

The system should provide functionality to insert, modify and read the data.


  • One instance of the data store should handle 250 000 messages per second, where each message is no more than 200 bytes.
  • A query for a record, with a unique index, should be answered in no more than 10 milliseconds in a database of 10 billion records.
  • The capacity of a given response stream should be 100 Mbps.
  • The response streaming capacity of a datastore server should be 300 Mbps (divided across several clients), while receiving 100 000 messages per second.
  • A transaction should be acknowledged on the message bus within 100 microseconds.
  • Fail over within 30 seconds on the same site.
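To get a feeling for these numbers, here is a small back-of-the-envelope calculation. It assumes every message has the maximum size of 200 bytes, ignores protocol overhead, and takes 1 Mbps as 1 000 000 bits per second; the function name `stream_bandwidth_mbps` is made up for the example.

```python
def stream_bandwidth_mbps(messages_per_second, bytes_per_message):
    """Bandwidth of a message stream in megabits per second (10**6 bits)."""
    return messages_per_second * bytes_per_message * 8 / 1_000_000

# Upper bound on incoming traffic at the peak rate of 250 000 messages/s.
peak_ingest_mbps = stream_bandwidth_mbps(250_000, 200)

# Incoming traffic while the server also streams 300 Mbps of responses.
mixed_ingest_mbps = stream_bandwidth_mbps(100_000, 200)

# Average time available per message at the peak rate, in microseconds.
per_message_budget_us = 1_000_000 / 250_000
```

So the peak insert load alone is at most 400 Mbps of payload, and one instance has on average 4 microseconds to spend on each message.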

It is important to note that consistency and fault tolerance are an important part of building distributed systems; however, relying on INET, the internal NASDAQ OMX messaging system, provides built-in support for this functionality. In further posts (at least I really hope 🙂 lol) I will explore how the load will be handled and which techniques will be used to provide a high level of availability and recovery from specific faults.

Jan 25, 2013

Most probably many of you asked yourselves: To build from scratch or Not To build from scratch?

  1. – To build!
  2. – Not to build!
  3. – To build!
  4. – Not to build!

Hmm… Hard to decide? Take a look at the diagram and decide!

To build from scratch or not to build from scratch

Is there any re-use in your answer? Then you are one of those lucky bastards who can relax for a while and enjoy a cocktail 🙂

Re-use in all its beauty

Distributed platforms and frameworks aka Map-Reduce:

  • Apache Hadoop is open source software for reliable, scalable, distributed computing. It allows simple programming models to be used for distributed processing of large data sets across clusters of computers. The main components are HDFS (a distributed file system), Hadoop YARN (a framework for job scheduling and resource management) and Hadoop MapReduce (a system for parallel processing of large data sets).
  • YARN is, so to speak, the second generation of MapReduce. The main idea behind its architecture is to split the JobTracker’s resource management and job scheduling/monitoring into separate daemons, so that the system has a global ResourceManager and a per-application ApplicationMaster.
  • Disco Project is a framework based on the MapReduce paradigm. It is an open source Nokia Research Centre project whose purpose is handling massive amounts of data. According to the documentation, it distributes and replicates your data and schedules your jobs efficiently. Moreover, indexing of large numbers of records is supported, so that real-time querying is possible.

    • The master adds jobs to the job queue and runs them when nodes are available.
    • Clients submit jobs to the master.
    • Slaves are started on the nodes by the master; they spawn and monitor processes.
    • Workers do the jobs. The master is notified of the output location.
    • Data locality mechanisms are applied once results are saved to the cluster.

  • Spring Batch is an open source framework for batch processing. It supports the development of robust batch applications and is built on the Spring Framework. It supports logging, transaction management, statistics, restart, skip and resource management. Some optimization and partitioning techniques can be tuned in the system as well.

    • The infrastructure level is a low-level tool. It provides a way to batch operations together and to retry if an error occurs.
    • The execution environment provides robust features for tracing and managing the batch lifecycle.
    • The core module is the batch-focused domain and implementation. It includes statistics, job launch and restart.

  • Gearman is a framework for delegating tasks to other machines and processes that are better suited to them. Parallel execution of work is supported, as well as load balancing and multi-language function calls.

    • A client, a worker and a job server are the parts of the system.
    • The client creates jobs and sends them to the job server.
    • The job server forwards tasks to suitable workers.
    • The worker does the work and responds to the client through the job server.
    • Communication with the server is established through TCP sockets.
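For readers who have not met the MapReduce paradigm behind Hadoop and Disco above, here is the classic word count as a single-process sketch. It uses none of the real Hadoop or Disco APIs; the three function names only mirror the three phases (map, shuffle, reduce), and in a real framework each phase would run in parallel on many machines.

```python
from collections import defaultdict

def map_phase(document):
    """Map: emit a (word, 1) pair for every word in one input document."""
    for word in document.lower().split():
        yield word, 1

def shuffle(pairs):
    """Shuffle: group all emitted values by key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(key, values):
    """Reduce: combine all values of one key into a single result."""
    return key, sum(values)

def word_count(documents):
    pairs = (pair for doc in documents for pair in map_phase(doc))
    return dict(reduce_phase(key, values) for key, values in shuffle(pairs).items())

counts = word_count(["to build or not to build", "build it"])
```

The programmer only writes `map_phase` and `reduce_phase`; distribution, shuffling and fault tolerance are the framework's job, which is exactly the part you get to re-use.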

Distributed platforms and frameworks aka Directed Acyclic Graph aka Stream Processing Systems:

  • Spark is a system optimized for data analytics, designed to make analytics fast both to run and to write. It is suited for in-memory data processing. The API is available in Java and Scala. Purpose: machine learning and data mining, although general-purpose use is also possible. It runs on Apache Mesos to share resources with Hadoop and other apps.
  • Dryad is a general-purpose runtime for executing data-parallel apps. An app is modeled as a directed acyclic graph (DAG) that defines the dataflow of the application, with vertices that represent the operations to be performed on the data. However, creating the graph is quite a tricky part. That is why some high-level language compilers were created. One of them is DryadLINQ.
  • S4 and Storm
    are both meant for real-time, reliable processing of unbounded streams of data. The main difference between Storm and S4 is that Storm guarantees that messages will be processed even when failures occur, while S4 supports state recovery. More in the previous post.
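The DAG model mentioned for Dryad above boils down to running every vertex once all of its inputs are ready. Here is a tiny single-process sketch of that idea using the standard library `graphlib` module (Python 3.9+). It has nothing to do with the real Dryad or DryadLINQ APIs; the vertex names and the `run_dataflow` helper are invented for the example.

```python
from graphlib import TopologicalSorter

def run_dataflow(operations, dependencies, source):
    """Run every vertex after all of its upstream vertices.

    operations maps a vertex name to a function taking a list of inputs;
    dependencies maps a vertex name to the list of its upstream vertices.
    Vertices without upstream vertices receive the source data.
    """
    outputs = {}
    for vertex in TopologicalSorter(dependencies).static_order():
        upstream = [outputs[name] for name in dependencies.get(vertex, ())]
        outputs[vertex] = operations[vertex](upstream if upstream else [source])
    return outputs

operations = {
    "read": lambda inputs: inputs[0],
    "evens": lambda inputs: [n for n in inputs[0] if n % 2 == 0],
    "squares": lambda inputs: [n * n for n in inputs[0]],
    "total": lambda inputs: sum(inputs[0]) + sum(inputs[1]),
}
dependencies = {
    "read": [],
    "evens": ["read"],
    "squares": ["read"],
    "total": ["evens", "squares"],
}
outputs = run_dataflow(operations, dependencies, [1, 2, 3, 4])
```

In this graph `evens` and `squares` do not depend on each other, so a distributed runtime would be free to run them in parallel on different machines.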

Distributed Data Storage(DSS): 

  • Key-Value
  • Column Based
  • SQL
  • NewSQL
  • bla bla bla…
  • Latency
  • Consistency
  • Availability
  • bla bla bla…

The problem of choosing the most suitable distributed storage system is quite tricky and requires some reading in the field. Some information on storage systems, with an in-depth review from my previous project on Decentralized Storage Systems, can be found on my wiki. A brief review of some hot systems is also presented in my previous post.

Actor Model Frameworks:

The actor model is a fairly old model of concurrent computation built from concurrent computational units called actors, which react to received messages: they make local decisions, spawn other actors, send messages and decide how to behave for the next message received. However, this model has some issues that should be taken into account if you decide to use it: (a) scalability, (b) transparency, (c) inconsistency.

Actor programming languages include Erlang, Scala and others. This is one more reason to get to know those languages better.
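To make the model more concrete, here is a minimal actor built only from the Python standard library. It is a sketch of the idea, not the API of Akka, Pykka or Theron: the class `CounterActor`, its mailbox and its stop signal are all invented for the example.

```python
import queue
import threading

class CounterActor:
    """An actor with private state that is reachable only via its mailbox."""

    _STOP = object()  # private marker message that ends the receive loop

    def __init__(self):
        self._mailbox = queue.Queue()
        self._total = 0  # touched only by the actor's own thread
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

    def send(self, message):
        """Asynchronous send: enqueue the message and return immediately."""
        self._mailbox.put(message)

    def stop(self):
        """Ask the actor to finish, wait for it and return its final state."""
        self._mailbox.put(self._STOP)
        self._thread.join()
        return self._total

    def _run(self):
        """Receive loop: exactly one message is processed at a time."""
        while True:
            message = self._mailbox.get()
            if message is self._STOP:
                break
            self._total += message

actor = CounterActor()
senders = [
    threading.Thread(target=lambda: [actor.send(1) for _ in range(100)])
    for _ in range(4)
]
for sender in senders:
    sender.start()
for sender in senders:
    sender.join()
total = actor.stop()
```

Four threads send messages concurrently, yet no lock around `_total` is needed, because only the actor's own thread ever touches its state.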

The most popular Actor Libraries:

  • Akka
    • Language: Java and Scala
    • Purpose: Build highly concurrent, distributed, fault-tolerant event-driven application on the JVM.
    • Actors: Very lightweight concurrent entities. They process messages asynchronously using an event-driven receive loop.
  • Pykka
    • Language: Python
    • Purpose: Build highly concurrent, distributed, fault-tolerant event-driven application.
    • Actors: An actor is an execution unit that executes concurrently with other actors. Actors don’t share state with each other. Communication is done by sending/receiving messages. On receiving a message, an actor can perform some actions. Only one message is processed at a time.
  • Theron
    • Language: C++
    • Purpose: Build highly concurrent, distributed event-driven application.
    • Actors: They are specialized objects that execute in parallel natively. Each of them has a unique address. Communication is done by messaging. Each actor’s behavior is defined in message handlers, which are user-defined private member functions.
  • S4 – more in the Stream Processing Systems section.

Scheduling and Load Balancing:

  • Luigi Scheduler
    • It is a Python module that helps to build complex pipelines of batch jobs. It also has built-in support for Hadoop. It is a scheduler that was open-sourced by Spotify and is used within the company.
    • It is still quite immature, and anyone can try their luck contributing to this scheduler.
  • Apache Oozie
    • It is a workflow scheduler system to manage Apache Hadoop jobs.
    • A workflow is a directed acyclic graph of actions.
    • It is a scalable, reliable and extensible system.
  • Azkaban
    • It is a batch job scheduler.
    • It helps to control dependencies and scheduling of individual pieces to run.
  • Helix
    • It is a generic cluster management framework for automatic management of partitioned, replicated and distributed resources hosted on a cluster of nodes.
    • Features: (a) automatic assignments of resources to nodes, (b) node failure detection and recovery, (c) dynamic addition of resources and nodes to a cluster, (d) pluggable distributed state machine, (e) automatic load balancing and throttling of transactions
  • Norbert
    • Provides easy cluster management and workload distribution.
    • It is implemented in Scala and wraps ZooKeeper, Netty and Protocol Buffers to make it easier to build applications.
    • Purpose: (a) Provide group management, configuration changes, adding/removing nodes. (b) Partition the workload using software load balancing. (c) Provide asynchronous client/server RPC and notifications.


  • Bucardo is an asynchronous PostgreSQL replication system
  • And whatever others…

Consensus and stuff:

Log Collection:

  • Apache Kafka
    A distributed pub/sub messaging system that supports:

    • Persistent messaging with constant time performance
    • High-throughput
    • Explicit support for message partitioning over servers and machine-consumers
    • Support for parallel data load into Hadoop

    It is a viable solution for providing logged data to offline analysis systems like Hadoop, but it might be quite limited for building real-time processing. The system is quite similar to Scribe and Apache Flume, as they all do activity stream processing, even though their architectures are different.

  • Logstash

Message Passing Frameworks:

(De)Serialization for sending objects in the network safely:

  • Avro
    • It is a data serialization system.
    • Provides: (a) rich data structures, (b) compact, fast, binary data format, (c) container file to store persistent data, (d) integration with dynamic languages. The schemas are defined with JSON.
    • Used by Spotify
  • Protocol Buffers
    • Developed and used by Google.
    • It encodes structured data in an efficient extensible format.
  • Apache Thrift
    • Purpose: scalable cross-language services deployment.

Chasing for latency:



Jan 23, 2013

NASDAQ OMX (N) is the world’s largest stock exchange technology provider. Around ten percent of all the world’s securities transactions happen here. More importantly, N’s multi-asset trading and clearing system is the world’s fastest and most scalable. This enables the trading of any asset in any place in the world. The company's core system for trading and clearing is called Genium INET; it is very complex, optimized and customized for different customers. N owns 70 stock exchanges in 50 countries and serves multiple asset types including equities, derivatives, debt, commodities, structured products and ETFs.

  • N unites 3900 companies in 39 countries with $4.5 trillion in total market value.
  • N is the world’s largest IT industry exchange.
  • N is the world’s largest biotech industry exchange.
  • N is the world’s largest provider of technology for the exchange industry.
  • Over 700 companies use more than 1500 different corporate products and services.
  • N data is accessed by more than two million users in 83 countries.
  • The average transaction latency on N in 2010 was less than 100 microseconds.
  • The N implementation of INET is able to handle more than one million transactions per second.

The main challenges are maintaining a high level of reliability, low latency and security in the system. This niche is unique and normally not covered by conventional IT vendors and consultancies. The existing N systems include trading systems, clearing systems, market data systems, surveillance systems and advisory systems.

Obviously, all the data in the system has to be stored reliably and with an extremely fast response. Moreover, some requests and queries can be performed over the data, in addition to processing the incoming data “on the fly” and storing the results.

A variety of challenges appear when designing and implementing such a system, among them load balancing of incoming requests to store the information, fault tolerance to maintain 24/7 availability, and the ability to hot-update without losing availability.

Jan 16, 2013

DBMS with extremely low latency? Suitable for real-time requirements?

MySQL Cluster is an open source, scalable and high-performance database. It is a technology that enables clustering of in-memory databases in a shared-nothing system.

According to the paper on MySQL Cluster by Mikael Ronstrom, MySQL Cluster combined with a Dolphin Express cluster is highly suitable for low-latency and real-time tasks. Dolphin Express provides low latency at the hardware level in combination with optimized TCP/IP bypass software, SuperSockets. This combination improves MySQL Cluster's latency, efficiency and bandwidth. Moreover, Dolphin Express hardware can handle 3-5 times higher bandwidth than Gigabit Ethernet.

Even though the results presented in the paper are quite old, the latency improvement is quite significant. Nowadays, the latency of Dolphin SuperSockets is as low as a couple of microseconds, as visualized below.

The newest version, MySQL Cluster 7.2, reaches a throughput of 72M reads/sec on a 30-node cluster running MySQL Cluster 7.2.5 (it is a bit unclear what counts as an “operation”: a simple read/write or something more complex). How is it possible to reach such numbers? Well-tuned software and very deep performance analysis, so that any strange behavior can be controlled or fixed.

I wonder what the performance of my thesis system will be 🙂 Stay tuned!

Nov 08, 2012

In the field of distributed data storage it is almost impossible to come up with a universal system that satisfies all needs. That’s why various distributed storage systems have recently appeared to address different needs using different approaches.

DynamoDB uses a key-value interface and replicates only within a region. I haven’t checked the latency range myself, but according to its website latency varies within single-digit milliseconds, which is at least 10 times more than I want to reach in the thesis system.

Megastore doesn’t reach great performance because it is based on Bigtable (with high communication cost); however, it is scalable and consistent. Synchronization for wide-area replication is done with Paxos. In exchange for scalability, consistency and fault tolerance, latency is sacrificed and is within 100-400 milliseconds for reads and writes.

Scatter is a DHT-based key-value store that layers transactions on top of consistent replication (it uses a low-level interface). Even though it provides high availability and scales well, latencies for its operations are still within milliseconds.

VoltDB is an in-memory DB that supports master-slave replication over a wide area.

Cassandra is a column-based storage system developed and used by Facebook, with reads within milliseconds.

Spanner provides a semi-relational data model, high performance, a high-level interface, general-purpose transactions and external consistency (using GPS and atomic clocks with a new concept of time leases: TrueTime). Spanner also integrates concurrency control with replication. The main contribution of the paper is that the system solves the problem of wide-area replication and implements globally synchronized timestamps (supporting strong consistency and linearizability for writes and snapshot isolation for reads). Good: TrueTime. Interleaving data. Atomic schema change. Snapshot reads in the past. Weak: Possible clock uncertainty. Paxos groups are not reconfigurable. Read-only transactions use a trivial solution for executing reads (if there are a few Paxos groups, Spanner does not communicate within these groups and simply applies the latest timestamp to the read). Typical reads take about 10 ms and writes average 100 ms.

Which characteristics can be sacrificed in order to reach specific goals? The answer is: the system should be adapted as much as possible to the needs. It is another matter when you are actually chasing latency… Most probably hardly any DB will fit your requirements…

If it is not 90% well suited – let the fun part start -> do it yourself 🙂 Like me :))))

Nov 07, 2012

Pregel is a system for processing large-scale graphs. The main contributions of the paper are the following: 1) a fault-tolerant distributed programming framework for executing graph algorithms; 2) an API with direct message passing among vertices.

Brief description:

Pregel is a synchronized computation process on vertices. When a graph is given as input, it is divided into partitions (using consistent hashing), each of which includes a set of vertices and their outgoing edges. One of the machines acts as the coordinator (master). The workers then undergo a series of iterations, called supersteps. In every superstep, all vertices in each worker execute the same user-defined function, which can (a) receive messages sent during the previous superstep, (b) modify the state of the vertex and its outgoing edges (vertices and edges are kept on the machines) and (c) send messages to be delivered during the next superstep. At the end of each superstep a global synchronization point occurs. Vertices can become inactive, and the sequence of iterations terminates when all vertices are inactive and there are no messages in transit. During the computation, the master also sends ping messages to check for worker failures. The network is used only for sending messages, which significantly reduces the communication overhead and makes the system more efficient.
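The superstep loop described above can be simulated in a single process. The sketch below propagates the maximum vertex value through a small graph. It is an illustration only, with no partitioning, workers or fault tolerance, and the function name `pregel_max` and the example graph are invented for it.

```python
def pregel_max(values, out_edges):
    """Propagate the maximum vertex value through the graph in supersteps.

    values maps a vertex id to its initial value; out_edges maps a vertex id
    to the list of vertices it can send messages to.
    """
    values = dict(values)
    active = set(values)                 # every vertex starts active
    inbox = {vertex: [] for vertex in values}
    superstep = 0
    while active:
        outbox = {vertex: [] for vertex in values}
        for vertex in active:
            # (a) receive the messages sent during the previous superstep
            new_value = max([values[vertex]] + inbox[vertex])
            changed = new_value > values[vertex]
            # (b) modify the state of the vertex
            values[vertex] = new_value
            # (c) send messages to be delivered in the next superstep
            if superstep == 0 or changed:
                for target in out_edges[vertex]:
                    outbox[target].append(new_value)
            # The vertex now votes to halt; only a message reactivates it.
        # Global synchronization point: messages become visible only here.
        inbox = outbox
        active = {vertex for vertex in values if inbox[vertex]}
        superstep += 1
    return values, superstep

initial = {1: 3, 2: 6, 3: 2, 4: 1}
edges = {1: [2], 2: [1, 3], 3: [2, 4], 4: [3]}
final_values, supersteps = pregel_max(initial, edges)
```

The loop stops exactly when the termination condition from the description holds: no vertex is active and no messages are in transit.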

The presentation can be found here:


  • Usability
  • Scalability
  • Performance
  • Transparency in vertex-to-machine assigning

Not good/Open questions/Future improvements:

  • How to detect a master failure?
  • Why not compare with MapReduce in the evaluation part?
  • Failure detection needs to be improved to support confined recovery.
  • Improve automation for defining user-defined functions.
  • hash(VertexID) mod Partition – why not use a smarter vertex assignment (to reduce network usage)?