GCP 서비스 로그를 AWS로 안정적으로 통합 입수하기 — 프로덕션 파이프라인 설계와 구현

이군·2026년 4월 26일

멀티클라우드 환경에서 GCP 측 로그를 AWS의 보안/관측 스택으로 일원화하는 파이프라인을, 인증 보안과 신뢰성 보강까지 포함해 단계별로 정리한다.


1. 들어가며 — 왜 이 파이프라인이 필요한가

조직이 멀티클라우드를 운영하면 한쪽 클라우드(예: AWS)에 SIEM, 데이터 레이크, 위협 탐지 스택이 집중되어 있고, 다른 쪽(GCP)에는 일부 워크로드만 분산되어 있는 경우가 많다. 이때 GCP 쪽 로그를 별도 콘솔에서 보거나 두 개의 SIEM을 운영하는 것은 비용·운영·탐지 커버리지 측면에서 모두 불리하다.

해결책은 단순하다 — GCP에서 발생하는 로그를 안정적으로 AWS 측 저장/분석 계층으로 흘려보내는 것. 그러나 이 단순한 요구를 “프로덕션 등급”으로 만드는 데 들어가는 디테일은 결코 작지 않다. 이 글에서는 다음을 모두 다룬다.

  • 어떤 아키텍처가 가장 안정적인가, 그 이유
  • 정적 서비스 계정 키 없이 인증하는 방법(Workload Identity Federation)
  • 메시지 유실 없는 신뢰성 보강(DLQ, retry, idempotency)
  • 비용 최적화와 운영 지표 모니터링
  • 보안 점검 체크리스트

2. 요구사항 정의

먼저 “안정적”이라는 말의 의미를 풀어보자. 본 글에서는 다음을 충족해야 안정적인 파이프라인으로 본다.

  • Durability: 어떤 단계에서도 메시지가 조용히 사라지지 않는다.
  • At-least-once delivery: 최소 1회 전달을 보장한다(중복은 다운스트림에서 처리).
  • Backpressure 대응: 일시적 폭주가 발생해도 시스템이 무너지지 않는다.
  • Operational visibility: 누락·지연·오류를 즉시 관측할 수 있다.
  • Secret-less 인증: 정적 자격증명을 보관하지 않는다.
  • Cost-aware: 인입량 대비 비용이 예측 가능하다.

3. 후보 아키텍처 비교

GCP → AWS 로그 전송에는 크게 4가지 패턴이 있다.

3.1 Pattern A — Cloud Logging → Pub/Sub → AWS Pull 컨슈머 (권장)

[GCP Service] → [Cloud Logging] → [Log Sink] → [Pub/Sub Topic] → [Subscription]
                                                                       ↑ pull
                                                               [AWS Lambda/ECS]
                                                                       ↓
                                                    [Kinesis Firehose] → [S3]
                                                                       ↘ [OpenSearch]
  • 장점: Pub/Sub의 buffering·재시도·DLQ를 그대로 활용. AWS 측이 자기 페이스로 pull하므로 backpressure 자연스러움. WIF 적용 가능.
  • 단점: AWS 측 컴퓨트(컨슈머) 운영 필요.

3.2 Pattern B — Pub/Sub Push → API Gateway → Lambda

[Pub/Sub] → push → [API Gateway] → [Lambda] → [Firehose] → [S3/OpenSearch]
  • 장점: AWS 측 폴링 컴퓨트 불필요.
  • 단점: 푸시 엔드포인트 인증 경계가 늘어남(JWT 검증 필수). Pub/Sub push는 AWS 입장에서 서명 검증이 추가 작업이고, 폭주 시 Lambda 동시성 제한에 직접 부딪힘.

3.3 Pattern C — Cloud Logging → GCS Bucket → S3 동기화

[Cloud Logging] → [Log Sink] → [GCS Bucket] → [Storage Transfer Service / DataSync] → [S3]
  • 장점: 대용량 배치 적재에 단순하고 저렴.
  • 단점: 분 단위 이상의 지연 발생. 실시간 탐지·알람용으로는 부적합.

3.4 Pattern D — Fluent Bit/OTel 에이전트 직접 전송

[GCE/GKE Workload] → [Fluent Bit] → [Kinesis Firehose / OpenSearch / S3]
  • 장점: 중간 큐 없이 단순. 애플리케이션 로그에 강함.
  • 단점: GCP 관리형 서비스(Cloud Audit Log, GCS access log, BigQuery audit 등)는 Cloud Logging을 거쳐야만 잡히기 때문에 모든 로그 소스를 커버하지 못함. 에이전트 장애 시 로컬 큐가 한도를 넘으면 유실.

