A. Factory Pattern
- 객체 생성 로직 캡슐화
- 객체 생성의 유연성 제공
- 구체적인 타입 은닉
B. Strategy Pattern
- 알고리즘/동작의 교체 가능
- 실행 중 전략 변경 가능
- 확장성이 좋음
C. Singleton Pattern
- 전역적으로 하나의 인스턴스만 보장
- 리소스 공유/관리에 유용
- 테스트 어려움이 단점
A. Annotation 최소화
- @Getter 정도만 사용
- @AllArgsConstructor, @NoNull, @Valid 등은 선택적
- 과도한 어노테이션은 지양
B. Message Converter
- RequestBody 처리 시 Jackson2MessageConverter 주로 사용
- ObjectMapper를 통한 역직렬화
- Reflection으로 객체에 값 할당
C. 금융권 특수성
- byte 전문/데이터 사용
- 인코딩/암호화 처리
- 소켓 통신 시 규약 기반 처리
- 커스텀 메시지 컨버터 필요
A. 특징
- 지연 연산 (Lazy Evaluation)
- 파이프라이닝
- 내부 반복
B. 연산 구분
- 중간 연산: 파이프라인 구성
- 최종 연산: 결과 생성
C. Python 비교
- Generator와 유사
- Iterator 패턴
- 일회성 사용
A. 지연 연산 (Lazy Evaluation)
- 최종 연산 전까지 실행 지연
- 불필요한 연산 회피
- 성능 최적화
B. 데이터 파이프라인
- 메서드 체이닝으로 구성
- 순차적 데이터 처리
- 가독성 향상
A. 중간 연산 (Intermediate Operations)
- filter(): 조건에 맞는 요소 필터링
- map(): 요소 변환
- sorted(): 정렬
- distinct(): 중복 제거
B. 최종 연산 (Terminal Operations)
- collect(): 결과 수집
- forEach(): 각 요소 처리
- reduce(): 요소 결합
- count(), min(), max() 등
A. 병렬 스트림 (Parallel Stream)
- parallelStream() 사용
- 멀티스레드 처리
- 대용량 데이터 처리에 효과적
B. 라운드 로빈 처리
- 여러 처리기에 작업 분배
- 순환 방식으로 할당
- 부하 분산 효과
// 메시지 큐 라운드 로빈 처리
List<MessageQueue> queues = Arrays.asList(queue1, queue2, queue3);
AtomicInteger counter = new AtomicInteger(0);
messages.stream()
.forEach(msg -> {
int index = counter.getAndIncrement() % queues.size();
queues.get(index).send(msg);
});
A. 일반 스트림
- 단일 스레드
- 순차 처리
- 메모리 효율적
B. 병렬 스트림
- 멀티 스레드
- 동시 처리
- CPU 활용도 높음
A. 데이터 처리
- 필터링/변환
- 그룹화/집계
- 결과 수집
B. 메시지 처리
- 라운드 로빈 분배
- 부하 분산
- 순차적 처리
A. 유사점
- 지연 평가
- 메모리 효율성
- 일회성 사용
B. 차이점
- 구현 방식
- 병렬 처리 지원
- 메서드 체이닝
A. 재사용 불가
- 스트림은 일회용
- 재사용 시 예외 발생
- 필요시 새로 생성
B. 상태 관리
- 스트림 작업은 상태 비저장
- 부작용 없는 함수 사용
- 동시성 고려
A. 일반 스트림
- 작은 데이터셋
- 단순한 연산
- 순차적 처리 필요
B. 병렬 스트림
- 대용량 데이터
- 복잡한 연산
- 독립적 처리 가능
A. 구현 방식
- 순환 카운터 사용
- 모듈러 연산으로 인덱스 계산
- 원형 큐 개념
B. 활용 사례
- 로드 밸런싱
- 메시지 큐 분배
- 작업 스케줄링
A. 중간 확인
- peek() 사용
- 로깅 추가
- 단계별 검증
B. 성능 모니터링
- 처리 시간 측정
- 메모리 사용량 확인
- 병목 지점 식별
Stream이 일회성인 이유를 예시로 설명해드리겠습니다:
// 1. 스트림 생성
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Stream<Integer> stream = numbers.stream();
// 2. 스트림 사용
stream.forEach(System.out::println); // 작동함
// 3. 재사용 시도
stream.forEach(System.out::println); // IllegalStateException 발생
스트림이 일회성인 이유를 코드로 살펴보겠습니다:
import java.util.*;
import java.util.stream.Stream;
class StreamExample {
public static void main(String[] args) {
// 1. 기본적인 스트림 사용
List<String> list = Arrays.asList("a", "b", "c");
Stream<String> stream = list.stream();
// 첫 번째 사용
stream.forEach(System.out::println); // 작동함
try {
// 두 번째 사용 시도
stream.forEach(System.out::println); // 예외 발생
} catch (IllegalStateException e) {
System.out.println("스트림 재사용 불가: " + e.getMessage());
}
// 2. 스트림 상태 변화 예시
Stream<String> stream2 = list.stream()
.map(s -> {
System.out.println("매핑: " + s);
return s.toUpperCase();
});
// 중간 연산은 지연 실행됨
System.out.println("중간 연산 후");
// 최종 연산시 실제 실행
stream2.forEach(s -> System.out.println("최종: " + s));
// 3. 스트림 재사용이 필요한 경우의 올바른 방법
Supplier<Stream<String>> streamSupplier = () -> list.stream();
// 첫 번째 사용
streamSupplier.get().forEach(System.out::println);
// 두 번째 사용 (새로운 스트림 생성)
streamSupplier.get().forEach(System.out::println);
}
}
// 4. 커스텀 이터레이터 예시 (스트림의 내부 동작 이해)
class CustomIterator<T> {
private final List<T> list;
private boolean used = false;
private int position = 0;
public CustomIterator(List<T> list) {
this.list = list;
}
public T next() {
if (used) {
throw new IllegalStateException("이미 소비된 이터레이터");
}
if (position >= list.size()) {
used = true;
return null;
}
return list.get(position++);
}
}
일회성인 주요 이유들:
// 스트림은 내부적으로 상태를 가짐
Stream<Integer> stream = numbers.stream()
.map(n -> {
System.out.println("매핑: " + n);
return n * 2;
});
// 한번 처리된 요소는 다시 되돌릴 수 없음
// 파일 스트림의 예
try (Stream<String> lines = Files.lines(path)) {
lines.forEach(System.out::println);
// 스트림 종료 후 자동으로 리소스 해제
}
// 병렬 스트림에서의 상태 관리
numbers.parallelStream()
.map(n -> n * 2)
.forEach(System.out::println);
// 병렬 처리 후 상태 복원이 복잡함
해결 방법:
// 스트림 공급자 생성
Supplier<Stream<String>> streamSupplier = () ->
Arrays.asList("a", "b", "c").stream();
// 필요할 때마다 새 스트림 생성
streamSupplier.get().forEach(System.out::println);
streamSupplier.get().forEach(System.out::println);
List<String> list = Arrays.asList("a", "b", "c");
// 필요할 때마다 새 스트림 생성
list.stream().forEach(System.out::println);
list.stream().forEach(System.out::println);
정리:
1. 스트림은 데이터의 흐름을 나타냄
2. 한번 소비된 데이터는 다시 되돌릴 수 없음
3. 내부 상태 관리와 리소스 해제를 위해 일회성으로 설계됨
4. 재사용이 필요한 경우 새로운 스트림을 생성해야 함
5. 병렬 처리의 안전성을 위해 일회성 특성 필요
이는 Python의 제너레이터와 유사한 개념입니다:
def generator():
for i in range(3):
yield i
gen = generator()
list(gen) # [0, 1, 2]
list(gen) # [] (이미 소진됨)
# Python Generator 방식
def message_queue_generator():
while True:
message = get_ai_message() # AI 모델에서 메시지 받기
yield message
# 일회성 사용
queue = message_queue_generator()
for msg in queue: # 한번 사용하면 끝
process_message(msg)
from typing import Generator
import logging
class AIMessageProcessor:
def __init__(self):
self.logger = logging.getLogger('ai_messages')
self.logger.setLevel(logging.INFO)
def get_message_stream(self) -> Generator:
"""새로운 메시지 스트림 생성"""
while True:
message = self.get_ai_message()
# 로그는 별도로 처리
self.logger.info(f"Message received: {message}")
yield message
def process_messages(self):
# 매번 새로운 스트림 생성
message_stream = self.get_message_stream()
for message in message_stream:
try:
self.process_single_message(message)
except Exception as e:
self.logger.error(f"Error processing message: {e}")
from typing import Generator
import logging
from datetime import datetime
from queue import Queue
import threading
class AIMessageQueue:
def __init__(self):
# 로깅 설정
self.logger = self.setup_logger()
self.message_queue = Queue()
self.is_running = True
def setup_logger(self):
logger = logging.getLogger('ai_queue')
logger.setLevel(logging.INFO)
# 파일 핸들러 (영구 저장)
fh = logging.FileHandler('ai_messages.log')
# 스트림 핸들러 (콘솔 출력)
sh = logging.StreamHandler()
# 포맷 설정
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
fh.setFormatter(formatter)
sh.setFormatter(formatter)
logger.addHandler(fh)
logger.addHandler(sh)
return logger
def message_generator(self) -> Generator:
"""메시지 스트림 생성"""
while self.is_running:
if not self.message_queue.empty():
message = self.message_queue.get()
# 로그는 별도로 저장
self.logger.info(f"Processing message: {message}")
yield message
self.message_queue.task_done()
def add_message(self, message: dict):
"""새 메시지 추가"""
timestamp = datetime.now().isoformat()
message['timestamp'] = timestamp
self.message_queue.put(message)
self.logger.info(f"Message added to queue: {message}")
def process_messages(self):
"""메시지 처리 (새로운 스트림 사용)"""
message_stream = self.message_generator()
try:
for message in message_stream:
# 비즈니스 로직 처리
self.process_single_message(message)
except Exception as e:
self.logger.error(f"Error in message processing: {e}")
def process_single_message(self, message: dict):
"""단일 메시지 처리"""
try:
# 메시지 처리 로직
result = f"Processed: {message}"
self.logger.info(result)
return result
except Exception as e:
self.logger.error(f"Failed to process message {message}: {e}")
raise
def start_processing(self):
"""백그라운드에서 메시지 처리 시작"""
thread = threading.Thread(target=self.process_messages)
thread.daemon = True
thread.start()
def stop(self):
"""처리 중지"""
self.is_running = False
self.logger.info("Message processing stopped")
# 사용 예시
if __name__ == "__main__":
queue = AIMessageQueue()
queue.start_processing()
# 메시지 추가
for i in range(5):
queue.add_message({"id": i, "content": f"AI Message {i}"})
# 잠시 대기 후 종료
import time
time.sleep(2)
queue.stop()
A. 스트림 (일회성)
- 데이터 처리 흐름
- 메모리 효율적 처리
- 한 번 사용 후 소멸
B. 로그 (영구적)
- 데이터 기록
- 디스크에 저장
- 지속적 참조 가능
class MessageProcessor:
def __init__(self):
self.messages = []
self.logger = logging.getLogger('message_processor')
def create_stream(self) -> Generator:
"""새로운 스트림 생성"""
for msg in self.messages:
self.logger.info(f"Streaming message: {msg}")
yield msg
def process_batch(self):
"""배치 처리"""
stream = self.create_stream() # 새 스트림
for msg in stream:
self.process_message(msg)
class LogAnalyzer:
def __init__(self, log_file):
self.log_file = log_file
def analyze_logs(self):
"""로그 분석"""
with open(self.log_file, 'r') as f:
for line in f: # 파일 스트림
yield line.strip()
def get_message_history(self):
"""로그에서 메시지 이력 조회"""
return list(self.analyze_logs())
요약:
1. 스트림은 데이터 처리를 위한 일회성 흐름
2. 로그는 데이터 기록을 위한 영구 저장소
3. 메시지 큐 처리할 때마다 새로운 스트림 생성 필요
4. 로그는 별도로 저장되어 언제든 조회/분석 가능
5. 스트림과 로그는 다른 목적으로 사용됨
이런 구조를 통해:
A. ResponseBody
- REST API 여부 결정
- Context-Type 영향
- Return Value 처리 방식 차이
B. 보안 처리
- Spring Security vs Custom Interceptor
- 팀별 관습 차이
- 응집도 고려
A. Message Converter 상세
- 최상위 추상체
- Supporter
- 메시지 파싱/직렬화 과정
B. 스트림 심화
- 병렬 스트림 처리
- 라운드로빈 방식의 메시지 큐
- 스트림 데이터 라이닝
C. Wrapper
- 래퍼 클래스의 개념과 용도
- 일반 스트림과의 관계
A. 동작 방식
- ObjectMapper 사용
- 생성자 방식이 아닌 역직렬화 방식
- Setter 불필요
B. 처리 과정
1. JSON 문자열 수신
2. ObjectMapper로 역직렬화
3. 객체 생성 및 값 매핑
A. 생명주기 관리
- 객체 생성/소멸 자동화
- 의존성 자동 주입
- 스코프 관리
B. Factory vs Strategy vs Bean
- Factory: 객체 생성 패턴
- Strategy: 알고리즘 교체 패턴
- Bean: Spring 컨테이너 관리 객체
A. 의존성 주입
- Depends vs @Autowired
- 타입 힌팅과 Annotated
- 계층적 의존성 관리
B. 패턴 적용
- 둘 다 다양한 패턴 적용 가능
- 프레임워크별 특성 고려
- 구현 방식의 차이