[JAVA] 한국투자증권 OpenAPI 사용 (WebSocket)

Sadie·2025년 3월 14일
0

Spring And JPA

목록 보기
11/12

[JAVA] 한국투자증권 OpenAPI 사용 (Rest)
위 링크에서 Rest 코드에 대해 확인할 수 있습니다
이번에는 실시간 웹소켓 통신을 사용하는 api에 대해 포스팅 하겠습니다

(같은 조원이신 김바보님께 감사를 표합니다)

웹소켓을 제공하는 경우 왼쪽 상단처럼 WEBSOCKET 태그가 달려있는 것을 확인할 수 있습니다



웹소켓 연결

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Value("${hantu-openapi.websocket-uri:ws://ops.koreainvestment.com:31000}")
    private String websocketUri;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // 서버 측 웹소켓 핸들러 등록 - 필요한 경우에만 사용
    }

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    @Bean
    public WebSocketClient webSocketClient() {
        StandardWebSocketClient client = new StandardWebSocketClient();
        client.getUserProperties().put("org.apache.tomcat.websocket.IO_TIMEOUT_MS", "60000");
        return client;
    }

    // WebSocketEventService 빈 생성
    @Bean
    public WebSocketEventService webSocketEventService() {
        return new WebSocketEventService();
    }

    // KoreaInvestmentWebSocketHandler 빈 생성
    @Bean
    public KoreaInvestmentWebSocketHandler koreaInvestmentWebSocketHandler(
            StockInfoService stockInfoService,
            ObjectMapper objectMapper,
            RestTemplate restTemplate,
            WebSocketEventService webSocketEventService) {
        return new KoreaInvestmentWebSocketHandler(
                stockInfoService,
                objectMapper,
                restTemplate,
                webSocketEventService
        );
    }

    // MyWebSocketConnectionManager 빈 생성
    @Bean
    public MyWebSocketConnectionManager koreaInvestmentWebSocketConnectionManager(
            WebSocketClient webSocketClient,
            WebSocketEventService webSocketEventService,
            ApplicationContext applicationContext) {

        // KoreaInvestmentWebSocketHandler를 ApplicationContext에서 가져오는 대신
        // 클래스를 생성한 후 주입
        KoreaInvestmentWebSocketHandler handler = applicationContext.getBean(KoreaInvestmentWebSocketHandler.class);

        MyWebSocketConnectionManager connectionManager = new MyWebSocketConnectionManager(
                webSocketClient,
                handler,
                websocketUri,
                webSocketEventService
        );

        // 자동 시작 비활성화 (서비스 레이어에서 명시적으로 시작)
        connectionManager.setAutoStartup(false);
        return connectionManager;
    }

    // 초기화 로직을 위한 빈
    @Bean(initMethod = "startConnection", destroyMethod = "stopConnection")
    public ConnectionInitializer connectionInitializer(MyWebSocketConnectionManager connectionManager) {
        return new ConnectionInitializer(connectionManager);
    }

    // 내부 초기화 클래스
    public static class ConnectionInitializer {
        private final MyWebSocketConnectionManager connectionManager;

        public ConnectionInitializer(MyWebSocketConnectionManager connectionManager) {
            this.connectionManager = connectionManager;
        }

        public void startConnection() {
            connectionManager.startConnection();
        }

        public void stopConnection() {
            connectionManager.stopConnection();
        }
    }
}

웹소켓 클라이언트를 설정
웹소켓 핸들러 및 연결 관리자를 빈(Bean)으로 등록

(위 코드는 모의투자 기준으로 작성된 코드입니다)


@Slf4j
public class MyWebSocketConnectionManager extends WebSocketConnectionManager {

    private final ScheduledExecutorService scheduler;
    private ScheduledFuture<?> heartbeatTask;
    private final WebSocketEventService eventService;

    /**
     * 생성자
     */
    public MyWebSocketConnectionManager(WebSocketClient client,
                                        WebSocketHandler webSocketHandler,
                                        String uriTemplate,
                                        WebSocketEventService eventService) {
        super(client, webSocketHandler, uriTemplate);
        this.scheduler = new ScheduledThreadPoolExecutor(1);
        this.eventService = eventService;
    }

    /**
     * 연결 시작
     */
    public void startConnection() {
        log.info("Starting WebSocket connection manager");
        super.start();
        startHeartbeat();
    }

    /**
     * 연결 종료
     */
    public void stopConnection() {
        log.info("Stopping WebSocket connection manager");
        stopHeartbeat();
        super.stop();
    }

    /**
     * 하트비트 시작
     */
    private void startHeartbeat() {
        stopHeartbeat();  // 기존 하트비트 중지

        heartbeatTask = scheduler.scheduleAtFixedRate(() -> {
            WebSocketSession session = eventService.getSession();
            if (session != null && session.isOpen()) {
                // 하트비트 이벤트 발생 - 구현은 나중에 추가
                log.debug("Heartbeat check - session is active");
            } else if (eventService.isConnectionActive()) {
                // 세션이 없거나 닫혀있는데 활성 상태라고 생각하는 경우
                log.warn("Heartbeat detected inactive session while connection marked as active. Reconnecting...");
                reconnect();
            }
        }, 30, 30, TimeUnit.SECONDS);

        log.info("WebSocket heartbeat started");
    }