3.5 결론

  • 실시간 보안 분석이 목표 → Pattern A
  • 콜드 스토리지 아카이빙만 필요 → Pattern C
  • 애플리케이션 로그만 별도 → Pattern D를 A에 보조

이 글에서는 가장 보편적이고 견고한 Pattern A를 기준으로 끝까지 구현한다.


4. 최종 아키텍처

                  GCP 영역                                         AWS 영역
┌────────────────────────────────────────┐         ┌──────────────────────────────────────┐
│                                        │         │                                      │
│  GCP Services (GCE, GKE, Cloud SQL,    │         │  ┌──────────────────────────────┐    │
│  IAM, Audit, VPC Flow…)                │         │  │  Lambda (Pub/Sub Subscriber) │    │
│            │                           │         │  │   - WIF로 GCP 인증           │    │
│            ▼                           │         │  │   - Pull, ack 관리           │    │
│  ┌──────────────────┐                  │         │  │   - 정규화/Enrich            │    │
│  │  Cloud Logging   │                  │         │  └──────────────┬───────────────┘    │
│  └────────┬─────────┘                  │         │                 │                    │
│           │ inclusion/exclusion filter │         │                 ▼                    │
│           ▼                            │         │  ┌──────────────────────────────┐    │
│  ┌──────────────────┐                  │         │  │  Kinesis Data Firehose       │    │
│  │  Log Sink        │                  │         │  │   - Buffering, GZIP          │    │
│  └────────┬─────────┘                  │         │  │   - Transform Lambda(opt)    │    │
│           ▼                            │         │  └──────┬───────────────┬───────┘    │
│  ┌──────────────────┐  ┌──────────┐    │         │         │               │            │
│  │  Pub/Sub Topic   │→ │ DLQ Topic│    │         │         ▼               ▼            │
│  └────────┬─────────┘  └──────────┘    │         │      [S3]        [OpenSearch]        │
│           │                            │         │   (long-term)   (search/alert)       │
│           ▼                            │         │                                      │
│  ┌──────────────────┐                  │         │                                      │
│  │  Subscription    │ ← pull ──────────┼─────────┼──── (WIF token over HTTPS)           │
│  │   - ack deadline │                  │         │                                      │
│  │   - retry policy │                  │         │                                      │
│  └──────────────────┘                  │         │                                      │
│                                        │         │                                      │
│  ┌──────────────────┐                  │         │                                      │
│  │  Workload        │  ← trust AWS IAM │         │                                      │
│  │  Identity Pool   │     role         │         │                                      │
│  └──────────────────┘                  │         │                                      │
└────────────────────────────────────────┘         └──────────────────────────────────────┘

핵심 설계 포인트:

  1. Pub/Sub이 1차 버퍼 — Cloud Logging이 직접 AWS로 쏘지 않고 Pub/Sub에 적재한다. AWS 측 일시 장애 시에도 메시지 보존.
  2. Workload Identity Federation — AWS Lambda 실행 역할이 곧 GCP 인증의 주체가 된다. 정적 SA 키를 어디에도 저장하지 않는다.
  3. Firehose가 2차 버퍼 — Lambda가 Firehose에 PUT만 하고 끝낸다. S3/OpenSearch 적재는 Firehose가 책임진다.
  4. DLQ 이중화 — Pub/Sub DLQ(전송 실패)와 Firehose 백업 S3(다운스트림 실패) 둘 다 둔다.

5. 사전 준비

  • GCP 프로젝트 1개 — Owner 또는 동등 권한
  • AWS 계정 1개 — IAM/Lambda/Firehose/S3/OpenSearch 생성 권한
  • 로컬: gcloud, aws CLI, terraform(선택), Python 3.11+

이후 예시는 다음 변수로 진행한다(자기 환경에 맞춰 치환).

# GCP
export GCP_PROJECT="my-gcp-prod"
export GCP_PROJECT_NUMBER="123456789012"
export PUBSUB_TOPIC="aws-bound-logs"
export PUBSUB_DLQ="aws-bound-logs-dlq"
export PUBSUB_SUB="aws-bound-logs-sub"
export WIF_POOL="aws-federation"
export WIF_PROVIDER="aws-provider"
export GCP_SA="pubsub-aws-reader@${GCP_PROJECT}.iam.gserviceaccount.com"

# AWS
export AWS_ACCOUNT_ID="987654321098"
export AWS_REGION="ap-northeast-2"
export LAMBDA_ROLE_NAME="gcp-pubsub-subscriber-role"
export FIREHOSE_NAME="gcp-logs-firehose"
export S3_BUCKET="gcp-logs-archive-${AWS_ACCOUNT_ID}"

6. Step 1 — GCP: Pub/Sub Topic, DLQ, Subscription 구성

