# Claw Code 깊이 읽기 #3 — API 통신과 SSE 스트리밍 구현

조현상·2026년 4월 1일

ClaudeCode

목록 보기
3/17
post-thumbnail

바이트 스트림이 구조화된 이벤트가 되기까지. api 크레이트의 모든 레이어를 한 줄씩 따라간다.


들어가며: HTTP 너머의 복잡성

"API 클라이언트"라고 하면 단순한 HTTP 요청/응답을 떠올리기 쉽다. 하지만 Claw Code의 api 크레이트를 열어보면, 그 안에는 놀라울 정도로 많은 엔지니어링이 숨어 있다.

인증 해석(4가지 모드 × 환경변수 + OAuth 토큰 갱신), SSE 스트리밍(불완전한 청크 버퍼링 + 프레임 경계 탐지), 재시도 로직(지수 백오프 + 재시도 가능 여부 판정), OAuth PKCE(SHA256 챌린지 + 토큰 교환). 이 모든 것이 하나의 크레이트에 응집되어 있으면서도, 각각이 명확한 파일 경계를 가진다.

이번 편에서는 api 크레이트의 다섯 개 소스 파일(client.rs, sse.rs, types.rs, error.rs)과 런타임의 oauth.rs를 순서대로 읽으며, 바이트가 의미 있는 이벤트로 변환되는 전 과정을 추적한다.


1. 인증 체계: 네 가지 얼굴의 AuthSource

API 통신의 첫 번째 관문은 인증이다. Claw Code는 네 가지 인증 모드를 열거형으로 모델링한다:

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AuthSource {
    None,
    ApiKey(String),
    BearerToken(String),
    ApiKeyAndBearer {
        api_key: String,
        bearer_token: String,
    },
}

왜 네 가지나 필요할까? 각각의 존재 이유가 있다:

  • None: 테스트 환경에서 인증 없이 mock 서버와 통신할 때
  • ApiKey: 가장 일반적인 경우. ANTHROPIC_API_KEY 환경변수 사용
  • BearerToken: OAuth 흐름으로 얻은 액세스 토큰. 표준 Authorization: Bearer 헤더
  • ApiKeyAndBearer: API 키에서 OAuth로 전환하는 과도기. 두 헤더를 동시에 전송

apply() 메서드가 이 네 가지를 HTTP 요청에 주입한다:

pub fn apply(&self, mut request_builder: reqwest::RequestBuilder)
    -> reqwest::RequestBuilder {
    if let Some(api_key) = self.api_key() {
        request_builder = request_builder.header("x-api-key", api_key);
    }
    if let Some(token) = self.bearer_token() {
        request_builder = request_builder.bearer_auth(token);
    }
    request_builder
}

조건 분기 없이 api_key()bearer_token() 접근자 메서드를 사용하는 것이 깔끔하다. ApiKeyAndBearer 변형에서는 두 메서드 모두 Some을 반환하므로, 자연스럽게 두 헤더가 모두 추가된다.

인증 해석의 우선순위: 게으른 평가의 미학

실제 인증 소스를 결정하는 함수는 더 흥미롭다:

pub fn resolve_startup_auth_source<F>(load_oauth_config: F)
    -> Result<AuthSource, ApiError>
where
    F: FnOnce() -> Result<Option<OAuthConfig>, ApiError>,
{
    // 1단계: 환경변수 확인 (최우선)
    if let Some(api_key) = read_env_non_empty("ANTHROPIC_API_KEY")? {
        return match read_env_non_empty("ANTHROPIC_AUTH_TOKEN")? {
            Some(bearer_token) => Ok(AuthSource::ApiKeyAndBearer {
                api_key, bearer_token,
            }),
            None => Ok(AuthSource::ApiKey(api_key)),
        };
    }
    if let Some(bearer_token) = read_env_non_empty("ANTHROPIC_AUTH_TOKEN")? {
        return Ok(AuthSource::BearerToken(bearer_token));
    }

    // 2단계: 저장된 OAuth 토큰 확인
    let Some(token_set) = load_saved_oauth_token()? else {
        return Err(ApiError::MissingApiKey);
    };

    // 3a: 토큰이 유효하면 바로 사용 (설정 로딩 불필요)
    if !oauth_token_is_expired(&token_set) {
        return Ok(AuthSource::BearerToken(token_set.access_token));
    }

    // 3b: 만료 + refresh_token 없음 → 즉시 실패
    if token_set.refresh_token.is_none() {
        return Err(ApiError::ExpiredOAuthToken);
    }

    // 3c: 만료 + refresh 가능 → 이때서야 config 로딩
    let Some(config) = load_oauth_config()? else {
        return Err(ApiError::Auth(
            "saved OAuth token is expired; runtime OAuth config is missing".into(),
        ));
    };
    Ok(AuthSource::from(resolve_saved_oauth_token_set(&config, token_set)?))
}

