[SpoonOS] GraphAgent, 도구, MCP 프로토콜 및 메모리 관리

네오 블록체인·2026년 2월 6일

SpoonOS

목록 보기
20/28

통합 및 확장

그래프 시스템(Graph System)은 광범위한 SpoonOS 에코시스템과 원활하게 통합됩니다. 이 가이드는 그래프를 에이전트, 도구, MCP 서버 및 메모리 시스템과 연결하는 방법을 다룹니다.

GraphAgent 통합

GraphAgent는 그래프 실행을 래핑하여 SpoonOS 에이전트 라이프사이클, 영구 메모리, 세션 관리 기능을 함께 제공합니다.

기본적인 GraphAgent 사용법

import asyncio
from typing import TypedDict

from spoon_ai.graph import END, StateGraph, GraphAgent


class AnalysisState(TypedDict, total=False):
    """State schema for the analysis graph; all keys are optional (total=False)."""
    input: str  # text read by the analyze node
    output: str  # text written by the analyze node


async def analyze(state: AnalysisState) -> dict:
    """Return an ``output`` entry built from ``state["input"]`` (empty if absent)."""
    summary = f"분석 완료: {state.get('input', '')}"
    return {"output": summary}


def build_analysis_graph() -> StateGraph:
    """Assemble the one-node analysis graph: entry point -> ``analyze`` -> END.

    Returns:
        The ``StateGraph`` itself; it is not compiled here.
    """
    node_name = "analyze"
    workflow = StateGraph(AnalysisState)
    workflow.add_node(node_name, analyze)
    workflow.set_entry_point(node_name)
    workflow.add_edge(node_name, END)
    # Important: hand back the uncompiled StateGraph.
    return workflow


async def main():
    """Wrap the analysis graph in a GraphAgent, run one request and print the result."""
    analyzer = GraphAgent(
        name="crypto_analyzer",
        graph=build_analysis_graph(),
        memory_path="./agent_memory",
        session_id="user_123_session",
        preserve_state=True,  # keep state between runs
    )
    print(await analyzer.run("BTC 가격 동향 분석"))


# Start the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

에이전트 설정

from typing import TypedDict

from spoon_ai.graph import END, StateGraph, GraphAgent


class AnalysisState(TypedDict, total=False):
    """State schema for the analysis graph; all keys are optional (total=False)."""
    input: str  # text read by the analyze node
    output: str  # text written by the analyze node


async def analyze(state: AnalysisState) -> dict:
    """Return an ``output`` entry built from ``state["input"]`` (empty if absent)."""
    summary = f"분석 완료: {state.get('input', '')}"
    return {"output": summary}


def build_analysis_graph() -> StateGraph:
    """Assemble the one-node analysis graph: entry point -> ``analyze`` -> END.

    Returns:
        The ``StateGraph`` itself; it is not compiled here.
    """
    node_name = "analyze"
    workflow = StateGraph(AnalysisState)
    workflow.add_node(node_name, analyze)
    workflow.set_entry_point(node_name)
    workflow.add_edge(node_name, END)
    # Important: uncompiled StateGraph.
    return workflow


# Configure the agent and print the session id exposed by its memory object.
agent = GraphAgent(
    name="trading_assistant",
    graph=build_analysis_graph(),  # StateGraph (not compiled)
    preserve_state=True,  # keep state between runs
    memory_path="./memory",  # directory used for memory storage
    session_id="session_abc123",  # unique session identifier
    max_metadata_size=1024,  # optional: limit on the size of stored metadata
)

print(f"설정된 에이전트 세션: {agent.memory.session_id}")

실행 메타데이터

import asyncio
from typing import TypedDict

from spoon_ai.graph import END, StateGraph, GraphAgent


async def main() -> None:
    """Run a one-node graph through a GraphAgent, then print its execution metadata."""

    class AnalysisState(TypedDict, total=False):
        input: str
        output: str

    async def analyze(state: AnalysisState) -> dict:
        summary = f"분석 완료: {state.get('input', '')}"
        return {"output": summary}

    node_name = "analyze"
    workflow = StateGraph(AnalysisState)
    workflow.add_node(node_name, analyze)
    workflow.set_entry_point(node_name)
    workflow.add_edge(node_name, END)

    assistant = GraphAgent(
        name="trading_assistant",
        graph=workflow,  # uncompiled StateGraph
        memory_path="./memory",
        session_id="session_abc123",
        preserve_state=True,
    )

    # Run the agent once and show what it returned.
    print(await assistant.run("BTC 전망은 어때?"))

    # Read back the metadata recorded for that run.
    metadata = assistant.get_execution_metadata()
    report_lines = (
        f"성공 여부: {metadata.get('execution_successful')}",
        f"마지막 요청: {metadata.get('last_request')}",
        f"타임스탬프: {metadata.get('execution_time')}",
    )
    for line in report_lines:
        print(line)


# Start the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

세션 관리

from typing import TypedDict

