
SafetyFence는 피보호자의 실시간 위치를 보호자에게 전달하는 서비스다. 위치 데이터의 흐름을 먼저 파악하자
피보호자 앱 →→ [WebSocket] →→ 서버 →→ [WebSocket] →→ 보호자 앱
│
└→→ [Async 스레드] →→ DB 저장
서버 내부에서는 processLocationUpdate() 하나의 메서드에서 세 가지 작업이 일어난다.
// LocationWebSocketController.java — 현재 코드
private void processLocationUpdate(String userNumber, LocationUpdateDto location) {
location.setUserNumber(userNumber);
location.setTimestamp(System.currentTimeMillis());
// 1. 캐시에 최신 위치 저장 (동기)
cacheService.updateLocation(userNumber, location);
// 2. 보호자에게 WebSocket 전송 (동기)
messagingTemplate.convertAndSend("/topic/location/" + userNumber, location);
// 3. 조건부 DB 저장 (비동기)
locationService.saveLocationIfNeeded(location);
}
1번과 2번은 WebSocket 스레드에서 동기적으로 처리되고, 3번은 @Async로 별도의 스레드에서 비동기 처리된다. 여기서 두 가지 문제가 동시에 발생한다.
LocationUpdateDto의 정의를 보자.
// LocationUpdateDto.java — 현재 코드
@Data // getter, setter 모두 생성 → mutable 객체
@NoArgsConstructor
@AllArgsConstructor
public class LocationUpdateDto {
private String userNumber;
private Double latitude;
private Double longitude;
private Long timestamp;
}
@Data는 setter를 생성하므로 가변(mutable) 객체다. 그런데 이 객체가 두 개의 스레드에서 동시에 사용된다.
private void processLocationUpdate(String userNumber, LocationUpdateDto location) {
location.setUserNumber(userNumber); // WebSocket 스레드가 값을 변경
location.setTimestamp(...); // WebSocket 스레드가 값을 변경
cacheService.updateLocation(..., location); // WebSocket 스레드가 읽음
messagingTemplate.convertAndSend(..., location); // WebSocket 스레드가 읽음
locationService.saveLocationIfNeeded(location); // Async 스레드가 읽음 ← 다른 스레드!
}
[WebSocket 스레드 A] [Async 스레드 B]
location 객체 생성 │
location.setTimestamp(1000) │
캐시 저장 │
WebSocket 전송 │
saveLocationIfNeeded(location) ──────→ │
│ location.getTimestamp() → 1000 ✅
│ location.getUserNumber() 읽기
│ │
다음 요청의 location이 같은 객체라면? │
location.setTimestamp(2000) ← │
location.getTimestamp() → 2000 ❌
잘못된 값으로 DB 저장!
WebSocket 프레임워크가 요청마다 새 DTO를 만들어주기 때문에 실제로 이 문제가 발생할 확률은 낮지만, 구조적으로 안전하지 않다. 가변 객체를 스레드 간에 공유하는 것 자체가 위험한 패턴이다.
// 개선된 LocationUpdateDto — 불변 객체
public class LocationUpdateDto {
private final String userNumber;
private final Double latitude;
private final Double longitude;
private final Long timestamp;
public LocationUpdateDto(String userNumber, Double latitude,
Double longitude, Long timestamp) {
this.userNumber = userNumber;
this.latitude = latitude;
this.longitude = longitude;
this.timestamp = timestamp;
}
// getter만 존재, setter 없음
public String getUserNumber() { return userNumber; }
public Double getLatitude() { return latitude; }
public Double getLongitude() { return longitude; }
public Long getTimestamp() { return timestamp; }
}
// Controller에서 새 객체를 생성해서 넘김
private void processLocationUpdate(String userNumber, LocationUpdateDto rawLocation) {
// 불변 객체를 새로 생성
LocationUpdateDto location = new LocationUpdateDto(
userNumber,
rawLocation.getLatitude(),
rawLocation.getLongitude(),
System.currentTimeMillis()
);
cacheService.updateLocation(userNumber, location);
messagingTemplate.convertAndSend("/topic/location/" + userNumber, location);
locationService.saveLocationIfNeeded(location);
// Async 스레드가 이 객체를 읽어도 안전 — 값이 바뀔 수 없으니까
}
final 필드 + setter 제거로 어떤 스레드에서 읽어도 항상 같은 값을 보장한다. 멀티스레드 환경에서 공유 객체의 가장 안전한 해결책은 애초에 변경할 수 없게 만드는 것이다.
생산자-소비자 문제는 여러 스레드가 공유 버퍼를 통해 데이터를 주고받을 때 발생하는 동기화 문제다. 한정 버퍼 문제(bounded-buffer problem)라고도 한다. 핵심은 세 가지 상황의 처리다.
1. 버퍼가 가득 찼을 때 → 생산자가 기다려야 함 (블로킹)
2. 버퍼가 비었을 때 → 소비자가 기다려야 함 (블로킹)
3. 동시에 버퍼 접근 → 데이터 깨짐 (동기화 필요)
생산자: 피보호자 앱 N명 → 초당 N건 위치 전송
버퍼: 서버의 처리 대기열
소비자: @Async 스레드 → DB 저장 (건당 ~100ms)
생산자(피보호자)가 위치를 보내는 속도와 소비자(DB 저장)가 처리하는 속도가 다르다. 이 차이를 버퍼(큐)로 흡수하지 않으면, 생산자가 너무 빠를 때 데이터가 유실되거나 시스템이 터진다.
Java의 BlockingQueue는 생산자-소비자 문제를 위해 설계된 자료구조다. 버퍼가 가득 차면 생산자가 블로킹되고, 비면 소비자가 블로킹되는 동기화를 내부적으로 처리해준다.
SafetyFence의 위치 저장을 BlockingQueue로 직접 구현하면 이렇게 된다.
@Component
public class LocationQueue {
// 한정 버퍼 (bounded buffer) — 최대 1,000개
private final BlockingQueue<LocationUpdateDto> queue =
new LinkedBlockingQueue<>(1000);
// 생산자: WebSocket 스레드가 호출
public void produce(LocationUpdateDto location) throws InterruptedException {
queue.put(location);
// 큐가 가득 차면 → 이 스레드는 여기서 멈춤 (블로킹)
// 소비자가 하나를 꺼내서 공간이 생기면 → 다시 진행
}
// 소비자: 별도 스레드가 호출
public LocationUpdateDto consume() throws InterruptedException {
return queue.take();
// 큐가 비어있으면 → 이 스레드는 여기서 멈춤 (블로킹)
// 생산자가 하나를 넣으면 → 다시 진행
}
}
소비자 스레드를 따로 만들어서 큐에서 꺼내 처리한다.
@Component
public class LocationConsumer {
private final LocationQueue locationQueue;
private final UserRepository userRepository;
private final UserLocationRepository userLocationRepository;
@PostConstruct
public void startConsumer() {
// 소비자 스레드 4개 가동
for (int i = 0; i < 4; i++) {
new Thread(() -> {
while (true) {
try {
LocationUpdateDto location = locationQueue.consume(); // 큐가 비면 대기
saveToDatabase(location);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "location-consumer-" + i).start();
}
}
private void saveToDatabase(LocationUpdateDto location) {
// DB 저장 로직
}
}
이것이 고전적 생산자-소비자 패턴의 정석 구현이다. 하지만 실무에서 이렇게 직접 구현하면 스레드 생명주기 관리, 예외 처리, 종료 처리 등을 모두 직접 해야 한다.
ThreadPoolTaskExecutor의 내부 구조를 보면:
ThreadPoolTaskExecutor 내부:
├─ BlockingQueue<Runnable> ← 한정 버퍼 (queueCapacity로 크기 설정)
├─ Worker 스레드 풀 ← 소비자 (corePoolSize ~ maxPoolSize)
└─ RejectedExecutionHandler ← 버퍼 초과 시 정책
직접 BlockingQueue + Consumer 스레드를 만들 필요 없이, ThreadPoolTaskExecutor 설정만으로 생산자-소비자 패턴이 완성된다. @Async 메서드를 호출하면 태스크가 내부 큐에 들어가고, 스레드 풀의 워커가 꺼내서 처리한다.
// AsyncConfig.java — 현재 코드
@Configuration
@EnableAsync
public class AsyncConfig {
// 비어 있음
}
Executor Bean이 없으면 Spring은 SimpleAsyncTaskExecutor를 사용한다. 이것은 스레드 풀이 아니라 매 호출마다 새 스레드를 생성하는 Executor다. 큐(버퍼)도 없다.
생산자-소비자 관점에서 보면:
생산자: WebSocket 스레드 (초당 1,000건 생산)
버퍼: 없음 (BlockingQueue가 없는 상태)
소비자: 매번 새 스레드 생성 (제한 없음)
초당 1,000건 → 스레드 1,000개 생성
1분이면 → 60,000개 스레드
스레드 1개 = ~1MB 메모리
60,000 × 1MB → OOM 발생!
위에서 BlockingQueue로 직접 구현한 것처럼, 한정 버퍼가 있어야 생산 속도를 제어할 수 있다. 지금은 버퍼 없이 생산자가 보내는 대로 무한히 스레드를 만들고 있는 상태다.
// 개선된 AsyncConfig.java
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8); // 평소 유지 스레드 (소비자 8명)
executor.setMaxPoolSize(20); // 최대 스레드 (소비자 최대 20명)
executor.setQueueCapacity(500); // 대기 큐 (버퍼 크기 500)
executor.setThreadNamePrefix("location-async-");
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy()
);
executor.initialize();
return executor;
}
}
이제 생산자-소비자 구조가 만들어진다.
생산자: WebSocket 스레드 (초당 1,000건 생산)
│
▼
버퍼: 큐 (최대 500개 대기) ← ThreadPoolTaskExecutor의 큐
│
▼
소비자: 스레드 풀 (8~20개 스레드가 큐에서 꺼내서 처리)
초당 1,000건 들어오면:
→ 소비자 20개 스레드가 처리 (초당 ~200건)
→ 나머지 800건은 큐에 대기
→ 큐 500개 초과하면?
→ CallerRunsPolicy: WebSocket 스레드가 직접 처리 (요청 유실 방지)
큐가 가득 찼을 때의 거부 정책은 여러 가지가 있다.
| 정책 | 동작 | SafetyFence에 적합한가? |
|---|---|---|
AbortPolicy (기본) | RejectedExecutionException 던짐 | ❌ 위치 데이터 유실 |
DiscardPolicy | 조용히 버림 | ❌ 위치 데이터 유실 |
DiscardOldestPolicy | 가장 오래된 태스크 버림 | ❌ 과거 위치 유실 |
CallerRunsPolicy | 호출한 스레드에서 직접 실행 | ✅ 데이터 유실 없음 |
CallerRunsPolicy는 큐가 가득 차면 WebSocket 스레드가 직접 DB 저장을 실행한다. WebSocket 응답이 잠깐 느려지지만, 위치 데이터가 유실되지 않는다. 해당 서비스에서는 데이터 유실보다 약간의 지연이 낫다.
또한 이 정책은 소비자가 처리할 수 없을 만큼 빠르게 생산되면, 생산자 자신이 처리를 떠안게 되면서 생산 속도가 자동으로 줄어든다.
// NotificationService.java — 현재 코드
public void sendNotificationToSupporters(User elderUser, String title, String body) {
List<Link> links = linkRepository.findByUserNumber(elderUser.getNumber());
for (Link link : links) {
User supporter = link.getUser();
sendNotificationToUser(supporter.getNumber(), title, body, elderUser.getNumber());
// ↑ FCM 전송: 네트워크 I/O, 건당 ~100ms
// 보호자 5명이면 500ms 순차 블로킹
}
}
보호자(소비자)에게 알림을 보내는 것도 일종의 소비자 작업인데, 하나의 스레드에서 순차적으로 처리하고 있다.
보호자 1명: 100ms
보호자 5명: 500ms (순차)
보호자 10명: 1,000ms (순차)
// 개선된 NotificationService.java
public void sendNotificationToSupporters(User elderUser, String title, String body) {
List<Link> links = linkRepository.findByUserNumber(elderUser.getNumber());
if (links.isEmpty()) {
log.info("보호자가 없어 알림 전송 생략: 어르신={}", elderUser.getNumber());
return;
}
// 각 보호자에게 병렬로 알림 전송
List<CompletableFuture<Void>> futures = links.stream()
.map(link -> CompletableFuture.runAsync(() -> {
User supporter = link.getUser();
sendNotificationToUser(supporter.getNumber(), title, body, elderUser.getNumber());
}))
.toList();
// 모든 전송이 완료될 때까지 대기
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.exceptionally(ex -> {
log.error("알림 전송 중 일부 실패: 어르신={}, error={}",
elderUser.getNumber(), ex.getMessage());
return null;
})
.join();
}
보호자 수에 관계없이 가장 느린 하나의 전송 시간 만큼만 걸린다.
// 개선된 긴급 알림 전송
private void sendEmergencyNotificationToSupporters(User elderUser, String title, String body) {
List<Link> links = linkRepository.findByUserNumber(elderUser.getNumber());
if (links.isEmpty()) {
log.info("보호자가 없어 긴급 알림 전송 생략: 어르신={}", elderUser.getNumber());
return;
}
// 긴급 알림은 모든 보호자의 모든 기기에 병렬 전송
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (Link link : links) {
User supporter = link.getUser();
List<DeviceToken> tokens = deviceTokenRepository.findByUser(supporter);
for (DeviceToken deviceToken : tokens) {
futures.add(CompletableFuture.runAsync(() ->
sendEmergencyFCM(deviceToken.getToken(), title, body, elderUser.getNumber())
));
}
}
// 긴급 알림은 모든 전송 완료를 보장
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.exceptionally(ex -> {
log.error("긴급 알림 일부 전송 실패: 어르신={}", elderUser.getNumber(), ex);
return null;
})
.join();
log.info("긴급 알림 전송 완료: 어르신={}, 전송 수={}", elderUser.getNumber(), futures.size());
}
긴급 알림은 보호자 N명 × 기기 M개로 전송 대상이 많아지므로 병렬화의 효과가 더 크다.
피보호자 앱 → [WebSocket 스레드]
├─ 캐시 저장
├─ WebSocket → 보호자 앱
└─ @Async DB 저장 ← 스레드 풀 없음, 무한 스레드 생성
└─ 가변 객체 공유 (스레드 안전 X)
알림 전송 → 보호자 5명 순차 처리 (500ms)
피보호자 앱 → [WebSocket 스레드]
├─ 불변 DTO 생성 (스레드 안전)
├─ 캐시 저장
├─ WebSocket → 보호자 앱
└─ @Async DB 저장
└─ ThreadPoolTaskExecutor (스레드 풀 + 큐 버퍼)
├─ 코어 8 / 최대 20 스레드
├─ 큐 500개 버퍼
└─ CallerRunsPolicy (데이터 유실 방지)
알림 전송 → CompletableFuture.allOf() 병렬 처리 (~100ms)
| 항목 | Before | After |
|---|---|---|
| DTO 스레드 안전성 | ❌ 가변 객체 공유 | ✅ 불변 객체 |
| 동시 접속 1,000명 | OOM (스레드 무한 생성) | 안정 (스레드 풀 20개 + 큐 500) |
| 알림 전송 (보호자 5명) | ~500ms (순차) | ~100ms (병렬) |
| 큐 초과 시 | 대응 없음 | CallerRunsPolicy (유실 방지) |
| 긴급 알림 (5명 × 2기기) | ~1,000ms (순차) | ~100ms (병렬) |