[AWS] Amplify와 Instance Scheduler로 구현하는 확장성과 비용 최적화의 기술

궁금하면 500원·2024년 8월 31일

데브옵스

목록 보기
8/37

1. Amplify를 활용한 확장 가능한 풀스택 애플리케이션 구축

1.1 서버리스 인증 시스템 구현

Amplify의 Auth 카테고리를 활용하여 소셜 로그인과 MFA를 구현해보겠습니다.

// src/auth/config.js
export const authConfig = {
  region: 'ap-northeast-2',
  userPoolId: 'ap-northeast-2_xxxxxxxx',
  userPoolWebClientId: 'xxxxxxxxxxxxxxxxxxxxxxxxxx',
  mandatorySignIn: true,
  signUpConfig: {
    hideAllDefaults: true,
    signUpFields: [
      {
        label: '이메일',
        key: 'email',
        required: true,
        type: 'email',
        custom: false,
      },
      {
        label: '비밀번호',
        key: 'password',
        required: true,
        type: 'password',
        custom: false,
      },
      {
        label: '전화번호',
        key: 'phone_number',
        required: true,
        type: 'string',
        custom: false,
      }
    ]
  }
};

// src/auth/AuthProvider.js
import { Auth } from 'aws-amplify';
import { createContext, useContext, useState, useEffect } from 'react';

const AuthContext = createContext(null);

export const AuthProvider = ({ children }) => {
  const [user, setUser] = useState(null);
  const [loading, setLoading] = useState(true);

  useEffect(() => {
    checkUser();
  }, []);

  async function checkUser() {
    try {
      const userData = await Auth.currentAuthenticatedUser();
      setUser(userData);
    } catch (err) {
      setUser(null);
    }
    setLoading(false);
  }

  const signIn = async (email, password) => {
    try {
      const user = await Auth.signIn(email, password);
      if (user.challengeName === 'SMS_MFA') {
        // MFA 처리 로직
        return { needsMFA: true, user };
      }
      setUser(user);
      return { success: true };
    } catch (error) {
      throw new Error(error.message);
    }
  };

  // 컨텍스트 값 제공
  const value = {
    user,
    signIn,
    // 기타 인증 관련 메서드들...
  };

  return (
    <AuthContext.Provider value={value}>
      {!loading && children}
    </AuthContext.Provider>
  );
};

1.2 실시간 데이터 동기화 구현

Amplify의 API (AppSync) 기능을 활용하여 실시간 채팅 시스템을 구현해보겠습니다.

# schema.graphql
type Message @model @auth(rules: [{allow: owner}]) {
  id: ID!
  content: String!
  createdAt: AWSDateTime!
  roomId: ID! @index(name: "byRoom")
  owner: String!
}

type Subscription {
  onCreateMessageByRoomId(roomId: ID!): Message
    @aws_subscribe(mutations: ["createMessage"])
}
// src/graphql/mutations.js
import { API, graphqlOperation } from 'aws-amplify';

export const createMessage = /* GraphQL */ `
  mutation CreateMessage(
    $input: CreateMessageInput!
  ) {
    createMessage(input: $input) {
      id
      content
      createdAt
      roomId
      owner
    }
  }
`;

// src/components/ChatRoom.js
import { useEffect, useState } from 'react';
import { API, graphqlOperation } from 'aws-amplify';
import { onCreateMessageByRoomId } from '../graphql/subscriptions';

const ChatRoom = ({ roomId }) => {
  const [messages, setMessages] = useState([]);

  useEffect(() => {
    // 구독 설정
    const subscription = API.graphql(
      graphqlOperation(onCreateMessageByRoomId, { roomId })
    ).subscribe({
      next: ({ value }) => {
        setMessages(msgs => [...msgs, value.data.onCreateMessageByRoomId]);
      },
      error: error => console.warn(error)
    });

    return () => subscription.unsubscribe();
  }, [roomId]);

  const sendMessage = async (content) => {
    try {
      await API.graphql(graphqlOperation(createMessage, {
        input: {
          content,
          roomId,
          createdAt: new Date().toISOString()
        }
      }));
    } catch (err) {
      console.error('메시지 전송 실패:', err);
    }
  };

  return (
    <div className="chat-container">
      {/* 채팅 UI 구현 */}
    </div>
  );
};