from spoon_ai.graph import END, StateGraph, GraphAgent


class AnalysisState(TypedDict, total=False):
    """State schema for the analysis graph; all keys are optional (total=False)."""
    input: str  # text read by the analyze node
    output: str  # text written by the analyze node


async def analyze(state: AnalysisState) -> dict:
    """Return an ``output`` entry built from ``state["input"]`` (empty if absent)."""
    summary = f"분석 완료: {state.get('input', '')}"
    return {"output": summary}


def build_analysis_graph() -> StateGraph:
    """Assemble the one-node analysis graph: entry point -> ``analyze`` -> END.

    Returns:
        The ``StateGraph`` itself; it is not compiled here.
    """
    node_name = "analyze"
    workflow = StateGraph(AnalysisState)
    workflow.add_node(node_name, analyze)
    workflow.set_entry_point(node_name)
    workflow.add_edge(node_name, END)
    return workflow


# Create an agent that starts on the "user_123_session" session.
agent = GraphAgent(
    name="session_manager",
    graph=build_analysis_graph(),
    memory_path="./memory",
    session_id="user_123_session",
)

# Current session
print(f"현재 세션: {agent.memory.session_id}")

# Switch to another session and print the id again
agent.load_session("user_456_session")
print(f"현재 세션: {agent.memory.session_id}")

도구 통합

그래프 노드 안에서 SpoonOS 도구를 호출하면 외부 기능을 사용할 수 있습니다.

기본 제공 도구 사용

from typing import Any, TypedDict

from spoon_toolkits.crypto.crypto_powerdata.tools import CryptoPowerDataCEXTool


class MarketState(TypedDict, total=False):
    """State schema for the market-data node; all keys are optional (total=False)."""
    symbol: str  # base symbol; the node falls back to "BTC" when missing
    market_data: Any  # tool output, or {} when the tool reported an error
    data_source: str  # set to "binance" by the node
    tool_error: str  # error reported by the tool, only set on failure


async def fetch_market_data(state: MarketState) -> dict:
    """Node that queries the CryptoPowerData CEX tool.

    Reads ``symbol`` from the state (default ``"BTC"``) and requests the
    ``<symbol>/USDT`` pair on "binance" with timeframe "1h" and limit 24.

    Returns:
        ``market_data`` and ``data_source``; when the result carries a truthy
        ``error`` attribute, ``market_data`` is ``{}`` and ``tool_error`` is added.
    """
    symbol = state.get("symbol", "BTC")
    tool = CryptoPowerDataCEXTool()
    request = {
        "exchange": "binance",
        "symbol": f"{symbol}/USDT",
        "timeframe": "1h",
        "limit": 24,
    }
    result = await tool.execute(**request)

    if not getattr(result, "error", None):
        return {"market_data": result.output, "data_source": "binance"}

    return {"market_data": {}, "data_source": "binance", "tool_error": result.error}

도구 결과 처리

import asyncio
import os
from typing import Any, TypedDict


try:
    from spoon_toolkits.crypto.crypto_data_tools.price_data import GetTokenPriceTool
except Exception:  # pragma: no cover - 선택적 종속성
    GetTokenPriceTool = None


class ProcessState(TypedDict, total=False):
    """State schema for the tool-processing node; all keys are optional (total=False)."""
    symbol: str  # symbol passed to the tool; the node defaults to "ETH-USDC"
    tool_status: str  # "skipped", "error" or "success", as set by the node
    tool_error: str  # error text, only set when tool_status is "error"
    tool_output: Any  # tool output, or the skip reason when skipped


async def process_with_tool(state: ProcessState) -> dict:
    """Use the token price tool robustly, reporting failures instead of raising.

    The call is skipped when the ``DOC_SNIPPET_MODE`` environment variable is
    "1" or when ``GetTokenPriceTool`` is ``None``.

    Returns:
        A dict whose ``tool_status`` is "skipped", "error" or "success", with
        ``tool_output`` or ``tool_error`` filled in accordingly.
    """
    snippet_mode = os.getenv("DOC_SNIPPET_MODE") == "1"
    if snippet_mode or GetTokenPriceTool is None:
        return {"tool_status": "skipped", "tool_output": {"reason": "DOC_SNIPPET_MODE 또는 툴킷 누락"}}

    tool = GetTokenPriceTool()
    try:
        requested_symbol = state.get("symbol", "ETH-USDC")
        result = await tool.execute(symbol=requested_symbol)
        reported_error = getattr(result, "error", None)
        if reported_error:
            return {"tool_status": "error", "tool_error": result.error}
        return {"tool_status": "success", "tool_output": result.output}
    except Exception as exc:
        # Any failure while calling or reading the result becomes an error status.
        return {"tool_status": "error", "tool_error": str(exc)}


async def main() -> None:
    """Call the tool node once with an ETH-USDC symbol and print what it returns."""
    outcome = await process_with_tool({"symbol": "ETH-USDC"})
    print(outcome)