이 함수의 설계에서 가장 주목할 점은 제네릭 파라미터 F: FnOnce()다. OAuth 설정 로딩(파일 I/O + JSON 파싱)은 비용이 있는 연산이다. 이 클로저는 토큰이 만료되고 refresh가 필요할 때만 호출된다. 대부분의 경우(환경변수가 있거나, 토큰이 유효하면) 이 비용은 발생하지 않는다.

흐름을 다이어그램으로 정리하면:

환경변수 확인
├─ API_KEY + AUTH_TOKEN → ApiKeyAndBearer (즉시 반환)
├─ API_KEY만           → ApiKey          (즉시 반환)
├─ AUTH_TOKEN만        → BearerToken     (즉시 반환)
│
저장된 OAuth 토큰 확인
├─ 토큰 없음           → MissingApiKey 에러
├─ 유효한 토큰         → BearerToken     (설정 로딩 안 함)
├─ 만료 + refresh 없음 → ExpiredOAuthToken 에러
└─ 만료 + refresh 있음 → load_oauth_config() 호출 → 토큰 갱신

테스트 친화적 설계: 테스트에서는 || Ok(None)을 넘기면 OAuth 설정 로딩 없이 인증 해석을 테스트할 수 있다. 이것이 클로저 주입의 실질적인 가치다.


2. AnthropicClient: 재시도의 기술

클라이언트 구조체

#[derive(Debug, Clone)]
pub struct AnthropicClient {
    http: reqwest::Client,
    auth: AuthSource,
    base_url: String,        // 기본값: "https://api.anthropic.com"
    max_retries: u32,        // 기본값: 2
    initial_backoff: Duration, // 기본값: 200ms
    max_backoff: Duration,   // 기본값: 2s
}

Clone을 derive한 것은 의도적이다. reqwest::Client는 내부적으로 Arc로 커넥션 풀을 공유하므로, 클론해도 새 커넥션을 만들지 않는다. 이 덕분에 AnthropicClient를 여러 스레드에서 안전하게 공유할 수 있다.

두 가지 전송 모드

// 동기 전송: 전체 응답을 한 번에 수신
pub async fn send_message(&self, request: &MessageRequest)
    -> Result<MessageResponse, ApiError> {
    let request = MessageRequest {
        stream: false,       // 스트리밍 강제 비활성화
        ..request.clone()
    };
    let response = self.send_with_retry(&request).await?;
    let mut response = response.json::<MessageResponse>().await?;
    // request_id 헤더 → 응답 본문으로 전파
    if response.request_id.is_none() {
        response.request_id = request_id_from_headers(response.headers());
    }
    Ok(response)
}

// 스트리밍 전송: SSE 이벤트를 점진적으로 수신
pub async fn stream_message(&self, request: &MessageRequest)
    -> Result<MessageStream, ApiError> {
    let response = self
        .send_with_retry(&request.clone().with_streaming())
        .await?;
    Ok(MessageStream {
        request_id: request_id_from_headers(response.headers()),
        response,
        parser: SseParser::new(),
        pending: VecDeque::new(),
        done: false,
    })
}

send_message()stream: false강제하는 것에 주목하자. 호출자가 실수로 stream: true를 전달해도 무시한다. 반대로 stream_message()with_streaming()을 호출하여 stream: true를 보장한다. 이 방어적 프로그래밍이 "잘못된 사용을 불가능하게 만드는" Rust의 설계 철학과 일맥상통한다.

