
Amplify의 Auth 카테고리를 활용하여 소셜 로그인과 MFA를 구현해보겠습니다.
// src/auth/config.js
/**
 * Amplify Auth settings for the Cognito user pool in ap-northeast-2.
 *
 * The default sign-up form fields are hidden and replaced by three
 * required, non-custom fields: email, password and phone number.
 */
export const authConfig = {
  region: 'ap-northeast-2',
  userPoolId: 'ap-northeast-2_xxxxxxxx',
  userPoolWebClientId: 'xxxxxxxxxxxxxxxxxxxxxxxxxx',
  mandatorySignIn: true,
  signUpConfig: {
    hideAllDefaults: true,
    // Each row is [label, key, type]; every field is required and non-custom.
    signUpFields: [
      ['이메일', 'email', 'email'],
      ['비밀번호', 'password', 'password'],
      ['전화번호', 'phone_number', 'string'],
    ].map(([label, key, type]) => ({
      label,
      key,
      required: true,
      type,
      custom: false,
    })),
  },
};
// src/auth/AuthProvider.js
import { Auth } from 'aws-amplify';
import { createContext, useContext, useState, useEffect } from 'react';
const AuthContext = createContext(null);
export const AuthProvider = ({ children }) => {
const [user, setUser] = useState(null);
const [loading, setLoading] = useState(true);
useEffect(() => {
checkUser();
}, []);
async function checkUser() {
try {
const userData = await Auth.currentAuthenticatedUser();
setUser(userData);
} catch (err) {
setUser(null);
}
setLoading(false);
}
const signIn = async (email, password) => {
try {
const user = await Auth.signIn(email, password);
if (user.challengeName === 'SMS_MFA') {
// MFA 처리 로직
return { needsMFA: true, user };
}
setUser(user);
return { success: true };
} catch (error) {
throw new Error(error.message);
}
};
// 컨텍스트 값 제공
const value = {
user,
signIn,
// 기타 인증 관련 메서드들...
};
return (
<AuthContext.Provider value={value}>
{!loading && children}
</AuthContext.Provider>
);
};
Amplify의 API (AppSync) 기능을 활용하여 실시간 채팅 시스템을 구현해보겠습니다.
# schema.graphql
# Chat message type. Carries the @model directive and an owner-based @auth rule;
# roomId is indexed under the name "byRoom".
# NOTE(review): with only an owner rule, other participants of the same room
# presumably cannot read or subscribe to these messages; confirm the intended
# access model for a shared chat room.
type Message @model @auth(rules: [{allow: owner}]) {
id: ID!
content: String!
createdAt: AWSDateTime!
roomId: ID! @index(name: "byRoom")
owner: String!
}
# Custom subscription tied to the createMessage mutation via @aws_subscribe;
# takes the room id as a required argument.
type Subscription {
onCreateMessageByRoomId(roomId: ID!): Message
@aws_subscribe(mutations: ["createMessage"])
}
// src/graphql/mutations.js
import { API, graphqlOperation } from 'aws-amplify';
/**
 * GraphQL document for the `createMessage` mutation.
 *
 * Takes a single `$input` variable of type `CreateMessageInput!` and selects
 * `id`, `content`, `createdAt`, `roomId` and `owner` from the created message.
 */
export const createMessage = /* GraphQL */ `
mutation CreateMessage(
$input: CreateMessageInput!
) {
createMessage(input: $input) {
id
content
createdAt
roomId
owner
}
}
`;
// src/components/ChatRoom.js
import { useEffect, useState } from 'react';
import { API, graphqlOperation } from 'aws-amplify';
import { onCreateMessageByRoomId } from '../graphql/subscriptions';
const ChatRoom = ({ roomId }) => {
const [messages, setMessages] = useState([]);
useEffect(() => {
// 구독 설정
const subscription = API.graphql(
graphqlOperation(onCreateMessageByRoomId, { roomId })
).subscribe({
next: ({ value }) => {
setMessages(msgs => [...msgs, value.data.onCreateMessageByRoomId]);
},
error: error => console.warn(error)
});
return () => subscription.unsubscribe();
}, [roomId]);
const sendMessage = async (content) => {
try {
await API.graphql(graphqlOperation(createMessage, {
input: {
content,
roomId,
createdAt: new Date().toISOString()
}
}));
} catch (err) {
console.error('메시지 전송 실패:', err);
}
};
return (
<div className="chat-container">
{/* 채팅 UI 구현 */}
</div>
);
};
여러 AWS 계정에 걸쳐 있는 리소스들을 통합 관리하는 방법을 구현해보겠습니다.
# lambda_function.py
import boto3
import json
from datetime import datetime, timezone
import os
def assume_role(account_id, role_name, region_name=None):
    """Assume a role in another account and return an EC2 client for it.

    Args:
        account_id: AWS account id that owns the role.
        role_name: Name of the IAM role to assume.
        region_name: Region for the returned EC2 client. ``None`` keeps the
            default region resolution of boto3.

    Returns:
        A boto3 EC2 client using the temporary credentials of the role.
    """
    sts_client = boto3.client('sts')
    assumed_role_object = sts_client.assume_role(
        RoleArn=f"arn:aws:iam::{account_id}:role/{role_name}",
        RoleSessionName="CrossAccountScheduler"
    )
    credentials = assumed_role_object['Credentials']
    return boto3.client(
        'ec2',
        region_name=region_name,
        aws_access_key_id=credentials['AccessKeyId'],
        aws_secret_access_key=credentials['SecretAccessKey'],
        aws_session_token=credentials['SessionToken']
    )