# Start the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

한 노드에서 여러 도구 사용

import asyncio
import os
from typing import Any, Dict, TypedDict


try:
    from spoon_toolkits.crypto.crypto_powerdata.tools import CryptoPowerDataCEXTool
except Exception:  # pragma: no cover - 선택적 종속성
    CryptoPowerDataCEXTool = None

try:
    from spoon_toolkits.crypto.crypto_data_tools.price_data import GetTokenPriceTool
except Exception:  # pragma: no cover - 선택적 종속성
    GetTokenPriceTool = None


class AnalysisState(TypedDict, total=False):
    """State schema for the multi-tool node; all keys are optional (total=False)."""
    symbol: str  # base symbol; the node falls back to "BTC" when missing
    price_data: Any  # output (or error dict) from the CEX price tool
    dex_price: Any  # output (or error dict) from the token price tool


async def comprehensive_analysis(state: AnalysisState) -> dict:
    """Node that coordinates two tools and merges their results.

    Stub data is returned when ``DOC_SNIPPET_MODE`` is "1" or either toolkit
    class is ``None``. Otherwise each tool is called in turn; a truthy
    ``error`` attribute on a result is reported as ``{"error": ...}`` in place
    of that tool's output.

    Returns:
        A dict with ``price_data`` and ``dex_price`` entries.
    """
    symbol = state.get("symbol", "BTC")

    if (
        os.getenv("DOC_SNIPPET_MODE") == "1"
        or CryptoPowerDataCEXTool is None
        or GetTokenPriceTool is None
    ):
        return {
            key: {"source": "stub", "symbol": symbol}
            for key in ("price_data", "dex_price")
        }

    # Tool 1: price data
    price_tool = CryptoPowerDataCEXTool()
    price_data = await price_tool.execute(
        exchange="binance",
        symbol=f"{symbol}/USDT",
        timeframe="1h",
        limit=24,
    )
    if getattr(price_data, "error", None):
        price_entry = {"error": price_data.error}
    else:
        price_entry = price_data.output

    # Tool 2: DEX spot price snapshot
    dex_tool = GetTokenPriceTool()
    dex_price = await dex_tool.execute(symbol=f"{symbol}-USDC", exchange="uniswap")
    if getattr(dex_price, "error", None):
        dex_entry = {"error": dex_price.error}
    else:
        dex_entry = dex_price.output

    return {"price_data": price_entry, "dex_price": dex_entry}


async def main() -> None:
    """Run the multi-tool node for BTC and print the merged result."""
    print(await comprehensive_analysis({"symbol": "BTC"}))


# Start the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

MCP 프로토콜 통합

동적 도구 검색을 위해 MCP(Model Context Protocol) 서버에 연결하십시오.

MCP 개요

graph LR
    A[그래프 노드] --> B[MCP 클라이언트]
    B --> C[MCP 서버]
    C --> D[외부 도구]
    C --> E[데이터 소스]
    C --> F[API]

그래프에서 MCP 도구 사용

import os
from typing import Any, TypedDict

from spoon_ai.tools.mcp_tool import MCPTool


class SearchState(TypedDict, total=False):
    """State schema for the MCP search node; all keys are optional (total=False)."""
    query: str  # search text; the node uses "" when missing
    search_results: Any  # tool result, or a skip message when no API key is set


# MCP tool configured with the command "npx --yes tavily-mcp". The API key is
# read from the TAVILY_API_KEY environment variable ("" when it is unset).
tavily_tool = MCPTool(
    name="tavily-search",
    description="Tavily를 통한 웹 검색",
    mcp_config={
        "command": "npx",
        "args": ["--yes", "tavily-mcp"],
        "env": {"TAVILY_API_KEY": os.getenv("TAVILY_API_KEY", "")},
    },
)


async def mcp_search_node(state: SearchState) -> dict:
    """Node that calls the MCP search tool.

    Returns a skip message under ``search_results`` when ``TAVILY_API_KEY`` is
    unset or empty; otherwise returns the tool result for ``state["query"]``
    requested with ``max_results=5``.
    """
    if os.getenv("TAVILY_API_KEY"):
        query_text = state.get("query", "")
        result = await tavily_tool.execute(query=query_text, max_results=5)
        return {"search_results": result}

    return {"search_results": "건너뜀: TAVILY_API_KEY가 설정되지 않음"}

async def main():
    """Exercise the MCP tool node with a fixed query and print the outcome."""
    # Single source of truth for the query, so the printed label always
    # matches what was actually sent to the node.
    query = "Bitcoin price"
    result = await mcp_search_node({"query": query})
    print(f"\n쿼리: {query}")
    print(f"결과: {result.get('search_results', '결과 없음')}")


# Start the example only when this file is executed as a script.
if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

상위 수준의 MCP 통합

import os
import asyncio
from typing import TypedDict

from spoon_ai.graph.builder import HighLevelGraphAPI
from spoon_ai.graph.mcp_integration import MCPToolSpec


