Host-level backup of a Dockerized PostgreSQL database to NCP Object Storage using Python and pg_dump

agnusdei · July 25, 2025
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import subprocess
import os
import datetime
import hashlib
import hmac
import urllib.parse
import requests
import shutil

# Retention limit: number of uploaded-backup markers (.touch files) to keep.
# 720 ≈ 30 days at an hourly cadence — assumed schedule; TODO confirm the
# cron/systemd timer interval this script runs under.
MAX_BACKUPS = 720
# Local staging directory for dump files and their .touch markers.
BACKUP_DIR = "/tmp/backups"

# PostgreSQL connection settings; the password is handed to pg_dump via the
# PGPASSWORD environment variable.
# NOTE(review): credentials are hard-coded placeholders — load them from the
# environment or a secret store before production use.
DB_HOST = "127.0.0.1"
DB_USER = "project_name"
DB_PASSWORD = "1234"
DB_NAME = "project_name"

# NCP Object Storage (S3-compatible) access settings.
CLIENT_ID = "ncp_iam_1234"
CLIENT_SECRET = "ncp_iam_1234"
NCP_BUCKET = "project_name"
NCP_BUCKET_FOLDER = "backups"
NCP_REGION = "kr-standard"
NCP_ENDPOINT = "kr.object.ncloudstorage.com"

def log(msg):
    """Emit a progress/status message to stdout."""
    print(msg)

def check_install(command, install_cmds):
    """Ensure *command* is available on PATH, installing it if missing.

    Args:
        command: Executable name looked up with shutil.which.
        install_cmds: List of argv lists, each run via subprocess.run when
            the command is absent.

    Raises:
        SystemExit: with status 1 when any install command fails.
    """
    # Guard clause: nothing to do when the tool already exists.
    if shutil.which(command) is not None:
        log(f"'{command}' is already installed.")
        return

    log(f"'{command}' not found. Installing...")
    try:
        for cmd in install_cmds:
            subprocess.run(cmd, check=True)
    except subprocess.CalledProcessError:
        log(f"Failed to install '{command}'. Please install manually.")
        # raise SystemExit instead of the site-module exit(): exit() is only
        # guaranteed in interactive sessions / when site is loaded.
        raise SystemExit(1)
    log(f"'{command}' installed successfully.")

# === Signature Helpers ===

def get_hash(key, msg):
    """Return the HMAC-SHA256 digest of *msg* (str) keyed with *key* (bytes)."""
    mac = hmac.new(key, digestmod=hashlib.sha256)
    mac.update(msg.encode('utf-8'))
    return mac.digest()

def create_signed_headers(headers):
    """Return the SigV4 SignedHeaders list: sorted names joined by ';'."""
    names = sorted(headers)
    return ';'.join(names)

def create_standardized_headers(headers):
    """Return the SigV4 canonical headers block: 'name:value\\n' in sorted order."""
    parts = []
    for name in sorted(headers):
        parts.append(f"{name}:{headers[name]}\n")
    return ''.join(parts)

def create_standardized_query_parameters(params):
    """Return the SigV4 canonical query string (sorted keys, encoded values).

    An empty or None *params* yields the empty string.
    """
    if params:
        pairs = (
            f"{key}={urllib.parse.quote(params[key], safe='')}"
            for key in sorted(params)
        )
        return '&'.join(pairs)
    return ''