2. Instance Scheduler 고급 활용

2.1 크로스 계정 스케줄링 구현

여러 AWS 계정에 걸쳐 있는 리소스들을 통합 관리하는 방법을 구현해보겠습니다.

# lambda_function.py
import boto3
import json
from datetime import datetime, timezone
import os

def assume_role(account_id, role_name):
    sts_client = boto3.client('sts')
    assumed_role_object = sts_client.assume_role(
        RoleArn=f"arn:aws:iam::{account_id}:role/{role_name}",
        RoleSessionName="CrossAccountScheduler"
    )
    
    credentials = assumed_role_object['Credentials']
    return boto3.client(
        'ec2',
        aws_access_key_id=credentials['AccessKeyId'],
        aws_secret_access_key=credentials['SecretAccessKey'],
        aws_session_token=credentials['SessionToken']
    )

def lambda_handler(event, context):
    # DynamoDB에서 스케줄 정보 조회
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(os.environ['SCHEDULE_TABLE'])
    
    current_time = datetime.now(timezone.utc)
    
    response = table.scan()
    schedules = response['Items']
    
    for schedule in schedules:
        if should_execute_schedule(schedule, current_time):
            account_id = schedule['account_id']
            region = schedule['region']
            
            # 크로스 계정 권한 획득
            ec2_client = assume_role(account_id, 'InstanceSchedulerRole')
            
            # 인스턴스 관리
            manage_instances(ec2_client, schedule)

def should_execute_schedule(schedule, current_time):
    # 스케줄 실행 조건 확인 로직
    schedule_time = datetime.strptime(
        schedule['execution_time'], 
        '%H:%M'
    ).time()
    
    return (
        current_time.time().hour == schedule_time.hour and
        current_time.time().minute == schedule_time.minute
    )

def manage_instances(ec2_client, schedule):
    try:
        instances = ec2_client.describe_instances(
            Filters=[
                {
                    'Name': 'tag:Environment',
                    'Values': [schedule['environment']]
                }
            ]
        )
        
        instance_ids = []
        for reservation in instances['Reservations']:
            for instance in reservation['Instances']:
                instance_ids.append(instance['InstanceId'])
        
        if schedule['action'] == 'start':
            ec2_client.start_instances(InstanceIds=instance_ids)
        elif schedule['action'] == 'stop':
            ec2_client.stop_instances(InstanceIds=instance_ids)
            
    except Exception as e:
        print(f"Error managing instances: {str(e)}")

2.2 비용 분석 및 리포팅 시스템

Instance Scheduler를 통해 절감된 비용을 분석하고 리포트를 생성하는 시스템을 구현해보겠습니다.

# cost_analyzer.py
import boto3
import pandas as pd
from datetime import datetime, timedelta

def get_cost_and_usage(start_date, end_date):
    client = boto3.client('ce')
    
    response = client.get_cost_and_usage(
        TimePeriod={
            'Start': start_date,
            'End': end_date
        },
        Granularity='DAILY',
        Metrics=['UnblendedCost'],
        GroupBy=[
            {'Type': 'DIMENSION', 'Key': 'SERVICE'},
            {'Type': 'TAG', 'Key': 'Environment'}
        ]
    )
    
    return response['ResultsByTime']