class MyState(TypedDict, total=False):
    """State schema handed to HighLevelGraphAPI; the key is optional (total=False)."""
    user_query: str


async def main():
    """Register the Tavily MCP tool through HighLevelGraphAPI and run one search.

    Returns early, after printing a notice, when TAVILY_API_KEY is missing.
    """
    api = HighLevelGraphAPI(MyState)

    # Check that TAVILY_API_KEY is set; a value containing "..." is also
    # rejected (presumably an unedited placeholder — confirm).
    tavily_key = os.getenv("TAVILY_API_KEY", "").strip()
    if not tavily_key or "..." in tavily_key:
        print("참고: TAVILY_API_KEY가 설정되지 않았습니다. MCP 도구 등록을 건너뜁니다.")
        print("이 예제를 사용하려면 TAVILY_API_KEY 환경 변수를 설정하십시오.")
        return

    # Register the MCP tool using HighLevelGraphAPI
    api.register_mcp_tool(
        intent_category="research",
        spec=MCPToolSpec(name="tavily-search"),
        config={
            "command": "npx",
            "args": ["--yes", "tavily-mcp"],
            "env": {"TAVILY_API_KEY": tavily_key},
        },
    )

    # Create the tool instance
    tool = api.create_mcp_tool("tavily-search")

    if tool:
        result = await tool.execute(query="Bitcoin price", max_results=3)
        print(f"검색 결과: {len(str(result))} 자 반환됨")
    else:
        print("MCP 도구 생성 실패. 설정을 확인하십시오.")


# Start the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

더 많은 MCP 정보

다음 주제에 대한 자세한 내용은 전용 MCP 프로토콜 가이드를 참조하십시오.

  • 서버 유형 (stdio, HTTP, WebSocket)
  • 인증 및 보안
  • 사용자 정의 MCP 서버 개발
  • 고급 도구 검색 패턴

메모리 관리

세션 간에 상태와 대화 기록을 유지합니다.

GraphAgent 메모리 작업

from typing import TypedDict

from spoon_ai.graph import END, StateGraph, GraphAgent


class AnalysisState(TypedDict, total=False):
    """State schema for the analysis graph; all keys are optional (total=False)."""
    input: str  # text read by the analyze node
    output: str  # text written by the analyze node


async def analyze(state: AnalysisState) -> dict:
    """Return an ``output`` entry built from ``state["input"]`` (empty if absent)."""
    summary = f"분석 완료: {state.get('input', '')}"
    return {"output": summary}


def build_analysis_graph() -> StateGraph:
    """Assemble the one-node analysis graph: entry point -> ``analyze`` -> END.

    Returns:
        The ``StateGraph`` itself; it is not compiled here.
    """
    node_name = "analyze"
    workflow = StateGraph(AnalysisState)
    workflow.add_node(node_name, analyze)
    workflow.set_entry_point(node_name)
    workflow.add_edge(node_name, END)
    return workflow


# Create an agent with memory
agent = GraphAgent(
    name="assistant",
    graph=build_analysis_graph(),
    memory_path="./memory",
    session_id="user_123",
)

# Set metadata (a plain string and a nested dict)
agent.set_memory_metadata("last_analysis_time", "2024-01-15T10:30:00Z")
agent.set_memory_metadata("user_preferences", {
    "risk_tolerance": "medium",
    "favorite_tokens": ["BTC", "ETH", "SOL"]
})

# Read metadata back
last_time = agent.get_memory_metadata("last_analysis_time")
prefs = agent.get_memory_metadata("user_preferences")

# Fetch memory statistics and print the fields used below
stats = agent.get_memory_statistics()
print(f"총 메시지 수: {stats['total_messages']}")
print(f"세션 ID: {stats['session_id']}")
print(f"저장 경로: {stats['storage_path']}")
print(f"파일 크기: {stats['file_size']} 바이트")

메모리 검색

from typing import TypedDict

from spoon_ai.graph import END, StateGraph, GraphAgent


class AnalysisState(TypedDict, total=False):
    """State schema for the analysis graph; all keys are optional (total=False)."""
    input: str  # text read by the analyze node
    output: str  # text written by the analyze node


async def analyze(state: AnalysisState) -> dict:
    """Return an ``output`` entry built from ``state["input"]`` (empty if absent)."""
    summary = f"분석 완료: {state.get('input', '')}"
    return {"output": summary}


def build_analysis_graph() -> StateGraph:
    """Assemble the one-node analysis graph: entry point -> ``analyze`` -> END.

    Returns:
        The ``StateGraph`` itself; it is not compiled here.
    """
    node_name = "analyze"
    workflow = StateGraph(AnalysisState)
    workflow.add_node(node_name, analyze)
    workflow.set_entry_point(node_name)
    workflow.add_edge(node_name, END)
    return workflow