class NCPObjectStorage:
    """Minimal NCP Object Storage (S3-compatible) client.

    Requests are signed with AWS Signature Version 4 using an unsigned
    payload, so only the method, path, query parameters, and headers are
    covered by the signature — not the body bytes.
    """

    def __init__(self, access_key, secret_key):
        self.region = NCP_REGION
        self.endpoint = f'https://{NCP_ENDPOINT}'
        self.host = NCP_ENDPOINT
        self.access_key = access_key
        self.secret_key = secret_key
        # Body content is not hashed into the signature.
        self.payload_hash = 'UNSIGNED-PAYLOAD'
        self.algorithm = 'AWS4-HMAC-SHA256'
        self.service = 's3'
        self.req_type = 'aws4_request'
        self.time_fmt = '%Y%m%dT%H%M%SZ'
        self.date_fmt = '%Y%m%d'

    def _credential_scope(self, date_stamp):
        """Return the credential scope: date/region/service/request-type."""
        return f'{date_stamp}/{self.region}/{self.service}/{self.req_type}'

    def _canonical_request(self, method, path, params, headers):
        """Build the SigV4 canonical request string for the request parts."""
        return (
            f"{method}\n"
            f"{path}\n"
            f"{create_standardized_query_parameters(params)}\n"
            f"{create_standardized_headers(headers)}\n"
            f"{create_signed_headers(headers)}\n"
            f"{self.payload_hash}"
        )

    def _string_to_sign(self, timestamp, scope, canonical_request):
        """Build the SigV4 string-to-sign from the canonical request."""
        return (
            f"{self.algorithm}\n"
            f"{timestamp}\n"
            f"{scope}\n"
            f"{hashlib.sha256(canonical_request.encode()).hexdigest()}"
        )

    def _signature_key(self, date_stamp):
        """Derive the SigV4 signing key via the chained HMAC steps."""
        k_date = get_hash(('AWS4' + self.secret_key).encode(), date_stamp)
        k_region = get_hash(k_date, self.region)
        k_service = get_hash(k_region, self.service)
        return get_hash(k_service, self.req_type)

    def _authorization(self, headers, key, string_to_sign, scope):
        """Build the Authorization header value for a signed request."""
        sig = hmac.new(key, string_to_sign.encode(), hashlib.sha256).hexdigest()
        return (
            f"{self.algorithm} "
            f"Credential={self.access_key}/{scope}, "
            f"SignedHeaders={create_signed_headers(headers)}, "
            f"Signature={sig}"
        )

    def _sign(self, method, path, headers, now, params=None):
        """Sign the request in place by adding an 'authorization' header.

        *now* must be the same datetime used to build 'x-amz-date'.
        """
        ts = now.strftime(self.time_fmt)
        ds = now.strftime(self.date_fmt)
        scope = self._credential_scope(ds)
        canonical = self._canonical_request(method, path, params, headers)
        string_to_sign = self._string_to_sign(ts, scope, canonical)
        sig_key = self._signature_key(ds)
        headers['authorization'] = self._authorization(headers, sig_key, string_to_sign, scope)

    def put_object(self, bucket, object_name, file_path, params=None):
        """Upload *file_path* to bucket/object_name; return True on success.

        NOTE: the whole file is read into memory before the PUT, so very
        large dumps raise peak memory accordingly.
        """
        method = 'PUT'
        with open(file_path, 'rb') as f:
            # Timezone-aware replacement for datetime.utcnow(), which is
            # deprecated since Python 3.12; strftime output is identical
            # for these UTC format strings.
            now = datetime.datetime.now(datetime.timezone.utc)
            headers = {
                'x-amz-date': now.strftime(self.time_fmt),
                'host': self.host,
                'x-amz-content-sha256': self.payload_hash
            }

            path = f'/{bucket}/{object_name}'
            self._sign(method, path, headers, now, params)
            url = f"{self.endpoint}{path}"
            r = requests.put(url, headers=headers, params=params, data=f.read())

            if r.status_code in (200, 201):
                log(f'Upload success: {object_name}')
                return True
            log(f'Upload failed: {r.status_code} {r.text}')
            return False

# === Backup Logic ===

def backup_database():
    """Dump DB_NAME with pg_dump into BACKUP_DIR.

    Returns the path to the created dump file, or None when pg_dump fails.
    """
    log("Backing up database...")
    os.makedirs(BACKUP_DIR, exist_ok=True)
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    dump_file = os.path.join(BACKUP_DIR, f"{DB_NAME}_{stamp}.dump")

    # pg_dump reads the password from PGPASSWORD rather than the argv.
    env = dict(os.environ, PGPASSWORD=DB_PASSWORD)

    cmd = [
        "pg_dump",
        "-U", DB_USER,
        "-h", DB_HOST,
        "-d", DB_NAME,
        "-F", "c",            # custom archive format
        "-b",                 # include large objects
        "--exclude-table-data=_prisma_migrations",
        "--exclude-table-data=_MemberToNotification",
        "-f", dump_file,
    ]

    try:
        subprocess.run(cmd, env=env, check=True)
    except subprocess.CalledProcessError as e:
        log(f"Backup failed: {e}")
        return None
    log(f"Backup created: {dump_file}")
    return dump_file