지수 백오프: 수학과 안전성

재시도 로직의 핵심인 백오프 계산을 살펴보자:

fn backoff_for_attempt(&self, attempt: u32) -> Result<Duration, ApiError> {
    let Some(multiplier) = 1_u32.checked_shl(attempt.saturating_sub(1)) else {
        return Err(ApiError::BackoffOverflow {
            attempt,
            base_delay: self.initial_backoff,
        });
    };
    Ok(self
        .initial_backoff
        .checked_mul(multiplier)
        .map_or(self.max_backoff, |delay| delay.min(self.max_backoff)))
}

이 짧은 함수에 세 겹의 오버플로우 방어가 숨어 있다:

  1. checked_shl(): 2^(attempt-1) 계산 시 비트 시프트 오버플로우 방지. attempt >= 33이면 None 반환
  2. checked_mul(): base_delay × multiplier 곱셈 오버플로우 방지. 오버플로우 시 max_backoff 사용
  3. .min(max_backoff): 정상 범위에서도 상한 제한

공식은 delay = min(initial_backoff × 2^(attempt-1), max_backoff)이며, 기본값으로 계산하면:

시도계산대기 시간
1회차200ms × 2^0 = 200ms200ms
2회차200ms × 2^1 = 400ms400ms
3회차200ms × 2^2 = 800ms800ms
4회차200ms × 2^3 = 1,600ms1,600ms
5회차+200ms × 2^4 = 3,200ms → cap2,000ms

일반적인 구현에서는 단순히 base * 2^n으로 작성하고 오버플로우를 무시하는 경우가 많다. Claw Code의 구현은 어떤 입력에서도 패닉하지 않는다는 것을 수학적으로 보장한다. unsafe_code = "forbid" 정책의 정신이 여기서도 관철된다.

send_with_retry(): 재시도 루프의 전체 구조

async fn send_with_retry(&self, request: &MessageRequest)
    -> Result<reqwest::Response, ApiError> {
    let mut attempts = 0;
    let mut last_error: Option<ApiError>;

    loop {
        attempts += 1;
        match self.send_raw_request(request).await {
            Ok(response) => match expect_success(response).await {
                Ok(response) => return Ok(response),
                Err(error) if error.is_retryable()
                    && attempts <= self.max_retries + 1 => {
                    last_error = Some(error);
                }
                Err(error) => return Err(error),
            },
            Err(error) if error.is_retryable()
                && attempts <= self.max_retries + 1 => {
                last_error = Some(error);
            }
            Err(error) => return Err(error),
        }

        if attempts > self.max_retries {
            break;
        }
        tokio::time::sleep(self.backoff_for_attempt(attempts)?).await;
    }

    Err(ApiError::RetriesExhausted {
        attempts,
        last_error: Box::new(last_error.expect("...")),
    })
}

이 루프에서 주목할 부분은 이중 패턴 매칭이다. 외부 match는 HTTP 요청 자체의 성공/실패를, 내부 match는 HTTP 응답 상태 코드의 성공/실패를 판정한다. 두 경로 모두에서 is_retryable()을 확인하는 것은 네트워크 에러(연결 실패)와 서버 에러(5xx)를 동일한 재시도 로직으로 처리하겠다는 결정이다.

attempts <= self.max_retries + 1 조건은 약간 혼란스러울 수 있다. max_retries = 2일 때 총 3번 시도(초기 1회 + 재시도 2회)를 보장하기 위한 것이다.

재시도 가능한 상태 코드

const fn is_retryable_status(status: reqwest::StatusCode) -> bool {
    matches!(status.as_u16(), 408 | 409 | 429 | 500 | 502 | 503 | 504)
}

이 목록은 Anthropic API의 실제 동작 패턴을 반영한다:

코드의미재시도 이유
408Request Timeout서버가 처리 시간 초과
409Conflict일시적 충돌 (동시 요청)
429Too Many RequestsRate limit. 시간 지나면 해결
500Internal Server Error서버 일시적 장애
502Bad Gateway프록시/로드밸런서 문제
503Service Unavailable서버 과부하
504Gateway Timeout프록시 타임아웃

