Aug 16, 2013
 

Recently I was asked to become a reviewer for the book Apache Kafka, published by Packt Publishing. I was happy to accept the offer, and now I am more than happy to share my view of the technology.

Apache Kafka is a distributed publish-subscribe messaging system that is used in many top IT companies, such as LinkedIn, Twitter, Spotify, DataSift and Square. The system guarantees the following properties: persistent messaging with constant-time access to disk structures and high performance; high throughput; distribution :) as access to the system is both load balanced and partitioned to reduce the pressure on any single node; real-time properties; and Hadoop and Storm integration.

Installing and building the system is very easy and won’t take much time. Depending on your needs, various cluster setups are possible: single node – one broker (the core Kafka process), single node – multiple brokers, multiple nodes – multiple brokers. Depending on the choice, the Kafka system should be configured properly, e.g. the configuration properties of each broker need to be known by the producers (message sources).

The following figure shows the simplest case, in which messages are published by producers and delivered to the consumers through the Kafka broker. In this scenario (single node – single broker) the producers, the broker and the consumers all run on different machines.

[Figure: Apache Kafka example]

 

Among the important design decisions are the following: message caching; the ability to re-consume messages; grouping of messages (to reduce network overhead); consumers keeping track of consumed messages locally; a purely decentralized design that uses ZooKeeper for load balancing; and support for both asynchronous and synchronous messaging. Moreover, message compression (e.g., gzip or Google Snappy) is used to reduce the network load. Additionally, the need for replication in any active system takes Kafka to a new level. Apache Kafka allows mirroring of an active data cluster into a passive one: it simply consumes the messages from the source cluster and republishes them on the target cluster. This feature is highly useful if reliability and fault tolerance are important concerns. In addition, simple replication is implemented within the system. This is mainly achieved by partitioning messages (by hash) and by having a lead replica for each partition (this way both synchronous and asynchronous replication can be arranged).
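
To make the partitioning idea a bit more concrete, here is a minimal Python sketch of hash partitioning with a lead replica per partition. It is only an illustration and not Kafka code: the broker names, the partition count, the replication factor, the round-robin replica placement and the choice of CRC32 as the hash are all my own assumptions.

```python
import zlib
from typing import List, Tuple

# Hypothetical cluster layout, chosen only for this illustration.
NUM_PARTITIONS = 4
BROKERS = ["broker-0", "broker-1", "broker-2"]
REPLICATION_FACTOR = 2


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a message key to a partition with a stable hash (CRC32 here)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def replicas_for(partition: int) -> List[str]:
    """Place replicas round-robin; the first entry acts as the lead replica."""
    return [BROKERS[(partition + i) % len(BROKERS)]
            for i in range(REPLICATION_FACTOR)]


def route(key: str) -> Tuple[int, str, List[str]]:
    """Return (partition, lead replica, follower replicas) for a key."""
    partition = partition_for(key)
    leader, *followers = replicas_for(partition)
    return partition, leader, followers
```

Messages with the same key always land in the same partition, so a producer would send them to the same lead replica, and the followers could then be updated either synchronously or asynchronously.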

The API provides sufficient support for creating both producers and consumers. Additionally, a consumer can be one of two types: one that ignores further processing of messages and just needs the message to be delivered, and another that requires further processing of the message upon delivery. A consumer implementation can be either single-threaded or multithreaded. However, to prevent unpredictable behavior, the number of threads should correspond to the number of topics to consume. This way the threads are fully utilized and the necessary message order is preserved.
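
The one-thread-per-topic rule can be sketched in plain Python as follows. This is a toy model under my own assumptions and does not use the Kafka API: each topic is stood in for by a `queue.Queue`, a `None` value marks the end of a stream, and the function names are made up for the example.

```python
import queue
import threading


def consume(topic, stream, sink):
    """Drain one topic's stream in order until the None sentinel arrives."""
    while True:
        message = stream.get()
        if message is None:
            break
        # Only this thread writes to sink[topic], so per-topic order is kept.
        sink[topic].append(message)


def run_consumers(streams):
    """Start exactly one consumer thread per topic and wait for all of them."""
    sink = {topic: [] for topic in streams}
    threads = [
        threading.Thread(target=consume, args=(topic, stream, sink))
        for topic, stream in streams.items()
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return sink
```

Because every topic has its own thread, no thread sits idle and no two threads compete for the same stream, which is what keeps the order within each topic intact.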

Kafka can be integrated with the following technologies: Twitter Storm and Apache Hadoop, for online stream processing and offline analysis respectively. Storm integration is implemented on the consumer side: Kafka consumers are represented as regular Storm spouts that read data from the Kafka cluster. Integration with Hadoop, on the other hand, is bilateral, as Kafka can be integrated with Hadoop as both producer and consumer.

In the former case, Hadoop serves as a bridge for publishing data to the Kafka broker. The Hadoop producer extracts data from the system in one of two ways: (a) using Pig scripts to write data in binary Avro format (here, writes to multiple topics are made easier), or (b) using the Kafka “OutputFormat” class, which publishes data as bytes and provides control over the output.

In the latter case, the Hadoop consumer is a Hadoop job that pulls data from the Kafka system into HDFS. This can happen either sequentially or in parallel.

Pros: Easy to use and deploy; scalable; fast; distributed.

Cons: Might lack configurability and automation; replication improvements required; fault-tolerance optimization needed.

I am going to actually test the framework and compare it with the current project I am working on at EPFL (awesome highly dynamic bft decentralized scalable pub/sub system). Stay tuned:)

Jan 16, 2013
 

One of the possible directions for my thesis work is stream data processing. That’s why it is worth reviewing some possible architectures, which can at least be used afterwards to compare my future system with existing ones.

Data stream processing systems have become quite popular due to the enormous growth (in particular in Volume, Variety and Velocity) of on-the-fly big data analytics. The use cases range from machine learning systems, such as Mahout and WEKA, through financial trading systems, to real-time analysis of real-world processes. Among distributed systems for data stream processing, I would like to highlight two: S4 (Simple Scalable Streaming System) and Storm.

Storm

Storm is a real-time computation system for reliable processing of unbounded streams of data. A Storm topology takes streams of data and processes them in arbitrarily complex ways. A good thing about this system is that it can be integrated with any queueing system and any database system. On the one hand, Storm simply needs to be connected to the database, and in return it handles parallelization, partitioning and failures when necessary. Within Storm, tuples can be manipulated. A tuple is a named list of values.

Storm supports three abstractions: spouts, bolts and topologies. Spout – a source of streams in a computation. Bolt – processes input streams and produces new output streams. Topology – a network of spouts and bolts, where each edge represents a bolt subscribing to the output of a spout or another bolt. Topologies are submitted using Apache Thrift. On the other hand, Storm is scalable and can be run across a cluster of machines. At the same time, fault tolerance is taken into account: Storm daemons are designed to be stateless and fail-fast, and Nimbus and Supervisors are used to maintain the fault-tolerance properties. What if:

  • Worker dies? The Supervisor will restart it. If it still does not respond to pings from Nimbus, Nimbus will reassign the worker to another machine.
  • Node dies? Tasks assigned to that machine will time out and Nimbus will reassign them to another machine.
  • Nimbus (N) or Supervisor (S) dies? All state is stored in ZooKeeper or on disk. N and S should usually be run under the supervision of daemontools or monit, so on restart they resume as if nothing had happened. Additionally, no worker processes are affected by the failure of N or S.
  • Nimbus dies? Nothing critical will happen if N fails. However, the ability to reassign tasks to other machines is lost.
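
The "node dies" case above can be illustrated with a small Python sketch of timeout-based reassignment. This is not Storm's scheduler: the function name, the 30-second default timeout and the round-robin choice of the new machine are assumptions made purely for the example.

```python
def reassign_timed_out(assignments, last_heartbeat, now, timeout=30.0):
    """Move tasks off nodes whose last heartbeat is older than `timeout` seconds.

    assignments:    dict mapping task name -> node name
    last_heartbeat: dict mapping node name -> time of its last heartbeat
    """
    alive = sorted(node for node, seen in last_heartbeat.items()
                   if now - seen <= timeout)
    if not alive:
        raise RuntimeError("no live nodes left to take over the tasks")

    result = {}
    moved = 0
    for task, node in sorted(assignments.items()):
        if node in alive:
            result[task] = node  # the node is healthy, keep the task there
        else:
            # Spread orphaned tasks over the live nodes in round-robin order.
            result[task] = alive[moved % len(alive)]
            moved += 1
    return result
```

For example, if node "b" stops sending heartbeats, every task that was assigned to "b" is handed to one of the remaining live nodes, while tasks on healthy nodes stay where they are.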

Each message in the system, coming out of the spout, is guaranteed to be fully processed. Storm considers a tuple fully processed when every message in its tuple tree has been processed.
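
One way to detect that a whole tuple tree is done without storing the tree itself is an XOR checksum, which is how Storm's acker is usually described. The Python class below is my simplified illustration of that idea, not Storm code. The class and method names are hypothetical, and in practice the tuple ids would be random 64-bit numbers so that an accidental zero is extremely unlikely.

```python
class AckTracker:
    """Tracks a single tuple tree with one XOR checksum."""

    def __init__(self):
        self.checksum = 0
        self.started = False

    def emit(self, tuple_id):
        """Record that a tuple with this id was created in the tree."""
        self.checksum ^= tuple_id
        self.started = True

    def ack(self, tuple_id):
        """Record that the tuple with this id was processed."""
        self.checksum ^= tuple_id

    def fully_processed(self):
        """Every id was XORed in twice (emit + ack), so the checksum is 0."""
        return self.started and self.checksum == 0
```

Since x XOR x = 0, the checksum returns to zero exactly when every emitted id has also been acked, regardless of the order in which the acks arrive.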

Architecture

The system has a master and workers. The master runs Nimbus, which performs code distribution, task assignment and failure monitoring. Each worker has a Supervisor, which listens for incoming tasks and starts and stops the processes. Each worker executes the part of a topology assigned to it. Finally, the coordination between the master and workers is handled by ZooKeeper. All Nimbus and Supervisor state is stored either in ZooKeeper or on local disk.

As mentioned before, the whole computation is based on a constructed computational graph – the topology. A stream of data flows through the topology. Each node in a Storm topology executes in parallel.
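
To show how spouts and bolts fit together, here is a toy word-count topology in plain Python. It does not use the Storm API and runs sequentially rather than in parallel. The function names are my own, and a tuple is modelled as a dict, i.e. a named list of values.

```python
from collections import Counter


def sentence_spout(sentences):
    """Spout: the source of the stream, emitting one tuple per sentence."""
    for sentence in sentences:
        yield {"sentence": sentence}


def split_bolt(stream):
    """Bolt: consumes sentence tuples and emits one tuple per word."""
    for tup in stream:
        for word in tup["sentence"].split():
            yield {"word": word.lower()}


def count_bolt(stream):
    """Bolt: consumes word tuples and keeps a running count per word."""
    counts = Counter()
    for tup in stream:
        counts[tup["word"]] += 1
    return counts


def run_topology(sentences):
    """Topology: wire the spout into the bolts (spout -> split -> count)."""
    return count_bolt(split_bolt(sentence_spout(sentences)))
```

Calling `run_topology(["the cat", "The dog"])` counts "the" twice and "cat" and "dog" once each. In a real Storm cluster each of these components would run as many parallel tasks spread over the workers.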

Stream grouping is one of the distinctive features of Storm. It tells the topology how to send tuples between two components (i.e. which specific task of a component receives each tuple).
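
Two common groupings in Storm are shuffle grouping (tuples are spread evenly over the tasks) and fields grouping (tuples with the same field value always go to the same task). The Python sketch below only illustrates the idea. The function names are made up, CRC32 is my choice of hash, and plain round-robin stands in for Storm's randomized shuffle.

```python
import itertools
import zlib


def fields_grouping(tup, field, num_tasks):
    """Pick a task index from the hash of one field of the tuple."""
    value = str(tup[field]).encode("utf-8")
    return zlib.crc32(value) % num_tasks


def shuffle_grouping(num_tasks):
    """Return a chooser that spreads tuples evenly over the tasks.

    Round-robin is used here as a deterministic stand-in for random shuffling.
    """
    counter = itertools.count()

    def choose(tup):
        return next(counter) % num_tasks

    return choose
```

With fields grouping on "word", every occurrence of the same word reaches the same counting task, which is what makes a per-word counter correct. Shuffle grouping suits stateless bolts where only an even load matters.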

S4

S4 is a simple, scalable and flexible processing platform for data streams. Processing elements in the system can be specified and composed into directed graphs along which a data stream flows. The nodes in the graph are called Processing Elements (PEs), through which data is sent in the form of events. ZooKeeper is used for managing the configuration and task assignment. This way the system can be dynamically reconfigured and can recover from failures by reassigning tasks to idle nodes.

PEs are the basic computation units in the system and are uniquely identified by functionality, the types of events they consume, a keyed attribute and its value. However, keyless PEs are also possible; they accept all events of the type with which they are associated, while keyed PEs accept only events with the corresponding key. Keyless PEs are usually the first elements in a processing path. Each node usually has one instance of any keyless PE. Each PE has state, which can change in reaction to a received event.

A Processing Node (PN) hosts running PEs. Through a PN, different PEs can be reached, interaction with the communication layer is established, and emitting output messages is supported. The listener component in a PN is used to receive events. Events are routed to PNs based on a hash function of the keyed attribute.
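
The routing of events to PNs and on to keyed PEs can be sketched in Python as follows. This is a toy model and not the S4 API: the class names, the CRC32 hash and the counting PE are all assumptions made for the illustration.

```python
import zlib


class CountPE:
    """Keyed PE: one instance per key value, with a running count as state."""

    def __init__(self, key_value):
        self.key_value = key_value
        self.count = 0

    def on_event(self, event):
        self.count += 1  # the state changes in reaction to each event


class ProcessingNode:
    """Hosts the PE instances for the key values routed to this node."""

    def __init__(self, name):
        self.name = name
        self.pes = {}

    def receive(self, event, key_attr):
        value = event[key_attr]
        if value not in self.pes:
            self.pes[value] = CountPE(value)  # create the PE on first use
        self.pes[value].on_event(event)


def node_for(event, key_attr, nodes):
    """Route an event to a node by hashing its keyed attribute."""
    digest = zlib.crc32(str(event[key_attr]).encode("utf-8"))
    return nodes[digest % len(nodes)]


def dispatch(events, key_attr, nodes):
    """Deliver every event to the node responsible for its key value."""
    for event in events:
        node_for(event, key_attr, nodes).receive(event, key_attr)
```

Because the node is chosen from the hash of the keyed attribute, all events with the same key value end up at the same PE instance, so its state sees the complete history for that key.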

Processing Node and Communication Layer in S4

Cluster management and automatic failover are provided by the communication layer.

The biggest difference between Storm and S4 is that Storm guarantees that messages will be processed even in the face of failures, whereas S4 will sometimes lose messages. However, state recovery is not supported by Storm.