def delete_old_local_backups(ncp):
    """Prune the oldest backups beyond MAX_BACKUPS.

    .touch files in BACKUP_DIR mark dumps that were uploaded; oldest-first
    (by mtime), each pruned marker triggers deletion of the local dump
    (if still present), the marker itself, and the matching remote object.

    Args:
        ncp: NCPObjectStorage client used to sign the remote DELETE.
    """
    touch_files = sorted(
        (f for f in os.listdir(BACKUP_DIR) if f.endswith('.dump.touch')),
        key=lambda name: os.path.getmtime(os.path.join(BACKUP_DIR, name))
    )

    while len(touch_files) > MAX_BACKUPS:
        oldest = touch_files.pop(0)
        touch_path = os.path.join(BACKUP_DIR, oldest)
        # Strip the '.touch' suffix to recover the dump path.
        dump_path = touch_path[:-len('.touch')]

        log(f"Deleting old backup: {dump_path} and {touch_path}")
        try:
            if os.path.exists(dump_path):
                os.remove(dump_path)
            if os.path.exists(touch_path):
                os.remove(touch_path)
        except OSError as e:
            log(f"Error deleting local files: {e}")

        object_name = f"{NCP_BUCKET_FOLDER}/{os.path.basename(dump_path)}"
        try:
            # Timezone-aware replacement for the deprecated utcnow();
            # header constants come from the client so signing stays
            # consistent with put_object.
            now = datetime.datetime.now(datetime.timezone.utc)
            headers = {
                'x-amz-date': now.strftime(ncp.time_fmt),
                'host': ncp.host,
                'x-amz-content-sha256': ncp.payload_hash
            }

            path = f'/{NCP_BUCKET}/{object_name}'
            ncp._sign('DELETE', path, headers, now)

            url = f"{ncp.endpoint}{path}"
            resp = requests.delete(url, headers=headers)
            if resp.status_code in (200, 204):
                log(f"Deleted remote backup: {object_name}")
            else:
                log(f"Failed to delete remote backup: {resp.status_code} {resp.text}")
        except Exception as e:
            # Best-effort remote cleanup: log and continue pruning.
            log(f"Exception deleting remote file: {e}")

def main():
    """Run one backup cycle: dump, upload, create marker, prune old backups."""
    log("=== Backup Process Start ===")

    check_install("pg_dump", [
        ["sudo", "apt", "update"],
        ["sudo", "apt", "install", "-y", "postgresql-client"]
    ])

    dump_file = backup_database()
    if not dump_file:
        log("Backup failed, aborting.")
        return

    ncp = NCPObjectStorage(CLIENT_ID, CLIENT_SECRET)
    log("Uploading to NCP Object Storage...")

    filename = os.path.basename(dump_file)
    # BUG FIX: upload under the dump's own filename (was a corrupted
    # placeholder literal), so each backup gets a distinct object and the
    # remote key matches what delete_old_local_backups derives from the
    # dump basename during retention cleanup.
    object_name = f"{NCP_BUCKET_FOLDER}/{filename}"
    if ncp.put_object(NCP_BUCKET, object_name, dump_file):
        log("Upload successful")

        # An empty .touch file records that this dump was uploaded; its
        # mtime drives the retention ordering.
        touch_file = os.path.join(BACKUP_DIR, filename + ".touch")
        with open(touch_file, "w"):
            pass
        log(f"Created touch file: {touch_file}")

        # The dump itself is no longer needed locally once uploaded.
        os.remove(dump_file)
        log(f"Deleted local dump: {dump_file}")
    else:
        log("Upload failed, keeping dump.")

    delete_old_local_backups(ncp)
    log("=== Backup Process Completed ===")

# Script entry point: run a single backup cycle when executed directly.
if __name__ == "__main__":
    main()
# Remove the existing postgresql-client packages
sudo apt remove --purge postgresql-client postgresql-client-16

# Add the official PostgreSQL APT repository
sudo sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add -

# Refresh the package index
sudo apt update

# Install postgresql-client 17
sudo apt install -y postgresql-client-17

# Verify the installation
pg_dump --version
profile
DevSecOps Pentest🚩

0 comments