VPCフローログ、実際に運用してみて困った話|費用削減と異常検知の工夫

AWSのVPCフローログを本格導入して気づいたこと。月数十GBのログ量、クエリ費用の課題、そしてセキュリティ監査で実際に役立った異常検知の方法まで、チーム運用の実例を共有します。

VPCフローログで実際に何が見えるのか

去年のプロジェクトで、セキュリティ監査の指摘をきっかけにVPCフローログを本格的に導入したんですよ。正直なところ「ログがたくさん取れるのはわかるけど、実際に何に使うの?」というのが最初の印象でした。でも運用を始めると、想像以上に有益な情報が取れることに気づいた。

VPCフローログって、EC2やRDS、LambdaなどのAWSリソース間の通信フロー、つまり「どのIPがどのIPに、どのプロトコルで、どのポートで通信しているか」を記録するもの。これを分析すると、意外な依存関係が見えたり、未承認の通信パターンを検出したり、コンプライアンス要件の証拠を残したりできるんです。

うちのチームの場合、最初はCloudWatch Logsに全て流していたんですが、一ヶ月で数十GBのログが溜まって「これ、クエリするだけで費用がかかるじゃん」という状況になりました。そこから試行錯誤で、データの取捨選択と分析パイプラインの設計を工夫するようになったんですよね。

運用で実感した課題:ログ量と費用のバランス

VPCフローログをいきなりフルで有効にすると、本当にログが溜まります。東京リージョンで本番環境・ステージング環境合わせて5個のVPC、各VPCに5~10個のサブネットがあったら、毎日数百万件のフロー記録が発生する。これをCloudWatch Logsに全部入れると、ストレージ費用+クエリ費用だけで月数万円。

最初にうちがやった工夫は「フローログを目的別に分ける」ことです。

実装例:複数のフローログ設定

# Terraform例

# 設定1: 詳細分析用(本番VPCのみ、ACCEPT/REJECT両方)
resource "aws_flow_log" "detailed_analysis" {
  iam_role_arn    = aws_iam_role.flow_log_role.arn
  log_destination = aws_cloudwatch_log_group.detailed.arn
  traffic_type    = "ALL"  # ACCEPT と REJECT 両方
  vpc_id          = aws_vpc.production.id

  tags = {
    purpose = "security_analysis"
  }
}

# 設定2: 監査証拠用(全てのリソース、ACCEPT のみ、S3に安く保管)
resource "aws_flow_log" "audit_trail" {
  iam_role_arn    = aws_iam_role.flow_log_role.arn
  log_destination = "arn:aws:s3:::${aws_s3_bucket.flow_logs.id}"
  traffic_type    = "ACCEPT"
  vpc_id          = aws_vpc.production.id
  
  log_format = "${version} ${account-id} ${interface-id} ${srcaddr} ${dstaddr} ${srcport} ${dstport} ${protocol} ${packets} ${bytes} ${windowstart} ${windowend} ${action} ${tcpflags} ${type} ${pkt-srcaddr} ${pkt-dstaddr} ${region} ${vpc-id} ${flow-logs-id} ${traffic-type} ${subnet-id} ${instance-id}"

  tags = {
    purpose = "compliance"
  }
}

# 設定3: コスト削減版(開発環境、本当に問題がありそうなREJECTのみ)
resource "aws_flow_log" "reject_only" {
  iam_role_arn    = aws_iam_role.flow_log_role.arn
  log_destination = aws_cloudwatch_log_group.rejects.arn
  traffic_type    = "REJECT"
  vpc_id          = aws_vpc.development.id

  tags = {
    purpose = "troubleshooting"
  }
}

こんな感じで目的別に分けることで、詳細分析が必要な環境には全ログを、コンプライアンス証拠としてはS3に長期保管(安い)、開発環境は問題が起きたときだけ見るというメリハリがつきました。

データ量も月単位で見ると30~40%削減できた。正直「全ログ取らないと危ないのでは」という懸念もありましたが、CloudWatch Logs Insightsで「本当に必要な通信パターン」を事前に定義しておけば、REJECTされた通信だけ追跡すれば十分だってことがわかりました。

実践的な異常検知:ルール設計の工夫

VPCフローログを分析する際、「何が異常か」を定義するのが一番難しい。通常の通信パターンを把握してないと、正当な通信まで「怪しい」と判定してしまうんですよ。

