배치 작업 실행 시 다음과 같은 경고가 발생하며 작업이 skip됨:
2025-12-18 09:13:54 - WARNING - Execution of job "P(H) Batch (Custom Trigger)" skipped:
maximum number of running instances reached (1)
특이사항:
ps aux 확인 결과 배치 프로세스가 없음APScheduler는 각 Job 함수의 실행 인스턴스 수를 내부적으로 추적합니다:
# Job 실행 전
job._instances += 1
if job._instances > job.max_instances:
# "maximum number of running instances reached" 경고
job._instances -= 1
return
# Job 실행 완료 후 (finally 블록)
try:
job.func(*args, **kwargs)
except:
# 에러 로깅
finally:
job._instances -= 1 # 반드시 실행되어야 함
핵심: job._instances 카운트는 Job 함수가 return될 때만 감소합니다.
배치 코드 구조:
# probability_timeseries_batch.py
def run_probability_batch(...):
with ProcessPoolExecutor(max_workers=4) as executor:
futures = {
executor.submit(_process_task_worker, task): task
for task in tasks
}
for future in as_completed(futures):
result = future.result() # ❌ Timeout 없음!
# 결과 처리...
문제 시나리오:
1. 배치 시작 (00:17:10.053)
↓
2. ProcessPoolExecutor 생성, 4개 worker 프로세스 시작
↓
3. Worker에서 DB 연결 시도
↓
4. DB 연결 실패: "connection already closed" (00:17:10.110)
- Connection Pool Race Condition 발생
- Worker 프로세스들이 무효한 연결을 받음
↓
5. Worker가 DB 재연결을 시도하며 무한 대기
↓
6. future.result()가 무한 대기 (Timeout 없음)
↓
7. ProcessPoolExecutor의 with 블록이 종료되지 않음
↓
8. run_probability_batch() 함수가 return되지 않음
↓
9. probability_batch_job() 함수도 return되지 않음
↓
10. APScheduler는 _instances 카운트를 감소시키지 못함
↓
11. 영구적으로 "maximum instances reached" 상태 유지
배치 시작 로그는 있지만 종료 로그가 없음:
# main.py
def probability_batch_job():
logger.info("P(H) BATCH JOB STARTED") # ✓ 로그 있음
try:
run_probability_batch(...)
logger.info("P(H) BATCH JOB COMPLETED SUCCESSFULLY") # ❌ 로그 없음
except Exception as e:
logger.error(f"P(H) BATCH JOB FAILED: {e}") # ❌ 로그 없음
결론: 함수가 try 블록 내에서 멈춰서 완료 로그도, 에러 로그도 출력되지 않음
# 컨테이너 내부에서 확인
ps aux | grep -E "(probability|hazard)"
# → 배치 프로세스 없음
ps -eLf | grep python | wc -l
# → 24개 Python 쓰레드 (main + API workers)
# 00:57-00:58에 생성된 worker 프로세스들 발견
# 이들은 site_assessment API의 ThreadPoolExecutor 워커들
각 태스크(격자점 하나의 계산)가 5분 안에 완료되지 않으면 실패 처리:
# probability_timeseries_batch.py (Line 271)
# hazard_timeseries_batch.py (Line 297)
for future in as_completed(futures):
task = futures[future]
try:
result = future.result(timeout=300) # ✓ 5분 timeout 추가
if result['status'] == 'success':
# 결과 처리...
else:
failed_count += 1
except TimeoutError:
# Timeout 발생 시 해당 태스크만 실패 처리
failed_count += 1
logger.error(f"Task timeout after 300s: {task}")
# 다음 태스크 계속 진행
효과:
run_probability_batch() 함수가 항상 return됨Before:
✗ Worker 멈춤 → future.result() 무한 대기
✗ 함수 return 안 됨 → 인스턴스 release 안 됨
✗ 다음 배치 skip: "maximum instances reached"
After:
✓ Worker 멈춤 → 5분 후 TimeoutError
✓ 해당 태스크 실패 처리, 다음 태스크 계속
✓ 함수 항상 return → 인스턴스 정상 release
✓ 다음 배치 정상 실행 가능
전체 배치 실행 시간 제한:
# as_completed에 전체 timeout 추가
for future in as_completed(futures, timeout=7200): # 2시간
try:
result = future.result(timeout=300) # 개별 5분
연속 실패 시 빠른 실패 처리:
class ConnectionCircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60):
self.failures = 0
self.threshold = failure_threshold
self.last_failure_time = None
self.timeout = timeout
def execute(self, func):
# Circuit이 열린 상태인지 확인
if self.failures >= self.threshold:
if (datetime.now() - self.last_failure_time).seconds < self.timeout:
raise CircuitOpenError("Too many failures, circuit open")
else:
# Timeout 지나면 재시도 허용
self.failures = 0
try:
result = func()
self.failures = 0 # 성공 시 리셋
return result
except Exception as e:
self.failures += 1
self.last_failure_time = datetime.now()
if self.failures >= self.threshold:
# Circuit 열림 - 전체 배치 중단
raise CircuitOpenError(
f"Circuit opened after {self.failures} failures"
) from e
raise
# 사용
breaker = ConnectionCircuitBreaker(failure_threshold=5)
for task in tasks:
try:
result = breaker.execute(lambda: process_task(task))
except CircuitOpenError:
logger.error("Circuit opened, stopping batch")
break # 배치 중단하고 return
효과:
전체를 한 번에 처리하지 않고 청크 단위로:
def run_probability_batch_chunked(
grid_points: List[Tuple[float, float]] = None,
chunk_size: int = 1000,
**kwargs
):
"""청크 단위로 배치 처리"""
if grid_points is None:
grid_points = get_all_grid_points()
# 격자점을 chunk_size 단위로 분할
chunks = [
grid_points[i:i+chunk_size]
for i in range(0, len(grid_points), chunk_size)
]
total_success = 0
total_failed = 0
for i, chunk in enumerate(chunks):
logger.info(f"Processing chunk {i+1}/{len(chunks)}")
try:
# 청크당 timeout 설정
with timeout_context(600): # 10분
run_probability_batch(
grid_points=chunk,
**kwargs
)
total_success += len(chunk)
except TimeoutError:
logger.error(f"Chunk {i} timeout, skipping")
total_failed += len(chunk)
continue # 한 청크 실패해도 다음 청크 계속
except Exception as e:
logger.error(f"Chunk {i} failed: {e}")
# 초반 3개 청크 실패 시 전체 중단
if i < 3:
raise
total_failed += len(chunk)
continue
# 통계 로깅
logger.info(f"Batch completed: {total_success} success, {total_failed} failed")
# 항상 return 보장
return {
'success': total_success,
'failed': total_failed
}
효과:
실패한 작업을 별도 저장하여 나중에 재처리:
def save_to_dead_letter_queue(task: Dict, error: str):
"""실패한 태스크를 DLQ에 저장"""
db = DatabaseConnection()
db.execute("""
INSERT INTO batch_dead_letter_queue
(task_type, task_data, error_message, created_at)
VALUES (%s, %s, %s, NOW())
""", ('probability', json.dumps(task), error))
# 배치 처리 중
for future in as_completed(futures):
try:
result = future.result(timeout=300)
if result['status'] == 'failed':
# DLQ에 저장
save_to_dead_letter_queue(
task=result['task'],
error=result.get('error', 'Unknown error')
)
except TimeoutError:
# Timeout도 DLQ에 저장
save_to_dead_letter_queue(
task=task,
error='Task timeout after 300s'
)
나중에 DLQ를 조회하여 실패한 태스크만 재처리:
def reprocess_dead_letter_queue():
"""DLQ의 실패한 태스크들을 재처리"""
db = DatabaseConnection()
failed_tasks = db.fetch_all(
"SELECT * FROM batch_dead_letter_queue WHERE reprocessed = FALSE"
)
for record in failed_tasks:
task = json.loads(record['task_data'])
try:
result = process_task(task)
# 성공 시 DLQ에서 제거
db.execute(
"UPDATE batch_dead_letter_queue SET reprocessed = TRUE WHERE id = %s",
(record['id'],)
)
except Exception as e:
logger.error(f"Reprocess failed: {e}")
배치 상태를 외부에서 모니터링하여 강제 종료:
import redis
import os
import signal
# 배치 시작 시 상태 등록
def start_batch_monitoring(job_id: str, max_duration: int = 7200):
"""배치 시작을 Redis에 등록"""
r = redis.Redis()
r.hset(f'batch:{job_id}', mapping={
'status': 'running',
'pid': os.getpid(),
'start_time': datetime.now().isoformat(),
'max_duration': max_duration
})
# 별도 모니터링 프로세스
def batch_monitor():
"""주기적으로 배치 상태 확인"""
r = redis.Redis()
while True:
for key in r.scan_iter('batch:*'):
info = r.hgetall(key)
if info['status'] == 'running':
start_time = datetime.fromisoformat(info['start_time'])
running_time = (datetime.now() - start_time).seconds
max_duration = int(info['max_duration'])
if running_time > max_duration:
# 최대 실행 시간 초과 - 강제 종료
pid = int(info['pid'])
logger.warning(f"Killing hung batch process: PID {pid}")
os.kill(pid, signal.SIGTERM)
# 상태 업데이트
r.hset(key, 'status', 'killed')
r.hset(key, 'killed_at', datetime.now().isoformat())
time.sleep(60) # 1분마다 체크
# main.py
from apscheduler.executors.pool import ThreadPoolExecutor as APSThreadPoolExecutor
scheduler = BackgroundScheduler(
executors={
'default': APSThreadPoolExecutor(max_workers=2)
},
job_defaults={
'coalesce': False, # 밀린 작업 건너뛰기
'max_instances': 1, # 동시 실행 인스턴스 수
'misfire_grace_time': 3600 # 1시간 이내 실행 실패 허용
}
)
# Job 등록
scheduler.add_job(
probability_batch_job,
trigger=CronTrigger(month=1, day=1, hour=2, minute=0),
id='probability_batch',
name='P(H) Timeseries Batch',
replace_existing=True,
max_instances=1,
misfire_grace_time=3600, # 예정 시각 지나도 1시간 내 실행
coalesce=True # 밀린 작업 하나로 통합
)
┌─────────────────────────────────────────┐
│ APScheduler Job Level (3시간) │
│ ┌───────────────────────────────────┐ │
│ │ Batch Function Level (2시간) │ │
│ │ ┌─────────────────────────────┐ │ │
│ │ │ Chunk Level (10분) │ │ │
│ │ │ ┌───────────────────────┐ │ │ │
│ │ │ │ Task Level (5분) │ │ │ │
│ │ │ └───────────────────────┘ │ │ │
│ │ └─────────────────────────────┘ │ │
│ └───────────────────────────────────┘ │
└─────────────────────────────────────────┘
실패 유형별 처리:
1. Timeout (5분)
→ 해당 태스크만 실패 처리
→ DLQ에 저장
→ 다음 태스크 계속
2. DB Connection Error (Circuit Breaker)
→ 5번 연속 실패 시 Circuit Open
→ 배치 중단하고 return
→ 인스턴스 release
3. 데이터 오류 (개별 처리)
→ 로그 기록
→ 다음 태스크 계속
4. 심각한 에러 (시스템 레벨)
→ 배치 전체 중단
→ Exception raise
→ 알림 발송
배치 실행 시 수집할 메트릭:
- start_time: 시작 시각
- end_time: 종료 시각
- duration: 실행 시간
- total_tasks: 전체 태스크 수
- completed_tasks: 완료된 태스크 수
- failed_tasks: 실패한 태스크 수
- timeout_tasks: Timeout된 태스크 수
- success_rate: 성공률
- avg_task_duration: 평균 태스크 처리 시간
- peak_memory: 최대 메모리 사용량
- db_connection_errors: DB 연결 에러 수
근본 원인:
future.result()에 timeout이 없어서 Worker가 멈출 때 무한 대기핵심 해결책:
future.result(timeout=300) 추가로 개별 태스크 timeout 설정추가 방어:
효과:
관련 파일:
참고 자료: