S3 + Athena + Kinesis Firehose ハンズオン

ストリームログを S3 に蓄積し Athena でサーバーレス SQL 分析を行う

Amazon S3 Amazon Athena Kinesis Data Firehose AWS Glue Data Catalog マネコン操作 所要時間 75〜90 分 v1.0

📋 概要

このハンズオンでは、Kinesis Data Firehose を使ってストリームデータを Amazon S3 に自動配信し、AWS Glue Data Catalog でスキーマを定義し、Amazon Athena でサーバーレス SQL 分析を行うパイプラインを構築します。大量ログの長期保存・低コスト分析の基礎を学べます。

項目内容
対象サービスKinesis Data Firehose、Amazon S3、AWS Glue、Amazon Athena
主な学習内容Firehose 配信ストリーム・S3 パーティション設計・Glue テーブル定義・Athena SQL 分析
所要時間75〜90 分
難易度★★★☆☆(中級者向け)
前提知識S3 の基礎知識、SQL の基礎知識
費用目安約 0〜2 USD(データ量・スキャン量による)
ℹ️ このアーキテクチャが適しているケース
  • 大量のアプリケーションログ・アクセスログを低コストで長期保存したい
  • リアルタイム性は不要だが、定期的にバッチ分析したい
  • CloudWatch Logs より安価にログを保管したい(S3 は約 1/10 のコスト)
  • BI ツール(QuickSight など)と連携して可視化したい

🏗️ アーキテクチャ

📱 データ生成元
アプリケーション / CloudWatch Logs / API Gateway アクセスログ
↓ PutRecord API
🚀 Kinesis Data Firehose
バッファリング(60 秒 or 5 MB)→ 自動 S3 配信
↓ YYYY/MM/DD/HH/ でパーティション配信
🪣 Amazon S3
s3://<bucket>/logs/year=YYYY/month=MM/day=DD/hour=HH/*.json
↓ スキーマ登録
📚 AWS Glue Data Catalog
データベース + テーブル定義(パーティション含む)
↓ SQL クエリ
🔎 Amazon Athena
サーバーレス SQL 分析(スキャンした分だけ課金)

データフォーマット(JSON)

今回のハンズオンでは以下の JSON 形式のアクセスログを想定します。

{
  "timestamp": "2024-01-15T10:23:45Z",
  "method": "GET",
  "path": "/api/products",
  "status_code": 200,
  "response_time_ms": 142,
  "user_agent": "Mozilla/5.0",
  "client_ip": "203.0.113.5",
  "bytes_sent": 2048
}

✅ 前提条件

⚠️ Athena のクエリ結果バケット

Athena を初めて使う場合はクエリ結果の保存先 S3 バケットを設定する必要があります。ステップ 5 で案内します。

🪣 ステップ 1 ― S3 バケットの作成

ログデータを保存するバケットと Athena クエリ結果用バケットを作成します。

1-1. S3 コンソールを開く

AWS マネジメントコンソールで 「S3」 を検索して開きます。「バケットを作成」 をクリックします。

1-2. ログ保存バケットを作成
バケット名handson-logs-<your-account-id>
AWS リージョンap-northeast-1(東京)
オブジェクト所有権ACL 無効(推奨)
パブリックアクセスすべてブロック(デフォルト)
バージョニング無効

その他はデフォルトのまま「バケットを作成」をクリックします。

ℹ️ バケット名について

S3 バケット名はグローバルで一意である必要があります。<your-account-id> の部分をご自身の AWS アカウント ID(12 桁の数字)に置き換えてください。

1-3. Athena クエリ結果用バケットを作成
バケット名handson-athena-results-<your-account-id>
AWS リージョンap-northeast-1(東京)

同様の手順で作成します。

1-4. ライフサイクルルールの設定(オプション)

ログ保存バケットで 「管理」→「ライフサイクルルールを作成」 を開きます。

  • 90 日後に S3 Glacier に移行(長期保存コスト削減)
  • 365 日後に自動削除(保持期間の設定)

今回のハンズオンではデフォルトのまま進めても問題ありません。

✅ 確認ポイント

S3 バケット一覧に handson-logs-*handson-athena-results-* の 2 つが表示されれば成功です。

🚀 ステップ 2 ― Kinesis Data Firehose 配信ストリームの作成

ログデータを受け取り S3 に自動配信するストリームを設定します。

2-1. Kinesis コンソールを開く

