
바이트 스트림이 구조화된 이벤트가 되기까지.
api크레이트의 모든 레이어를 한 줄씩 따라간다.
"API 클라이언트"라고 하면 단순한 HTTP 요청/응답을 떠올리기 쉽다. 하지만 Claw Code의 api 크레이트를 열어보면, 그 안에는 놀라울 정도로 많은 엔지니어링이 숨어 있다.
인증 해석(4가지 모드 × 환경변수 + OAuth 토큰 갱신), SSE 스트리밍(불완전한 청크 버퍼링 + 프레임 경계 탐지), 재시도 로직(지수 백오프 + 재시도 가능 여부 판정), OAuth PKCE(SHA256 챌린지 + 토큰 교환). 이 모든 것이 하나의 크레이트에 응집되어 있으면서도, 각각이 명확한 파일 경계를 가진다.
이번 편에서는 api 크레이트의 다섯 개 소스 파일(client.rs, sse.rs, types.rs, error.rs)과 런타임의 oauth.rs를 순서대로 읽으며, 바이트가 의미 있는 이벤트로 변환되는 전 과정을 추적한다.
API 통신의 첫 번째 관문은 인증이다. Claw Code는 네 가지 인증 모드를 열거형으로 모델링한다:
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AuthSource {
None,
ApiKey(String),
BearerToken(String),
ApiKeyAndBearer {
api_key: String,
bearer_token: String,
},
}
왜 네 가지나 필요할까? 각각의 존재 이유가 있다:
None: 테스트 환경에서 인증 없이 mock 서버와 통신할 때ApiKey: 가장 일반적인 경우. ANTHROPIC_API_KEY 환경변수 사용BearerToken: OAuth 흐름으로 얻은 액세스 토큰. 표준 Authorization: Bearer 헤더ApiKeyAndBearer: API 키에서 OAuth로 전환하는 과도기. 두 헤더를 동시에 전송apply() 메서드가 이 네 가지를 HTTP 요청에 주입한다:
pub fn apply(&self, mut request_builder: reqwest::RequestBuilder)
-> reqwest::RequestBuilder {
if let Some(api_key) = self.api_key() {
request_builder = request_builder.header("x-api-key", api_key);
}
if let Some(token) = self.bearer_token() {
request_builder = request_builder.bearer_auth(token);
}
request_builder
}
조건 분기 없이 api_key()과 bearer_token() 접근자 메서드를 사용하는 것이 깔끔하다. ApiKeyAndBearer 변형에서는 두 메서드 모두 Some을 반환하므로, 자연스럽게 두 헤더가 모두 추가된다.
실제 인증 소스를 결정하는 함수는 더 흥미롭다:
pub fn resolve_startup_auth_source<F>(load_oauth_config: F)
-> Result<AuthSource, ApiError>
where
F: FnOnce() -> Result<Option<OAuthConfig>, ApiError>,
{
// 1단계: 환경변수 확인 (최우선)
if let Some(api_key) = read_env_non_empty("ANTHROPIC_API_KEY")? {
return match read_env_non_empty("ANTHROPIC_AUTH_TOKEN")? {
Some(bearer_token) => Ok(AuthSource::ApiKeyAndBearer {
api_key, bearer_token,
}),
None => Ok(AuthSource::ApiKey(api_key)),
};
}
if let Some(bearer_token) = read_env_non_empty("ANTHROPIC_AUTH_TOKEN")? {
return Ok(AuthSource::BearerToken(bearer_token));
}
// 2단계: 저장된 OAuth 토큰 확인
let Some(token_set) = load_saved_oauth_token()? else {
return Err(ApiError::MissingApiKey);
};
// 3a: 토큰이 유효하면 바로 사용 (설정 로딩 불필요)
if !oauth_token_is_expired(&token_set) {
return Ok(AuthSource::BearerToken(token_set.access_token));
}
// 3b: 만료 + refresh_token 없음 → 즉시 실패
if token_set.refresh_token.is_none() {
return Err(ApiError::ExpiredOAuthToken);
}
// 3c: 만료 + refresh 가능 → 이때서야 config 로딩
let Some(config) = load_oauth_config()? else {
return Err(ApiError::Auth(
"saved OAuth token is expired; runtime OAuth config is missing".into(),
));
};
Ok(AuthSource::from(resolve_saved_oauth_token_set(&config, token_set)?))
}
이 함수의 설계에서 가장 주목할 점은 제네릭 파라미터 F: FnOnce()다. OAuth 설정 로딩(파일 I/O + JSON 파싱)은 비용이 있는 연산이다. 이 클로저는 토큰이 만료되고 refresh가 필요할 때만 호출된다. 대부분의 경우(환경변수가 있거나, 토큰이 유효하면) 이 비용은 발생하지 않는다.
흐름을 다이어그램으로 정리하면:
환경변수 확인
├─ API_KEY + AUTH_TOKEN → ApiKeyAndBearer (즉시 반환)
├─ API_KEY만 → ApiKey (즉시 반환)
├─ AUTH_TOKEN만 → BearerToken (즉시 반환)
│
저장된 OAuth 토큰 확인
├─ 토큰 없음 → MissingApiKey 에러
├─ 유효한 토큰 → BearerToken (설정 로딩 안 함)
├─ 만료 + refresh 없음 → ExpiredOAuthToken 에러
└─ 만료 + refresh 있음 → load_oauth_config() 호출 → 토큰 갱신
테스트 친화적 설계: 테스트에서는 || Ok(None)을 넘기면 OAuth 설정 로딩 없이 인증 해석을 테스트할 수 있다. 이것이 클로저 주입의 실질적인 가치다.
#[derive(Debug, Clone)]
pub struct AnthropicClient {
http: reqwest::Client,
auth: AuthSource,
base_url: String, // 기본값: "https://api.anthropic.com"
max_retries: u32, // 기본값: 2
initial_backoff: Duration, // 기본값: 200ms
max_backoff: Duration, // 기본값: 2s
}
Clone을 derive한 것은 의도적이다. reqwest::Client는 내부적으로 Arc로 커넥션 풀을 공유하므로, 클론해도 새 커넥션을 만들지 않는다. 이 덕분에 AnthropicClient를 여러 스레드에서 안전하게 공유할 수 있다.
// 동기 전송: 전체 응답을 한 번에 수신
pub async fn send_message(&self, request: &MessageRequest)
-> Result<MessageResponse, ApiError> {
let request = MessageRequest {
stream: false, // 스트리밍 강제 비활성화
..request.clone()
};
let response = self.send_with_retry(&request).await?;
let mut response = response.json::<MessageResponse>().await?;
// request_id 헤더 → 응답 본문으로 전파
if response.request_id.is_none() {
response.request_id = request_id_from_headers(response.headers());
}
Ok(response)
}
// 스트리밍 전송: SSE 이벤트를 점진적으로 수신
pub async fn stream_message(&self, request: &MessageRequest)
-> Result<MessageStream, ApiError> {
let response = self
.send_with_retry(&request.clone().with_streaming())
.await?;
Ok(MessageStream {
request_id: request_id_from_headers(response.headers()),
response,
parser: SseParser::new(),
pending: VecDeque::new(),
done: false,
})
}
send_message()가 stream: false를 강제하는 것에 주목하자. 호출자가 실수로 stream: true를 전달해도 무시한다. 반대로 stream_message()는 with_streaming()을 호출하여 stream: true를 보장한다. 이 방어적 프로그래밍이 "잘못된 사용을 불가능하게 만드는" Rust의 설계 철학과 일맥상통한다.
재시도 로직의 핵심인 백오프 계산을 살펴보자:
fn backoff_for_attempt(&self, attempt: u32) -> Result<Duration, ApiError> {
let Some(multiplier) = 1_u32.checked_shl(attempt.saturating_sub(1)) else {
return Err(ApiError::BackoffOverflow {
attempt,
base_delay: self.initial_backoff,
});
};
Ok(self
.initial_backoff
.checked_mul(multiplier)
.map_or(self.max_backoff, |delay| delay.min(self.max_backoff)))
}
이 짧은 함수에 세 겹의 오버플로우 방어가 숨어 있다:
checked_shl(): 2^(attempt-1) 계산 시 비트 시프트 오버플로우 방지. attempt >= 33이면 None 반환checked_mul(): base_delay × multiplier 곱셈 오버플로우 방지. 오버플로우 시 max_backoff 사용.min(max_backoff): 정상 범위에서도 상한 제한공식은 delay = min(initial_backoff × 2^(attempt-1), max_backoff)이며, 기본값으로 계산하면:
| 시도 | 계산 | 대기 시간 |
|---|---|---|
| 1회차 | 200ms × 2^0 = 200ms | 200ms |
| 2회차 | 200ms × 2^1 = 400ms | 400ms |
| 3회차 | 200ms × 2^2 = 800ms | 800ms |
| 4회차 | 200ms × 2^3 = 1,600ms | 1,600ms |
| 5회차+ | 200ms × 2^4 = 3,200ms → cap | 2,000ms |
일반적인 구현에서는 단순히 base * 2^n으로 작성하고 오버플로우를 무시하는 경우가 많다. Claw Code의 구현은 어떤 입력에서도 패닉하지 않는다는 것을 수학적으로 보장한다. unsafe_code = "forbid" 정책의 정신이 여기서도 관철된다.
async fn send_with_retry(&self, request: &MessageRequest)
-> Result<reqwest::Response, ApiError> {
let mut attempts = 0;
let mut last_error: Option<ApiError>;
loop {
attempts += 1;
match self.send_raw_request(request).await {
Ok(response) => match expect_success(response).await {
Ok(response) => return Ok(response),
Err(error) if error.is_retryable()
&& attempts <= self.max_retries + 1 => {
last_error = Some(error);
}
Err(error) => return Err(error),
},
Err(error) if error.is_retryable()
&& attempts <= self.max_retries + 1 => {
last_error = Some(error);
}
Err(error) => return Err(error),
}
if attempts > self.max_retries {
break;
}
tokio::time::sleep(self.backoff_for_attempt(attempts)?).await;
}
Err(ApiError::RetriesExhausted {
attempts,
last_error: Box::new(last_error.expect("...")),
})
}
이 루프에서 주목할 부분은 이중 패턴 매칭이다. 외부 match는 HTTP 요청 자체의 성공/실패를, 내부 match는 HTTP 응답 상태 코드의 성공/실패를 판정한다. 두 경로 모두에서 is_retryable()을 확인하는 것은 네트워크 에러(연결 실패)와 서버 에러(5xx)를 동일한 재시도 로직으로 처리하겠다는 결정이다.
attempts <= self.max_retries + 1 조건은 약간 혼란스러울 수 있다. max_retries = 2일 때 총 3번 시도(초기 1회 + 재시도 2회)를 보장하기 위한 것이다.
const fn is_retryable_status(status: reqwest::StatusCode) -> bool {
matches!(status.as_u16(), 408 | 409 | 429 | 500 | 502 | 503 | 504)
}
이 목록은 Anthropic API의 실제 동작 패턴을 반영한다:
| 코드 | 의미 | 재시도 이유 |
|---|---|---|
| 408 | Request Timeout | 서버가 처리 시간 초과 |
| 409 | Conflict | 일시적 충돌 (동시 요청) |
| 429 | Too Many Requests | Rate limit. 시간 지나면 해결 |
| 500 | Internal Server Error | 서버 일시적 장애 |
| 502 | Bad Gateway | 프록시/로드밸런서 문제 |
| 503 | Service Unavailable | 서버 과부하 |
| 504 | Gateway Timeout | 프록시 타임아웃 |
반면 400 Bad Request나 401 Unauthorized는 재시도해도 의미가 없으므로 즉시 실패한다. 이 분류가 정확해야 불필요한 대기를 피할 수 있다.
SSE(Server-Sent Events)는 서버가 클라이언트에게 실시간으로 데이터를 푸시하는 HTTP 기반 프로토콜이다. Anthropic API는 이를 통해 토큰을 하나씩 스트리밍한다. Claw Code의 SseParser는 이 프로토콜을 청크 단위로 점진적 파싱한다.
HTTP 청크(chunk)는 SSE 프레임 경계와 일치하지 않는다. 하나의 청크가 프레임의 절반만 포함할 수도, 여러 프레임을 포함할 수도 있다:
청크 1: "event: content_block_delta\nda"
청크 2: "ta: {\"type\":\"content_block_delta\"}\n\nevent: mes"
청크 3: "sage_stop\ndata: {\"type\":\"message_stop\"}\n\n"
SseParser는 이 불규칙한 입력을 안정적으로 처리해야 한다.
#[derive(Debug, Default)]
pub struct SseParser {
buffer: Vec<u8>,
}
상태는 단 하나: 불완전한 프레임을 보관하는 버퍼. 이 단순함이 핵심이다. 복잡한 상태 머신 대신, "버퍼에 추가 → 완전한 프레임 추출 → 반복"이라는 직관적인 알고리즘을 사용한다.
pub fn push(&mut self, chunk: &[u8]) -> Result<Vec<StreamEvent>, ApiError> {
self.buffer.extend_from_slice(chunk);
let mut events = Vec::new();
while let Some(frame) = self.next_frame() {
if let Some(event) = parse_frame(&frame)? {
events.push(event);
}
}
Ok(events)
}
하나의 청크에서 0개, 1개, 또는 여러 개의 이벤트가 나올 수 있다. while let 루프가 프레임이 더 이상 추출되지 않을 때까지 반복한다.
fn next_frame(&mut self) -> Option<String> {
let separator = self.buffer
.windows(2)
.position(|window| window == b"\n\n")
.map(|position| (position, 2))
.or_else(|| {
self.buffer
.windows(4)
.position(|window| window == b"\r\n\r\n")
.map(|position| (position, 4))
})?;
let (position, separator_len) = separator;
let frame = self.buffer
.drain(..position + separator_len)
.collect::<Vec<_>>();
let frame_len = frame.len().saturating_sub(separator_len);
Some(String::from_utf8_lossy(&frame[..frame_len]).into_owned())
}
이 함수에서 눈여겨볼 점이 세 가지 있다:
첫째, 이중 구분자 지원. SSE 프레임의 구분자는 \n\n(Unix)이지만, \r\n\r\n(Windows)도 처리한다. windows() 메서드를 사용한 슬라이딩 윈도우 검색이 Rust다운 관용구다.
둘째, drain()의 효율성. 버퍼에서 프레임을 추출할 때 drain()을 사용한다. 이는 나머지 데이터를 버퍼 앞으로 이동시키는 것을 한 번에 처리한다. split_at() + Vec::new()보다 메모리 효율적이다.
셋째, from_utf8_lossy()의 방어성. 잘못된 UTF-8 바이트가 있어도 패닉하지 않고 유니코드 대체 문자(�)로 변환한다. 네트워크에서 받는 데이터를 신뢰하지 않는 안전한 접근이다.
pub fn parse_frame(frame: &str) -> Result<Option<StreamEvent>, ApiError> {
let trimmed = frame.trim();
if trimmed.is_empty() {
return Ok(None);
}
let mut data_lines = Vec::new();
let mut event_name: Option<&str> = None;
for line in trimmed.lines() {
if line.starts_with(':') {
continue; // 주석 무시 (SSE 스펙)
}
if let Some(name) = line.strip_prefix("event:") {
event_name = Some(name.trim());
continue;
}
if let Some(data) = line.strip_prefix("data:") {
data_lines.push(data.trim_start());
}
}
// ping = keep-alive, 무시
if matches!(event_name, Some("ping")) {
return Ok(None);
}
if data_lines.is_empty() {
return Ok(None);
}
let payload = data_lines.join("\n");
// [DONE] = 스트림 종료 마커, 무시
if payload == "[DONE]" {
return Ok(None);
}
serde_json::from_str::<StreamEvent>(&payload)
.map(Some)
.map_err(ApiError::from)
}
이 함수는 SSE 프로토콜의 세 가지 특수 케이스를 우아하게 처리한다:
: 접두사 = 주석. SSE 스펙에 따라 무시한다. 서버가 디버깅 정보를 넣을 때 사용ping 이벤트 = 연결 유지. 응답 없이 건너뛴다[DONE] 데이터 = 종료 마커. OpenAI API 호환성을 위한 것으로 추정나머지 모든 경우에는 data: 뒤의 JSON을 StreamEvent로 역직렬화한다. 여러 줄의 data:가 있으면 \n으로 합치는 것도 SSE 스펙 준수다.
pub fn finish(&mut self) -> Result<Vec<StreamEvent>, ApiError> {
if self.buffer.is_empty() {
return Ok(Vec::new());
}
let trailing = std::mem::take(&mut self.buffer);
match parse_frame(&String::from_utf8_lossy(&trailing))? {
Some(event) => Ok(vec![event]),
None => Ok(Vec::new()),
}
}
std::mem::take()이 눈에 띈다. 버퍼의 소유권을 가져오면서 원래 위치에 빈 Vec을 남긴다. clone() 없이 효율적으로 데이터를 이동시키는 Rust 관용구다.
SseParser가 바이트를 이벤트로 변환하는 "엔진"이라면, MessageStream은 이 엔진을 구동하는 "운전자"다.
#[derive(Debug)]
pub struct MessageStream {
request_id: Option<String>,
response: reqwest::Response,
parser: SseParser,
pending: VecDeque<StreamEvent>, // 파싱된 이벤트 큐
done: bool, // 수신 완료 플래그
}
VecDeque<StreamEvent>가 핵심이다. 하나의 HTTP 청크에서 여러 이벤트가 나올 수 있으므로, 파싱 결과를 큐에 넣고 하나씩 반환한다.
pub async fn next_event(&mut self) -> Result<Option<StreamEvent>, ApiError> {
loop {
// 1단계: 큐에 이벤트가 있으면 즉시 반환
if let Some(event) = self.pending.pop_front() {
return Ok(Some(event));
}
// 2단계: 스트림이 끝났으면 잔여 데이터 처리
if self.done {
let remaining = self.parser.finish()?;
self.pending.extend(remaining);
if let Some(event) = self.pending.pop_front() {
return Ok(Some(event));
}
return Ok(None); // 스트림 완전 종료
}
// 3단계: 다음 HTTP 청크 수신
match self.response.chunk().await? {
Some(chunk) => {
self.pending.extend(self.parser.push(&chunk)?);
}
None => {
self.done = true; // HTTP 응답 종료
}
}
}
}
이 메서드의 loop가 내부적으로 반복하는 것이 중요하다. 호출자 관점에서는 항상 Some(event) 또는 None(종료)만 받는다. "데이터는 왔지만 완전한 프레임이 아직 안 됐다"는 내부 상태가 외부로 노출되지 않는다.
상태 전이를 다이어그램으로 그려보면:
[시작] → 큐 확인 → 이벤트 있음 → 반환
→ 이벤트 없음 → done? → Yes → finish() → 이벤트 있음 → 반환
→ 이벤트 없음 → None
→ No → chunk() → 데이터 있음 → push() → 루프
→ 데이터 없음 → done=true → 루프
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct MessageRequest {
pub model: String,
pub max_tokens: u32,
pub messages: Vec<InputMessage>,
#[serde(skip_serializing_if = "Option::is_none")]
pub system: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tools: Option<Vec<ToolDefinition>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tool_choice: Option<ToolChoice>,
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub stream: bool,
}
skip_serializing_if 어트리뷰트들이 Anthropic API의 기대에 맞춘 것이다. API는 null 값보다 필드 자체가 없는 것을 선호한다. "tools": null은 "도구 없음"이 아니라 잘못된 요청일 수 있기 때문이다.
stream 필드의 std::ops::Not::not은 흥미로운 트릭이다. bool의 Not 트레이트를 활용하여 false일 때 직렬화를 건너뛴다. stream: true일 때만 JSON에 포함된다.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InputContentBlock {
Text { text: String },
ToolUse { id: String, name: String, input: Value },
ToolResult {
tool_use_id: String,
content: Vec<ToolResultContentBlock>,
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
is_error: bool,
},
}
#[serde(tag = "type", rename_all = "snake_case")]가 핵심이다. Rust의 ToolUse 변형이 JSON에서는 이렇게 된다:
{
"type": "tool_use",
"id": "toolu_01A...",
"name": "bash",
"input": {"command": "ls"}
}
Anthropic API의 메시지 형식과 정확히 일치한다. 이 매핑을 수동으로 구현했다면 수십 줄의 코드가 필요했을 것이다.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StreamEvent {
MessageStart(MessageStartEvent),
MessageDelta(MessageDeltaEvent),
ContentBlockStart(ContentBlockStartEvent),
ContentBlockDelta(ContentBlockDeltaEvent),
ContentBlockStop(ContentBlockStopEvent),
MessageStop(MessageStopEvent),
}
일반적인 스트리밍 시퀀스는 이렇다:
MessageStart ← 메시지 메타데이터 (id, model, usage)
ContentBlockStart ← 콘텐츠 블록 시작 (index=0, type=text)
ContentBlockDelta ← "Hello"
ContentBlockDelta ← " world"
ContentBlockDelta ← "!"
ContentBlockStop ← 블록 종료
ContentBlockStart ← 다음 블록 시작 (index=1, type=tool_use)
ContentBlockDelta ← InputJsonDelta (부분 JSON)
ContentBlockStop ← 블록 종료
MessageDelta ← stop_reason, 최종 usage
MessageStop ← 메시지 완전 종료
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ContentBlockDelta {
TextDelta { text: String },
InputJsonDelta { partial_json: String },
}
TextDelta는 AI의 텍스트 응답이 한 조각씩 오는 것이고, InputJsonDelta는 AI가 도구 호출의 입력(JSON)을 생성하는 중간 상태다. 후자는 불완전한 JSON 문자열이므로 바로 파싱할 수 없고, 모든 델타를 합친 후에야 완전한 JSON이 된다.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Usage {
pub input_tokens: u32,
#[serde(default)]
pub cache_creation_input_tokens: u32,
#[serde(default)]
pub cache_read_input_tokens: u32,
pub output_tokens: u32,
}
impl Usage {
#[must_use]
pub const fn total_tokens(&self) -> u32 {
self.input_tokens + self.output_tokens
}
}
total_tokens()가 캐시 토큰을 제외하는 것이 중요하다. Anthropic의 가격 정책에서 캐시 생성/읽기 토큰은 일반 입력 토큰과 다른 가격이 적용되기 때문이다. 비용 추정(estimate_cost_usd)에서는 이 네 가지를 모두 별도로 계산한다.
PKCE(Proof Key for Code Exchange)는 OAuth 2.0의 보안 확장이다. 원래 모바일 앱을 위해 설계되었지만, CLI 도구에서도 동일한 문제를 해결한다: 인증 코드 가로채기 공격 방지.
핵심 아이디어는 단순하다. 인증 시작 시 랜덤 비밀(verifier)을 생성하고, 그 해시(challenge)를 서버에 보낸다. 나중에 코드를 교환할 때 원본 비밀을 함께 보내면, 서버가 해시를 검증한다. 코드를 가로채도 비밀을 모르면 토큰을 얻을 수 없다.
pub fn generate_pkce_pair() -> io::Result<PkceCodePair> {
let verifier = generate_random_token(32)?; // 32바이트 = 256비트
Ok(PkceCodePair {
challenge: code_challenge_s256(&verifier),
verifier,
challenge_method: PkceChallengeMethod::S256,
})
}
#[must_use]
pub fn code_challenge_s256(verifier: &str) -> String {
let digest = Sha256::digest(verifier.as_bytes());
base64url_encode(&digest)
}
S256 = BASE64URL(SHA256(verifier)). RFC 7636에 정의된 표준 방식이다. 32바이트 랜덤 → base64url로 43문자 verifier 생성 → SHA256 해시 → base64url로 43문자 challenge.
┌─── CLI (Claw Code) ──────────────────────────────────┐
│ │
│ 1. generate_pkce_pair() → (verifier, challenge) │
│ 2. build_authorization_url(challenge) → URL 생성 │
│ 3. 브라우저 열기 → 사용자 로그인/동의 │
│ │
│ ... 사용자가 브라우저에서 인증 ... │
│ │
│ 4. 콜백 수신 (localhost:4545) → code, state 추출 │
│ 5. exchange_oauth_code(code, verifier) → 토큰 교환 │
│ 6. save_oauth_credentials(token_set) → 디스크 저장 │
│ │
│ ... 이후 API 호출 시 ... │
│ │
│ 7. load_oauth_credentials() → 토큰 로드 │
│ 8. oauth_token_is_expired()? → 만료 확인 │
│ 9. (만료 시) refresh_oauth_token() → 갱신 │
└──────────────────────────────────────────────────────┘
#[must_use]
pub fn oauth_token_is_expired(token_set: &OAuthTokenSet) -> bool {
token_set
.expires_at
.is_some_and(|expires_at| expires_at <= now_unix_timestamp())
}
<=(이하)를 사용한 것에 주목하자. 정확히 만료 시각이면 이미 만료로 판단한다. 경계 조건에서의 안전한 선택이다.
자격증명은 ~/.claude/credentials.json(또는 CLAUDE_CONFIG_HOME)에 저장된다. 중요한 것은 원자적 쓰기 패턴이다. 파일을 직접 덮어쓰지 않고, 임시 파일에 먼저 쓴 후 이름을 변경한다. 이렇게 하면 쓰기 도중 프로세스가 종료되어도 기존 파일이 손상되지 않는다.
pub fn save_oauth_credentials(token_set: &OAuthTokenSet) -> io::Result<()> {
let path = credentials_path()?;
let mut root = read_credentials_root(&path)?;
root.insert("oauth".to_string(),
serde_json::to_value(StoredOAuthCredentials::from(token_set.clone()))?);
write_credentials_root(&path, &root)
}
"oauth" 키 아래에 저장하는 것은, 향후 다른 종류의 자격증명(예: MCP 서버 토큰)을 같은 파일에 추가할 수 있게 하는 확장 가능한 설계다.
#[derive(Debug)]
pub enum ApiError {
MissingApiKey,
ExpiredOAuthToken,
Auth(String),
InvalidApiKeyEnv(VarError),
Http(reqwest::Error),
Io(std::io::Error),
Json(serde_json::Error),
Api {
status: reqwest::StatusCode,
error_type: Option<String>,
message: Option<String>,
body: String,
retryable: bool,
},
RetriesExhausted {
attempts: u32,
last_error: Box<ApiError>,
},
InvalidSseFrame(&'static str),
BackoffOverflow {
attempt: u32,
base_delay: Duration,
},
}
이 열거형이 단순한 에러 목록이 아닌 에러 분류 체계인 이유는 is_retryable() 메서드 때문이다:
pub fn is_retryable(&self) -> bool {
match self {
Self::Http(error) => error.is_connect()
|| error.is_timeout()
|| error.is_request(),
Self::Api { retryable, .. } => *retryable,
Self::RetriesExhausted { last_error, .. }
=> last_error.is_retryable(),
_ => false,
}
}
모든 에러를 "재시도 가능/불가능"으로 이분하는 이 메서드가 재시도 로직의 핵심 판단 기준이다. RetriesExhausted가 내부 에러의 retryable 여부를 재귀적으로 확인하는 것도 재미있다. 중첩된 재시도 루프에서 유용할 수 있다.
Api 변형에 retryable 필드가 직접 있는 것은 HTTP 상태 코드로 판단한 결과를 에러 생성 시점에 캐시하는 것이다. 매번 상태 코드를 재검사하지 않아도 된다.
api 크레이트를 관통하는 설계 원칙을 정리하면:
관심사의 수직 분리. client.rs(HTTP + 재시도) → sse.rs(프로토콜 파싱) → types.rs(데이터 모델) → error.rs(에러 분류). 각 파일이 하나의 관심사만 담당한다.
방어적 프로그래밍의 일관성. checked_shl(), from_utf8_lossy(), saturating_sub() 등 "절대 패닉하지 않는" 코드가 전체에 걸쳐 일관적이다.
게으른 평가(Lazy Evaluation). OAuth 설정 로딩, 청크 수신, 프레임 파싱 모두 "필요할 때만" 수행된다. 불필요한 I/O나 계산을 회피하는 것이 성능의 핵심이다.
serde의 전략적 활용. skip_serializing_if, tag, rename_all 등의 어트리뷰트로 Anthropic API의 JSON 형식과 Rust 타입 사이의 변환을 선언적으로 정의한다.
다음 편에서는 이 API 레이어 위에 구축된 대화 런타임과 세션 관리를 분석한다. ConversationRuntime이 ApiClient 트레이트를 통해 이 모든 복잡성을 추상화하고, 대화 한 턴의 전체 생명주기를 어떻게 관리하는지 살펴본다.
# 1. SSE 파서 단위 테스트 실행
cd rust && cargo test -p api
# 2. SSE 프레임 직접 파싱해보기 (Rust playground)
use claw_api::SseParser;
let mut parser = SseParser::new();
let events = parser.push(
b"event: content_block_delta\ndata: {\"type\":\"content_block_delta\"}\n\n"
)?;
# 3. 재시도 로직 관찰 (환경변수로 잘못된 API 키 설정)
ANTHROPIC_API_KEY="invalid" cargo run -p rusty-claude-cli -- -p "hello"
# 4. OAuth PKCE 흐름 다이어그램 직접 그려보기
# RFC 7636 Section 4.1-4.5를 참조하며 코드와 대조
이 글은 [Claw Code 깊이 읽기] 시리즈의 3편입니다.
시리즈 목차:
1. 프로젝트 배경과 아키텍처 개요
2. Rust 워크스페이스 아키텍처 심층 분석
3. API 통신과 SSE 스트리밍 구현 ← 현재 글
4. 대화 런타임과 세션 관리
5. 도구 시스템과 권한 모델
6. Python 포팅 워크스페이스 분석
7. 테스팅 전략과 패리티 추적
8. TUI 개선 로드맵과 미래 방향
태그: #ClawCode #Rust #SSE #스트리밍 #OAuth #PKCE #AnthropicAPI #재시도 #지수백오프