6.1 Topic과 DLQ 생성

# 메인 토픽
gcloud pubsub topics create $PUBSUB_TOPIC --project=$GCP_PROJECT

# DLQ 토픽
gcloud pubsub topics create $PUBSUB_DLQ --project=$GCP_PROJECT

6.2 Subscription 생성 (Pull 방식)

gcloud pubsub subscriptions create $PUBSUB_SUB \
  --topic=$PUBSUB_TOPIC \
  --project=$GCP_PROJECT \
  --ack-deadline=60 \
  --message-retention-duration=7d \
  --dead-letter-topic=$PUBSUB_DLQ \
  --max-delivery-attempts=5 \
  --min-retry-delay=10s \
  --max-retry-delay=600s

옵션 의미를 확실히 짚자.

  • --ack-deadline=60: Lambda가 메시지를 받은 뒤 60초 내에 ack하지 않으면 재전달. Lambda 처리 시간 + Firehose PUT 지연을 고려해 여유 있게.
  • --message-retention-duration=7d: AWS 측이 7일까지 죽어 있어도 메시지가 보존된다. Pub/Sub 최대 7일.
  • --dead-letter-topic: 5번 재시도해도 처리 실패하면 DLQ로 넘긴다.
  • --min/max-retry-delay: 지수 백오프 범위.

6.3 Pub/Sub이 DLQ로 publish할 권한 부여

이걸 빼먹으면 DLQ가 작동하지 않는다.

PUBSUB_SERVICE_ACCOUNT="service-${GCP_PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com"

gcloud pubsub topics add-iam-policy-binding $PUBSUB_DLQ \
  --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}" \
  --role="roles/pubsub.publisher" \
  --project=$GCP_PROJECT

gcloud pubsub subscriptions add-iam-policy-binding $PUBSUB_SUB \
  --member="serviceAccount:${PUBSUB_SERVICE_ACCOUNT}" \
  --role="roles/pubsub.subscriber" \
  --project=$GCP_PROJECT

6.4 Cloud Logging Sink 생성

이 단계에서 어떤 로그를 보낼지 필터로 정한다. 모든 로그를 다 보내면 비용이 폭발하므로, 필요한 것만 골라낸다.

# 예: Audit Log + GCE/GKE serious 이상만 전송
gcloud logging sinks create aws-bound-sink \
  pubsub.googleapis.com/projects/${GCP_PROJECT}/topics/${PUBSUB_TOPIC} \
  --log-filter='
    logName:"cloudaudit.googleapis.com" OR
    (resource.type=("gce_instance" OR "k8s_container") AND severity>=WARNING)
  ' \
  --project=$GCP_PROJECT

Sink가 Pub/Sub에 publish할 수 있도록 권한 부여:

SINK_WRITER=$(gcloud logging sinks describe aws-bound-sink \
  --project=$GCP_PROJECT --format='value(writerIdentity)')

gcloud pubsub topics add-iam-policy-binding $PUBSUB_TOPIC \
  --member="$SINK_WRITER" \
  --role="roles/pubsub.publisher" \
  --project=$GCP_PROJECT

여기까지 하면 GCP 측 로그가 Pub/Sub에 쌓이기 시작한다. gcloud pubsub subscriptions pull로 확인:

gcloud pubsub subscriptions pull $PUBSUB_SUB --auto-ack --limit=3 --project=$GCP_PROJECT

7. Step 2 — Workload Identity Federation 구성 (AWS → GCP)

핵심 보안 결정: GCP SA의 JSON 키를 만들지 않는다. 대신 AWS Lambda 실행 역할이 GCP에 OIDC 식별 주체로서 인정받는 구조를 만든다.

7.1 Workload Identity Pool과 Provider 생성

# 1) Pool 생성
gcloud iam workload-identity-pools create $WIF_POOL \
  --location="global" \
  --display-name="AWS federation" \
  --project=$GCP_PROJECT

# 2) AWS Provider 생성 — AWS 계정 ID로 신뢰 범위를 좁힌다
gcloud iam workload-identity-pools providers create-aws $WIF_PROVIDER \
  --location="global" \
  --workload-identity-pool=$WIF_POOL \
  --account-id=$AWS_ACCOUNT_ID \
  --attribute-mapping="\
google.subject=assertion.arn,\
attribute.aws_role=assertion.arn.extract('assumed-role/{role}/'),\
attribute.aws_account=assertion.account" \
  --project=$GCP_PROJECT

attribute.aws_role로 AWS IAM Role 이름을 추출해 두면, 다음 단계에서 “특정 Lambda 역할만” 신뢰하도록 좁힐 수 있다.

7.2 GCP Service Account 생성 및 권한 부여