def lambda_handler(event, context):
    """Run every schedule in the schedule table whose time matches now (UTC).

    Reads the table named by the ``SCHEDULE_TABLE`` environment variable and,
    for each due schedule, manages instances in the schedule's account and
    region through the ``InstanceSchedulerRole`` role.
    """
    # Load schedule definitions from DynamoDB.
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(os.environ['SCHEDULE_TABLE'])
    current_time = datetime.now(timezone.utc)

    # A single scan call returns one page only; follow LastEvaluatedKey so
    # schedules beyond the first page are not silently skipped.
    schedules = []
    scan_kwargs = {}
    while True:
        response = table.scan(**scan_kwargs)
        schedules.extend(response['Items'])
        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            break
        scan_kwargs['ExclusiveStartKey'] = last_key

    for schedule in schedules:
        if should_execute_schedule(schedule, current_time):
            account_id = schedule['account_id']
            region = schedule['region']
            # Obtain cross-account access, scoped to the schedule's region.
            ec2_client = assume_role(
                account_id, 'InstanceSchedulerRole', region_name=region
            )
            # Start or stop the matching instances.
            manage_instances(ec2_client, schedule)
def should_execute_schedule(schedule, current_time):
    """Tell whether ``current_time`` falls in the schedule's HH:MM minute.

    Args:
        schedule: Mapping with an ``execution_time`` string in ``%H:%M`` form.
        current_time: ``datetime`` to compare against.

    Returns:
        True when both hour and minute match, otherwise False.
    """
    target = datetime.strptime(schedule['execution_time'], '%H:%M').time()
    now = current_time.time()
    return (now.hour, now.minute) == (target.hour, target.minute)
def manage_instances(ec2_client, schedule):
    """Start or stop all instances tagged with the schedule's environment.

    Args:
        ec2_client: EC2 client used for the lookup and the action.
        schedule: Mapping with ``environment`` (value of the ``Environment``
            tag to match) and ``action`` (``'start'`` or ``'stop'``).

    Errors are printed and not raised (best-effort behaviour).
    """
    try:
        # Paginate so that instances beyond the first result page are included.
        paginator = ec2_client.get_paginator('describe_instances')
        pages = paginator.paginate(
            Filters=[
                {
                    'Name': 'tag:Environment',
                    'Values': [schedule['environment']]
                }
            ]
        )
        instance_ids = [
            instance['InstanceId']
            for page in pages
            for reservation in page['Reservations']
            for instance in reservation['Instances']
        ]

        # Nothing matched: avoid calling start/stop with an empty id list.
        if not instance_ids:
            return

        if schedule['action'] == 'start':
            ec2_client.start_instances(InstanceIds=instance_ids)
        elif schedule['action'] == 'stop':
            ec2_client.stop_instances(InstanceIds=instance_ids)
    except Exception as e:
        print(f"Error managing instances: {str(e)}")
Instance Scheduler를 통해 절감된 비용을 분석하고 리포트를 생성하는 시스템을 구현해보겠습니다.
# cost_analyzer.py
import boto3
import pandas as pd
from datetime import datetime, timedelta
def get_cost_and_usage(start_date, end_date):
    """Fetch daily unblended cost grouped by service and Environment tag.

    Args:
        start_date: Start of the period, ``YYYY-MM-DD``.
        end_date: End of the period, ``YYYY-MM-DD``.

    Returns:
        List of ``ResultsByTime`` entries collected from every result page.
    """
    client = boto3.client('ce')
    request = {
        'TimePeriod': {
            'Start': start_date,
            'End': end_date
        },
        'Granularity': 'DAILY',
        'Metrics': ['UnblendedCost'],
        'GroupBy': [
            {'Type': 'DIMENSION', 'Key': 'SERVICE'},
            {'Type': 'TAG', 'Key': 'Environment'}
        ]
    }

    # Grouped results can span several pages; keep requesting until the
    # response no longer carries a NextPageToken.
    results = []
    while True:
        response = client.get_cost_and_usage(**request)
        results.extend(response['ResultsByTime'])
        next_token = response.get('NextPageToken')
        if not next_token:
            return results
        request['NextPageToken'] = next_token