def generate_cost_report():
    end_date = datetime.now().strftime('%Y-%m-%d')
    start_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
    
    cost_data = get_cost_and_usage(start_date, end_date)
    
    # 데이터 정제
    records = []
    for daily_cost in cost_data:
        date = daily_cost['TimePeriod']['Start']
        for group in daily_cost['Groups']:
            service = group['Keys'][0]
            environment = group['Keys'][1].split('$')[-1]
            cost = float(group['Metrics']['UnblendedCost']['Amount'])
            
            records.append({
                'Date': date,
                'Service': service,
                'Environment': environment,
                'Cost': cost
            })
    
    # DataFrame 생성
    df = pd.DataFrame(records)
    
    # 비용 분석
    total_cost = df['Cost'].sum()
    cost_by_service = df.groupby('Service')['Cost'].sum()
    cost_by_env = df.groupby('Environment')['Cost'].sum()
    
    # HTML 리포트 생성
    html_report = f"""
    <html>
        <head>
            <style>
                table {{ border-collapse: collapse; width: 100%; }}
                th, td {{ border: 1px solid black; padding: 8px; text-align: left; }}
                th {{ background-color: #f2f2f2; }}
            </style>
        </head>
        <body>
            <h1>AWS 비용 분석 리포트</h1>
            <h2>기간: {start_date} ~ {end_date}</h2>
            <h3>총 비용: ${total_cost:.2f}</h3>
            
            <h3>서비스별 비용</h3>
            {cost_by_service.to_frame().to_html()}
            
            <h3>환경별 비용</h3>
            {cost_by_env.to_frame().to_html()}
        </body>
    </html>
    """
    
    return html_report

3. 실전 시나리오와 문제 해결

3.1 대규모 배치 작업 스케줄링

여러 리소스를 동시에 관리해야 하는 경우의 처리 방법입니다.

# batch_scheduler.py
import boto3
import json
from concurrent.futures import ThreadPoolExecutor
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def process_instance_batch(instances, action):
    ec2 = boto3.client('ec2')
    instance_ids = [i['InstanceId'] for i in instances]
    
    try:
        if action == 'start':
            response = ec2.start_instances(InstanceIds=instance_ids)
        elif action == 'stop':
            response = ec2.stop_instances(InstanceIds=instance_ids)
            
        logger.info(f"Successfully processed instances: {instance_ids}")
        return response
    except Exception as e:
        logger.error(f"Error processing instances {instance_ids}: {str(e)}")
        return None

def lambda_handler(event, context):
    max_workers = 10  # 동시 처리할 최대 작업 수
    batch_size = 20   # 각 배치당 처리할 인스턴스 수
    
    ec2 = boto3.client('ec2')
    
    # 대상 인스턴스 조회
    instances = []
    paginator = ec2.get_paginator('describe_instances')
    for page in paginator.paginate(
        Filters=[
            {
                'Name': 'tag:Scheduler',
                'Values': ['true']
            }
        ]
    ):
        for reservation in page['Reservations']:
            instances.extend(reservation['Instances'])
    
    # 배치 처리
    batches = [
        instances[i:i + batch_size] 
        for i in range(0, len(instances), batch_size)
    ]
    
    action = event.get('action', 'stop')
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(process_instance_batch, batch, action)
            for batch in batches
        ]
        
        results = [f.result() for f in futures]
    
    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': f'Processed {len(instances)} instances',
            'successful': len([r for r in results if r is not None])
        })
    }

이 코드들은 실제 프로덕션 환경에서 활용할 수 있는 구체적인 예제들입니다.

특히 주목할만한 포인트들은

  • 인증 시스템: Cognito를 활용한 MFA 구현
  • 실시간 데이터: AppSync를 이용한 GraphQL 구독 기능
  • 크로스 계정 관리: STS 서비스를 이용한 역할 전환
  • 비용 최적화: Cost Explorer API를 활용한 상세 분석
  • 대규모 처리: ThreadPoolExecutor를 활용한 병렬 처리

3.2 장애 복구 및 모니터링 시스템

Instance Scheduler 운영 시 발생할 수 있는 장애 상황을 감지하고 대응하는 시스템을 구현해보겠습니다.

# monitoring_system.py
import boto3
import json
import os
from datetime import datetime, timedelta