# Pub/Sub Subscriber 역할만 가진 SA
gcloud iam service-accounts create pubsub-aws-reader \
  --project=$GCP_PROJECT

gcloud pubsub subscriptions add-iam-policy-binding $PUBSUB_SUB \
  --member="serviceAccount:${GCP_SA}" \
  --role="roles/pubsub.subscriber" \
  --project=$GCP_PROJECT

7.3 AWS IAM Role이 GCP SA를 임퍼소네이트할 수 있도록 바인딩

WIF_POOL_ID="projects/${GCP_PROJECT_NUMBER}/locations/global/workloadIdentityPools/${WIF_POOL}"

gcloud iam service-accounts add-iam-policy-binding $GCP_SA \
  --role="roles/iam.workloadIdentityUser" \
  --member="principalSet://iam.googleapis.com/${WIF_POOL_ID}/attribute.aws_role/${LAMBDA_ROLE_NAME}" \
  --project=$GCP_PROJECT

principalSet://...attribute.aws_role/<롤이름> 부분이 핵심이다. AWS 측에서 이 IAM Role을 Assume한 주체만 GCP SA를 임퍼소네이트할 수 있다. 다른 AWS 역할이 같은 계정에 있어도 통하지 않는다.

7.4 Credential Configuration JSON 생성

Lambda 코드가 사용할 자격증명 설정 파일을 만든다. 이 파일에는 비밀이 없다.

gcloud iam workload-identity-pools create-cred-config \
  "${WIF_POOL_ID}/providers/${WIF_PROVIDER}" \
  --service-account=$GCP_SA \
  --aws \
  --output-file=gcp-credential-config.json

생성된 JSON은 대략 이렇게 생겼다.

{
  "type": "external_account",
  "audience": "//iam.googleapis.com/projects/.../providers/aws-provider",
  "subject_token_type": "urn:ietf:params:aws:token-type:aws4_request",
  "service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/pubsub-aws-reader@...:generateAccessToken",
  "token_url": "https://sts.googleapis.com/v1/token",
  "credential_source": {
    "environment_id": "aws1",
    "region_url": "...",
    "url": "...",
    "regional_cred_verification_url": "..."
  }
}

이걸 Lambda 패키지에 동봉하거나 환경변수 GOOGLE_APPLICATION_CREDENTIALS로 경로를 지정하면 google-auth 라이브러리가 자동으로 인식한다.


8. Step 3 — AWS: S3, Firehose, IAM 구성

8.1 아카이브 S3 버킷

aws s3api create-bucket \
  --bucket $S3_BUCKET \
  --region $AWS_REGION \
  --create-bucket-configuration LocationConstraint=$AWS_REGION

# 보안 기본값
aws s3api put-public-access-block --bucket $S3_BUCKET \
  --public-access-block-configuration "BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true"

aws s3api put-bucket-encryption --bucket $S3_BUCKET \
  --server-side-encryption-configuration '{
    "Rules":[{"ApplyServerSideEncryptionByDefault":{"SSEAlgorithm":"aws:kms"}}]
  }'

aws s3api put-bucket-versioning --bucket $S3_BUCKET \
  --versioning-configuration Status=Enabled

수명 주기 정책으로 비용을 잡아둔다(Hot 30일, Glacier IR 90일, Deep Archive 1년 후).

{
  "Rules": [{
    "ID": "tier-down",
    "Status": "Enabled",
    "Filter": { "Prefix": "" },
    "Transitions": [
      { "Days": 30,  "StorageClass": "STANDARD_IA" },
      { "Days": 90,  "StorageClass": "GLACIER_IR" },
      { "Days": 365, "StorageClass": "DEEP_ARCHIVE" }
    ]
  }]
}

8.2 Kinesis Data Firehose Delivery Stream

핵심은 다이내믹 파티셔닝S3 백업 모드다. JSON 로그를 year/month/day/hour 경로로 떨어뜨리고, 변환 실패한 레코드도 별도 prefix로 살려 둔다.

Firehose IAM 역할(가정):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:AbortMultipartUpload","s3:GetBucketLocation","s3:GetObject","s3:ListBucket","s3:ListBucketMultipartUploads","s3:PutObject"],
      "Resource": ["arn:aws:s3:::gcp-logs-archive-987654321098","arn:aws:s3:::gcp-logs-archive-987654321098/*"]
    },
    {
      "Effect": "Allow",
      "Action": ["kms:Decrypt","kms:GenerateDataKey"],
      "Resource": "arn:aws:kms:ap-northeast-2:987654321098:key/<key-id>"
    },
    {
      "Effect": "Allow",
      "Action": ["logs:PutLogEvents"],
      "Resource": "arn:aws:logs:ap-northeast-2:987654321098:log-group:/aws/kinesisfirehose/*"
    }
  ]
}