반면 400 Bad Request401 Unauthorized는 재시도해도 의미가 없으므로 즉시 실패한다. 이 분류가 정확해야 불필요한 대기를 피할 수 있다.


3. SSE 파서: 바이트에서 이벤트로

SSE(Server-Sent Events)는 서버가 클라이언트에게 실시간으로 데이터를 푸시하는 HTTP 기반 프로토콜이다. Anthropic API는 이를 통해 토큰을 하나씩 스트리밍한다. Claw Code의 SseParser는 이 프로토콜을 청크 단위로 점진적 파싱한다.

왜 점진적 파싱이 필요한가

HTTP 청크(chunk)는 SSE 프레임 경계와 일치하지 않는다. 하나의 청크가 프레임의 절반만 포함할 수도, 여러 프레임을 포함할 수도 있다:

청크 1: "event: content_block_delta\nda"
청크 2: "ta: {\"type\":\"content_block_delta\"}\n\nevent: mes"
청크 3: "sage_stop\ndata: {\"type\":\"message_stop\"}\n\n"

SseParser는 이 불규칙한 입력을 안정적으로 처리해야 한다.

SseParser 구조

#[derive(Debug, Default)]
pub struct SseParser {
    buffer: Vec<u8>,
}

상태는 단 하나: 불완전한 프레임을 보관하는 버퍼. 이 단순함이 핵심이다. 복잡한 상태 머신 대신, "버퍼에 추가 → 완전한 프레임 추출 → 반복"이라는 직관적인 알고리즘을 사용한다.

push(): 버퍼링과 프레임 추출

pub fn push(&mut self, chunk: &[u8]) -> Result<Vec<StreamEvent>, ApiError> {
    self.buffer.extend_from_slice(chunk);
    let mut events = Vec::new();

    while let Some(frame) = self.next_frame() {
        if let Some(event) = parse_frame(&frame)? {
            events.push(event);
        }
    }

    Ok(events)
}

하나의 청크에서 0개, 1개, 또는 여러 개의 이벤트가 나올 수 있다. while let 루프가 프레임이 더 이상 추출되지 않을 때까지 반복한다.

next_frame(): 프레임 경계 탐지의 세밀함

fn next_frame(&mut self) -> Option<String> {
    let separator = self.buffer
        .windows(2)
        .position(|window| window == b"\n\n")
        .map(|position| (position, 2))
        .or_else(|| {
            self.buffer
                .windows(4)
                .position(|window| window == b"\r\n\r\n")
                .map(|position| (position, 4))
        })?;

    let (position, separator_len) = separator;
    let frame = self.buffer
        .drain(..position + separator_len)
        .collect::<Vec<_>>();
    let frame_len = frame.len().saturating_sub(separator_len);
    Some(String::from_utf8_lossy(&frame[..frame_len]).into_owned())
}

이 함수에서 눈여겨볼 점이 세 가지 있다:

첫째, 이중 구분자 지원. SSE 프레임의 구분자는 \n\n(Unix)이지만, \r\n\r\n(Windows)도 처리한다. windows() 메서드를 사용한 슬라이딩 윈도우 검색이 Rust다운 관용구다.

둘째, drain()의 효율성. 버퍼에서 프레임을 추출할 때 drain()을 사용한다. 이는 나머지 데이터를 버퍼 앞으로 이동시키는 것을 한 번에 처리한다. split_at() + Vec::new()보다 메모리 효율적이다.

셋째, from_utf8_lossy()의 방어성. 잘못된 UTF-8 바이트가 있어도 패닉하지 않고 유니코드 대체 문자(�)로 변환한다. 네트워크에서 받는 데이터를 신뢰하지 않는 안전한 접근이다.

parse_frame(): SSE 프로토콜의 해석