    /**
     * 하트비트 중지
     */
    private void stopHeartbeat() {
        if (heartbeatTask != null && !heartbeatTask.isCancelled()) {
            heartbeatTask.cancel(false);
            log.info("WebSocket heartbeat stopped");
        }
    }

    /**
     * 재연결 시도
     */
    private void reconnect() {
        log.info("Attempting to reconnect WebSocket");
        stopConnection();
        try {
            Thread.sleep(1000);  // 약간의 지연 후 재연결
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        startConnection();
    }
}

확장된 웹소켓 연결 관리자 클래스
Spring의 WebSocketConnectionManager를 확장하여 하트비트, 세션 관리 등 추가 기능 구현


@Service
@Slf4j
public class WebSocketEventService {

    @Getter
    private WebSocketSession session;
    private boolean connectionActive = false;

    /**
     * 세션 설정
     */
    public void setSession(WebSocketSession session) {
        this.session = session;
        this.connectionActive = (session != null && session.isOpen());
        log.info("WebSocket session set: {}, active: {}",
                session != null ? session.getId() : "null",
                connectionActive);
    }

    /**
     * 연결 상태 설정
     */
    public void setConnectionActive(boolean active) {
        this.connectionActive = active;
        log.info("WebSocket connection active state changed to: {}", active);
    }

    /**
     * 연결 상태 반환
     */
    public boolean isConnectionActive() {
        return connectionActive;
    }
}

WebSocket 이벤트를 처리하는 서비스
핸들러와 연결 관리자 사이의 중재자 역할을 수행


동작방식 정리

