꼬리를 놓을 줄 모르는 직렬화/역직렬화 트러블슈팅 - 1

김형준·2025년 2월 19일
0
post-thumbnail

문제가 되는 코드

@Component
@Qualifier("redisQueue")
@RequiredArgsConstructor
public class RequestRedisQueueImpl implements RequestQueue {
    private static final String QUEUE_KEY = "reservation:request:queue";

    private final RedisTemplate<String, ReservationReqDto<?>> redisTemplate;
    private final ReservationService service;

    @Value("${queue-capacity:25}")
    private int capacity;

    @Override
    public boolean enqueue(ReservationReqDto<?> request) {
        double currentTime = System.currentTimeMillis();
        Boolean result = redisTemplate.opsForZSet().add(QUEUE_KEY, request, currentTime);
        return result != null && result;
    }

    @Override
    @Scheduled(fixedDelay = 1000)
    public void processQueue() {
        List<ReservationReqDto<?>> requests = redisTemplate.execute(new SessionCallback<List<ReservationReqDto<?>>>() {
            @Override
            public List<ReservationReqDto<?>> execute(RedisOperations operations) throws DataAccessException {
                operations.watch(QUEUE_KEY);

                // 가장 오래된 요청들을 queue size만큼 가져옴
                Set<ReservationReqDto<?>> requestSet = operations.opsForZSet().range(
                        QUEUE_KEY, 0, capacity - 1);

                if (requestSet == null || requestSet.isEmpty()) {
                    operations.unwatch();
                    return Collections.emptyList();
                }

                operations.multi(); // 트랜잭션 시작

                for (ReservationReqDto<?> req : requestSet) {
                    operations.opsForZSet().remove(QUEUE_KEY, req);
                }

                List<Object> execResults = operations.exec();

                if (execResults.isEmpty()) {
                    // 트랜잭션 실패 시 재시도 로직 추가 가능
                    return Collections.emptyList();
                } else {
                    return new ArrayList<>(requestSet);
                }
            }
        });

        for (ReservationReqDto<?> req : requests) {
            req.process(service);
        }
    }
}

processQueue() 수행 시 아래와 같은 에러 발생

Jackson이 cancelled 필드에 대해 역직렬화를 실패했다는 내용인데, 기억 상으로는 역직렬화 대상인 ReservationReqDto의 구현체들 중에서는 해당 필드를 사용하는 곳이 없었다.

public record ReservationCreateReqDto (
        Long restaurantId,
        Long memberId,
        ReservationPostReqDto reservationPostReqDto
) implements ReservationReqDto<ReservationCreateResDto> {

    @Override
    public void process(ReservationService service) {
        service.createReservation(restaurantId, memberId, reservationPostReqDto);
    }
}

public record ReservationGetByMemberReqDto(
        CompletableFuture<List<ReservationGetResDto>> future
) implements ReservationReqDto<List<ReservationGetResDto>> {
    @Override
    public void process(ReservationService service) {
        try {
            future.complete(service.getReservationByMember());
        } catch (Exception e) {
            future.completeExceptionally(e);
        }
    }
}

public record ReservationGetByRestaurantReqDto(
        Long restaurantId,
        CompletableFuture<List<ReservationGetResDto>> future
) implements ReservationReqDto<List<ReservationGetResDto>> {

    @Override
    public void process(ReservationService service) {
        try {
            future.complete(service.getReservationByRestaurant(restaurantId));
        } catch (Exception e) {
            future.completeExceptionally(e);
        }
    }
}

public record ReservationDeleteReqDto(
        Long reservationId
) implements ReservationReqDto<Boolean> {

    @Override
    public void process(ReservationService service) {
        service.deleteReservation(reservationId);
    }
}

이상하다고 생각해서 다시 확인해보니 CompletableFuture 클래스가 내부 필드로 cancelled를 갖고 있었다.

의문점

그렇다면 Jackson은 왜 CompletableFuture 역직렬화에 실패했을까?

CompletableFuture 코드에는 내부적으로 cancelled 필드를 보유하고 있지만, 정작 이에 대한 getter/setter는 제공되고 있지 않다.

따라서 Jackson이 해당 필드에 접근해 직렬화/역직렬화가 불가능해지는 것이고, 이를 알 수 없는 필드로 인식해 에러를 발생시키는 것이다.

해결 과정

내가 구현한 Redis Queue 클래스 상에서는 굳이 CompletableFuture를 직렬화할 필요가 없었다.

따라서 이를 직렬화 대상에서 제외시키는 게 맞다고 판단했고, 아래와 같이
com.fasterxml.jackson.annotation.JsonIgnore 를 통해 직렬화가 적용되지 않게 했다.

public record ReservationGetByMemberReqDto(
        @JsonIgnore CompletableFuture<List<ReservationGetResDto>> future
) implements ReservationReqDto<List<ReservationGetResDto>> {
    @Override
    public void process(ReservationService service) {
        try {
            future.complete(service.getReservationByMember());
        } catch (Exception e) {
            future.completeExceptionally(e);
        }
    }
}

public record ReservationGetByRestaurantReqDto(
        Long restaurantId,
        @JsonIgnore CompletableFuture<List<ReservationGetResDto>> future
) implements ReservationReqDto<List<ReservationGetResDto>> {

    @Override
    public void process(ReservationService service) {
        try {
            future.complete(service.getReservationByRestaurant(restaurantId));
        } catch (Exception e) {
            future.completeExceptionally(e);
        }
    }
}

하지만 요청 DTO 객체들을 Redis 큐에 쌓아두고 이를 비동기적으로 처리하는 구조에서 CompletableFuture를 큐에 저장되지 않게 수정한 결과 아래와 같이 DTO 클래스의 CompletableFuture에 대해 NullPointerException이 발생했다.

하지만 요청을 큐로 관리하는 이상 요청 전송, 응답 반환받는 작업들 간 비동기 처리가 필요하다고 생각했다.

따라서 DTO 클래스에서 CompletableFuture 필드를 제거하면서도 기존 코드 흐름이 이어질 수 있도록 다소 복잡한 방법으로 코드를 수정하였다.

@Component
public class RequestFutureStore {
    // Reservation 외의 다른 곳에서도 사용할 경우 제네릭 타입 수정할 것
    private final ConcurrentHashMap<
            String, CompletableFuture<List<ReservationGetResDto>>> futureMap = new ConcurrentHashMap<>();

    public void registerFuture(String requestId, CompletableFuture<List<ReservationGetResDto>> future) {
        futureMap.put(requestId, future);
    }

    public CompletableFuture<List<ReservationGetResDto>> getFuture(String requestId) {
        return futureMap.get(requestId);
    }

    public CompletableFuture<List<ReservationGetResDto>> removeFuture(String requestId) {
        return futureMap.remove(requestId);
    }
}

위와 같이 별도의 자료구조에서 CompletableFuture를 따로 큐잉하도록 구현했다.

하지만 테스트 수행 시 아래와 같이 RequestFutureQueue에서 CompletableFuture 조회를 실패하였다.

원인이 될 수 있는 사항들에 대해 테스트해 보았다.

  • RequestFutureStore의 오작동 가능성 아래와 같이 테스트해 본 결과 잘 작동하였다.
    @Test
        public void getRequestTest() {
            // given
            String uuid = UUID.randomUUID().toString();
            CompletableFuture<List<ReservationGetResDto>> future = new CompletableFuture<>();
    
            // when
            requestFutureStore.registerFuture(uuid, future);
    
            // then
            assertEquals(future, requestFutureStore.getFuture(uuid));
        }
  • 비동기로 인한 Future 저장 & process 메서드 실행 타이밍 불일치 스레드 수와 큐 길이를 같게 만들어도 여전히 실패하였다. 따라서 해당 사항이 테스트 실패의 원인이라고 생각해볼 수 있었다.

요청의 등록과 처리가 비동기적으로 동작하는 구조에서 요청이 큐에 제대로 등록되기 전에 요청을 가져다 사용하려고 한 것 같았다.