うちのチームでやったのは、CloudWatch Logs Insightsでよくある通信パターンを事前に可視化することです。

よく使うクエリ1:ポート別トラフィック分布

fields @timestamp, dstport, action, bytes
| stats sum(bytes) as total_bytes by dstport, action
| sort total_bytes desc
| limit 30

これを1週間分実行すると、うちのシステムではHTTPS(443)がダントツで、次にMySQL(3306)、Redis(6379)という分布が見えた。もし急に5432(PostgreSQL)のトラフィックが増えたら「あ、これ新しいDB追加したのか?」と気づけます。

よく使うクエリ2:外部通信の監視

fields @timestamp, srcaddr, dstaddr, dstport, action, bytes
| filter dstaddr not like /^10\.|^172\.(1[6-9]|2[0-9]|3[01])\.|^192\.168\./
| stats count() as flow_count, sum(bytes) as total_bytes by dstaddr, dstport, action
| sort flow_count desc

プライベートIPアドレス範囲(10.0.0.0/8、172.16.0.0/12、192.168.0.0/16)以外への通信を抽出する。これで「どの内部サーバーが外部に通信しているか」が一目瞭然。クレジットカード情報を扱うシステムなら「本来は外部通信するべきじゃない」という運用ポリシーがあるはずで、違反を検出できるわけです。

よく使うクエリ3:ステートレスな大量通信の検出

fields @timestamp, srcaddr, dstaddr, dstport, protocol, action
| filter action = "REJECT"
| stats count() as reject_count by srcaddr, dstaddr, protocol
| filter reject_count > 100
| sort reject_count desc

REJECTされた通信が同じ送信元から大量に発生する → ポートスキャン、脆弱性スキャン、DDoS攻撃の可能性があります。実際にうちのシステムでも過去、セキュリティスキャンツールがファイアウォールルールに引っかかって大量のREJECTが発生したことがある。これで初期段階で気づけました。

S3 + Athenaで長期分析+コスト最適化

セキュリティ監査の観点から「過去6ヶ月のネットワークログが必要」という要件があった。CloudWatch Logsで6ヶ月保管すると費用が悲鳴を上げます。そこでS3 + Athenaという組み合わせに切り替えたんですよ。

# S3 + パーティション設定

resource "aws_s3_bucket" "flow_logs" {
  bucket = "my-org-flow-logs"
}

resource "aws_s3_bucket_lifecycle_configuration" "flow_logs" {
  bucket = aws_s3_bucket.flow_logs.id

  rule {
    id     = "archive_old_logs"
    status = "Enabled"

    # 30日後にGlacier Flexibleへ(クエリ不要なら)
    transition {
      days          = 30
      storage_class = "GLACIER_FLEXIBLE"
    }

    # 1年後にGlacier Deep Archiveへ(完全アーカイブ)
    transition {
      days          = 365
      storage_class = "DEEP_ARCHIVE"
    }
  }
}

# Glueカタログに登録(Athenaで直接クエリ可能)
resource "aws_glue_catalog_table" "flow_logs" {
  name          = "vpc_flow_logs"
  database_name = aws_glue_catalog_database.logs.name

  table_type = "EXTERNAL_TABLE"

  parameters = {
    "classification" = "parquet"
    "projection.enabled"          = "true"
    "projection.year.type"        = "integer"
    "projection.year.range"       = "2025,2026"
    "projection.month.type"       = "integer"
    "projection.month.range"      = "01,12"
    "projection.month.digits"     = "2"
    "projection.day.type"         = "integer"
    "projection.day.range"        = "01,31"
    "projection.day.digits"       = "2"
  }

  partition_keys {
    name = "year"
    type = "string"
  }
  partition_keys {
    name = "month"
    type = "string"
  }
  partition_keys {
    name = "day"
    type = "string"
  }

  storage_descriptor {
    location      = "s3://${aws_s3_bucket.flow_logs.id}/vpc-flow-logs/"
    input_format  = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
    output_format = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"

    ser_de_info {
      serialization_library = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
    }

    columns {
      name = "version"
      type = "int"
    }
    columns {
      name = "account_id"
      type = "string"
    }
    columns {
      name = "interface_id"
      type = "string"
    }
    columns {
      name = "srcaddr"
      type = "string"
    }
    columns {
      name = "dstaddr"
      type = "string"
    }
    columns {
      name = "srcport"
      type = "int"
    }
    columns {
      name = "dstport"
      type = "int"
    }
    columns {
      name = "protocol"
      type = "int"
    }
    columns {
      name = "packets"
      type = "bigint"
    }
    columns {
      name = "bytes"
      type = "bigint"
    }
    columns {
      name = "windowstart"
      type = "bigint"
    }
    columns {
      name = "windowend"
      type = "bigint"
    }
    columns {
      name = "action"
      type = "string"
    }
    columns {
      name = "tcpflags"
      type = "int"
    }
  }
}

