ストリームログを S3 に蓄積し Athena でサーバーレス SQL 分析を行う
このハンズオンでは、Kinesis Data Firehose を使ってストリームデータを Amazon S3 に自動配信し、AWS Glue Data Catalog でスキーマを定義し、Amazon Athena でサーバーレス SQL 分析を行うパイプラインを構築します。大量ログの長期保存・低コスト分析の基礎を学べます。
| 項目 | 内容 |
|---|---|
| 対象サービス | Kinesis Data Firehose、Amazon S3、AWS Glue、Amazon Athena |
| 主な学習内容 | Firehose 配信ストリーム・S3 パーティション設計・Glue テーブル定義・Athena SQL 分析 |
| 所要時間 | 75〜90 分 |
| 難易度 | ★★★☆☆(中級者向け) |
| 前提知識 | S3 の基礎知識、SQL の基礎知識 |
| 費用目安 | 約 0〜2 USD(データ量・スキャン量による) |
今回のハンズオンでは以下の JSON 形式のアクセスログを想定します。
{
"timestamp": "2024-01-15T10:23:45Z",
"method": "GET",
"path": "/api/products",
"status_code": 200,
"response_time_ms": 142,
"user_agent": "Mozilla/5.0",
"client_ip": "203.0.113.5",
"bytes_sent": 2048
}
ap-northeast-1(東京) に固定することAthena を初めて使う場合はクエリ結果の保存先 S3 バケットを設定する必要があります。ステップ 5 で案内します。
ログデータを保存するバケットと Athena クエリ結果用バケットを作成します。
AWS マネジメントコンソールで 「S3」 を検索して開きます。「バケットを作成」 をクリックします。
その他はデフォルトのまま「バケットを作成」をクリックします。
S3 バケット名はグローバルで一意である必要があります。<your-account-id> の部分をご自身の AWS アカウント ID(12 桁の数字)に置き換えてください。
同様の手順で作成します。
ログ保存バケットで 「管理」→「ライフサイクルルールを作成」 を開きます。
今回のハンズオンではデフォルトのまま進めても問題ありません。
S3 バケット一覧に handson-logs-* と handson-athena-results-* の 2 つが表示されれば成功です。
ログデータを受け取り S3 に自動配信するストリームを設定します。
AWS マネジメントコンソールで 「Kinesis」 を検索して開きます。左メニューから 「Data Firehose」→「配信ストリームを作成」 をクリックします。
| 設定項目 | 設定値 |
|---|---|
| ソース | Direct PUT(アプリケーションから直接 PUT) |
| 送信先 | Amazon S3 |
| 配信ストリーム名 | handson-access-logs-stream |
year=YYYY/month=MM/day=DD/ の形式(Hive 形式)にすると Athena が自動でパーティションを認識し、クエリで特定日時のデータのみスキャンできるためコストを大幅に削減できます。
| 設定項目 | 設定値 | 説明 |
|---|---|---|
| バッファサイズ | 5 MB | 5 MB 溜まったら S3 に書き込む |
| バッファ間隔 | 60 秒 | 60 秒経過したら S3 に書き込む |
| 圧縮 | GZIP | ストレージコスト削減 |
| データ形式変換 | 無効(今回は JSON のまま) | Parquet 変換も可能 |
「サービスロールを作成または更新」を選択します。Firehose が S3 に書き込むための IAM ロールが自動で作成されます。
設定を確認して「配信ストリームを作成」をクリックします。
配信ストリーム一覧に handson-access-logs-stream が表示され、ステータスが「アクティブ」になれば成功です(1〜2 分かかる場合があります)。
Firehose にサンプルデータを送信して S3 への配信を確認します。
Kinesis Data Firehose の詳細画面から 「テストレコードを送信」 タブを開きます。以下のサンプル JSON を入力して「テストレコードを送信」をクリックします。
{"timestamp":"2024-01-15T10:23:45Z","method":"GET","path":"/api/products","status_code":200,"response_time_ms":142,"user_agent":"Mozilla/5.0","client_ip":"203.0.113.5","bytes_sent":2048}
{"timestamp":"2024-01-15T10:23:47Z","method":"POST","path":"/api/orders","status_code":201,"response_time_ms":320,"user_agent":"Mozilla/5.0","client_ip":"203.0.113.10","bytes_sent":512}
{"timestamp":"2024-01-15T10:23:50Z","method":"GET","path":"/api/products/42","status_code":404,"response_time_ms":28,"user_agent":"curl/7.68.0","client_ip":"198.51.100.5","bytes_sent":128}
{"timestamp":"2024-01-15T10:23:55Z","method":"GET","path":"/api/users","status_code":500,"response_time_ms":5432,"user_agent":"Mozilla/5.0","client_ip":"192.0.2.20","bytes_sent":256}
より多くのテストデータを投入したい場合は AWS CLI を使います。
aws firehose put-record-batch \ --delivery-stream-name handson-access-logs-stream \ --records '[ {"Data":"eyJ0aW1lc3RhbXAiOiIyMDI0LTAxLTE1VDExOjAwOjAwWiIsIm1ldGhvZCI6IkdFVCIsInBhdGgiOiIvYXBpL2l0ZW1zIiwic3RhdHVzX2NvZGUiOjIwMCwicmVzcG9uc2VfdGltZV9tcyI6OTUsInVzZXJfYWdlbnQiOiJNb3ppbGxhLzUuMCIsImNsaWVudF9pcCI6IjEwMy4wLjExMy41IiwiYnl0ZXNfc2VudCI6MTAyNH0="} ]' \ --region ap-northeast-1Firehose はバッファリング設定に従い「60 秒経過」または「5 MB 蓄積」のいずれか早い方で S3 に書き込みます。テスト後 1〜2 分待ってから S3 を確認してください。
S3 コンソールで handson-logs-<your-account-id> バケットを開きます。logs/year=2024/month=01/day=15/hour=10/ のようなパスにファイルが作成されているか確認します。
ファイル名は自動生成されます(例: 2024-01-15-10-23-45-xxxx)。
S3 バケット内にパーティション形式のディレクトリが作成され、JSON/GZIP ファイルが存在すれば成功です。
Athena で SQL クエリを実行するために、S3 のデータに対してスキーマを定義します。
AWS マネジメントコンソールで 「Athena」 を検索して開きます。初回利用の場合は「クエリ結果の場所を設定」が求められます。
「保存」をクリックします。
Athena のクエリエディタで以下の SQL を実行します。
CREATE DATABASE IF NOT EXISTS handson_logs_db;
以下の SQL を実行してテーブルを定義します。<your-account-id> を実際のアカウント ID に変更してください。
CREATE EXTERNAL TABLE IF NOT EXISTS handson_logs_db.access_logs (
`timestamp` string,
`method` string,
`path` string,
`status_code` int,
`response_time_ms` int,
`user_agent` string,
`client_ip` string,
`bytes_sent` int
)
PARTITIONED BY (
year string,
month string,
day string,
hour string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
)
LOCATION 's3://handson-logs-<your-account-id>/logs/'
TBLPROPERTIES ('has_encrypted_data' = 'false');
Hive 形式のパーティション(year=YYYY/month=MM/...)を定義することで、Athena クエリ時に WHERE year='2024' AND month='01' のような条件を付けてスキャン量を絞り込めます。これがコスト削減の核心です。
テーブル定義後、既存のパーティションを Glue Catalog に登録します。
MSCK REPAIR TABLE handson_logs_db.access_logs;
このコマンドは S3 のパーティションディレクトリを自動スキャンしてカタログに登録します。新しいパーティションが追加されるたびに実行するか、後述の自動追加設定を行います。
SHOW PARTITIONS handson_logs_db.access_logs; DESCRIBE handson_logs_db.access_logs;
SHOW PARTITIONS の結果にステップ 3 で投入したデータのパーティション(例: year=2024/month=01/day=15/hour=10)が表示されれば成功です。
Athena でサンプルクエリを実行し、ログを多角的に分析します。
SELECT * FROM handson_logs_db.access_logs WHERE year = '2024' AND month = '01' AND day = '15' ORDER BY timestamp DESC LIMIT 20;
SELECT status_code, COUNT(*) AS request_count FROM handson_logs_db.access_logs WHERE year = '2024' AND month = '01' GROUP BY status_code ORDER BY request_count DESC;
SELECT timestamp, method, path, status_code, response_time_ms, client_ip FROM handson_logs_db.access_logs WHERE year = '2024' AND month = '01' AND status_code >= 400 ORDER BY timestamp DESC LIMIT 50;
SELECT timestamp, method, path, status_code, response_time_ms FROM handson_logs_db.access_logs WHERE year = '2024' AND month = '01' ORDER BY response_time_ms DESC LIMIT 10;
SELECT path, COUNT(*) AS request_count, ROUND(AVG(response_time_ms), 0) AS avg_response_ms, MAX(response_time_ms) AS max_response_ms, SUM(bytes_sent) AS total_bytes FROM handson_logs_db.access_logs WHERE year = '2024' AND month = '01' GROUP BY path ORDER BY request_count DESC;
SELECT client_ip, COUNT(*) AS request_count, SUM(bytes_sent) AS total_bytes_sent FROM handson_logs_db.access_logs WHERE year = '2024' AND month = '01' GROUP BY client_ip ORDER BY request_count DESC LIMIT 10;
SELECT hour, COUNT(*) AS request_count, ROUND(AVG(response_time_ms), 0) AS avg_response_ms FROM handson_logs_db.access_logs WHERE year = '2024' AND month = '01' GROUP BY hour ORDER BY CAST(hour AS int);
Athena のクエリエディタで上記クエリを 1 つずつ実行します。クエリ実行後、下部に「データスキャン量」と「実行時間」が表示されます。
Athena はスキャンしたデータ量 1 TB あたり $5 USD の課金です。WHERE year='...' AND month='...' でパーティションを絞ることでスキャン量を最小化できます。今回のテストデータ(数 KB)では実質 $0 です。
クエリが正常に実行されステップ 3 で投入したデータが結果に表示されれば成功です。
継続的にデータが追加される本番環境では、パーティションを自動で登録する仕組みが必要です。
AWS Glue コンソールを開き 「クローラー」→「クローラーを作成」 をクリックします。
| 設定項目 | 設定値 |
|---|---|
| クローラー名 | handson-logs-crawler |
| データソース | S3 → s3://handson-logs-<your-account-id>/logs/ |
| IAM ロール | AWSGlueServiceRole を新規作成 |
| ターゲットデータベース | handson_logs_db |
| スケジュール | 毎時 or オンデマンド実行 |
「クローラーを作成」→「今すぐ実行」をクリックするとパーティションが自動登録されます。
Lambda 関数から Athena の MSCK REPAIR TABLE を定期実行することも可能です。本番環境では新しいパーティションが毎日追加されるため、EventBridge で毎日 00:05 に実行するスケジュールを設定します。
{
"Statement": "MSCK REPAIR TABLE handson_logs_db.access_logs"
}
| 最適化手法 | 効果 | 難易度 |
|---|---|---|
| Parquet 形式に変換(Firehose の変換機能) | スキャン量 70〜90% 削減 | ★★☆ |
| Hive パーティション(年/月/日/時) | 特定期間のスキャンのみ | ★☆☆ |
| GZIP/Snappy 圧縮 | ストレージコスト 60〜80% 削減 | ★☆☆ |
| S3 Intelligent-Tiering | アクセス頻度に応じた自動移行 | ★☆☆ |
| Athena CTAS でデータ整形 | 繰り返しクエリの高速化 | ★★☆ |
Kinesis Firehose は稼働時間に応じて課金されます($0.029/GB + 基本料金)。S3 はデータ保管量で課金されます。
handson-logs-crawler を選択 → 「削除」
DROP TABLE IF EXISTS handson_logs_db.access_logs; DROP DATABASE IF EXISTS handson_logs_db;
handson-access-logs-stream → 「削除」
handson-logs-* と handson-athena-results-* の両方を削除
KinesisFirehoseServiceRole-*、AWSGlueServiceRole-* を確認して削除
handson-logs-* が削除されたhandson_logs_db が表示されなくなった| 習得したスキル | 実践内容 |
|---|---|
| S3 バケット設計 | Hive 形式パーティション・ライフサイクル設定 |
| Kinesis Data Firehose | 配信ストリーム作成・バッファリング・GZIP 圧縮・S3 配信 |
| AWS Glue Data Catalog | データベース・テーブル定義・パーティション登録 |
| Amazon Athena | 7 種類の SQL クエリによるログ分析・パーティション絞り込みによるコスト最適化 |
| 運用自動化 | Glue クローラーによるパーティション自動追加 |