rust binance websocket

wangki·4일 전

Rust

목록 보기
59/59

개요

다시 한번 자동 매매 프로그램을 만들 예정이다.

내용

timescaleDB라는 시계열 전용 데이터베이스를 활용한다.
docker로 로컬에 설치를 했다.

// main.rs
#[tokio::main]
async fn main() {
    // Tracing 초기화
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .with_line_number(true)
        .with_target(true)
        .with_thread_ids(true)
        .init();

    info!("암호화폐 트레이딩 봇 시작...");

    let connection_string = ****

    // 데이터베이스 클라이언트 초기화
    let db_client = match DbClient::new(connection_string).await {
        Ok(client) => {
            info!("데이터베이스 연결 성공");
            client
        }
        Err(e) => {
            error!("데이터베이스 연결 실패: {}", e);
            return;
        }
    };

    // 테이블 초기화
    if let Err(e) = db_client.init_candles_table().await {
        error!("테이블 초기화 실패: {}", e);
        return;
    }
    info!("테이블 준비 완료");

    // WebSocket 연결 시작
    info!("바이낸스 WebSocket 스트리밍 시작...");
    let db_client_arc = Arc::new(db_client);

    if let Err(e) = binance_websocket::connect_binance_stream(db_client_arc.clone(), "BTCUSDT").await {
        error!("WebSocket 연결 실패: {}", e);
        return;
    }
}

logging은 tracing 크레이트를 사용했다. 이제는 필수라고 볼 수 있다.

pub async fn connect_binance_stream(
    db_client: Arc<DbClient>,
    symbol: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    let stream_name = format!("{}@kline_1m", symbol.to_lowercase());
    let ws_url = format!("wss://fstream.binance.com/ws/{}", stream_name);

    info!("바이낸스 WebSocket 연결 시작: {}", ws_url);

    let (ws_stream, _) = tokio_tungstenite::connect_async(&ws_url).await?;
    info!("바이낸스 WebSocket 연결 성공");

    let (_write, mut read) = ws_stream.split();

    // 채널 생성 (데이터 처리용)
    let (tx, mut rx) = mpsc::channel::<KlineData>(100);

    // WebSocket 메시지 읽기 태스크
    let read_task = tokio::spawn(async move {
        info!("WebSocket 메시지 수신 대기 시작...");
        let mut msg_count = 0;
        loop {
            match read.next().await {
                Some(Ok(Message::Text(text))) => {
                    msg_count += 1;
                    match serde_json::from_str::<BinanceKlineMessage>(&text) {
                        Ok(kline_msg) => {
                            info!(
                                msg_count = msg_count,
                                symbol = %kline_msg.data.s,
                                is_closed = kline_msg.data.k.x,
                                "캔들 메시지 수신"
                            );

                            if kline_msg.data.k.x {
                                // 캔들이 완성됨
                                let kline_data = KlineData {
                                    symbol: kline_msg.data.s.clone(),
                                    open: kline_msg.data.k.open_price.parse().unwrap_or(0.0),
                                    high: kline_msg.data.k.high_price.parse().unwrap_or(0.0),
                                    low: kline_msg.data.k.low_price.parse().unwrap_or(0.0),
                                    close: kline_msg.data.k.close_price.parse().unwrap_or(0.0),
                                    volume: kline_msg.data.k.base_volume.parse().unwrap_or(0.0),
                                    quote_asset_volume: kline_msg.data.k.quote_volume.parse().unwrap_or(0.0),
                                    timestamp: kline_msg.data.k.t,
                                };

                                if let Err(e) = tx.send(kline_data).await {
                                    error!("채널 전송 실패: {}", e);
                                    break;
                                }
                            }
                        }
                        Err(e) => {
                            error!("JSON 파싱 실패: {} | 원문: {}", e, &text[..text.len().min(100)]);
                        }
                    }
                }
                Some(Ok(Message::Close(_))) => {
                    warn!("WebSocket 연결이 종료되었습니다");
                    break;
                }
                Some(Ok(msg)) => {
                    info!("기타 메시지: {:?}", msg);
                }
                Some(Err(e)) => {
                    error!("WebSocket 오류: {}", e);
                    break;
                }
                None => {
                    warn!("WebSocket 스트림 종료");
                    break;
                }
            }
        }
        info!("읽기 태스크 종료 (총 {} 개 메시지)", msg_count);
    });

    // 데이터 저장 태스크
    let save_task = tokio::spawn(async move {
        while let Some(kline) = rx.recv().await {
            match db_client.insert_candle(
                &kline.symbol,
                kline.open,
                kline.high,
                kline.low,
                kline.close,
                kline.volume,
                kline.quote_asset_volume,
                kline.timestamp,
            )
            .await
            {
                Ok(_) => {
                    debug!(
                        symbol = %kline.symbol,
                        open = kline.open,
                        high = kline.high,
                        low = kline.low,
                        close = kline.close,
                        "캔들 데이터 저장 성공"
                    );
                }
                Err(e) => {
                    error!("데이터베이스 저장 실패: {}", e);
                }
            }
        }
        info!("저장 태스크 종료");
    });

    // 읽기 태스크가 에러나 연결 종료될 때까지 대기
    if let Err(e) = read_task.await {
        error!("읽기 태스크 에러: {}", e);
    }

    // 저장 태스크 완료 대기
    if let Err(e) = save_task.await {
        error!("저장 태스크 에러: {}", e);
    }

    Ok(())
}