この設定で、VPCフローログをParquet形式でS3に自動保存しつつ、Athenaで直接クエリできるようになります。1ヶ月分のログをAthenaでスキャンすると、CloudWatch Logsより50~70%安くなりました。

実装後の費用比較を見ると、やっぱりS3+Athenaの圧勝です。

保存期間CloudWatch LogsS3 + Athena削減率
1ヶ月¥25,000¥8,00068%
3ヶ月¥75,000¥18,00076%
6ヶ月¥150,000¥28,00081%

ただし注意点として、Glacier Flexibleのデータをクエリするには復元が必要で、追加費用と時間がかかります。「最近6ヶ月は高速検索、それ以前は監査用アーカイブ」みたいなポリシーが現実的かなと思いますね。

AWS構成図:実践的なVPCフローログ分析パイプライン

graph TB
    subgraph VPC["VPC (Production)"]
        EC2["EC2 Instances"]
        RDS["RDS Database"]
        Lambda["Lambda Functions"]
        SG["Security Groups"]
        ENI["Network Interfaces"]
    end

    subgraph FlowLogCollection["VPC Flow Logs Collection"]
        FlowLog["VPC Flow Logs"]
        CWFilter["CloudWatch Logs<br/>(Real-time Analysis)"]
        S3Bucket["S3 Bucket<br/>(Long-term Storage)"]
    end

    subgraph Analysis["Analysis & Detection"]
        CloudWatchInsights["CloudWatch<br/>Logs Insights"]
        Athena["Amazon Athena<br/>(Historical Query)"]
        EventBridge["EventBridge<br/>(Alert Rules)"]
    end

    subgraph Monitoring["Monitoring & Response"]
        SNS["SNS<br/>(Alert Notification)"]
        SecurityHub["AWS Security Hub<br/>(Centralized)"]
        Lambda2["Lambda<br/>(Auto-Response)"]
    end

    EC2 -->|Network Traffic| ENI
    RDS -->|Network Traffic| ENI
    Lambda -->|Network Traffic| ENI
    ENI -->|Capture Flow Data| FlowLog

    FlowLog -->|Real-time Streaming| CWFilter
    FlowLog -->|Batch Export| S3Bucket

    CWFilter -->|Query & Analyze| CloudWatchInsights
    S3Bucket -->|Parquet Format| Athena

    CloudWatchInsights -->|Anomaly Detection| EventBridge
    Athena -->|Compliance Reports| SecurityHub

    EventBridge -->|High-risk Traffic| SNS
    EventBridge -->|Trigger Remediation| Lambda2
    SecurityHub -->|Dashboard View| SNS

    Lambda2 -->|Update NACL/SG| SG

    style VPC fill:#e1f5ff
    style FlowLogCollection fill:#fff3e0
    style Analysis fill:#f3e5f5
    style Monitoring fill:#e8f5e9

異常検知の実装例:EventBridgeルールの活用

CloudWatch Logs Insightsでクエリを手動実行するのもいいですが、重大な脅威は自動検知したい。EventBridgeとLambdaを組み合わせると、異常パターンを自動検知・対応できるんですよ。

# Lambda: VPCフローログ異常検知
import json
import boto3
from datetime import datetime, timedelta

logs_client = boto3.client('logs')
ses_client = boto3.client('ses')

