Jul 16, 2014
 

I am heading straight into the candidacy exam in a month. It is quite a challenging time and therefore I wanted to start writing about one of the papers I chose for the exam – Elementary: Large-scale Knowledge-base Construction via Machine Learning and Statistical Inference.

Elementary is a Knowledge Base Construction (KBC) system that brings a variety of resources and techniques under one umbrella: statistical inference. The system employs Markov logic, a probabilistic graphical model formalism, to combine rule-based approaches and ML (such as Conditional Random Fields) and to construct the KB with a certain uncertainty ;). It also leverages dual decomposition to decompose the optimisation functions, trading off uncertainty against scalability. The goal is to optimise the overall cost of the system, which is the cost of false positive and false negative inferences.

The system is claimed to be a universal framework for KBC; however, it is not clear how problems other than classification, segmentation and clustering could be solved. The main intuition behind this concern is that the only problems that can be solved are those that have a Markov Logic Network (MLN) representation. It is also unclear how further maintenance and updating are performed, as it is well known that KBs tend to evolve rapidly over time. Moreover, the system does not support a ‘cold’ start: first, entities have to be chosen manually; second, relation extraction relies fully on distant supervision. Basically, concept extraction has to be handled either by user-provided functionality or by a user who is an expert in the field and knows the most interesting entities.

The high-level process of KBC is as follows:

  1. Entities and relations should be chosen prior to the execution.
  2. A corpus of documents should be chosen.
  3. Feature extraction (FE). Basically, we fill two tables: entity mentions and relation mentions (not only the relations we want to infer but also some that will help to deduce the inferred relations).
  4. The collected evidence is put into the MLN representation together with domain rules and ML components (CRF, logistic regression, clustering etc.). Finally, rule weighting is performed.
  5. Now statistical inference over the weighted MLN can be run. As a result, we infer parts of the future KB.
  6. Iterate over steps 3 and 4 again if the results are not satisfactory.

And now some more details about the system and underlying approach.

As mentioned earlier, the system relies on the MLN, which consists of three parts:

  • schema is used to specify which data will be provided and generated, together with the information on the instances that are to be inferred.
  • evidence is a database representation of the schema, i.e., instances of the schema tables, or in other words, the presence of the schema in the corpora (extracted concepts, co-occurrences, relations etc.)
  • rules serve two purposes: they constrain the inference (from domain knowledge etc.) and guide the inference process (conclusions from the first-order logic representation of the rules). Each rule is weighted by the level of certainty it guarantees (the weight can be either learned or heuristically set).

Knowing what we would like to get from the system, many tasks can be mapped to one of the problem classes: classification (logistic regression), segmentation (conditional random fields) and correlation clustering. These tasks can be split into smaller subtasks that are solved independently and whose results are then combined with a certain level of confidence. The combination of subtasks is performed with dual decomposition []. This gives us flexibility in two ways:

  • bigger tasks are split into smaller ones to reach a significant speed-up and better efficiency
  • different/multiple input sources can be treated by different processes and combined afterwards

Therefore, both conflicting input sources and scalability can be handled simultaneously.

Finally, rules, constraints and execution paths with different inputs are represented as an MLN. To be taken into account, each component has to be expressed in first-order logic; any adjustment to the rules is then immediately applied in further processing, i.e., the optimal path to the inferred variable will satisfy each rule with a certain level of confidence (the rule weight). Statistical inference is applied on top of the MLN representation so that it yields the most probable inference for a variable. Two refinements are possible. First, the rules can be scoped, i.e., the number of rules can be minimised by manually adding a scoping rule to reduce the space. For example, you can specify that there are N-1 adjacent pairs for the query Adjacent(element, element). Second, the weight of a rule might be set proportional to some of the parameters in the rule, e.g., making a rule more valuable if it contains a mention of the word “Obama”.

Evidence is the presence in the corpora of any of the schema items. In other words, if we have a co-occurrence predicate in the schema, we would like to obtain co-occurrence statistics for concepts/instances. Each document is treated as a set of possibly overlapping text spans. Based on this notion, three tables are populated (the evidence):

  • entity-mention M_E for each entity E, e.g., PERSON(Julia, TextSpans(doc_i, positions)). This table is filled for the entities that are chosen by the user.
  • relationship-mention R_E for each relation (with the arity of the relation representation, e.g., Mention(a, b) has arity 2)
  • relationship table, which can be obtained from the two previous tables.
  • Finally, each text span is characterised by a feature vector: F_m(T[E], V_m), where V_m are the features and T[E] is the list of text spans [entities].

Apparently, the need for features implies a need for annotated data, which could be generated with distant supervision patterns.

The system basically addresses three main subproblems: entity linking, relation extraction on top of it, and incorporation of domain knowledge. Unlike the first two, the last subproblem is mainly manually curated due to its nature.

Entity extraction example: canonical names, string matching, regular expressions and NER (named entity recognition) can be used as sources of features for the MLN. After extracting the features, rules for entity linking can be determined. Constraints can be set as well, e.g., if a mention is an entity, then it should have been detected by the NER.

Relation extraction can be simplified to the following. Commonly, classification over linguistic patterns is used. Among the techniques involved are pattern similarity, frequency-based filtering and l1-norm regularisation [1]. Multiple approaches can be incorporated; however, the rules for the inference still have to be specified afterwards, as in the entity extraction example.

To make it clearer, let’s consider an example. We have a set of rules where x is a key phrase and y is a label for the sentence. We would like to infer that if “Ronaldo” is in a sentence, then the other key phrases in that sentence are also sport related:

  1. Mention(x, y) and x == “Ronaldo” => SportMention(x, y) | w = 4
  2. SportMention(x, y) and Mention(x1, y) and x1 != x => SportMention(x1, y) | w = 3
  3. Mention(x_i, y) and each x_i != “Ronaldo” => not SportMention(x_i, y) | w = 10

Let’s say we are interested in inferring SportMention from a set of evidence, i.e., we would like to deduce the relation SportMention between x and y given a set of rules/constraints and a set of evidence.
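To see how the weights interact, here is a minimal sketch (not Elementary's implementation) that brute-forces the three rules for a single sentence. It assumes the usual Markov logic scoring, where a possible world is scored by the summed weights of its satisfied ground rules, and it reads rule 3 as "no Ronaldo in the sentence implies no SportMention". The function names are made up for illustration.

```python
from itertools import product


def world_score(mentions, world):
    """Sum the weights of all satisfied ground rules for one sentence.

    mentions: key phrases found in the sentence.
    world: dict mapping each key phrase to True/False (is it a SportMention?).
    """
    has_ronaldo = "Ronaldo" in mentions
    score = 0.0
    for x in mentions:
        # Rule 1 (w = 4): x == "Ronaldo" => SportMention(x, y)
        if x != "Ronaldo" or world[x]:
            score += 4
        # Rule 3 (w = 10): no "Ronaldo" in the sentence => not SportMention(x, y)
        if has_ronaldo or not world[x]:
            score += 10
        # Rule 2 (w = 3): SportMention(x, y) and x1 != x => SportMention(x1, y)
        for x1 in mentions:
            if x1 != x and (not world[x] or world[x1]):
                score += 3
    return score


def map_world(mentions):
    """Enumerate every truth assignment and return the highest-scoring one."""
    worlds = [
        dict(zip(mentions, values))
        for values in product([False, True], repeat=len(mentions))
    ]
    return max(worlds, key=lambda w: world_score(mentions, w))
```

Exhaustive enumeration only works for toy inputs like this; it is exactly the blow-up that scalable inference has to avoid.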

The set of rules can be represented as an MLN with activation functions proportional to the weights of the rules (btw, rule weights can be either learned or heuristically set; here I set them heuristically, proportional to the level of belief I have in each rule :)). Basically, having an annotated corpus, we would like to compute the misses.

[Figure: Screen Shot 2014-07-23 at 21.46.46]

At this stage, we can simply pose the following problem: which node inference has the most weight, and which combination of rules minimises the overall error for some training examples, given the dependencies and constraints.

Coming back to the core subproblems of the system: classification, segmentation and correlation clustering. These are the approaches that are matched to the problems in KBC and that enable decomposition of the problem, so that KBC can be solved in parallel.

[1] Zhu, J., Nie, Z., Liu, X., Zhang, B., & Wen, J. (2009). Statsnowball: A statistical approach to extracting entity relationships. In Proceedings of the International Conference on World Wide Web (pp. 101–110).

Jun 21, 2014
 

I haven’t posted for a while, I hope to change it these days:)

I am continuing to work on an amazing project about scientific knowledge conceptualisation. The first step on the way to reaching nirvana with my project is to create a methodology for ontology creation (aka the __repr__ of the knowledge base). Talking about a knowledge base and its construction clearly leads to the problem of disambiguation and the necessity of removing duplicates from the knowledge base.

Obviously, construction of the ontology involves different sources as possible input, e.g., different classifications, glossaries, concepts from various papers. All those sources might produce equivalent concepts but represent/name them differently.

Here, I would like to talk about the simplest approach to resolving ambiguities, misspellings and various ways of writing the same thing: syntactic resolution. Conflict resolution for multiple sources of information on the syntactic level means detecting whether two concepts are the same based on their syntactic structure.

To tackle this problem, I’ve developed a tool for removing duplicates across multiple sources, with human curation of uncertain pairs of concepts. The tool works as follows:

  • Create an index over all words appearing in the concepts (each entry holds the word itself, its stemmed form and its form without special symbols), aka an inverted index, but with pointers to the terms in which the word appears.
  • A list of possible term candidates is created for every pair that is suspicious.
  • Each pair is scored according to the following criteria:
    • Number of equal words (lemmatised)
    • Order of the words, or how the positions of the words differ
    • Absence or presence of an extra first or last word, as it might change the meaning a lot, e.g., Weather and Weather Model, Learning and Adaptive learning.
    • One thing that is still missing is a synonym check
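The three scoring criteria can be sketched roughly as follows. This is not the tool's actual code: the crude plural stripping stands in for a real lemmatiser, and the way the criteria are combined (a product with 0.5 penalties) is an assumption made purely for illustration.

```python
import re


def normalise(term):
    """Lowercase, drop special symbols and crudely strip a plural 's'.

    The suffix stripping is only a stand-in for a real lemmatiser.
    """
    words = re.sub(r"[^a-z0-9 ]+", " ", term.lower()).split()
    return [w[:-1] if len(w) > 3 and w.endswith("s") else w for w in words]


def pair_score(term_a, term_b):
    """Score how likely two concept names denote the same concept (0..1)."""
    a, b = normalise(term_a), normalise(term_b)
    if not a or not b:
        return 0.0
    shared = set(a) & set(b)
    # Criterion 1: number of equal words, here as a Jaccard overlap.
    overlap = len(shared) / len(set(a) | set(b))
    # Criterion 2: do the shared words appear in the same order?
    same_order = [w for w in a if w in shared] == [w for w in b if w in shared]
    order_factor = 1.0 if same_order else 0.5
    # Criterion 3: a differing first or last word may change the meaning a lot.
    edge_factor = 1.0 if (a[0] == b[0] and a[-1] == b[-1]) else 0.5
    return overlap * order_factor * edge_factor
```

With these assumed penalties, "Weather" vs "Weather Model" scores 0.25, so such a pair would go to the human curator rather than being merged automatically.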

The created set of terms is then searched for in the corpus of articles. Here, the whole article text is normalised (lemmatised, simplified (w/o stop words) etc.) together with the terms in the initial seed. Each term occurrence in an article is detected and accumulated in a map, e.g., {norm_term: [(doc, start_pos, end_pos, context), ...]}
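A minimal sketch of building such a map is shown below. It makes several assumptions: the stop-word list is a tiny hypothetical one, no lemmatisation is done, positions are token offsets in the normalised text, and the context is a fixed window of neighbouring tokens.

```python
import re
from collections import defaultdict

# Tiny illustrative stop-word list; a real one would be much longer.
STOP_WORDS = {"the", "a", "an", "of", "in", "and", "is", "are"}


def normalise(text):
    """Lowercase, tokenise and drop stop words (no lemmatisation here)."""
    return [w for w in re.findall(r"[a-z0-9]+", text.lower()) if w not in STOP_WORDS]


def find_occurrences(terms, articles, window=2):
    """Return {norm_term: [(doc, start_pos, end_pos, context), ...]}.

    articles: dict mapping a document id to its raw text.
    """
    norm_terms = {" ".join(normalise(t)): normalise(t) for t in terms}
    occurrences = defaultdict(list)
    for doc_id, text in articles.items():
        tokens = normalise(text)
        for norm_term, words in norm_terms.items():
            n = len(words)
            if n == 0:
                continue
            for start in range(len(tokens) - n + 1):
                if tokens[start:start + n] == words:
                    end = start + n
                    context = " ".join(tokens[max(0, start - window):end + window])
                    occurrences[norm_term].append((doc_id, start, end, context))
    return dict(occurrences)
```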

When the occurrences are retrieved, the most related pairs are detected simply by computing a weighted co-occurrence for each pair:

  • Each pair (a, b) can acquire a maximum weight of 1 in an article.
  • If a pair (a, b) appears in one sentence, the weight is increased by 1; otherwise it is increased by a weight proportional to the distance between the terms in the article.
  • Finally, each weight is computed as follows:
    • the accumulated weight is normalised by min(count(a), count(b)) if it is smaller than this min
    • or else 1 is returned

Here is a basic representation of the graph that will appear after applying the procedure above.

[Figure: co-occurrence example]

Out of the available pairs generated from the articles, we select the closest ones (i.e., pairs with the maximal score: sum(weights) / # of articles where the pair occurs) and proceed further with them.
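The weighting and selection steps can be sketched as below. The post does not pin down the distance-based weight, so this sketch assumes 1 / (1 + sentence gap) to the nearest occurrence of the other term; it also assumes occurrences are given as sentence indices. Both function names are hypothetical.

```python
def article_weight(occ_a, occ_b):
    """Weight of the pair (a, b) within one article, capped at 1.

    occ_a, occ_b: sentence indices at which terms a and b occur.
    """
    if not occ_a or not occ_b:
        return 0.0
    # Walk over the rarer term and look for the nearest occurrence of the other.
    rare, other = (occ_a, occ_b) if len(occ_a) <= len(occ_b) else (occ_b, occ_a)
    accumulated = 0.0
    for s in rare:
        gap = min(abs(s - t) for t in other)
        # Same sentence counts as 1; otherwise the weight shrinks with distance
        # (assumed form, see the note above).
        accumulated += 1.0 if gap == 0 else 1.0 / (1 + gap)
    limit = min(len(occ_a), len(occ_b))
    return accumulated / limit if accumulated < limit else 1.0


def corpus_score(weights_per_article):
    """sum(weights) / number of articles in which the pair occurs."""
    present = [w for w in weights_per_article if w > 0]
    return sum(present) / len(present) if present else 0.0
```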

The next problem, once the initial seed is filtered, terms are extracted from the text and pairs are determined, is to leverage the syntactic structure of a sentence, i.e. its parse tree, to extract possible extensions to the seed ontology and detect possible (subj -> predicate -> obj) triplets, where subj and obj are noun phrases that are possibly present in the ontology, and predicate is a possible relation between subj and obj. Here we solve three problems: detect a predicate between suggested pairs of terms, expand terms (e.g., human -> human health), and detect a predicate between a given term and possible other terms that are not in our seed.

The procedure is as follows and is shown in the figure below:

  • Split the article into sentences.
  • Parse the sentences with a dependency parser.
  • Identify concepts (either from a predefined list or extracted with some state-of-the-art NER).
  • Look for the so-called NpVpNp syntactic structure (the set of patterns can be extended, as in [1]).
  • Finally, compare the predicate (Vp) with the verb ontology and produce triplets and possible concept expansions (aka a bigger noun phrase for the extracted concept, e.g. climate -> climate change).
  • The extracted triplets are aggregated over the whole corpus, and the triplet with the most evidence is suggested to the user for confirmation.
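The last step, aggregating evidence over the corpus, could look roughly like this. The sketch assumes the parser stage has already produced (subj, predicate, obj) tuples, and normalising by lowercasing is a simplification of my own.

```python
from collections import Counter


def suggest_triplets(extracted, top_n=3):
    """Aggregate triplets over the corpus and return the best-supported ones.

    extracted: iterable of (subj, predicate, obj) tuples from all sentences.
    Returns a list of ((subj, predicate, obj), evidence_count) pairs.
    """
    counts = Counter((s.lower(), p.lower(), o.lower()) for s, p, o in extracted)
    return counts.most_common(top_n)
```

The returned candidates would then be shown to the user for confirmation, most evidence first.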

[Figure: relation extraction pipeline]

Together with relation extraction between a pair of concepts, we can take a single concept and determine the possible relations it might be involved in, as shown in the example below.

[Figures: example1, example2, example3]

The predefined concepts are drawn in pink. Green marks the concepts that we determined by searching for relations to the “pink concept”, e.g. given the concept storm, we examined the syntactic structure and found the predicate causes and the object/concept climate change, and vice versa.

Overall, motivated by recent work on knowledge base construction [2, 3, 4], I’ve looked into how to build knowledge bases (here are only the basics). Given the volumes of data that are to be processed, a MapReduce-like approach should obviously be established next. In addition to scalability, incremental updates to the constantly evolving system should be built in.

Overall, relation extraction and concept expansion showed good performance, with ~83 % precision for both tasks and 40 % recall for the relation extraction task.

[1] K. Fundel, R. Küffner, and R. Zimmer, “RelEx—Relation extraction using dependency parse trees,” Bioinformatics, vol. 23, no. 3, pp. 365–371, Feb. 2007.
[2] F. Niu, C. Zhang, C. Ré, and J. W. Shavlik, “DeepDive: Web-scale Knowledge-base Construction using Statistical Learning and Inference,” in VLDS, 2012, vol. 884, pp. 25–28.
[3] O. Deshpande, D. S. Lamba, M. Tourn, S. Das, S. Subramaniam, A. Rajaraman, V. Harinarayan, and A. Doan, “Building, Maintaining, and Using Knowledge Bases: A Report from the Trenches,” in Proceedings of the 2013 ACM SIGMOD International Conference on Management of Data, New York, NY, USA, 2013, pp. 1209–1220.
[4] F. Niu, C. Zhang, C. Ré, and J. Shavlik, “Elementary: Large-scale Knowledge-base Construction via Machine Learning and Statistical Inference.”

 

Feb 22, 2014
 

Recently, I was reading a paper about building, maintaining and using knowledge bases. I feel that this paper will influence my future research significantly; therefore, I would like to discuss my point of view and thoughts about it.

To begin with, an ontology, as a part of information science, is a formal representation of knowledge. Having such a representation may be beneficial to multiple fields and applications, e.g., search, query understanding, recommendations, advertising, social mining, etc. However, there has been little or no documentation of the whole life cycle of an ontology, including building, maintaining, and using it.

In the paper, the authors try to answer numerous questions, e.g., what are the pitfalls of maintaining a large knowledge base (KB), what is the influence of the user on the system, how continuous updates and integration should be handled etc. The following choices are among the most distinctive decisions:

  • Construction of a global ontology-like KB based on Wikipedia, i.e., the KB attempts to cover the entire world, capturing all important concepts, instances and relations. The approach chosen for constructing a global KB has obvious disadvantages for domain-specific construction. When building a domain-specific ontology, e.g. for computer science (CS), it is unclear how to efficiently limit the Wikipedia mining process to capture only CS-related topics.
  • Enrichment of the KB with additional sources, i.e., the requirement to enhance the set of instances of the KB has paved the way for involving additional sources with more specific information. Combining several sources of knowledge always leads to the necessity of aligning/merging, a process that requires information about the context and human intervention. On the other hand, only a limited number of resources were processed to enlarge the KB. In my opinion, mining scientific articles in various domains would enrich the KB even further and give the necessary depth of human knowledge in state-of-the-art domains.
  • Relationship extraction from Wikipedia, i.e., Wikipedia pages that are connected to the KB concepts are analysed extensively with well-known natural language processing techniques to get free-form relations for concept pairs. Free-form means that there is no predefined set of possible relations in the KB. This gives a certain freedom but might limit the search, as the number of such relations may grow without bound.
  • KB updates are performed as a rerun of the KB construction pipeline from scratch. Clearly, this has several disadvantages:
    • Rerunning the whole KB construction takes substantial time.
    • As the KB is curated by a human (analyst), it is not clear how and to what extent such curation should be used after the rerun. Moreover, the question is how to apply preceding analyst interventions to the newly generated KB. In particular, an analyst might change the edge weights; however, if an edge is not present in the newly constructed wiki DAG (or a node has been renamed), the change won’t be utilised.
    • Regardless of the construction process, an incremental update of the KB seems more logical in terms of speed and of facilitating the analysts’ work. Additionally, it is unclear how a single person can curate a KB of the entire world. In my opinion, the curation work should be spread over multiple people, e.g., crowdsourced.
    • It is unclear how conflicts are resolved when combining different sources of information, e.g., controversial relations, etc.

The system design described above imposes some limitations. First, relation types are not managed and may lack the expressiveness that might be required for some applications, e.g., explainedBy, modelIn, methodsIn, importantIn, etc. This information can be extracted not only from Wiki pages, infoboxes and templates, but also from the templates at the bottom of the page. Second, the DAG construction from the cyclic graph extracted from Wikipedia requires additional verification. The model constructed for weight dissemination in the cyclic graph includes only three parameters, i.e., co-occurrences of the terms on the Web and in the Wiki lists, and name similarity. Third, the KB model might benefit from the analysis of scientific papers and integration of this information into the KB. Finally, the system might benefit from data curation by means of crowdsourcing to improve the accuracy and facilitate user contribution.

Oct 20, 2013
 

The field of Distributed Video Streaming shows a great gap in handling churn and adversaries together. Attempts to eliminate adversary influence were made by the cool guys from the University of Texas at Austin in their BAR Gossip system. However, churn is not covered by the authors.

In our work, we are trying to embrace both dynamicity and adversarial behavior of possibly just under 30% of the nodes.
The video streaming system is built on top of the framework developed by the LPD lab at EPFL. This framework makes use of highly expensive Byzantine agreement protocols over small subsets of the whole system, called clusters. This approach guarantees both resilience to adversaries and consensus within an honest majority of nodes. Additionally, a certain level of randomization is added to handle adversarial attacks, e.g. nodes constantly change their location in the topology, jumping from one cluster to another.

This constant movement poses a certain challenge for maintaining the streaming application.

Background

Previously, streaming systems were classified into two categories: tree-based (push) and mesh-based (pull)[link]. Each of them has its own advantages and disadvantages. The former is not suitable for handling churn, while the latter is robust to churn but cannot guarantee the effectiveness of the content distribution.

Even though the mesh-based topology is designed with churn in mind, it is not robust against adversarial influence and is exposed to the outside world. As opposed to mesh systems, our framework supports dynamic node placement, thus maintaining better fault-tolerance.

At the same time, two main strategies for content dissemination exist: pull [link] and push. Pull strategies are proven to produce less overhead on the system [link]; however, in a fully dynamic and partially corrupted network the pull strategy cannot provide the required fault-tolerance and constant node replacement. To sustain performance in a dynamic and fault-tolerant system, we sacrifice the low network overhead and adopt a push strategy.

Another group of systems are those that can actually handle a Byzantine adversary and try to sustain dynamicity, e.g., BarGossip[link] and FlightPath[link].

The former, BarGossip, is based on the simple gossip model; however, it introduces verifiable pseudo-randomness for peer selection. The system is easy to implement. The main differences from the simple gossip model are the pseudo-random peer selection and the use of credible threats instead of reputation for peers. The credible threats mechanism is based on the fact that, upon suspicion, any node can send a POM (proof of misbehavior) message; therefore, if a rational node thinks that it can be suspected, it might decide to actually forward some messages. The system operates in rounds and uses a balanced exchange protocol (a node exchanges some info with others if it received something in the current round that it did not have previously) and an optimistic push protocol.

The latter, FlightPath, sustains dynamicity in the infrastructure along with fault-tolerance, and relies on the epsilon-Nash equilibrium. In such an equilibrium, rational nodes will deviate only if they expect to benefit by more than a factor of epsilon from such behavior. In this system the source sends two types of messages: stream updates (the actual content) and linear digests (authentication of the updates). The system also relies on a verifiable pseudo-random algorithm and uses history update messages.

System Model and Problem Definition

We consider a network consisting of a dynamic collection of nodes with an adversary model similar to [link]. Nodes have equal roles and are connected to each other in the [link] fashion.

The underlying system architecture looks as follows:

[Figure: EffectiveStreaming]

 

Basically, all nodes are grouped into small clusters, within which a heavy consensus protocol can run relatively fast. Nodes change their places in the system constantly. Each connection is TCP, which will be replaced by UDP in the future. When the source starts streaming the data, it broadcasts it to everyone in its cluster and everyone in the neighboring clusters. Clusters are organized in Hamiltonian cycles, and for the sake of good expandability the number of these cycles is redundant (usually 2 or 3). As you can see, this is quite a preliminary description.

I am planning to add more details on the implementation, performance evaluation and comparison to other existing systems in the next posts.

Aug 16, 2013
 

Recently I was asked to become a reviewer for the book Apache Kafka, published by Packt Publishing. I was happy to accept the offer and now I am more than happy to share my view of the technology.

Apache Kafka is a distributed publish-subscribe messaging system that is used in many top IT companies, such as LinkedIn, Twitter, Spotify, DataSift, Square etc. The system guarantees the following properties: persistent messaging with constant-time access to disk structures and high performance; high throughput; distributiveness :) as access to the system is both load balanced and partitioned to reduce the pressure on any one node; real-time properties; and Hadoop and Storm integration.

Installing and building the system is very easy and won’t take much time. Depending on your needs, various cluster setups are possible: single node with one broker (the core Kafka process), single node with multiple brokers, multiple nodes with multiple brokers. Depending on the choice, the Kafka system should be configured properly, e.g. the configuration properties of each broker need to be known by the producers (message sources).

The following figure shows the simplest case, when the messages are published by producers and, through the Kafka broker, they are delivered to the consumers. In this scenario (single node – single broker) all producers, broker and consumers are run on different machines.

[Figure: ApacheKafkaExample]

 

Among the important design solutions are the following: message caching; the opportunity to re-consume messages; message grouping (to reduce network overhead); local maintenance of the consumed messages; a purely decentralized design that uses ZooKeeper for load balancing; support for both asynchronous and synchronous messaging; and message compression to reduce the network load, e.g., gzip or Google Snappy. Additionally, the need for replication in any active system takes Kafka to a new level. Apache Kafka allows mirroring of an active data cluster into a passive one: it simply consumes the messages from the source cluster and republishes them on the target cluster. This feature is highly useful if reliability and fault-tolerance are important concerns. Additionally, simple replication is implemented within the system. This is mainly achieved by partitioning the messages (by hash) and having a lead replica for each partition (this way both synchronous and asynchronous replication can be arranged).
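The idea of hash partitioning with a lead replica per partition can be illustrated with a few lines of plain Python. This is a conceptual sketch only, not Kafka's API or its real partitioner: the CRC32 hash, the round-robin replica placement and both function names are assumptions made for illustration.

```python
import zlib


def partition_for(key, num_partitions):
    """Map a message key to a partition with a stable hash (CRC32 assumed here)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign_replicas(num_partitions, brokers, replication_factor):
    """Spread the replicas of each partition over brokers round-robin.

    The first replica of a partition acts as its lead replica; the
    others follow it, either synchronously or asynchronously.
    """
    assignment = {}
    for p in range(num_partitions):
        replicas = [brokers[(p + i) % len(brokers)] for i in range(replication_factor)]
        assignment[p] = {"leader": replicas[0], "followers": replicas[1:]}
    return assignment
```

Because the hash is stable, all messages with the same key land in the same partition, which is what keeps their relative order intact.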

The API provides sufficient support to create both producers and consumers. Additionally, a consumer can be one of two types: one that ignores further processing of messages and just needs the message to be delivered, and another that requires further processing of the message upon delivery. A consumer implementation can be either single-threaded or multithreaded. However, to prevent any unpredictable behavior, the number of threads should correspond to the number of topics to consume. This way, full utilization of the threads and the necessary message order will be preserved.

Kafka can be integrated with the following technologies: Twitter Storm and Apache Hadoop, for online stream processing and offline analysis respectively. Storm integration is implemented on the consumer side. That is, Kafka consumers are represented as regular Storm spouts that read the data from the Kafka cluster. On the other hand, integration with Hadoop is bilateral, as Kafka can be integrated with Hadoop as both a producer and a consumer.

In the former, Hadoop is integrated as a bridge for publishing data to the Kafka broker. The Hadoop producer extracts the data from the system in two ways: (a) using Pig scripts for writing data in the binary Avro format (here, writes to multiple topics are made easier), (b) using the Kafka “OutputFormat” class, which publishes data as bytes and provides control over the output.

In the latter, the Hadoop consumer is a Hadoop job that pulls information from the Kafka system into HDFS. This can happen both sequentially and in parallel.

Pros: Easy to use and deploy; scalable; fast; distributed.

Cons: Might lack configurability and automation; replication improvements required; fault-tolerance optimization.

I am going to actually test the framework and compare it with the current project I am working on at EPFL (awesome highly dynamic bft decentralized scalable pub/sub system). Stay tuned:)

Jul 06, 2013
 

Finally.

I am a master of science… for the second time and now officially have three master diplomas (Ukrainian, Spanish and Swedish)…. A bit too much but I will manage:)

Getting down to business, the abstract for the thesis is the following:

In recent years the need for distributed data storage has led the way to design new systems in a large-scale environment. The growth of unbounded streams of data and the necessity to store and analyze it in real time, reliably, scalably and fast are the reasons for the appearance of such systems in the financial sector, especially at the stock exchange Nasdaq OMX.

Furthermore, an internally designed totally ordered reliable message bus is used in Nasdaq OMX for almost all internal subsystems. Extensive theoretical and practical studies on reliable totally ordered multicast were made in academia, and it was proven to serve as a fundamental block in the construction of distributed fault-tolerant applications.

In this work, we are leveraging the Nasdaq OMX low-latency reliable totally ordered message bus, with a capacity of at least 2 million messages per second, to build a high-performance distributed data store. The consistency of data operations can be easily achieved by using the messaging bus, as it forwards all messages in a reliable, totally ordered fashion. Moreover, relying on the reliable totally ordered messaging, active in-memory replication support for fault tolerance and load balancing is integrated. Consequently, the prototype was developed using production environment requirements to demonstrate its feasibility.

Experimental results show great scalability and performance, serving around 400,000 insert operations per second over 6 data nodes with 100 microseconds latency. Latency for single-record read operations is bounded to under half a millisecond, while data ranges are retrieved with sub-100 Mbps capacity from one node. Moreover, performance improvements under a greater number of data store nodes are shown for both writes and reads. It is concluded that uniform totally ordered sequenced input data can be used in real time for large-scale distributed data storage to maintain strong consistency, fault-tolerance and high performance.

The report is here. And the presentation can be found below:

May 24, 2013
 

A small break from thesis related posts :)

Finally I found time to describe the project we (me and Zygimantas) were working on during the last semester. And here is some motivation for it:

There is an increasing interest in distributed machine learning algorithms. A gossip learning algorithm was proposed that works on a random graph with fully distributed data. The goal of our research is to analyse the behaviour of this algorithm on clustered graphs. Experiments show that this algorithm needs to be modified in order for it to work on clustered graphs. A triangulation technique was introduced. It modifies the original peer sampling algorithm and is used to limit model exchange between different clusters. Results show that with such algorithm it is possible to find models of local objective functions.

In other words, let’s imagine a social network where people don’t want to share their private information, but they agree to locally fit their information to some kind of model generation function and then share only parameters of obtained model. However, a model parameters from only one person is not enough to make any conclusions about the network. So why not to just randomly exchange these model between friends and merge them locally. Here is where our algorithm appear with its awesome model merging. As a result, as peers are likely to exchange their models with their friends – resulting model might characterise some clusters. And Wooalia!!! If we have models for some clusters – we can make various assumptions about them. For example, you are living in Ukraine and want to move to Sweden. You are searching for a job and have no idea on what approximately you can expect as a salary. With our approach, you can put information about you (as an input) and our merged resulting function will give you an answer for Stockholm cluster :)

Obviously, all of the above is a very simplified version of what we’ve done. Now a bit more serious explanation:

Peer-to-peer (P2P) is a system model in which every participant (peer) is equal. Peers are connected to each other in such a way that they form a graph. Moreover, P2P communication and the peers themselves are unreliable, i.e. peers may fail, and messages may be delayed, lost or never delivered. Systems designed for this environment are usually robust and scalable, since no central servers are needed. Adding more computation resources to such a system is the same as adding more peers. These systems usually consist of a large number of peers that communicate by passing messages to each other.

Furthermore, such P2P systems can offer security to some extent. They could be used to protect sensitive data such as personal data, preferences, ratings, history, etc. by not disclosing it to other participants. For example, in P2P the data could be completely distributed, so that each peer knows only about its own data. In such a case, an algorithm could be run locally and only its result shared among peers. This may ensure that there is no way for peers to learn about the data kept at other peers.

This security characteristic of P2P networks can be used to build machine learning algorithms on fully distributed data. Mark Jelasity et al. in their work [1] present such an algorithm, which uses gossiping to share predictive models with neighbouring peers. We can think of this algorithm as performing a random walk in the P2P network. During the random walk, ensemble learning is performed: the model built during the walk is merged with the local model stored at each peer. After merging the two models, the merged model is updated with the local data (which is stored at each peer) and then used in the next step of the random walk. Mark Jelasity et al. conclude that in P2P networks that form a random graph such an algorithm converges. Moreover, they state that this algorithm is more effective than one that gathers the data before building the prediction model, because peers exchange only models, which may be considerably smaller than the data.
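To make the merge-then-update step more concrete, here is a minimal single-process sketch in Python. It is not the code from our project or from [1]: merging by plain averaging, the learning rate, the toy data (each peer privately holds one point of the line y = 2x - 1) and all function names are assumptions made for illustration.

```python
import random

def predict(w, x):
    """Linear (Adaline) output: dot product of weights and inputs."""
    return sum(wi * xi for wi, xi in zip(w, x))

def adaline_update(w, x, y, lr=0.5):
    """One gradient-descent step on the squared error of a single example."""
    err = y - predict(w, x)
    return [wi + lr * err * xi for wi, xi in zip(w, x)]

def merge(w_a, w_b):
    """Merge two models by averaging their weights (assumed merge rule)."""
    return [(a + b) / 2.0 for a, b in zip(w_a, w_b)]

def gossip_walk(peers, steps, rng):
    """Random walk: merge with the local model, update on the local example."""
    model = [0.0] * len(peers[0]["x"])
    for _ in range(steps):
        peer = rng.choice(peers)           # uniform choice = random graph
        model = merge(model, peer["w"])
        model = adaline_update(model, peer["x"], peer["y"])
        peer["w"] = model                  # the peer keeps the new model
    return model

rng = random.Random(0)
peers = []
for _ in range(50):
    x1 = rng.uniform(-1.0, 1.0)
    # x[0] = 1.0 is the bias input; the raw (x, y) pair never leaves the peer.
    peers.append({"x": [1.0, x1], "y": 2.0 * x1 - 1.0, "w": [0.0, 0.0]})

model = gossip_walk(peers, steps=20000, rng=rng)
print(model)  # should end up close to [-1.0, 2.0]
```

Only weight vectors travel between peers here. Replacing `rng.choice(peers)` with a choice restricted to the neighbours of the current peer is where a clustered graph would start to behave differently.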

Although Mark Jelasity et al. proved that this gossip learning algorithm converges in random graphs, it is still unclear whether such convergence can be achieved in clustered graphs. Moreover, the convergence may behave in one of the following ways:

  • every peer after a number of iterations will have a model that represents the data on a local cluster;
  • after more iterations every peer will have a model that represents the data on every peer.

Summary:

  • Our gossip learning algorithm uses the framework described in [1] with the Adaline gradient descent learning algorithm;
  • We analysed the gossip learning algorithm’s convergence properties on random and clustered graphs;
  • We designed and implemented a graph generating tool that generates random and clustered graphs.

[1] R. Ormandi, I. Hegedűs, and M. Jelasity. Gossip learning with linear models on fully distributed data. Concurrency and Computation: Practice and Experience, 2012.

May 20, 2013
 

The purpose of this post is to reveal the system organization and properties.

[Figure: GDS system architecture]

The figure above shows some concepts of the system design and the functionality covered by the system. The GDS (Genium Data Store) design can be viewed as a set of interacting layers, as presented in the figure. The main idea of the figure is to highlight the multilayer organization of the system, where each layer serves its own purpose and is separated from the others. The lowest two levels establish communication between the nodes in the system. Nodes are both clients and data stores. Each node, when joining the system, declares its status and adds itself to the corresponding subscription group. There are several subscription groups, among them client and sequencer.

To maintain total ordering, a special subscription group is reserved: the sequencer group. On top of the messaging middleware a distributed component is placed. It supports data replication, which provides scalability and availability by reducing the traffic handled by each component. On top of the replication layer sits a data store operation layer, which (a) supports a wide range of operations over data, e.g., insert, update, read and range queries; (b) frames client messages with the information needed to access the stores, thereby resolving concurrency conflicts; (c) applies a snapshot mechanism to allow a range query to be safely re-requested.

This infrastructure makes it easy to maintain and control the system. Relying on INET messaging is a great advantage, as it prevents inconsistencies and conflicts.

Functionality

The basic functionality provided by GDS is a distributed, consistent, fault-tolerant and scalable infrastructure that serves simple requests over data. The requests include insert, get and range queries. To make a request, the client communicates with the storage part through the provided API. Each data store processes only those messages that belong to its partition; therefore, all information about the partitioning is stored on the sequencer, which keeps track of the number of replicas that serve the data.

With this functionality it is possible to:

  • Store/Retrieve the data
  • Provide consistency, availability, scalability and high performance
  • Leverage the high-performance message bus and in-memory datastore
  • Eliminate a need for highly scalable storage hardware

Data and Query Model

GDS is a column-oriented data store in the first place, with possible further extension to any database provider. This is made simple because adding new database schemas and tables to the system is relatively easy, and they can be plugged in through the data store API. Schemas are not flexible: new attributes cannot be added at any time, only when creating the table, as the data is stored in fixed-size columns. Moreover, each record must be marked with a timestamp, to speed up later read requests and avoid inconsistencies during updates. The timestamp of a record serves as its version, which is checked before making an update; this way, timestamp consistency is guaranteed.
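The timestamp-as-version check can be illustrated with a small Python sketch. This is not the GDS API: the class, the method names and the rule "reject the update if the supplied timestamp is not the current one" are assumptions that only show the idea of checking the version before updating.

```python
class VersionedTable:
    """Rows keyed by primary key; each row carries its timestamp as a version."""

    def __init__(self):
        self._rows = {}  # key -> (timestamp, value)

    def insert(self, key, value, timestamp):
        self._rows[key] = (timestamp, value)

    def get(self, key):
        """Return (timestamp, value); the timestamp is the version to send back."""
        return self._rows[key]

    def update(self, key, value, expected_ts, new_ts):
        """Apply the update only if the caller saw the latest version."""
        current_ts, _ = self._rows[key]
        if current_ts != expected_ts:
            return False  # stale version: someone else updated the row first
        self._rows[key] = (new_ts, value)
        return True

table = VersionedTable()
table.insert("order-1", {"qty": 10}, timestamp=100)
ts, _ = table.get("order-1")

# Two clients read version 100 and both try to update; only the first wins.
first = table.update("order-1", {"qty": 7}, expected_ts=ts, new_ts=101)
second = table.update("order-1", {"qty": 5}, expected_ts=ts, new_ts=102)
print(first, second, table.get("order-1"))
```

The losing client has to re-read the row and retry with the new timestamp.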

The query language of GDS supports selection from a single table. Updates must specify the primary key, similar to PNUTS. Single table queries provide very flexible access during range requests compared to distributed hash or ordered data stores, while still being restrictive compared to relational systems.

Read Query Support

Adapting NoSQL data stores to relational use cases keeps the need for range queries. This functionality is sufficient to support further data processing and analysis in offline mode. In the trading environment, support for time range queries is very important, as further transactional and analytical processing of the data is required. The main use cases are logging, extracting order history, price history, index calculation, etc. All of these dictate the need for range query support.

Moreover, it can be a backbone for a stable way of analyzing the data “on the fly”.

There is an extensive body of work on exploring and evaluating range queries. Among the most common solutions for supporting range queries are special hash functions that preserve locality and various distributed index structures, such as trees.

GDS relies on data locality and a timestamp index, where the timestamp is added either by the user or automatically by the data store. The data store ensures that each record is timestamped, so lookups can be sped up by specifying an approximate time range. Data in the store is divided into chunks of around 100 000 records each. The chunks are indexed by timestamp, and the records within each chunk are time-indexed as well. This level of separation significantly reduces lookup time.
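A rough Python sketch of such a two-level time index is shown below. It is an illustration under assumptions, not the GDS implementation: records are assumed to arrive in non-decreasing timestamp order, and the class and method names are made up.

```python
import bisect

class ChunkedLog:
    """Time-ordered records split into fixed-size chunks with a chunk-level index."""

    def __init__(self, chunk_size=100_000):
        self.chunk_size = chunk_size
        self.first_ts = []    # first timestamp of every chunk (chunk-level index)
        self.chunk_ts = []    # per chunk: sorted list of timestamps
        self.chunk_rows = []  # per chunk: rows, parallel to chunk_ts

    def append(self, ts, row):
        # Assumes ts is never smaller than the previously appended timestamp.
        if not self.chunk_ts or len(self.chunk_ts[-1]) >= self.chunk_size:
            self.first_ts.append(ts)
            self.chunk_ts.append([])
            self.chunk_rows.append([])
        self.chunk_ts[-1].append(ts)
        self.chunk_rows[-1].append(row)

    def range_query(self, start, end):
        """Return all rows with start <= timestamp <= end."""
        # Last chunk that begins before `start` may still contain matches.
        i = max(bisect.bisect_left(self.first_ts, start) - 1, 0)
        result = []
        while i < len(self.first_ts) and self.first_ts[i] <= end:
            ts = self.chunk_ts[i]
            lo = bisect.bisect_left(ts, start)
            hi = bisect.bisect_right(ts, end)
            result.extend(self.chunk_rows[i][lo:hi])
            i += 1
        return result

log = ChunkedLog(chunk_size=3)
for t in range(1, 11):
    log.append(t, f"r{t}")
print(log.range_query(4, 8))
```

Only the chunks whose time span can overlap the requested range are touched, and inside each chunk a binary search replaces a full scan.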

It was decided to limit the size of a range query response. The main reason is the availability of the system, which could degrade when transmitting range responses of unlimited size. The limit is set to a maximum of L = 10 000 records, which is around 5 MB. When the query is processed, information on the query size is reported to the client. If the response exceeds L, only the first L records are transmitted to the client. If necessary, an additional request can be issued to retrieve the missing records.

To guarantee consistency across such additional requests, a simple snapshot mechanism is triggered, sketched below. The same procedure is used to guarantee consistency when the TCP connection that transmits the response fails.

Snapshot mechanism works as follows:

on_range_request:
   send(type = SNAPSHOT, empty message) // Append a SNAPSHOT message to the end of the current store
   retrieve(query) // Read the data from the store
   send(response directly to client)
   if (failure || limit_for_response is exceeded)
      retrieve data until the snapshot point is reached

The snapshot mechanism is only used for the logging use case. The approach in this snippet guarantees that a range query response is the same whenever it is requested. This holds only because the time-oriented data schema has no update operation.
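The same idea, combined with the response limit L, can be written as a small runnable Python sketch. It simplifies heavily: the SNAPSHOT message sent through the ordered stream is replaced by simply remembering the length of the append-only log, and the names `AppendOnlyLog`, `range_request`, `snapshot` and `offset` are hypothetical.

```python
class AppendOnlyLog:
    """Immutable, append-only record log with size-limited range responses."""

    def __init__(self, limit=10_000):
        self.limit = limit  # L, the maximum number of records per response
        self.records = []   # (timestamp, payload) in arrival order

    def insert(self, timestamp, payload):
        self.records.append((timestamp, payload))

    def range_request(self, start, end, snapshot=None, offset=0):
        if snapshot is None:
            # First request: mark the current end of the store.
            snapshot = len(self.records)
        # Never read past the snapshot point, even if new records arrived.
        matches = [r for r in self.records[:snapshot] if start <= r[0] <= end]
        page = matches[offset:offset + self.limit]
        return {"records": page, "total": len(matches), "snapshot": snapshot}

log = AppendOnlyLog(limit=2)
for t in range(1, 6):
    log.insert(t, f"event-{t}")

first = log.range_request(1, 10)   # 2 of 5 records, plus the snapshot point
log.insert(6, "event-6")           # arrives after the snapshot was taken
rest = log.range_request(1, 10, snapshot=first["snapshot"], offset=2)
print(first["total"], rest["records"])
```

Because the log is never updated in place, re-reading up to the same snapshot point always yields the same records, so a follow-up request after a failure or a truncated response stays consistent with the first one.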

Towards Consistency, Availability and Speed

Designing a system that has to operate in production under the strong SLA requirements of NOMX is complex. The system needs a scalable and robust solution for failure recovery, replica synchronization, concurrency and request routing. The servers must be resilient to many kinds of faults, ranging from the failure of individual disks to the failure of machines or routers. GDS uses active replication, based on the totally ordered stream of messages produced by the sequencer, to achieve high availability and a consistent view of the data. In short, it provides fully serializable ACID semantics over the data store.

To do so, the following is used:

  • for consistency, a reliable totally ordered stream of messages produced by the sequencer is used;
  • for availability, a highly robust and fast NOMX message bus is used to support a great number of incoming operations, and active replication is implemented to reduce the load on any single replica;
  • for speed, a highly robust and fast NOMX message bus is used.

It is not hard to notice that consistency, availability and performance all depend on the NOMX messaging middleware. This subsystem, whose functionality underpins the stable behavior of GDS, is therefore critical.

Low Latency

Latency is a critical part of a production-oriented system architecture. However, making latency a first-order constraint in the architecture is not very common. As a result, systems are usually shaped mainly by failure resilience, availability, consistency concerns, etc.

The main question here is how to design a system that is oriented towards latency. A few relaxations of the system requirements for the aggressive production environment are made:

  • GDS applications do not require wide-range deployment
  • Localized disasters are not taken into account; however, this could be addressed by adding site replication

Here are the steps taken on the way to speed:

  • Lightweight Sequencer. The sequencer has limited functionality: its main job is reduced to assigning a sequence number to each message and forwarding it to all subscribers. Moreover, the sequencer is completely isolated from the content of incoming messages; however, it can add information to a message, such as the sequence number or other user information.
  • Good Decomposition. Decomposition of the application is very important in the design of any distributed application. GDS exhibits fairly decent decoupling, with several levels and components. The roles in the system are sequencer, clients and data stores. All of them are replicated and easily replaceable. Moreover, a layer of abstraction is placed under both clients and data stores; it manages registration and communication with the sequencer and makes them transparent to both clients and stores.
  • Asynchronous Interactions. All interaction in the system is based on the well-known event-driven paradigm and relies on asynchronous communication over UDP. The underlying messaging system, which uses MoldUDP, makes the communication reliable. Moreover, if a synchronous API is needed, it is easy to build on top of the asynchronous one.
  • Non-Monolithic Data. All data is stored in column-oriented storage and partitioned by range or by hash, depending on the data set. This gives highly decomposed data without any need to perform joins, which are not supported by the system.
  • Low Latency Reliable Totally Ordered Message Bus. To improve performance, the highly scalable and fast NOMX messaging middleware is leveraged in many ways.
  • Effective programming techniques. Following the advice in [Effective C++, Java], GDS was built to reduce overheads from initialization, communication and garbage collection.

Summary

GDS is a unique distributed system built on top of the reliable total order multicast messaging middleware developed in-house by NOMX. It is built to serve a large number of requests per second and to do so fast, with consistency, fault tolerance and availability in mind. Moreover, it benefits from the performance of the NOMX messaging system.

A wide set of operations is supported over the data, such as insert, read, range query and update. This set is spread over two different data sets: an immutable log and mutable object records, both of which are actively replicated via the totally ordered stream of messages from the sequencer. Over the immutable data, two types of operations are supported: insert and range query. Mutable data supports three operations: insert, update and get. The first subset is made reliable by extra fault resilience, e.g., against link failure. The second subset provides resolution of concurrent updates, e.g., through timestamp consistency. Depending on the data type, the data is partitioned either by range or by hash, respectively, to maximize the performance of the corresponding operations.

Further chapters describe the architecture of the system and show the proof of concept for performance, scalability and failure resilience properties of the prototype system.

 

 

Apr 12, 2013
 

Multicast operations are operations that are sent from one process to a set of processes, where the membership of the group is usually transparent to the sender [1]. However, a simple multicast protocol does not guarantee any ordering or message delivery. Therefore, today’s distributed systems need stronger guarantees, such as reliability. Some systems [5] rely on reliable multicast, in which any transmitted message is received either by all processes or by none. In other words, there cannot be a situation where a client accesses a server just before it crashes and observes an update that no other server will process. This property is called uniform agreement. Moreover, to maintain a consistent and fault-tolerant system, total order is required in addition to reliable uniform multicast.

The simplest specification of uniform reliable total order multicast can be defined in terms of two primitives [2], TO-multicast(m) and TO-deliver(m), where m is some message. When a process issues a uniquely identified message m via TO-multicast(m), the following properties are assumed [3]:

• Validity. If a correct process TO-multicasts a message m, then it eventually TO-delivers m.

• Uniform Agreement. If a process TO-delivers a message m, then all correct processes eventually TO-deliver m.

• Uniform Integrity. For any message m, every process TO-delivers m at most once, and only if m was previously TO-multicast by the sender.

• Uniform Total Order. If two processes, p and q, both TO-deliver messages m and m’, then p TO-delivers m before m’ if and only if q TO-delivers m before m’.

If all these properties are satisfied, reliable total order multicast takes place. Uniformity means that no process is allowed to deliver a message out of order at any time.

Internally in NOMX, multicast communication is used for most of the subsystems as it is the only fast and reliable way to guarantee consistency and agreement within all nodes with minimal cost.

There are three main ways to maintain total order: symmetric messaging, collective agreement [Birman and Joseph 1987] and sequencer-based ordering [Kaashoek 1989]. The system that I am developing in my master’s project uses the single-sequencer ordering mechanism, as it is more efficient than the consensus-based one. The simplest illustration of total ordering is shown in the figure below: no matter when the messages were issued, they are delivered in the same order to all processes. For sequencer-based mechanisms the main problem is that the sequencer is a possible bottleneck and a critical point of failure. Moreover, the sequencer may limit the scalability of the system. This can be overcome by using a replicated standby sequencer that delivers all messages issued by the primary one and takes over in case of failure.

[Figure: total order multicast]
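Sequencer-based ordering fits in a few lines of Python. The sketch below is a toy, single-process illustration and not the NOMX middleware: the `Sequencer` and `Receiver` classes are hypothetical, and message loss, retransmission and sequencer failover are left out.

```python
import itertools

class Sequencer:
    """Assigns consecutive sequence numbers without looking at the content."""

    def __init__(self):
        self._counter = itertools.count(1)

    def stamp(self, message):
        return next(self._counter), message

class Receiver:
    """Buffers out-of-order messages and delivers them strictly by sequence number."""

    def __init__(self):
        self.expected = 1
        self.pending = {}
        self.delivered = []

    def receive(self, seq, message):
        if seq < self.expected or seq in self.pending:
            return  # duplicate: deliver at most once
        self.pending[seq] = message
        # Deliver every message that is now contiguous with what we have seen.
        while self.expected in self.pending:
            self.delivered.append(self.pending.pop(self.expected))
            self.expected += 1

sequencer = Sequencer()
stamped = [sequencer.stamp(m) for m in ["a", "b", "c"]]

p, q = Receiver(), Receiver()
for seq, msg in stamped:             # p sees the messages in order
    p.receive(seq, msg)
for seq, msg in reversed(stamped):   # q sees them reversed ...
    q.receive(seq, msg)
q.receive(*stamped[0])               # ... and one of them twice
print(p.delivered, q.delivered)
```

Both receivers end up delivering the same sequence regardless of arrival order, which is the uniform total order property in miniature.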

References:

[1] George F. Coulouris, Jean Dollimore, and Tim Kindberg. Distributed Systems: Concepts And Design. Pearson Education, 2005. ISBN 9780321263544.

[2] Xavier Défago, André Schiper, and Péter Urbán. Total order broadcast and multicast algorithms: Taxonomy and survey. ACM Comput. Surv., 36(4):372–421, December 2004. ISSN 0360-0300. URL http://doi.acm.org/10.1145/1041680.1041682.

[3] Vassos Hadzilacos and Sam Toueg. A modular approach to fault-tolerant broadcasts and related problems. Technical report, 1994.

[4] L. E. T. Rodrigues, H. Fonseca, and P. Verissimo, “Totally ordered multicast in large-scale systems,” in Proceedings of the 16th International Conference on Distributed Computing Systems, 1996, pp. 503–510.

[5] S. K. Kasera, J. Kurose, and D. Towsley, “Scalable reliable multicast using multiple multicast groups,” SIGMETRICS Perform. Eval. Rev., vol. 25, no. 1, pp. 64–74, Jun. 1997.
Mar 08, 2013
 

Dynamo is a highly available key-value storage system that sacrifices consistency under certain failure scenarios. Moreover, conflict resolution is placed mostly on the application side, and versioning is used heavily for it. The main contribution is a highly decentralized, loosely coupled, service-oriented architecture with hundreds of services, built by combining different techniques.

A combination of techniques is used to reach the defined level of availability and scalability: partitioning and replication are based on consistent hashing, and consistency is facilitated by object versioning. Consistency among replicas during updates is maintained by quorum-like techniques, while failure detection relies on gossip-based protocols.

Architecture

Simple read and write operations are performed on a data item uniquely identified by a key; no relational schema is supported. Dynamo does not provide any isolation guarantees and permits only single-key updates. It supports an always-writable design, as its applications require it. This way, conflict resolution is pushed to reads. Incremental scalability, symmetry and heterogeneity are key features of the system.

Only two operations are exposed: get and put. Get returns the object and its version, while put takes this version as one of its parameters.

Partitioning of the data relies on consistent hashing, and this way load is distributed across hosts. Moreover, each node is mapped to multiple points on the ring. Replication is done on multiple hosts across the ring, ensuring that each replica is on a distinct host, and the number of replicas is configurable. A preference list is used to store the replica information.
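Here is a minimal Python sketch of a consistent-hashing ring with several points per node and a preference list of distinct hosts. It illustrates the technique only and is not Dynamo’s code: the number of points per node, the `node#i` naming of ring points and the `Ring` class are assumptions.

```python
import bisect
import hashlib

def ring_hash(key):
    """Map a string to a position on the ring (MD5 interpreted as an integer)."""
    return int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16)

class Ring:
    def __init__(self, nodes, points_per_node=8):
        # Each physical node owns several points ("virtual nodes") on the ring.
        self.points = sorted(
            (ring_hash(f"{node}#{i}"), node)
            for node in nodes
            for i in range(points_per_node)
        )
        self.positions = [pos for pos, _ in self.points]

    def preference_list(self, key, n):
        """Walk clockwise from the key and collect the first n distinct hosts."""
        start = bisect.bisect_right(self.positions, ring_hash(key))
        hosts = []
        for step in range(len(self.points)):
            node = self.points[(start + step) % len(self.points)][1]
            if node not in hosts:
                hosts.append(node)
                if len(hosts) == n:
                    break
        return hosts

ring = Ring(["A", "B", "C", "D"])
replicas = ring.preference_list("cart:42", n=3)
print(replicas)
```

Skipping points that belong to an already chosen host is what keeps the replicas on distinct machines even though every machine appears on the ring many times.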

Concurrent updates are resolved by versioning; this way updates can be propagated to all replicas. To reconcile updates made at different sites, vector clocks are adopted, so that causality between different versions can be tracked. So each time an object is updated, the version obtained earlier must be specified.
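Vector clocks are easy to show in code. The Python sketch below represents a clock as a dictionary from node name to counter; the function names and the node names Sx, Sy and Sz are just illustrative choices.

```python
def increment(clock, node):
    """Return a copy of the clock with the node's counter advanced by one."""
    new_clock = dict(clock)
    new_clock[node] = new_clock.get(node, 0) + 1
    return new_clock

def descends(a, b):
    """True if version a has seen everything version b has seen."""
    return all(a.get(node, 0) >= count for node, count in b.items())

def compare(a, b):
    if descends(a, b) and descends(b, a):
        return "equal"
    if descends(a, b):
        return "after"
    if descends(b, a):
        return "before"
    return "concurrent"  # a conflict that has to be reconciled on read

v1 = increment({}, "Sx")    # first write, handled by node Sx
v2 = increment(v1, "Sx")    # second write, again by Sx
v3 = increment(v2, "Sy")    # one client updates v2 through Sy
v4 = increment(v2, "Sz")    # another client updates v2 through Sz
print(compare(v2, v1), compare(v3, v4))
```

When one version descends from the other, the older one can be dropped. Two concurrent versions are both kept and handed back to the reader for reconciliation.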

Consistency among replicas is maintained with a quorum-like mechanism, where W and R, the write and read quorums respectively, are configurable. On an update (put), the coordinator of the put generates a vector clock and writes the new version of the data. A get works similarly: the coordinator requests all existing versions for the key. Most of the time, however, a “sloppy quorum” is used, where all read and write operations are performed on the first N healthy nodes in the preference list.
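Two small helpers make the quorum parameters more tangible. The condition R + W > N comes from the Dynamo paper rather than from the summary above, and both function names are made up for this sketch.

```python
def quorums_overlap(n, r, w):
    """With R + W > N, every read quorum shares at least one node with every write quorum."""
    return r + w > n

def sloppy_quorum_targets(ring_order, healthy, n):
    """Pick the first n healthy nodes while walking the nodes in ring order."""
    return [node for node in ring_order if node in healthy][:n]

print(quorums_overlap(n=3, r=2, w=2))   # common configuration
print(quorums_overlap(n=3, r=1, w=1))   # fast, but reads may miss the latest write

# Node B is down, so the next healthy node on the ring (D) steps in.
targets = sloppy_quorum_targets(["A", "B", "C", "D", "E"], {"A", "C", "D", "E"}, n=3)
print(targets)
```

The second helper shows why the quorum is called sloppy: the set of nodes that serve a key can change with node health, which keeps writes available at the cost of temporary inconsistency.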

This mix of techniques proved to work as a highly available and scalable data store, while consistency can be sacrificed in some failure scenarios. Moreover, all parameters, such as the read and write quorums and the number of replicas, can be configured by the user.

Pros

  • Cool
  • Inspiring
  • Scalable
  • Available
  • Writable

Cons

  • Sacrifices consistency
  • Hashing for load balancing and replication
  • Nothing more than get and put
  • Quite slow with all its conflict resolution and failure detection/handling
  • Targets write-specific applications
  • No security guarantees
  • No isolation guarantees
  • Only single key update