  1. 애플리케이션이 실행되면 ConnectionInitializer가 startConnection()을 호출하여 웹소켓 연결을 시작
  2. MyWebSocketConnectionManager가 웹소켓 연결을 관리하며, 30초마다 하트비트를 체크
  3. 만약 웹소켓 세션이 닫히면 reconnect()가 호출되어 자동으로 재연결을 수행
  4. 애플리케이션 종료 시 stopConnection()을 호출하여 안전하게 연결을 종료


WebSocket 코드

국내주식 실시간체결가 (KRX) [실시간-003]을 예시로 실시간으로 정보를 받아오고 출력하는 코드를 작성해보겠습니다
(모의투자를 기준으로 코드가 작성되었습니다)


model

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class StockPriceData {
    private String stockCode;            // 종목코드
    private String tradeTime;            // 체결 시간
    private int currentPrice;            // 현재가
    private String dayOverDaySign;       // 전일 대비 부호
    private int dayOverDayChange;        // 전일 대비
    private double dayOverDayRate;       // 전일 대비율
    private double weightedAveragePrice; // 가중 평균 가격
    private int openPrice;               // 시가
    private int highPrice;               // 고가
    private int lowPrice;                // 저가
    private int askPrice1;               // 매도호가1
    private int bidPrice1;               // 매수호가1
    private long tradeVolume;            // 체결 거래량
    private long accumulatedVolume;      // 누적 거래량
    private long accumulatedTradeAmount; // 누적 거래 대금
    private int sellTradeCount;          // 매도 체결 건수
    private int buyTradeCount;           // 매수 체결 건수
    private int netBuyTradeCount;        // 순매수 체결 건수
    private double tradeStrength;        // 체결강도
    private long totalSellQuantity;      // 총 매도 수량
    private long totalBuyQuantity;       // 총 매수 수량
    private String tradeType;            // 체결구분
    private double buyRatio;             // 매수비율
    private double volumeChangeRate;     // 전일 거래량 대비 등락율

    // 시가 관련 정보
    private String openPriceTime;        // 시가 시간
    private String openPriceChangeSign;  // 시가대비구분
    private int openPriceChange;         // 시가대비

    // 고가 관련 정보
    private String highPriceTime;        // 고가 시간
    private String highPriceChangeSign;  // 고가대비구분
    private int highPriceChange;         // 고가대비

    // 저가 관련 정보
    private String lowPriceTime;         // 저가 시간
    private String lowPriceChangeSign;   // 저가대비구분
    private int lowPriceChange;          // 저가대비

    private String businessDate;         // 영업 일자
    private String marketOperationCode;  // 신 장운영 구분 코드
    private String suspendedFlag;        // 거래정지 여부

    private long askQuantity1;           // 매도호가 잔량1
    private long bidQuantity1;           // 매수호가 잔량1
    private long totalAskQuantity;       // 총 매도호가 잔량
    private long totalBidQuantity;       // 총 매수호가 잔량

    private double volumeTurnoverRate;   // 거래량 회전율
    private long prevDaySameTimeVolume;  // 전일 동시간 누적 거래량
    private double prevDaySameTimeVolumeRate; // 전일 동시간 누적 거래량 비율

    private String timeClassCode;        // 시간 구분 코드
    private String randomCloseCode;      // 임의종료구분코드
    private int staticVIBasePrice;       // 정적VI발동기준가

    /**
     * 체결가 데이터를 이쁘게 출력하는 메서드
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("\n╔══════════════════════════════════════╗\n");
        sb.append("║       주식 체결 정보: ").append(String.format("%-10s", stockCode)).append("   ║\n");
        sb.append("╠══════════════════════════════════════╣\n");

        // 기본 가격 정보
        sb.append("║ ").append(formatTime(tradeTime)).append(" | ");
        sb.append(formatMarketStatus(timeClassCode)).append(" ").append(suspendedFlag.equals("Y") ? "[거래정지]" : "").append(String.format("%15s", "")).append(" ║\n");

        sb.append("╠══════════════╦═══════════════════════╣\n");
        sb.append("║ 현재가       ║ ").append(formatPrice(currentPrice)).append(String.format("%16s", "")).append(" ║\n");

        String changeStr = formatPriceChangeWithSign(dayOverDaySign, dayOverDayChange, dayOverDayRate);
        sb.append("║ 전일대비     ║ ").append(changeStr).append(String.format("%"+(26-changeStr.length())+"s", "")).append(" ║\n");

        sb.append("╠══════════════╬═══════════════════════╣\n");
        sb.append("║ 시가         ║ ").append(formatPrice(openPrice)).append(" (").append(formatTime(openPriceTime)).append(")").append(String.format("%3s", "")).append(" ║\n");
        sb.append("║ 고가         ║ ").append(formatPrice(highPrice)).append(" (").append(formatTime(highPriceTime)).append(")").append(String.format("%3s", "")).append(" ║\n");
        sb.append("║ 저가         ║ ").append(formatPrice(lowPrice)).append(" (").append(formatTime(lowPriceTime)).append(")").append(String.format("%3s", "")).append(" ║\n");

        sb.append("╠══════════════╬═══════════════════════╣\n");
        sb.append("║ 거래량       ║ ").append(formatVolume(accumulatedVolume));
        double volRateStr = prevDaySameTimeVolumeRate;
        if (volRateStr != 0) {
            sb.append(" (").append(String.format("%+.2f%%", volRateStr)).append(")");
        }
        sb.append(String.format("%7s", "")).append(" ║\n");

        sb.append("║ 거래대금     ║ ").append(formatAmount(accumulatedTradeAmount)).append(String.format("%7s", "")).append(" ║\n");

        sb.append("╠══════════════╬═══════════════════════╣\n");
        sb.append("║ 체결강도     ║ ").append(String.format("%.2f%%", tradeStrength)).append(String.format("%16s", "")).append(" ║\n");
        sb.append("║ 매수비율     ║ ").append(String.format("%.2f%%", buyRatio)).append(String.format("%16s", "")).append(" ║\n");

        sb.append("╠══════════════╩═══════════════════════╣\n");
        sb.append("║ 매도호가1: ").append(formatPrice(askPrice1)).append(" (").append(formatVolume(askQuantity1)).append(")").append(String.format("%5s", "")).append(" ║\n");
        sb.append("║ 매수호가1: ").append(formatPrice(bidPrice1)).append(" (").append(formatVolume(bidQuantity1)).append(")").append(String.format("%5s", "")).append(" ║\n");

        sb.append("╠══════════════════════════════════════╣\n");
        sb.append("║ 총매도잔량: ").append(formatVolume(totalAskQuantity)).append(String.format("%14s", "")).append(" ║\n");
        sb.append("║ 총매수잔량: ").append(formatVolume(totalBidQuantity)).append(String.format("%14s", "")).append(" ║\n");

        sb.append("╚══════════════════════════════════════╝\n");

        return sb.toString();
    }

    /**
     * 시간 포맷팅
     */
    private String formatTime(String timeStr) {
        if (timeStr == null || timeStr.length() != 6) {
            return "N/A";
        }

        return timeStr.substring(0, 2) + ":" + timeStr.substring(2, 4) + ":" + timeStr.substring(4, 6);
    }

    /**
     * 가격 포맷팅
     */
    private String formatPrice(int price) {
        return String.format("%,d", price);
    }

    /**
     * 가격 변화 포맷팅
     */
    private String formatPriceChangeWithSign(String sign, int change, double rate) {
        String signChar;
        String color;

        switch(sign) {
            case "1": signChar = "↑"; color = "상한가"; break;
            case "2": signChar = "▲"; color = "상승"; break;
            case "3": signChar = "-"; color = "보합"; break;
            case "4": signChar = "↓"; color = "하한가"; break;
            case "5": signChar = "▼"; color = "하락"; break;
            default: signChar = "?"; color = "알수없음";
        }

        if ("3".equals(sign)) {
            return String.format("%s %s", signChar, color);
        } else {
            return String.format("%s %s %,d (%+.2f%%)", signChar, color, change, rate);
        }
    }

    /**
     * 거래량 포맷팅
     */
    private String formatVolume(long volume) {
        if (volume >= 1_000_000_000) {
            return String.format("%.2f억주", volume / 100_000_000.0);
        } else if (volume >= 10_000) {
            return String.format("%.2f만주", volume / 10_000.0);
        } else {
            return String.format("%,d주", volume);
        }
    }

    /**
     * 거래대금 포맷팅
     */
    private String formatAmount(long amount) {
        if (amount >= 100_000_000_000L) {
            return String.format("%.2f조원", amount / 100_000_000_000.0);
        } else if (amount >= 100_000_000) {
            return String.format("%.2f억원", amount / 100_000_000.0);
        } else if (amount >= 10_000) {
            return String.format("%.2f만원", amount / 10_000.0);
        } else {
            return String.format("%,d원", amount);
        }
    }

    /**
     * 장 상태 포맷팅
     */
    private String formatMarketStatus(String timeClassCode) {
        switch (timeClassCode) {
            case "0": return "[장중]";
            case "A": return "[장후예상]";
            case "B": return "[장전예상]";
            case "C": return "[예상/VI]";
            case "D": return "[시간외예상]";
            default: return "[알수없음]";
        }
    }
}

저장되는 데이터 형식


util

public class StockDataParserUtil {