# Create the agent whose memory will be searched.
agent = GraphAgent(
    name="assistant",
    graph=build_analysis_graph(),
    memory_path="./memory",
    session_id="user_123",
)

# Add a sample message, then search.
agent.memory.add_message({"role": "assistant", "content": "여기 짧은 비트코인 분석이 있습니다..."})

# Print at most the first 100 characters of each match's content.
matches = agent.search_memory(query="bitcoin analysis", limit=5)
for match in matches:
    print(f"내용: {str(match.get('content', ''))[:100]}...")
    print(f"타임스탬프: {match.get('timestamp')}")
    print("---")

상태 지속성

import asyncio
from typing import TypedDict

from spoon_ai.graph import END, StateGraph, GraphAgent


class AgentState(TypedDict, total=False):
    """State schema for the persistence example; all keys are optional (total=False)."""
    input: str  # request text inspected by the process node
    favorite_token: str  # token stored by the process node
    output: str  # reply text produced by the process node


async def process(state: AgentState) -> dict:
    """Store or recall a favorite token based on English phrases in the input.

    Matching is done on the lower-cased input:
      * "remember", "favorite token" and " is " all present: the text after
        the first " is " of the original input is stripped, upper-cased and
        returned as ``favorite_token`` together with a confirmation.
      * only "favorite token" present: reply with the stored token, or a
        "not set yet" placeholder.
      * otherwise: echo the input back.
    """
    text = (state.get("input") or "").lower()
    mentions_favorite = "favorite token" in text

    if mentions_favorite:
        wants_store = "remember" in text and " is " in text
        if wants_store:
            token = state["input"].split(" is ", 1)[-1].strip().upper()
            reply = f"알겠습니다. 당신의 최애 토큰이 {token}임을 기억하겠습니다."
            return {"favorite_token": token, "output": reply}

        token = state.get("favorite_token") or "아직 설정되지 않음"
        return {"output": f"당신의 최애 토큰은 {token}입니다."}

    return {"output": f"에코: {state.get('input', '')}"}


# One-node graph: entry point -> process -> END.
graph = StateGraph(AgentState)
graph.add_node("process", process)
graph.set_entry_point("process")
graph.add_edge("process", END)

# Enable state persistence at the agent layer.
agent = GraphAgent(
    name="persistent_agent",
    graph=graph,  # uncompiled StateGraph
    preserve_state=True,  # the key setting
    memory_path="./memory",
)


async def main() -> None:
    """Run the agent twice to show that state survives between runs.

    The prompts are written in English because the ``process`` node matches
    the English phrases "remember", "favorite token" and " is "; a prompt
    without them falls through to the echo branch and nothing is stored.
    """
    # First run: store the preference.
    print(await agent.run("Remember my favorite token is SOL"))

    # Later run (state is preserved): ask for it back.
    print(await agent.run("What is my favorite token?"))


# Start the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

체크포인팅이 포함된 메모리

from spoon_ai.graph import InMemoryCheckpointer
from typing import TypedDict

from spoon_ai.graph import END, StateGraph, GraphAgent

# Combine graph checkpointing with agent memory
checkpointer = InMemoryCheckpointer(max_checkpoints_per_thread=50)

class MyState(TypedDict, total=False):
    """State schema for the checkpointing example; all keys are optional (total=False)."""
    input: str  # text read by the process node
    output: str  # text written by the process node


async def process(state: MyState) -> dict:
    """Return an ``output`` entry built from ``state["input"]`` (empty if absent)."""
    message = f"처리됨: {state.get('input', '')}"
    return {"output": message}


# Build the graph with the checkpointer attached.
graph = StateGraph(MyState, checkpointer=checkpointer)
graph.add_node("process", process)
graph.set_entry_point("process")
graph.add_edge("process", END)

agent = GraphAgent(
    name="full_memory_agent",
    graph=graph,
    memory_path="./memory",
    preserve_state=True
)

# You now have:
# 1. Graph-level checkpoints (for recovery within a run)
# 2. Agent-level memory (for persistence across runs)
print("체크포인팅 활성화됨:", agent.graph.graph.checkpointer is checkpointer)

모니터링 및 디버깅

실행을 추적하고 문제를 진단합니다.

모니터링 활성화

from typing import TypedDict

from spoon_ai.graph import END, StateGraph


class MonitorState(TypedDict, total=False):
    """State schema for the monitoring example; all keys are optional (total=False)."""
    input: str  # text read by the process node
    output: str  # text written by the process node


async def process(state: MonitorState) -> dict:
    """Return an ``output`` entry built from ``state["input"]`` (empty if absent)."""
    message = f"처리됨: {state.get('input', '')}"
    return {"output": message}


# One-node graph: entry point -> process -> END.
graph = StateGraph(MonitorState)
graph.add_node("process", process)
graph.set_entry_point("process")
graph.add_edge("process", END)

# Turn on the listed monitoring metrics, then compile the graph.
graph.enable_monitoring(["execution_time", "success_rate", "routing_performance", "node_stats"])
app = graph.compile()