def generate_cost_report():
    """Build an HTML cost report for the last 30 days.

    Returns:
        HTML string with the total cost plus per-service and per-environment
        cost tables. When no cost rows exist the totals are zero and the
        tables are empty.
    """
    end_date = datetime.now().strftime('%Y-%m-%d')
    start_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
    cost_data = get_cost_and_usage(start_date, end_date)

    # Flatten the grouped response into one record per (day, service, environment).
    records = []
    for daily_cost in cost_data:
        date = daily_cost['TimePeriod']['Start']
        for group in daily_cost['Groups']:
            service = group['Keys'][0]
            # The tag key arrives as "<TagKey>$<value>"; keep the part after the last '$'.
            environment = group['Keys'][1].split('$')[-1]
            cost = float(group['Metrics']['UnblendedCost']['Amount'])
            records.append({
                'Date': date,
                'Service': service,
                'Environment': environment,
                'Cost': cost
            })

    # Fix the columns up front so an empty result still has a 'Cost' column.
    df = pd.DataFrame(records, columns=['Date', 'Service', 'Environment', 'Cost'])

    # Aggregate costs.
    total_cost = df['Cost'].sum()
    cost_by_service = df.groupby('Service')['Cost'].sum()
    cost_by_env = df.groupby('Environment')['Cost'].sum()

    # Render the HTML report.
    html_report = f"""
<html>
<head>
<style>
table {{ border-collapse: collapse; width: 100%; }}
th, td {{ border: 1px solid black; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
</style>
</head>
<body>
<h1>AWS 비용 분석 리포트</h1>
<h2>기간: {start_date} ~ {end_date}</h2>
<h3>총 비용: ${total_cost:.2f}</h3>
<h3>서비스별 비용</h3>
{cost_by_service.to_frame().to_html()}
<h3>환경별 비용</h3>
{cost_by_env.to_frame().to_html()}
</body>
</html>
"""
    return html_report
여러 리소스를 동시에 관리해야 하는 경우의 처리 방법입니다.
# batch_scheduler.py
import boto3
import json
from concurrent.futures import ThreadPoolExecutor
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def process_instance_batch(instances, action):
    """Start or stop one batch of instances.

    Args:
        instances: Iterable of instance dicts, each with an ``InstanceId`` key.
        action: ``'start'`` or ``'stop'``.

    Returns:
        The EC2 API response on success; ``None`` when the action is not
        supported or the API call fails (the failure is logged).
    """
    instance_ids = [i['InstanceId'] for i in instances]

    # Reject unknown actions explicitly instead of failing later on an
    # unassigned response variable.
    if action not in ('start', 'stop'):
        logger.error(f"Unsupported action '{action}' for instances {instance_ids}")
        return None

    ec2 = boto3.client('ec2')
    try:
        if action == 'start':
            response = ec2.start_instances(InstanceIds=instance_ids)
        else:
            response = ec2.stop_instances(InstanceIds=instance_ids)
        logger.info(f"Successfully processed instances: {instance_ids}")
        return response
    except Exception as e:
        logger.error(f"Error processing instances {instance_ids}: {str(e)}")
        return None
def lambda_handler(event, context):
    """Apply ``event['action']`` (default ``'stop'``) to all Scheduler-tagged instances.

    Instances tagged ``Scheduler=true`` are collected, split into fixed-size
    batches and handed to ``process_instance_batch`` on a thread pool.

    Returns:
        Dict with ``statusCode`` 200 and a JSON body holding a summary message
        and the number of batches that returned a response.
    """
    worker_limit = 10  # maximum number of batches processed concurrently
    chunk_len = 20  # number of instances per batch

    ec2 = boto3.client('ec2')

    # Collect the target instances from every result page.
    scheduler_tag_filter = [{'Name': 'tag:Scheduler', 'Values': ['true']}]
    pages = ec2.get_paginator('describe_instances').paginate(
        Filters=scheduler_tag_filter
    )
    instances = [
        instance
        for page in pages
        for reservation in page['Reservations']
        for instance in reservation['Instances']
    ]

    # Slice the instance list into batches.
    batches = []
    for offset in range(0, len(instances), chunk_len):
        batches.append(instances[offset:offset + chunk_len])

    action = event.get('action', 'stop')

    with ThreadPoolExecutor(max_workers=worker_limit) as executor:
        pending = []
        for batch in batches:
            pending.append(executor.submit(process_instance_batch, batch, action))
        results = [future.result() for future in pending]

    succeeded = sum(1 for outcome in results if outcome is not None)
    summary = {
        'message': f'Processed {len(instances)} instances',
        'successful': succeeded
    }
    return {
        'statusCode': 200,
        'body': json.dumps(summary)
    }