    /**
     * 문자열 배열 데이터를 StockPriceData 객체로 파싱
     *
     * @param fields 한국투자증권 API에서 반환하는 필드 배열
     * @return 파싱된 StockPriceData 객체
     */
    public static StockPriceData parseStockPriceData(String[] fields) {
        int idx = 0;
        StockPriceData data = new StockPriceData();

        // 기본 필드
        data.setStockCode(fields[idx++]);
        data.setTradeTime(fields[idx++]);
        data.setCurrentPrice(parseInt(fields[idx++]));
        data.setDayOverDaySign(fields[idx++]);
        data.setDayOverDayChange(parseInt(fields[idx++]));
        data.setDayOverDayRate(parseDouble(fields[idx++]));
        data.setWeightedAveragePrice(parseDouble(fields[idx++]));
        data.setOpenPrice(parseInt(fields[idx++]));
        data.setHighPrice(parseInt(fields[idx++]));
        data.setLowPrice(parseInt(fields[idx++]));
        data.setAskPrice1(parseInt(fields[idx++]));
        data.setBidPrice1(parseInt(fields[idx++]));
        data.setTradeVolume(parseLong(fields[idx++]));
        data.setAccumulatedVolume(parseLong(fields[idx++]));
        data.setAccumulatedTradeAmount(parseLong(fields[idx++]));
        data.setSellTradeCount(parseInt(fields[idx++]));
        data.setBuyTradeCount(parseInt(fields[idx++]));
        data.setNetBuyTradeCount(parseInt(fields[idx++]));
        data.setTradeStrength(parseDouble(fields[idx++]));
        data.setTotalSellQuantity(parseLong(fields[idx++]));
        data.setTotalBuyQuantity(parseLong(fields[idx++]));
        data.setTradeType(fields[idx++]);
        data.setBuyRatio(parseDouble(fields[idx++]));
        data.setVolumeChangeRate(parseDouble(fields[idx++]));

        // 시가 관련 정보
        data.setOpenPriceTime(fields[idx++]);
        data.setOpenPriceChangeSign(fields[idx++]);
        data.setOpenPriceChange(parseInt(fields[idx++]));

        // 고가 관련 정보
        data.setHighPriceTime(fields[idx++]);
        data.setHighPriceChangeSign(fields[idx++]);
        data.setHighPriceChange(parseInt(fields[idx++]));

        // 저가 관련 정보
        data.setLowPriceTime(fields[idx++]);
        data.setLowPriceChangeSign(fields[idx++]);
        data.setLowPriceChange(parseInt(fields[idx++]));

        data.setBusinessDate(fields[idx++]);
        data.setMarketOperationCode(fields[idx++]);
        data.setSuspendedFlag(fields[idx++]);

        data.setAskQuantity1(parseLong(fields[idx++]));
        data.setBidQuantity1(parseLong(fields[idx++]));
        data.setTotalAskQuantity(parseLong(fields[idx++]));
        data.setTotalBidQuantity(parseLong(fields[idx++]));

        data.setVolumeTurnoverRate(parseDouble(fields[idx++]));
        data.setPrevDaySameTimeVolume(parseLong(fields[idx++]));
        data.setPrevDaySameTimeVolumeRate(parseDouble(fields[idx++]));

        data.setTimeClassCode(fields[idx++]);
        data.setRandomCloseCode(fields[idx++]);

        // 마지막 필드가 있으면 추가
        if (idx < fields.length) {
            data.setStaticVIBasePrice(parseInt(fields[idx]));
        }

        return data;
    }

