[aws] 실 운영 온프레미스 EC2 환경 구축 하기

궁금하면 500원·2024년 12월 24일
0

데브옵스

목록 보기
25/36

1. 하이브리드 클라우드 아키텍처 설계

1.1 온프레미스와 AWS 연동 구성

온프레미스 환경에서 AWS 서비스를 효과적으로 활용하기 위한 네트워크 구성이 필수적입니다.

Site-to-Site VPN이나 Direct Connect를 통해 안전하고 안정적인 연결을 구축해야 합니다.

# AWS VPN 연결 상태 모니터링 스크립트
import boto3
import time
from datetime import datetime

def monitor_vpn_connection():
    ec2 = boto3.client('ec2', region_name='ap-northeast-2')
    
    while True:
        try:
            # VPN 연결 상태 확인
            vpn_connections = ec2.describe_vpn_connections()
            
            for vpn in vpn_connections['VpnConnections']:
                connection_id = vpn['VpnConnectionId']
                state = vpn['State']
                
                print(f"[{datetime.now()}] VPN Connection {connection_id}: {state}")
                
                # 연결이 다운된 경우 알림 발송
                if state != 'available':
                    send_alert(f"VPN Connection {connection_id} is {state}")
                    
        except Exception as e:
            print(f"Error monitoring VPN: {str(e)}")
            
        time.sleep(300)  # 5분마다 체크

def send_alert(message):
    sns = boto3.client('sns', region_name='ap-northeast-2')
    
    response = sns.publish(
        TopicArn='arn:aws:sns:ap-northeast-2:123456789012:vpn-alerts',
        Message=message,
        Subject='VPN Connection Alert'
    )

1.2 리소스 동기화 전략

온프레미스와 AWS 환경 간의 리소스 동기화를 위한 자동화된 시스템을 구축합니다.

# 리소스 동기화 매니저
import boto3
import json
import logging
from datetime import datetime

class ResourceSyncManager:
    def __init__(self):
        self.ec2 = boto3.client('ec2')
        self.s3 = boto3.client('s3')
        self.logger = logging.getLogger()
        
    def sync_instance_metadata(self, instance_id, metadata_file):
        try:
            # EC2 인스턴스 정보 가져오기
            response = self.ec2.describe_instances(
                InstanceIds=[instance_id]
            )
            
            instance_data = response['Reservations'][0]['Instances'][0]
            
            # 메타데이터 저장
            metadata = {
                'InstanceId': instance_id,
                'InstanceType': instance_data['InstanceType'],
                'State': instance_data['State']['Name'],
                'LaunchTime': instance_data['LaunchTime'].isoformat(),
                'Tags': instance_data.get('Tags', []),
                'SyncTime': datetime.now().isoformat()
            }
            
            # S3에 메타데이터 저장
            self.s3.put_object(
                Bucket='my-sync-bucket',
                Key=f'metadata/{metadata_file}',
                Body=json.dumps(metadata)
            )
            
            self.logger.info(f"Successfully synced metadata for instance {instance_id}")
            
        except Exception as e:
            self.logger.error(f"Error syncing metadata: {str(e)}")
            raise

2. 재해 복구 전략 구현

2.1 자동화된 백업 시스템

# 자동 백업 매니저
import boto3
import schedule
import time
from datetime import datetime

class BackupManager:
    def __init__(self):
        self.ec2 = boto3.client('ec2')
        self.sns = boto3.client('sns')
        
    def create_ami_backup(self, instance_id):
        try:
            # AMI 생성
            timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
            ami_name = f"backup-{instance_id}-{timestamp}"
            
            response = self.ec2.create_image(
                InstanceId=instance_id,
                Name=ami_name,
                Description=f"Automated backup for {instance_id}",
                NoReboot=True
            )
            
            ami_id = response['ImageId']
            
            # AMI에 태그 추가
            self.ec2.create_tags(
                Resources=[ami_id],
                Tags=[
                    {
                        'Key': 'Name',
                        'Value': ami_name
                    },
                    {
                        'Key': 'AutoBackup',
                        'Value': 'true'
                    }
                ]
            )
            
            # 백업 완료 알림
            self.send_backup_notification(instance_id, ami_id)
            
            return ami_id
            
        except Exception as e:
            print(f"Error creating backup: {str(e)}")
            raise
            
    def send_backup_notification(self, instance_id, ami_id):
        message = f"Backup completed for instance {instance_id}\nAMI ID: {ami_id}"
        
        self.sns.publish(
            TopicArn='arn:aws:sns:ap-northeast-2:123456789012:backup-notifications',
            Message=message,
            Subject='Backup Completion Notice'
        )

2.2 복구 자동화 시스템

# 복구 자동화 매니저
class RecoveryManager:
    def __init__(self):
        self.ec2 = boto3.client('ec2')
        
    def initiate_recovery(self, failed_instance_id):
        try:
            # 최신 백업 AMI 찾기
            images = self.ec2.describe_images(
                Filters=[
                    {
                        'Name': 'tag:AutoBackup',
                        'Values': ['true']
                    }
                ],
                Owners=['self']
            )
            
            # 최신 AMI 선택
            latest_ami = sorted(
                images['Images'],
                key=lambda x: x['CreationDate'],
                reverse=True
            )[0]
            
            # 새 인스턴스 시작
            response = self.ec2.run_instances(
                ImageId=latest_ami['ImageId'],
                InstanceType='t3.medium',  # 원하는 인스턴스 타입으로 변경
                MinCount=1,
                MaxCount=1,
                SecurityGroupIds=['sg-xxxxxxxx'],  # 보안 그룹 ID
                SubnetId='subnet-xxxxxxxx',  # 서브넷 ID
                TagSpecifications=[
                    {
                        'ResourceType': 'instance',
                        'Tags': [
                            {
                                'Key': 'Name',
                                'Value': f'Recovered-{failed_instance_id}'
                            }
                        ]
                    }
                ]
            )
            
            return response['Instances'][0]['InstanceId']
            
        except Exception as e:
            print(f"Error during recovery: {str(e)}")
            raise

3. 데이터베이스 마이그레이션 자동화

3.1 DMS 태스크 자동화

# DMS 태스크 매니저
class DMSTaskManager:
    def __init__(self):
        self.dms = boto3.client('dms')
        
    def create_replication_task(self, source_endpoint_arn, target_endpoint_arn, replication_instance_arn):
        try:
            response = self.dms.create_replication_task(
                ReplicationTaskIdentifier=f'task-{int(time.time())}',
                SourceEndpointArn=source_endpoint_arn,
                TargetEndpointArn=target_endpoint_arn,
                ReplicationInstanceArn=replication_instance_arn,
                MigrationType='full-load-and-cdc',
                TableMappings=json.dumps({
                    'rules': [
                        {
                            'rule-type': 'selection',
                            'rule-id': '1',
                            'rule-name': '1',
                            'object-locator': {
                                'schema-name': '%',
                                'table-name': '%'
                            },
                            'rule-action': 'include'
                        }
                    ]
                }),
                ReplicationTaskSettings=json.dumps({
                    'TargetMetadata': {
                        'TargetSchema': '',
                        'SupportLobs': True,
                        'FullLobMode': False,
                        'LobChunkSize': 64,
                        'LimitedSizeLobMode': True,
                        'LobMaxSize': 32
                    },
                    'FullLoadSettings': {
                        'TargetTablePrepMode': 'DROP_AND_CREATE',
                        'CreatePkAfterFullLoad': False,
                        'StopTaskCachedChangesApplied': False,
                        'StopTaskCachedChangesNotApplied': False,
                        'MaxFullLoadSubTasks': 8,
                        'TransactionConsistencyTimeout': 600,
                        'CommitRate': 10000
                    },
                    'Logging': {
                        'EnableLogging': True
                    }
                })
            )
            
            return response['ReplicationTask']['ReplicationTaskArn']
            
        except Exception as e:
            print(f"Error creating DMS task: {str(e)}")
            raise