pub fn parse_frame(frame: &str) -> Result<Option<StreamEvent>, ApiError> {
    let trimmed = frame.trim();
    if trimmed.is_empty() {
        return Ok(None);
    }

    let mut data_lines = Vec::new();
    let mut event_name: Option<&str> = None;

    for line in trimmed.lines() {
        if line.starts_with(':') {
            continue;                    // 주석 무시 (SSE 스펙)
        }
        if let Some(name) = line.strip_prefix("event:") {
            event_name = Some(name.trim());
            continue;
        }
        if let Some(data) = line.strip_prefix("data:") {
            data_lines.push(data.trim_start());
        }
    }

    // ping = keep-alive, 무시
    if matches!(event_name, Some("ping")) {
        return Ok(None);
    }

    if data_lines.is_empty() {
        return Ok(None);
    }

    let payload = data_lines.join("\n");

    // [DONE] = 스트림 종료 마커, 무시
    if payload == "[DONE]" {
        return Ok(None);
    }

    serde_json::from_str::<StreamEvent>(&payload)
        .map(Some)
        .map_err(ApiError::from)
}

이 함수는 SSE 프로토콜의 세 가지 특수 케이스를 우아하게 처리한다:

  1. : 접두사 = 주석. SSE 스펙에 따라 무시한다. 서버가 디버깅 정보를 넣을 때 사용
  2. ping 이벤트 = 연결 유지. 응답 없이 건너뛴다
  3. [DONE] 데이터 = 종료 마커. OpenAI API 호환성을 위한 것으로 추정

나머지 모든 경우에는 data: 뒤의 JSON을 StreamEvent로 역직렬화한다. 여러 줄의 data:가 있으면 \n으로 합치는 것도 SSE 스펙 준수다.

finish(): 스트림 종료 처리

pub fn finish(&mut self) -> Result<Vec<StreamEvent>, ApiError> {
    if self.buffer.is_empty() {
        return Ok(Vec::new());
    }

    let trailing = std::mem::take(&mut self.buffer);
    match parse_frame(&String::from_utf8_lossy(&trailing))? {
        Some(event) => Ok(vec![event]),
        None => Ok(Vec::new()),
    }
}

std::mem::take()이 눈에 띈다. 버퍼의 소유권을 가져오면서 원래 위치에 빈 Vec을 남긴다. clone() 없이 효율적으로 데이터를 이동시키는 Rust 관용구다.


4. MessageStream: 스트리밍의 오케스트레이션

SseParser가 바이트를 이벤트로 변환하는 "엔진"이라면, MessageStream은 이 엔진을 구동하는 "운전자"다.

#[derive(Debug)]
pub struct MessageStream {
    request_id: Option<String>,
    response: reqwest::Response,
    parser: SseParser,
    pending: VecDeque<StreamEvent>,  // 파싱된 이벤트 큐
    done: bool,                      // 수신 완료 플래그
}

VecDeque<StreamEvent>가 핵심이다. 하나의 HTTP 청크에서 여러 이벤트가 나올 수 있으므로, 파싱 결과를 큐에 넣고 하나씩 반환한다.

next_event(): 세 단계 상태 머신

pub async fn next_event(&mut self) -> Result<Option<StreamEvent>, ApiError> {
    loop {
        // 1단계: 큐에 이벤트가 있으면 즉시 반환
        if let Some(event) = self.pending.pop_front() {
            return Ok(Some(event));
        }

        // 2단계: 스트림이 끝났으면 잔여 데이터 처리
        if self.done {
            let remaining = self.parser.finish()?;
            self.pending.extend(remaining);
            if let Some(event) = self.pending.pop_front() {
                return Ok(Some(event));
            }
            return Ok(None);  // 스트림 완전 종료
        }

        // 3단계: 다음 HTTP 청크 수신
        match self.response.chunk().await? {
            Some(chunk) => {
                self.pending.extend(self.parser.push(&chunk)?);
            }
            None => {
                self.done = true;  // HTTP 응답 종료
            }
        }
    }
}

이 메서드의 loop내부적으로 반복하는 것이 중요하다. 호출자 관점에서는 항상 Some(event) 또는 None(종료)만 받는다. "데이터는 왔지만 완전한 프레임이 아직 안 됐다"는 내부 상태가 외부로 노출되지 않는다.

상태 전이를 다이어그램으로 그려보면:

[시작] → 큐 확인 → 이벤트 있음 → 반환
                 → 이벤트 없음 → done? → Yes → finish() → 이벤트 있음 → 반환
                                                         → 이벤트 없음 → None
                                       → No → chunk() → 데이터 있음 → push() → 루프
                                                       → 데이터 없음 → done=true → 루프

