Flow Engineering

김지우·2025년 1월 21일

  1. 작성 개요
  2. State 정의
  3. Node 정의
  4. 그래프 정의: Conventional RAG
  5. 그래프 정의: 재검색
  6. 그래프 정의: 멀티 LLM
  7. 그래프 정의: 쿼리 재작성

1. 작성 개요

다음 글은 LangGraph를 통해 만든 그래프의 flow engineering을 위한 게시물이다. LangGraph를 다룰 때는 세부적인 기능 정리도 중요하지만, 그래프가 어떻게 동작하는지 그 흐름을 이해하고, 구조적인 변경을 쉽게 만들어 내는 것이 중요하다.

따라서 그래프를 만들고, 경로 변경이 필요한 경우 어떤 식으로 바꿀 수 있는지에 대한 내용을 해당 게시물에서 정리 하려고 한다. 다음 그래프에서 사용되는 코드는 수도 코드로 동작을 목표로 하는 것이 아님을 명시한다.

또 해당 글은 테디노트님의 유료강의를 듣고 복습하기 위해 작성된 글로, 글의 내용이 부족하다고 여기시는 분은 테디노트님 유료 강의를 구매하여 들으시는 것을 권하여 드립니다.

2. State 정의

우선 그래프를 만드는 과정을 중요한 워딩만 사용하여 3단계로 나눈다면, State 정의 , Node 정의 , Graph 정의이다. 우선 State 정의를 하면 다음과 같다.

from typing import TypedDict, Annotated, List
from langchain_core.documents import Document
import operator


# State 정의
class GraphState(TypedDict):
    context: Annotated[List[Document], operator.add]
    answer: Annotated[List[Document], operator.add]
    question: Annotated[str, "user question"]
    sql_query: Annotated[str, "sql query"]
    binary_score: Annotated[str, "binary score yes or no"]

다음과 같이 GraphState를 정의해야 하는데, TypedDict를 상속 받아 만들어진다. 이를 통해 State의 구조(Dict/TypedDict)와 데이터의 키밸류 값들을 확인할 수 있다.

참고하면 좋은 부분은 Annotated와 Reducer다. Annotated는 밸류값의 타입과 주석을 확인할 수 있다.

Reducer는 Dict과 같은 형태를 갖는 GraphState가 새로운 값을 추가할 때 마다 값을 대체하는 기본 속성을 가지고 있는데, 이를 방지하기 위함이고, 보통 List에 적용된다. 여기서는 operator.add로 나타나 있고, 새로운 값이 나왔을 때 대체하는 것이 아니라 리스트에 값을 추가하는 방식으로 동작함을 나타내기 위함이다.

Annotated에 str이면 Reducer를 쓸 수 없다.(당연하겠지만)

3. Node 정의

def retrieve(state: GraphState) -> GraphState:
    # retrieve: 검색
    documents = "검색된 문서"
    return {"context": documents}


def rewrite_query(state: GraphState) -> GraphState:
    # Query Transform: 쿼리 재작성
    documents = "검색된 문서"
    return GraphState(context=documents)


def llm_gpt_execute(state: GraphState) -> GraphState:
    # LLM 실행
    answer = "GPT 생성된 답변"
    return GraphState(answer=answer)


def llm_claude_execute(state: GraphState) -> GraphState:
    # LLM 실행
    answer = "Claude 의 생성된 답변"
    return GraphState(answer=answer)


def relevance_check(state: GraphState) -> GraphState:
    # Relevance Check: 관련성 확인
    binary_score = "Relevance Score"
    return GraphState(binary_score=binary_score)


def sum_up(state: GraphState) -> GraphState:
    # sum_up: 결과 종합
    answer = "종합된 답변"
    return GraphState(answer=answer)


def search_on_web(state: GraphState) -> GraphState:
    # Search on Web: 웹 검색
    documents = state["context"] = "기존 문서"
    searched_documents = "검색된 문서"
    documents += searched_documents
    return GraphState(context=documents)


def get_table_info(state: GraphState) -> GraphState:
    # Get Table Info: 테이블 정보 가져오기
    table_info = "테이블 정보"
    return GraphState(context=table_info)


def generate_sql_query(state: GraphState) -> GraphState:
    # Make SQL Query: SQL 쿼리 생성
    sql_query = "SQL 쿼리"
    return GraphState(sql_query=sql_query)


def execute_sql_query(state: GraphState) -> GraphState:
    # Execute SQL Query: SQL 쿼리 실행
    sql_result = "SQL 결과"
    return GraphState(context=sql_result)