4. 모니터링 및 알림 시스템

4.1 통합 모니터링 시스템

# 통합 모니터링 매니저
class MonitoringManager:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.sns = boto3.client('sns')
        
    def monitor_resources(self):
        try:
            # EC2 인스턴스 CPU 사용률 모니터링
            response = self.cloudwatch.get_metric_statistics(
                Namespace='AWS/EC2',
                MetricName='CPUUtilization',
                Dimensions=[
                    {
                        'Name': 'InstanceId',
                        'Value': 'i-1234567890abcdef0'
                    },
                ],
                StartTime=datetime.utcnow() - timedelta(minutes=30),
                EndTime=datetime.utcnow(),
                Period=300,
                Statistics=['Average']
            )
            
            # 임계값 확인
            for datapoint in response['Datapoints']:
                if datapoint['Average'] > 80:  # CPU 사용률 80% 초과
                    self.send_alert('High CPU Usage Alert', f"CPU Usage: {datapoint['Average']}%")
                    
        except Exception as e:
            print(f"Error monitoring resources: {str(e)}")
            raise
            
    def send_alert(self, subject, message):
        try:
            self.sns.publish(
                TopicArn='arn:aws:sns:ap-northeast-2:123456789012:monitoring-alerts',
                Message=message,
                Subject=subject
            )
        except Exception as e:
            print(f"Error sending alert: {str(e)}")
            raise

5. 운영 자동화 스크립트

5.1 일괄 작업 처리 시스템

# 배치 작업 매니저
class BatchJobManager:
    def __init__(self):
        self.ec2 = boto3.client('ec2')
        self.logger = logging.getLogger()
        
    def start_instances(self, tag_name, tag_value):
        try:
            instances = self.ec2.describe_instances(
                Filters=[
                    {
                        'Name': f'tag:{tag_name}',
                        'Values': [tag_value]
                    }
                ]
            )
            
            instance_ids = []
            for reservation in instances['Reservations']:
                for instance in reservation['Instances']:
                    instance_ids.append(instance['InstanceId'])
                    
            if instance_ids:
                self.ec2.start_instances(InstanceIds=instance_ids)
                self.logger.info(f"Started instances: {instance_ids}")
                
        except Exception as e:
            self.logger.error(f"Error starting instances: {str(e)}")
            raise
            
    def stop_instances(self, tag_name, tag_value):
        try:
            instances = self.ec2.describe_instances(
                Filters=[
                    {
                        'Name': f'tag:{tag_name}',
                        'Values': [tag_value]
                    }
                ]
            )
            
            instance_ids = []
            for reservation in instances['Reservations']:
                for instance in reservation['Instances']:
                    instance_ids.append(instance['InstanceId'])
                    
            if instance_ids:
                self.ec2.stop_instances(InstanceIds=instance_ids)
                self.logger.info(f"Stopped instances: {instance_ids}")
                
        except Exception as e:
            self.logger.error(f"Error stopping instances: {str(e)}")
            raise

이 코드들은 실제 운영 환경에서 사용할 수 있도록 작성되고 있습니다.
각각의 기능은 다음과 같은 목적으로 활용될 수 있습니다.

  • VPN 모니터링: 온프레미스와 AWS 간의 연결 상태를 지속적으로 모니터링
  • 리소스 동기화: 온프레미스와 클라우드 환경의 리소스 메타데이터 동기화
  • 백업 및 복구: 자동화된 백업 생성 및 장애 발생 시 복구 프로세스 자동화
  • DMS 작업 관리: 데이터베이스 마이그레이션 작업 자동화
  • 모니터링 시스템: 통합된 모니터링 및 알림 시스템
  • 배치 작업 관리: 다수의 인스턴스에 대한 일괄 작업 처리

이러한 코드들은 실제 서비스 환경에서 이런순으로 운영되고 있습니다.

6. CI/CD 파이프라인 구축

6.1 하이브리드 환경을 위한 Jenkins 파이프라인