순서를 정리해보면 아래와 같다.

  1. controller에서는 요청이 들어오면 RequestFutureStore에 저장할 requestId 값, CompletableFuture을 생성함
    1-1. CompletableFuture는 직렬화 불가능하기에 별도로 저장할 수 있는 RequestFutureStore를 구현함
  2. 별도로 생성한 요청 DTO를 요청 큐에 enqueue 함
  3. controller에서는 future.get()으로 큐에서 요청이 처리되기를 대기 중
  4. enqueue() 실행 시 구현체별로 큐에 요청을 저장
  5. @Scheduled를 적용해 초 단위마다 아래의 과정이 수행됨
    5-1. 큐 전체에서 요청들을 추출
    5-2. 추출한 요청들의 process() 실행
    5-3. process() 내부에서는 전달받은 RequestFutureStore 내부에서 requestId로 CompletableFuture 조회
    5-4. 전달받은 서비스 객체를 통해 서비스 로직을 실행시키고 결과를 조회한 CompletableFuture로 전달
  6. 이후 controller는 CompletableFuture를 통해 요청 처리 결과를 반환

우선 큐에 요청이 없는 경우 에러를 던지도록 변경했다.

그 결과, 아래와 같이 거의 모든 경우에 에러가 발생했다.

이 와중에 하필 10개의 출력에서만 “큐에 저장된 요청이 없습니다”와 같은 출력이 발생하였다.

// 테스트 수정한 부분
        
        for (int i = 0; i < threadCount; i++) {
            executorService.submit(() -> {
                String requestId = UUID.randomUUID().toString();
                CompletableFuture<List<ReservationGetResDto>> future = new CompletableFuture<>();

                try {
                    requestFutureStore.registerFuture(requestId, future);
                    System.out.println(requestFutureStore.getFuture(requestId).toString());

                    if (!redisQueue.enqueue(new ReservationGetByRestaurantReqDtoImpl(restaurantId, requestId))) {
                        throw CustomException.of(ErrorCode.TOO_MANY_REQUESTS);
                    }

                    redisQueue.processQueue();

                    List<ReservationGetResDto> foundReservations = future.get();
                    System.out.println("current future:" + future +
                        " successfully got future: " + (future == requestFutureStore.getFuture(requestId)));
                    successCnt.incrementAndGet();
                    foundReservationCount.addAndGet(foundReservations.size());
                } catch (Exception e) {
                    System.out.println(e.getMessage());
                    future.completeExceptionally(e);
                } finally {
                    latch.countDown();
                }
            });
        }
        
// processQueue 수정한 부분

@Override
    // @Scheduled(fixedDelay = 10000)
    public synchronized void processQueue() {
        List<ReservationReqDto> requests = redisTemplate.execute(new SessionCallback<>() {
            @Override
            public List<ReservationReqDto> execute(RedisOperations operations) throws DataAccessException {
                operations.watch(QUEUE_KEY);

                // 가장 오래된 요청들을 queue size만큼 가져옴
                Set<ReservationReqDto> requestSet = operations.opsForZSet().range(
                        QUEUE_KEY, 0, capacity - 1);

                if (requestSet == null || requestSet.isEmpty()) {
                    operations.unwatch();
                    return Collections.emptyList();
                }

                operations.multi(); // 트랜잭션 시작

                for (ReservationReqDto req : requestSet) {
                    operations.opsForZSet().remove(QUEUE_KEY, req);
                }

                List<Object> execResults = operations.exec();

                if (execResults.isEmpty()) {
                    // 트랜잭션 실패 시 재시도 로직 추가 가능
                    return Collections.emptyList();
                } else {
                    return new ArrayList<>(requestSet);
                }
            }
        });

        if (requests.isEmpty()) { throw new RuntimeException("큐에 저장된 요청이 없습니다."); }

        for (ReservationReqDto req : requests) {
            req.process(service, requestFutureStore);
        }
    }