    /**
     * JsonNode에서 StockPriceData 객체로 파싱
     *
     * @param output 한국투자증권 API에서 반환하는 JSON 노드
     * @return 파싱된 StockPriceData 객체
     */
    public static StockPriceData parseStockPriceData(JsonNode output) {
        if (output.isTextual()) {
            // 문자열로 전달된 경우 (caret으로 구분된 데이터)
            String[] fields = output.asText().split("\\^");
            return parseStockPriceData(fields);
        }

        // JSON 객체로 전달된 경우 (개별 필드로 구성)
        StockPriceData data = new StockPriceData();

        // 기본 필드 매핑
        setIfExists(data::setStockCode, output, "mksc_shrn_iscd", StockDataParserUtil::asString);
        setIfExists(data::setTradeTime, output, "stck_cntg_hour", StockDataParserUtil::asString);
        setIfExists(data::setCurrentPrice, output, "stck_prpr", StockDataParserUtil::asInt);
        setIfExists(data::setDayOverDaySign, output, "prdy_vrss_sign", StockDataParserUtil::asString);
        setIfExists(data::setDayOverDayChange, output, "prdy_vrss", StockDataParserUtil::asInt);
        setIfExists(data::setDayOverDayRate, output, "prdy_ctrt", StockDataParserUtil::asDouble);
        setIfExists(data::setWeightedAveragePrice, output, "wghn_avrg_stck_prc", StockDataParserUtil::asDouble);
        setIfExists(data::setOpenPrice, output, "stck_oprc", StockDataParserUtil::asInt);
        setIfExists(data::setHighPrice, output, "stck_hgpr", StockDataParserUtil::asInt);
        setIfExists(data::setLowPrice, output, "stck_lwpr", StockDataParserUtil::asInt);
        setIfExists(data::setAskPrice1, output, "askp1", StockDataParserUtil::asInt);
        setIfExists(data::setBidPrice1, output, "bidp1", StockDataParserUtil::asInt);
        setIfExists(data::setTradeVolume, output, "cntg_vol", StockDataParserUtil::asLong);
        setIfExists(data::setAccumulatedVolume, output, "acml_vol", StockDataParserUtil::asLong);
        setIfExists(data::setAccumulatedTradeAmount, output, "acml_tr_pbmn", StockDataParserUtil::asLong);
        setIfExists(data::setSellTradeCount, output, "seln_cntg_csnu", StockDataParserUtil::asInt);
        setIfExists(data::setBuyTradeCount, output, "shnu_cntg_csnu", StockDataParserUtil::asInt);
        setIfExists(data::setNetBuyTradeCount, output, "ntby_cntg_csnu", StockDataParserUtil::asInt);
        setIfExists(data::setTradeStrength, output, "cttr", StockDataParserUtil::asDouble);
        setIfExists(data::setTotalSellQuantity, output, "seln_cntg_smtn", StockDataParserUtil::asLong);
        setIfExists(data::setTotalBuyQuantity, output, "shnu_cntg_smtn", StockDataParserUtil::asLong);
        setIfExists(data::setTradeType, output, "ccld_dvsn", StockDataParserUtil::asString);
        setIfExists(data::setBuyRatio, output, "shnu_rate", StockDataParserUtil::asDouble);
        setIfExists(data::setVolumeChangeRate, output, "prdy_vol_vrss_acml_vol_rate", StockDataParserUtil::asDouble);

        // 시가 관련 정보
        setIfExists(data::setOpenPriceTime, output, "oprc_hour", StockDataParserUtil::asString);
        setIfExists(data::setOpenPriceChangeSign, output, "oprc_vrss_prpr_sign", StockDataParserUtil::asString);
        setIfExists(data::setOpenPriceChange, output, "oprc_vrss_prpr", StockDataParserUtil::asInt);

        // 고가 관련 정보
        setIfExists(data::setHighPriceTime, output, "hgpr_hour", StockDataParserUtil::asString);
        setIfExists(data::setHighPriceChangeSign, output, "hgpr_vrss_prpr_sign", StockDataParserUtil::asString);
        setIfExists(data::setHighPriceChange, output, "hgpr_vrss_prpr", StockDataParserUtil::asInt);

        // 저가 관련 정보
        setIfExists(data::setLowPriceTime, output, "lwpr_hour", StockDataParserUtil::asString);
        setIfExists(data::setLowPriceChangeSign, output, "lwpr_vrss_prpr_sign", StockDataParserUtil::asString);
        setIfExists(data::setLowPriceChange, output, "lwpr_vrss_prpr", StockDataParserUtil::asInt);

        // 나머지 필드
        setIfExists(data::setBusinessDate, output, "bsop_date", StockDataParserUtil::asString);
        setIfExists(data::setMarketOperationCode, output, "new_mkop_cls_code", StockDataParserUtil::asString);
        setIfExists(data::setSuspendedFlag, output, "trht_yn", StockDataParserUtil::asString);
        setIfExists(data::setAskQuantity1, output, "askp_rsqn1", StockDataParserUtil::asLong);
        setIfExists(data::setBidQuantity1, output, "bidp_rsqn1", StockDataParserUtil::asLong);
        setIfExists(data::setTotalAskQuantity, output, "total_askp_rsqn", StockDataParserUtil::asLong);
        setIfExists(data::setTotalBidQuantity, output, "total_bidp_rsqn", StockDataParserUtil::asLong);
        setIfExists(data::setVolumeTurnoverRate, output, "vol_tnrt", StockDataParserUtil::asDouble);
        setIfExists(data::setPrevDaySameTimeVolume, output, "prdy_smns_hour_acml_vol", StockDataParserUtil::asLong);
        setIfExists(data::setPrevDaySameTimeVolumeRate, output, "prdy_smns_hour_acml_vol_rate", StockDataParserUtil::asDouble);
        setIfExists(data::setTimeClassCode, output, "hour_cls_code", StockDataParserUtil::asString);
        setIfExists(data::setRandomCloseCode, output, "mrkt_trtm_cls_code", StockDataParserUtil::asString);
        setIfExists(data::setStaticVIBasePrice, output, "vi_stnd_prc", StockDataParserUtil::asInt);

        return data;
    }

    /**
     * 문자열을 정수로 안전하게 변환
     */
    public static int parseInt(String value) {
        try {
            return Integer.parseInt(value);
        } catch (Exception e) {
            return 0;
        }
    }

    /**
     * 문자열을 long으로 안전하게 변환
     */
    public static long parseLong(String value) {
        try {
            return Long.parseLong(value);
        } catch (Exception e) {
            return 0;
        }
    }

    /**
     * 문자열을 double로 안전하게 변환
     */
    public static double parseDouble(String value) {
        try {
            return Double.parseDouble(value);
        } catch (Exception e) {
            return 0.0;
        }
    }