이 코드들은 실제 프로덕션 환경에서 활용할 수 있는 구체적인 예제들입니다.
특히 주목할 만한 포인트는 크로스 계정 권한 위임, 비용 분석 리포트 생성, 그리고 배치 단위 병렬 처리입니다.
Instance Scheduler 운영 시 발생할 수 있는 장애 상황을 감지하고 대응하는 시스템을 구현해보겠습니다.
# monitoring_system.py
import boto3
import json
import os
from datetime import datetime, timedelta
class SchedulerMonitor:
    """Records schedule executions and raises alerts when none complete.

    Uses the DynamoDB table named by ``MONITORING_TABLE`` and publishes
    alerts to the SNS topic named by ``ALERT_TOPIC_ARN``.
    """

    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.sns = boto3.client('sns')
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(os.environ['MONITORING_TABLE'])

    def check_scheduler_health(self):
        """Check that at least one execution completed within the last hour.

        Returns:
            True when a completed execution was found; False otherwise or
            when the lookup itself failed (an alert is sent in both cases).
        """
        try:
            # Look up recent execution history.
            # 'status' is a DynamoDB reserved word, so it must be referenced
            # through an expression attribute name.
            # NOTE(review): querying by status presumably requires status to
            # be the partition key of the table or of an index; confirm and
            # add IndexName if the key schema differs.
            response = self.table.query(
                KeyConditionExpression='#status = :status',
                ExpressionAttributeNames={
                    '#status': 'status'
                },
                ExpressionAttributeValues={
                    ':status': 'COMPLETED',
                    ':time': (datetime.now() - timedelta(hours=1)).isoformat()
                },
                FilterExpression='execution_time > :time'
            )
            if not response['Items']:
                self._alert_scheduler_failure("최근 1시간 동안 완료된 스케줄 작업이 없습니다.")
                return False
            return True
        except Exception as e:
            self._alert_scheduler_failure(f"모니터링 중 오류 발생: {str(e)}")
            return False

    def record_execution(self, schedule_id, status, details=None):
        """Store one execution result and publish a CloudWatch metric.

        Args:
            schedule_id: Identifier of the executed schedule.
            status: Execution status; ``'COMPLETED'`` is published as 1,
                anything else as 0.
            details: Optional extra data stored with the record.
        """
        timestamp = datetime.now().isoformat()
        item = {
            'schedule_id': schedule_id,
            'status': status,
            'execution_time': timestamp,
            'details': details or {}
        }
        self.table.put_item(Item=item)

        # Publish the CloudWatch metric.
        self.cloudwatch.put_metric_data(
            Namespace='InstanceScheduler',
            MetricData=[
                {
                    'MetricName': 'ScheduleExecution',
                    'Value': 1 if status == 'COMPLETED' else 0,
                    'Unit': 'Count',
                    'Dimensions': [
                        {
                            'Name': 'ScheduleId',
                            'Value': schedule_id
                        }
                    ]
                }
            ]
        )

    def _alert_scheduler_failure(self, message):
        """Publish a failure alert to SNS; publish errors are printed, not raised."""
        try:
            email_body = (
                "\n"
                "Instance Scheduler 장애 알림\n"
                f"시간: {datetime.now().isoformat()}\n"
                f"내용: {message}\n"
                "즉시 확인이 필요합니다.\n"
            )
            self.sns.publish(
                TopicArn=os.environ['ALERT_TOPIC_ARN'],
                Message=json.dumps({
                    'default': message,
                    'email': email_body
                }),
                MessageStructure='json'
            )
        except Exception as e:
            print(f"알림 발송 실패: {str(e)}")
예기치 않은 상황으로 인스턴스가 중지되거나 시작되지 않을 때를 대비한 복구 시스템입니다.
# recovery_system.py
import boto3
import json
import logging
from datetime import datetime
logger = logging.getLogger()
logger.setLevel(logging.INFO)