따라서 기존에 각 스레드들이 저장한 요청들을 앞에서 먼저 제거한 게 아닐까 하는 의심이 들었다.

이에 대해 의문을 풀기 위해 요청 큐를 길이만큼 한 번에 비우는 방식이 아니라, 맨 앞의 값만 꺼내와 처리하는 식으로 수정하였다.

@Override
    // @Scheduled(fixedDelay = 10000)
    public synchronized void processQueue() {
        ReservationReqDto request = redisTemplate.execute(new SessionCallback<>() {
            @Override
            public ReservationReqDto execute(RedisOperations operations) throws DataAccessException {
                operations.watch(QUEUE_KEY);

                // 가장 오래된 하나의 요청을 가져옴
                ZSetOperations.TypedTuple<ReservationReqDto> requestSet = operations.opsForZSet().popMin(QUEUE_KEY);

                if (requestSet == null) {
                    System.out.println("ReservationReqDto 조회 실패");
                    operations.unwatch();
                    return null;
                }

                return requestSet.getValue();
            }
        });

        System.out.println("조회 성공");

        request.process(service, requestFutureStore);
    }

popMin 수행 시 Error in Execution이 발생했지만, 대신 트랜잭션 내에서 수행 가능한 range(), remove()를 조합해 다시 구현하였다.

하지만 여전히 CompletableFuture 조회가 잘 되지 않아 로그를 찍어보았다.

                try {
                    requestFutureStore.registerFuture(requestId, future);
                    System.out.println(requestFutureStore.getFuture(requestId).toString());

                    if (!redisQueue.enqueue(new ReservationGetByRestaurantReqDtoImpl(restaurantId, requestId))) {
                        throw CustomException.of(ErrorCode.TOO_MANY_REQUESTS);
                    }

                    redisQueue.processQueue();

                    List<ReservationGetResDto> foundReservations = future.get();
                    System.out.println("current future:" + future +
                        " successfully got future: " + (future == requestFutureStore.getFuture(requestId)));
                    successCnt.incrementAndGet();
                    foundReservationCount.addAndGet(foundReservations.size());
                } catch (Exception e) {
                    System.out.println(e.getMessage());
                    future.completeExceptionally(e);
                } finally {
                    latch.countDown();
                }
                
    @Override
    public void process(ReservationService service, RequestFutureStore futureStore) {
        CompletableFuture<List<ReservationGetResDto>> future = futureStore.getFuture(requestId);
        System.out.println("saved RequestId: " + requestId);
        if (future == null) { throw new RuntimeException("CompletableFuture 조회 실패"); }

        List<ReservationGetResDto> result = service.getReservationByRestaurant(restaurantId);
        try {
            future.complete(result);
        } catch (Exception e) {
            future.completeExceptionally(e);
        } finally {
            futureStore.removeFuture(requestId);
        }
    }

그 결과, 생성된 UUID 값과 실제로 저장된 UUID의 값이 다르다는 사실을 알 수 있었다.

돌고 돌아 이 또한 역직렬화 문제로, record를 역직렬화 시 기본 생성자가 없다면 일부 데이터가 손실될 수 있다.

따라서 @JsonCreator, @JsonProperty를 명확히 지정해 이러한 경우가 생기지 않도록 구현하였다.

하지만 이제는 이런 에러까지 발생했는데,

Could not read JSON: Unrecognized field "@class" (class org.example.easytable.reservation.dto.request.ReservationGetByRestaurantReqDtoImpl), not marked as ignorable (2 known properties: "restaurantId", "requestId"])
at [Source: REDACTED (StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION disabled); line: 1, column: 164] (through reference chain: org.example.easytable.reservation.dto.request.ReservationGetByRestaurantReqDtoImpl["@class"])

@JsonIgnoreProperties를 적용해 값이 없는 필드를 무시하도록 구현해도 제대로 적용되지 않아서, 따로 테스트를 구현하였다.

@ExtendWith(SpringExtension.class)
@SpringBootTest
public class SerializationTest {
    private final RedisTemplate<String, ReservationReqDto> redisTemplate;