    // JsonNode 변환 유틸리티 메서드
    private static String asString(JsonNode node) {
        return node.asText();
    }

    private static int asInt(JsonNode node) {
        try {
            return node.asInt();
        } catch (Exception e) {
            return 0;
        }
    }

    private static long asLong(JsonNode node) {
        try {
            return node.asLong();
        } catch (Exception e) {
            return 0;
        }
    }

    private static double asDouble(JsonNode node) {
        try {
            return node.asDouble();
        } catch (Exception e) {
            return 0.0;
        }
    }

    // 함수형 인터페이스를 활용한 유틸리티 메서드
    private static <T> void setIfExists(java.util.function.Consumer<T> setter,
                                        JsonNode rootNode,
                                        String fieldName,
                                        java.util.function.Function<JsonNode, T> converter) {
        if (rootNode.has(fieldName)) {
            setter.accept(converter.apply(rootNode.get(fieldName)));
        }
    }
}

받아온 주식 데이터를 StockPriceData 형식에 맞게 파싱하기 위한 유틸리티 클래스

위의 이유로 인해 두 가지 입력 형식(String[], JsonNode)을 모두 작성하였습니다


handler

@Component
@Slf4j
@RequiredArgsConstructor
public class KoreaInvestmentWebSocketHandler extends TextWebSocketHandler {

    @Value("${hantu-openapi.appkey}")
    private String appKey;

    @Value("${hantu-openapi.appsecret}")
    private String appSecret;

    @Value("${hantu-openapi.websocket-domain:https://openapi.koreainvestment.com:9443}")
    private String websocketDomain;

    // 필수 의존성
    private final StockInfoService stockInfoService;
    private final ObjectMapper objectMapper;
    private final RestTemplate restTemplate;
    private final WebSocketEventService eventService; // connectionManager 대신 eventService 사용

    // 멤버 변수
    private WebSocketSession session;
    private String approvalKey;
    private final ExecutorService executorService = Executors.newFixedThreadPool(5);
    private final Map<String, String> subscribedStocks = new ConcurrentHashMap<>();

    /**
     * 웹소켓 세션이 열렸을 때 호출됨
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        this.session = session;
        eventService.setSession(session); // connectionManager 대신 eventService 사용
        log.info("WebSocket connection established at {}: {}", LocalDateTime.now(), session.getId());

        // 접속 승인키 얻기
        approvalKey = getApprovalKey();

        // 연결 헤더 전송
        sendConnectionHeader();

        // 기존 구독 정보 복원 (재연결 시)
        resubscribeStocks();
    }

    /**
     * 메시지 수신 시 호출됨
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        String payload = message.getPayload();
        log.info("Received message at {}: {}", LocalDateTime.now(), payload);

        // 비동기 처리
        executorService.submit(() -> processMessage(payload));
    }

    /**
     * 연결 종료 시 호출됨
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        log.info("WebSocket connection closed at {}: {}, status: {}",
                LocalDateTime.now(), session.getId(), status);
        this.session = null;
        eventService.setConnectionActive(false); // connectionManager 대신 eventService 사용
        eventService.setSession(null);
    }

    /**
     * 에러 발생 시 호출됨
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) {
        log.error("WebSocket transport error in session {}", session.getId(), exception);
        eventService.setConnectionActive(false); // connectionManager 대신 eventService 사용
    }

    /**
     * 웹소켓 접속키 발급
     */
    public String getApprovalKey() {
        if (approvalKey != null) {
            return approvalKey;
        }

        try {
            HttpHeaders headers = new HttpHeaders();
            headers.setContentType(MediaType.APPLICATION_JSON);

            Map<String, String> requestMap = new HashMap<>();
            requestMap.put("grant_type", "client_credentials");
            requestMap.put("appkey", appKey);
            requestMap.put("secretkey", appSecret);

            HttpEntity<Map<String, String>> entity = new HttpEntity<>(requestMap, headers);

            ResponseEntity<String> response = restTemplate.exchange(
                    websocketDomain + "/oauth2/Approval",
                    HttpMethod.POST,
                    entity,
                    String.class
            );

            if (response.getStatusCode() == HttpStatus.OK) {
                JsonNode rootNode = objectMapper.readTree(response.getBody());
                approvalKey = rootNode.get("approval_key").asText();
                log.info("Approval key obtained: {}", approvalKey);
                return approvalKey;
            } else {
                throw new RuntimeException("Failed to obtain approval key. Status code: " + response.getStatusCode());
            }
        } catch (Exception e) {
            log.error("Error getting approval key", e);
            throw new RuntimeException("Failed to obtain approval key", e);
        }
    }

    /**
     * 접속 헤더 전송
     */
    private void sendConnectionHeader() {
        try {
            Map<String, Object> header = new HashMap<>();
            header.put("approval_key", approvalKey);
            header.put("custtype", "P");
            header.put("tr_type", "1");
            header.put("content-type", "utf-8");

            Map<String, String> input = new HashMap<>();
            input.put("tr_id", "H0STCNT0");
            input.put("tr_key", "005930");  // 기본 샘플 종목 코드

            Map<String, Object> body = new HashMap<>();
            body.put("input", input);

            Map<String, Object> request = new HashMap<>();
            request.put("header", header);
            request.put("body", body);

            String requestJson = objectMapper.writeValueAsString(request);
            log.info("Sending connection header: {}", requestJson);

            session.sendMessage(new TextMessage(requestJson));
            log.info("Connection header sent successfully");
        } catch (Exception e) {
            log.error("Error sending connection header", e);
        }
    }