Firehose 생성(콘솔 또는 CLI). CLI 핵심 옵션:

  • Source: Direct PUT
  • Destination: Amazon S3 (필요 시 OpenSearch와 듀얼 destination은 별도 스트림으로 분기)
  • Buffering: 5 MB / 60 seconds
  • Compression: GZIP
  • Dynamic partitioning: enabled
  • Partition keys: severity, resource_type, !{partitionKeyFromQuery:year}/!{partitionKeyFromQuery:month}/...

실무에선 Terraform이 가장 깔끔하다. 핵심만 보면:

resource "aws_kinesis_firehose_delivery_stream" "gcp_logs" {
  name        = "gcp-logs-firehose"
  destination = "extended_s3"

  extended_s3_configuration {
    role_arn   = aws_iam_role.firehose.arn
    bucket_arn = aws_s3_bucket.archive.arn
    prefix              = "logs/year=!{partitionKeyFromQuery:year}/month=!{partitionKeyFromQuery:month}/day=!{partitionKeyFromQuery:day}/hour=!{partitionKeyFromQuery:hour}/"
    error_output_prefix = "errors/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/"

    buffering_size     = 5
    buffering_interval = 60
    compression_format = "GZIP"

    dynamic_partitioning_configuration {
      enabled = true
    }

    processing_configuration {
      enabled = true
      processors {
        type = "MetadataExtraction"
        parameters {
          parameter_name  = "MetadataExtractionQuery"
          parameter_value = "{year:.timestamp|strptime(\"%Y-%m-%dT%H:%M:%S\")|strftime(\"%Y\"), month:.timestamp|strptime(\"%Y-%m-%dT%H:%M:%S\")|strftime(\"%m\"), day:.timestamp|strptime(\"%Y-%m-%dT%H:%M:%S\")|strftime(\"%d\"), hour:.timestamp|strptime(\"%Y-%m-%dT%H:%M:%S\")|strftime(\"%H\")}"
        }
        parameters {
          parameter_name  = "JsonParsingEngine"
          parameter_value = "JQ-1.6"
        }
      }
    }
  }
}

8.3 Lambda 실행 역할

이 역할 이름이 7.3에서 GCP에 등록한 ${LAMBDA_ROLE_NAME}과 정확히 일치해야 한다.

신뢰 정책:

{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": { "Service": "lambda.amazonaws.com" },
    "Action": "sts:AssumeRole"
  }]
}

권한 정책(최소권한):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["firehose:PutRecord","firehose:PutRecordBatch"],
      "Resource": "arn:aws:firehose:ap-northeast-2:987654321098:deliverystream/gcp-logs-firehose"
    },
    {
      "Effect": "Allow",
      "Action": ["logs:CreateLogGroup","logs:CreateLogStream","logs:PutLogEvents"],
      "Resource": "*"
    }
  ]
}

여기에는 GCP에 닿기 위한 권한이 들어가지 않는다 — GCP 인증은 WIF가 STS GetCallerIdentity 서명만 사용하기 때문이다. Lambda는 외부로 나가는 NAT 또는 퍼블릭 인터넷 접근만 있으면 GCP STS와 Pub/Sub에 닿을 수 있다.


9. Step 4 — AWS: Pub/Sub Subscriber Lambda 구현

9.1 패키지 구조

subscriber/
├── lambda_function.py
├── gcp-credential-config.json
└── requirements.txt

requirements.txt:

google-cloud-pubsub==2.21.1
google-auth==2.30.0
boto3>=1.34.0

9.2 코드

핵심 동작:

  1. Lambda 콜드스타트 시 google-auth가 WIF로 GCP access token을 받아옴
  2. Pub/Sub pull 호출(synchronous pull, 한 번에 최대 1000건)
  3. 메시지 디코드, 정규화, Firehose PutRecordBatch
  4. 성공한 ack_id만 ack
  5. EventBridge Scheduler 등으로 1분마다 실행
# lambda_function.py
import os, json, base64, gzip, logging
from datetime import datetime, timezone
from typing import List, Dict

import boto3
from google.cloud import pubsub_v1
from google.api_core import retry as g_retry

log = logging.getLogger()
log.setLevel(logging.INFO)

PROJECT_ID    = os.environ["GCP_PROJECT_ID"]
SUBSCRIPTION  = os.environ["PUBSUB_SUBSCRIPTION"]
FIREHOSE_NAME = os.environ["FIREHOSE_NAME"]
MAX_MESSAGES  = int(os.environ.get("MAX_MESSAGES", "500"))

# Lambda 환경에선 GOOGLE_APPLICATION_CREDENTIALS=/var/task/gcp-credential-config.json 로 지정
firehose = boto3.client("firehose")