// Jenkinsfile 예시
pipeline {
    agent any
    
    environment {
        AWS_REGION = 'ap-northeast-2'
        ONPREM_ENV = 'prod'
    }
    
    stages {
        stage('Environment Check') {
            steps {
                script {
                    // 온프레미스 환경 상태 확인
                    sh '''
                        python3 check_onprem_status.py
                        if [ $? -ne 0 ]; then
                            echo "온프레미스 환경 점검이 필요합니다."
                            exit 1
                        fi
                    '''
                }
            }
        }
        
        stage('Sync Resources') {
            steps {
                script {
                    // 리소스 동기화
                    sh '''
                        python3 sync_resources.py \
                            --source-env ${ONPREM_ENV} \
                            --target-region ${AWS_REGION}
                    '''
                }
            }
        }
        
        stage('Deploy') {
            steps {
                script {
                    // 배포 스크립트 실행
                    sh '''
                        python3 hybrid_deploy.py \
                            --env ${ONPREM_ENV} \
                            --region ${AWS_REGION}
                    '''
                }
            }
        }
    }
    
    post {
        always {
            // 결과 알림 발송
            notifyDeployResult()
        }
    }
}

6.2 배포 자동화 스크립트

# hybrid_deploy.py
import boto3
import paramiko
import logging
from typing import List, Dict

class HybridDeployer:
    def __init__(self, aws_region: str, onprem_config: Dict):
        self.ec2 = boto3.client('ec2', region_name=aws_region)
        self.ssm = boto3.client('ssm', region_name=aws_region)
        self.onprem_config = onprem_config
        self.logger = logging.getLogger(__name__)
        
    def deploy_to_hybrid_environment(self, artifact_path: str):
        try:
            # 1. AWS 환경 배포
            aws_instances = self._get_target_instances()
            self._deploy_to_aws(aws_instances, artifact_path)
            
            # 2. 온프레미스 환경 배포
            self._deploy_to_onprem(artifact_path)
            
            # 3. 상태 확인
            self._verify_deployment()
            
        except Exception as e:
            self.logger.error(f"배포 중 오류 발생: {str(e)}")
            raise
            
    def _get_target_instances(self) -> List[str]:
        response = self.ec2.describe_instances(
            Filters=[
                {
                    'Name': 'tag:Environment',
                    'Values': ['production']
                },
                {
                    'Name': 'instance-state-name',
                    'Values': ['running']
                }
            ]
        )
        
        instance_ids = []
        for reservation in response['Reservations']:
            for instance in reservation['Instances']:
                instance_ids.append(instance['InstanceId'])
                
        return instance_ids
        
    def _deploy_to_aws(self, instance_ids: List[str], artifact_path: str):
        for instance_id in instance_ids:
            try:
                # Systems Manager Run Command를 사용한 배포
                response = self.ssm.send_command(
                    InstanceIds=[instance_id],
                    DocumentName='AWS-RunShellScript',
                    Parameters={
                        'commands': [
                            f'aws s3 cp {artifact_path} /opt/app/',
                            'cd /opt/app',
                            './deploy.sh'
                        ]
                    }
                )
                
                command_id = response['Command']['CommandId']
                self._wait_for_command(instance_id, command_id)
                
            except Exception as e:
                self.logger.error(f"AWS 배포 중 오류 발생 (Instance {instance_id}): {str(e)}")
                raise
                
    def _deploy_to_onprem(self, artifact_path: str):
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        
        try:
            ssh.connect(
                hostname=self.onprem_config['host'],
                username=self.onprem_config['user'],
                key_filename=self.onprem_config['key_path']
            )
            
            # 아티팩트 전송
            sftp = ssh.open_sftp()
            sftp.put(artifact_path, '/opt/app/artifact.zip')
            sftp.close()
            
            # 배포 스크립트 실행
            stdin, stdout, stderr = ssh.exec_command('''
                cd /opt/app
                unzip -o artifact.zip
                ./deploy.sh
            ''')
            
            if stderr.channel.recv_exit_status() != 0:
                raise Exception(f"온프레미스 배포 실패: {stderr.read().decode()}")
                
        finally:
            ssh.close()
            
    def _verify_deployment(self):
        # 배포 상태 확인 로직
        health_check_endpoints = [
            'http://aws-lb.example.com/health',
            'http://onprem.example.com/health'
        ]
        
        for endpoint in health_check_endpoints:
            response = requests.get(endpoint)
            if response.status_code != 200:
                raise Exception(f"상태 확인 실패 ({endpoint}): {response.status_code}")