class SchedulerMonitor:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.sns = boto3.client('sns')
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(os.environ['MONITORING_TABLE'])

    def check_scheduler_health(self):
        """스케줄러 상태 확인"""
        try:
            # 최근 실행 이력 확인
            response = self.table.query(
                KeyConditionExpression='status = :status',
                ExpressionAttributeValues={
                    ':status': 'COMPLETED',
                    ':time': (datetime.now() - timedelta(hours=1)).isoformat()
                },
                FilterExpression='execution_time > :time'
            )

            if not response['Items']:
                self._alert_scheduler_failure("최근 1시간 동안 완료된 스케줄 작업이 없습니다.")
                return False

            return True

        except Exception as e:
            self._alert_scheduler_failure(f"모니터링 중 오류 발생: {str(e)}")
            return False

    def record_execution(self, schedule_id, status, details=None):
        """스케줄 실행 결과 기록"""
        timestamp = datetime.now().isoformat()
        
        item = {
            'schedule_id': schedule_id,
            'status': status,
            'execution_time': timestamp,
            'details': details or {}
        }
        
        self.table.put_item(Item=item)
        
        # CloudWatch 메트릭 발행
        self.cloudwatch.put_metric_data(
            Namespace='InstanceScheduler',
            MetricData=[
                {
                    'MetricName': 'ScheduleExecution',
                    'Value': 1 if status == 'COMPLETED' else 0,
                    'Unit': 'Count',
                    'Dimensions': [
                        {
                            'Name': 'ScheduleId',
                            'Value': schedule_id
                        }
                    ]
                }
            ]
        )

    def _alert_scheduler_failure(self, message):
        """장애 알림 발송"""
        try:
            self.sns.publish(
                TopicArn=os.environ['ALERT_TOPIC_ARN'],
                Message=json.dumps({
                    'default': message,
                    'email': f"""
                        Instance Scheduler 장애 알림
                        
                        시간: {datetime.now().isoformat()}
                        내용: {message}
                        
                        즉시 확인이 필요합니다.
                    """
                }),
                MessageStructure='json'
            )
        except Exception as e:
            print(f"알림 발송 실패: {str(e)}")

3.3 리소스 복구 자동화 시스템

예기치 않은 상황으로 인스턴스가 중지되거나 시작되지 않을 때를 대비한 복구 시스템입니다.

# recovery_system.py
import boto3
import json
import logging
from datetime import datetime

logger = logging.getLogger()
logger.setLevel(logging.INFO)

class InstanceRecovery:
    def __init__(self):
        self.ec2 = boto3.client('ec2')
        self.ssm = boto3.client('ssm')
        self.sns = boto3.client('sns')
        
    def verify_instance_state(self, instance_id, expected_state):
        """인스턴스 상태 확인"""
        try:
            response = self.ec2.describe_instances(InstanceIds=[instance_id])
            actual_state = response['Reservations'][0]['Instances'][0]['State']['Name']
            
            return actual_state == expected_state
        except Exception as e:
            logger.error(f"인스턴스 상태 확인 실패: {str(e)}")
            return False
    
    def recover_instance(self, instance_id, target_state):
        """인스턴스 복구 시도"""
        try:
            if target_state == 'running':
                # 시작 실패한 인스턴스 복구
                self.ec2.start_instances(InstanceIds=[instance_id])
                
                # 애플리케이션 상태 확인
                self._verify_application_health(instance_id)
                
            elif target_state == 'stopped':
                # 중지 실패한 인스턴스 강제 중지
                self.ec2.stop_instances(InstanceIds=[instance_id], Force=True)
            
            return True
            
        except Exception as e:
            logger.error(f"인스턴스 복구 실패: {str(e)}")
            self._send_alert(instance_id, str(e))
            return False
    
    def _verify_application_health(self, instance_id):
        """애플리케이션 헬스체크"""
        try:
            response = self.ssm.send_command(
                InstanceIds=[instance_id],
                DocumentName='AWS-RunShellScript',
                Parameters={
                    'commands': [
                        'systemctl is-active application.service',
                        'curl -f localhost:8080/health'
                    ]
                }
            )
            
            command_id = response['Command']['CommandId']
            
            # 결과 대기
            waiter = self.ssm.get_waiter('command_executed')
            waiter.wait(
                CommandId=command_id,
                InstanceId=instance_id
            )
            
        except Exception as e:
            logger.error(f"애플리케이션 상태 확인 실패: {str(e)}")
            raise
    
    def _send_alert(self, instance_id, error_message):
        """장애 알림 발송"""
        message = {
            'instance_id': instance_id,
            'error': error_message,
            'timestamp': datetime.now().isoformat(),
            'recovery_action': '수동 개입이 필요합니다.'
        }
        
        self.sns.publish(
            TopicArn='arn:aws:sns:region:account:topic',
            Message=json.dumps(message)
        )