def normalize(msg_data: bytes) -> Dict:
    """Cloud Logging LogEntry → 우리 표준 스키마로."""
    raw = json.loads(msg_data.decode("utf-8"))
    return {
        "timestamp":     raw.get("timestamp") or datetime.now(timezone.utc).isoformat(),
        "severity":      raw.get("severity", "DEFAULT"),
        "log_name":      raw.get("logName"),
        "resource_type": (raw.get("resource") or {}).get("type"),
        "resource_labels": (raw.get("resource") or {}).get("labels", {}),
        "insert_id":     raw.get("insertId"),     # ← idempotency 키
        "trace":         raw.get("trace"),
        "payload":       raw.get("jsonPayload") or raw.get("textPayload") or raw.get("protoPayload"),
        "source_cloud":  "gcp",
        "source_project": PROJECT_ID,
    }

def to_firehose_records(entries: List[Dict]):
    out = []
    for e in entries:
        # 줄바꿈 구분자 — Firehose가 S3에 NDJSON으로 쌓도록
        out.append({"Data": (json.dumps(e, ensure_ascii=False) + "\n").encode("utf-8")})
    return out

def chunk(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i+n]

def lambda_handler(event, context):
    sub_client = pubsub_v1.SubscriberClient()
    sub_path   = sub_client.subscription_path(PROJECT_ID, SUBSCRIPTION)

    response = sub_client.pull(
        request={"subscription": sub_path, "max_messages": MAX_MESSAGES},
        timeout=20,
        retry=g_retry.Retry(deadline=30),
    )

    if not response.received_messages:
        log.info("no messages")
        return {"pulled": 0}

    log.info(f"pulled {len(response.received_messages)} messages")

    normalized, ack_ids = [], []
    for rm in response.received_messages:
        try:
            normalized.append(normalize(rm.message.data))
            ack_ids.append(rm.ack_id)
        except Exception as ex:
            # 디코드/정규화 실패는 ack 하지 않는다 → Pub/Sub 재시도, 5회 후 DLQ로
            log.error(f"normalize failed: {ex}; ack_id={rm.ack_id}")

    # Firehose는 PutRecordBatch가 1회 500레코드 / 4MB 제한
    delivered_idx = 0
    failed_indices = set()
    for batch in chunk(normalized, 500):
        records = to_firehose_records(batch)
        resp = firehose.put_record_batch(DeliveryStreamName=FIREHOSE_NAME, Records=records)
        if resp.get("FailedPutCount", 0) > 0:
            for i, r in enumerate(resp["RequestResponses"]):
                if "ErrorCode" in r:
                    failed_indices.add(delivered_idx + i)
        delivered_idx += len(records)

    # 성공한 메시지만 ack
    success_ack_ids = [ack_ids[i] for i in range(len(ack_ids)) if i not in failed_indices]
    if success_ack_ids:
        sub_client.acknowledge(request={"subscription": sub_path, "ack_ids": success_ack_ids})
        log.info(f"acked {len(success_ack_ids)} messages")
    if failed_indices:
        log.warning(f"{len(failed_indices)} records failed Firehose; will be redelivered")

    return {
        "pulled": len(response.received_messages),
        "acked":  len(success_ack_ids),
        "failed_to_firehose": len(failed_indices),
    }

9.3 배포 시 환경변수

GCP_PROJECT_ID=my-gcp-prod
PUBSUB_SUBSCRIPTION=aws-bound-logs-sub
FIREHOSE_NAME=gcp-logs-firehose
GOOGLE_APPLICATION_CREDENTIALS=/var/task/gcp-credential-config.json
MAX_MESSAGES=500

타임아웃은 60초 이상, 메모리는 512MB부터 시작해 실측으로 조정한다.

9.4 트리거

EventBridge Scheduler로 1분 간격 호출:

aws scheduler create-schedule \
  --name gcp-pubsub-pull-1min \
  --schedule-expression "rate(1 minute)" \
  --flexible-time-window '{"Mode":"OFF"}' \
  --target '{
    "Arn":"arn:aws:lambda:ap-northeast-2:987654321098:function:gcp-pubsub-subscriber",
    "RoleArn":"arn:aws:iam::987654321098:role/EventBridgeInvokeLambda"
  }'

왜 Lambda를 1분 간격 폴링인가? Lambda를 무한 루프로 돌릴 수도 있지만 비용·관측 측면에서 정해진 주기 호출이 운영하기 쉽다. 메시지 폭주 구간엔 ECS Fargate로 streaming pull(상시 구독)을 돌리는 변형이 더 적합하다 — 다음 절 참고.