2개의 task를 만들어 select!으로 동시에 처리하도록 했다.
바이낸스 websocket 연결 후, 요청한 타임 프레임의 봉 마감이 이루어진 다면 채널을 통해 메시지를 보내도록 했다. 따라서 1분마다 timescaleDB에 저장이 되도록 했다.

timescaleDB의 재밌는 점은 Continuous Aggregates를 통해

  • 1분봉 -> 5분봉
  • 1분봉 -> 15분봉
  • 1분봉 -> 1시간봉
    .... 등으로 자동으로 만들어준다.

sql 문으로는

  r#"
            CREATE MATERIALIZED VIEW IF NOT EXISTS candles_5m
            WITH (timescaledb.continuous) AS
            SELECT
                symbol,
                time_bucket('5 minutes', open_time) AS bucket,
                FIRST(open, open_time) AS open,
                MAX(high) AS high,
                MIN(low) AS low,
                LAST(close, open_time) AS close,
                SUM(volume) AS volume
            FROM candles
            GROUP BY symbol, bucket
            "#

이렇게 해주면 된다고 한다. DBeaver를 통해 확인해봤을 때

위처럼 view가 생성되는 것을 확인했다.

21:05 ~ 21:09 총 5분동안 1분봉 데이터를 저장하였다. 저장이 정확히 되는지는 실제 차트를 보면서 확인할 수 있다.


21:09 1분봉의 OHLC이다. 정확한 것을 확인할 수 있고 자동으로 5분봉 데이터가 생성되는지 확인해보겠다.

데이터가 생성은 되었다. 값이 정확한지 실제 차트로 비교해 보겠다.

정확한 것을 확인했다.

5분봉을 만들기 위해 타임 스케줄을 사용해 5분마다 1분봉 5개를 조회하여 합친다거나 그런 불필요한 작업을 하지 않아도 되는 것이 엄청난 장점인 것 같다. 자동 파티셔닝을 통해 읽기와 쓰기 속도가 데이터가 아무리 많아져도 느려지지 않는다는 장점이 있다고 한다.

데이터
https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams/Kline-Candlestick-Streams

결론

이전에는 원하는 기술을 억지로 사용하면서 개발을 했는데 이제는 상황에 맞는 기술을 선택하는 것이 고수라는 것을 깨닫고 있다. 특히 ai 시대에 최적의 선택을 할 수 있는 능력이 중요할 것 같다.

알고리즘을 구현 후, 백테스팅과 실제 자동 매매를 구현해 보겠다.

0개의 댓글