AWS マネジメントコンソールで 「Kinesis」 を検索して開きます。左メニューから 「Data Firehose」→「配信ストリームを作成」 をクリックします。

2-2. ソースと送信先を設定
設定項目設定値
ソースDirect PUT(アプリケーションから直接 PUT)
送信先Amazon S3
配信ストリーム名handson-access-logs-stream
2-3. S3 送信先の詳細設定
S3 バケットhandson-logs-<your-account-id>
S3 プレフィックスlogs/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/
S3 エラープレフィックスerrors/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/
ℹ️ Hive 形式パーティションの重要性

year=YYYY/month=MM/day=DD/ の形式(Hive 形式)にすると Athena が自動でパーティションを認識し、クエリで特定日時のデータのみスキャンできるためコストを大幅に削減できます。

2-4. バッファリング設定
設定項目設定値説明
バッファサイズ5 MB5 MB 溜まったら S3 に書き込む
バッファ間隔60 秒60 秒経過したら S3 に書き込む
圧縮GZIPストレージコスト削減
データ形式変換無効(今回は JSON のまま)Parquet 変換も可能
2-5. IAM ロールを設定

「サービスロールを作成または更新」を選択します。Firehose が S3 に書き込むための IAM ロールが自動で作成されます。

設定を確認して「配信ストリームを作成」をクリックします。

✅ 確認ポイント

配信ストリーム一覧に handson-access-logs-stream が表示され、ステータスが「アクティブ」になれば成功です(1〜2 分かかる場合があります)。

📤 ステップ 3 ― テストデータの投入

Firehose にサンプルデータを送信して S3 への配信を確認します。

3-1. テストレコードを送信する(マネジメントコンソール)

Kinesis Data Firehose の詳細画面から 「テストレコードを送信」 タブを開きます。以下のサンプル JSON を入力して「テストレコードを送信」をクリックします。

{"timestamp":"2024-01-15T10:23:45Z","method":"GET","path":"/api/products","status_code":200,"response_time_ms":142,"user_agent":"Mozilla/5.0","client_ip":"203.0.113.5","bytes_sent":2048}
{"timestamp":"2024-01-15T10:23:47Z","method":"POST","path":"/api/orders","status_code":201,"response_time_ms":320,"user_agent":"Mozilla/5.0","client_ip":"203.0.113.10","bytes_sent":512}
{"timestamp":"2024-01-15T10:23:50Z","method":"GET","path":"/api/products/42","status_code":404,"response_time_ms":28,"user_agent":"curl/7.68.0","client_ip":"198.51.100.5","bytes_sent":128}
{"timestamp":"2024-01-15T10:23:55Z","method":"GET","path":"/api/users","status_code":500,"response_time_ms":5432,"user_agent":"Mozilla/5.0","client_ip":"192.0.2.20","bytes_sent":256}
3-2. AWS CLI でまとめてデータを送信する(オプション)

より多くのテストデータを投入したい場合は AWS CLI を使います。

aws firehose put-record-batch \ --delivery-stream-name handson-access-logs-stream \ --records '[ {"Data":"eyJ0aW1lc3RhbXAiOiIyMDI0LTAxLTE1VDExOjAwOjAwWiIsIm1ldGhvZCI6IkdFVCIsInBhdGgiOiIvYXBpL2l0ZW1zIiwic3RhdHVzX2NvZGUiOjIwMCwicmVzcG9uc2VfdGltZV9tcyI6OTUsInVzZXJfYWdlbnQiOiJNb3ppbGxhLzUuMCIsImNsaWVudF9pcCI6IjEwMy4wLjExMy41IiwiYnl0ZXNfc2VudCI6MTAyNH0="} ]' \ --region ap-northeast-1
ℹ️ S3 への反映タイミング

Firehose はバッファリング設定に従い「60 秒経過」または「5 MB 蓄積」のいずれか早い方で S3 に書き込みます。テスト後 1〜2 分待ってから S3 を確認してください。

3-3. S3 でファイルを確認

S3 コンソールで handson-logs-<your-account-id> バケットを開きます。logs/year=2024/month=01/day=15/hour=10/ のようなパスにファイルが作成されているか確認します。

ファイル名は自動生成されます(例: 2024-01-15-10-23-45-xxxx)。

✅ 確認ポイント

S3 バケット内にパーティション形式のディレクトリが作成され、JSON/GZIP ファイルが存在すれば成功です。