10. Step 5 — 고처리량 변형: ECS Fargate 기반 streaming pull

분당 수만 건 이상이거나 지연 SLA가 수 초 이내면 Lambda 폴링은 한계가 있다. 이때 같은 코드를 살짝 바꿔 ECS Task로 상시 구동한다.

핵심 변경점:

# 동기 pull 대신 streaming pull
def callback(message):
    try:
        entry = normalize(message.data)
        firehose.put_record(
            DeliveryStreamName=FIREHOSE_NAME,
            Record={"Data": (json.dumps(entry, ensure_ascii=False) + "\n").encode("utf-8")},
        )
        message.ack()
    except Exception as e:
        log.exception(e)
        message.nack()  # 즉시 재전달

streaming_pull_future = sub_client.subscribe(sub_path, callback=callback,
    flow_control=pubsub_v1.types.FlowControl(max_messages=1000))
streaming_pull_future.result()

ECS Task의 Task Role이 7.3의 attribute.aws_role 매핑 대상이 되도록 IAM Role 이름을 등록만 갱신하면 끝. Firehose batch 사용을 위해 작은 인메모리 버퍼(예: 100건 또는 1초)를 추가하면 처리량/비용 모두 좋아진다.


11. Step 6 — 신뢰성 보강

11.1 메시지 유실 방지 체크리스트

  • ✅ Cloud Logging Sink → Pub/Sub publisher 권한 정확히 부여
  • ✅ Pub/Sub message-retention-duration ≥ 운영팀 평균 복구시간 + 안전마진
  • ✅ Lambda는 Firehose 응답 성공한 메시지만 ack
  • ✅ Firehose의 S3BackupMode = AllData 또는 최소 FailedDataOnly 설정
  • ✅ Pub/Sub Dead Letter Topic + 그 토픽에 자체 알람용 subscription

11.2 중복 처리 (Idempotency)

Pub/Sub은 at-least-once다. 같은 로그가 두 번 들어올 수 있다. 우리 정규화 스키마에 insert_id(Cloud Logging이 부여하는 unique ID)를 보존했다는 점이 중요하다.

  • 분석 단계 중복 제거: OpenSearch에 적재할 때 _id = insert_id로 지정하면 동일 ID 재인덱싱 시 덮어쓰기로 끝난다.
  • S3 raw 적재: 중복은 허용. 분석 시점에 DISTINCT insert_id로 처리.

11.3 DLQ 모니터링

Pub/Sub DLQ에 별도 subscription을 만들고, 메시지 수가 0보다 크면 알람.

gcloud pubsub subscriptions create ${PUBSUB_DLQ}-monitor \
  --topic=$PUBSUB_DLQ \
  --message-retention-duration=7d

GCP Cloud Monitoring metric subscription/num_undelivered_messages > 0에 알람.

11.4 Backpressure

Lambda 변형은 1분마다 최대 500건 처리 → 분당 30,000건이 한계다. 이걸 넘기는 조건이 한 번이라도 보이면 ECS streaming pull로 옮기거나, Lambda 동시성을 늘리고 한 번 호출당 max_messages를 키워야 한다.

Pub/Sub은 버퍼로 버티지만 7일 retention을 넘으면 정말로 메시지가 사라진다. oldest_unacked_message_age 메트릭이 추세적으로 상승하면 즉시 대응.


12. Step 7 — 모니터링과 알람

다음 메트릭 4가지를 대시보드에 묶어둔다.

영역메트릭임계
GCP Pub/Subsubscription/num_undelivered_messages> 100K 5분 지속
GCP Pub/Subsubscription/oldest_unacked_message_age> 600s
GCP Pub/Subtopic/send_message_operation_count (DLQ)> 0
AWS LambdaErrors, Throttles, Duration p99Errors > 0 또는 p99 > 30s
AWS FirehoseIncomingRecords, DeliveryToS3.Records, DeliveryToS3.DataFreshnessDataFreshness > 300s
AWS S3BucketSizeBytes 일별 증가율평소 대비 ±50% 이탈

알람 라우팅은 SNS → PagerDuty/Opsgenie/Slack 중 하나로 통일.


13. 보안 점검 체크리스트

보안 아키텍트 관점에서 마지막으로 훑을 항목들.

인증/자격증명

  • GCP SA 키 JSON 발급 0건. gcloud iam service-accounts keys list로 확인
  • WIF Provider attribute condition으로 AWS 계정·역할명 화이트리스트
  • Lambda 실행 역할의 신뢰 정책에 lambda.amazonaws.com만 등록
  • GOOGLE_APPLICATION_CREDENTIALS 파일에 시크릿 미포함 확인