class InstanceRecovery:
    """Verifies instance state and retries failed start/stop operations."""

    def __init__(self):
        self.ec2 = boto3.client('ec2')
        self.ssm = boto3.client('ssm')
        self.sns = boto3.client('sns')

    def verify_instance_state(self, instance_id, expected_state):
        """Return True when the instance's current state equals ``expected_state``.

        Any lookup failure is logged and reported as False.
        """
        try:
            response = self.ec2.describe_instances(InstanceIds=[instance_id])
            actual_state = response['Reservations'][0]['Instances'][0]['State']['Name']
            return actual_state == expected_state
        except Exception as e:
            logger.error(f"인스턴스 상태 확인 실패: {str(e)}")
            return False

    def recover_instance(self, instance_id, target_state):
        """Try to bring the instance to ``target_state``.

        Args:
            instance_id: Instance to recover.
            target_state: ``'running'`` (start, then health check) or
                ``'stopped'`` (forced stop). Other values perform no action.

        Returns:
            True when no step raised; False when recovery failed (an alert
            is attempted in that case).
        """
        try:
            if target_state == 'running':
                # Restart the instance that failed to start.
                self.ec2.start_instances(InstanceIds=[instance_id])
                # start_instances returns before the instance is up; wait for
                # the status checks so the SSM health command has a target.
                # NOTE(review): presumably enough for the SSM agent to be
                # online — confirm for the AMIs in use.
                self.ec2.get_waiter('instance_status_ok').wait(
                    InstanceIds=[instance_id]
                )
                # Check application health.
                self._verify_application_health(instance_id)
            elif target_state == 'stopped':
                # Force-stop the instance that failed to stop.
                self.ec2.stop_instances(InstanceIds=[instance_id], Force=True)
            return True
        except Exception as e:
            logger.error(f"인스턴스 복구 실패: {str(e)}")
            # An alerting failure must not turn the documented False result
            # into an exception.
            try:
                self._send_alert(instance_id, str(e))
            except Exception as alert_error:
                logger.error(f"알림 발송 실패: {str(alert_error)}")
            return False

    def _verify_application_health(self, instance_id):
        """Run the health-check commands via SSM and wait for them to finish.

        Raises:
            Exception: Re-raised after logging when sending or waiting fails.
        """
        try:
            response = self.ssm.send_command(
                InstanceIds=[instance_id],
                DocumentName='AWS-RunShellScript',
                Parameters={
                    'commands': [
                        'systemctl is-active application.service',
                        'curl -f localhost:8080/health'
                    ]
                }
            )
            command_id = response['Command']['CommandId']

            # Wait for the command result.
            waiter = self.ssm.get_waiter('command_executed')
            waiter.wait(
                CommandId=command_id,
                InstanceId=instance_id
            )
        except Exception as e:
            logger.error(f"애플리케이션 상태 확인 실패: {str(e)}")
            raise

    def _send_alert(self, instance_id, error_message):
        """Publish a recovery-failure alert to SNS.

        NOTE(review): the TopicArn below looks like a placeholder value;
        replace it with the real topic ARN before use.
        """
        message = {
            'instance_id': instance_id,
            'error': error_message,
            'timestamp': datetime.now().isoformat(),
            'recovery_action': '수동 개입이 필요합니다.'
        }
        self.sns.publish(
            TopicArn='arn:aws:sns:region:account:topic',
            Message=json.dumps(message)
        )