📚 ステップ 4 ― AWS Glue Data Catalog にテーブルを定義する

Athena で SQL クエリを実行するために、S3 のデータに対してスキーマを定義します。

4-1. Athena コンソールからデータベースを作成

AWS マネジメントコンソールで 「Athena」 を検索して開きます。初回利用の場合は「クエリ結果の場所を設定」が求められます。

クエリ結果の場所s3://handson-athena-results-<your-account-id>/

「保存」をクリックします。

4-2. データベースを作成

Athena のクエリエディタで以下の SQL を実行します。

CREATE DATABASE IF NOT EXISTS handson_logs_db;
4-3. テーブルを作成

以下の SQL を実行してテーブルを定義します。<your-account-id> を実際のアカウント ID に変更してください。

CREATE EXTERNAL TABLE IF NOT EXISTS handson_logs_db.access_logs (
  `timestamp`        string,
  `method`           string,
  `path`             string,
  `status_code`      int,
  `response_time_ms` int,
  `user_agent`       string,
  `client_ip`        string,
  `bytes_sent`       int
)
PARTITIONED BY (
  year  string,
  month string,
  day   string,
  hour  string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
)
LOCATION 's3://handson-logs-<your-account-id>/logs/'
TBLPROPERTIES ('has_encrypted_data' = 'false');
ℹ️ PARTITIONED BY の意味

Hive 形式のパーティション(year=YYYY/month=MM/...)を定義することで、Athena クエリ時に WHERE year='2024' AND month='01' のような条件を付けてスキャン量を絞り込めます。これがコスト削減の核心です。

4-4. パーティションを登録する

テーブル定義後、既存のパーティションを Glue Catalog に登録します。

MSCK REPAIR TABLE handson_logs_db.access_logs;

このコマンドは S3 のパーティションディレクトリを自動スキャンしてカタログに登録します。新しいパーティションが追加されるたびに実行するか、後述の自動追加設定を行います。

4-5. テーブル構造を確認
SHOW PARTITIONS handson_logs_db.access_logs;

DESCRIBE handson_logs_db.access_logs;
✅ 確認ポイント

SHOW PARTITIONS の結果にステップ 3 で投入したデータのパーティション(例: year=2024/month=01/day=15/hour=10)が表示されれば成功です。

🔎 ステップ 5 ― Amazon Athena で SQL 分析を行う

Athena でサンプルクエリを実行し、ログを多角的に分析します。

基本クエリ

① 全データを確認する

SELECT *
FROM handson_logs_db.access_logs
WHERE year = '2024' AND month = '01' AND day = '15'
ORDER BY timestamp DESC
LIMIT 20;

② HTTP ステータスコード別の件数

SELECT
  status_code,
  COUNT(*) AS request_count
FROM handson_logs_db.access_logs
WHERE year = '2024' AND month = '01'
GROUP BY status_code
ORDER BY request_count DESC;

③ エラーリクエスト(4xx / 5xx)の一覧

SELECT
  timestamp,
  method,
  path,
  status_code,
  response_time_ms,
  client_ip
FROM handson_logs_db.access_logs
WHERE year = '2024' AND month = '01'
  AND status_code >= 400
ORDER BY timestamp DESC
LIMIT 50;

④ レスポンスタイムが遅いリクエスト Top 10

SELECT
  timestamp,
  method,
  path,
  status_code,
  response_time_ms
FROM handson_logs_db.access_logs
WHERE year = '2024' AND month = '01'
ORDER BY response_time_ms DESC
LIMIT 10;

⑤ パス別のリクエスト数と平均レスポンスタイム

SELECT
  path,
  COUNT(*)                           AS request_count,
  ROUND(AVG(response_time_ms), 0)   AS avg_response_ms,
  MAX(response_time_ms)              AS max_response_ms,
  SUM(bytes_sent)                    AS total_bytes
FROM handson_logs_db.access_logs
WHERE year = '2024' AND month = '01'
GROUP BY path
ORDER BY request_count DESC;

⑥ クライアント IP 別のアクセス数(上位 10 件)

SELECT
  client_ip,
  COUNT(*) AS request_count,
  SUM(bytes_sent) AS total_bytes_sent
FROM handson_logs_db.access_logs
WHERE year = '2024' AND month = '01'
GROUP BY client_ip
ORDER BY request_count DESC
LIMIT 10;

⑦ 時間帯別リクエスト数(ピーク時間分析)

