Aug 16, 2013
 

Recently I was asked to review the book Apache Kafka, published by Packt Publishing. I was happy to accept the offer, and now I am more than happy to share my view of the technology.

Apache Kafka is a distributed publish-subscribe messaging system used by many top IT companies, such as LinkedIn, Twitter, Spotify, DataSift and Square. The system offers the following properties: persistent messaging with constant-time access to on-disk structures; high throughput; distribution :) since access to the system is both load balanced and partitioned to reduce the pressure on any single node; real-time operation; and integration with Hadoop and Storm.

Installing and building the system is very easy and won’t take much time. Depending on your needs, several cluster setups are possible: single node – single broker (the broker being the core Kafka process), single node – multiple brokers, multiple nodes – multiple brokers. Whichever you choose, Kafka has to be configured accordingly, e.g. the configuration properties of each broker need to be known by the producers (the message sources).

The following figure shows the simplest case, in which messages are published by producers and delivered to the consumers through the Kafka broker. In this scenario (single node – single broker) the producers, the broker and the consumers all run on different machines.

ApacheKafkaExample

 

Among the important design decisions are the following: message caching; the ability to re-consume messages; message grouping (to reduce network overhead); consumers keep track locally of which messages they have consumed; the system is fully decentralized and uses ZooKeeper for load balancing; both asynchronous and synchronous messaging are supported; and messages can be compressed, e.g. with gzip or Google Snappy, to reduce the network load. Additionally, replication, which any production system needs, takes Kafka to a new level. Apache Kafka allows mirroring an active data cluster into a passive one: it simply consumes the messages from the source cluster and republishes them on the target cluster. This feature is highly useful when reliability and fault tolerance are important concerns. A simple replication scheme is also implemented within the system. It relies mainly on partitioning the messages (by hash) and on having a lead replica for each partition (this way both synchronous and asynchronous replication can be arranged).
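To make hash partitioning with a lead replica per partition a bit more concrete, here is a minimal Python sketch. It is not Kafka's actual partitioner or replica placement logic: the function names, the CRC32 hash and the round-robin placement are all assumptions chosen for illustration.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a message key to a partition using a stable hash (CRC32 is an assumption)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign_replicas(num_partitions, brokers, replication_factor):
    """Hypothetical round-robin placement: the first replica of each partition is the lead."""
    assignment = {}
    for p in range(num_partitions):
        replicas = [brokers[(p + i) % len(brokers)] for i in range(replication_factor)]
        assignment[p] = {"leader": replicas[0], "followers": replicas[1:]}
    return assignment
```

Messages with the same key always land in the same partition, and writes for a partition go through its lead replica, which can then update the followers either synchronously or asynchronously.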

The API provides sufficient support for creating both producers and consumers. A consumer can be one of two types: one that only needs the message to be delivered and ignores any further processing, and one that requires further processing of the message upon delivery. A consumer implementation can be either single-threaded or multithreaded. However, to prevent unpredictable behavior, the number of threads should correspond to the number of topics to consume. This way the threads are fully utilized and the necessary message order is preserved.

Kafka can be integrated with the following technologies: Twitter Storm for online stream processing and Apache Hadoop for offline analysis. Storm integration is implemented on the consumer side: Kafka consumers are represented as regular Storm spouts that read data from the Kafka cluster. Integration with Hadoop, on the other hand, is bilateral, as Hadoop can act as both a producer and a consumer.

As a producer, Hadoop serves as a bridge for publishing data to the Kafka broker. The Hadoop producer can extract the data in two ways: (a) using Pig scripts that write data in the binary Avro format (which makes writes to multiple topics easier), or (b) using the Kafka “OutputFormat” class, which publishes data as bytes and provides control over the output.

As a consumer, a Hadoop job pulls data from the Kafka system into HDFS. This can happen either sequentially or in parallel.

Pros: Easy to use and deploy; scalable; fast; distributed.

Cons: Might lack configurability and automation; replication needs improvement; fault tolerance needs optimization.

I am going to actually test the framework and compare it with the project I am currently working on at EPFL (an awesome, highly dynamic, BFT, decentralized, scalable pub/sub system). Stay tuned:)

Jan 25, 2013
 

Most probably many of you have asked yourselves: To build from scratch or Not To build from scratch?

  1. – To build!
  2. – Not to build!
  3. – To build!
  4. – Not to build!

Hmm… Hard to decide? Take a look at the diagram and decide!

To build from scratch or not to build from scratch

Is there any Re Use in your answer? Then you are one of those lucky bastards who can relax for a while and enjoy a cocktail :)

Re-use in all its beauty

Distributed platforms and frameworks aka Map-Reduce:

  • Apache Hadoop is open source software for reliable, scalable, distributed computing. It allows simple programming models to be used for distributed processing of large data sets across clusters of computers. The main components are HDFS (a distributed file system), Hadoop YARN (a framework for job scheduling and resource management) and Hadoop MapReduce (a system for parallel processing of large data sets).
  • YARN is, so to speak, the second generation of MapReduce. The main idea behind its architecture is to split the JobTracker’s two responsibilities, resource management and job scheduling/monitoring, into separate daemons, so that the system has a global ResourceManager and a per-application ApplicationMaster.
    Architecture:
  • Disco Project is a framework based on the MapReduce paradigm. It is an open source Nokia Research Center project whose purpose is to handle massive amounts of data. According to the documentation, it distributes and replicates your data and schedules your jobs efficiently. Moreover, indexing of large numbers of records is supported, so that real-time querying is possible.
    Architecture:

    • The master adds jobs to the job queue and runs them when nodes are available.
    • Clients submit jobs to the master.
    • Slaves are started on the nodes by the master; they spawn and monitor processes.
    • Workers do the jobs and notify the master of the output location.
    • Data locality mechanisms are applied once results are saved to the cluster.

  • Spring Batch is an open source framework for batch processing. It supports the development of robust batch applications and is built on the Spring Framework. It provides logging, transaction management, statistics, restart, skip and resource management. Some optimization and partitioning techniques can be tuned in the system as well.
    Architecture:

    • The infrastructure level is a low-level toolset. It makes it possible to batch operations together and to retry them if an error occurs.
    • The execution environment provides robust features for tracing and managing the batch lifecycle.
    • The core module is the batch-focused domain and implementation. It includes statistics, job launch and restart.

  • Gearman is a framework for delegating tasks to other machines and processes that are better suited to them. Parallel execution of work is supported, as well as load balancing and multi-language function calls.
    Architecture:

    • The system consists of a client, a worker and a job server.
    • The client creates jobs and sends them to the job server.
    • The job server forwards tasks to suitable workers.
    • The worker does the work and responds to the client through the job server.
    • Communication with the server goes over TCP sockets.
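To make the MapReduce model behind Hadoop and Disco more tangible, here is a tiny single-process word count in Python. It only illustrates the map, shuffle and reduce steps; it does not use the Hadoop or Disco APIs, and all function names are my own.

```python
from collections import defaultdict


def map_phase(document):
    """Map: emit a (word, 1) pair for every word in one input document."""
    for word in document.lower().split():
        yield word, 1


def shuffle(pairs):
    """Shuffle: group all emitted values by key, as the framework would between phases."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(key, values):
    """Reduce: collapse all values of one key into a single result."""
    return key, sum(values)


def word_count(documents):
    pairs = (pair for doc in documents for pair in map_phase(doc))
    return dict(reduce_phase(key, values) for key, values in shuffle(pairs).items())
```

In a real cluster the map and reduce calls run on different nodes and the shuffle moves data over the network; the programming model seen by the user stays this simple.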

Distributed platforms and frameworks aka Directed Acyclic Graph aka Stream Processing Systems:

  • Spark is a system optimized for data analytics, designed to make analytics fast both to run and to write. It is suited for in-memory data processing. The API is available in Java and Scala. Purpose: machine learning and data mining, although general-purpose use is possible too. It runs on Apache Mesos to share resources with Hadoop and other apps.
  • Dryad is a general-purpose runtime for executing data-parallel apps. An application is modeled as a directed acyclic graph (DAG) that defines its dataflow, with vertices that represent the operations to be performed on the data. However, creating the graph is quite a tricky part. That is why some high-level language compilers were created. One of them is DryadLINQ.
  • S4 and Storm
    are both for real-time, reliable processing of unbounded streams of data. The main difference between Storm and S4 is that Storm guarantees that messages will be processed even when failures occur, while S4 supports state recovery. More in the previous post.
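The DAG idea used by Dryad can be sketched in a few lines of Python: vertices are operations, edges say whose output feeds whom, and the runtime executes vertices in topological order. This is only an illustration using the standard `graphlib` module (Python 3.9+); `run_dataflow`, `VERTICES` and `EDGES` are made-up names and have nothing to do with Dryad's real API.

```python
from graphlib import TopologicalSorter


def run_dataflow(vertices, edges, source):
    """Run each vertex after all of its upstream vertices.

    vertices: name -> function taking a list of upstream outputs
    edges:    name -> list of upstream vertex names (empty for entry vertices)
    source:   input handed to vertices that have no upstream
    """
    results = {}
    for name in TopologicalSorter(edges).static_order():
        upstream = edges.get(name, [])
        inputs = [results[u] for u in upstream] if upstream else [source]
        results[name] = vertices[name](inputs)
    return results


# A hypothetical four-vertex pipeline: read -> (evens, squares) -> merge
VERTICES = {
    "read": lambda ins: ins[0],
    "evens": lambda ins: [x for x in ins[0] if x % 2 == 0],
    "squares": lambda ins: [x * x for x in ins[0]],
    "merge": lambda ins: sorted(ins[0] + ins[1]),
}
EDGES = {"read": [], "evens": ["read"], "squares": ["read"], "merge": ["evens", "squares"]}
```

Here `evens` and `squares` do not depend on each other, so a distributed runtime would be free to run them in parallel on different machines.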

Distributed Data Storage(DSS): 

  • Key-Value
  • Column Based
  • SQL
  • NewSQL
  • bla bla bla…
  • Latency
  • Consistency
  • Availability
  • bla bla bla…

Choosing the most suitable distributed storage system is quite tricky and requires some reading in the field. An in-depth review of storage systems, from my previous project on Decentralized Storage Systems, can be found on my wiki. A brief review of some hot systems is also given in my previous post.

Actor Model Frameworks:

The actor model is a fairly old model of concurrent computation. It is built from concurrent computational units called actors, which react to the messages they receive: they can make local decisions, spawn other actors, send messages, and decide how to behave on the next message they receive. However, the model has some issues that should be taken into account before deciding to use it: (a) scalability, (b) transparency, (c) inconsistency.

Actor programming languages include Erlang, Scala and others. This is one extra motivation to get to know those languages better.

The most popular Actor Libraries:

  • Akka
    • Language: Java and Scala
    • Purpose: Build highly concurrent, distributed, fault-tolerant event-driven applications on the JVM.
    • Actors: Very lightweight concurrent entities. They process messages asynchronously using an event-driven receive loop.
  • Pykka
    • Language: Python
    • Purpose: Build highly concurrent, distributed, fault-tolerant event-driven applications.
    • Actors: An actor is an execution unit that runs concurrently with other actors. Actors don’t share state with each other. Communication happens by sending and receiving messages. On receiving a message, an actor can perform some actions. Only one message is processed at a time.
  • Theron
    • Language: C++
    • Purpose: Build highly concurrent, distributed event-driven applications.
    • Actors: They are specialized objects that execute in parallel natively. Each of them has a unique address. Communication is done by messaging. Each actor’s behavior is defined in message handlers, which are user-defined private member functions.
  • S4 – more in the Stream Processing Systems section.
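What all these libraries have in common is the mailbox plus receive-loop pattern. Below is a minimal sketch of it in plain Python with `threading` and `queue`. It deliberately does not use the Akka, Pykka or Theron APIs; the `Actor` and `Counter` classes and their method names are assumptions for illustration only.

```python
import queue
import threading

_STOP = object()  # sentinel message that ends the receive loop


class Actor:
    """A toy actor: private state, a mailbox, and one thread handling one message at a time."""

    def __init__(self):
        self._mailbox = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def tell(self, message):
        """Asynchronously send a message; the caller never touches the actor's state."""
        self._mailbox.put(message)

    def stop(self):
        """Ask the actor to finish the messages already queued, then wait for it to exit."""
        self._mailbox.put(_STOP)
        self._thread.join()

    def _run(self):
        while True:
            message = self._mailbox.get()
            if message is _STOP:
                break
            self.on_receive(message)

    def on_receive(self, message):
        raise NotImplementedError


class Counter(Actor):
    """Example actor whose state is only ever modified from its own receive loop."""

    def __init__(self):
        self.total = 0  # set before the receive loop starts
        super().__init__()

    def on_receive(self, message):
        self.total += message
```

Because only the actor's own thread modifies `total`, no lock is needed around it, which is exactly the appeal of the model.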

Scheduling and Load Balancing:

  • Luigi Scheduler
    • It is a Python module that helps to build complex pipelines of batch jobs. It also has built-in support for Hadoop. The scheduler was open-sourced by Spotify and is used within the company.
    • It is still quite immature, so anyone can try their luck contributing to it.
  • Apache Oozie
    • It is a workflow scheduler system to manage Apache Hadoop jobs.
    • A workflow is a directed acyclic graph of actions.
    • It is a scalable, reliable and extensible system.
  • Azkaban
    • It is a batch job scheduler.
    • It helps to control dependencies and the scheduling of individual pieces to run.
  • Helix
    • It is a generic cluster management framework for automatic management of partitioned, replicated and distributed resources hosted on a cluster of nodes.
    • Features: (a) automatic assignments of resources to nodes, (b) node failure detection and recovery, (c) dynamic addition of resources and nodes to a cluster, (d) pluggable distributed state machine, (e) automatic load balancing and throttling of transactions
  • Norbert
    • Provides easy cluster management and workload distribution.
    • It is implemented in Scala and wraps ZooKeeper, Netty and Protocol Buffers to make it easier to build applications.
    • Purpose: (a) Provide group management: change configuration, add/remove nodes. (b) Partition the workload using software load balancing. (c) Provide asynchronous client/server RPC and notifications.

Replication:

  • Bucardo is an asynchronous PostgreSQL replication system
  • And whatever others…

Consensus and stuff:

Log Collection:

  • Apache Kafka
    A distributed pub/sub messaging system that supports:

    • Persistent messaging with constant time performance
    • High-throughput
    • Explicit support for partitioning messages over servers and consumer machines
    • Support for parallel data load into Hadoop

    It is a viable solution for feeding logged data to offline analysis systems like Hadoop, but it might be quite limited for building real-time processing. The system is quite similar to Scribe and Apache Flume, as they all do activity stream processing, even though their architectures differ.
    Architecture:

  • Logstash

Message Passing Frameworks:

(De)Serialization for sending objects in the network safely:

  • Avro
    • It is a data serialization system.
    • Provides: (a) rich data structures, (b) compact, fast, binary data format, (c) container file to store persistent data, (d) integration with dynamic languages. The schemas are defined with JSON.
    • Used by Spotify
  • Protocol Buffers
    • Developed and used by Google.
    • It encodes structured data in an efficient extensible format.
  • Apache Thrift
    • Purpose: scalable cross-language services deployment.

Chasing for latency:

 

 

May 18, 2012
 

Currently, I’m quite inspired by one of the projects I have at UPC.

It’s a practical evaluation of ZooKeeper, a process coordination system. The main goal is to fully understand its working principles and to try to find bottlenecks by building a sophisticated system on top of ZooKeeper.

Obviously, it’s quite hard to work with a system without knowing the basic principles and ideas behind it. To work clearly and consciously, I decided to read the article on the topic written by the authors of the system.

Here is what I came up with:

ZooKeeper is a service for coordinating the processes of distributed applications. It is essentially a key/value table with hierarchical keys. It is not designed for general-purpose data storage, though some client information can still be stored there. How does it achieve all this? By incorporating elements from group messaging, shared registers and distributed lock services into a replicated, centralized service. To combine all these features, ZooKeeper pairs wait-free shared registers and FIFO execution guarantees with an event-driven mechanism, so it stays simple and powerful at the same time. The idea of ZooKeeper was mainly inspired by Chubby, a locking service with strong synchronization guarantees. ZooKeeper, however, was designed to avoid blocking primitives such as locks. It implements an API that manipulates simple wait-free data objects, organized hierarchically as in a file system.

Which guarantees does ZooKeeper provide?

  • Linearizable writes.
  • FIFO client order.
  • Handling shared configuration updates.
  • Liveness and durability

How?

  • Different node types (regular, ephemeral)
  • Watches
  • Completely Replicated System

What for?

  • Configuration Management
  • Rendezvous
  • Group Membership
  • Simple Locks
  • Simple Locks without Herd Effect
  • Read/Write Locks
  • Double Barrier
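As an example of how such recipes avoid the herd effect, here is a small Python sketch of the decision each client makes in the lock recipe: every client creates a sequential ephemeral node, the lowest sequence number holds the lock, and everybody else watches only the node directly in front of it. The sketch works on plain lists of node names and does not talk to a real ZooKeeper; the `lock-<number>` naming and the function names are assumptions.

```python
def sequence_number(node_name):
    """Extract the sequence suffix from a name like 'lock-0000000002'."""
    return int(node_name.rsplit("-", 1)[1])


def lock_state(my_node, children):
    """Decide whether my_node holds the lock, and if not, which single node to watch.

    Returns ("acquired", None) or ("waiting", predecessor_name).
    """
    ordered = sorted(children, key=sequence_number)
    index = ordered.index(my_node)
    if index == 0:
        return ("acquired", None)
    # Watching only the immediate predecessor means a release wakes up exactly one client.
    return ("waiting", ordered[index - 1])
```

With the simple lock all waiting clients watch the same node and wake up together when it disappears; here each release notifies a single successor.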

Want to read more? Check my wiki.

Implementations

  Configuration Management

  Reliable Multicast

Both programs were implemented with ZooKeeper. The code can be found by following the links above; an explanation and activity diagrams of the applications are given in the presentation below.

Presentation

Summary and Critique from my side

ZooKeeper uses wait-free protocols to implement process coordination in distributed systems. The main design goal is to avoid locks, so that performance can be improved significantly. Thanks to features such as watches and different node types, ZooKeeper supports the following applications:

  • Configuration Management
  • Leader Election
  • Group Membership

All of the above were exercised in the following applications:

  • Dynamic LogBack Configuration Management
  • Reliable Multicast with handling of group membership and leader election to provide total order and reliability

A weakness of ZooKeeper is that intermediate changes are dropped: between receiving an event and setting the watch again, an object may change several times. There is therefore a chance that not all changes will be tracked, only the latest one. This means that ZooKeeper is a state-based system, not an event system.
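The effect is easy to reproduce with a toy model of a one-shot watch. The Python class below is an in-memory stand-in, not the ZooKeeper client API: `WatchedValue`, `get`, `set` and `observe` are hypothetical names, and the only behavior borrowed from ZooKeeper is that a watch fires once and must be set again by the client.

```python
class WatchedValue:
    """In-memory stand-in for a node with one-shot watches."""

    def __init__(self, value):
        self.value = value
        self._watchers = []

    def get(self, watcher=None):
        """Read the value and optionally register a watch that fires once."""
        if watcher is not None:
            self._watchers.append(watcher)
        return self.value

    def set(self, value):
        """Update the value and fire, then discard, all registered watches."""
        self.value = value
        watchers, self._watchers = self._watchers, []
        for watcher in watchers:
            watcher()


def observe(updates):
    """Set one watch, apply a burst of updates, then re-read the state."""
    node = WatchedValue("v0")
    notifications = []
    node.get(watcher=lambda: notifications.append("changed"))
    for update in updates:
        node.set(update)
    return notifications, node.get()
```

Calling `observe(["v1", "v2", "v3"])` yields a single notification and the final value `"v3"`: the client learns that something changed and what the state is now, but never sees `"v1"` or `"v2"`.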

Also, complete replication limits the total size of the data that ZooKeeper can manage. In addition, serializing all updates through a single leader can be a performance bottleneck.