def validate_sql_query(state: GraphState) -> GraphState:
    # Validate SQL Query: SQL 쿼리 검증
    binary_score = "SQL 쿼리 검증 결과"
    return GraphState(binary_score=binary_score)


def handle_error(state: GraphState) -> GraphState:
    # Error Handling: 에러 처리
    error = "에러 발생"
    return GraphState(context=error)


def decision(state: GraphState) -> GraphState:
    # 의사결정
    decision = "결정"
    # 로직을 추가할 수 가 있고요.

    if state["binary_score"] == "yes":
        return "종료"
    else:
        return "재검색"

노드 들은 기본적으로 파이썬 함수로 정리되고, 이해하고 있어야 하는 부분은 인풋도 아웃풋도 모두 GraphState가 들어가야 한다.

4. 그래프 정의:Conventional RAG

우선 가장 기본적인 형태의 RAG를 실행하는 그래프를 다음 코드에서 정의한다.

from langgraph.graph import END, StateGraph
from langgraph.checkpoint.memory import MemorySaver

# (1): Conventional RAG
# langgraph.graph에서 StateGraph와 END를 가져옵니다.
workflow = StateGraph(GraphState)

# 노드를 추가합니다.
workflow.add_node("retrieve", retrieve)

workflow.add_node("GPT 요청", llm_gpt_execute)

workflow.add_node("GPT_relevance_check", relevance_check)

workflow.add_node("결과 종합", sum_up)

# 각 노드들을 연결합니다.
workflow.add_edge("retrieve", "GPT 요청")

workflow.add_edge("GPT 요청", "GPT_relevance_check")
workflow.add_edge("GPT_relevance_check", "결과 종합")

workflow.add_edge("결과 종합", END)  # (2) - off



# 시작점을 설정합니다.
workflow.set_entry_point("retrieve")

# 기록을 위한 메모리 저장소를 설정합니다.
memory = MemorySaver()

# 그래프를 컴파일합니다.
app = workflow.compile(checkpointer=memory)

기본적으로 그래프 생성은
1) StateGraph 생성, 2) 노드 추가 3) 노드 연결 4) 시작, 종료점 설정, 5) 그래프 컴파일로 구성된다.

상기 구조로 그래프를 정의하면 다음 순서대로 노드들이 동작한다.
retrieve -> GPT 요청 -> GPT_relevance_check -> 결과 종합
다음 구조는 가장 일반적으로 Chain을 통해 RAG를 하는 것과 동일한 방식으로 동작한다.

5. 그래프 정의: 재검색

그러나 GPT_relevance_check 를 통해 적절히 검색되지 않았다면 다시 검색해 결과물이 더 적절히 나올 수 있도록 해야 한다.

from langgraph.graph import END, StateGraph
from langgraph.checkpoint.memory import MemorySaver


# (1): Conventional RAG
# (2): 재검색
# (3): 멀티 LLM
# (4): 쿼리 재작성


# langgraph.graph에서 StateGraph와 END를 가져옵니다.
workflow = StateGraph(GraphState)

# 노드를 추가합니다.
workflow.add_node("retrieve", retrieve)

workflow.add_node("GPT 요청", llm_gpt_execute)
workflow.add_node("GPT_relevance_check", relevance_check)
workflow.add_node("결과 종합", sum_up)

# 각 노드들을 연결합니다.
workflow.add_edge("retrieve", "GPT 요청")
workflow.add_edge("GPT 요청", "GPT_relevance_check")
workflow.add_edge("GPT_relevance_check", "결과 종합")

# 조건부 엣지를 추가합니다. (2), (4)
workflow.add_conditional_edges(
    "결과 종합",  # 관련성 체크 노드에서 나온 결과를 is_relevant 함수에 전달합니다.
    decision,
    {
        "재검색": "retrieve",  # 관련성이 있으면 종료합니다.
        "종료": END,  # 관련성 체크 결과가 모호하다면 다시 답변을 생성합니다.
    },
)


# 시작점을 설정합니다.
workflow.set_entry_point("retrieve")

# 기록을 위한 메모리 저장소를 설정합니다.
memory = MemorySaver()

# 그래프를 컴파일합니다.
app = workflow.compile(checkpointer=memory)

앞 부분 과 다른 부분은 조건부 엣지가 있다는 것이다. 관련성 체크 노드에서 나온 결과가 "재검색"인지, "종료"인지에 따라 조건부 엣지는 다른 노드로 이어진다. "재검색"인 경우 retrive 노드로 이동하여 관련성있는 문서를 다시 검색하고, "종료"라면 그래의 동작이 끝난다.

6. 그래프 정의: 멀티 LLM