네트워크/암호화

  • Lambda가 VPC를 사용한다면 NAT Gateway만 거쳐 외부로 나가도록(Outbound IP 화이트리스트화)
  • S3 SSE-KMS, OpenSearch encryption-at-rest 활성
  • Firehose KMS 키 사용. 키 정책에서 Firehose 역할만 허용
  • TLS 1.2 이상 강제(S3 bucket policy의 aws:SecureTransport)

권한 최소화

  • GCP SA는 단일 subscription에 대한 roles/pubsub.subscriber
  • Lambda 역할은 Firehose 단일 스트림 PutRecord/PutRecordBatch만
  • Firehose 역할은 S3 단일 버킷 + KMS 키 + 자기 LogGroup만

감사

  • GCP IAM Policy 변경 알람 (Workload Identity Pool, SA, Subscription)
  • AWS CloudTrail에서 Lambda 함수·환경변수 변경 알람
  • S3 Object-level logging(또는 CloudTrail data event) 활성

유실/조작 방지

  • S3 Versioning + Object Lock(법적 보존이 필요하면)
  • Pub/Sub messageRetentionDuration 7일 명시
  • DLQ 비어 있는지 1일 1회 확인 잡 또는 알람

14. 비용 추정과 최적화

GCP 측:

  • Pub/Sub: 메시지 전송량(bytes) 기반. 첫 10GB/월 무료, 이후 $40/TiB 수준.
  • Cloud Logging: Sink 자체는 무료지만, 남기지 않을 로그를 사전에 exclusion filter로 거르면 Logging 자체 비용($0.50/GiB)이 줄어든다.

AWS 측:

  • Lambda: 1분 호출 × 30일 = 43,200 invocations/월. 메모리 512MB·평균 5초면 매우 저렴.
  • Firehose: $0.029/GB(첫 500TB 구간) + 다이내믹 파티셔닝 옵션 비용. GZIP 압축 80% 가정하면 1TB 원본이 200GB로 줄어 효율↑.
  • S3: 압축된 GZIP을 Standard에 30일, IA에 90일, Deep Archive로 보내는 라이프사이클이 가장 저렴.

비용 최적화 3원칙:

  1. Sink 필터로 들어오는 양을 줄여라 — 가장 효과 큼.
  2. Firehose 압축은 무조건 GZIP — 저장·전송·OpenSearch 색인 모두에서 이득.
  3. OpenSearch에는 핫 데이터만, 그 외엔 S3 + Athena — OpenSearch는 비싼 저장소다.

15. 트러블슈팅 가이드

증상원인 후보점검
Lambda 401/403, GCP SA impersonation 실패WIF principalSet의 role 이름 불일치, Lambda 역할 이름 변경gcloud iam service-accounts get-iam-policy $GCP_SA
pulled 0 무한 반복인데 Logging은 들어옴Sink Writer가 Topic publisher 권한 미보유Sink writerIdentity의 IAM 바인딩 확인
Firehose DeliveryToS3.DataFreshness 폭증S3 권한, KMS 키 정책Firehose 콘솔의 Error Logs
같은 메시지 무한 재시도normalize 단계 영구 실패. ack 안 됨DLQ로 빠지는지 확인. 5회 후 DLQ 가야 정상
비용 급증신규 Audit Log 카테고리 활성화 또는 디버그 로그 폭주Sink의 --log-filter를 더 엄격히
oldest_unacked_message_age 상승AWS 측 처리량 부족Lambda → ECS streaming으로 전환

16. 마무리

GCP → AWS 로그 통합 파이프라인은 “로그를 보낸다”는 한 줄 요구사항 뒤에, 인증의 정적 키 제거, 큐 기반 버퍼링, 이중 DLQ, 압축·파티셔닝·라이프사이클까지 챙겨야 비로소 프로덕션이라 부를 수 있다. 이 글에서 다룬 패턴은 다음 한 문장으로 요약된다.

Cloud Logging이 Pub/Sub에 적재하고, AWS Lambda가 Workload Identity Federation으로 GCP를 인증해 메시지를 끌어와 Firehose에 떨군다. Firehose는 S3에 GZIP NDJSON으로 적재하고, 동시에 OpenSearch로 보내 분석한다. 모든 단계는 자체 DLQ와 메트릭 알람을 가진다.

이 구조는 처리량이 늘면 Lambda를 ECS streaming pull로 교체하는 것만으로 수십 배 확장된다. 작게 시작하고 — 그러나 보안과 신뢰성 디테일은 처음부터 빠뜨리지 말 것. 운영하면서 가장 비싸지는 것은 인입량이 아니라, 사라진 한 건의 로그가 사고 분석에서 만들어내는 공백이다.

profile
이군의 보안, 그리고 생각을 다룹니다.

0개의 댓글