3.4 비용 최적화 대시보드 구현

Instance Scheduler를 통한 비용 절감 효과를 시각화하는 대시보드를 React로 구현해보겠습니다.

// src/components/CostDashboard.js
import React, { useState, useEffect } from 'react';
import { API } from 'aws-amplify';
import {
  LineChart,
  Line,
  XAxis,
  YAxis,
  CartesianGrid,
  Tooltip,
  Legend
} from 'recharts';

const CostDashboard = () => {
  const [costData, setCostData] = useState(null);
  const [loading, setLoading] = useState(true);
  const [error, setError] = useState(null);

  useEffect(() => {
    fetchCostData();
  }, []);

  const fetchCostData = async () => {
    try {
      const response = await API.get('costApi', '/costs');
      setCostData(processCostData(response.data));
    } catch (err) {
      setError('비용 데이터 로딩 실패');
      console.error(err);
    } finally {
      setLoading(false);
    }
  };

  const processCostData = (data) => {
    // 데이터 가공 로직
    return data.map(item => ({
      date: new Date(item.date).toLocaleDateString(),
      actual: parseFloat(item.actualCost),
      projected: parseFloat(item.projectedCost),
      savings: parseFloat(item.projectedCost - item.actualCost)
    }));
  };

  if (loading) return <div>데이터 로딩중...</div>;
  if (error) return <div>에러: {error}</div>;

  return (
    <div className="p-4">
      <h2 className="text-2xl font-bold mb-4">비용 절감 현황</h2>
      <div className="grid grid-cols-1 md:grid-cols-3 gap-4 mb-6">
        <div className="bg-white p-4 rounded shadow">
          <h3 className="text-lg font-semibold">이번 달 절감액</h3>
          <p className="text-2xl text-green-600">
            ${costData[costData.length - 1]?.savings.toFixed(2)}
          </p>
        </div>
        {/* 다른 요약 통계 카드들 */}
      </div>

      <div className="bg-white p-4 rounded shadow">
        <LineChart width={800} height={400} data={costData}>
          <CartesianGrid strokeDasharray="3 3" />
          <XAxis dataKey="date" />
          <YAxis />
          <Tooltip />
          <Legend />
          <Line
            type="monotone"
            dataKey="actual"
            stroke="#8884d8"
            name="실제 비용"
          />
          <Line
            type="monotone"
            dataKey="projected"
            stroke="#82ca9d"
            name="예상 비용"
          />
        </LineChart>
      </div>
    </div>
  );
};

export default CostDashboard;

3.5 주요 운영 포인트 및 모범 사례

1. 태그 기반 관리

  • 모든 리소스에 일관된 태그 정책 적용
  • 환경(Environment), 용도(Purpose), 소유자(Owner) 등 필수 태그 정의
  • 태그 기반으로 스케줄링 정책 적용

2. 장애 대응 전략

  • 실패한 스케줄링 작업에 대한 재시도 메커니즘 구현
  • 알림 우선순위 설정 (Critical/Warning/Info)
  • 자동 복구 가능한 상황과 수동 개입이 필요한 상황 구분

3. 모니터링 전략

  • CloudWatch 대시보드를 통한 실시간 모니터링
  • 주요 지표: 스케줄 성공률, 리소스 상태, 비용 절감액
  • 이상 징후 조기 발견을 위한 알림 임계값 설정

