Legoliss Blog

legoliss’s blog

CDP、Salesforce Marketing Cloudについて書いてます。

データソースをCSVからParquetに変換してAthenaのスキャン量を改善する

AWS Athenaは簡単に大規模データにクエリが投げられてとても便利ですが、GBサイズ以上のCSVファイルをデータソースにするとスキャン量が増え、クエリ内容によってはタイムアウトしてしまったり、メモリ枯渇でエラー終了したりします。
今回はこういった場合にデータソースの形式を変更してデータスキャン量を改善する方法をご紹介します。

AWS Athena
AWS Athena

そもそもAthenaって何?

ご存知ない方のためにざっくり説明すると以下のようなサービスです。

  • S3上のデータソースへ直接SQLを投げられるサービス.
  • 所謂サーバレスなので,インフラ運用の手間が無い,
  • スキャンしたデータ量に対する課金なので低コスト.
  • Presto基盤で,大きなデータセットに対しても高速.

この中で3つ目の項目について、スキャン量が多くて思ったよりコストかかっちゃってるねっていう場合、どうしよっかというお話。

今回お話するケース

下記のケースを想定します。さほど珍しいケースではないかなと思います。

  • データソースは無圧縮のCSVファイル(!).
  • 数GB~数十GBのデータが単一のファイルに集約されている.
  • 上記のファイルがS3の特定のバケットバッチ処理で置かれる.
  • このデータをMAツールに流し込む前に,DISTINCTやGROUP化,ソート等の集計クエリを施しておきたい.

先に結論

CTASクエリでCSVファイルをParquet(*1)ファイル(圧縮有り)に変換してから、カラムを指定して集計クエリを実行する.

データソースは分割して圧縮しろってことですね。
カラムナーに変更することでクエリ実行時に読み取るデータを、クエリに使用するカラムのみに限定できます。Parquetファイルは圧縮できるので、さらにスキャン量の削減が期待できます。

Parquetファイルへの変換方法

S3上のファイルをAthenaのクエリだけでCSVからParquetに変換する方法です。

  1. CSVファイルをデータソースにして一旦テーブルを作る
  2. 上記テーブルを使用してCTASクエリでParquetとして出力
  3. 上記Parquetファイルの出力ディレクトリをデータソースにしてテーブルを作成する

手順1.のSQL

ここでは全てスキャンする必要があります。もうこれはしょうがないかなと。これが嫌ならバッチ処理での出力の時点でParquetに出来ないか検討しましょう。

-- 1. CSVファイルをデータソースにして一旦テーブルを作る
CREATE EXTERNAL TABLE IF NOT EXISTS [database].[table_1] (
  field_1 [type],
  field_2 [type],
  field_3 [type],
  ...
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
  'separatorChar' = ',',
  'quoteChar' = '"',
  'escapeChar' = '\\\\'
)
LOCATION 's3://[bucket_name]/[key]'
TBLPROPERTIES (
    'skip.header.line.count' = '1',
    'has_encrypted_data'='false'
);

上記のSQLでは[bucket_name]バケットに置かれた[key]のファイルを下記フォーマットでロードします。

  • カンマ区切り
  • 二重引用符囲み
  • 「\」でエスケープ
  • ヘッダ行を1行スキップ

AthenaでCSVをロードする際に使用するSerDeは主に下記2種かと思いますが、プロパティの指定方法やサポートするCSVフォーマットが微妙に異なりますので適宜判断してください。

手順2.のSQL

上記で作成したテーブルを元にCTASクエリで別テーブルを作成し、出力フォーマットをParquetにします。

-- 2. 上記テーブルを使用してCTASクエリでParquetとして出力
CREATE TABLE [database].[table_2]
WITH ( format='PARQUET',
       external_location='s3://[bucket_name]/[key]/')
AS SELECT *
FROM [database].[table_1];

上記のSQL[external_location]に指定した場所に分割された圧縮済みのParquetファイルが出力されます。

手順3.のSQL

上記でParquetファイルを出力したディレクトリ(*2)をデータソースに指定してテーブルを作成します。

-- 3. 上記Parquetファイルの出力ディレクトリをデータソースにしてテーブルを作成する
CREATE EXTERNAL TABLE IF NOT EXISTS [database].[table_3] (
  field_1 [type],
  field_2 [type],
  field_3 [type],
  ...
)
STORED AS PARQUET
LOCATION 's3://[bucket_name]/[key]/'
tblproperties ("parquet.compression"="SNAPPY")

上記で作成したテーブルに対してのクエリでは、SELECT句でカラム名を指定することでメモリ使用量の削減が期待できます。


実験

国立環境研究所の「大気環境時間値データ」を使用して実験してみます。

800MB程のCSVデータをParquetに変換して、クエリ時にスキャンするデータ量を比較します。

CSV

カラムを指定しない場合

CSV全カラム
CSV カラム指定なし

カラムを指定した場合

CSVカラム指定あり
CSV カラム指定あり
CSVでは、カラムを指定してもスキャンするデータ量は変わりません。

Parquet

カラムを指定しない場合

Parquet全カラム
Parquet カラム指定なし
この時点で既にCSVより少ないですね。圧縮してるからだと思われます。

カラムを指定した場合

Parquetカラム指定あり
Parquet カラム指定あり
カラムを指定することで、さらにスキャンするデータ量が減ります。

今回の場合、CSVのスキャン量の2.6%程まで削減出来ています。(22MB : 839MB)

まとめ

今回は、単一の無圧縮ファイルをParquetに変換してスキャン量を削減する方法をご紹介しました。
これ以外にも、クエリプランや同時実行数、パーティションの設定など、Athenaのパフォーマンスを改善する方法は沢山あると思います。
今回の記事が何かのヒントになれば幸いです。


*1:pάɚkeɪ (パーケイ)

*2:厳密には、S3にはディレクトリやフォルダという概念は存在せず、KeyとPrefixを組み合わせて表現します