5. 타입 시스템: API 계약의 Rust 표현

MessageRequest: serde의 정교한 활용

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct MessageRequest {
    pub model: String,
    pub max_tokens: u32,
    pub messages: Vec<InputMessage>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub system: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tools: Option<Vec<ToolDefinition>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tool_choice: Option<ToolChoice>,
    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
    pub stream: bool,
}

skip_serializing_if 어트리뷰트들이 Anthropic API의 기대에 맞춘 것이다. API는 null 값보다 필드 자체가 없는 것을 선호한다. "tools": null은 "도구 없음"이 아니라 잘못된 요청일 수 있기 때문이다.

stream 필드의 std::ops::Not::not은 흥미로운 트릭이다. boolNot 트레이트를 활용하여 false일 때 직렬화를 건너뛴다. stream: true일 때만 JSON에 포함된다.

InputContentBlock: 태그 기반 직렬화

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InputContentBlock {
    Text { text: String },
    ToolUse { id: String, name: String, input: Value },
    ToolResult {
        tool_use_id: String,
        content: Vec<ToolResultContentBlock>,
        #[serde(default, skip_serializing_if = "std::ops::Not::not")]
        is_error: bool,
    },
}

#[serde(tag = "type", rename_all = "snake_case")]가 핵심이다. Rust의 ToolUse 변형이 JSON에서는 이렇게 된다:

{
    "type": "tool_use",
    "id": "toolu_01A...",
    "name": "bash",
    "input": {"command": "ls"}
}

Anthropic API의 메시지 형식과 정확히 일치한다. 이 매핑을 수동으로 구현했다면 수십 줄의 코드가 필요했을 것이다.

StreamEvent: 스트리밍 프로토콜의 전체 표현

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StreamEvent {
    MessageStart(MessageStartEvent),
    MessageDelta(MessageDeltaEvent),
    ContentBlockStart(ContentBlockStartEvent),
    ContentBlockDelta(ContentBlockDeltaEvent),
    ContentBlockStop(ContentBlockStopEvent),
    MessageStop(MessageStopEvent),
}

일반적인 스트리밍 시퀀스는 이렇다:

MessageStart          ← 메시지 메타데이터 (id, model, usage)
  ContentBlockStart   ← 콘텐츠 블록 시작 (index=0, type=text)
  ContentBlockDelta   ← "Hello"
  ContentBlockDelta   ← " world"
  ContentBlockDelta   ← "!"
  ContentBlockStop    ← 블록 종료
  ContentBlockStart   ← 다음 블록 시작 (index=1, type=tool_use)
  ContentBlockDelta   ← InputJsonDelta (부분 JSON)
  ContentBlockStop    ← 블록 종료
MessageDelta          ← stop_reason, 최종 usage
MessageStop           ← 메시지 완전 종료

ContentBlockDelta: 두 종류의 점진적 데이터

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ContentBlockDelta {
    TextDelta { text: String },
    InputJsonDelta { partial_json: String },
}

TextDelta는 AI의 텍스트 응답이 한 조각씩 오는 것이고, InputJsonDelta는 AI가 도구 호출의 입력(JSON)을 생성하는 중간 상태다. 후자는 불완전한 JSON 문자열이므로 바로 파싱할 수 없고, 모든 델타를 합친 후에야 완전한 JSON이 된다.

Usage: 캐시 토큰의 분리

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Usage {
    pub input_tokens: u32,
    #[serde(default)]
    pub cache_creation_input_tokens: u32,
    #[serde(default)]
    pub cache_read_input_tokens: u32,
    pub output_tokens: u32,
}

impl Usage {
    #[must_use]
    pub const fn total_tokens(&self) -> u32 {
        self.input_tokens + self.output_tokens
    }
}

total_tokens()캐시 토큰을 제외하는 것이 중요하다. Anthropic의 가격 정책에서 캐시 생성/읽기 토큰은 일반 입력 토큰과 다른 가격이 적용되기 때문이다. 비용 추정(estimate_cost_usd)에서는 이 네 가지를 모두 별도로 계산한다.


6. OAuth 2.0 PKCE: 보안 인증의 구현