4. 보안 고려사항

  • IAM 역할 최소 권한 원칙 적용
  • 크로스 계정 접근 시 STS 임시 자격 증명 사용
  • 중요 정보는 AWS Secrets Manager 활용

이러한 구현을 통해 얻을 수 있는 이점

  • 안정적인 리소스 관리
  • 투명한 비용 관리
  • 효율적인 장애 대응
  • 확장 가능한 아키텍처

운영 시 주의사항

  • 프로덕션 환경 스케줄링은 충분한 테스트 후 적용
  • 중요 시스템의 경우 단계적 적용
  • 정기적인 설정 검토 및 업데이트

4. 엔터프라이즈급 확장 구현

4.1 멀티 리전 동기화 시스템

여러 리전에 걸쳐 있는 리소스들을 효율적으로 관리하는 시스템을 구현해보겠습니다.

# multi_region_sync.py
import boto3
import json
from datetime import datetime
from botocore.exceptions import ClientError

class MultiRegionScheduler:
    def __init__(self, primary_region, secondary_regions):
        self.primary_region = primary_region
        self.secondary_regions = secondary_regions
        self.dynamodb = boto3.resource('dynamodb')
        self.sns = boto3.client('sns')
        
    def sync_schedules(self):
        """모든 리전의 스케줄 동기화"""
        try:
            # 기본 리전에서 스케줄 데이터 가져오기
            primary_table = self.dynamodb.Table(f'InstanceScheduler-{self.primary_region}')
            schedules = primary_table.scan()['Items']
            
            # 보조 리전으로 복제
            for region in self.secondary_regions:
                self._replicate_to_region(region, schedules)
                
            return True
        except Exception as e:
            self._handle_sync_error(str(e))
            return False
            
    def _replicate_to_region(self, region, schedules):
        """특정 리전으로 스케줄 복제"""
        session = boto3.Session(region_name=region)
        ddb = session.resource('dynamodb')
        table = ddb.Table(f'InstanceScheduler-{region}')
        
        with table.batch_writer() as batch:
            for schedule in schedules:
                # 리전별 특성 반영
                schedule['region'] = region
                schedule['last_synced'] = datetime.now().isoformat()
                batch.put_item(Item=schedule)
                
    def _handle_sync_error(self, error_message):
        """동기화 오류 처리"""
        self.sns.publish(
            TopicArn=f'arn:aws:sns:{self.primary_region}:account:MultiRegionSync',
            Message=json.dumps({
                'error': error_message,
                'timestamp': datetime.now().isoformat(),
                'regions_affected': self.secondary_regions
            })
        )

4.2 고급 비용 예측 시스템

머신러닝을 활용하여 비용을 예측하고 최적의 스케줄을 추천하는 시스템입니다.

# cost_prediction.py
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
import boto3

class CostPredictor:
    def __init__(self):
        self.model = RandomForestRegressor(n_estimators=100)
        self.ce_client = boto3.client('ce')
        
    def fetch_historical_data(self, start_date, end_date):
        """비용 데이터 수집"""
        response = self.ce_client.get_cost_and_usage(
            TimePeriod={
                'Start': start_date,
                'End': end_date
            },
            Granularity='DAILY',
            Metrics=['UnblendedCost'],
            GroupBy=[
                {'Type': 'DIMENSION', 'Key': 'SERVICE'},
                {'Type': 'TAG', 'Key': 'Environment'}
            ]
        )
        
        return self._process_cost_data(response)
        
    def train_model(self, data):
        """예측 모델 학습"""
        features = ['hour', 'day_of_week', 'day_of_month', 'month',
                   'instance_count', 'previous_cost']
        
        X = data[features]
        y = data['cost']
        
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        
        self.model.fit(X_train, y_train)
        return self.model.score(X_test, y_test)
        
    def predict_costs(self, future_data):
        """미래 비용 예측"""
        predictions = self.model.predict(future_data)
        return predictions
        
    def _process_cost_data(self, raw_data):
        """원시 데이터 전처리"""
        records = []
        
        for item in raw_data['ResultsByTime']:
            date = pd.to_datetime(item['TimePeriod']['Start'])
            
            record = {
                'hour': date.hour,
                'day_of_week': date.dayofweek,
                'day_of_month': date.day,
                'month': date.month,
                'cost': float(item['Total']['UnblendedCost']['Amount'])
            }
            
            records.append(record)
            
        return pd.DataFrame(records)