7. 보안 및 규정 준수

7.1 보안 감사 자동화

# security_audit.py
import boto3
import json
from datetime import datetime
from typing import List, Dict

class SecurityAuditor:
    def __init__(self):
        self.ec2 = boto3.client('ec2')
        self.inspector = boto3.client('inspector')
        self.config = boto3.client('config')
        
    def run_security_audit(self) -> Dict:
        audit_results = {
            'timestamp': datetime.now().isoformat(),
            'security_groups': self._audit_security_groups(),
            'instance_compliance': self._check_instance_compliance(),
            'vulnerability_scan': self._run_vulnerability_scan()
        }
        
        self._save_audit_results(audit_results)
        return audit_results
        
    def _audit_security_groups(self) -> List[Dict]:
        security_groups = self.ec2.describe_security_groups()
        findings = []
        
        for sg in security_groups['SecurityGroups']:
            # 보안 그룹 규칙 검사
            for rule in sg['IpPermissions']:
                if self._is_rule_unsafe(rule):
                    findings.append({
                        'security_group_id': sg['GroupId'],
                        'issue': 'Unsafe inbound rule detected',
                        'details': rule
                    })
                    
        return findings
        
    def _check_instance_compliance(self) -> List[Dict]:
        compliance_results = []
        
        # AWS Config를 통한 규정 준수 확인
        rules = self.config.describe_config_rules()
        
        for rule in rules['ConfigRules']:
            result = self.config.get_compliance_details_by_config_rule(
                ConfigRuleName=rule['ConfigRuleName']
            )
            
            for evaluation in result['EvaluationResults']:
                if evaluation['ComplianceType'] != 'COMPLIANT':
                    compliance_results.append({
                        'rule_name': rule['ConfigRuleName'],
                        'resource_id': evaluation['EvaluationResultIdentifier']['EvaluationResultQualifier']['ResourceId'],
                        'compliance_type': evaluation['ComplianceType']
                    })
                    
        return compliance_results
        
    def _run_vulnerability_scan(self) -> List[Dict]:
        # Amazon Inspector를 통한 취약점 스캔
        assessment_run = self.inspector.start_assessment_run(
            assessmentTemplateArn='arn:aws:inspector:region:account-id:target/template-name'
        )
        
        # 스캔 완료 대기
        waiter = self.inspector.get_waiter('assessment_run_completed')
        waiter.wait(
            assessmentRunArn=assessment_run['assessmentRunArn']
        )
        
        # 결과 수집
        findings = self.inspector.list_findings(
            assessmentRunArns=[assessment_run['assessmentRunArn']],
            filterConditions={
                'severity': [
                    {'comparison': 'EQUALS', 'value': 'High'},
                    {'comparison': 'EQUALS', 'value': 'Critical'}
                ]
            }
        )
        
        return [self._format_finding(finding) for finding in findings['findingArns']]
        
    def _save_audit_results(self, results: Dict):
        s3 = boto3.client('s3')
        
        # 결과를 S3에 저장
        s3.put_object(
            Bucket='security-audit-results',
            Key=f'audits/{datetime.now().strftime("%Y%m%d")}/results.json',
            Body=json.dumps(results)
        )

8. 비용 최적화

8.1 비용 분석 및 최적화 도구

# cost_optimizer.py
import boto3
import pandas as pd
from typing import Dict, List

