
Cassandra Modeling for Real-Time Analytics

by Elliott Cordo, chief architect at Caserta Concepts. Exclusive to Data Science Central.

There is much discussion these days about the Lambda Architecture and its benefits for developing high-performance analytic architectures. It combines a high-performance, low-latency ETL and real-time layer with a slower, more accurate, and more flexible solution that runs in batch.

As I work with it, I have learned to appreciate Cassandra's relative "immortality" and its fit for such analytic systems. In a complex distributed system it's nice to know you have one component you can rely on without much tending. Need to ingest 500k messages per second? No problem. Need to be highly available and regionally distributed? Again, no problem.

Cassandra for Real-Time Layer

Cassandra makes an excellent database for storage in the real-time layer for several reasons:

  1. High-performance writes: we will be ingesting large amounts of incoming data and, in parallel, writing materializations for query support;
  2. Highly reliable, shared-nothing architecture; and,
  3. Good query flexibility.

Cassandra as an analytics store requires a different mindset than a relational, Massively Parallel Processing (MPP), or Hadoop-based system. In those platforms, arbitrary queries are easy and relatively performant. As traditional data modelers we store the data at a very low level of granularity (ideally at atomic detail), typically in a star schema. Our dimensional data (attributes) are normalized out of the fact into separate tables. This provides better data management capabilities, along with the ability to perform joins. We rely on aggregate functions to easily group and roll up data. In many cases, one fact table can satisfy all analytic questions on a particular set of metrics.

Cassandra, however, does not have this same query flexibility: it supports neither joins nor aggregation. These features are costly to support in a distributed environment and have therefore not been pursued, as they would compromise the performance and SLAs of the Cassandra cluster.

Modeling Considerations

As a result, we have to rely on two primary techniques to make our data usable for analytic queries: denormalization and materialization.

Denormalization is required because there is no join support. Data must be "flattened" into a fact table if it is to be used in analytic queries. Since aggregation is not available, the same data will typically be kept in several different tables depending on usage pattern and aggregation level. These separate tables are maintained by ETL via parallel stream processing or batch processing (typically MapReduce).
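As a rough sketch (the table and column names here are hypothetical, not part of the trading example later in this article), denormalization in practice means the ETL writes the same flattened record into more than one table, each keyed for a different access pattern:

CREATE TABLE trades_by_order (
  order_id text,
  client_id text,
  trade_date int,
  shares_count int,
  share_price float,
  PRIMARY KEY (order_id));

CREATE TABLE trades_by_client_date (
  client_id text,
  trade_date int,
  order_id text,
  shares_count int,
  share_price float,
  PRIMARY KEY (client_id, trade_date, order_id));

-- the ETL issues both writes for every incoming trade
INSERT INTO trades_by_order (order_id, client_id, trade_date, shares_count, share_price)
VALUES ('ABC123', 'A', 20140701, 100, 201.5);

INSERT INTO trades_by_client_date (client_id, trade_date, order_id, shares_count, share_price)
VALUES ('A', 20140701, 'ABC123', 100, 201.5);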

It is important to recognize that these techniques are not all that alien. The traditional star schema approach relies heavily on them as well, although they manifest themselves more subtly. Facts and dimensions are themselves denormalized structures: we group metrics at the same grain and natural key into the same fact table, and our dimensions flatten all related attributes, and even hierarchies, into single tables. In the MPP world we often compromise and denormalize attributes onto the fact tables themselves.

With regard to materialization, although we try to maintain a single fact table per business process, it is not uncommon to have multiple models containing the same data.  In some cases this is to allow simpler and more performant queries based on a different query access pattern.  We might also store aggregates of this data to avoid scanning millions or billions of rows to satisfy a common summary query. 
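To make the aggregate case concrete, here is a minimal sketch of a pre-aggregated summary table (hypothetical name and columns), populated by the ETL so that a common summary question reads a few keyed rows rather than scanning millions of detail rows:

CREATE TABLE client_trading_daily_summary (
  client_id text,
  trade_date int,
  trade_count int,
  total_shares bigint,
  PRIMARY KEY (client_id, trade_date));

-- the summary query becomes a cheap keyed lookup
SELECT trade_count, total_shares
FROM client_trading_daily_summary
WHERE client_id = 'A' AND trade_date = 20140701;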

This commonality between Cassandra and the traditional world is driven by a few core concepts:

  • Analytic models should be tuned toward their access pattern - queries should be as simple as possible;
  • ETL is generally more scalable than your presentation layer (database); and,
  • Instead of performing the same costly calculation over and over again during read, there is an opportunity to perform this calculation, up front, only once, during ETL.

Trading data use case

I can now walk you through an example demonstrating a real-time model for trading data:

The incoming data represents individual buy/sell information from trading activity. The following data elements are common to all records, although there are 100+ optional tags that may also be included:

  • order_id (unique GUID)
  • trading_date
  • customer_id
  • trade_timestamp
  • market
  • stock_symbol
  • buy_or_sell
  • shares_count
  • share_price

This incoming detail should be captured in its atomic form, as this practice is beneficial for two reasons: 

  • It provides reliable, unaltered data that can be leveraged to recast or replay in the event of processing issues - this data can also be used for drill down from materialized views (cubes); and,
  • This atomic detail may serve as a general operational database, feeding the parallel batch layer and other systems.

There are several ways to store this data in Cassandra. Thankfully, Cassandra's data model makes it easy to deal with the flexible schema components (100+ variable fields). Unlike the relational world, where we would need to predefine all possible fields or normalize to the point of being unwieldy, Cassandra offers several options.

My preferred option these days is to take advantage of Cassandra's map collection data type. We normalize out a few key elements as fields and put the rest of the payload in a map field. Under the hood the map is stored in Cassandra's standard column structures, but it is abstracted as a single field resembling a map/dictionary data structure.

 

CREATE TABLE trades (
  order_id text,
  trading_date int,
  tags map<text, text>,
  PRIMARY KEY (order_id));

CREATE INDEX ix_trading_date ON trades (trading_date);

 

Normalizing out a key field such as trading date allows us to define an index and query against date. This is helpful for drill down, replay/recast, or extraction to the batch analytic layer.

select * from trades where trading_date = 20120101;

 order_id | trading_date | tags
----------+--------------+---------------------------------
   ABC123 |     20120101 | {'customer_id': 'A', 'share_price': '201', ...
 ...
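For completeness, a hedged sketch of how a single trade might be written into this table (the tag values are illustrative, not from the original feed):

INSERT INTO trades (order_id, trading_date, tags)
VALUES ('ABC123', 20120101,
        {'customer_id': 'A', 'stock_symbol': 'XYZ', 'buy_or_sell': 'B',
         'shares_count': '100', 'share_price': '201'});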

 

Materialized Views (aka Cubes)

We serve analytic queries against Cassandra by creating materialized views of the incoming data. I commonly refer to these materializations as cubes. These cubes are transformed and generally "lightly aggregated" by several key dimensions, providing decent query flexibility while putting a small amount of overhead on the client process to perform the final aggregation. These views can be calculated in real time as source data is ingested, or in frequent batches leveraging MapReduce.

Before we review some design examples, let’s first discuss the functionality of the Cassandra primary key.    

A primary key can be defined on one column, much like we did for the raw trades table, or on multiple columns (known as a composite key). In a composite key, the first column is treated as the row key (which determines how data is distributed around the cluster) and subsequent columns as column keys (which determine how the data is organized in columnar storage). Row keys allow for ultra-fast seeks via equality and IN clauses. Column keys allow for very efficient range scans of data using equality, greater-than, and less-than comparisons.

In our analytic models we typically choose a lower-cardinality dimensional attribute as the row key. This row key will be a common filter in all our queries and should be chosen to give a nice, even distribution across the cluster. In general, date or time alone does not make a good row key, as it can create hotspots when all traffic goes to one set of servers during a given period. However, creating a composite row key of client_id|trade_date would be fine if it suits your query pattern.
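For instance, a minimal sketch of such a composite row key (the table name is hypothetical, and this design only makes sense if every query supplies both client and date):

-- (client_id, trade_date) together form the row key, so both must be
-- supplied for an efficient lookup; trade_hour remains a column key
-- and still supports range scans within a partition
create table client_trading_by_hour_and_date (
    client_id varchar,
    trade_date int,
    trade_hour int,
    trade_count int,
    notional_volume_amt float,
    PRIMARY KEY ((client_id, trade_date), trade_hour));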

We pick attributes for the column key in the order of a "drill path" that typically includes date and/or time columns, to take advantage of the range scan capabilities.

In the example below we demonstrate a lightly aggregated time-series cube that stores client aggregate trading activity by hour:

 

create table client_trading_by_hour (
    client_id varchar,
    trade_date int,
    trade_hour int,
    trade_count int,
    notional_volume_amt float,
    PRIMARY KEY (client_id, trade_date, trade_hour));

 

And the following query flexibility can be achieved:

 

select * from client_trading_by_hour
where client_id in ('abc');

select * from client_trading_by_hour
where client_id in ('abc')
  and trade_date >= 20140701 and trade_date <= 20140703;

select * from client_trading_by_hour
where client_id in ('abc') and trade_date = 20140701
  and trade_hour >= 700 and trade_hour <= 1200;

 

You can even bypass the row key. However, this is not recommended unless you are running against a relatively small table.

 

select * from client_trading_by_hour
where trade_date = 20140701
  and trade_hour >= 700 and trade_hour <= 1200
allow filtering;

 

Although this cube provides good query flexibility, it would be one of many that represent the same data. Here are a few other cubes (and their corresponding keys) we might instantiate to improve analytic query coverage; the first is sketched after the list:

  • Drill across to stock symbol from client_trading_by_hour: client_trading_by_hour_symbol: client_id, trade_date, trade_hour, stock_symbol
  • Query stock symbol trading by date and hour: symbol_trading_by_hour: symbol, trade_date, trade_hour
  • Drill across to client detail from symbol_trading_by_hour: symbol_trading_by_hour_client: symbol, trade_date, trade_hour, client_id
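As a rough sketch (assuming the metric columns mirror the earlier cube), the first of these might be declared as:

create table client_trading_by_hour_symbol (
    client_id varchar,
    trade_date int,
    trade_hour int,
    stock_symbol varchar,
    trade_count int,
    notional_volume_amt float,
    PRIMARY KEY (client_id, trade_date, trade_hour, stock_symbol));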

Summing it up:

Cassandra is a great platform for serving a Lambda or any other form of real-time analytic architecture. With a bulletproof, scalable architecture and a SQL-like query language, Cassandra can be the simplest part of a complex architecture.

 

Bio for Elliott Cordo – Chief Architect, Caserta Concepts

Elliott is a big data, data warehouse and information management expert with a passion for helping transform data into powerful information.  He has more than a decade of experience in implementing big data and data warehouse solutions with hands-on experience in every component of the data warehouse software development lifecycle. At Caserta Concepts, Elliott oversees large-scale major technology projects, including those involving business intelligence, data analytics, big data and data warehousing.

Elliott is recognized for his many successful big data projects, ranging from big data warehousing and machine learning to his personal favorite, recommendation engines. His passion is helping people understand the true potential in their data, working hand in hand with clients and partners to learn and develop cutting-edge platforms that truly enable their organizations.