4.3 고가용성 설계

시스템의 안정성을 높이기 위한 고가용성 아키텍처 구현입니다.

# high_availability.py
import boto3
from botocore.exceptions import ClientError
import json
import time
from datetime import datetime

class HAScheduler:
    def __init__(self):
        self.dynamodb = boto3.resource('dynamodb')
        self.lambda_client = boto3.client('lambda')
        self.cloudwatch = boto3.client('cloudwatch')
        
    def ensure_ha(self):
        """고가용성 보장"""
        try:
            # 리더 선출
            if self._elect_leader():
                self._process_as_leader()
            else:
                self._process_as_follower()
                
        except Exception as e:
            self._handle_failover(str(e))
            
    def _elect_leader(self):
        """리더 선출 프로세스"""
        table = self.dynamodb.Table('SchedulerLeaderLock')
        
        try:
            table.put_item(
                Item={
                    'lock_id': 'scheduler_leader',
                    'holder': self._get_instance_id(),
                    'timestamp': int(time.time()),
                    'ttl': int(time.time()) + 300  # 5분 TTL
                },
                ConditionExpression='attribute_not_exists(lock_id) OR ttl < :now',
                ExpressionAttributeValues={
                    ':now': int(time.time())
                }
            )
            return True
            
        except ClientError as e:
            if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
                return False
            raise
            
    def _process_as_leader(self):
        """리더로서의 처리"""
        # 헬스체크 메트릭 발행
        self.cloudwatch.put_metric_data(
            Namespace='InstanceScheduler',
            MetricData=[{
                'MetricName': 'LeaderHealth',
                'Value': 1,
                'Unit': 'Count'
            }]
        )
        
        # 메인 스케줄링 로직 실행
        self._execute_scheduling()
        
    def _process_as_follower(self):
        """팔로워로서의 처리"""
        # 리더 모니터링
        self._monitor_leader_health()
        
    def _handle_failover(self, error):
        """장애 복구 처리"""
        # 장애 발생 시 자동 복구
        try:
            if self._should_trigger_failover():
                self._trigger_failover()
        except Exception as failover_error:
            self._notify_admin(str(failover_error))
            
    def _execute_scheduling(self):
        """실제 스케줄링 작업 수행"""
        try:
            # 스케줄 작업 실행
            response = self.lambda_client.invoke(
                FunctionName='SchedulerMain',
                InvocationType='Event'
            )
            
            return response['StatusCode'] == 202
            
        except Exception as e:
            self._notify_admin(f"스케줄링 실행 실패: {str(e)}")
            return False

4.4 확장 가능한 프론트엔드 구현

대규모 리소스 관리를 위한 React 기반의 관리 콘솔입니다.

// src/components/ResourceManager.js
import React, { useState, useEffect } from 'react';
import { API } from 'aws-amplify';
import { DataGrid } from '@mui/x-data-grid';