그렇다면 다양한 종류의 작업이 동시에 이루어 져야할 때 병렬처리도 시도할 수 있다. 개인적으로 Chain을 통한 작업은 여러가지 데이터를 다룰 때 불편하기 때문에 LangGraph를 이용하는 것이 훨씬 편하고 적절히 작업할 수 있다는 판단이 들었다.

from langgraph.graph import END, StateGraph
from langgraph.checkpoint.memory import MemorySaver
from langchain_teddynote.graphs import visualize_graph

# (1): Conventional RAG
# (2): 재검색
# (3): 멀티 LLM
# (4): 쿼리 재작성


# langgraph.graph에서 StateGraph와 END를 가져옵니다.
workflow = StateGraph(GraphState)

# 노드를 추가합니다.
workflow.add_node("retrieve", retrieve)


workflow.add_node("GPT 요청", llm_gpt_execute)
workflow.add_node("Claude 요청", llm_claude_execute)  # (3)
workflow.add_node("GPT_relevance_check", relevance_check)
workflow.add_node("Claude_relevance_check", relevance_check)  # (3)
workflow.add_node("결과 종합", sum_up)

# 각 노드들을 연결합니다.
workflow.add_edge("retrieve", "GPT 요청")
workflow.add_edge("retrieve", "Claude 요청")  # (3)
workflow.add_edge("GPT 요청", "GPT_relevance_check")
workflow.add_edge("GPT_relevance_check", "결과 종합")
workflow.add_edge("Claude 요청", "Claude_relevance_check")  # (3)
workflow.add_edge("Claude_relevance_check", "결과 종합")  # (3)

workflow.add_edge("결과 종합", END)  # (2) - off

# 시작점을 설정합니다.
workflow.set_entry_point("retrieve")

# 기록을 위한 메모리 저장소를 설정합니다.
memory = MemorySaver()

# 그래프를 컴파일합니다.
app = workflow.compile(checkpointer=memory)

# 그래프 시각화
visualize_graph(app)

이 그래프는 병렬처리를 지원한다. 6개의 노드가 있는데 시작하는 노드인 retrieve 에서 GPT요청과 Claude 요청으로 나뉜다. 각자의 relevance_check를 지원하고 이후 결과를 종합한 후 끝낸다.

add_edge에서 같은 노드에서 서로 다른 노드로 갈 수 있도록 두개의 엣지를 만들면 기본적으로 병렬 처리를 수행한다. LangChain에 비해 병렬처리 방식이 훨씬 간단하다는 느낌이든다.

7. 그래프 정의: 쿼리 재작성

from langgraph.graph import END, StateGraph
from langgraph.checkpoint.memory import MemorySaver

# (1): Conventional RAG
# (2): 재검색
# (3): 멀티 LLM
# (4): 쿼리 재작성


# langgraph.graph에서 StateGraph와 END를 가져옵니다.
workflow = StateGraph(GraphState)

# 노드를 추가합니다.
workflow.add_node("retrieve", retrieve)

workflow.add_node("rewrite_query", rewrite_query)  # (4)

workflow.add_node("GPT 요청", llm_gpt_execute)
workflow.add_node("GPT_relevance_check", relevance_check)
workflow.add_node("결과 종합", sum_up)

# 각 노드들을 연결합니다.
workflow.add_edge("retrieve", "GPT 요청")
workflow.add_edge("rewrite_query", "retrieve")  # (4)
workflow.add_edge("GPT 요청", "GPT_relevance_check")
workflow.add_edge("GPT_relevance_check", "결과 종합")

# 조건부 엣지를 추가합니다. (4)
workflow.add_conditional_edges(
    "결과 종합",  # 관련성 체크 노드에서 나온 결과를 is_relevant 함수에 전달합니다.
    decision,
    {
        "재검색": "rewrite_query",  # 관련성이 있으면 종료합니다.
        "종료": END,  # 관련성 체크 결과가 모호하다면 다시 답변을 생성합니다.
    },
)

# 시작점을 설정합니다.
workflow.set_entry_point("retrieve")

# 기록을 위한 메모리 저장소를 설정합니다.
memory = MemorySaver()

# 그래프를 컴파일합니다.
app = workflow.compile(checkpointer=memory)

다음 코드의 경우 재검색 노드를 추가한 것에 하나가 더 추가된 것이다. 재검색을 한다고 하더라도, 쿼리가 바뀌지 않으면 검색이 더 좋아지지 않을 수 있다. 따라서 재검색을 하는 노드로 조건부 엣지를 통해 보내고, 쿼리를 더 향상 시킨 뒤 재검색을 진행해야 한다.

profile
프로그래밍 기록 + 공부 기록

0개의 댓글