랭체인(LangChain)과 랭그래프(LangGraph)는 대규모 언어 모델(LLM)을 활용한 애플리케이션 개발을 위한 도구들입니다. 위 강의는 LangChain에서 운영하는 LangChain Academy에서 제작한 "Introduction to LangGraph" 강의의 내용을 정리 및 추가 설명한 내용입니다.
이번 포스트는 "Module3"내용을 다룹니다:
from IPython.display import Image, display
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage, RemoveMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.graph import MessagesState
# LLM
model = ChatOpenAI(model="gpt-4o", temperature=0)
# 상태
class State(MessagesState):
summary: str
# 모델을 호출하는 로직 정의
def call_model(state: State):
# 요약이 있으면 가져옴
summary = state.get("summary", "")
# 요약이 있으면 추가
if summary:
# 시스템 메시지에 요약 추가
system_message = f"이전 대화의 요약: {summary}"
# 새로운 메시지에 요약 추가
messages = [SystemMessage(content=system_message)] + state["messages"]
else:
messages = state["messages"]
response = model.invoke(messages)
return {"messages": response}
def summarize_conversation(state: State):
# 먼저 기존 요약을 가져옴
summary = state.get("summary", "")
# 요약 프롬프트 생성
if summary:
# 이미 요약이 존재함
summary_message = (
f"지금까지의 대화 요약: {summary}\n\n"
"위의 새로운 메시지를 고려하여 요약을 확장하세요:"
)
else:
summary_message = "위의 대화를 요약하세요:"
# 프롬프트를 히스토리에 추가
messages = state["messages"] + [HumanMessage(content=summary_message)]
response = model.invoke(messages)
# 가장 최근 2개의 메시지를 제외한 모든 메시지 삭제
delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
return {"summary": response.content, "messages": delete_messages}
# 대화를 종료할지 요약할지 결정
def should_continue(state: State):
"""다음에 실행할 노드를 반환합니다."""
messages = state["messages"]
# 메시지가 6개 이상이면 대화를 요약
if len(messages) > 6:
return "summarize_conversation"
# 그렇지 않으면 종료
return END
# 새 그래프 정의
workflow = StateGraph(State)
workflow.add_node("conversation", call_model)
workflow.add_node(summarize_conversation)
# 시작점을 대화로 설정
workflow.add_edge(START, "conversation")
workflow.add_conditional_edges("conversation", should_continue)
workflow.add_edge("summarize_conversation", END)
# 컴파일
memory = MemorySaver()
graph = workflow.compile(checkpointer=memory)
display(Image(graph.get_graph().draw_mermaid_png()))
Langraph의 Streaming 기능은 에이전트가 작업 중 발생하는 이벤트와 상태를 실시간으로 확인할 수 있는 기능입니다.
스트리밍 매서드
.stream()
과 .astream()
메서드는 그래프 실행 중 상태를 스트리밍하는 두 가지 주요 방법입니다. 이 두 메서드의 차이점과 사용 방법을 설명해드리겠습니다:.stream()
- 동기 방식 스트리밍:
.stream()
: Stream은 동기식(Synchronous)
방식으로, 각 노드가 호출될 때마다 상태 업데이트를 스트리밍합니다.
동기적으로 작동한다
는 것은, 작업을 하나씩 순차적으로 처리하는 것을 의미합니다. 앞의 작업이 완료되지 않으면 뒤의 작업이 실행되지 않습니다.
LangGraph에서 .stream()
메서드는 동기 방식으로 작동합니다. 즉, 각 노드가 완료될 때까지 기다린 후 그 결과를 받아오는 방식입니다. 예를 들어, 그래프의 첫 번째 노드가 실행을 완료해야 두 번째 노드가 실행될 수 있습니다.
for chunk in graph.stream(inputs, stream_mode="values"):
print(chunk) # 각 노드 실행 후 출력
실행이 끝나야
그 결과(chunk
)가 출력되고, 그 후에야 두 번째 노드가 실행됩니다. .astream()
- 비동기 방식 스트리밍:
.astream()
: A-Stream은 비동기식(Asynchronous)
방식으로 상태 업데이트를 스트리밍합니다.
비동기적으로 작동한다
는 것은, 작업을 동시에 처리할 수 있다는 것을 의미합니다. 앞의 작업이 끝날 때까지 기다리지 않고, 다른 작업을 진행할 수 있습니다.
LangGraph에서 .astream()
메서드는 비동기 방식으로 작동합니다. 즉, 한 작업이 끝날 때까지 기다리지 않고 다른 작업을 동시에 처리할 수 있습니다. 이 방식은 특히 많은 시간이 걸리는 작업(예: 네트워크 요청, 데이터베이스 접근 등)을 효율적으로 처리하는 데 유리합니다.
async for chunk in graph.astream(inputs, stream_mode="values"):
print(chunk) # 각 노드 실행이 끝날 때까지 기다리지 않고 처리
완료되기 전에
두 번째 노드가 실행될 수 있습니다. 🤔 언제 동기/비동기를 사용해야 할까?
- 동기: 작업이 비교적 빠르게 끝나고 순차적으로 실행해야 할 때 사용합니다.
- 예를 들어, 작은 규모의 프로그램이나 한 번에 하나씩 처리해야 하는 작업에서 동기 방식을 사용하는 것이 좋습니다.
- 비동기: 비동기는 여러 작업을 동시에 처리해야 할 때 특히 유용합니다.
- 예를 들어, 네트워크 요청, 파일 읽기/쓰기, 데이터베이스 접근과 같이 시간이 오래 걸리는 작업에서 비동기 방식을 사용하는 것이 더 효율적입니다.
스트리밍 모드
스트림 모드는 LangGraph가 실행되는 동안 그래프의 상태를 어떤 방식으로 모니터링하고 확인할 것인지를 결정하는 옵션입니다.
각 모드는 그래프가 실행될 때 생성되는 데이터를 어떻게 스트리밍할지에 대한 방식을 정의합니다. 쉽게 말하면, 스트리밍 모드는 실행 중 발생하는 정보(상태나 결과물)를 실시간으로 어떻게 보고할 것인지를 나타냅니다.
주요 스트림 모드
"values" 모드:
사용 예시:
for chunk in graph.stream(inputs, stream_mode="values"):
print(chunk) # 각 노드 실행 후 전체 상태 출력
"updates" 모드:
사용 예시:
for chunk in graph.stream(inputs, stream_mode="updates"):
print(chunk) # 변경된 상태만 출력
텍스트를 실시간으로 모니터링하는 기능
입니다. 💡 LLM 토큰 스트리밍의 목적
1. 실시간 응답 제공: LLM이 응답을 생성하는 동안 즉각적인 피드백을 사용자에게 제공하여 기다리는 시간을 줄이고, 사용자 경험을 향상시킬 수 있습니다.
2. 세밀한 모니터링: LLM이 중간에 생성하는 텍스트를 관찰하여 시스템의 성능을 평가하거나 문제를 디버깅할 수 있습니다.
3. 유연한 제어: 특정 노드에서 발생하는 토큰만을 실시간으로 확인하고, 필요한 경우 중간에 개입하거나 제어할 수 있습니다.
스트리밍 토큰의 주요 개념
.astream_events()
메서드를 사용해 이러한 이벤트를 비동기적으로 처리할 수 있습니다.config = {"configurable": {"thread_id": "3"}}
input_message = HumanMessage(content="피파에서 대한민국은 어느정도 실력인가요?")
async for event in graph.astream_events({"messages": [input_message]}, config, version="v2"):
print(f"노드: {event['metadata'].get('langgraph_node','')}. 유형: {event['event']}. 이름: {event['name']}")
출력 예시:
노드: . 유형: on_chain_start. 이름: LangGraph
노드: __start__. 유형: on_chain_start. 이름: __start__
노드: __start__. 유형: on_chain_end. 이름: __start__
노드: conversation. 유형: on_chain_start. 이름: conversation
노드: conversation. 유형: on_chat_model_start. 이름: ChatOpenAI
노드: conversation. 유형: on_chat_model_stream. 이름: ChatOpenAI
(중략)
이 코드는 그래프의 특정 노드에서 발생하는 이벤트를 실시간으로 추적하여, 각 노드가 어떤 작업을 하고 있는지 상태 정보를 얻습니다.
해당 코드 및 출력 결과에서 event['metadata']
, event['event']
, event['name']
등은 LangGraph에서 발생하는 이벤트의 구성 요소입니다.
예를 들어, 위 출력 마지막에 대화형 AI에서 현재 LLM이 "on_chat_model_stream" 이벤트를 통해 현재 해당 노드에서 응답을 생성하는 중임을 확인할 수 있습니다.
이벤트 구조:
event
: 발생한 이벤트의 유형
을 나타냅니다. 예를 들어, LLM이 스트리밍하는 이벤트는 "on_chat_model_stream"
혹은 "on_chain_stream"
같은 유형으로 표시됩니다.
on_chat_model_stream
: LLM(대형 언어 모델)이 생성하는 토큰을 실시간으로 스트리밍하는 이벤트입니다. LLM이 입력을 받아 응답을 생성하는 과정에서 생성된 부분적인 텍스트(토큰)들이 실시간으로 전송됩니다.
on_chain_stream
: 그래프의 특정 노드에서 발생하는 일반적인 이벤트 스트리밍을 나타냅니다. 모든 노드에서 발생할 수 있는 이벤트이며, 각 노드가 실행되면서 일어나는 작업의 상태를 스트리밍합니다.
name
: 이벤트의 이름
으로, 어떤 노드에서 발생한 이벤트인지 명시합니다.
data
: 이벤트와 관련된 데이터
를 담고 있습니다. LLM이 생성하는 실제 텍스트 데이터(토큰)를 포함합니다. 이 데이터는 AIMessageChunk
와 같은 형태로 제공됩니다.
metadata
: 이 필드는 추가적인 메타 정보
를 포함합니다. 여기에는 이벤트가 발생한 노드(예: langgraph_node
)에 대한 정보도 포함됩니다.
스트리밍 방법
metadata['langgraph_node']
값을 이용해 특정 노드의 출력을 필터링할 수 있습니다.
on_chain_stream
:
- 이 코드는 특정 노드(예:
conversation
)의 일반적인 상태 변화나 그래프 실행 상태를 추적하기 위한 용도로 사용됩니다.- LLM의 토큰 생성과는 직접적인 관련이 없으며, 그래프의 노드 상태 변화를 추적하는 방식입니다.
# on_chain_stream
node_to_stream = 'conversation'
config = {"configurable": {"thread_id": "1"}}
input_message = HumanMessage(content="피파에서 대한민국은 어느정도 실력인가요?")
async for event in graph.astream_events({"messages": [input_message]}, config, version="v2"):
# 특정 노드의 상태 변화 추적하기
if event["event"] == "on_chain_stream" and event['metadata'].get('langgraph_node','') == node_to_stream:
print(event["data"])
on_chain_stream
: 그래프의 상태 변화나 노드 실행 상태가 바뀔 때 발생하는 이벤트입니다. LLM의 응답 생성이 끝나고 그래프의 상태가 변하면 이 이벤트가 발생하며, 이때 전체 응답이 나온 이유는 LLM이 답변을 완료했기 때문이 아니라, 그래프 내 노드의 상태가 변화했기 때문입니다.node_to_stream = 'conversation'
config = {"configurable": {"thread_id": "2"}}
input_message = HumanMessage(content="피파에서 대한민국은 어느정도 실력인가요?")
async for event in graph.astream_events({"messages": [input_message]}, config, version="v2"):
# 특정 노드에서 채팅 모델 토큰 가져오기
if event["event"] == "on_chat_model_stream" and event['metadata'].get('langgraph_node','') == node_to_stream:
# if event["event"] == "on_chain_stream" and event['metadata'].get('langgraph_node','') == node_to_stream:
print(event["data"])
(중략)
on_chat_model_stream
:
- 이 코드는 LLM의 응답이 점진적으로 생성될 때마다 발생하는 이벤트를 스트리밍합니다.
on_chat_model_stream
이벤트는 LLM이 응답을 생성할 때 부분적으로 생성된 텍스트(토큰)를 실시간으로 추적하는 데 사용됩니다.
# on_chat_model_stream
node_to_stream = 'conversation'
config = {"configurable": {"thread_id": "2"}}
input_message = HumanMessage(content="피파에서 대한민국은 어느정도 실력인가요?")
async for event in graph.astream_events({"messages": [input_message]}, config, version="v2"):
# 특정 노드에서 채팅 모델 토큰 추적하기
if event["event"] == "on_chat_model_stream" and event['metadata'].get('langgraph_node','') == node_to_stream:
print(event["data"])
on_chat_model_stream
: LLM이 토큰 단위로 점진적으로 응답을 생성할 때마다 발생하는 이벤트입니다. LLM이 긴 응답을 생성할 때 각각의 토큰(작은 텍스트 단위)이 실시간으로 스트리밍되어 결과가 나오는 것입니다. 이 방식은 실시간으로 토큰을 추적하며, 긴 응답을 기다리지 않고 중간 결과를 빠르게 확인할 수 있습니다.{'chunk': AIMessageChunk(content='대한', additional_kwargs={}, response_metadata={}, id='run-f5d6eb65-2633-4bfe-bb73-82cc16348735')}
{'chunk': AIMessageChunk(content='민국', additional_kwargs={}, response_metadata={}, id='run-f5d6eb65-2633-4bfe-bb73-82cc16348735')}
{'chunk': AIMessageChunk(content=' 축', additional_kwargs={}, response_metadata={}, id='run-f5d6eb65-2633-4bfe-bb73-82cc16348735')}
{'chunk': AIMessageChunk(content='구', additional_kwargs={}, response_metadata={}, id='run-f5d6eb65-2633-4bfe-bb73-82cc16348735')}
{'chunk': AIMessageChunk(content=' 국가', additional_kwargs={}, response_metadata={}, id='run-f5d6eb65-2633-4bfe-bb73-82cc16348735')}
{'chunk': AIMessageChunk(content='대표', additional_kwargs={}, response_metadata={}, id='run-f5d6eb65-2633-4bfe-bb73-82cc16348735')}
{'chunk': AIMessageChunk(content='팀', additional_kwargs={}, response_metadata={}, id='run-f5d6eb65-2633-4bfe-bb73-82cc16348735')}
{'chunk': AIMessageChunk(content='은', additional_kwargs={}, response_metadata={}, id='run-f5d6eb65-2633-4bfe-bb73-82cc16348735')}
{'chunk': AIMessageChunk(content=' 아', additional_kwargs={}, response_metadata={}, id='run-f5d6eb65-2633-4bfe-bb73-82cc16348735')}
{'chunk': AIMessageChunk(content='시아', additional_kwargs={}, response_metadata={}, id='run-f5d6eb65-2633-4bfe-bb73-82cc16348735')}
{'chunk': AIMessageChunk(content='에서', additional_kwargs={}, response_metadata={}, id='run-f5d6eb65-2633-4bfe-bb73-82cc16348735')}
{'chunk': AIMessageChunk(content=' 강', additional_kwargs={}, response_metadata={}, id='run-f5d6eb65-2633-4bfe-bb73-82cc16348735')}
{'chunk': AIMessageChunk(content='력', additional_kwargs={}, response_metadata={}, id='run-f5d6eb65-2633-4bfe-bb73-82cc16348735')}
(중략)
event['data']
값이 AIMessageChunk
형태로 제공되며, 이를 사용해 실제 토큰 데이터를 추출합니다.chunk
키를 사용하여 스트리밍된 텍스트 데이터를 얻고, 이를 실시간으로 확인할 수 있습니다.config = {"configurable": {"thread_id": "222"}}
input_message = HumanMessage(content="피파에서 대한민국은 어느정도 실력인가요?")
async for event in graph.astream_events({"messages": [input_message]}, config, version="v2"):
# 특정 노드에서 채팅 모델 토큰 가져오기
# if event["event"] == "on_chat_model_stream" and event['metadata'].get('langgraph_node','') == node_to_stream:
if event["event"] == "on_chain_stream" and event['metadata'].get('langgraph_node','') == node_to_stream:
data = event["data"]
print(data["chunk"]['messages'].content)
Breakpoints는 작업 흐름에서 특정 지점에서 그래프의 실행을 멈추고, 사용자가 검토하거나 작업을 중단할 수 있게 해주는 기능입니다.
Human-in-the-Loop
워크플로우에서 중요한 요소로, 민감한 작업(예: 데이터베이스 쓰기, 외부 시스템에 쓰기)에 대해 사람이 승인을 할 수 있도록 해줍니다.Human-in-the-loop (인간 개입)
개념:
LangGraph에서의 Human-in-the-loop
동기:
a) 승인 (Approval)
:
b) 디버깅 (Debugging)
:
c) 편집 (Editing)
:
브레이크포인트 (Breakpoints)
:
브레이크포인트 (Breakpoints)
Interrupt Before
또는Interrupt After
를 통해 브래이크포인트를 설정할 수 있습니다.
- Interrupt Before: 특정 노드가 실행되기 전에 그래프 실행을 중단합니다.
- Interrupt After: 특정 노드 실행 후에 그래프를 멈춥니다.
예제
multiply
, add
, divide
함수가 정의되어 있습니다. 이들은 간단한 산술 연산을 수행합니다.bind_tools
를 통해 LLM에 바인딩됩니다.from langchain_openai import ChatOpenAI
def multiply(a: int, b: int) -> int:
"""a와 b를 곱합니다.
인자:
a: 첫 번째 정수
b: 두 번째 정수
"""
return a * b
# 이것은 도구가 될 것입니다
def add(a: int, b: int) -> int:
"""a와 b를 더합니다.
인자:
a: 첫 번째 정수
b: 두 번째 정수
"""
return a + b
def divide(a: int, b: int) -> float:
"""a를 b로 나눕니다.
인자:
a: 첫 번째 정수
b: 두 번째 정수
"""
return a / b
tools = [add, multiply, divide]
llm = ChatOpenAI(model="gpt-4")
llm_with_tools = llm.bind_tools(tools)
assistant 노드
: LLM을 사용하여 사용자 입력에 응답합니다.tools 노드
: 실제 산술 연산을 수행합니다.add_conditional_edges
메서드에 tools_condition
을 사용하면, 자동으로 도구 호출 여부에 따른 조건부 라우팅이 설정됩니다.interrupt_before=["tools"]
: tools 노드 실행 전에 그래프가 중단됩니다.MemorySaver
를 사용하여 상태를 저장하고 관리합니다.graph.stream()
을 사용하여 실행 과정을 스트리밍합니다.graph.get_state()
를 사용하여 현재 그래프의 상태를 가져옵니다.state.next
를 통해 다음에 실행될 노드를 확인할 수 있습니다.예제
# 입력
initial_input = {"messages": HumanMessage(content="2와 3을 곱하세요")}
# 스레드
thread = {"configurable": {"thread_id": "3"}}
# 첫 번째 중단까지 그래프 실행
for event in graph.stream(initial_input, thread, stream_mode="values"):
event['messages'][-1].pretty_print()
# 사용자 피드백 받기
user_approval = input("도구를 호출하시겠습니까? (예/아니오): ")
# 승인 확인
if user_approval.lower() == "예":
# 승인된 경우, 그래프 실행 계속
for event in graph.stream(None, thread, stream_mode="values"):
event['messages'][-1].pretty_print()
else:
print("사용자에 의해 작업이 취소되었습니다.")
LangGraph는 중단점(Breakpoints)을 활용하여 그래프의 상태를 관리하고, 특히 사람이 개입하는 흐름에서 상태를 수정하거나 확인할 수 있는 과정을 보여줍니다.
중단점(Breakpoints) 설정
중단점의 개념: 그래프의 특정 노드에서 실행을 일시 중단하고, 사용자의 개입을 기다리거나 상태를 수정하는 기회를 제공하는 메커니즘입니다. 아래의 목적으로 사용됩니다.
코드 예시
interrupt_before=["assistant"]
)상태 업데이트 (graph.update_state)
graph.update_state(
thread,
{"messages": [HumanMessage(content="아니요, 실제로 3과 3을 곱하세요!")]}
)
상태 업데이트: 이 코드는 현재 그래프의 상태를 업데이트하는 예시입니다.
thread
는 해당 그래프 실행의 고유 ID로, 어떤 상태를 업데이트할 것인지를 식별하는 데 사용됩니다.MessagesState: 메시지 상태를 관리하는 LangGraph의 내장된 클래스입니다. 이 클래스는 메시지들의 상태를 추적하고 업데이트할 때 사용됩니다.
Graph.update_state()
호출할 때, MessagesState는 메시지를 추가하거나 업데이트하는 작업을 내부적으로 처리하며, 이 과정에서 리듀서(reducer) 역할
을 수행합니다.
메시지 저장: messages 키
를 통해 메시지 상태를 유지 및 관리합니다.
상태 업데이트: update_state()
메서드를 사용하여 새로운 메시지를 추가하거나 기존 메시지를 수정할 때, MessagesState
클래스는 이를 자동으로 처리합니다.
추가/덮어쓰기 로직: 메시지가 추가될 때 id가 없으면
새로운 메시지를 리스트에 추가하고, id가 있으면
해당 메시지를 덮어쓰는 방식으로 동작합니다.
💡(참고)
graph.update_state()
메서드를 호출하면 기본적으로 다음과 같은 동작이 수행됩니다:
- "messages" 키에 대한 기본 리듀서 함수가 실행됩니다. 이 리듀서는 다음과 같이 동작합니다:
- 새 메시지의 ID가 기존 메시지와 일치하면 해당 메시지를 대체합니다.
- ID가 일치하지 않으면 새 메시지를 기존 메시지 목록의 끝에 추가합니다.
상태 업데이트 후 그래프 재실행
graph.stream(None, thread, stream_mode="values"): 그래프를 재실행하면서, 중단되었던 지점에서 상태를 확인하고, 상태가 업데이트된 후 남은 노드를 실행합니다.
재실행 과정:
None
을 전달하여 중단된 지점에서 다시 실행을 계속합니다.Dummy Node와 피드백 적용
이 기법은 실시간 사용자 개입을 통해, LLM과의 상호작용을 보다 동적
으로 만들 수 있는 방법입니다.
사용자는 특정 노드에서 실행 중단 후 상태를 수정할 수 있으며, 그 이후로도 그래프가 자연스럽게 계속 실행됩니다.
이를 통해 더 복잡한 작업 흐름에서 인간 피드백을 반영할 수 있습니다.
Dummy Node란?
: Dummy Node는 실제로는 아무 작업도 하지 않지만, 특정 시점에서 사용자의 피드백을 받을 수 있도록 중단점을 만들어 주는 가상의 노드입니다.
피드백 적용
: Dummy Node는 사용자의 피드백을 받을 수 있는 구간을 제공하며, 그래프 실행을 잠시 중단합니다. 이후 사용자가 상태 업데이트를 요청하면, 그 피드백을 받아 그래프 상태에 반영합니다.
그래프 정의 : 위 그래프에는 3개의 주요 노드가 있습니다:
human_feedback
: 사용자가 피드백을 제공할 수 있는 가상의 노드(Dummy Node)입니다.
assistant
: 사용자 입력과 LLM을 이용해 도구 호출 등의 연산을 수행하는 노드입니다.
tools
: 도구 호출을 담당하는 노드로, 실제 연산(예: 덧셈, 곱셈)을 수행합니다.
# 시스템 메시지
sys_msg = SystemMessage(content="당신은 일련의 입력에 대해 산술 연산을 수행하는 도움이 되는 어시스턴트입니다.")
# 중단되어야 하는 무작동 노드
def human_feedback(state: MessagesState):
pass
# 어시스턴트 노드
def assistant(state: MessagesState):
return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}
# 그래프 빌드
builder = StateGraph(MessagesState)
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))
builder.add_node("human_feedback", human_feedback)
중단점 정의
human_feedback
노드에 지정했습니다. 즉, 이 노드에 도달할 때 그래프 실행이 멈추고 사용자의 피드백을 기다리게 됩니다.interrupt_before=["human_feedback"]
부분이 이 역할을 담당합니다.# 그래프를 human_feedback 노드 전에 중단
graph = builder.compile(interrupt_before=["human_feedback"], checkpointer=memory)
실행 중 피드백 받기
human_feedback
노드까지 실행된 후 멈춥니다.input()
을 통해 피드백을 받습니다.graph.update_state()
메서드를 통해 human_feedback
노드의 상태에 반영됩니다.# 사용자로부터 피드백 받기
user_input = input("상태를 어떻게 업데이트하고 싶은지 말씀해 주세요: ")
# human_feedback 노드인 것처럼 상태를 업데이트
graph.update_state(thread, {"messages": user_input}, as_node="human_feedback")
상태 업데이트 및 실행 재개
graph.update_state()
를 통해 해당 피드백을 상태에 반영한 뒤 그래프 실행을 계속합니다. # 그래프 실행 계속
for event in graph.stream(None, thread, stream_mode="values"):
event["messages"][-1].pretty_print()
특정 조건에 따라
자동으로 발생하는 중단점
입니다. 특정 상태
나 조건을 감지
하여 자동으로 멈추고, 이후 상태를 수정하거나 추가적인 입력을 받는 형태로 이어질 수 있습니다. 예상치 못한 값을 가질 때
, 상태 업데이트가 필요할 때
, 또는 특정 조건이 충족될 때
매우 유용합니다.동적 중단점(Dynamic Breakpoints)의 목표
일반적인 중단점은 그래프가 특정 노드에 도달할 때 수동으로 설정되지만, 동적 중단점은 그래프가 스스로 특정 조건에 따라 중단
하도록 할 수 있습니다.
이를 통해 그래프는 내부에서 스스로 상태를 점검하고 필요시 중단하여, 상태 업데이트
또는 수정
이 가능하게 만듭니다.
동적 중단점(Dynamic Breakpoints)의 이점
조건부로 중단: 개발자가 정의한 특정 로직에 따라 조건부로 중단이 가능합니다.
중단 이유 전달: 중단점이 발생한 이유를 사용자에게 명확히 전달할 수 있습니다.
NodeInterrupt
에 전달할 정보를 포함시켜 왜 중단이 발생했는지를 설명할 수 있습니다.코드 실습
1. 그래프 설정
StateGraph
를 사용하여 간단한 3단계 그래프를 설정합니다. step_1
, step_2
, step_3
으로 구성되며, step_2에서 입력값의 길이가 5자 이상일 경우 자동으로 중단시키는 NodeInterrupt
를 사용합니다.from typing_extensions import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt
from langgraph.graph import START, END, StateGraph
# 상태 정의
class State(TypedDict):
input: str
# 1단계: 상태를 그대로 출력
def step_1(state: State) -> State:
print("---1단계---")
return state
# 2단계: 입력의 길이가 5자를 넘으면 중단 발생
def step_2(state: State) -> State:
if len(state['input']) > 5:
raise NodeInterrupt(f"5자보다 긴 입력을 받았습니다: {state['input']}")
print("---2단계---")
return state
# 3단계: 상태를 그대로 출력
def step_3(state: State) -> State:
print("---3단계---")
return state
# 그래프 설정
builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)
# 메모리 설정
memory = MemorySaver()
# 그래프 컴파일
graph = builder.compile(checkpointer=memory)
2. 그래프 실행 및 중단점 발생
입력값이 5자보다 길면 그래프는 step_2에서 중단됩니다.
initial_input = {"input": "hello world"}
thread_config = {"configurable": {"thread_id": "1"}}
# Run the graph until the first interruption
for event in graph.stream(initial_input, thread_config, stream_mode="values"):
print(event)
실행 결과:
{'input': 'hello world'}
---1단계---
{'input': 'hello world'}
이 시점에서 그래프는 step_2에서 멈추게 되고, 상태를 확인할 수 있습니다.
현재 상태를 조금 더 명확하게 설명하자면, NodeInterrupt가 발생했기 때문에 노드와 엣지 간의 정확한 위치를 구분하는 것이 중요합니다.
노드 위치 관점에서: 실행 흐름은 step_1
을 완료하고 step_2
를 실행하는 도중에 중단되었습니다. 이는 아직 step_2
의 실행이 완료되지 않았음을 의미하므로, 현재는 step_2
에 도달한 상태입니다.
엣지 위치 관점에서: step_1
에서 step_2
로의 전이가 이루어졌으나, step_2
가 완료되지 않았으므로 1-2 엣지"에서 step_2
로 넘어가는 중에 멈춘 상태라고도 볼 수 있습니다.
3. 상태 확인 및 수정
그래프가 중단된 상태에서 다음 실행할 노드가 step_2
임을 확인합니다.
state = graph.get_state(thread_config)
print(state.next) # ('step_2',)
그리고 중단점이 발생한 이유도 확인할 수 있습니다.
print(state.tasks)
(PregelTask(id='ec5598b4-c9cc-ce9e-8387-5f27e35742a5', name='step_2', path=('__pregel_pull', 'step_2'), error=None, interrupts=(Interrupt(value='5자보다 긴 입력을 받았습니다: hello world', when='during'),), state=None),)
4. 상태 업데이트 및 그래프 재실행
상태를 변경하지 않으면 같은 노드에서 계속 멈춰있게 됩니다.
상태를 "Hello"으로 변경하여 그래프를 재실행합니다. 변경 후에는 그래프의 끝까지 정상적으로 실행되는 것을 확인할 수 있습니다.
# 상태 업데이트: 입력을 "Hello"으로 변경
graph.update_state(
thread_config,
{"input": "Hello"},
)
# 그래프 재실행
for event in graph.stream(None, thread_config, stream_mode="values"):
print(event)
업데이트 후 실행 결과:
{'input': 'Hello'}
---2단계---
{'input': 'Hello'}
---3단계---
{'input': 'Hello'}
✨ 동적 중단점의 핵심 개념
1. 자동 중단점 설정: 그래프는 특정 조건(예: 입력 길이 초과)을 만족할 때 자동으로 멈추고, 중단점을 발생시킵니다.
2. 상태 확인 및 업데이트: 그래프가 중단되면 현재 상태를 확인하고, 필요시 상태를 업데이트할 수 있습니다.
3. 그래프 재실행: 상태가 업데이트된 후, 그래프는 중단된 시점부터 다시 실행됩니다.
4. 유연한 워크플로우 구성: 동적 중단점을 사용하면, 사용자의 승인, 디버깅, 상태 수정 등을 유연하게 처리할 수 있습니다.
Time Travel
은 그래프의 이전 상태로 돌아가거나, 그 상태를 재생하여 과거의 실행 흐름을 살펴보고 문제를 분석하는 데 유용한 기능입니다.
디버깅
, 성능 최적화
, 여러 경로 탐색
등에 도움을 줍니다. Replay(재생):
Fork(분기):
Time Travel의 주요 단계
0. Agent Define
from langchain_openai import ChatOpenAI
def multiply(a: int, b: int) -> int:
"""a와 b를 곱합니다.
인자:
a: 첫 번째 정수
b: 두 번째 정수
"""
return a * b
# 이것은 도구가 될 것입니다
def add(a: int, b: int) -> int:
"""a와 b를 더합니다.
인자:
a: 첫 번째 정수
b: 두 번째 정수
"""
return a + b
def divide(a: int, b: int) -> float:
"""a를 b로 나눕니다.
인자:
a: 첫 번째 정수
b: 두 번째 정수
"""
return a / b
tools = [add, multiply, divide]
llm = ChatOpenAI(model="gpt-4")
llm_with_tools = llm.bind_tools(tools)
from IPython.display import Image, display
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import MessagesState
from langgraph.graph import START, END, StateGraph
from langgraph.prebuilt import tools_condition, ToolNode
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
# 시스템 메시지
sys_msg = SystemMessage(content="당신은 일련의 입력에 대해 산술 연산을 수행하는 도움이 되는 어시스턴트입니다.")
# 노드
def assistant(state: MessagesState):
return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}
# 그래프
builder = StateGraph(MessagesState)
# 노드 정의: 이들이 작업을 수행합니다
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))
# 엣지 정의: 이들이 제어 흐름을 결정합니다
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
"assistant",
# 어시스턴트의 최신 메시지(결과)가 도구 호출이면 -> tools_condition이 tools로 라우팅합니다
# 어시스턴트의 최신 메시지(결과)가 도구 호출이 아니면 -> tools_condition이 END로 라우팅합니다
tools_condition,
)
builder.add_edge("tools", "assistant")
memory = MemorySaver()
graph = builder.compile(checkpointer=MemorySaver())
# 표시
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))
1. 현재 상태 조회
get_state
메서드를 호출하여 그래프의 현재 상태를 확인할 수 있습니다. 이는 현재 실행 중이거나 완료된 그래프의 상태를 가져옵니다.state = graph.get_state({'configurable': {'thread_id': '1'}})
print(state)
2. 상태 히스토리 조회
get_state_history
메서드를 사용하면 그래프 실행 동안 발생한 모든 상태의 히스토리를 조회할 수 있습니다. 이 히스토리는 각 단계에서 그래프가 어떤 상태를 가졌는지 보여줍니다.history = graph.get_state_history(thread)
for state in history:
print(state)
history = graph.get_state_history(thread)
for i, state in enumerate(history,1):
print(i)
print(state)
(참고) 중간 정리
체크포인트
는 여행 중 찍은 사진 한 장이고,스레드
는 그 여행의 전체 사진 앨범이라고 할 수 있습니다.
Graph: 그래프는 노드
와 엣지
로 구성되어 있는 컨트롤 플로우
입니다.
Checkpoints: 체크포인터(checkpointer)
가 각 노드의 상태
를 체크포인트로 기록합니다.
체크포인트
에는 상태 정보와 함께 '다음에 어디로 갈지'와 같은 메타데이터도 포함됩니다.Thread: 스레드(thread)는 체크포인트의 모음
입니다.
StateSnapshot: 상태 스냅샷(StateSnapshot)은 체크포인트의 데이터 타입
니다.
Graph.get_state(): 현재(가장 최근의) 체크포인트
를 반환하는 함수입니다.
Graph.get_state_history(): 모든 체크포인트의 리스트
를 반환하여, 상태 변화의 히스토리를 제공합니다.
3. Replay (다시 재생)
Replay
는 그래프 실행이 완료된 후, 특정 체크포인트에서 그래프를 다시 실행하지 않고 그 상태를 그대로 재생하는 기능입니다. 현재 상태
에서 그래프를 실행
합니다. StateSnapshot()을 반환
합니다.다시 실행(re-play)
합니다. 실제로 노드들이 실행되지 않는다
는 것입니다.# 특정 체크포인트 ID를 기반으로 재생
checkpoint_id = history[-2]['config']['checkpoint_id']
graph.stream(None, {'configurable': {'thread_id': '1', 'checkpoint_id': checkpoint_id}}, stream_mode="values")
4. Forking (분기)
Forking
은 특정 체크포인트에서 상태를 수정하고 새로운 실행 흐름을 만드는 기능입니다. ① 현재 상태에서 포킹
우리는 LangGraph의 Graph.update_state
메서드를 사용하여 현재 상태를 포킹할 수 있습니다.
예를 들어, 다음과 같은 코드를 통해 상태를 업데이트할 수 있습니다:
Graph.update_state({thread_id}, {state: "I like cold brew"})
위 코드를 실행하면 thread_id의 현재 그래프 상태가 업데이트되면서 새로운 체크포인트가 생성됩니다. 이를 "현재 상태에서 포킹"한다고 표현할 수 있습니다.
상태가 기록되고 새로운 체크포인트가 생성되면, 포크된 새로운 상태 스냅샷(StateSnapshot)이 만들어집니다.
위 그림의 첫번째 예시에서는 "I heart Langchain
" ⟶ "I like cold brew
"라는 상태로 새로운 체크포인트가 만들어집니다.
② 과거 체크포인트에서 포킹
포킹의 또 다른 방식은 과거의 특정 체크포인트에서 새로운 상태를 포킹하는 것입니다.
이를 위해, 우리는 특정 checkpoint_id
를 제공하여 그 시점의 상태를 기반으로 포크할 수 있습니다.
예를 들어, 다음과 같은 코드로 특정 체크포인트에서 상태를 포킹할 수 있습니다:
Graph.update_state({checkpoint_id}, thread_id, {state: "I like"})
이 경우, 이전에 생성된 특정 체크포인트에서 상태가 업데이트되고 새로운 체크포인트가 생성됩니다.
과거의 상태를 기반으로 새로운 상태를 만들어 포크하는 과정이라고 할 수 있습니다.
이를 통해 특정 시점으로 돌아가 새로운 실행 흐름을 만들 수 있습니다.
💬 Re-play vs Re-Execute
- 재생(Replaying): 이미 실행된 체크포인트를 다시 실행할 때, 그래프는 그 상태가 이미 실행되었음을 인지하고 재생합니다. 즉, 같은 상태를 그대로 유지한 채 실행이 반복됩니다. 예를 들어, 이전에 실행된 상태 스냅샷을 전달하면, 그 상태에 따라 그래프가 이미 실행된 것처럼 동작하게 됩니다.
- 재실행(Re-executing): 반면, 새로운 체크포인트를 통해 그래프를 실행하면, 그래프는 이전에 이 상태가 실행된 적이 없다는 것을 인식하고 새롭게 실행을 시작합니다. 이를 재실행이라고 부릅니다. 포크된 체크포인트가 처음 실행되는 경우, 새로운 상태를 바탕으로 그래프가 다시 실행됩니다.
상태 덧붙임(Appending)과 덮어쓰기(Overwriting)
상태 업데이트 시 중요한 부분은 덧붙임(Appending)과 덮어쓰기(Overwriting)입니다.
예를 들어, 메시지와 같은 상태는 기본적으로 덧붙여지지만, 특정 id
를 전달하면 해당 메시지가 덮어쓰기 됩니다.
이는 add messages reducer
와 같은 방식에서 발생합니다.
📋 주의
messages
에 대한 리듀서가 어떻게 작동하는지 기억하세요!
- 메시지 ID를 제공하지 않으면 뒤에 추가(append)합니다.
- 상태에 추가하는 대신 메시지를 덮어쓰기(overwrite) 위해 메시지 ID를 제공해야합니다!
Graph.update_state({checkpoint_id}, thread_id, {state: "I like", id: "message_id"})
위 코드를 수행하면 새로운 메시지가 추가되지 않고 기존 메시지가 덮어씌워지며 업데이트됩니다.
메시지 상태를 어떻게 관리할지에 따라 덧붙일지 또는 덮어쓸지를 결정할 수 있습니다.
상태 히스토리 관리
각 상태 업데이트는 그래프의 히스토리에서 새로운 체크포인트를 생성합니다. 예를 들어, "I like cold brew"라는 상태를 업데이트하고 새로운 체크포인트가 생성되었다면, 이 체크포인트는 히스토리에 기록됩니다.
여러 번 상태를 업데이트하면, 각 상태는 고유한 체크포인트로 관리되고, 필요한 경우 과거의 특정 체크포인트로 돌아가 새로운 포크를 만들 수 있습니다.
히스토리에서 새로운 체크포인트를 추가하는 방식으로 상태를 관리할 수 있으며, 그래프의 각 스레드는 여러 체크포인트로 구성됩니다. 이렇게 하면 매우 유연하게 상태를 관리할 수 있습니다.
포크된 체크포인트로 재실행
포크된 체크포인트를 통해 그래프를 다시 실행할 때, LangGraph는 해당 체크포인트가 이전에 실행된 적이 없는지 확인하고 재실행합니다.
예를 들어, 새로운 checkpoint_id
로 재실행을 하면, 과거 상태에서 새로운 실행 흐름이 시작됩니다.
다음과 같이 새로운 포크된 체크포인트를 제공하여 재실행할 수 있습니다:
Graph.stream(None, {checkpoint_id}, thread_id)
이 코드를 통해 포크된 체크포인트로 그래프가 재실행되고, 그에 따라 새로운 상태가 생성됩니다.
이 강의 내용은 Langraph의 다양한 기능과 Human-in-the-Loop의 활용 방안을 중심으로 구성되어 있으며, 각각의 기능은 실제 워크플로우에서 매우 유용하게 사용됩니다. 각 레슨의 세부 내용을 철저히 이해하면 Langraph를 이용한 고급 작업 흐름을 쉽게 구현할 수 있을 것입니다.