Feb 13 2013
 

Megastore is a storage system developed by Google as an improvement over their Bigtable. It claims to provide strong consistency and high availability guarantees, along with fully serializable ACID semantics within fine-grained partitions of data.

To achieve high availability and consistency, synchronous replication is applied: replication is implemented using Paxos for every write across the data centers, i.e. writes are replicated synchronously.

Contributions:

  1. Availability and scalability are built in a priori.
  2. The Paxos replication and consensus algorithm is optimized for low latency across geo-distributed data centers, which supports high availability (low latency here means 10-1000 milliseconds).

Availability + Scalability

Availability is supported by a synchronous, fault-tolerant log replicator optimized for long distances, while scalability is supported by partitioning the database. Both mechanisms are well thought out and oriented towards long-distance links and active communication over them.

To scale throughput, data is partitioned into a collection of entity groups. Operations across entity groups, as well as between logically distant entities, leverage asynchronous messaging. Minimizing latency and improving throughput is left to the application level.

Megastore architecture

Megastore is deployed through client libraries to which applications are linked. The library implements Paxos, so it deals with failure detection and replication. Durability of Paxos operations is provided by sending each operation directly to the local Bigtable instance.

Another cool thing about Megastore is its index tuning: for example, secondary indexes can be declared to optimize later lookups.

Concurrency control

Each entity group is abstracted as a DB that provides serializable ACID semantics. Each row in Megastore can be stored with multiple values carrying different timestamps; this is how multiversion concurrency control is supported. I will use a similar approach in my thesis project to support concurrent updates to the data store. A minimal sketch of the idea is given below.
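To make the multiversion idea concrete, here is a toy sketch (not Megastore's actual implementation) of a row that keeps several timestamped versions: readers pick the newest version at or before their read timestamp, so snapshot reads never block writers.

```python
import bisect
import time

class MultiVersionRow:
    """Toy multiversion row: each write appends a (timestamp, value) pair."""

    def __init__(self):
        self._timestamps = []   # sorted list of write timestamps
        self._values = []       # values aligned with self._timestamps

    def write(self, value, ts=None):
        ts = ts if ts is not None else time.time()
        idx = bisect.bisect_right(self._timestamps, ts)
        self._timestamps.insert(idx, ts)
        self._values.insert(idx, value)
        return ts

    def read(self, ts=None):
        """Return the newest value with timestamp <= ts (snapshot read)."""
        ts = ts if ts is not None else time.time()
        idx = bisect.bisect_right(self._timestamps, ts)
        return self._values[idx - 1] if idx else None

row = MultiVersionRow()
row.write("v1", ts=10.0)
row.write("v2", ts=20.0)
assert row.read(15.0) == "v1"   # old snapshot is still readable
assert row.read(25.0) == "v2"   # latest version
```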

For atomic updates across entity groups, two-phase commit can be used; however, its use is discouraged because it increases the risk of contention.

Replication

Reads and writes can be initiated from any replica while preserving ACID semantics. Replication is performed per entity group: the transaction log is synchronously transmitted to a quorum of replicas, and agreement between replicas is maintained by an optimized Paxos. A coordinator tracks the set of entity groups for which each replica is up to date. Fast writes are achieved with a master-based approach (reads and writes are directed to one master), so the Paxos prepare stage can be skipped if the previous request succeeded. A toy sketch of the quorum-write idea follows.
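This is only the bare quorum intuition, not Megastore's actual protocol: a log entry counts as committed once a majority of replicas acknowledge it.

```python
class Replica:
    """Trivial in-memory replica that appends log entries."""

    def __init__(self, name):
        self.name = name
        self.log = []

    def append(self, entry):
        # A real replica could crash or time out here instead of acking.
        self.log.append(entry)
        return True

def replicate(entry, replicas):
    """Return True once a majority of replicas acknowledge the entry."""
    acks = sum(1 for r in replicas if r.append(entry))
    return acks > len(replicas) // 2

replicas = [Replica("r%d" % i) for i in range(5)]
print(replicate({"op": "put", "key": "x", "value": 1}, replicas))  # True
```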

Strong side

  • Partitioning the data into entity groups, which guarantees support for ACID transactions.
  • The consistency level can be loosened by the user, which improves latency.
  • Indexing is extremely useful for development.
  • Extension of Paxos to optimize wide-area agreements.

Weak side

  • Partitioning of the data is left to the application side.
  • Write rate is quite low.
  • Joins add extra overhead, as the data has to be denormalized.
  • Vulnerability to asymmetric network partitions.
  • Chubby is used for failure detection, which is, most probably, not the most efficient tool.
Jan 25 2013
 

Most probably many of you have asked yourselves: to build from scratch or not to build from scratch?

  1. – To build!
  2. – Not to build!
  3. – To build!
  4. – Not to build!

Hmm… Hard to decide? Take a look at the diagram and decide!

To build from scratch or not to build from scratch

Is there any “Re-use” in your answer? Then you are one of those lucky bastards who can relax for a while and enjoy a cocktail 🙂

Re-use in all its beauty

Distributed platforms and frameworks aka Map-Reduce:

  • Apache Hadoop is open-source software for reliable, scalable, distributed computing. It allows simple programming models to be used for distributed processing of large data sets across clusters of computers. The main components are HDFS (a distributed file system), Hadoop YARN (a framework for job scheduling and resource management) and Hadoop MapReduce (a system for parallel processing of large data sets). A minimal word-count sketch in the MapReduce style is given after this list.
  • YARN is, in effect, the second generation of MapReduce. The main idea behind its architecture is to split the JobTracker’s resource management and job scheduling/monitoring into separate daemons: a global ResourceManager and a per-application ApplicationMaster.
    Architecture:
  • Disco Project is a framework that is based on the MapReduce paradigm. It is an open-source Nokia Research Center project whose purpose is to handle massive amounts of data. According to the documentation, it distributes and replicates your data and schedules your jobs efficiently. Moreover, indexing of large amounts of records is supported, so that real-time querying is possible.
    Architecture:

    • The master adds jobs to the job queue and runs them when nodes are available.
    • Clients submit jobs to the master.
    • Slaves are started on the nodes by the master; they spawn and monitor processes.
    • Workers do the jobs and notify the master of the output location.
    • Data locality mechanisms are applied once results are saved to the cluster.

  • Spring Batch is an open source framework for batch processing. It supports the development of robust batch applications and is built on the Spring Framework. It supports logging, transaction management, statistics, restart, skip and resource management. Some optimization and partitioning techniques can be tuned in the system as well.
    Architecture:

    • The infrastructure level is a low-level tool. It provides the ability to batch operations together and retry them if an error occurs.
    • The execution environment provides robust features for tracing and management of the batch lifecycle.
    • The core module contains the batch-focused domain and implementation. It includes statistics, job launching and restart.

  • Gearman is a framework for delegating tasks to other machines or processes that are better suited for them. Parallel execution of work is supported, as well as load balancing and multi-language function calls.
    Architecture:

    • The system consists of a client, a worker and a job server.
    • The client creates jobs and sends them to the job server.
    • The job server forwards tasks to suitable workers.
    • Workers perform the work and respond to the client through the job server.
    • Communication with the server is established through TCP sockets.
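To make the MapReduce model from the Hadoop item above concrete, here is a minimal word-count sketch in the Hadoop Streaming spirit: the mapper and reducer read lines from stdin and emit tab-separated key/value pairs. Hadoop's real API is Java, so this is only an illustration of the programming model, run here as a local simulation.

```python
import sys
from itertools import groupby

def mapper(lines):
    """Emit (word, 1) for every word in the input lines."""
    for line in lines:
        for word in line.split():
            yield word, 1

def reducer(pairs):
    """Sum the counts for each word; pairs must arrive sorted by key."""
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield word, sum(count for _, count in group)

if __name__ == "__main__":
    # Local stand-in for the map -> shuffle/sort -> reduce pipeline.
    mapped = sorted(mapper(sys.stdin))
    for word, count in reducer(mapped):
        print("%s\t%d" % (word, count))
```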

Distributed platforms and frameworks aka Directed Acyclic Graph aka Stream Processing Systems:

  • Spark is a system optimized for data analytics, designed to be fast both to run and to write. It is suited for in-memory data processing. Its API is in Java and Scala. Purpose: machine learning and data mining, though general-purpose workloads are also possible. It runs on Apache Mesos to share resources with Hadoop and other apps.
  • Dryad is a general-purpose runtime for executing data-parallel applications. An application is modeled as a directed acyclic graph (DAG) that defines its dataflow, with vertices representing the operations to be performed on the data. However, constructing the graph is quite a tricky part, which is why some high-level language compilers were created; one of them is DryadLINQ.
  • S4 and Storm are both for real-time, reliable processing of unbounded streams of data. The main difference is that Storm guarantees messages are processed even while failures occur, while S4 supports state recovery. More in the previous post.

Distributed Data Storage (DSS)

  • Key-Value
  • Column Based
  • SQL
  • NewSQL
  • bla bla bla…
  • Latency
  • Consistency
  • Availability
  • bla bla bla…

The problem of choosing the most suitable distributed storage system is quite tricky and requires some reading in the field. Some information on storage systems, with a deep review from my previous project on Decentralized Storage Systems, can be found on my wiki. A brief review of some hot systems is also presented in my previous post.

Actor Model Frameworks:

The actor model is a fairly old computational model for concurrent computation. It consists of concurrent computational entities called actors, which react to received messages: they make local decisions, spawn other actors, send messages and decide how to behave for the next message received. However, this model has some issues that should be taken into account if the decision to use it is made: (a) scalability, (b) transparency, (c) inconsistency.

Actor programming languages include Erlang, Scala and others. This is one extra motivation to get to know those languages better.

The most popular Actor Libraries:

  • Akka
    • Language: Java and Scala
    • Purpose: Build highly concurrent, distributed, fault-tolerant event-driven applications on the JVM.
    • Actors: Very lightweight concurrent entities. They process messages asynchronously using an event-driven receive loop.
  • Pykka
    • Language: Python
    • Purpose: Build highly concurrent, distributed, fault-tolerant event-driven applications.
    • Actors: An actor is an execution unit that executes concurrently with other actors. Actors don’t share state with each other; communication is maintained by sending/receiving messages. On receiving a message, an actor can perform some actions. Only one message is processed at a time. (A minimal Pykka sketch is given after this list.)
  • Theron
    • Language: C++
    • Purpose: Build highly concurrent, distributed event-driven applications.
    • Actors: They are specialized objects that execute in parallel natively. Each of them has a unique address. Communication is done by messaging. Each actor’s behavior is defined in message handlers, which are user-defined private member functions.
  • S4 – more in the Stream Processing Systems section.
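As promised in the Pykka item above, here is a minimal actor sketch. It assumes the pykka package is installed; the exact API may vary between versions, so treat it as an illustration rather than a reference.

```python
import pykka

class Greeter(pykka.ThreadingActor):
    """Actor that processes one message at a time and shares no state."""

    def on_receive(self, message):
        # React to the received message by returning a reply.
        return "Hello, %s!" % message.get("name", "stranger")

if __name__ == "__main__":
    ref = Greeter.start()              # spawn the actor
    print(ref.ask({"name": "world"}))  # blocking request/reply
    ref.stop()
```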

Scheduling and Load Balancing:

  • Luigi Scheduler
    • It is a Python module that helps to build complex pipelines of batch jobs. It also has built-in support for Hadoop. It is a scheduler that is open-sourced by Spotify and used within the company. A minimal task sketch is given after this list.
    • It is still quite immature and anyone can try their luck at contributing to this scheduler, which is written in Python.
  • Apache Oozie
    • It is a workflow scheduler system to manage Apache Hadoop jobs.
    • Workflows are directed acyclic graphs of actions.
    • It is a scalable, reliable and extensible system.
  • Azkaban
    • It is a batch job scheduler.
    • It helps to control dependencies and scheduling of individual pieces to run.
  • Helix
    • It is a generic cluster management framework for automatic management of partitioned, replicated and distributed resources hosted on a cluster of nodes.
    • Features: (a) automatic assignments of resources to nodes, (b) node failure detection and recovery, (c) dynamic addition of resources and nodes to a cluster, (d) pluggable distributed state machine, (e) automatic load balancing and throttling of transactions
  • Norbert
    • Provides easy cluster management and workload distribution.
    • It is implemented in Scala and wraps ZooKeeper, Netty and Protocol Buffers to make it easier to build applications.
    • Purpose: (a) Provide Group Management, change configuration, add/remove nodes. (b) Partitions workload by using software load balancing. (c) Provide asynchronous client/server RPC and notifications.
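As mentioned in the Luigi item above, a small two-task pipeline might look like the sketch below. It assumes the luigi package is installed; the file names and task names are hypothetical.

```python
import luigi

class FetchData(luigi.Task):
    """Pretend to fetch raw data and store it in a local file."""

    def output(self):
        return luigi.LocalTarget("raw.txt")

    def run(self):
        with self.output().open("w") as f:
            f.write("some raw data\nsecond line\n")

class CountLines(luigi.Task):
    """Depends on FetchData and counts the lines it produced."""

    def requires(self):
        return FetchData()

    def output(self):
        return luigi.LocalTarget("line_count.txt")

    def run(self):
        with self.input().open("r") as f:
            n = sum(1 for _ in f)
        with self.output().open("w") as out:
            out.write(str(n))

if __name__ == "__main__":
    # Run the whole dependency graph with the in-process scheduler.
    luigi.build([CountLines()], local_scheduler=True)
```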

Replication:

Consensus and stuff:

Log Collection:

  • Apache Kafka
    A distributed pub/sub messaging system that supports:

    • Persistent messaging with constant time performance
    • High-throughput
    • Explicit support for message partitioning over servers and machine-consumers
    • Support for parallel data load into Hadoop

    It is a viable solution for providing logged data to offline analysis systems like Hadoop, but it might be quite limited for building real-time processing. The system is quite similar to Scribe and Apache Flume, as they all do activity stream processing, even though the architectures are different. A minimal producer/consumer sketch is given after this list.
    Architecture:

  • Logstash
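As mentioned in the Kafka item above, a minimal producer/consumer sketch could look like the following. It assumes the kafka-python client and a broker on localhost:9092, and the topic name is hypothetical; the client API has changed over the years, so this is only an illustration.

```python
from kafka import KafkaConsumer, KafkaProducer

BROKER = "localhost:9092"   # assumed local broker
TOPIC = "activity-log"      # hypothetical topic name

# Producer: append activity events to the topic.
producer = KafkaProducer(bootstrap_servers=BROKER)
producer.send(TOPIC, b"user=42 action=click")
producer.flush()

# Consumer: read the events back, e.g. to feed an offline analysis job.
consumer = KafkaConsumer(TOPIC, bootstrap_servers=BROKER,
                         auto_offset_reset="earliest",
                         consumer_timeout_ms=1000)
for record in consumer:
    print(record.offset, record.value)
```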

Message Passing Frameworks:

(De)serialization for sending objects over the network safely:

  • Avro
    • It is a data serialization system.
    • Provides: (a) rich data structures, (b) a compact, fast, binary data format, (c) a container file to store persistent data, (d) integration with dynamic languages. The schemas are defined with JSON (a small example schema is given after this list).
    • Used by Spotify
  • Protocol Buffer
    • Developed and Used by Google.
    • It encodes structured data in an efficient extensible format.
  • Apache Thrift
    • Purpose: scalable cross-language services deployment.
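Since Avro schemas are defined with JSON, here is a small hypothetical example of such a schema; the record and field names are invented, and serialization calls are omitted because they depend on the Avro library version.

```python
import json

# Hypothetical Avro schema for a user record, expressed as JSON.
user_schema = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["int", "null"]},
    ],
}

print(json.dumps(user_schema, indent=2))
```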

Chasing for latency:

 

 

Nov 08 2012
 

In the field of distributed data storage it is almost impossible to come up with a universal system that satisfies all needs. That’s why, recently, various distributed storage systems have appeared that address different needs and use different approaches.

DynamoDB uses a key-value interface with replication only within a region. I haven’t checked the latency range myself, but according to its website latency varies within single-digit milliseconds, which is at least 10 times more than what I want to reach in the thesis system.

Megastore doesn’t reach great performance because it is based on Bigtable (with high communication cost); however, it is scalable and consistent. Synchronization for wide-area replication is done with Paxos. Since scalability, consistency and fault tolerance are prioritized, latency is sacrificed and lies within 100-400 milliseconds for reads and writes.

Scatter is a DHT-based key-value store that layers transactions on top of consistent replication (it uses a low-level interface). Even though it provides high availability and scales well, operation latencies are still within milliseconds.

VoltDB is an in-memory DB that supports master-slave replication over a wide area.

Cassandra is a column-based store developed and used by Facebook, with reads within milliseconds.

Spanner provides a semi-relational data model and offers high performance, a high-level interface, general-purpose transactions and external consistency (using GPS and atomic clocks with a new notion of bounded time uncertainty: TrueTime). Spanner also integrates concurrency control with replication. The main contribution of the paper is that the system solves the problem of wide-area replication and implements globally synchronized timestamps (supporting strong consistency and linearizability for writes and snapshot isolation for reads). Good: TrueTime; interleaving of data; atomic schema changes; snapshot reads in the past. Weak: possible clock uncertainty; Paxos groups are not reconfigurable; read-only transactions use a trivial solution for executing reads (if only a few Paxos groups are involved, Spanner does not communicate within these groups and simply applies the latest timestamp to the read). Typical reads are near 10 ms and the write average is 100 ms. A toy sketch of the commit-wait idea behind TrueTime follows.
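To illustrate the TrueTime idea, here is a toy sketch of Spanner's commit-wait rule: the commit timestamp is chosen as the latest bound of the clock-uncertainty interval, and the commit is not acknowledged until TrueTime guarantees that timestamp is in the past. The TrueTime interface below is a stand-in with an assumed 5 ms error bound, not Google's actual API.

```python
import time

CLOCK_UNCERTAINTY = 0.005  # assumed bound on clock error, in seconds

def tt_now():
    """Stand-in for TrueTime's TT.now(): an (earliest, latest) interval."""
    t = time.time()
    return t - CLOCK_UNCERTAINTY, t + CLOCK_UNCERTAINTY

def commit_wait():
    """Pick a commit timestamp and wait until it is guaranteed to be past."""
    _, latest = tt_now()
    commit_ts = latest
    while tt_now()[0] <= commit_ts:   # wait until earliest > commit_ts
        time.sleep(0.001)
    return commit_ts                  # safe to acknowledge the commit now

print("committed at", commit_wait())
```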

Which characteristics can be sacrificed in order to reach specific goals? The answer is: the system should be adapted as much as possible to the needs. It is another matter when you are actually chasing latencies… most probably only a rare DB will fit your requirements…

If it is not 90% well suited – let the fun part start -> do it yourself 🙂 Like me :))))

Nov 07 2012
 

Pregel is a system for processing large-scale graphs. The main contributions of the paper are the following: 1) a fault-tolerant distributed programming framework for executing graph algorithms; 2) an API with direct message passing among vertices.

Brief description:

Pregel is a synchronized computation process over vertices. Upon receiving a graph as input, the graph is divided into partitions (using consistent hashing), each containing a set of vertices and their outgoing edges. One of the machines is the coordinator (master). The workers then execute a series of iterations, called supersteps. In every superstep, all vertices in each worker execute the same user-defined function, which can (a) receive messages sent during the previous superstep, (b) modify the state of the vertex and its outgoing edges (vertices and edges are kept on the machines) and (c) send messages to be delivered during the next superstep. At the end of each superstep a global synchronization point occurs. Vertices can become inactive, and the sequence of iterations terminates when all vertices are inactive and there are no messages in transit. During computation, the master also sends ping messages to check for worker failures. The network is used only for sending messages, which significantly reduces the communication overhead and makes the system more efficient. A minimal vertex-centric sketch of the superstep model is given below.
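To make the superstep model concrete, here is a minimal vertex-centric sketch that propagates the maximum value through a graph. The Vertex class and the runner are invented for illustration (Pregel's real API is C++): each vertex runs the same compute function per superstep, sends messages for the next superstep, and votes to halt when it has nothing new to say.

```python
class Vertex:
    """Toy Pregel-style vertex that propagates the maximum value in the graph."""

    def __init__(self, vid, value, out_edges):
        self.id, self.value, self.out_edges = vid, value, out_edges
        self.active = True

    def compute(self, superstep, messages, send):
        changed = False
        for m in messages:
            if m > self.value:
                self.value, changed = m, True
        if superstep == 0 or changed:
            for dst in self.out_edges:
                send(dst, self.value)   # delivered in the next superstep
            self.active = True
        else:
            self.active = False         # vote to halt

def run(vertices):
    """Synchronous superstep loop; stops when all vertices are inactive."""
    inbox = {v.id: [] for v in vertices}
    superstep = 0
    while any(v.active for v in vertices) or any(inbox.values()):
        outbox = {v.id: [] for v in vertices}
        for v in vertices:
            msgs = inbox[v.id]
            if msgs or v.active:
                v.compute(superstep, msgs,
                          lambda dst, val: outbox[dst].append(val))
        inbox, superstep = outbox, superstep + 1
    return {v.id: v.value for v in vertices}

# Ring of three vertices; all converge to the maximum value 7.
graph = [Vertex(0, 3, [1]), Vertex(1, 7, [2]), Vertex(2, 1, [0])]
print(run(graph))   # {0: 7, 1: 7, 2: 7}
```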

The presentation can be found here:

Good:

  • Usability
  • Scalability
  • Performance
  • Transparency in vertex-to-machine assignment

Not good/Open questions/Future improvements:

  • How to detect a master failure?
  • Why not compare with MapReduce in the evaluation part?
  • Failure detection needs to be improved towards confined recovery.
  • Improve automation for defining user-defined functions.
  • hash(VertexID) mod Partitions – why not use smarter vertex assignment (to reduce network usage)?
Jun 28 2012
 

Recently I had some free space in my schedule, so why not do some extra stuff on scalable distributed systems?

My choice fell on NoSQL DBs, to be precise Redis, MongoDB and Cassandra. Of course I could have done more, but I didn’t have enough time. So I’m planning to do it later… one day… maybe 🙂

Hmm… What did I actually do?

First!  Install

Second! Take a look at the performance. But how to do it in the most efficient way and obviously skip reinventing the wheel? Why not use the Yahoo! Cloud Serving Benchmark for evaluating the most commonly used NoSQL databases. The main metrics to compare were:

  • Run Time (ms)
  • Throughput (ops/sec)
Third! Install the next one. Go to step 2.
All details about the DB installations and their performance are here.

Conclusions and observations:

The main challenge was to properly set up all the parameters and connect the DBs to the client side – the Yahoo! Cloud Serving Benchmark – with an update-heavy workload, two execution phases – load and transaction – and two metrics: run time and throughput.

During the evaluation it was found that all three DBs performed quite similarly, with a slight difference for MongoDB: the running time of its transaction phase was slightly greater than the others’ and its throughput was lower.

Obviously, due to the different application areas of these NoSQL DBs – in-memory DBs, DBs for small structured data – it’s impossible to say which one of them is the best. That’s why the appropriate DB should be chosen depending on the needs 🙂

 

May 18 2012
 

Currently, I’m quite inspired by one of the projects I have at UPC.

It’s a practical evaluation of the process coordination system ZooKeeper. The main goal is to completely understand its working principles and to try to find some bottlenecks by building a somewhat sophisticated system on top of ZooKeeper.

Obviously, it’s quite hard to deal with a system without knowing the basic principles and ideas behind it. For the sake of clear and conscious work, I decided to read the paper on the topic written by the authors of the system.

Here is what I came up with:

ZooKeeper is a service for coordinating the processes of distributed applications. It’s essentially a key/value table with hierarchical keys. It is not designed for general data storage purposes, although some client information can still be stored there. How does it try to achieve all this? By incorporating elements from group messaging, shared registers and distributed lock services into a replicated, centralized service. To combine all these features into one, ZooKeeper uses wait-free shared registers and FIFO execution guarantees together with an event-driven mechanism, so it can stay simple and powerful at the same time. The idea of ZooKeeper was mainly inspired by Chubby, a locking service with strong synchronization guarantees; meanwhile, ZooKeeper was developed avoiding blocking primitives such as locks. The system implements an API that manipulates simple wait-free data objects, organized hierarchically as in a file system.

Which guarantees does ZooKeeper provide?

  • Linearizable writes.
  • FIFO client order.
  • Handling shared configuration updates.
  • Liveness and durability

How?

  • Different node types (regular, ephemeral)
  • Watches
  • Completely Replicated System

What for?

  • Configuration Management
  • Rendezvous
  • Group Membership
  • Simple Locks
  • Simple Locks without Herd Effect (a sketch of this recipe is given after this list)
  • Read/Write Locks
  • Double Barrier
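As an illustration of the “simple locks without herd effect” recipe listed above, here is a sketch using the kazoo Python client (an assumption on my side; the paper presents the recipes through ZooKeeper’s own API): each client creates an ephemeral sequential znode and watches only its immediate predecessor, so only one waiter wakes up when the lock is released.

```python
import threading

from kazoo.client import KazooClient

LOCK_ROOT = "/demo-lock"   # hypothetical lock path

def acquire_lock(zk):
    """Lock recipe: ephemeral sequential znode, watch only the predecessor."""
    me = zk.create(LOCK_ROOT + "/lock-", ephemeral=True,
                   sequence=True, makepath=True)
    my_name = me.split("/")[-1]
    while True:
        children = sorted(zk.get_children(LOCK_ROOT))
        if children[0] == my_name:
            return me                      # lowest sequence number: lock held
        predecessor = children[children.index(my_name) - 1]
        gone = threading.Event()
        # Only this client watches the predecessor znode, so there is no herd.
        if zk.exists(LOCK_ROOT + "/" + predecessor,
                     watch=lambda event: gone.set()):
            gone.wait()

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()
lock_node = acquire_lock(zk)
# ... critical section ...
zk.delete(lock_node)                       # release the lock
zk.stop()
```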

Want to read more? Check my wiki.

Implementations

  Configuration Management

  Reliable Multicast

Both programs were implemented with ZooKeeper. The code can be found by following the links above; explanations and activity diagrams of the applications are fully presented in the presentation below.

Presentation

Summary and Critique from my side

ZooKeeper uses wait-free protocols to implement process coordination in distributed systems. A main goal of the system is its lack of locks, so that its performance can be improved significantly. Thanks to some features (like watches and different node types), ZooKeeper supports the following applications:

  • Configuration Management
  • Leader Election
  • Group Membership

All of the mentioned applications were implemented within the following programs:

  • Dynamic LogBack Configuration Management
  • Reliable Multicast with handling of group membership and leader election to provide total order and reliability

A weakness of ZooKeeper is that intermediate changes are dropped: between receiving an event and re-setting the watch, multiple changes to the object may happen, so there is a chance that not all changes will be tracked, only the latest ones. This effectively means that ZooKeeper is a state-based system, not an event-based system.

Also, the complete replication within ZooKeeper limits the total size of data that can be managed by it. Likewise, serializing all updates through a single leader can be a performance bottleneck.

May 07 2012
 

The survey paper on Decentralized Storage Systems was finally completed. The final version of the paper can be found here.

Abstract.

Recently, Volunteer Computing (VC) has become quite popular for providing resources for large-scale computational problems in scientific research. Centralized control of VC systems is the main bottleneck, since it introduces asymmetry into the system. Several systems already benefit from VC in terms of CPU sharing, e.g. SETI@home. Another side of the problem is storage sharing, where volunteer nodes dedicate their free storage space to the system.

Introducing a Decentralized Storage System suitable for VC is appropriate mainly to reduce the possible overhead of the centralized part of VC and to provide an available, eventually consistent decentralized storage system at minimal cost.

The focus of this paper is to thoroughly survey currently existing decentralized storage systems, evaluate them according to their suitability for a VC system, and provide a state-of-the-art solution for this issue.