[JAVA] 한국투자증권 OpenAPI 사용 (Rest)
위 링크에서 Rest 코드에 대해 확인할 수 있습니다
이번에는 실시간 웹소켓 통신을 사용하는 api에 대해 포스팅 하겠습니다
(같은 조원이신 김바보님께 감사를 표합니다)
웹소켓을 제공하는 경우 왼쪽 상단처럼 WEBSOCKET 태그가 달려있는 것을 확인할 수 있습니다
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Value("${hantu-openapi.websocket-uri:ws://ops.koreainvestment.com:31000}")
private String websocketUri;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 서버 측 웹소켓 핸들러 등록 - 필요한 경우에만 사용
}
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
@Bean
public WebSocketClient webSocketClient() {
StandardWebSocketClient client = new StandardWebSocketClient();
client.getUserProperties().put("org.apache.tomcat.websocket.IO_TIMEOUT_MS", "60000");
return client;
}
// WebSocketEventService 빈 생성
@Bean
public WebSocketEventService webSocketEventService() {
return new WebSocketEventService();
}
// KoreaInvestmentWebSocketHandler 빈 생성
@Bean
public KoreaInvestmentWebSocketHandler koreaInvestmentWebSocketHandler(
StockInfoService stockInfoService,
ObjectMapper objectMapper,
RestTemplate restTemplate,
WebSocketEventService webSocketEventService) {
return new KoreaInvestmentWebSocketHandler(
stockInfoService,
objectMapper,
restTemplate,
webSocketEventService
);
}
// MyWebSocketConnectionManager 빈 생성
@Bean
public MyWebSocketConnectionManager koreaInvestmentWebSocketConnectionManager(
WebSocketClient webSocketClient,
WebSocketEventService webSocketEventService,
ApplicationContext applicationContext) {
// KoreaInvestmentWebSocketHandler를 ApplicationContext에서 가져오는 대신
// 클래스를 생성한 후 주입
KoreaInvestmentWebSocketHandler handler = applicationContext.getBean(KoreaInvestmentWebSocketHandler.class);
MyWebSocketConnectionManager connectionManager = new MyWebSocketConnectionManager(
webSocketClient,
handler,
websocketUri,
webSocketEventService
);
// 자동 시작 비활성화 (서비스 레이어에서 명시적으로 시작)
connectionManager.setAutoStartup(false);
return connectionManager;
}
// 초기화 로직을 위한 빈
@Bean(initMethod = "startConnection", destroyMethod = "stopConnection")
public ConnectionInitializer connectionInitializer(MyWebSocketConnectionManager connectionManager) {
return new ConnectionInitializer(connectionManager);
}
// 내부 초기화 클래스
public static class ConnectionInitializer {
private final MyWebSocketConnectionManager connectionManager;
public ConnectionInitializer(MyWebSocketConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}
public void startConnection() {
connectionManager.startConnection();
}
public void stopConnection() {
connectionManager.stopConnection();
}
}
}
웹소켓 클라이언트를 설정
웹소켓 핸들러 및 연결 관리자를 빈(Bean)으로 등록
(위 코드는 모의투자 기준으로 작성된 코드입니다)
@Slf4j
public class MyWebSocketConnectionManager extends WebSocketConnectionManager {
private final ScheduledExecutorService scheduler;
private ScheduledFuture<?> heartbeatTask;
private final WebSocketEventService eventService;
/**
* 생성자
*/
public MyWebSocketConnectionManager(WebSocketClient client,
WebSocketHandler webSocketHandler,
String uriTemplate,
WebSocketEventService eventService) {
super(client, webSocketHandler, uriTemplate);
this.scheduler = new ScheduledThreadPoolExecutor(1);
this.eventService = eventService;
}
/**
* 연결 시작
*/
public void startConnection() {
log.info("Starting WebSocket connection manager");
super.start();
startHeartbeat();
}
/**
* 연결 종료
*/
public void stopConnection() {
log.info("Stopping WebSocket connection manager");
stopHeartbeat();
super.stop();
}
/**
* 하트비트 시작
*/
private void startHeartbeat() {
stopHeartbeat(); // 기존 하트비트 중지
heartbeatTask = scheduler.scheduleAtFixedRate(() -> {
WebSocketSession session = eventService.getSession();
if (session != null && session.isOpen()) {
// 하트비트 이벤트 발생 - 구현은 나중에 추가
log.debug("Heartbeat check - session is active");
} else if (eventService.isConnectionActive()) {
// 세션이 없거나 닫혀있는데 활성 상태라고 생각하는 경우
log.warn("Heartbeat detected inactive session while connection marked as active. Reconnecting...");
reconnect();
}
}, 30, 30, TimeUnit.SECONDS);
log.info("WebSocket heartbeat started");
}
/**
* 하트비트 중지
*/
private void stopHeartbeat() {
if (heartbeatTask != null && !heartbeatTask.isCancelled()) {
heartbeatTask.cancel(false);
log.info("WebSocket heartbeat stopped");
}
}
/**
* 재연결 시도
*/
private void reconnect() {
log.info("Attempting to reconnect WebSocket");
stopConnection();
try {
Thread.sleep(1000); // 약간의 지연 후 재연결
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
startConnection();
}
}
확장된 웹소켓 연결 관리자 클래스
Spring의 WebSocketConnectionManager를 확장하여 하트비트, 세션 관리 등 추가 기능 구현
@Service
@Slf4j
public class WebSocketEventService {
@Getter
private WebSocketSession session;
private boolean connectionActive = false;
/**
* 세션 설정
*/
public void setSession(WebSocketSession session) {
this.session = session;
this.connectionActive = (session != null && session.isOpen());
log.info("WebSocket session set: {}, active: {}",
session != null ? session.getId() : "null",
connectionActive);
}
/**
* 연결 상태 설정
*/
public void setConnectionActive(boolean active) {
this.connectionActive = active;
log.info("WebSocket connection active state changed to: {}", active);
}
/**
* 연결 상태 반환
*/
public boolean isConnectionActive() {
return connectionActive;
}
}
WebSocket 이벤트를 처리하는 서비스
핸들러와 연결 관리자 사이의 중재자 역할을 수행
국내주식 실시간체결가 (KRX) [실시간-003]을 예시로 실시간으로 정보를 받아오고 출력하는 코드를 작성해보겠습니다
(모의투자를 기준으로 코드가 작성되었습니다)
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class StockPriceData {
private String stockCode; // 종목코드
private String tradeTime; // 체결 시간
private int currentPrice; // 현재가
private String dayOverDaySign; // 전일 대비 부호
private int dayOverDayChange; // 전일 대비
private double dayOverDayRate; // 전일 대비율
private double weightedAveragePrice; // 가중 평균 가격
private int openPrice; // 시가
private int highPrice; // 고가
private int lowPrice; // 저가
private int askPrice1; // 매도호가1
private int bidPrice1; // 매수호가1
private long tradeVolume; // 체결 거래량
private long accumulatedVolume; // 누적 거래량
private long accumulatedTradeAmount; // 누적 거래 대금
private int sellTradeCount; // 매도 체결 건수
private int buyTradeCount; // 매수 체결 건수
private int netBuyTradeCount; // 순매수 체결 건수
private double tradeStrength; // 체결강도
private long totalSellQuantity; // 총 매도 수량
private long totalBuyQuantity; // 총 매수 수량
private String tradeType; // 체결구분
private double buyRatio; // 매수비율
private double volumeChangeRate; // 전일 거래량 대비 등락율
// 시가 관련 정보
private String openPriceTime; // 시가 시간
private String openPriceChangeSign; // 시가대비구분
private int openPriceChange; // 시가대비
// 고가 관련 정보
private String highPriceTime; // 고가 시간
private String highPriceChangeSign; // 고가대비구분
private int highPriceChange; // 고가대비
// 저가 관련 정보
private String lowPriceTime; // 저가 시간
private String lowPriceChangeSign; // 저가대비구분
private int lowPriceChange; // 저가대비
private String businessDate; // 영업 일자
private String marketOperationCode; // 신 장운영 구분 코드
private String suspendedFlag; // 거래정지 여부
private long askQuantity1; // 매도호가 잔량1
private long bidQuantity1; // 매수호가 잔량1
private long totalAskQuantity; // 총 매도호가 잔량
private long totalBidQuantity; // 총 매수호가 잔량
private double volumeTurnoverRate; // 거래량 회전율
private long prevDaySameTimeVolume; // 전일 동시간 누적 거래량
private double prevDaySameTimeVolumeRate; // 전일 동시간 누적 거래량 비율
private String timeClassCode; // 시간 구분 코드
private String randomCloseCode; // 임의종료구분코드
private int staticVIBasePrice; // 정적VI발동기준가
/**
* 체결가 데이터를 이쁘게 출력하는 메서드
*/
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("\n╔══════════════════════════════════════╗\n");
sb.append("║ 주식 체결 정보: ").append(String.format("%-10s", stockCode)).append(" ║\n");
sb.append("╠══════════════════════════════════════╣\n");
// 기본 가격 정보
sb.append("║ ").append(formatTime(tradeTime)).append(" | ");
sb.append(formatMarketStatus(timeClassCode)).append(" ").append(suspendedFlag.equals("Y") ? "[거래정지]" : "").append(String.format("%15s", "")).append(" ║\n");
sb.append("╠══════════════╦═══════════════════════╣\n");
sb.append("║ 현재가 ║ ").append(formatPrice(currentPrice)).append(String.format("%16s", "")).append(" ║\n");
String changeStr = formatPriceChangeWithSign(dayOverDaySign, dayOverDayChange, dayOverDayRate);
sb.append("║ 전일대비 ║ ").append(changeStr).append(String.format("%"+(26-changeStr.length())+"s", "")).append(" ║\n");
sb.append("╠══════════════╬═══════════════════════╣\n");
sb.append("║ 시가 ║ ").append(formatPrice(openPrice)).append(" (").append(formatTime(openPriceTime)).append(")").append(String.format("%3s", "")).append(" ║\n");
sb.append("║ 고가 ║ ").append(formatPrice(highPrice)).append(" (").append(formatTime(highPriceTime)).append(")").append(String.format("%3s", "")).append(" ║\n");
sb.append("║ 저가 ║ ").append(formatPrice(lowPrice)).append(" (").append(formatTime(lowPriceTime)).append(")").append(String.format("%3s", "")).append(" ║\n");
sb.append("╠══════════════╬═══════════════════════╣\n");
sb.append("║ 거래량 ║ ").append(formatVolume(accumulatedVolume));
double volRateStr = prevDaySameTimeVolumeRate;
if (volRateStr != 0) {
sb.append(" (").append(String.format("%+.2f%%", volRateStr)).append(")");
}
sb.append(String.format("%7s", "")).append(" ║\n");
sb.append("║ 거래대금 ║ ").append(formatAmount(accumulatedTradeAmount)).append(String.format("%7s", "")).append(" ║\n");
sb.append("╠══════════════╬═══════════════════════╣\n");
sb.append("║ 체결강도 ║ ").append(String.format("%.2f%%", tradeStrength)).append(String.format("%16s", "")).append(" ║\n");
sb.append("║ 매수비율 ║ ").append(String.format("%.2f%%", buyRatio)).append(String.format("%16s", "")).append(" ║\n");
sb.append("╠══════════════╩═══════════════════════╣\n");
sb.append("║ 매도호가1: ").append(formatPrice(askPrice1)).append(" (").append(formatVolume(askQuantity1)).append(")").append(String.format("%5s", "")).append(" ║\n");
sb.append("║ 매수호가1: ").append(formatPrice(bidPrice1)).append(" (").append(formatVolume(bidQuantity1)).append(")").append(String.format("%5s", "")).append(" ║\n");
sb.append("╠══════════════════════════════════════╣\n");
sb.append("║ 총매도잔량: ").append(formatVolume(totalAskQuantity)).append(String.format("%14s", "")).append(" ║\n");
sb.append("║ 총매수잔량: ").append(formatVolume(totalBidQuantity)).append(String.format("%14s", "")).append(" ║\n");
sb.append("╚══════════════════════════════════════╝\n");
return sb.toString();
}
/**
* 시간 포맷팅
*/
private String formatTime(String timeStr) {
if (timeStr == null || timeStr.length() != 6) {
return "N/A";
}
return timeStr.substring(0, 2) + ":" + timeStr.substring(2, 4) + ":" + timeStr.substring(4, 6);
}
/**
* 가격 포맷팅
*/
private String formatPrice(int price) {
return String.format("%,d", price);
}
/**
* 가격 변화 포맷팅
*/
private String formatPriceChangeWithSign(String sign, int change, double rate) {
String signChar;
String color;
switch(sign) {
case "1": signChar = "↑"; color = "상한가"; break;
case "2": signChar = "▲"; color = "상승"; break;
case "3": signChar = "-"; color = "보합"; break;
case "4": signChar = "↓"; color = "하한가"; break;
case "5": signChar = "▼"; color = "하락"; break;
default: signChar = "?"; color = "알수없음";
}
if ("3".equals(sign)) {
return String.format("%s %s", signChar, color);
} else {
return String.format("%s %s %,d (%+.2f%%)", signChar, color, change, rate);
}
}
/**
* 거래량 포맷팅
*/
private String formatVolume(long volume) {
if (volume >= 1_000_000_000) {
return String.format("%.2f억주", volume / 100_000_000.0);
} else if (volume >= 10_000) {
return String.format("%.2f만주", volume / 10_000.0);
} else {
return String.format("%,d주", volume);
}
}
/**
* 거래대금 포맷팅
*/
private String formatAmount(long amount) {
if (amount >= 100_000_000_000L) {
return String.format("%.2f조원", amount / 100_000_000_000.0);
} else if (amount >= 100_000_000) {
return String.format("%.2f억원", amount / 100_000_000.0);
} else if (amount >= 10_000) {
return String.format("%.2f만원", amount / 10_000.0);
} else {
return String.format("%,d원", amount);
}
}
/**
* 장 상태 포맷팅
*/
private String formatMarketStatus(String timeClassCode) {
switch (timeClassCode) {
case "0": return "[장중]";
case "A": return "[장후예상]";
case "B": return "[장전예상]";
case "C": return "[예상/VI]";
case "D": return "[시간외예상]";
default: return "[알수없음]";
}
}
}
저장되는 데이터 형식
public class StockDataParserUtil {
/**
* 문자열 배열 데이터를 StockPriceData 객체로 파싱
*
* @param fields 한국투자증권 API에서 반환하는 필드 배열
* @return 파싱된 StockPriceData 객체
*/
public static StockPriceData parseStockPriceData(String[] fields) {
int idx = 0;
StockPriceData data = new StockPriceData();
// 기본 필드
data.setStockCode(fields[idx++]);
data.setTradeTime(fields[idx++]);
data.setCurrentPrice(parseInt(fields[idx++]));
data.setDayOverDaySign(fields[idx++]);
data.setDayOverDayChange(parseInt(fields[idx++]));
data.setDayOverDayRate(parseDouble(fields[idx++]));
data.setWeightedAveragePrice(parseDouble(fields[idx++]));
data.setOpenPrice(parseInt(fields[idx++]));
data.setHighPrice(parseInt(fields[idx++]));
data.setLowPrice(parseInt(fields[idx++]));
data.setAskPrice1(parseInt(fields[idx++]));
data.setBidPrice1(parseInt(fields[idx++]));
data.setTradeVolume(parseLong(fields[idx++]));
data.setAccumulatedVolume(parseLong(fields[idx++]));
data.setAccumulatedTradeAmount(parseLong(fields[idx++]));
data.setSellTradeCount(parseInt(fields[idx++]));
data.setBuyTradeCount(parseInt(fields[idx++]));
data.setNetBuyTradeCount(parseInt(fields[idx++]));
data.setTradeStrength(parseDouble(fields[idx++]));
data.setTotalSellQuantity(parseLong(fields[idx++]));
data.setTotalBuyQuantity(parseLong(fields[idx++]));
data.setTradeType(fields[idx++]);
data.setBuyRatio(parseDouble(fields[idx++]));
data.setVolumeChangeRate(parseDouble(fields[idx++]));
// 시가 관련 정보
data.setOpenPriceTime(fields[idx++]);
data.setOpenPriceChangeSign(fields[idx++]);
data.setOpenPriceChange(parseInt(fields[idx++]));
// 고가 관련 정보
data.setHighPriceTime(fields[idx++]);
data.setHighPriceChangeSign(fields[idx++]);
data.setHighPriceChange(parseInt(fields[idx++]));
// 저가 관련 정보
data.setLowPriceTime(fields[idx++]);
data.setLowPriceChangeSign(fields[idx++]);
data.setLowPriceChange(parseInt(fields[idx++]));
data.setBusinessDate(fields[idx++]);
data.setMarketOperationCode(fields[idx++]);
data.setSuspendedFlag(fields[idx++]);
data.setAskQuantity1(parseLong(fields[idx++]));
data.setBidQuantity1(parseLong(fields[idx++]));
data.setTotalAskQuantity(parseLong(fields[idx++]));
data.setTotalBidQuantity(parseLong(fields[idx++]));
data.setVolumeTurnoverRate(parseDouble(fields[idx++]));
data.setPrevDaySameTimeVolume(parseLong(fields[idx++]));
data.setPrevDaySameTimeVolumeRate(parseDouble(fields[idx++]));
data.setTimeClassCode(fields[idx++]);
data.setRandomCloseCode(fields[idx++]);
// 마지막 필드가 있으면 추가
if (idx < fields.length) {
data.setStaticVIBasePrice(parseInt(fields[idx]));
}
return data;
}
/**
* JsonNode에서 StockPriceData 객체로 파싱
*
* @param output 한국투자증권 API에서 반환하는 JSON 노드
* @return 파싱된 StockPriceData 객체
*/
public static StockPriceData parseStockPriceData(JsonNode output) {
if (output.isTextual()) {
// 문자열로 전달된 경우 (caret으로 구분된 데이터)
String[] fields = output.asText().split("\\^");
return parseStockPriceData(fields);
}
// JSON 객체로 전달된 경우 (개별 필드로 구성)
StockPriceData data = new StockPriceData();
// 기본 필드 매핑
setIfExists(data::setStockCode, output, "mksc_shrn_iscd", StockDataParserUtil::asString);
setIfExists(data::setTradeTime, output, "stck_cntg_hour", StockDataParserUtil::asString);
setIfExists(data::setCurrentPrice, output, "stck_prpr", StockDataParserUtil::asInt);
setIfExists(data::setDayOverDaySign, output, "prdy_vrss_sign", StockDataParserUtil::asString);
setIfExists(data::setDayOverDayChange, output, "prdy_vrss", StockDataParserUtil::asInt);
setIfExists(data::setDayOverDayRate, output, "prdy_ctrt", StockDataParserUtil::asDouble);
setIfExists(data::setWeightedAveragePrice, output, "wghn_avrg_stck_prc", StockDataParserUtil::asDouble);
setIfExists(data::setOpenPrice, output, "stck_oprc", StockDataParserUtil::asInt);
setIfExists(data::setHighPrice, output, "stck_hgpr", StockDataParserUtil::asInt);
setIfExists(data::setLowPrice, output, "stck_lwpr", StockDataParserUtil::asInt);
setIfExists(data::setAskPrice1, output, "askp1", StockDataParserUtil::asInt);
setIfExists(data::setBidPrice1, output, "bidp1", StockDataParserUtil::asInt);
setIfExists(data::setTradeVolume, output, "cntg_vol", StockDataParserUtil::asLong);
setIfExists(data::setAccumulatedVolume, output, "acml_vol", StockDataParserUtil::asLong);
setIfExists(data::setAccumulatedTradeAmount, output, "acml_tr_pbmn", StockDataParserUtil::asLong);
setIfExists(data::setSellTradeCount, output, "seln_cntg_csnu", StockDataParserUtil::asInt);
setIfExists(data::setBuyTradeCount, output, "shnu_cntg_csnu", StockDataParserUtil::asInt);
setIfExists(data::setNetBuyTradeCount, output, "ntby_cntg_csnu", StockDataParserUtil::asInt);
setIfExists(data::setTradeStrength, output, "cttr", StockDataParserUtil::asDouble);
setIfExists(data::setTotalSellQuantity, output, "seln_cntg_smtn", StockDataParserUtil::asLong);
setIfExists(data::setTotalBuyQuantity, output, "shnu_cntg_smtn", StockDataParserUtil::asLong);
setIfExists(data::setTradeType, output, "ccld_dvsn", StockDataParserUtil::asString);
setIfExists(data::setBuyRatio, output, "shnu_rate", StockDataParserUtil::asDouble);
setIfExists(data::setVolumeChangeRate, output, "prdy_vol_vrss_acml_vol_rate", StockDataParserUtil::asDouble);
// 시가 관련 정보
setIfExists(data::setOpenPriceTime, output, "oprc_hour", StockDataParserUtil::asString);
setIfExists(data::setOpenPriceChangeSign, output, "oprc_vrss_prpr_sign", StockDataParserUtil::asString);
setIfExists(data::setOpenPriceChange, output, "oprc_vrss_prpr", StockDataParserUtil::asInt);
// 고가 관련 정보
setIfExists(data::setHighPriceTime, output, "hgpr_hour", StockDataParserUtil::asString);
setIfExists(data::setHighPriceChangeSign, output, "hgpr_vrss_prpr_sign", StockDataParserUtil::asString);
setIfExists(data::setHighPriceChange, output, "hgpr_vrss_prpr", StockDataParserUtil::asInt);
// 저가 관련 정보
setIfExists(data::setLowPriceTime, output, "lwpr_hour", StockDataParserUtil::asString);
setIfExists(data::setLowPriceChangeSign, output, "lwpr_vrss_prpr_sign", StockDataParserUtil::asString);
setIfExists(data::setLowPriceChange, output, "lwpr_vrss_prpr", StockDataParserUtil::asInt);
// 나머지 필드
setIfExists(data::setBusinessDate, output, "bsop_date", StockDataParserUtil::asString);
setIfExists(data::setMarketOperationCode, output, "new_mkop_cls_code", StockDataParserUtil::asString);
setIfExists(data::setSuspendedFlag, output, "trht_yn", StockDataParserUtil::asString);
setIfExists(data::setAskQuantity1, output, "askp_rsqn1", StockDataParserUtil::asLong);
setIfExists(data::setBidQuantity1, output, "bidp_rsqn1", StockDataParserUtil::asLong);
setIfExists(data::setTotalAskQuantity, output, "total_askp_rsqn", StockDataParserUtil::asLong);
setIfExists(data::setTotalBidQuantity, output, "total_bidp_rsqn", StockDataParserUtil::asLong);
setIfExists(data::setVolumeTurnoverRate, output, "vol_tnrt", StockDataParserUtil::asDouble);
setIfExists(data::setPrevDaySameTimeVolume, output, "prdy_smns_hour_acml_vol", StockDataParserUtil::asLong);
setIfExists(data::setPrevDaySameTimeVolumeRate, output, "prdy_smns_hour_acml_vol_rate", StockDataParserUtil::asDouble);
setIfExists(data::setTimeClassCode, output, "hour_cls_code", StockDataParserUtil::asString);
setIfExists(data::setRandomCloseCode, output, "mrkt_trtm_cls_code", StockDataParserUtil::asString);
setIfExists(data::setStaticVIBasePrice, output, "vi_stnd_prc", StockDataParserUtil::asInt);
return data;
}
/**
* 문자열을 정수로 안전하게 변환
*/
public static int parseInt(String value) {
try {
return Integer.parseInt(value);
} catch (Exception e) {
return 0;
}
}
/**
* 문자열을 long으로 안전하게 변환
*/
public static long parseLong(String value) {
try {
return Long.parseLong(value);
} catch (Exception e) {
return 0;
}
}
/**
* 문자열을 double로 안전하게 변환
*/
public static double parseDouble(String value) {
try {
return Double.parseDouble(value);
} catch (Exception e) {
return 0.0;
}
}
// JsonNode 변환 유틸리티 메서드
private static String asString(JsonNode node) {
return node.asText();
}
private static int asInt(JsonNode node) {
try {
return node.asInt();
} catch (Exception e) {
return 0;
}
}
private static long asLong(JsonNode node) {
try {
return node.asLong();
} catch (Exception e) {
return 0;
}
}
private static double asDouble(JsonNode node) {
try {
return node.asDouble();
} catch (Exception e) {
return 0.0;
}
}
// 함수형 인터페이스를 활용한 유틸리티 메서드
private static <T> void setIfExists(java.util.function.Consumer<T> setter,
JsonNode rootNode,
String fieldName,
java.util.function.Function<JsonNode, T> converter) {
if (rootNode.has(fieldName)) {
setter.accept(converter.apply(rootNode.get(fieldName)));
}
}
}
받아온 주식 데이터를 StockPriceData 형식에 맞게 파싱하기 위한 유틸리티 클래스
위의 이유로 인해 두 가지 입력 형식(String[], JsonNode)을 모두 작성하였습니다
@Component
@Slf4j
@RequiredArgsConstructor
public class KoreaInvestmentWebSocketHandler extends TextWebSocketHandler {
@Value("${hantu-openapi.appkey}")
private String appKey;
@Value("${hantu-openapi.appsecret}")
private String appSecret;
@Value("${hantu-openapi.websocket-domain:https://openapi.koreainvestment.com:9443}")
private String websocketDomain;
// 필수 의존성
private final StockInfoService stockInfoService;
private final ObjectMapper objectMapper;
private final RestTemplate restTemplate;
private final WebSocketEventService eventService; // connectionManager 대신 eventService 사용
// 멤버 변수
private WebSocketSession session;
private String approvalKey;
private final ExecutorService executorService = Executors.newFixedThreadPool(5);
private final Map<String, String> subscribedStocks = new ConcurrentHashMap<>();
/**
* 웹소켓 세션이 열렸을 때 호출됨
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
this.session = session;
eventService.setSession(session); // connectionManager 대신 eventService 사용
log.info("WebSocket connection established at {}: {}", LocalDateTime.now(), session.getId());
// 접속 승인키 얻기
approvalKey = getApprovalKey();
// 연결 헤더 전송
sendConnectionHeader();
// 기존 구독 정보 복원 (재연결 시)
resubscribeStocks();
}
/**
* 메시지 수신 시 호출됨
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String payload = message.getPayload();
log.info("Received message at {}: {}", LocalDateTime.now(), payload);
// 비동기 처리
executorService.submit(() -> processMessage(payload));
}
/**
* 연결 종료 시 호출됨
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
log.info("WebSocket connection closed at {}: {}, status: {}",
LocalDateTime.now(), session.getId(), status);
this.session = null;
eventService.setConnectionActive(false); // connectionManager 대신 eventService 사용
eventService.setSession(null);
}
/**
* 에러 발생 시 호출됨
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
log.error("WebSocket transport error in session {}", session.getId(), exception);
eventService.setConnectionActive(false); // connectionManager 대신 eventService 사용
}
/**
* 웹소켓 접속키 발급
*/
public String getApprovalKey() {
if (approvalKey != null) {
return approvalKey;
}
try {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
Map<String, String> requestMap = new HashMap<>();
requestMap.put("grant_type", "client_credentials");
requestMap.put("appkey", appKey);
requestMap.put("secretkey", appSecret);
HttpEntity<Map<String, String>> entity = new HttpEntity<>(requestMap, headers);
ResponseEntity<String> response = restTemplate.exchange(
websocketDomain + "/oauth2/Approval",
HttpMethod.POST,
entity,
String.class
);
if (response.getStatusCode() == HttpStatus.OK) {
JsonNode rootNode = objectMapper.readTree(response.getBody());
approvalKey = rootNode.get("approval_key").asText();
log.info("Approval key obtained: {}", approvalKey);
return approvalKey;
} else {
throw new RuntimeException("Failed to obtain approval key. Status code: " + response.getStatusCode());
}
} catch (Exception e) {
log.error("Error getting approval key", e);
throw new RuntimeException("Failed to obtain approval key", e);
}
}
/**
* 접속 헤더 전송
*/
private void sendConnectionHeader() {
try {
Map<String, Object> header = new HashMap<>();
header.put("approval_key", approvalKey);
header.put("custtype", "P");
header.put("tr_type", "1");
header.put("content-type", "utf-8");
Map<String, String> input = new HashMap<>();
input.put("tr_id", "H0STCNT0");
input.put("tr_key", "005930"); // 기본 샘플 종목 코드
Map<String, Object> body = new HashMap<>();
body.put("input", input);
Map<String, Object> request = new HashMap<>();
request.put("header", header);
request.put("body", body);
String requestJson = objectMapper.writeValueAsString(request);
log.info("Sending connection header: {}", requestJson);
session.sendMessage(new TextMessage(requestJson));
log.info("Connection header sent successfully");
} catch (Exception e) {
log.error("Error sending connection header", e);
}
}
/**
* 실시간 체결가 구독
*/
public void subscribeStockPrice(String stockCode, String marketCode) {
if (session == null || !session.isOpen()) {
log.warn("WebSocket not connected. Cannot subscribe to {}", stockCode);
return;
}
try {
Map<String, Object> header = new HashMap<>();
header.put("approval_key", approvalKey);
header.put("custtype", "P");
header.put("tr_type", "1");
header.put("content-type", "utf-8");
Map<String, String> input = new HashMap<>();
input.put("tr_id", "H0STCNT0");
input.put("tr_key", stockCode);
Map<String, Object> body = new HashMap<>();
body.put("input", input);
Map<String, Object> request = new HashMap<>();
request.put("header", header);
request.put("body", body);
String requestJson = objectMapper.writeValueAsString(request);
log.info("Sending subscription request for stock {}: {}", stockCode, requestJson);
session.sendMessage(new TextMessage(requestJson));
subscribedStocks.put(stockCode, marketCode);
log.info("Subscription request sent for stock: {}", stockCode);
} catch (Exception e) {
log.error("Error subscribing to stock price for code: {}", stockCode, e);
}
}
/**
* 실시간 시세 구독 해제
*/
public void unsubscribeStockPrice(String stockCode, String marketCode) {
if (session == null || !session.isOpen()) {
log.warn("WebSocket not connected. Cannot unsubscribe.");
return;
}
try {
String trKey = "H1".equals(marketCode) ? "H1_" : "H2_";
Map<String, Object> header = new HashMap<>();
header.put("tr_type", "2");
header.put("tr_id", "H0STCNT0");
header.put("tr_key", trKey + stockCode);
Map<String, String> input = new HashMap<>();
input.put("mksc_shrn_iscd", stockCode);
Map<String, Object> body = new HashMap<>();
body.put("input", input);
Map<String, Object> request = new HashMap<>();
request.put("header", header);
request.put("body", body);
String requestJson = objectMapper.writeValueAsString(request);
session.sendMessage(new TextMessage(requestJson));
subscribedStocks.remove(stockCode);
log.info("Unsubscribed from real-time price for stock: {}", stockCode);
} catch (Exception e) {
log.error("Error unsubscribing from stock price for code: {}", stockCode, e);
}
}
/**
* 하트비트 메시지 전송
*/
public void sendHeartbeat() {
if (session == null || !session.isOpen()) {
log.warn("WebSocket not connected. Cannot send heartbeat.");
return;
}
try {
Map<String, Object> header = new HashMap<>();
header.put("tr_type", "3");
Map<String, Object> request = new HashMap<>();
request.put("header", header);
String requestJson = objectMapper.writeValueAsString(request);
session.sendMessage(new TextMessage(requestJson));
log.debug("Heartbeat sent");
} catch (Exception e) {
log.error("Error sending heartbeat", e);
eventService.setConnectionActive(false); // connectionManager 대신 eventService 사용
}
}
/**
* 메시지 처리
*/
private void processMessage(String message) {
try {
// 파이프(|)로 구분된 메시지인지 확인 (실제 데이터 메시지)
if (message.matches("\\d+\\|H0STCNT0\\|\\d+\\|.*")) {
// 메시지 형식: 0|H0STCNT0|001|데이터...
String[] parts = message.split("\\|");
if (parts.length >= 4) {
String data = parts[3]; // 실제 데이터 부분
String[] fields = data.split("\\^");
// StockPriceData 객체 생성 및 처리
if (fields.length >= 50) { // 필드 개수 체크
StockPriceData priceData = StockDataParserUtil.parseStockPriceData(fields);
log.info("Received stock price update:");
log.info(priceData.toString());
// // DB 업데이트
// stockInfoService.updateCurrentPrice(priceData.getStockCode(), priceData.getCurrentPrice())
// .ifPresentOrElse(
// stockInfo -> log.debug("Updated price for {} ({}): {}",
// stockInfo.getCompany(), priceData.getStockCode(), priceData.getCurrentPrice()),
// () -> log.warn("Failed to update price (stock not found): {}", priceData.getStockCode())
// );
}
return; // 메시지 처리 완료
}
}
// 기존 JSON 형식 메시지 처리
JsonNode rootNode = objectMapper.readTree(message);
if (rootNode.has("header")) {
if (rootNode.get("header").has("tr_id")) {
String trId = rootNode.get("header").get("tr_id").asText();
if ("H0STCNT0".equals(trId) && rootNode.has("body")) {
handleStockPriceUpdate(rootNode.get("body"));
} else if ("PINGPONG".equals(trId)) {
log.debug("Received PINGPONG message");
} else {
log.info("Other message type received: {}", trId);
}
} else {
log.info("Connection response or other message type");
}
} else {
log.info("Message without header received");
}
} catch (Exception e) {
// JSON 파싱 에러는 로그 레벨을 DEBUG로 낮춤 (파이프 구분 메시지 때문에)
if (e instanceof com.fasterxml.jackson.core.JsonParseException) {
log.debug("JSON parse error (likely pipe-delimited message): {}", e.getMessage());
} else {
log.error("Error processing WebSocket message: {}", message, e);
}
}
}
/**
* 주식 가격 업데이트 처리 - StockDataParserUtil 활용
*/
private void handleStockPriceUpdate(JsonNode body) {
try {
// 구독 성공 메시지는 로그만 출력하고 종료
if (body.has("msg1") && "SUBSCRIBE SUCCESS".equals(body.get("msg1").asText())) {
log.info("Successfully subscribed to stock price updates");
return;
}
if (body.has("output")) {
JsonNode output = body.get("output");
// StockPriceData 객체 생성
StockPriceData priceData;
if (output.isTextual()) {
// 문자열 형태로 전달된 데이터 파싱
String outputData = output.asText();
String[] fields = outputData.split("\\^");
if (fields.length < 50) { // 필드 개수 체크
log.warn("Received incomplete price data: {}", outputData);
return;
}
// 유틸리티 클래스를 통한 파싱
priceData = StockDataParserUtil.parseStockPriceData(fields);
} else {
// JSON 객체로 전달된 데이터 처리
if (!output.has("mksc_shrn_iscd")) {
log.warn("Required field 'mksc_shrn_iscd' missing in output");
return;
}
// JSON 노드에서 StockPriceData 생성
priceData = StockDataParserUtil.parseStockPriceData(output);
}
// 이쁘게 포맷팅된 체결 정보 로그 출력
log.info(priceData.toString());
// DB 업데이트
// stockInfoService.updateCurrentPrice(priceData.getStockCode(), priceData.getCurrentPrice())
// .ifPresentOrElse(
// stockInfo -> log.debug("Updated price for {} ({}): {}",
// stockInfo.getCompany(), priceData.getStockCode(), priceData.getCurrentPrice()),
// () -> log.warn("Failed to update price (stock not found): {}", priceData.getStockCode())
// );
// Kafka로 데이터 전송 (구현 필요시)
// kafkaProducer.sendTickData(priceData);
}
} catch (Exception e) {
log.error("Error handling stock price update", e);
}
}
/**
* 재연결 후 기존 구독 복원
*/
private void resubscribeStocks() {
if (subscribedStocks.isEmpty()) {
return;
}
log.info("Resubscribing to {} stocks after reconnection", subscribedStocks.size());
subscribedStocks.forEach((stockCode, marketCode) -> {
try {
Thread.sleep(100); // 요청 간 짧은 딜레이
subscribeStockPrice(stockCode, marketCode);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
/**
* 메시지를 StockTickData로 변환 (Kafka 전송용)
*/
private Object convertToTickData(JsonNode output) {
ObjectNode tickData = objectMapper.createObjectNode();
tickData.put("stockCode", output.get("mksc_shrn_iscd").asText());
tickData.put("currentPrice", output.has("stck_prpr") ?
Double.parseDouble(output.get("stck_prpr").asText()) : 0);
tickData.put("priceChange", output.has("prdy_vrss") ?
Double.parseDouble(output.get("prdy_vrss").asText()) : 0);
tickData.put("changeRate", output.has("prdy_ctrt") ?
Double.parseDouble(output.get("prdy_ctrt").asText()) : 0);
tickData.put("volume", output.has("acml_vol") ?
Long.parseLong(output.get("acml_vol").asText()) : 0);
tickData.put("timestamp", LocalDateTime.now().toString());
return tickData;
}
/**
* 여러 종목 동시 구독
*/
public void subscribeMultipleStocks(String[] stockCodes, String marketCode) {
if (stockCodes == null || stockCodes.length == 0) {
return;
}
log.info("Subscribing to {} stocks", stockCodes.length);
for (String stockCode : stockCodes) {
try {
Thread.sleep(100); // API 호출 제한 고려하여 짧은 딜레이
subscribeStockPrice(stockCode, marketCode);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
실시간으로 틱이 잘 오는 것을 확인할 수 있습니다
멋져요-! 잘 보고 갑니다-!