PKCE란 무엇인가

PKCE(Proof Key for Code Exchange)는 OAuth 2.0의 보안 확장이다. 원래 모바일 앱을 위해 설계되었지만, CLI 도구에서도 동일한 문제를 해결한다: 인증 코드 가로채기 공격 방지.

핵심 아이디어는 단순하다. 인증 시작 시 랜덤 비밀(verifier)을 생성하고, 그 해시(challenge)를 서버에 보낸다. 나중에 코드를 교환할 때 원본 비밀을 함께 보내면, 서버가 해시를 검증한다. 코드를 가로채도 비밀을 모르면 토큰을 얻을 수 없다.

PKCE 쌍 생성

pub fn generate_pkce_pair() -> io::Result<PkceCodePair> {
    let verifier = generate_random_token(32)?;  // 32바이트 = 256비트
    Ok(PkceCodePair {
        challenge: code_challenge_s256(&verifier),
        verifier,
        challenge_method: PkceChallengeMethod::S256,
    })
}

#[must_use]
pub fn code_challenge_s256(verifier: &str) -> String {
    let digest = Sha256::digest(verifier.as_bytes());
    base64url_encode(&digest)
}

S256 = BASE64URL(SHA256(verifier)). RFC 7636에 정의된 표준 방식이다. 32바이트 랜덤 → base64url로 43문자 verifier 생성 → SHA256 해시 → base64url로 43문자 challenge.

인증 흐름의 전체 경로

┌─── CLI (Claw Code) ──────────────────────────────────┐
│                                                      │
│ 1. generate_pkce_pair() → (verifier, challenge)      │
│ 2. build_authorization_url(challenge) → URL 생성      │
│ 3. 브라우저 열기 → 사용자 로그인/동의                        │
│                                                      │
│    ... 사용자가 브라우저에서 인증 ...                       │
│                                                      │
│ 4. 콜백 수신 (localhost:4545) → code, state 추출        │
│ 5. exchange_oauth_code(code, verifier) → 토큰 교환     │
│ 6. save_oauth_credentials(token_set) → 디스크 저장      │
│                                                      │
│    ... 이후 API 호출 시 ...                             │
│                                                      │
│ 7. load_oauth_credentials() → 토큰 로드                │
│ 8. oauth_token_is_expired()? → 만료 확인               │
│ 9. (만료 시) refresh_oauth_token() → 갱신               │
└──────────────────────────────────────────────────────┘

토큰 만료 확인

#[must_use]
pub fn oauth_token_is_expired(token_set: &OAuthTokenSet) -> bool {
    token_set
        .expires_at
        .is_some_and(|expires_at| expires_at <= now_unix_timestamp())
}

<=(이하)를 사용한 것에 주목하자. 정확히 만료 시각이면 이미 만료로 판단한다. 경계 조건에서의 안전한 선택이다.

자격증명 저장: 원자적 쓰기

자격증명은 ~/.claude/credentials.json(또는 CLAUDE_CONFIG_HOME)에 저장된다. 중요한 것은 원자적 쓰기 패턴이다. 파일을 직접 덮어쓰지 않고, 임시 파일에 먼저 쓴 후 이름을 변경한다. 이렇게 하면 쓰기 도중 프로세스가 종료되어도 기존 파일이 손상되지 않는다.

pub fn save_oauth_credentials(token_set: &OAuthTokenSet) -> io::Result<()> {
    let path = credentials_path()?;
    let mut root = read_credentials_root(&path)?;
    root.insert("oauth".to_string(),
        serde_json::to_value(StoredOAuthCredentials::from(token_set.clone()))?);
    write_credentials_root(&path, &root)
}

"oauth" 키 아래에 저장하는 것은, 향후 다른 종류의 자격증명(예: MCP 서버 토큰)을 같은 파일에 추가할 수 있게 하는 확장 가능한 설계다.


7. 에러 처리: 복구 가능성의 분류

ApiError 열거형