Instance Scheduler를 통한 비용 절감 효과를 시각화하는 대시보드를 React로 구현해보겠습니다.
// src/components/CostDashboard.js
import React, { useState, useEffect } from 'react';
import { API } from 'aws-amplify';
import {
LineChart,
Line,
XAxis,
YAxis,
CartesianGrid,
Tooltip,
Legend
} from 'recharts';
const CostDashboard = () => {
const [costData, setCostData] = useState(null);
const [loading, setLoading] = useState(true);
const [error, setError] = useState(null);
useEffect(() => {
fetchCostData();
}, []);
const fetchCostData = async () => {
try {
const response = await API.get('costApi', '/costs');
setCostData(processCostData(response.data));
} catch (err) {
setError('비용 데이터 로딩 실패');
console.error(err);
} finally {
setLoading(false);
}
};
const processCostData = (data) => {
// 데이터 가공 로직
return data.map(item => ({
date: new Date(item.date).toLocaleDateString(),
actual: parseFloat(item.actualCost),
projected: parseFloat(item.projectedCost),
savings: parseFloat(item.projectedCost - item.actualCost)
}));
};
if (loading) return <div>데이터 로딩중...</div>;
if (error) return <div>에러: {error}</div>;
return (
<div className="p-4">
<h2 className="text-2xl font-bold mb-4">비용 절감 현황</h2>
<div className="grid grid-cols-1 md:grid-cols-3 gap-4 mb-6">
<div className="bg-white p-4 rounded shadow">
<h3 className="text-lg font-semibold">이번 달 절감액</h3>
<p className="text-2xl text-green-600">
${costData[costData.length - 1]?.savings.toFixed(2)}
</p>
</div>
{/* 다른 요약 통계 카드들 */}
</div>
<div className="bg-white p-4 rounded shadow">
<LineChart width={800} height={400} data={costData}>
<CartesianGrid strokeDasharray="3 3" />
<XAxis dataKey="date" />
<YAxis />
<Tooltip />
<Legend />
<Line
type="monotone"
dataKey="actual"
stroke="#8884d8"
name="실제 비용"
/>
<Line
type="monotone"
dataKey="projected"
stroke="#82ca9d"
name="예상 비용"
/>
</LineChart>
</div>
</div>
);
};
export default CostDashboard;
여러 리전에 걸쳐 있는 리소스들을 효율적으로 관리하는 시스템을 구현해보겠습니다.
# multi_region_sync.py
import boto3
import json
from datetime import datetime
from botocore.exceptions import ClientError
class MultiRegionScheduler:
    """Copies schedule items from the primary region's table to other regions.

    Tables are named ``InstanceScheduler-<region>``.
    """

    def __init__(self, primary_region, secondary_regions):
        self.primary_region = primary_region
        self.secondary_regions = secondary_regions
        # The primary table and the alert topic are addressed by
        # primary_region, so pin both clients to that region instead of
        # relying on the ambient default.
        self.dynamodb = boto3.resource('dynamodb', region_name=primary_region)
        self.sns = boto3.client('sns', region_name=primary_region)

    def sync_schedules(self):
        """Replicate every schedule of the primary region to all secondary regions.

        Returns:
            True on success, False when any step raised (an error
            notification is attempted in that case).
        """
        try:
            # Read the schedules from the primary region, following
            # LastEvaluatedKey so tables larger than one scan page are
            # copied completely.
            primary_table = self.dynamodb.Table(f'InstanceScheduler-{self.primary_region}')
            schedules = []
            scan_kwargs = {}
            while True:
                page = primary_table.scan(**scan_kwargs)
                schedules.extend(page['Items'])
                last_key = page.get('LastEvaluatedKey')
                if not last_key:
                    break
                scan_kwargs['ExclusiveStartKey'] = last_key

            # Replicate to the secondary regions.
            for region in self.secondary_regions:
                self._replicate_to_region(region, schedules)
            return True
        except Exception as e:
            # A failed notification must not replace the documented False
            # result with an exception.
            try:
                self._handle_sync_error(str(e))
            except Exception as notify_error:
                print(f"Failed to publish sync error: {str(notify_error)}")
            return False

    def _replicate_to_region(self, region, schedules):
        """Write all schedules into the given region's table."""
        session = boto3.Session(region_name=region)
        ddb = session.resource('dynamodb')
        table = ddb.Table(f'InstanceScheduler-{region}')
        with table.batch_writer() as batch:
            for schedule in schedules:
                # Stamp region-specific fields on a copy so the caller's
                # items (reused for the next region) are left untouched.
                regional_item = {
                    **schedule,
                    'region': region,
                    'last_synced': datetime.now().isoformat(),
                }
                batch.put_item(Item=regional_item)

    def _handle_sync_error(self, error_message):
        """Publish a synchronisation error to SNS.

        NOTE(review): the account segment of the TopicArn below looks like a
        placeholder; replace it with the real account id.
        """
        self.sns.publish(
            TopicArn=f'arn:aws:sns:{self.primary_region}:account:MultiRegionSync',
            Message=json.dumps({
                'error': error_message,
                'timestamp': datetime.now().isoformat(),
                'regions_affected': self.secondary_regions
            })
        )
머신러닝을 활용하여 비용을 예측하고 최적의 스케줄을 추천하는 시스템입니다.
# cost_prediction.py
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
import boto3
class CostPredictor:
    """Random-forest cost model fed from Cost Explorer daily cost data."""

    def __init__(self):
        self.model = RandomForestRegressor(n_estimators=100)
        self.ce_client = boto3.client('ce')

    def fetch_historical_data(self, start_date, end_date):
        """Collect daily cost data for the period as a DataFrame.

        Args:
            start_date: Start of the period, ``YYYY-MM-DD``.
            end_date: End of the period, ``YYYY-MM-DD``.

        Returns:
            DataFrame produced by ``_process_cost_data`` over all result pages.
        """
        request = {
            'TimePeriod': {
                'Start': start_date,
                'End': end_date
            },
            'Granularity': 'DAILY',
            'Metrics': ['UnblendedCost'],
            'GroupBy': [
                {'Type': 'DIMENSION', 'Key': 'SERVICE'},
                {'Type': 'TAG', 'Key': 'Environment'}
            ]
        }
        # Grouped results can be paginated; gather every page first.
        results = []
        while True:
            response = self.ce_client.get_cost_and_usage(**request)
            results.extend(response['ResultsByTime'])
            next_token = response.get('NextPageToken')
            if not next_token:
                break
            request['NextPageToken'] = next_token
        return self._process_cost_data({'ResultsByTime': results})

    def train_model(self, data):
        """Fit the model on ``data`` and return its score on a 20% hold-out.

        NOTE(review): the feature list includes ``instance_count`` and
        ``previous_cost``, which ``_process_cost_data`` does not produce;
        the caller presumably has to add them — confirm.
        """
        features = ['hour', 'day_of_week', 'day_of_month', 'month',
                    'instance_count', 'previous_cost']
        X = data[features]
        y = data['cost']
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        self.model.fit(X_train, y_train)
        return self.model.score(X_test, y_test)

    def predict_costs(self, future_data):
        """Predict costs for the given feature rows."""
        predictions = self.model.predict(future_data)
        return predictions

    def _process_cost_data(self, raw_data):
        """Turn a Cost Explorer response into one row per time period.

        The cost of a period is taken from ``Total`` when it is populated;
        for grouped responses, where ``Total`` is empty, the amounts of all
        groups are summed instead.

        Returns:
            DataFrame with columns ``hour``, ``day_of_week``,
            ``day_of_month``, ``month`` and ``cost``.
        """
        records = []
        for item in raw_data['ResultsByTime']:
            date = pd.to_datetime(item['TimePeriod']['Start'])
            total = item.get('Total') or {}
            if 'UnblendedCost' in total:
                cost = float(total['UnblendedCost']['Amount'])
            else:
                cost = float(sum(
                    float(group['Metrics']['UnblendedCost']['Amount'])
                    for group in item.get('Groups', [])
                ))
            records.append({
                'hour': date.hour,
                'day_of_week': date.dayofweek,
                'day_of_month': date.day,
                'month': date.month,
                'cost': cost
            })
        return pd.DataFrame(records)