    @Autowired
    public SerializationTest(RedisTemplate<String, ReservationReqDto> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Test
    public void testSerialization() throws Exception {
        // given
        ObjectMapper mapper = new ObjectMapper();
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        // when
        ReservationGetByRestaurantReqDtoImpl original = new ReservationGetByRestaurantReqDtoImpl(1L, "test-id");
        String json = mapper.writeValueAsString(original);
        System.out.println("Serialized JSON: " + json);

        ReservationGetByRestaurantReqDtoImpl deserialized = mapper.readValue(json, ReservationGetByRestaurantReqDtoImpl.class);
        System.out.println("Deserialized object: " + deserialized);

        // then
        assertEquals(original.getRequestId(), deserialized.getRequestId());
    }

    @Test
    public void testRedisSerialization() throws Exception {
        // given
        String redisKey = "test:reservation:req";

        String originalRequestId = UUID.randomUUID().toString();
        ReservationGetByRestaurantReqDtoImpl originalObject =
            new ReservationGetByRestaurantReqDtoImpl(1L, originalRequestId);

        // when
        redisTemplate.opsForValue().set(redisKey, originalObject);

        ReservationGetByRestaurantReqDtoImpl retrievedObject =
            (ReservationGetByRestaurantReqDtoImpl) redisTemplate.opsForValue().get(redisKey);

        // then
        assertNotNull(retrievedObject);
        assertEquals(originalObject.getRestaurantId(), retrievedObject.getRestaurantId());
        assertEquals(originalObject.getRequestId(), retrievedObject.getRequestId());

        System.out.println("Original Object: " + originalObject);
        System.out.println("Retrieved Object: " + retrievedObject);
    }
}

처음 테스트 시에는 문제가 발생했지만, 인터페이스 및 ReqDto 구현체의 Json 관련 어노테이션들을 제거하고 구현체를 class로 변경하니 잘 동작하였다.

@Getter
public class ReservationGetByRestaurantReqDtoImpl implements ReservationReqDto {
    private final Long restaurantId;
    private final String requestId;

    public ReservationGetByRestaurantReqDtoImpl(
            Long restaurantId,
            String requestId
    ) {
        this.restaurantId = restaurantId;
        this.requestId = requestId;
    }

    @Override
    public void process(ReservationService service, RequestFutureStore futureStore) {
        CompletableFuture<List<ReservationGetResDto>> future = futureStore.getFuture(requestId);
        System.out.println("saved RequestId: " + requestId);
        if (future == null) { throw new RuntimeException("CompletableFuture 조회 실패"); }

        List<ReservationGetResDto> result = service.getReservationByRestaurant(restaurantId);
        try {
            future.complete(result);
        } catch (Exception e) {
            future.completeExceptionally(e);
        } finally {
            futureStore.removeFuture(requestId);
        }
    }

    @Override
    public String toString() {
        return "ReservationGetByRestaurantReqDtoImpl{" +
            "restaurantId=" + restaurantId +
            ", requestId='" + requestId + '\'' +
            '}';
    }
}

//@JsonTypeInfo(
//        use = JsonTypeInfo.Id.NAME,
//        include = JsonTypeInfo.As.PROPERTY,
//        property = "type"
//)
//@JsonSubTypes({
//    @JsonSubTypes.Type(value = ReservationGetByRestaurantReqDtoImpl.class, name = "restaurant"),
//    @JsonSubTypes.Type(value = ReservationGetByMemberReqDtoImpl.class, name = "member"),
//    @JsonSubTypes.Type(value = ReservationCreateReqDtoImpl.class, name = "create"),
//    @JsonSubTypes.Type(value = ReservationDeleteReqDtoImpl.class, name = "delete")
//})
public interface ReservationReqDto {
    void process(ReservationService service, RequestFutureStore futureStore);
}

이러한 방식을 큐 테스트에도 그대로 적용해 문제를 해결할 예정이다.

0개의 댓글

관련 채용 정보