async def main():
    """Invoke the compiled graph once and print the resulting state."""
    # Run the graph
    initial_state = {"input": "test", "output": ""}
    print(await app.invoke(initial_state))

# Start the example only when this file is executed as a script.
if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

실행 지표

import asyncio
from typing import TypedDict

from spoon_ai.graph import END, StateGraph


async def main() -> None:
    """Run a monitored one-node graph and print its execution metrics."""

    class MonitorState(TypedDict, total=False):
        input: str
        output: str

    async def process(state: MonitorState) -> dict:
        message = f"처리됨: {state.get('input', '')}"
        return {"output": message}

    workflow = StateGraph(MonitorState)
    workflow.enable_monitoring(["execution_time", "success_rate", "routing_performance", "node_stats"])
    workflow.add_node("process", process)
    workflow.set_entry_point("process")
    workflow.add_edge("process", END)

    app = workflow.compile()

    # Run the graph
    print(await app.invoke({"input": "hello", "output": ""}))

    # Fetch the metrics
    metrics = app.get_execution_metrics()

    print("실행 요약:")
    print(f"  총 실행 수: {metrics['total_executions']}")
    print(f"  성공률: {metrics['success_rate']:.1%}")
    print(f"  평균 실행 시간: {metrics['avg_execution_time']:.3f}s")

    # Per-node statistics
    print("노드별 통계:")
    per_node = metrics.get("node_stats", {})
    for node in per_node:
        stats = per_node[node]
        print(f"  {node}:")
        print(f"    호출 수: {stats['count']}")
        print(f"    평균 시간: {stats['avg_time']:.3f}s")
        print(f"    오류율: {stats['error_rate']:.1%}")


# Start the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

실행 기록

import asyncio
from typing import TypedDict

from spoon_ai.graph import END, StateGraph


class MonitorState(TypedDict, total=False):
    """State schema for the execution-history example; all keys are optional (total=False)."""
    input: str  # text read by the process node
    output: str  # text written by the process node


async def process(state: MonitorState) -> dict:
    """Return an ``output`` entry built from ``state["input"]`` (empty if absent)."""
    message = f"처리됨: {state.get('input', '')}"
    return {"output": message}


# One-node graph (entry point -> process -> END), compiled into `app`.
graph = StateGraph(MonitorState)
graph.add_node("process", process)
graph.set_entry_point("process")
graph.add_edge("process", END)
app = graph.compile()


async def main() -> None:
    """Invoke the graph once, then print one report per entry in its execution history."""
    await app.invoke({"input": "hello"})
    for step in app.execution_history:
        report = (
            f"노드: {step.get('node_name')}\n"
            f"  성공: {step.get('success')}\n"
            f"  실행 시간: {step.get('execution_time', 0.0):.3f}s\n"
        )
        print(report)


# Start the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

체크포인트를 사용한 디버깅

# Enable verbose logging: set the "spoon_ai.graph" logger to DEBUG.
import logging
logging.getLogger("spoon_ai.graph").setLevel(logging.DEBUG)

import asyncio
from typing import TypedDict

from spoon_ai.graph import END, StateGraph


async def main() -> None:
    """Run a one-node graph under a fixed thread id and print its checkpoint history."""
    class DebugState(TypedDict, total=False):
        input: str
        output: str

    async def process(state: DebugState) -> dict:
        return {"output": f"처리됨: {state.get('input', '')}"}

    graph = StateGraph(DebugState)
    graph.add_node("process", process)
    graph.set_entry_point("process")
    graph.add_edge("process", END)
    app = graph.compile()
    initial_state = {"input": "hello"}

    # Run with checkpointing, identified by the "debug_session" thread id
    result = await app.invoke(
        initial_state,
        config={"configurable": {"thread_id": "debug_session"}},
    )
    print(result)

    # Inspect the checkpoint history for the same thread id
    config = {"configurable": {"thread_id": "debug_session"}}
    for checkpoint in graph.get_state_history(config):
        print(f"노드 실행 후: {checkpoint.metadata.get('node')}")
        print(f"상태: {checkpoint.values}")
        print("---")


# Start the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

실시간 모니터링을 위한 스트리밍

# 실시간 업데이트를 위한 실행 스트리밍
import asyncio
from typing import TypedDict

from spoon_ai.graph import END, StateGraph


async def main() -> None:
    """Stream a one-node graph and print each update as it arrives.

    For every streamed update the node name and the update's keys are
    printed, followed by an error line and a low-confidence line when the
    update actually carries those fields.
    """

    class StreamState(TypedDict, total=False):
        input: str
        output: str

    async def process(state: StreamState) -> dict:
        return {"output": f"처리됨: {state.get('input', '')}"}

    graph = StateGraph(StreamState)
    graph.add_node("process", process)
    graph.set_entry_point("process")
    graph.add_edge("process", END)
    app = graph.compile()
    initial_state = {"input": "hello"}

    async for update in app.stream(initial_state):
        node = update.get("__node__", "unknown")
        print(f"[{node}] 상태 업데이트: {list(update.keys())}")

        # Check specific conditions
        if update.get("error"):
            print(f"  오류: {update['error']}")

        # Only flag low confidence when a confidence value is present.
        # Defaulting a missing key to 0 would report every update as
        # low-confidence, and an explicit None would raise on the comparison.
        confidence = update.get("confidence")
        if confidence is not None and confidence < 0.5:
            print(f"  낮은 신뢰도: {update.get('confidence')}")