def lambda_handler(event, context):
    """
    CloudWatch Logs Insightsで検出した異常を処理
    """
    log_group = '/aws/vpc/flowlogs/production'
    
    # クエリ1: 外部への異常大量通信
    query_external_burst = '''
    fields @timestamp, srcaddr, dstaddr, bytes
    | filter dstaddr not like /^10\.|^172\.(1[6-9]|2[0-9]|3[01])\.|^192\.168\./
    | stats sum(bytes) as total_bytes by srcaddr, dstaddr
    | filter total_bytes > 1000000000
    | sort total_bytes desc
    '''
    
    # クエリ2: ポートスキャン検知
    query_port_scan = '''
    fields srcaddr, dstaddr
    | filter action = "REJECT"
    | stats count() as reject_count by srcaddr, dstaddr
    | filter reject_count > 200
    | sort reject_count desc
    '''
    
    results = {
        'external_burst': execute_query(log_group, query_external_burst),
        'port_scan': execute_query(log_group, query_port_scan)
    }
    
    # 異常が検出されたら通知
    if results['external_burst'] or results['port_scan']:
        send_alert(results)
        # オプション: セキュリティグループを自動更新
        # update_security_group(results)
    
    return {
        'statusCode': 200,
        'body': json.dumps({'detected_anomalies': len(results)})
    }

def execute_query(log_group, query_string):
    """CloudWatch Logs Insightsクエリを実行"""
    try:
        response = logs_client.start_query(
            logGroupName=log_group,
            startTime=int((datetime.now() - timedelta(hours=1)).timestamp()),
            endTime=int(datetime.now().timestamp()),
            queryString=query_string
        )
        
        query_id = response['queryId']
        # クエリが完了するまで待機(実装は省略)
        
        return response
    except Exception as e:
        print(f"Query execution error: {e}")
        return None

def send_alert(results):
    """セキュリティチームにアラート送信"""
    message = f"""
    VPC Flow Logs Anomaly Detected!
    
    External Burst Detected: {len(results['external_burst'])} instances
    Port Scan Attempt: {len(results['port_scan'])} instances
    
    Immediate action required.
    """
    
    ses_client.send_email(
        Source='security@myorg.example.com',
        Destination={'ToAddresses': ['security-team@myorg.example.com']},
        Message={
            'Subject': {'Data': '[ALERT] VPC Flow Log Anomaly'},
            'Body': {'Text': {'Data': message}}
        }
    )

運用していてわかったベストプラクティス

正直、VPCフローログは「入れた瞬間に価値がある」わけじゃなくて、運用を通じて初めて意味が出てくるツールだと感じています。うちのチームが実装してから1年経って、これは本当に大事だなと思うポイントを共有します。

1. ベースラインの把握が何より重要

異常検知の精度は「正常な通信パターンをどれだけ把握しているか」で決まります。本運用前に2~3週間のシミュレーション期間を設けて「この環境では何が正常か」を定義しておくべき。定義なしに異常検知を始めると、誤検知で疲弊しますよ。

2. ログ保持期間と目的を分ける

「全てのログを永遠に保管」は現実的じゃない。うちの場合、セキュリティインシデント調査は「過去3ヶ月」、コンプライアンス監査向けは「過去1年」という区分にしました。ポリシーで目的別の保持期間を明記しておくと、後の費用交渉も楽になります。

3. タグとサブネット単位での分析が実用的

VPCフローログには各通信のメタデータ(AZ、サブネット、インスタンスID)が含まれます。これを活用して「本番環境のこのサブネット内では何が通信しているか」という粒度で分析するのが、実務的な異常検知につながるんです。

まとめ

VPCフローログ分析は、導入直後は「ただログが増える」という感じですが、戦略的に設計すれば本当に強力なセキュリティ・コンプライアンスツールになる。実装してみて痛感しました。

次に実装するなら、こんな流れで進めるといいと思います:

  1. 目的別にフローログ設定を分ける(詳細分析用・監査用・開発用など)
  2. S3+Athenaで長期保管&コスト削減を実現する
  3. 2~3週間のベースライン期間を設けて正常なパターンを把握する
  4. CloudWatch Logs Insightsで日次・週次の定期クエリを運用ルーチン化する
  5. 重大な異常はEventBridge+Lambdaで自動検知・通知する

これらを組み合わせると、セキュリティ監査の要件も満たしつつ、実際の脅威検知にも役立つ実装が実現できます。最初は小さく始めて、必要に応じてスケール・自動化していくのがお勧めですね。

U

Untanbaby

ソフトウェアエンジニア|AWS / クラウドアーキテクチャ / DevOps

10年以上のIT実務経験をもとに、現場で使える技術情報を発信しています。 記事の誤りや改善点があればお問い合わせからお気軽にご連絡ください。

関連記事