시스템의 안정성을 높이기 위한 고가용성 아키텍처 구현입니다.
# high_availability.py
import boto3
from botocore.exceptions import ClientError
import json
import time
from datetime import datetime
class HAScheduler:
    """Leader/follower wrapper around the scheduler using a DynamoDB lock.

    NOTE(review): ``_get_instance_id``, ``_monitor_leader_health``,
    ``_should_trigger_failover``, ``_trigger_failover`` and ``_notify_admin``
    are called below but are not defined in the visible source; they
    presumably live elsewhere and must exist for this class to work.
    """

    def __init__(self):
        self.dynamodb = boto3.resource('dynamodb')
        self.lambda_client = boto3.client('lambda')
        self.cloudwatch = boto3.client('cloudwatch')

    def ensure_ha(self):
        """Run as leader when the lock is obtained, otherwise as follower.

        Any exception is routed to the failover handler.
        """
        try:
            # Leader election.
            if self._elect_leader():
                self._process_as_leader()
            else:
                self._process_as_follower()
        except Exception as e:
            self._handle_failover(str(e))

    def _elect_leader(self):
        """Try to take the leader lock.

        The lock item is written only when no lock exists or the stored
        ``ttl`` is already in the past.

        Returns:
            True when the lock was written, False when the condition failed.

        Raises:
            ClientError: For any error other than a failed condition check.
        """
        table = self.dynamodb.Table('SchedulerLeaderLock')
        # Use one clock reading so the item and the condition agree.
        now = int(time.time())
        try:
            table.put_item(
                Item={
                    'lock_id': 'scheduler_leader',
                    'holder': self._get_instance_id(),
                    'timestamp': now,
                    'ttl': now + 300  # 5-minute TTL
                },
                # 'ttl' is a DynamoDB reserved word, so it is referenced
                # through an expression attribute name.
                ConditionExpression='attribute_not_exists(lock_id) OR #ttl < :now',
                ExpressionAttributeNames={
                    '#ttl': 'ttl'
                },
                ExpressionAttributeValues={
                    ':now': now
                }
            )
            return True
        except ClientError as e:
            if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
                return False
            raise

    def _process_as_leader(self):
        """Publish the leader health metric, then run the scheduling work."""
        # Publish the health-check metric.
        self.cloudwatch.put_metric_data(
            Namespace='InstanceScheduler',
            MetricData=[{
                'MetricName': 'LeaderHealth',
                'Value': 1,
                'Unit': 'Count'
            }]
        )
        # Run the main scheduling logic.
        self._execute_scheduling()

    def _process_as_follower(self):
        """Follower path: monitor the leader."""
        self._monitor_leader_health()

    def _handle_failover(self, error):
        """Trigger failover when required; notify the admin if that fails."""
        try:
            if self._should_trigger_failover():
                self._trigger_failover()
        except Exception as failover_error:
            self._notify_admin(str(failover_error))

    def _execute_scheduling(self):
        """Invoke the ``SchedulerMain`` Lambda asynchronously.

        Returns:
            True when the invocation was accepted (status code 202),
            False otherwise or when the call failed.
        """
        try:
            response = self.lambda_client.invoke(
                FunctionName='SchedulerMain',
                InvocationType='Event'
            )
            return response['StatusCode'] == 202
        except Exception as e:
            self._notify_admin(f"스케줄링 실행 실패: {str(e)}")
            return False