    /**
     * 실시간 체결가 구독
     */
    public void subscribeStockPrice(String stockCode, String marketCode) {
        if (session == null || !session.isOpen()) {
            log.warn("WebSocket not connected. Cannot subscribe to {}", stockCode);
            return;
        }

        try {
            Map<String, Object> header = new HashMap<>();
            header.put("approval_key", approvalKey);
            header.put("custtype", "P");
            header.put("tr_type", "1");
            header.put("content-type", "utf-8");

            Map<String, String> input = new HashMap<>();
            input.put("tr_id", "H0STCNT0");
            input.put("tr_key", stockCode);

            Map<String, Object> body = new HashMap<>();
            body.put("input", input);

            Map<String, Object> request = new HashMap<>();
            request.put("header", header);
            request.put("body", body);

            String requestJson = objectMapper.writeValueAsString(request);
            log.info("Sending subscription request for stock {}: {}", stockCode, requestJson);

            session.sendMessage(new TextMessage(requestJson));
            subscribedStocks.put(stockCode, marketCode);
            log.info("Subscription request sent for stock: {}", stockCode);
        } catch (Exception e) {
            log.error("Error subscribing to stock price for code: {}", stockCode, e);
        }
    }

    /**
     * 실시간 시세 구독 해제
     */
    public void unsubscribeStockPrice(String stockCode, String marketCode) {
        if (session == null || !session.isOpen()) {
            log.warn("WebSocket not connected. Cannot unsubscribe.");
            return;
        }

        try {
            String trKey = "H1".equals(marketCode) ? "H1_" : "H2_";

            Map<String, Object> header = new HashMap<>();
            header.put("tr_type", "2");
            header.put("tr_id", "H0STCNT0");
            header.put("tr_key", trKey + stockCode);

            Map<String, String> input = new HashMap<>();
            input.put("mksc_shrn_iscd", stockCode);

            Map<String, Object> body = new HashMap<>();
            body.put("input", input);

            Map<String, Object> request = new HashMap<>();
            request.put("header", header);
            request.put("body", body);

            String requestJson = objectMapper.writeValueAsString(request);
            session.sendMessage(new TextMessage(requestJson));
            subscribedStocks.remove(stockCode);
            log.info("Unsubscribed from real-time price for stock: {}", stockCode);
        } catch (Exception e) {
            log.error("Error unsubscribing from stock price for code: {}", stockCode, e);
        }
    }

    /**
     * 하트비트 메시지 전송
     */
    public void sendHeartbeat() {
        if (session == null || !session.isOpen()) {
            log.warn("WebSocket not connected. Cannot send heartbeat.");
            return;
        }

        try {
            Map<String, Object> header = new HashMap<>();
            header.put("tr_type", "3");

            Map<String, Object> request = new HashMap<>();
            request.put("header", header);

            String requestJson = objectMapper.writeValueAsString(request);
            session.sendMessage(new TextMessage(requestJson));
            log.debug("Heartbeat sent");
        } catch (Exception e) {
            log.error("Error sending heartbeat", e);
            eventService.setConnectionActive(false);  // connectionManager 대신 eventService 사용
        }
    }

    /**
     * 메시지 처리
     */
    private void processMessage(String message) {
        try {
            // 파이프(|)로 구분된 메시지인지 확인 (실제 데이터 메시지)
            if (message.matches("\\d+\\|H0STCNT0\\|\\d+\\|.*")) {
                // 메시지 형식: 0|H0STCNT0|001|데이터...
                String[] parts = message.split("\\|");
                if (parts.length >= 4) {
                    String data = parts[3]; // 실제 데이터 부분
                    String[] fields = data.split("\\^");

                    // StockPriceData 객체 생성 및 처리
                    if (fields.length >= 50) { // 필드 개수 체크
                        StockPriceData priceData = StockDataParserUtil.parseStockPriceData(fields);
                        log.info("Received stock price update:");
                        log.info(priceData.toString());

//                        // DB 업데이트
//                        stockInfoService.updateCurrentPrice(priceData.getStockCode(), priceData.getCurrentPrice())
//                            .ifPresentOrElse(
//                                stockInfo -> log.debug("Updated price for {} ({}): {}",
//                                    stockInfo.getCompany(), priceData.getStockCode(), priceData.getCurrentPrice()),
//                                () -> log.warn("Failed to update price (stock not found): {}", priceData.getStockCode())
//                            );
                    }
                    return; // 메시지 처리 완료
                }
            }

            // 기존 JSON 형식 메시지 처리
            JsonNode rootNode = objectMapper.readTree(message);

            if (rootNode.has("header")) {
                if (rootNode.get("header").has("tr_id")) {
                    String trId = rootNode.get("header").get("tr_id").asText();

                    if ("H0STCNT0".equals(trId) && rootNode.has("body")) {
                        handleStockPriceUpdate(rootNode.get("body"));
                    } else if ("PINGPONG".equals(trId)) {
                        log.debug("Received PINGPONG message");
                    } else {
                        log.info("Other message type received: {}", trId);
                    }
                } else {
                    log.info("Connection response or other message type");
                }
            } else {
                log.info("Message without header received");
            }
        } catch (Exception e) {
            // JSON 파싱 에러는 로그 레벨을 DEBUG로 낮춤 (파이프 구분 메시지 때문에)
            if (e instanceof com.fasterxml.jackson.core.JsonParseException) {
                log.debug("JSON parse error (likely pipe-delimited message): {}", e.getMessage());
            } else {
                log.error("Error processing WebSocket message: {}", message, e);
            }
        }
    }