#[derive(Debug)]
pub enum ApiError {
    MissingApiKey,
    ExpiredOAuthToken,
    Auth(String),
    InvalidApiKeyEnv(VarError),
    Http(reqwest::Error),
    Io(std::io::Error),
    Json(serde_json::Error),
    Api {
        status: reqwest::StatusCode,
        error_type: Option<String>,
        message: Option<String>,
        body: String,
        retryable: bool,
    },
    RetriesExhausted {
        attempts: u32,
        last_error: Box<ApiError>,
    },
    InvalidSseFrame(&'static str),
    BackoffOverflow {
        attempt: u32,
        base_delay: Duration,
    },
}

이 열거형이 단순한 에러 목록이 아닌 에러 분류 체계인 이유는 is_retryable() 메서드 때문이다:

pub fn is_retryable(&self) -> bool {
    match self {
        Self::Http(error) => error.is_connect()
            || error.is_timeout()
            || error.is_request(),
        Self::Api { retryable, .. } => *retryable,
        Self::RetriesExhausted { last_error, .. }
            => last_error.is_retryable(),
        _ => false,
    }
}

모든 에러를 "재시도 가능/불가능"으로 이분하는 이 메서드가 재시도 로직의 핵심 판단 기준이다. RetriesExhausted가 내부 에러의 retryable 여부를 재귀적으로 확인하는 것도 재미있다. 중첩된 재시도 루프에서 유용할 수 있다.

Api 변형에 retryable 필드가 직접 있는 것은 HTTP 상태 코드로 판단한 결과를 에러 생성 시점에 캐시하는 것이다. 매번 상태 코드를 재검사하지 않아도 된다.


마치며: 레이어의 조화

api 크레이트를 관통하는 설계 원칙을 정리하면:

관심사의 수직 분리. client.rs(HTTP + 재시도) → sse.rs(프로토콜 파싱) → types.rs(데이터 모델) → error.rs(에러 분류). 각 파일이 하나의 관심사만 담당한다.

방어적 프로그래밍의 일관성. checked_shl(), from_utf8_lossy(), saturating_sub() 등 "절대 패닉하지 않는" 코드가 전체에 걸쳐 일관적이다.

게으른 평가(Lazy Evaluation). OAuth 설정 로딩, 청크 수신, 프레임 파싱 모두 "필요할 때만" 수행된다. 불필요한 I/O나 계산을 회피하는 것이 성능의 핵심이다.

serde의 전략적 활용. skip_serializing_if, tag, rename_all 등의 어트리뷰트로 Anthropic API의 JSON 형식과 Rust 타입 사이의 변환을 선언적으로 정의한다.

다음 편에서는 이 API 레이어 위에 구축된 대화 런타임과 세션 관리를 분석한다. ConversationRuntimeApiClient 트레이트를 통해 이 모든 복잡성을 추상화하고, 대화 한 턴의 전체 생명주기를 어떻게 관리하는지 살펴본다.


실습 가이드

# 1. SSE 파서 단위 테스트 실행
cd rust && cargo test -p api

# 2. SSE 프레임 직접 파싱해보기 (Rust playground)
use claw_api::SseParser;
let mut parser = SseParser::new();
let events = parser.push(
    b"event: content_block_delta\ndata: {\"type\":\"content_block_delta\"}\n\n"
)?;

# 3. 재시도 로직 관찰 (환경변수로 잘못된 API 키 설정)
ANTHROPIC_API_KEY="invalid" cargo run -p rusty-claude-cli -- -p "hello"

# 4. OAuth PKCE 흐름 다이어그램 직접 그려보기
# RFC 7636 Section 4.1-4.5를 참조하며 코드와 대조

이 글은 [Claw Code 깊이 읽기] 시리즈의 3편입니다.

시리즈 목차:
1. 프로젝트 배경과 아키텍처 개요
2. Rust 워크스페이스 아키텍처 심층 분석
3. API 통신과 SSE 스트리밍 구현 ← 현재 글
4. 대화 런타임과 세션 관리
5. 도구 시스템과 권한 모델
6. Python 포팅 워크스페이스 분석
7. 테스팅 전략과 패리티 추적
8. TUI 개선 로드맵과 미래 방향

태그: #ClawCode #Rust #SSE #스트리밍 #OAuth #PKCE #AnthropicAPI #재시도 #지수백오프

profile
꿈꾸는 개발자

0개의 댓글