SELECT
  hour,
  COUNT(*) AS request_count,
  ROUND(AVG(response_time_ms), 0) AS avg_response_ms
FROM handson_logs_db.access_logs
WHERE year = '2024' AND month = '01'
GROUP BY hour
ORDER BY CAST(hour AS int);
5-1. クエリを実行して結果を確認

Athena のクエリエディタで上記クエリを 1 つずつ実行します。クエリ実行後、下部に「データスキャン量」と「実行時間」が表示されます。

ℹ️ Athena のコスト

Athena はスキャンしたデータ量 1 TB あたり $5 USD の課金です。WHERE year='...' AND month='...' でパーティションを絞ることでスキャン量を最小化できます。今回のテストデータ(数 KB)では実質 $0 です。

✅ 確認ポイント

クエリが正常に実行されステップ 3 で投入したデータが結果に表示されれば成功です。

⚡ ステップ 6 ― パーティション自動追加の設定

継続的にデータが追加される本番環境では、パーティションを自動で登録する仕組みが必要です。

方法 A: AWS Glue クローラーを使う

6-1. Glue クローラーを設定する

AWS Glue コンソールを開き 「クローラー」→「クローラーを作成」 をクリックします。

設定項目設定値
クローラー名handson-logs-crawler
データソースS3 → s3://handson-logs-<your-account-id>/logs/
IAM ロールAWSGlueServiceRole を新規作成
ターゲットデータベースhandson_logs_db
スケジュール毎時 or オンデマンド実行

「クローラーを作成」→「今すぐ実行」をクリックするとパーティションが自動登録されます。

方法 B: Athena の MSCK REPAIR を定期実行

6-2. Lambda + EventBridge で自動化(参考)

Lambda 関数から Athena の MSCK REPAIR TABLE を定期実行することも可能です。本番環境では新しいパーティションが毎日追加されるため、EventBridge で毎日 00:05 に実行するスケジュールを設定します。

{
  "Statement": "MSCK REPAIR TABLE handson_logs_db.access_logs"
}

パフォーマンス最適化のヒント

ℹ️ 本番環境でのベストプラクティス
最適化手法効果難易度
Parquet 形式に変換(Firehose の変換機能)スキャン量 70〜90% 削減★★☆
Hive パーティション(年/月/日/時)特定期間のスキャンのみ★☆☆
GZIP/Snappy 圧縮ストレージコスト 60〜80% 削減★☆☆
S3 Intelligent-Tieringアクセス頻度に応じた自動移行★☆☆
Athena CTAS でデータ整形繰り返しクエリの高速化★★☆

🧹 クリーンアップ

⚠️ 課金防止のためクリーンアップを必ず実施してください

Kinesis Firehose は稼働時間に応じて課金されます($0.029/GB + 基本料金)。S3 はデータ保管量で課金されます。

削除手順(逆順で実施)

  1. Glue クローラーを削除(作成した場合)
    Glue → クローラー → handson-logs-crawler を選択 → 「削除」
  2. Athena テーブルとデータベースを削除
    Athena のクエリエディタで実行:
    DROP TABLE IF EXISTS handson_logs_db.access_logs;
    DROP DATABASE IF EXISTS handson_logs_db;
  3. Kinesis Firehose 配信ストリームを削除
    Kinesis → Data Firehose → handson-access-logs-stream → 「削除」
  4. S3 バケットを削除
    S3 → バケットを選択 → 「空にする」でオブジェクトを全削除 → 「削除」
    handson-logs-*handson-athena-results-* の両方を削除
  5. IAM ロールを削除(自動作成されたもの)
    IAM → ロール → KinesisFirehoseServiceRole-*AWSGlueServiceRole-* を確認して削除
✅ クリーンアップ完了の確認
  • Kinesis Data Firehose の配信ストリーム一覧が空になった
  • S3 バケット handson-logs-* が削除された
  • Athena → データベース一覧に handson_logs_db が表示されなくなった

学習のまとめ

習得したスキル実践内容
S3 バケット設計Hive 形式パーティション・ライフサイクル設定
Kinesis Data Firehose配信ストリーム作成・バッファリング・GZIP 圧縮・S3 配信
AWS Glue Data Catalogデータベース・テーブル定義・パーティション登録
Amazon Athena7 種類の SQL クエリによるログ分析・パーティション絞り込みによるコスト最適化
運用自動化Glue クローラーによるパーティション自動追加