    /**
     * 주식 가격 업데이트 처리 - StockDataParserUtil 활용
     */
    private void handleStockPriceUpdate(JsonNode body) {
        try {
            // 구독 성공 메시지는 로그만 출력하고 종료
            if (body.has("msg1") && "SUBSCRIBE SUCCESS".equals(body.get("msg1").asText())) {
                log.info("Successfully subscribed to stock price updates");
                return;
            }

            if (body.has("output")) {
                JsonNode output = body.get("output");

                // StockPriceData 객체 생성
                StockPriceData priceData;

                if (output.isTextual()) {
                    // 문자열 형태로 전달된 데이터 파싱
                    String outputData = output.asText();
                    String[] fields = outputData.split("\\^");

                    if (fields.length < 50) {  // 필드 개수 체크
                        log.warn("Received incomplete price data: {}", outputData);
                        return;
                    }

                    // 유틸리티 클래스를 통한 파싱
                    priceData = StockDataParserUtil.parseStockPriceData(fields);
                } else {
                    // JSON 객체로 전달된 데이터 처리
                    if (!output.has("mksc_shrn_iscd")) {
                        log.warn("Required field 'mksc_shrn_iscd' missing in output");
                        return;
                    }

                    // JSON 노드에서 StockPriceData 생성
                    priceData = StockDataParserUtil.parseStockPriceData(output);
                }

                // 이쁘게 포맷팅된 체결 정보 로그 출력
                log.info(priceData.toString());

                // DB 업데이트
//                stockInfoService.updateCurrentPrice(priceData.getStockCode(), priceData.getCurrentPrice())
//                        .ifPresentOrElse(
//                                stockInfo -> log.debug("Updated price for {} ({}): {}",
//                                        stockInfo.getCompany(), priceData.getStockCode(), priceData.getCurrentPrice()),
//                                () -> log.warn("Failed to update price (stock not found): {}", priceData.getStockCode())
//                        );

                // Kafka로 데이터 전송 (구현 필요시)
                // kafkaProducer.sendTickData(priceData);
            }
        } catch (Exception e) {
            log.error("Error handling stock price update", e);
        }
    }



    /**
     * 재연결 후 기존 구독 복원
     */
    private void resubscribeStocks() {
        if (subscribedStocks.isEmpty()) {
            return;
        }

        log.info("Resubscribing to {} stocks after reconnection", subscribedStocks.size());
        subscribedStocks.forEach((stockCode, marketCode) -> {
            try {
                Thread.sleep(100);  // 요청 간 짧은 딜레이
                subscribeStockPrice(stockCode, marketCode);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    /**
     * 메시지를 StockTickData로 변환 (Kafka 전송용)
     */
    private Object convertToTickData(JsonNode output) {
        ObjectNode tickData = objectMapper.createObjectNode();
        tickData.put("stockCode", output.get("mksc_shrn_iscd").asText());
        tickData.put("currentPrice", output.has("stck_prpr") ?
                Double.parseDouble(output.get("stck_prpr").asText()) : 0);
        tickData.put("priceChange", output.has("prdy_vrss") ?
                Double.parseDouble(output.get("prdy_vrss").asText()) : 0);
        tickData.put("changeRate", output.has("prdy_ctrt") ?
                Double.parseDouble(output.get("prdy_ctrt").asText()) : 0);
        tickData.put("volume", output.has("acml_vol") ?
                Long.parseLong(output.get("acml_vol").asText()) : 0);
        tickData.put("timestamp", LocalDateTime.now().toString());

        return tickData;
    }

    /**
     * 여러 종목 동시 구독
     */
    public void subscribeMultipleStocks(String[] stockCodes, String marketCode) {
        if (stockCodes == null || stockCodes.length == 0) {
            return;
        }

        log.info("Subscribing to {} stocks", stockCodes.length);

        for (String stockCode : stockCodes) {
            try {
                Thread.sleep(100);  // API 호출 제한 고려하여 짧은 딜레이
                subscribeStockPrice(stockCode, marketCode);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

전체적인 흐름

  1. 클라이언트가 웹소켓을 연결하면 (afterConnectionEstablished())
  2. 한국투자증권 API에서 승인 키를 받아옴 (getApprovalKey())
  3. 필요한 설정을 서버로 전송 (sendConnectionHeader())
  4. 특정 주식 종목을 구독하면 실시간 데이터를 받아옴 (subscribeStockPrice())
  5. 주식 데이터가 도착하면 (handleTextMessage()) 이를 처리 (processMessage())
  6. 하트비트를 주기적으로 전송 (sendHeartbeat())하여 연결을 유지
  7. 연결이 끊기면 상태를 업데이트 (afterConnectionClosed())


결과

실시간으로 틱이 잘 오는 것을 확인할 수 있습니다

1개의 댓글

comment-user-thumbnail
2025년 3월 16일

멋져요-! 잘 보고 갑니다-!

답글 달기