The purpose of this post is to describe the system's organization and properties.
The figure above illustrates the main concepts of the system design and the functionality covered by the system. The GDS (Genium Data Store) design can be captured as a set of interacting layers, as presented in the figure. The main idea of the figure is to highlight the multilayer organization of the system, where each layer serves its own purpose and is separated from the others. The lowest two levels establish communication between the nodes in the system; nodes are both clients and data stores. Each node, when joining the system, declares its status and adds itself to the corresponding subscription group. There are several subscription abstractions, among them client and sequencer.
To maintain the total ordering, a special subscription group is reserved: the sequencer group. On top of the messaging middleware, a distributed component is placed. It supports data replication, which guarantees scalability and availability by reducing traffic across the components. On top of the replication layer sits a data store operation layer, which (a) supports a wide range of operations over the data, e.g., insert, update, read, and range queries; (b) frames client messages with the information needed to access the stores, hence resolving concurrency conflicts; (c) applies a snapshot mechanism to allow safe re-requests of range queries.
This infrastructure makes the system easy to maintain and control. Relying on the INET messaging provides a great advantage in preventing inconsistencies and conflicts.
The basic functionality provided by GDS consists of a distributed, consistent, fault-tolerant and scalable infrastructure that serves simple requests over data: insert, get, and range queries. To make a request, the client communicates with the storage layer through the provided API. Each data store processes only those messages that belong to its partition; therefore, all partitioning information is stored on the sequencer to keep track of the number of replicas that serve the data.
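A minimal sketch of this partition-aware message handling, assuming a hash-partitioned data set: every name here (`DataStore`, `partition_for`, `NUM_PARTITIONS`) is illustrative, not part of the actual GDS API, and CRC32 stands in for whatever partitioning function the real system uses.

```python
import zlib

NUM_PARTITIONS = 4  # assumed number of data store partitions

def partition_for(key: str) -> int:
    """Map a record key to the partition that owns it (deterministic hash)."""
    return zlib.crc32(key.encode()) % NUM_PARTITIONS

class DataStore:
    """A store that processes only the messages belonging to its partition."""

    def __init__(self, partition: int):
        self.partition = partition
        self.records = {}

    def on_message(self, key: str, value: str) -> bool:
        if partition_for(key) != self.partition:
            return False          # not this store's partition; ignore
        self.records[key] = value
        return True
```

A store subscribed to the full message stream can thus filter cheaply, while the sequencer only needs the key-to-partition mapping to know which replicas serve which data.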
With this functionality it is possible to:
- Store/Retrieve the data
- Provide consistency, availability, scalability and high performance
- Leverage the high-performance message bus and in-memory datastore
- Eliminate a need for highly scalable storage hardware
Data and Query Model
GDS is first and foremost a column-oriented data store, with possible future extension to any database provider. This is made simple, as adding new database schemas and tables to the system is relatively easy and can be plugged in through the data store API. Schemas are not flexible: new attributes cannot be added at any time but only when the table is created, as the data is stored in fixed-size columns. Moreover, each record must be marked with a timestamp, to speed up further read requests and to avoid inconsistencies during updates. The timestamp of an update serves as a version, which is checked before an update is applied; this way, timestamp consistency is guaranteed.
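The timestamp-as-version check can be sketched as follows. This is an illustrative optimistic-concurrency sketch, not the GDS implementation; the names `Record` and `try_update` are assumptions.

```python
import time

class Record:
    def __init__(self, value):
        self.value = value
        self.timestamp = time.time_ns()    # the timestamp doubles as the version

def _next_ts(prev: int) -> int:
    # Guarantee a strictly increasing timestamp even on coarse clocks.
    return max(time.time_ns(), prev + 1)

def try_update(record: Record, new_value, expected_ts: int) -> bool:
    """Apply the update only if the caller has seen the latest version."""
    if record.timestamp != expected_ts:
        return False                        # a concurrent update won; re-read first
    record.value = new_value
    record.timestamp = _next_ts(record.timestamp)
    return True
```

A writer that read a record, was overtaken by a concurrent update, and then tries to write back is rejected and must re-read, which is exactly the timestamp consistency described above.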
The query language of GDS supports selection from a single table. Updates must specify the primary key, similar to PNUTS. Single-table queries provide very flexible access for range requests compared to distributed hash or ordered data stores, while still being restrictive compared to relational systems.
Read Query Support
Adapting NoSQL data stores to relational workloads keeps the need for range queries. This functionality is sufficient to support further data processing and analysis in offline mode. In a trading environment, support for time-range querying is very important, as further transactional and analytic processing of the data is required. The main use cases are logging, extracting order history, price history, index calculation, etc. All these use cases dictate the necessity of range query support.
Moreover, it can be a backbone for a stable way of analyzing the data “on the fly”.
There is an extensive body of work on exploring and evaluating range queries. Among the most common solutions for supporting range queries are locality-preserving hash functions and distributed index structures, such as trees.
GDS relies on data locality and a timestamp index, which is added either by the user or automatically by the data store. The underlying data store ensures that each record is timestamped; therefore, lookups can be sped up by specifying an approximate time range. Data in the store is divided into chunks of around 100,000 records each. Each chunk is indexed by its timestamps, and records within a chunk are time-indexed as well. This two-level separation significantly reduces lookup time.
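The two-level lookup can be sketched as follows: chunks are kept in timestamp order, and a binary search over the first timestamp of each chunk narrows the scan to the chunks that can overlap the requested range. This is an illustrative sketch (tiny chunks, in-memory lists), not the GDS code; GDS chunks hold around 100,000 records.

```python
import bisect

class ChunkedStore:
    """Append-only, time-ordered store with a per-chunk timestamp index."""

    def __init__(self, chunk_size=3):       # ~100,000 records per chunk in GDS
        self.chunk_size = chunk_size
        self.chunks = []                    # each chunk: list of (ts, record)
        self.first_ts = []                  # first timestamp of each chunk

    def append(self, ts, record):
        if not self.chunks or len(self.chunks[-1]) >= self.chunk_size:
            self.chunks.append([])
            self.first_ts.append(ts)
        self.chunks[-1].append((ts, record))

    def range_query(self, lo, hi):
        """Return records with lo <= ts <= hi, scanning only relevant chunks."""
        # First chunk whose records may still reach back to lo:
        start = max(bisect.bisect_right(self.first_ts, lo) - 1, 0)
        out = []
        for chunk in self.chunks[start:]:
            if chunk and chunk[0][0] > hi:
                break                        # all later chunks start after hi
            out.extend(r for ts, r in chunk if lo <= ts <= hi)
        return out
```

The chunk-boundary index keeps the lookup proportional to the size of the answer rather than the size of the store.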
It was decided to limit the size of a range query response. The main reason is the availability of the system, which could degrade under the transmission of unbounded range responses. The limit is set to a maximum of L = 10,000 records, which is around 5 MB. When a query request is processed, the total result size is reported to the client. If the response exceeds L, only the first L records are transmitted to the client. If necessary, an additional request can be issued to retrieve the missing records.
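The request/re-request protocol can be sketched as follows, with a tiny limit for illustration. The function names and the offset-based continuation are assumptions for this sketch, not the GDS wire protocol.

```python
L = 3   # response limit; 10,000 records (~5 MB) in GDS

def serve_range(records, offset):
    """Server side: report the total result size, return at most L records."""
    return len(records), records[offset:offset + L]

def fetch_all(records):
    """Client side: issue additional requests until the reported size is reached."""
    total, got = serve_range(records, 0)
    while len(got) < total:                 # the response exceeded the limit
        _, more = serve_range(records, len(got))
        got.extend(more)
    return got
```

Because the server always reports the total size, the client knows whether a follow-up request is needed without any extra round trip.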
To guarantee consistency across such additional requests, a simple snapshot mechanism is triggered, as shown in the snippet below. The same procedure is used to guarantee consistency when the TCP connection transmitting the response fails.
The snapshot mechanism works as follows:

on_range_request:
    send(type = SNAPSHOT, empty message)   // append a SNAPSHOT marker to the end of the current store
    retrieve(query)                        // read the data from the store
    send(response directly to client)
    if (failure || limit_for_response exceeded)
        retrieve data until the snapshot point is reached
The snapshot mechanism is used only for the logging use case. The approach in this snippet guarantees that the range query response will be identical whenever it is re-requested. This holds only because there are no update operations on the time-oriented data schema.
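Under the stated assumption of an append-only log, the snapshot marker reduces to remembering the current end of the store: a re-request after a failure or a truncated response reads exactly up to that point and ignores anything appended later. A minimal sketch (illustrative names, not the GDS implementation):

```python
class LogStore:
    """Append-only log with a snapshot marker for stable range re-requests."""

    def __init__(self):
        self.log = []

    def append(self, record):
        self.log.append(record)

    def snapshot(self):
        """The SNAPSHOT marker: remember the current end of the log."""
        return len(self.log)

    def read_until(self, snap):
        """Retrieve records only up to the snapshot point."""
        return self.log[:snap]
```

Since appends never rewrite earlier entries, every `read_until(snap)` over the same marker returns the same records, which is exactly the stability guarantee described above.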
Towards Consistency, Availability and Speed
The design of a system that needs to operate in production and within the strong SLA requirements of NOMX is complex. The system needs scalable and robust solutions for failure recovery, replica synchronization, concurrency, and request routing. The servers must be resilient to many kinds of faults, ranging from the failure of individual disks to failures of machines or routers. GDS uses active replication, based on the totally ordered stream of messages produced by the sequencer, to achieve high availability and a consistent view of the data. In short, it provides fully serializable ACID semantics over the data store.
To do so, the following is used:
- for consistency, the reliable totally ordered stream of messages produced by the sequencer is used;
- for availability, the highly robust and fast NOMX message bus is used to support a large number of incoming operations, and active replication is implemented to reduce the load on any single replica;
- for speed, the same highly robust and fast NOMX message bus is used.
It is not hard to notice that consistency, availability and performance all depend on the NOMX message middleware. This subsystem, whose varied functionality underpins the sustainable behavior of GDS, is therefore highly critical.
Latency is a critical part of a production-oriented system architecture. However, making latency a first-order constraint in the architecture is not very common. As a result, systems are usually dominated by failure resilience, availability and consistency concerns.
The main question here is how to design a system oriented towards latency. A few reductions of the system requirements for the aggressive production environment are made:
- GDS applications do not require wide-area deployment
- Localized disasters are not taken into account; however, this could be addressed by adding site replication
The following steps lead towards speed:
- Lightweight Sequencer. The sequencer in the system has limited functionality, and its main tasks are reduced to assigning a sequence number to each message and forwarding it to all subscribers. Moreover, the sequencer is completely isolated from the content of incoming messages; however, it can add information to a message, such as the sequence number or other user information.
- Good Decomposition. Decomposition of the application is very important in the design of any distributed application. GDS exposes a fairly clean decoupling of the system into several levels and components. The roles in the system are sequencer, clients and data stores. All of them are replicated and easily replaceable. Moreover, a layer of abstraction is placed under both clients and data stores; it manages registration and communication with the sequencer and makes these concerns transparent to both clients and stores.
- Asynchronous Interactions. All interaction in the system is based on the well-known event-driven paradigm and relies on asynchronous communication over UDP. The underlying messaging system, which uses MoldUDP, makes the communication reliable. Moreover, if the need for a synchronous API appears, it is easy to build one on top of the asynchronous API.
- Non-Monolithic Data. All data is stored in column-oriented storage and partitioned by range and by hash for the different data sets, respectively. This yields highly decomposed data without any need to perform joins, which are not supported by the system.
- Low Latency Reliable Totally Ordered Message Bus. To improve performance, the highly scalable and fast NOMX messaging middleware is leveraged in many ways.
- Effective Programming Techniques. Following the advice of [Effective C++, Java], GDS was built to reduce all possible overheads from initialization, communication and garbage collection.
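The lightweight sequencer from the first step above can be sketched as follows: it stamps each incoming message with the next sequence number and forwards it to every subscriber, without ever inspecting the payload. This is an illustrative sketch, not the NOMX sequencer.

```python
class Sequencer:
    """Stamp messages with a sequence number and fan them out to subscribers."""

    def __init__(self):
        self.next_seq = 1
        self.subscribers = []

    def subscribe(self, callback):
        self.subscribers.append(callback)

    def on_message(self, payload: bytes) -> int:
        seq = self.next_seq
        self.next_seq += 1
        for deliver in self.subscribers:     # forward to all subscribers
            deliver(seq, payload)            # payload passed through untouched
        return seq
```

Because every subscriber receives the same numbered stream, all replicas can apply operations in the same order, which is the basis of the active replication described earlier.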
GDS is a unique distributed system built on top of the reliable total order multicast messaging middleware developed in-house by NOMX. It is built to serve a large number of requests per second and to do so fast, with consistency, fault tolerance and availability in mind. Moreover, it benefits from the performance of the NOMX messaging system.
A wide set of operations is supported over the data, such as insert, read, range query and update. Moreover, this set is spread over two different data sets: an immutable log and mutable object records, both actively replicated via the totally ordered stream of messages from the sequencer. Over the immutable data, two operations are supported: insert and range query. Mutable data supports three operations: insert, update and get. The first subset is made reliable through extra fault resilience, e.g., against link failures. The second subset provides resolution for concurrent updates, e.g., timestamp consistency. Depending on the data type, the data is partitioned either by range or by hash, respectively, to guarantee maximum performance for each subset of operations.
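The partitioning rule for the two data sets can be sketched as follows: the immutable, time-oriented log is range-partitioned (which suits range queries), while mutable object records are hash-partitioned (which suits point lookups). The boundaries, partition counts and CRC32 hash here are illustrative assumptions, not GDS values.

```python
import zlib

LOG_RANGES = [100, 200, 300]     # assumed upper timestamp bound per log partition
NUM_OBJECT_PARTITIONS = 4        # assumed partition count for object records

def log_partition(ts: int) -> int:
    """Range partitioning for the immutable log."""
    for i, upper in enumerate(LOG_RANGES):
        if ts < upper:
            return i
    return len(LOG_RANGES)       # overflow partition for the newest data

def object_partition(key: str) -> int:
    """Hash partitioning for mutable object records."""
    return zlib.crc32(key.encode()) % NUM_OBJECT_PARTITIONS
```

Range partitioning keeps a time range on few partitions, so a range query touches few stores; hash partitioning spreads keys evenly, so inserts, updates and gets balance across stores.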
The following chapters describe the architecture of the system and present a proof of concept for the performance, scalability and failure resilience properties of the prototype system.