# Start the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

통합 패턴

패턴 1: 풀스택 에이전트

import asyncio
from typing import Any, TypedDict

from spoon_ai.graph import END, StateGraph, GraphAgent, InMemoryCheckpointer

try:
    # 선택 사항: 툴킷 기반 마켓 데이터 도구
    from spoon_toolkits.crypto.crypto_powerdata.tools import CryptoPowerDataCEXTool
except Exception:  # pragma: no cover - 선택적 종속성
    CryptoPowerDataCEXTool = None


class FullStackState(TypedDict, total=False):
    """State schema for the full-stack agent graph; all keys are optional (total=False)."""
    input: str  # request text scanned by extract_symbol
    symbol: str  # "ETH", "SOL" or "BTC", as chosen by extract_symbol
    price_data: Any  # tool output stored by fetch_price
    output: str  # final (or early error/skip) message


async def extract_symbol(state: FullStackState) -> dict:
    """Pick a symbol by scanning the upper-cased input text.

    "ETH" is checked before "SOL"; "BTC" is the fallback when neither
    appears or when the input is missing.
    """
    text = (state.get("input") or "").upper()
    candidates = ("ETH", "SOL")
    symbol = next((ticker for ticker in candidates if ticker in text), "BTC")
    return {"symbol": symbol}


async def fetch_price(state: FullStackState) -> dict:
    """Fetch price data for ``<symbol>/USDT`` through the CEX tool.

    Returns:
        * a skip message under ``output`` when the toolkit class is ``None``;
        * an error message under ``output`` plus empty ``price_data`` when the
          result carries a truthy ``error`` attribute;
        * otherwise the tool output under ``price_data``.
    """
    if CryptoPowerDataCEXTool is None:
        return {"output": "건너뜀: spoon_toolkits가 설치되지 않음"}

    tool = CryptoPowerDataCEXTool()
    request = {
        "exchange": "binance",
        "symbol": f"{state.get('symbol', 'BTC')}/USDT",
        "timeframe": "1h",
        "limit": 10,
    }
    result = await tool.execute(**request)

    if not getattr(result, "error", None):
        return {"price_data": result.output}
    return {"output": f"도구 오류: {result.error}", "price_data": {}}


async def format_output(state: FullStackState) -> dict:
    """Produce the final ``output`` message.

    An existing truthy ``output`` is passed through unchanged; otherwise a
    message is built from ``symbol`` and the first 200 characters of the
    string form of ``price_data``.
    """
    if not state.get("output"):
        preview = str(state.get("price_data", {}))[:200]
        return {"output": f"{state.get('symbol')}의 가격 데이터를 가져왔습니다: {preview}..."}
    return {"output": state["output"]}


def build_graph() -> StateGraph:
    """Assemble the three-step pipeline with checkpointing and monitoring.

    The flow is extract_symbol -> fetch_price -> format_output -> END.

    Returns:
        The ``StateGraph`` itself; it is not compiled here.
    """
    workflow = StateGraph(FullStackState, checkpointer=InMemoryCheckpointer())
    workflow.enable_monitoring(["execution_time", "node_stats"])

    steps = (
        ("extract_symbol", extract_symbol),
        ("fetch_price", fetch_price),
        ("format_output", format_output),
    )
    for step_name, handler in steps:
        workflow.add_node(step_name, handler)

    workflow.set_entry_point("extract_symbol")
    workflow.add_edge("extract_symbol", "fetch_price")
    workflow.add_edge("fetch_price", "format_output")
    workflow.add_edge("format_output", END)
    return workflow


async def main() -> None:
    """Run the full-stack agent once, then print its result and execution metrics."""
    full_stack_agent = GraphAgent(
        name="full_stack_agent",
        graph=build_graph(),  # important: pass the uncompiled StateGraph
        memory_path="./memory",
        preserve_state=True,
    )

    print(await full_stack_agent.run("BTC 분석해줘"))
    metrics = full_stack_agent.graph.get_execution_metrics()
    print(metrics)


# Start the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

패턴 2: 멀티 에이전트 핸드오프

import asyncio
from typing import TypedDict

from spoon_ai.graph import END, StateGraph, GraphAgent


class HandoffState(TypedDict, total=False):
    """State schema shared by the three handoff graphs; all keys are optional (total=False)."""
    input: str  # text read by each graph's node
    output: str  # text written by each graph's node