class CostOptimizer:
    def __init__(self):
        self.ce = boto3.client('ce')  # Cost Explorer
        self.ec2 = boto3.client('ec2')
        
    def analyze_costs(self) -> Dict:
        # 비용 분석 수행
        cost_analysis = {
            'total_cost': self._get_total_cost(),
            'cost_by_service': self._get_cost_by_service(),
            'optimization_recommendations': self._generate_recommendations()
        }
        
        return cost_analysis
        
    def _get_total_cost(self) -> float:
        response = self.ce.get_cost_and_usage(
            TimePeriod={
                'Start': '2024-01-01',
                'End': '2024-01-31'
            },
            Granularity='MONTHLY',
            Metrics=['UnblendedCost']
        )
        
        return float(response['ResultsByTime'][0]['Total']['UnblendedCost']['Amount'])
        
    def _get_cost_by_service(self) -> List[Dict]:
        response = self.ce.get_cost_and_usage(
            TimePeriod={
                'Start': '2024-01-01',
                'End': '2024-01-31'
            },
            Granularity='MONTHLY',
            GroupBy=[{'Type': 'DIMENSION', 'Key': 'SERVICE'}],
            Metrics=['UnblendedCost']
        )
        
        services = []
        for group in response['ResultsByTime'][0]['Groups']:
            services.append({
                'service': group['Keys'][0],
                'cost': float(group['Metrics']['UnblendedCost']['Amount'])
            })
            
        return services
        
    def _generate_recommendations(self) -> List[Dict]:
        recommendations = []
        
        # EC2 인스턴스 최적화 추천
        instances = self.ec2.describe_instances()
        for reservation in instances['Reservations']:
            for instance in reservation['Instances']:
                recommendations.extend(self._analyze_instance(instance))
                
        return recommendations
        
    def _analyze_instance(self, instance: Dict) -> List[Dict]:
        instance_recommendations = []
        
        # CPU 사용률 분석
        cloudwatch = boto3.client('cloudwatch')
        response = cloudwatch.get_metric_statistics(
            Namespace='AWS/EC2',
            MetricName='CPUUtilization',
            Dimensions=[
                {
                    'Name': 'InstanceId',
                    'Value': instance['InstanceId']
                }
            ],
            StartTime='2024-01-01',
            EndTime='2024-01-31',
            Period=3600,
            Statistics=['Average']
        )
        
        # 평균 CPU 사용률 계산
        if response['Datapoints']:
            avg_cpu = sum(d['Average'] for d in response['Datapoints']) / len(response['Datapoints'])
            
            # 낮은 사용률의 인스턴스 식별
            if avg_cpu < 20:
                instance_recommendations.append({
                    'instance_id': instance['InstanceId'],
                    'type': 'low_utilization',
                    'current_type': instance['InstanceType'],
                    'recommendation': 'Consider downsizing instance type',
                    'potential_savings': self._calculate_potential_savings(instance['InstanceType'])
                })
                
        return instance_recommendations
        
    def _calculate_potential_savings(self, instance_type: str) -> float:
        # 인스턴스 타입별 비용 차이 계산
        pricing = boto3.client('pricing')
        
        # 현재 인스턴스 타입의 가격 조회
        current_price = self._get_instance_price(pricing, instance_type)
        
        # 한 단계 작은 인스턴스 타입의 가격 조회
        smaller_type = self._get_smaller_instance_type(instance_type)
        smaller_price = self._get_instance_price(pricing, smaller_type)
        
        return (current_price - smaller_price) * 730  # 월간 예상 절감액

이러한 코드들은 실제 업무 환경에서 다음과 같은 이점을 제공합니다.

  • 자동화된 배포 프로세스
  • 통합된 보안 감사
  • 비용 최적화 및 모니터링
  • 인프라 관리 효율성 향상

각 모듈은 독립적으로 동작하면서도 서로 연계되어 전체 시스템의 효율성을 높입니다.

이러한 접근 방식은 하이브리드 클라우드 환경에서 특히 유용하며, 온프레미스와 AWS 환경을 seamless하게 통합하는 데 도움이 되었습니다.

profile
꾸준히, 의미있는 사이드 프로젝트 경험과 문제해결 과정을 기록하기 위한 공간입니다.

0개의 댓글