
A proper network configuration is essential for making effective use of AWS services from an on-premises environment. A secure and reliable connection should be established through Site-to-Site VPN or Direct Connect.
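As a starting point, here is a minimal sketch of provisioning a Site-to-Site VPN with boto3. It assumes a virtual private gateway is already attached to the VPC; the router IP, BGP ASN, and gateway ID below are placeholders.

```python
import boto3

ec2 = boto3.client('ec2', region_name='ap-northeast-2')

# Register the on-premises router as a customer gateway
cgw = ec2.create_customer_gateway(
    Type='ipsec.1',
    PublicIp='203.0.113.10',  # public IP of the on-premises router (placeholder)
    BgpAsn=65000              # on-premises BGP ASN (placeholder)
)

# Create the VPN connection against the existing virtual private gateway
vpn = ec2.create_vpn_connection(
    Type='ipsec.1',
    CustomerGatewayId=cgw['CustomerGateway']['CustomerGatewayId'],
    VpnGatewayId='vgw-xxxxxxxx',  # VGW already attached to the VPC (placeholder)
    Options={'StaticRoutesOnly': False}
)
print(vpn['VpnConnection']['VpnConnectionId'])
```

Once the connection is up, the script below keeps watch over it.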
```python
# AWS VPN connection status monitoring script
import boto3
import time
from datetime import datetime

def monitor_vpn_connection():
    ec2 = boto3.client('ec2', region_name='ap-northeast-2')
    while True:
        try:
            # Check the state of every Site-to-Site VPN connection
            vpn_connections = ec2.describe_vpn_connections()
            for vpn in vpn_connections['VpnConnections']:
                connection_id = vpn['VpnConnectionId']
                state = vpn['State']
                print(f"[{datetime.now()}] VPN Connection {connection_id}: {state}")
                # Send an alert if the connection is not available
                if state != 'available':
                    send_alert(f"VPN Connection {connection_id} is {state}")
        except Exception as e:
            print(f"Error monitoring VPN: {str(e)}")
        time.sleep(300)  # check every 5 minutes

def send_alert(message):
    sns = boto3.client('sns', region_name='ap-northeast-2')
    sns.publish(
        TopicArn='arn:aws:sns:ap-northeast-2:123456789012:vpn-alerts',
        Message=message,
        Subject='VPN Connection Alert'
    )
```
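One caveat: a connection can report `available` while one of its two tunnels is down. The per-tunnel status is exposed in the same `describe_vpn_connections` response, so the loop above can be extended along these lines:

```python
# Sketch: per-tunnel health check using the VgwTelemetry field,
# to be added inside the loop over vpn_connections['VpnConnections']
for telemetry in vpn.get('VgwTelemetry', []):
    if telemetry['Status'] != 'UP':
        send_alert(
            f"Tunnel {telemetry['OutsideIpAddress']} of "
            f"{connection_id} is {telemetry['Status']}"
        )
```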
Next, we build an automated system for synchronizing resources between the on-premises and AWS environments.
```python
# Resource synchronization manager
import boto3
import json
import logging
from datetime import datetime

class ResourceSyncManager:
    def __init__(self):
        self.ec2 = boto3.client('ec2')
        self.s3 = boto3.client('s3')
        self.logger = logging.getLogger(__name__)

    def sync_instance_metadata(self, instance_id, metadata_file):
        try:
            # Fetch EC2 instance details
            response = self.ec2.describe_instances(
                InstanceIds=[instance_id]
            )
            instance_data = response['Reservations'][0]['Instances'][0]

            # Build the metadata document
            metadata = {
                'InstanceId': instance_id,
                'InstanceType': instance_data['InstanceType'],
                'State': instance_data['State']['Name'],
                'LaunchTime': instance_data['LaunchTime'].isoformat(),
                'Tags': instance_data.get('Tags', []),
                'SyncTime': datetime.now().isoformat()
            }

            # Store the metadata in S3
            self.s3.put_object(
                Bucket='my-sync-bucket',
                Key=f'metadata/{metadata_file}',
                Body=json.dumps(metadata)
            )
            self.logger.info(f"Successfully synced metadata for instance {instance_id}")
        except Exception as e:
            self.logger.error(f"Error syncing metadata: {str(e)}")
            raise
```
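Usage is a single call; the bucket name in the class and the IDs below are placeholders:

```python
sync = ResourceSyncManager()
sync.sync_instance_metadata('i-1234567890abcdef0', 'web-server-01.json')
```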
```python
# Automated backup manager
import boto3
from datetime import datetime

class BackupManager:
    def __init__(self):
        self.ec2 = boto3.client('ec2')
        self.sns = boto3.client('sns')

    def create_ami_backup(self, instance_id):
        try:
            # Create an AMI from the instance
            timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
            ami_name = f"backup-{instance_id}-{timestamp}"
            response = self.ec2.create_image(
                InstanceId=instance_id,
                Name=ami_name,
                Description=f"Automated backup for {instance_id}",
                NoReboot=True
            )
            ami_id = response['ImageId']

            # Tag the AMI so it can be found later
            self.ec2.create_tags(
                Resources=[ami_id],
                Tags=[
                    {'Key': 'Name', 'Value': ami_name},
                    {'Key': 'AutoBackup', 'Value': 'true'}
                ]
            )

            # Notify that the backup completed
            self.send_backup_notification(instance_id, ami_id)
            return ami_id
        except Exception as e:
            print(f"Error creating backup: {str(e)}")
            raise

    def send_backup_notification(self, instance_id, ami_id):
        message = f"Backup completed for instance {instance_id}\nAMI ID: {ami_id}"
        self.sns.publish(
            TopicArn='arn:aws:sns:ap-northeast-2:123456789012:backup-notifications',
            Message=message,
            Subject='Backup Completion Notice'
        )
```
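To run this nightly, one option is the `schedule` library; a sketch with a placeholder instance ID:

```python
import time
import schedule

manager = BackupManager()
# Run a nightly AMI backup at 02:00 (instance ID is a placeholder)
schedule.every().day.at("02:00").do(
    manager.create_ami_backup, 'i-1234567890abcdef0'
)

while True:
    schedule.run_pending()
    time.sleep(60)
```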
```python
# Recovery automation manager
import boto3

class RecoveryManager:
    def __init__(self):
        self.ec2 = boto3.client('ec2')

    def initiate_recovery(self, failed_instance_id):
        try:
            # Find AMIs created by the automated backup
            images = self.ec2.describe_images(
                Filters=[
                    {'Name': 'tag:AutoBackup', 'Values': ['true']}
                ],
                Owners=['self']
            )
            if not images['Images']:
                raise Exception('No backup AMI found')

            # Pick the most recent AMI
            latest_ami = sorted(
                images['Images'],
                key=lambda x: x['CreationDate'],
                reverse=True
            )[0]

            # Launch a replacement instance
            response = self.ec2.run_instances(
                ImageId=latest_ami['ImageId'],
                InstanceType='t3.medium',  # change to the desired instance type
                MinCount=1,
                MaxCount=1,
                SecurityGroupIds=['sg-xxxxxxxx'],  # security group ID (placeholder)
                SubnetId='subnet-xxxxxxxx',        # subnet ID (placeholder)
                TagSpecifications=[
                    {
                        'ResourceType': 'instance',
                        'Tags': [
                            {'Key': 'Name', 'Value': f'Recovered-{failed_instance_id}'}
                        ]
                    }
                ]
            )
            return response['Instances'][0]['InstanceId']
        except Exception as e:
            print(f"Error during recovery: {str(e)}")
            raise
```
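After recovery it usually pays to block until the replacement instance is actually running; a sketch using the standard EC2 waiter, with a placeholder instance ID:

```python
import boto3

recovery = RecoveryManager()
new_instance_id = recovery.initiate_recovery('i-0abcdef1234567890')

# Block until the replacement reaches the running state
boto3.client('ec2').get_waiter('instance_running').wait(
    InstanceIds=[new_instance_id]
)
```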
```python
# DMS task manager
import boto3
import json
import time

class DMSTaskManager:
    def __init__(self):
        self.dms = boto3.client('dms')

    def create_replication_task(self, source_endpoint_arn, target_endpoint_arn,
                                replication_instance_arn):
        try:
            response = self.dms.create_replication_task(
                ReplicationTaskIdentifier=f'task-{int(time.time())}',
                SourceEndpointArn=source_endpoint_arn,
                TargetEndpointArn=target_endpoint_arn,
                ReplicationInstanceArn=replication_instance_arn,
                MigrationType='full-load-and-cdc',
                TableMappings=json.dumps({
                    'rules': [
                        {
                            'rule-type': 'selection',
                            'rule-id': '1',
                            'rule-name': '1',
                            'object-locator': {
                                'schema-name': '%',
                                'table-name': '%'
                            },
                            'rule-action': 'include'
                        }
                    ]
                }),
                ReplicationTaskSettings=json.dumps({
                    'TargetMetadata': {
                        'TargetSchema': '',
                        'SupportLobs': True,
                        'FullLobMode': False,
                        'LobChunkSize': 64,
                        'LimitedSizeLobMode': True,
                        'LobMaxSize': 32
                    },
                    'FullLoadSettings': {
                        'TargetTablePrepMode': 'DROP_AND_CREATE',
                        'CreatePkAfterFullLoad': False,
                        'StopTaskCachedChangesApplied': False,
                        'StopTaskCachedChangesNotApplied': False,
                        'MaxFullLoadSubTasks': 8,
                        'TransactionConsistencyTimeout': 600,
                        'CommitRate': 10000
                    },
                    'Logging': {
                        'EnableLogging': True
                    }
                })
            )
            return response['ReplicationTask']['ReplicationTaskArn']
        except Exception as e:
            print(f"Error creating DMS task: {str(e)}")
            raise
```
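Creating the task only registers it; it still has to be started once it reaches the ready state. A sketch, with placeholder ARNs:

```python
import boto3

dms = boto3.client('dms')
task_arn = DMSTaskManager().create_replication_task(
    source_endpoint_arn='arn:aws:dms:ap-northeast-2:123456789012:endpoint:SOURCE',
    target_endpoint_arn='arn:aws:dms:ap-northeast-2:123456789012:endpoint:TARGET',
    replication_instance_arn='arn:aws:dms:ap-northeast-2:123456789012:rep:INSTANCE'
)

# Wait until the task is ready, then kick off full load + CDC
dms.get_waiter('replication_task_ready').wait(
    Filters=[{'Name': 'replication-task-arn', 'Values': [task_arn]}]
)
dms.start_replication_task(
    ReplicationTaskArn=task_arn,
    StartReplicationTaskType='start-replication'
)
```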
```python
# Integrated monitoring manager
import boto3
from datetime import datetime, timedelta

class MonitoringManager:
    def __init__(self):
        self.cloudwatch = boto3.client('cloudwatch')
        self.sns = boto3.client('sns')

    def monitor_resources(self):
        try:
            # Query CPU utilization for an EC2 instance
            response = self.cloudwatch.get_metric_statistics(
                Namespace='AWS/EC2',
                MetricName='CPUUtilization',
                Dimensions=[
                    {'Name': 'InstanceId', 'Value': 'i-1234567890abcdef0'},
                ],
                StartTime=datetime.utcnow() - timedelta(minutes=30),
                EndTime=datetime.utcnow(),
                Period=300,
                Statistics=['Average']
            )
            # Compare against the threshold
            for datapoint in response['Datapoints']:
                if datapoint['Average'] > 80:  # CPU utilization above 80%
                    self.send_alert('High CPU Usage Alert',
                                    f"CPU Usage: {datapoint['Average']}%")
        except Exception as e:
            print(f"Error monitoring resources: {str(e)}")
            raise

    def send_alert(self, subject, message):
        try:
            self.sns.publish(
                TopicArn='arn:aws:sns:ap-northeast-2:123456789012:monitoring-alerts',
                Message=message,
                Subject=subject
            )
        except Exception as e:
            print(f"Error sending alert: {str(e)}")
            raise
```
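Polling works, but for a fixed threshold a CloudWatch alarm pushes to SNS without any custom loop; a sketch, with the instance ID and topic ARN as placeholders:

```python
import boto3

cloudwatch = boto3.client('cloudwatch')
# Alarm when average CPU exceeds 80% for two consecutive 5-minute periods
cloudwatch.put_metric_alarm(
    AlarmName='high-cpu-i-1234567890abcdef0',
    Namespace='AWS/EC2',
    MetricName='CPUUtilization',
    Dimensions=[{'Name': 'InstanceId', 'Value': 'i-1234567890abcdef0'}],
    Statistic='Average',
    Period=300,
    EvaluationPeriods=2,
    Threshold=80,
    ComparisonOperator='GreaterThanThreshold',
    AlarmActions=['arn:aws:sns:ap-northeast-2:123456789012:monitoring-alerts']
)
```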
```python
# Batch job manager
import boto3
import logging

class BatchJobManager:
    def __init__(self):
        self.ec2 = boto3.client('ec2')
        self.logger = logging.getLogger(__name__)

    def _find_instance_ids(self, tag_name, tag_value):
        # Collect the IDs of instances carrying the given tag
        instances = self.ec2.describe_instances(
            Filters=[
                {'Name': f'tag:{tag_name}', 'Values': [tag_value]}
            ]
        )
        instance_ids = []
        for reservation in instances['Reservations']:
            for instance in reservation['Instances']:
                instance_ids.append(instance['InstanceId'])
        return instance_ids

    def start_instances(self, tag_name, tag_value):
        try:
            instance_ids = self._find_instance_ids(tag_name, tag_value)
            if instance_ids:
                self.ec2.start_instances(InstanceIds=instance_ids)
                self.logger.info(f"Started instances: {instance_ids}")
        except Exception as e:
            self.logger.error(f"Error starting instances: {str(e)}")
            raise

    def stop_instances(self, tag_name, tag_value):
        try:
            instance_ids = self._find_instance_ids(tag_name, tag_value)
            if instance_ids:
                self.ec2.stop_instances(InstanceIds=instance_ids)
                self.logger.info(f"Stopped instances: {instance_ids}")
        except Exception as e:
            self.logger.error(f"Error stopping instances: {str(e)}")
            raise
```
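In day-to-day use the manager brackets a batch window; the tag key and value below are placeholders:

```python
batch = BatchJobManager()
# Bring up the tagged batch fleet before the job window...
batch.start_instances('Role', 'batch-worker')
# ...run the batch jobs...
# ...and shut the fleet down afterwards to save cost.
batch.stop_instances('Role', 'batch-worker')
```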
These components are written so they can be adapted for production use, each serving a distinct purpose. In a real service environment they are operated together in roughly the following order, driven by a CI/CD pipeline:
```groovy
// Example Jenkinsfile
pipeline {
    agent any

    environment {
        AWS_REGION = 'ap-northeast-2'
        ONPREM_ENV = 'prod'
    }

    stages {
        stage('Environment Check') {
            steps {
                script {
                    // Verify the on-premises environment is healthy
                    sh '''
                        if ! python3 check_onprem_status.py; then
                            echo "On-premises environment needs attention."
                            exit 1
                        fi
                    '''
                }
            }
        }
        stage('Sync Resources') {
            steps {
                script {
                    // Synchronize resources between environments
                    sh '''
                        python3 sync_resources.py \
                            --source-env ${ONPREM_ENV} \
                            --target-region ${AWS_REGION}
                    '''
                }
            }
        }
        stage('Deploy') {
            steps {
                script {
                    // Run the hybrid deployment script
                    sh '''
                        python3 hybrid_deploy.py \
                            --env ${ONPREM_ENV} \
                            --region ${AWS_REGION}
                    '''
                }
            }
        }
    }

    post {
        always {
            // Send the deployment result notification
            notifyDeployResult()
        }
    }
}
```
```python
# hybrid_deploy.py
import logging
from typing import Dict, List

import boto3
import paramiko
import requests

class HybridDeployer:
    def __init__(self, aws_region: str, onprem_config: Dict):
        self.ec2 = boto3.client('ec2', region_name=aws_region)
        self.ssm = boto3.client('ssm', region_name=aws_region)
        self.onprem_config = onprem_config
        self.logger = logging.getLogger(__name__)

    def deploy_to_hybrid_environment(self, local_artifact: str, s3_artifact_uri: str):
        # The artifact must be available both locally (for SFTP to on-prem)
        # and in S3 (for Run Command on the AWS instances), so both forms
        # are taken explicitly here.
        try:
            # 1. Deploy to the AWS environment
            aws_instances = self._get_target_instances()
            self._deploy_to_aws(aws_instances, s3_artifact_uri)
            # 2. Deploy to the on-premises environment
            self._deploy_to_onprem(local_artifact)
            # 3. Verify the deployment
            self._verify_deployment()
        except Exception as e:
            self.logger.error(f"Error during deployment: {str(e)}")
            raise

    def _get_target_instances(self) -> List[str]:
        response = self.ec2.describe_instances(
            Filters=[
                {'Name': 'tag:Environment', 'Values': ['production']},
                {'Name': 'instance-state-name', 'Values': ['running']}
            ]
        )
        instance_ids = []
        for reservation in response['Reservations']:
            for instance in reservation['Instances']:
                instance_ids.append(instance['InstanceId'])
        return instance_ids

    def _deploy_to_aws(self, instance_ids: List[str], s3_artifact_uri: str):
        for instance_id in instance_ids:
            try:
                # Deploy via Systems Manager Run Command
                response = self.ssm.send_command(
                    InstanceIds=[instance_id],
                    DocumentName='AWS-RunShellScript',
                    Parameters={
                        'commands': [
                            f'aws s3 cp {s3_artifact_uri} /opt/app/',
                            'cd /opt/app',
                            './deploy.sh'
                        ]
                    }
                )
                command_id = response['Command']['CommandId']
                self._wait_for_command(instance_id, command_id)
            except Exception as e:
                self.logger.error(f"Error deploying to AWS (Instance {instance_id}): {str(e)}")
                raise

    def _wait_for_command(self, instance_id: str, command_id: str):
        # Block until the Run Command invocation finishes
        # (the original omits this helper; this is a minimal version)
        waiter = self.ssm.get_waiter('command_executed')
        waiter.wait(CommandId=command_id, InstanceId=instance_id)

    def _deploy_to_onprem(self, local_artifact: str):
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(
                hostname=self.onprem_config['host'],
                username=self.onprem_config['user'],
                key_filename=self.onprem_config['key_path']
            )
            # Transfer the artifact
            sftp = ssh.open_sftp()
            sftp.put(local_artifact, '/opt/app/artifact.zip')
            sftp.close()
            # Run the deployment script
            stdin, stdout, stderr = ssh.exec_command('''
                cd /opt/app
                unzip -o artifact.zip
                ./deploy.sh
            ''')
            if stdout.channel.recv_exit_status() != 0:
                raise Exception(f"On-premises deployment failed: {stderr.read().decode()}")
        finally:
            ssh.close()

    def _verify_deployment(self):
        # Post-deployment health checks
        health_check_endpoints = [
            'http://aws-lb.example.com/health',
            'http://onprem.example.com/health'
        ]
        for endpoint in health_check_endpoints:
            response = requests.get(endpoint)
            if response.status_code != 200:
                raise Exception(f"Health check failed ({endpoint}): {response.status_code}")
```
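Wiring this into the pipeline's hybrid_deploy.py entry point looks roughly like this; the host, user, and key path are placeholders matching the keys the class reads, and the artifact is assumed to have been uploaded to S3 by an earlier stage:

```python
deployer = HybridDeployer(
    aws_region='ap-northeast-2',
    onprem_config={
        'host': 'onprem.example.com',
        'user': 'deploy',
        'key_path': '/var/lib/jenkins/.ssh/id_rsa'
    }
)
deployer.deploy_to_hybrid_environment(
    local_artifact='build/app.zip',
    s3_artifact_uri='s3://my-artifacts/app.zip'
)
```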
```python
# security_audit.py
import json
import time
from datetime import datetime
from typing import Dict, List

import boto3

class SecurityAuditor:
    def __init__(self):
        self.ec2 = boto3.client('ec2')
        self.inspector = boto3.client('inspector')
        self.config = boto3.client('config')

    def run_security_audit(self) -> Dict:
        audit_results = {
            'timestamp': datetime.now().isoformat(),
            'security_groups': self._audit_security_groups(),
            'instance_compliance': self._check_instance_compliance(),
            'vulnerability_scan': self._run_vulnerability_scan()
        }
        self._save_audit_results(audit_results)
        return audit_results

    def _audit_security_groups(self) -> List[Dict]:
        security_groups = self.ec2.describe_security_groups()
        findings = []
        for sg in security_groups['SecurityGroups']:
            # Inspect every inbound rule
            for rule in sg['IpPermissions']:
                if self._is_rule_unsafe(rule):
                    findings.append({
                        'security_group_id': sg['GroupId'],
                        'issue': 'Unsafe inbound rule detected',
                        'details': rule
                    })
        return findings

    def _is_rule_unsafe(self, rule: Dict) -> bool:
        # Minimal check (the original omits this helper):
        # flag rules open to the whole internet
        return any(ip_range.get('CidrIp') == '0.0.0.0/0'
                   for ip_range in rule.get('IpRanges', []))

    def _check_instance_compliance(self) -> List[Dict]:
        compliance_results = []
        # Compliance check through AWS Config
        rules = self.config.describe_config_rules()
        for rule in rules['ConfigRules']:
            result = self.config.get_compliance_details_by_config_rule(
                ConfigRuleName=rule['ConfigRuleName']
            )
            for evaluation in result['EvaluationResults']:
                if evaluation['ComplianceType'] != 'COMPLIANT':
                    compliance_results.append({
                        'rule_name': rule['ConfigRuleName'],
                        'resource_id': evaluation['EvaluationResultIdentifier']['EvaluationResultQualifier']['ResourceId'],
                        'compliance_type': evaluation['ComplianceType']
                    })
        return compliance_results

    def _run_vulnerability_scan(self) -> List[Dict]:
        # Vulnerability scan via Amazon Inspector (Classic)
        assessment_run = self.inspector.start_assessment_run(
            assessmentTemplateArn='arn:aws:inspector:region:account-id:target/template-name'
        )
        # Poll until the run completes (Inspector Classic exposes no
        # boto3 waiter for assessment runs)
        while True:
            runs = self.inspector.describe_assessment_runs(
                assessmentRunArns=[assessment_run['assessmentRunArn']]
            )
            if runs['assessmentRuns'][0]['state'] == 'COMPLETED':
                break
            time.sleep(30)
        # Collect findings (Inspector Classic severities top out at 'High')
        findings = self.inspector.list_findings(
            assessmentRunArns=[assessment_run['assessmentRunArn']],
            filter={'severities': ['High']}
        )
        return [self._format_finding(arn) for arn in findings['findingArns']]

    def _format_finding(self, finding_arn: str) -> Dict:
        # Minimal formatter (the original omits this helper)
        return {'finding_arn': finding_arn}

    def _save_audit_results(self, results: Dict):
        s3 = boto3.client('s3')
        # Persist the results to S3
        s3.put_object(
            Bucket='security-audit-results',
            Key=f'audits/{datetime.now().strftime("%Y%m%d")}/results.json',
            Body=json.dumps(results)
        )
```
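Running a full audit is then a single call; note that the S3 bucket and the Inspector assessment template referenced above must already exist:

```python
auditor = SecurityAuditor()
results = auditor.run_security_audit()
print(f"Security group findings: {len(results['security_groups'])}")
print(f"Non-compliant resources: {len(results['instance_compliance'])}")
print(f"High-severity vulnerabilities: {len(results['vulnerability_scan'])}")
```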
```python
# cost_optimizer.py
import json
from datetime import datetime
from typing import Dict, List

import boto3

class CostOptimizer:
    def __init__(self):
        self.ce = boto3.client('ce')  # Cost Explorer
        self.ec2 = boto3.client('ec2')

    def analyze_costs(self) -> Dict:
        # Run the full cost analysis
        cost_analysis = {
            'total_cost': self._get_total_cost(),
            'cost_by_service': self._get_cost_by_service(),
            'optimization_recommendations': self._generate_recommendations()
        }
        return cost_analysis

    def _get_total_cost(self) -> float:
        response = self.ce.get_cost_and_usage(
            TimePeriod={'Start': '2024-01-01', 'End': '2024-01-31'},
            Granularity='MONTHLY',
            Metrics=['UnblendedCost']
        )
        return float(response['ResultsByTime'][0]['Total']['UnblendedCost']['Amount'])

    def _get_cost_by_service(self) -> List[Dict]:
        response = self.ce.get_cost_and_usage(
            TimePeriod={'Start': '2024-01-01', 'End': '2024-01-31'},
            Granularity='MONTHLY',
            GroupBy=[{'Type': 'DIMENSION', 'Key': 'SERVICE'}],
            Metrics=['UnblendedCost']
        )
        services = []
        for group in response['ResultsByTime'][0]['Groups']:
            services.append({
                'service': group['Keys'][0],
                'cost': float(group['Metrics']['UnblendedCost']['Amount'])
            })
        return services

    def _generate_recommendations(self) -> List[Dict]:
        recommendations = []
        # Instance-level right-sizing recommendations
        instances = self.ec2.describe_instances()
        for reservation in instances['Reservations']:
            for instance in reservation['Instances']:
                recommendations.extend(self._analyze_instance(instance))
        return recommendations

    def _analyze_instance(self, instance: Dict) -> List[Dict]:
        instance_recommendations = []
        # Analyze CPU utilization over the same window
        cloudwatch = boto3.client('cloudwatch')
        response = cloudwatch.get_metric_statistics(
            Namespace='AWS/EC2',
            MetricName='CPUUtilization',
            Dimensions=[
                {'Name': 'InstanceId', 'Value': instance['InstanceId']}
            ],
            StartTime=datetime(2024, 1, 1),
            EndTime=datetime(2024, 1, 31),
            Period=3600,
            Statistics=['Average']
        )
        # Compute the average CPU utilization
        if response['Datapoints']:
            avg_cpu = sum(d['Average'] for d in response['Datapoints']) / len(response['Datapoints'])
            # Flag underutilized instances
            if avg_cpu < 20:
                instance_recommendations.append({
                    'instance_id': instance['InstanceId'],
                    'type': 'low_utilization',
                    'current_type': instance['InstanceType'],
                    'recommendation': 'Consider downsizing instance type',
                    'potential_savings': self._calculate_potential_savings(instance['InstanceType'])
                })
        return instance_recommendations

    def _calculate_potential_savings(self, instance_type: str) -> float:
        # Estimate the hourly price gap to the next size down
        pricing = boto3.client('pricing', region_name='us-east-1')  # Pricing API endpoint
        current_price = self._get_instance_price(pricing, instance_type)
        smaller_type = self._get_smaller_instance_type(instance_type)
        smaller_price = self._get_instance_price(pricing, smaller_type)
        return (current_price - smaller_price) * 730  # estimated monthly savings

    def _get_smaller_instance_type(self, instance_type: str) -> str:
        # Minimal sketch (the original omits this helper):
        # step down one size within the same family
        sizes = ['nano', 'micro', 'small', 'medium', 'large',
                 'xlarge', '2xlarge', '4xlarge', '8xlarge']
        family, size = instance_type.split('.')
        index = sizes.index(size)
        return f"{family}.{sizes[max(index - 1, 0)]}"

    def _get_instance_price(self, pricing, instance_type: str) -> float:
        # Minimal sketch (the original omits this helper): on-demand Linux
        # price for the Seoul region via the Pricing API
        response = pricing.get_products(
            ServiceCode='AmazonEC2',
            Filters=[
                {'Type': 'TERM_MATCH', 'Field': 'instanceType', 'Value': instance_type},
                {'Type': 'TERM_MATCH', 'Field': 'location', 'Value': 'Asia Pacific (Seoul)'},
                {'Type': 'TERM_MATCH', 'Field': 'operatingSystem', 'Value': 'Linux'},
                {'Type': 'TERM_MATCH', 'Field': 'tenancy', 'Value': 'Shared'},
                {'Type': 'TERM_MATCH', 'Field': 'preInstalledSw', 'Value': 'NA'},
                {'Type': 'TERM_MATCH', 'Field': 'capacitystatus', 'Value': 'Used'}
            ],
            MaxResults=1
        )
        product = json.loads(response['PriceList'][0])
        on_demand = next(iter(product['terms']['OnDemand'].values()))
        dimension = next(iter(on_demand['priceDimensions'].values()))
        return float(dimension['pricePerUnit']['USD'])
```
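A quick run, assuming Cost Explorer has been enabled on the account:

```python
optimizer = CostOptimizer()
analysis = optimizer.analyze_costs()
print(f"Total monthly cost: ${analysis['total_cost']:.2f}")
for rec in analysis['optimization_recommendations']:
    print(f"{rec['instance_id']}: {rec['recommendation']} "
          f"(~${rec['potential_savings']:.2f}/month saved)")
```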
In practice, these components deliver real operational benefits: each module works independently while feeding into the others, which raises the efficiency of the system as a whole. This approach is particularly useful in hybrid cloud environments, and it helped us integrate our on-premises and AWS environments seamlessly.