def build_research_graph() -> StateGraph:
    """Build the uncompiled one-node research graph (entry -> research -> END)."""

    async def research(state: HandoffState) -> dict:
        findings = f"연구 결과: {state.get('input', '')}"
        return {"output": findings}

    node_name = "research"
    workflow = StateGraph(HandoffState)
    workflow.add_node(node_name, research)
    workflow.set_entry_point(node_name)
    workflow.add_edge(node_name, END)
    return workflow


def build_analysis_graph() -> StateGraph:
    """Build the uncompiled one-node analysis graph (entry -> analyze -> END)."""

    async def analyze(state: HandoffState) -> dict:
        assessment = f"분석: {state.get('input', '')}"
        return {"output": assessment}

    node_name = "analyze"
    workflow = StateGraph(HandoffState)
    workflow.add_node(node_name, analyze)
    workflow.set_entry_point(node_name)
    workflow.add_edge(node_name, END)
    return workflow


def build_execution_graph() -> StateGraph:
    """Build the uncompiled one-node execution graph (entry -> execute -> END)."""

    async def execute(state: HandoffState) -> dict:
        plan = f"실행 계획: {state.get('input', '')}"
        return {"output": plan}

    node_name = "execute"
    workflow = StateGraph(HandoffState)
    workflow.add_node(node_name, execute)
    workflow.set_entry_point(node_name)
    workflow.add_edge(node_name, END)
    return workflow


async def main() -> None:
    """Chain three agents, feeding each one the previous agent's result as input."""
    # The agents are isolated; results are handed over explicitly via the input.
    shared_options = {"session_id": "handoff_session", "memory_path": "./memory"}

    research_agent = GraphAgent(
        name="researcher",
        graph=build_research_graph(),
        **shared_options,
    )
    analysis_agent = GraphAgent(
        name="analyst",
        graph=build_analysis_graph(),
        **shared_options,
    )
    execution_agent = GraphAgent(
        name="executor",
        graph=build_execution_graph(),
        **shared_options,
    )

    research_result = await research_agent.run("BTC 시장 연구")
    analysis_result = await analysis_agent.run(f"분석: {research_result}")
    execution_result = await execution_agent.run(f"실행: {analysis_result}")

    for stage_result in (research_result, analysis_result, execution_result):
        print(stage_result)


# Start the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

패턴 3: 이벤트 기반 그래프

import asyncio
import json
from typing import Any, Dict, TypedDict

from spoon_ai.graph import END, StateGraph, GraphAgent


class EventState(TypedDict, total=False):
    """State schema for the event graph; all keys are optional (total=False)."""
    input: str  # JSON text parsed by parse_event
    event_type: str  # taken from the payload's "event_type" key
    event_data: Dict[str, Any]  # taken from the payload's "data" key
    output: str  # message written by process_event


def build_event_graph() -> StateGraph:
    """Build the uncompiled event graph: parse_event -> process_event -> END.

    ``parse_event`` decodes the JSON text in ``state["input"]`` into
    ``event_type`` and ``event_data``; ``process_event`` formats both into
    the ``output`` message.

    Raises (from ``parse_event`` at run time):
        json.JSONDecodeError: if a non-empty input is not valid JSON.
    """
    graph = StateGraph(EventState)

    async def parse_event(state: EventState) -> dict:
        # `or "{}"` covers a missing key as well as a present-but-empty or
        # None input, which json.loads would otherwise reject.
        payload = json.loads(state.get("input") or "{}")
        return {"event_type": payload.get("event_type", ""), "event_data": payload.get("data", {})}

    async def process_event(state: EventState) -> dict:
        return {"output": f"{state.get('event_type')} 처리됨: {state.get('event_data')}"}

    graph.add_node("parse_event", parse_event)
    graph.add_node("process_event", process_event)
    graph.set_entry_point("parse_event")
    graph.add_edge("parse_event", "process_event")
    graph.add_edge("process_event", END)
    return graph


async def handle_event(event_type: str, data: dict) -> str:
    """Process one event with a GraphAgent whose session id is ``event_<event_type>``.

    The event type and data are serialized to JSON and passed to the agent as
    its input; the agent's result is returned.
    """
    agent_options = {
        "name": "event_processor",
        "graph": build_event_graph(),
        "session_id": f"event_{event_type}",
        "memory_path": "./memory",
    }
    processor = GraphAgent(**agent_options)
    message = json.dumps({"event_type": event_type, "data": data})
    return await processor.run(message)


async def main() -> None:
    """Feed two sample events through handle_event and print each result."""
    sample_events = (
        ("price_alert", {"symbol": "BTC", "price": 50000}),
        ("trade_signal", {"symbol": "ETH", "side": "buy"}),
    )
    for event_type, data in sample_events:
        print(await handle_event(event_type, data))


# Start the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
profile
스마트 이코노미를 위한 퍼블릭 블록체인, 네오에 대한 모든 것

0개의 댓글