대규모 리소스 관리를 위한 React 기반의 관리 콘솔입니다.
// src/components/ResourceManager.js
import React, { useState, useEffect } from 'react';
import { API } from 'aws-amplify';
import { DataGrid } from '@mui/x-data-grid';
const ResourceManager = () => {
const [resources, setResources] = useState([]);
const [loading, setLoading] = useState(true);
const [schedules, setSchedules] = useState([]);
useEffect(() => {
fetchResources();
fetchSchedules();
}, []);
const fetchResources = async () => {
try {
const response = await API.get('resourceApi', '/resources');
setResources(response.items);
} catch (err) {
console.error('리소스 로딩 실패:', err);
} finally {
setLoading(false);
}
};
const fetchSchedules = async () => {
try {
const response = await API.get('scheduleApi', '/schedules');
setSchedules(response.items);
} catch (err) {
console.error('스케줄 로딩 실패:', err);
}
};
const columns = [
{ field: 'id', headerName: 'ID', width: 130 },
{ field: 'name', headerName: '리소스명', width: 200 },
{ field: 'type', headerName: '유형', width: 130 },
{ field: 'status', headerName: '상태', width: 130 },
{
field: 'schedule',
headerName: '스케줄',
width: 200,
renderCell: (params) => (
<select
value={params.value}
onChange={(e) => handleScheduleChange(params.row.id, e.target.value)}
>
<option value="">선택하세요</option>
{schedules.map(schedule => (
<option key={schedule.id} value={schedule.id}>
{schedule.name}
</option>
))}
</select>
)
}
];
const handleScheduleChange = async (resourceId, scheduleId) => {
try {
await API.put('resourceApi', `/resources/${resourceId}/schedule`, {
body: { scheduleId }
});
fetchResources(); // 리소스 목록 새로고침
} catch (err) {
console.error('스케줄 변경 실패:', err);
}
};
if (loading) {
return <div>로딩 중...</div>;
}
return (
<div style={{ height: 400, width: '100%' }}>
<DataGrid
rows={resources}
columns={columns}
pageSize={5}
rowsPerPageOptions={[5]}
checkboxSelection
disableSelectionOnClick
/>
</div>
);
};
export default ResourceManager;
리소스 접근 제어와 감사 로그 시스템입니다.
# security_manager.py
import boto3
import json
from datetime import datetime
import hashlib
class SecurityManager:
    """Audits an event against CloudTrail and GuardDuty."""

    def __init__(self):
        self.iam = boto3.client('iam')
        self.cloudtrail = boto3.client('cloudtrail')
        self.guardduty = boto3.client('guardduty')

    def audit_access(self, event):
        """Build the audit record for ``event``.

        Returns:
            Dict with the event's hash, the matching CloudTrail events, the
            GuardDuty finding ids and a flag telling whether any finding exists.
        """
        # Hash first, then CloudTrail, then GuardDuty.
        digest = self._create_event_hash(event)
        matched_events = self._get_cloudtrail_events(event)
        finding_ids = self._check_guardduty_findings(event)

        report = {'event_hash': digest, 'trail_events': matched_events}
        report['suspicious_activity'] = len(finding_ids) > 0
        report['findings'] = finding_ids
        return report

    def _create_event_hash(self, event):
        """Return the SHA-256 hex digest of the event serialised with sorted keys."""
        canonical = json.dumps(event, sort_keys=True).encode()
        hasher = hashlib.sha256()
        hasher.update(canonical)
        return hasher.hexdigest()

    def _get_cloudtrail_events(self, event):
        """Look up CloudTrail events whose EventId equals ``event['eventId']``."""
        event_id_attribute = {
            'AttributeKey': 'EventId',
            'AttributeValue': event['eventId']
        }
        lookup = self.cloudtrail.lookup_events(
            LookupAttributes=[event_id_attribute]
        )
        return lookup['Events']

    def _check_guardduty_findings(self, event):
        """List GuardDuty finding ids for the event's instance id."""
        instance_criterion = {'Eq': [event.get('instanceId', '')]}
        listing = self.guardduty.list_findings(
            DetectorId='detector-id',  # a real DetectorId is required here
            FindingCriteria={
                'Criterion': {
                    'resource.instanceId': instance_criterion
                }
            }
        )
        return listing['FindingIds']
이러한 구현들은 엔터프라이즈급 환경에서 Instance Scheduler를 안정적으로 운영하는데 필요한 핵심 컴포넌트들입니다.
이러한 기능들은 대규모 AWS 환경에서 필수적인 요소들입니다.
코드는 실제 구현 가능한 형태로 작성되었으며, 필요에 따라 수정하여 사용할 수 있습니다.