At DeltaX, we have been using Amazon Athena as part of our data pipeline to run ad-hoc queries and analytic workloads on logs collected through our tracking and ad-serving system. Athena responds in anywhere from a few seconds to a few minutes on data that runs into hundreds of GBs, and it has pleasantly surprised us with its ease of use. In this blog post, I discuss how we set up Athena to query our JSON data.

Kinesis Firehose -> AWS S3 -> Amazon Athena

Amazon Athena is an interactive query engine that makes it easy to analyze data in Amazon S3. It is serverless and built on top of Presto with ANSI SQL support, so you can run queries using standard SQL.

It has 3 basic building blocks:

  • AWS S3 - Persistent Store
  • Apache Hive - Schema Definition (DDL)
  • Presto SQL - Query Language

1. AWS S3 - Persistent Store

Athena uses AWS S3 as its persistent store and supports files in the following formats: CSV, TSV, JSON, and plain text files. It also supports open source columnar formats such as Apache ORC and Apache Parquet, as well as data compressed in Snappy, Zlib, LZO, and GZIP formats.

In our use case, we log data as plain JSON and compress it with GZIP to make optimal use of storage space. Athena recognizes GZIP-compressed files by their extension.
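As a rough illustration of what such a file contains, here is a minimal Python sketch that writes and reads GZIP-compressed, newline-delimited JSON, under the assumption that each event is stored as one JSON object per line. The function names and the two sample events are hypothetical and are not part of our pipeline.

```python
import gzip
import io
import json


def write_gzip_json_lines(records):
    """Serialize records as newline-delimited JSON and GZIP-compress them."""
    buffer = io.BytesIO()
    with gzip.GzipFile(fileobj=buffer, mode="wb") as gz:
        for record in records:
            # One compact JSON object per line; json.dumps emits no raw newlines.
            gz.write(json.dumps(record).encode("utf-8") + b"\n")
    return buffer.getvalue()


def read_gzip_json_lines(payload):
    """Decompress a GZIP payload and parse one JSON object per line."""
    text = gzip.decompress(payload).decode("utf-8")
    return [json.loads(line) for line in text.splitlines() if line]


# Hypothetical sample events using two of the column names from this post.
events = [
    {"xevent": "xviews", "xb": "XXXX1"},
    {"xevent": "xclicks", "xb": "XXXX3"},
]
payload = write_gzip_json_lines(events)
```

Writing `payload` to a key that ends in `.gz` would produce the kind of object described here.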

Files are dropped into the bucket by Kinesis Data Firehose in the following format: s3://<bucket>/events/[YYYY]/[MM]/[DD]/[HH]/*

Example: s3://<bucket>/events/2018/03/03/02/<prefix>-1-2018-03-03-02-14-47-8edfc8ab-6a05-4329-a45b-2644b6e468f9.gz
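Because the date and hour are encoded in the key itself, they can be recovered with a small parser. The sketch below is only an illustration: the function name, the regular expression, and the shortened file name are hypothetical, and the derived `dt` value assumes a `YYYY-MM-DD` date string.

```python
import re

# Matches the date/hour folders in keys such as events/2018/03/03/02/<file>.gz
KEY_PATTERN = re.compile(
    r"events/(?P<year>\d{4})/(?P<month>\d{2})/(?P<day>\d{2})/(?P<hour>\d{2})/"
)


def partition_values_from_key(key):
    """Return year, month, day, hour and a derived dt (YYYY-MM-DD) for a key."""
    match = KEY_PATTERN.search(key)
    if match is None:
        raise ValueError(f"key does not follow events/YYYY/MM/DD/HH: {key}")
    values = match.groupdict()
    values["dt"] = "{year}-{month}-{day}".format(**values)
    return values


# Hypothetical, shortened file name used only for illustration.
example = partition_values_from_key(
    "events/2018/03/03/02/prefix-1-2018-03-03-02-14-47-8edfc8ab.gz"
)
```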

2. Apache Hive - Schema Definition (DDL)

Athena uses Apache Hive DDL for schema definition. Here is a sample CREATE TABLE statement:

CREATE EXTERNAL TABLE `es_eventlogs`(
  `xevent` string, 
  `xb` string, 
  `xcid` string, 
  `xw` string, 
  `xh` string, 
  ...
  ...
  ...
  `user_agent_device` string, 
  `user_agent_browser` string, 
  `user_agent_os` string, 
  `ip_country` string, 
  `ip_region` string, 
  `ip_city` string)
PARTITIONED BY ( 
  `year` string, 
  `month` string, 
  `day` string, 
  `dt` string)
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
LOCATION
  's3://<bucket>/events'

Here is an explanation for various parts of the query:

1. CREATE EXTERNAL TABLE - clause

Apache Hive supports ‘external’ and ‘internal’ tables. For an internal table, Hive manages the complete lifecycle of the data, so dropping the table deletes the data as well. For an external table, the data is stored outside of Hive, which only keeps the schema needed to interpret it. Dropping an external table does not delete the underlying data.

Athena supports only external tables.

2. PARTITIONED BY - clause

With Athena, you are charged based on the amount of data each query scans. To reduce both query execution time and cost, it is best to partition your data so that a query reads only the data that is relevant to it.
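To get a feel for why partition pruning matters, here is a back-of-the-envelope sketch in Python. The price of 5 USD per TB, the binary definition of a TB, and the data sizes are all assumptions made for illustration; check the current Athena pricing for your region before relying on any number.

```python
def estimate_scan_cost_usd(bytes_scanned, usd_per_tb=5.0):
    """Estimate query cost from bytes scanned.

    usd_per_tb is an assumed price, not a value taken from this post.
    """
    bytes_per_tb = 1024 ** 4  # assumption: 1 TB counted as 2**40 bytes
    return bytes_scanned / bytes_per_tb * usd_per_tb


# Hypothetical sizes: scanning everything versus a single day's partition.
full_scan = estimate_scan_cost_usd(300 * 1024 ** 3)  # 300 GB
one_day = estimate_scan_cost_usd(10 * 1024 ** 3)     # 10 GB
```

Under these assumptions, a query restricted to one day's partition costs a thirtieth of a query that scans the whole table.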

We use a pretty elaborate partition scheme with year, month, day and dt which helps us access the right partition of data.

Athena can automatically load partitions that follow the Hive naming convention, in which each folder is named key=value (for example year=2018/month=03/day=03/). This is done by issuing the command MSCK REPAIR TABLE es_eventlogs. For any other layout, such as the plain YYYY/MM/DD/HH folders shown above, you need to load the partitions manually.

Example: s3://<bucket>/events/2018/03/03/02/<prefix>-1-2018-03-03-02-14-47-8edfc8ab-6a05-4329-a45b-2644b6e468f9.gz

ALTER TABLE es_eventlogs ADD
PARTITION (year = '2018', month = '03', day = '03', dt = '2018-03-03') LOCATION 's3://<bucket>/events/2018/03/03/'
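Adding partitions by hand gets tedious once there are many days to register, so the statements can be generated instead. The following Python sketch is one possible way to do that, not the script we actually run: the function name is hypothetical, every partition value is quoted because the partition columns are strings, and `IF NOT EXISTS` is added so that re-running a statement is harmless.

```python
from datetime import date, timedelta


def add_partition_statements(table, bucket, start, end):
    """Build one ALTER TABLE ... ADD PARTITION statement per day, inclusive."""
    statements = []
    current = start
    while current <= end:
        year, month, day = current.strftime("%Y %m %d").split()
        statements.append(
            f"ALTER TABLE {table} ADD IF NOT EXISTS "
            f"PARTITION (year='{year}', month='{month}', day='{day}', "
            f"dt='{current.isoformat()}') "
            f"LOCATION 's3://{bucket}/events/{year}/{month}/{day}/'"
        )
        current += timedelta(days=1)
    return statements


# "<bucket>" is left as a placeholder, as elsewhere in this post.
statements = add_partition_statements(
    "es_eventlogs", "<bucket>", date(2018, 3, 3), date(2018, 3, 5)
)
```

Each generated string can then be submitted to Athena like any other query.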

3. ROW FORMAT SERDE

This specifies the SerDe (Serializer and Deserializer) to use. In this case, it is the JSON SerDe, org.openx.data.jsonserde.JsonSerDe. The Athena documentation lists the other supported SerDes.

4. LOCATION

This specifies the base path for the location of external data. Athena reads all files from the location you have specified.

3. Presto SQL - Query Language

Presto, a distributed query engine, was built at Facebook as an alternative to the Apache Hive execution model, which used Hadoop MapReduce for each query. Presto does not write intermediate results to disk, resulting in a significant speed improvement. Athena supports a subset of Presto SQL, largely limited to SELECT and a selection of operators and functions.

A simple query to find the count of events by xb and xevent would look like:

SELECT xb,
  xevent,
  count(1)
FROM es_eventlogs
WHERE dt >= '2018-03-01'
  AND dt <= '2018-03-02'
GROUP BY  xb, xevent

Sample Output:

| xb   | xevent  |  _col2 |
|-------|---------|--------|
| XXXX1 | xviews  | 584170 |
| XXXX1 | xrender | 622430 |
| XXXX3 | xclicks | 164899 |
| XXXX2 | xrender | 351229 |
| XXXX2 | xviews  | 332886 |

Here is a typical query that we run to calculate distinct reach for an advertising campaign:

SELECT xevent,
  dt,
  COUNT(DISTINCT userid) AS reach
FROM es_eventlogs es
WHERE es.xevent IN ('xclicks', 'xviews')
  AND es.xccid IN ('7')
  AND dt >= '2018-03-07'
  AND dt <= '2018-04-06'
  AND Createdatts BETWEEN timestamp '2018-03-07 05:30:00'
    AND timestamp '2018-04-06 05:30:00'
  AND xb = 'XXXXXX'
GROUP BY ROLLUP(xevent, dt)
ORDER BY xevent, dt

Sample Output:

| xevent | dt       | reach   |
|---------|----------|---------|
| xclicks | 06/03/18 | 36607   |
| xclicks | 07/03/18 | 35648   |
| xclicks |          | 71886   |
| xviews  | 06/03/18 | 1015553 |
| xviews  | 07/03/18 | 924290  |
| xviews  |          | 1857806 |
|         |          | 1926163 |
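Note that the subtotal rows are not the sum of the daily rows: 36607 + 35648 = 72255, while the xclicks subtotal is 71886. This is because COUNT(DISTINCT userid) is re-evaluated at every ROLLUP level, so a user seen on both days is counted once in the subtotal. The Python sketch below mimics that behaviour on a handful of made-up rows; the function name and the data are hypothetical.

```python
from collections import defaultdict


def rollup_distinct_reach(rows):
    """Mimic COUNT(DISTINCT userid) ... GROUP BY ROLLUP(xevent, dt).

    rows is an iterable of (xevent, dt, userid) tuples. None in a key plays
    the role of the empty cell in a subtotal or grand-total row.
    """
    users = defaultdict(set)
    for xevent, dt, userid in rows:
        users[(xevent, dt)].add(userid)    # per event and day
        users[(xevent, None)].add(userid)  # subtotal per event
        users[(None, None)].add(userid)    # grand total
    return {key: len(ids) for key, ids in users.items()}


# Made-up rows: user u1 clicks on both days and also views an ad.
rows = [
    ("xclicks", "2018-03-07", "u1"),
    ("xclicks", "2018-03-07", "u2"),
    ("xclicks", "2018-03-08", "u1"),
    ("xviews", "2018-03-07", "u1"),
    ("xviews", "2018-03-08", "u3"),
]
reach = rollup_distinct_reach(rows)
```

Here the two xclicks days have a reach of 2 and 1, but the xclicks subtotal is 2 rather than 3, because u1 appears on both days.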

Since some queries can take considerably longer, the Athena console also keeps a history of queries, and their results are available for download as CSV.

Over the last 3 months, we have increasingly used Athena to run quick ad-hoc queries and some analytical workloads. The ease of SQL on top of a Big Data store is a lethal combination. The cloud is evolving fast towards the serverless paradigm, and services like Amazon Athena exemplify the serverless, pay-per-use model.