const ResourceManager = () => {
  const [resources, setResources] = useState([]);
  const [loading, setLoading] = useState(true);
  const [schedules, setSchedules] = useState([]);
  
  useEffect(() => {
    fetchResources();
    fetchSchedules();
  }, []);
  
  const fetchResources = async () => {
    try {
      const response = await API.get('resourceApi', '/resources');
      setResources(response.items);
    } catch (err) {
      console.error('리소스 로딩 실패:', err);
    } finally {
      setLoading(false);
    }
  };
  
  const fetchSchedules = async () => {
    try {
      const response = await API.get('scheduleApi', '/schedules');
      setSchedules(response.items);
    } catch (err) {
      console.error('스케줄 로딩 실패:', err);
    }
  };
  
  const columns = [
    { field: 'id', headerName: 'ID', width: 130 },
    { field: 'name', headerName: '리소스명', width: 200 },
    { field: 'type', headerName: '유형', width: 130 },
    { field: 'status', headerName: '상태', width: 130 },
    {
      field: 'schedule',
      headerName: '스케줄',
      width: 200,
      renderCell: (params) => (
        <select
          value={params.value}
          onChange={(e) => handleScheduleChange(params.row.id, e.target.value)}
        >
          <option value="">선택하세요</option>
          {schedules.map(schedule => (
            <option key={schedule.id} value={schedule.id}>
              {schedule.name}
            </option>
          ))}
        </select>
      )
    }
  ];
  
  const handleScheduleChange = async (resourceId, scheduleId) => {
    try {
      await API.put('resourceApi', `/resources/${resourceId}/schedule`, {
        body: { scheduleId }
      });
      fetchResources(); // 리소스 목록 새로고침
    } catch (err) {
      console.error('스케줄 변경 실패:', err);
    }
  };
  
  if (loading) {
    return <div>로딩 중...</div>;
  }
  
  return (
    <div style={{ height: 400, width: '100%' }}>
      <DataGrid
        rows={resources}
        columns={columns}
        pageSize={5}
        rowsPerPageOptions={[5]}
        checkboxSelection
        disableSelectionOnClick
      />
    </div>
  );
};

export default ResourceManager;

4.5 보안 강화 구현

리소스 접근 제어와 감사 로그 시스템입니다.

# security_manager.py
import boto3
import json
from datetime import datetime
import hashlib

class SecurityManager:
    def __init__(self):
        self.iam = boto3.client('iam')
        self.cloudtrail = boto3.client('cloudtrail')
        self.guardduty = boto3.client('guardduty')
        
    def audit_access(self, event):
        """접근 감사"""
        # 이벤트 해시 생성
        event_hash = self._create_event_hash(event)
        
        # CloudTrail 로그 검증
        trail_events = self._get_cloudtrail_events(event)
        
        # 의심스러운 활동 확인
        findings = self._check_guardduty_findings(event)
        
        return {
            'event_hash': event_hash,
            'trail_events': trail_events,
            'suspicious_activity': len(findings) > 0,
            'findings': findings
        }
        
    def _create_event_hash(self, event):
        """이벤트 무결성 검증을 위한 해시 생성"""
        event_string = json.dumps(event, sort_keys=True)
        return hashlib.sha256(event_string.encode()).hexdigest()
        
    def _get_cloudtrail_events(self, event):
        """CloudTrail 이벤트 조회"""
        response = self.cloudtrail.lookup_events(
            LookupAttributes=[
                {
                    'AttributeKey': 'EventId',
                    'AttributeValue': event['eventId']
                }
            ]
        )
        return response['Events']
        
    def _check_guardduty_findings(self, event):
        """GuardDuty 조회"""
        response = self.guardduty.list_findings(
            DetectorId='detector-id',  # 실제 DetectorId 필요
            FindingCriteria={
                'Criterion': {
                    'resource.instanceId': {
                        'Eq': [event.get('instanceId', '')]
                    }
                }
            }
        )
        return response['FindingIds']

이러한 구현들은 엔터프라이즈급 환경에서 Instance Scheduler를 안정적으로 운영하는데 필요한 핵심 컴포넌트들입니다.

  • 멀티 리전 지원으로 글로벌 서비스 가능
  • 머신러닝 기반 비용 예측으로 더 정확한 리소스 계획 수립
  • 고가용성 설계로 서비스 안정성 보장
  • 사용자 친화적인 관리 콘솔
  • 엄격한 보안 감사 및 모니터링

이러한 기능들은 대규모 AWS 환경에서 필수적인 요소들입니다.

코드는 실제 구현 가능한 형태로 작성되었으며, 필요에 따라 수정하여 사용할 수 있습니다.

profile
에러가 나도 괜찮아 — 그건 내가 배우고 있다는 증거야.

0개의 댓글