그래프 시스템(Graph System)은 광범위한 SpoonOS 에코시스템과 원활하게 통합됩니다. 이 가이드는 그래프를 에이전트, 도구, MCP 서버 및 메모리 시스템과 연결하는 방법을 다룹니다.
GraphAgent는 그래프 실행을 SpoonOS 에이전트 라이프사이클, 영구 메모리 및 세션 관리와 함께 래핑합니다.
import asyncio
from typing import TypedDict
from spoon_ai.graph import END, StateGraph, GraphAgent
class AnalysisState(TypedDict, total=False):
input: str
output: str
async def analyze(state: AnalysisState) -> dict:
return {"output": f"분석 완료: {state.get('input', '')}"}
def build_analysis_graph() -> StateGraph:
graph = StateGraph(AnalysisState)
graph.add_node("analyze", analyze)
graph.set_entry_point("analyze")
graph.add_edge("analyze", END)
return graph # 중요: 컴파일되지 않은 StateGraph를 반환합니다.
async def main():
agent = GraphAgent(
name="crypto_analyzer",
graph=build_analysis_graph(),
memory_path="./agent_memory",
session_id="user_123_session",
preserve_state=True, # 실행 간 상태 보존
)
result = await agent.run("BTC 가격 동향 분석")
print(result)
if __name__ == "__main__":
asyncio.run(main())
from typing import TypedDict
from spoon_ai.graph import END, StateGraph, GraphAgent
class AnalysisState(TypedDict, total=False):
input: str
output: str
async def analyze(state: AnalysisState) -> dict:
return {"output": f"분석 완료: {state.get('input', '')}"}
def build_analysis_graph() -> StateGraph:
graph = StateGraph(AnalysisState)
graph.add_node("analyze", analyze)
graph.set_entry_point("analyze")
graph.add_edge("analyze", END)
return graph # 중요: 컴파일되지 않은 StateGraph
agent = GraphAgent(
name="trading_assistant",
graph=build_analysis_graph(), # StateGraph (컴파일되지 않음)
preserve_state=True, # 실행 간 상태 유지
memory_path="./memory", # 메모리 저장 디렉토리
session_id="session_abc123", # 고유 세션 식별자
max_metadata_size=1024, # 선택 사항: 저장되는 메타데이터 크기 제한
)
print(f"설정된 에이전트 세션: {agent.memory.session_id}")
import asyncio
from typing import TypedDict
from spoon_ai.graph import END, StateGraph, GraphAgent
async def main() -> None:
class AnalysisState(TypedDict, total=False):
input: str
output: str
async def analyze(state: AnalysisState) -> dict:
return {"output": f"분석 완료: {state.get('input', '')}"}
graph = StateGraph(AnalysisState)
graph.add_node("analyze", analyze)
graph.set_entry_point("analyze")
graph.add_edge("analyze", END)
agent = GraphAgent(
name="trading_assistant",
graph=graph, # 컴파일되지 않은 StateGraph
memory_path="./memory",
session_id="session_abc123",
preserve_state=True,
)
# 에이전트 실행
result = await agent.run("BTC 전망은 어때?")
print(result)
# 실행 메타데이터 액세스
metadata = agent.get_execution_metadata()
print(f"성공 여부: {metadata.get('execution_successful')}")
print(f"마지막 요청: {metadata.get('last_request')}")
print(f"타임스탬프: {metadata.get('execution_time')}")
if __name__ == "__main__":
asyncio.run(main())
from typing import TypedDict
from spoon_ai.graph import END, StateGraph, GraphAgent
class AnalysisState(TypedDict, total=False):
input: str
output: str
async def analyze(state: AnalysisState) -> dict:
return {"output": f"분석 완료: {state.get('input', '')}"}
def build_analysis_graph() -> StateGraph:
graph = StateGraph(AnalysisState)
graph.add_node("analyze", analyze)
graph.set_entry_point("analyze")
graph.add_edge("analyze", END)
return graph
agent = GraphAgent(
name="session_manager",
graph=build_analysis_graph(),
memory_path="./memory",
session_id="user_123_session",
)
# 현재 세션
print(f"현재 세션: {agent.memory.session_id}")
# 세션 전환
agent.load_session("user_456_session")
print(f"현재 세션: {agent.memory.session_id}")
외부 기능을 사용하기 위해 그래프 노드 내에서 SpoonOS 도구를 사용하십시오.
from typing import Any, TypedDict
from spoon_toolkits.crypto.crypto_powerdata.tools import CryptoPowerDataCEXTool
class MarketState(TypedDict, total=False):
symbol: str
market_data: Any
data_source: str
tool_error: str
async def fetch_market_data(state: MarketState) -> dict:
"""CryptoPowerData 도구를 사용하는 노드."""
symbol = state.get("symbol", "BTC")
tool = CryptoPowerDataCEXTool()
result = await tool.execute(
exchange="binance",
symbol=f"{symbol}/USDT",
timeframe="1h",
limit=24,
)
if getattr(result, "error", None):
return {"market_data": {}, "data_source": "binance", "tool_error": result.error}
return {"market_data": result.output, "data_source": "binance"}
import asyncio
import os
from typing import Any, TypedDict
try:
from spoon_toolkits.crypto.crypto_data_tools.price_data import GetTokenPriceTool
except Exception: # pragma: no cover - 선택적 종속성
GetTokenPriceTool = None
class ProcessState(TypedDict, total=False):
symbol: str
tool_status: str
tool_error: str
tool_output: Any
async def process_with_tool(state: ProcessState) -> dict:
"""오류 처리가 포함된 강력한 도구 사용."""
if os.getenv("DOC_SNIPPET_MODE") == "1" or GetTokenPriceTool is None:
return {"tool_status": "skipped", "tool_output": {"reason": "DOC_SNIPPET_MODE 또는 툴킷 누락"}}
tool = GetTokenPriceTool()
try:
result = await tool.execute(symbol=state.get("symbol", "ETH-USDC"))
if getattr(result, "error", None):
return {"tool_status": "error", "tool_error": result.error}
return {"tool_status": "success", "tool_output": result.output}
except Exception as e:
return {"tool_status": "error", "tool_error": str(e)}
async def main() -> None:
print(await process_with_tool({"symbol": "ETH-USDC"}))
if __name__ == "__main__":
asyncio.run(main())
import asyncio
import os
from typing import Any, Dict, TypedDict
try:
from spoon_toolkits.crypto.crypto_powerdata.tools import CryptoPowerDataCEXTool
except Exception: # pragma: no cover - 선택적 종속성
CryptoPowerDataCEXTool = None
try:
from spoon_toolkits.crypto.crypto_data_tools.price_data import GetTokenPriceTool
except Exception: # pragma: no cover - 선택적 종속성
GetTokenPriceTool = None
class AnalysisState(TypedDict, total=False):
symbol: str
price_data: Any
dex_price: Any
async def comprehensive_analysis(state: AnalysisState) -> dict:
"""여러 도구를 조율하는 노드."""
symbol = state.get("symbol", "BTC")
if os.getenv("DOC_SNIPPET_MODE") == "1" or CryptoPowerDataCEXTool is None or GetTokenPriceTool is None:
return {
"price_data": {"source": "stub", "symbol": symbol},
"dex_price": {"source": "stub", "symbol": symbol},
}
results: Dict[str, Any] = {}
# 도구 1: 가격 데이터
price_tool = CryptoPowerDataCEXTool()
price_data = await price_tool.execute(
exchange="binance",
symbol=f"{symbol}/USDT",
timeframe="1h",
limit=24,
)
results["price_data"] = price_data.output if not getattr(price_data, "error", None) else {"error": price_data.error}
# 도구 2: DEX 스팟 가격 스냅샷
dex_tool = GetTokenPriceTool()
dex_price = await dex_tool.execute(symbol=f"{symbol}-USDC", exchange="uniswap")
results["dex_price"] = dex_price.output if not getattr(dex_price, "error", None) else {"error": dex_price.error}
return results
async def main() -> None:
result = await comprehensive_analysis({"symbol": "BTC"})
print(result)
if __name__ == "__main__":
asyncio.run(main())
동적 도구 검색을 위해 MCP(Model Context Protocol) 서버에 연결하십시오.
graph LR
A[그래프 노드] --> B[MCP 클라이언트]
B --> C[MCP 서버]
C --> D[외부 도구]
C --> E[데이터 소스]
C --> F[API]
import os
from typing import Any, TypedDict
from spoon_ai.tools.mcp_tool import MCPTool
class SearchState(TypedDict, total=False):
query: str
search_results: Any
tavily_tool = MCPTool(
name="tavily-search",
description="Tavily를 통한 웹 검색",
mcp_config={
"command": "npx",
"args": ["--yes", "tavily-mcp"],
"env": {"TAVILY_API_KEY": os.getenv("TAVILY_API_KEY", "")},
},
)
async def mcp_search_node(state: SearchState) -> dict:
"""MCP 도구를 호출하는 노드."""
if not os.getenv("TAVILY_API_KEY"):
return {"search_results": "건너뜀: TAVILY_API_KEY가 설정되지 않음"}
result = await tavily_tool.execute(query=state.get("query", ""), max_results=5)
return {"search_results": result}
async def main():
# MCP 도구 노드 테스트
result = await mcp_search_node({"query": "Bitcoin price"})
print(f"\n쿼리: Bitcoin price")
print(f"결과: {result.get('search_results', '결과 없음')}")
if __name__ == "__main__":
import asyncio
asyncio.run(main())
import os
import asyncio
from typing import TypedDict
from spoon_ai.graph.builder import HighLevelGraphAPI
from spoon_ai.graph.mcp_integration import MCPToolSpec
class MyState(TypedDict, total=False):
user_query: str
async def main():
api = HighLevelGraphAPI(MyState)
# TAVILY_API_KEY 설정 확인
tavily_key = os.getenv("TAVILY_API_KEY", "").strip()
if not tavily_key or "..." in tavily_key:
print("참고: TAVILY_API_KEY가 설정되지 않았습니다. MCP 도구 등록을 건너뜁니다.")
print("이 예제를 사용하려면 TAVILY_API_KEY 환경 변수를 설정하십시오.")
return
# HighLevelGraphAPI를 사용하여 MCP 도구 등록
api.register_mcp_tool(
intent_category="research",
spec=MCPToolSpec(name="tavily-search"),
config={
"command": "npx",
"args": ["--yes", "tavily-mcp"],
"env": {"TAVILY_API_KEY": tavily_key},
},
)
# 도구 인스턴스 생성
tool = api.create_mcp_tool("tavily-search")
if tool:
result = await tool.execute(query="Bitcoin price", max_results=3)
print(f"검색 결과: {len(str(result))} 자 반환됨")
else:
print("MCP 도구 생성 실패. 설정을 확인하십시오.")
if __name__ == "__main__":
asyncio.run(main())
다음을 위해 전용 MCP 프로토콜 가이드를 참조하십시오.
세션 간에 상태와 대화 기록을 유지합니다.
from typing import TypedDict
from spoon_ai.graph import END, StateGraph, GraphAgent
class AnalysisState(TypedDict, total=False):
input: str
output: str
async def analyze(state: AnalysisState) -> dict:
return {"output": f"분석 완료: {state.get('input', '')}"}
def build_analysis_graph() -> StateGraph:
graph = StateGraph(AnalysisState)
graph.add_node("analyze", analyze)
graph.set_entry_point("analyze")
graph.add_edge("analyze", END)
return graph
# 메모리가 포함된 에이전트 생성
agent = GraphAgent(
name="assistant",
graph=build_analysis_graph(),
memory_path="./memory",
session_id="user_123",
)
# 메타데이터 설정
agent.set_memory_metadata("last_analysis_time", "2024-01-15T10:30:00Z")
agent.set_memory_metadata("user_preferences", {
"risk_tolerance": "medium",
"favorite_tokens": ["BTC", "ETH", "SOL"]
})
# 메타데이터 가져오기
last_time = agent.get_memory_metadata("last_analysis_time")
prefs = agent.get_memory_metadata("user_preferences")
# 통계 가져오기
stats = agent.get_memory_statistics()
print(f"총 메시지 수: {stats['total_messages']}")
print(f"세션 ID: {stats['session_id']}")
print(f"저장 경로: {stats['storage_path']}")
print(f"파일 크기: {stats['file_size']} 바이트")
from typing import TypedDict
from spoon_ai.graph import END, StateGraph, GraphAgent
class AnalysisState(TypedDict, total=False):
input: str
output: str
async def analyze(state: AnalysisState) -> dict:
return {"output": f"분석 완료: {state.get('input', '')}"}
def build_analysis_graph() -> StateGraph:
graph = StateGraph(AnalysisState)
graph.add_node("analyze", analyze)
graph.set_entry_point("analyze")
graph.add_edge("analyze", END)
return graph
agent = GraphAgent(
name="assistant",
graph=build_analysis_graph(),
memory_path="./memory",
session_id="user_123",
)
# 예시 메시지 추가 후 검색.
agent.memory.add_message({"role": "assistant", "content": "여기 짧은 비트코인 분석이 있습니다..."})
matches = agent.search_memory(query="bitcoin analysis", limit=5)
for match in matches:
print(f"내용: {str(match.get('content', ''))[:100]}...")
print(f"타임스탬프: {match.get('timestamp')}")
print("---")
import asyncio
from typing import TypedDict
from spoon_ai.graph import END, StateGraph, GraphAgent
class AgentState(TypedDict, total=False):
input: str
favorite_token: str
output: str
async def process(state: AgentState) -> dict:
text = (state.get("input") or "").lower()
if "remember" in text and "favorite token" in text and " is " in text:
token = state["input"].split(" is ", 1)[-1].strip().upper()
return {"favorite_token": token, "output": f"알겠습니다. 당신의 최애 토큰이 {token}임을 기억하겠습니다."}
if "favorite token" in text:
token = state.get("favorite_token") or "아직 설정되지 않음"
return {"output": f"당신의 최애 토큰은 {token}입니다."}
return {"output": f"에코: {state.get('input', '')}"}
graph = StateGraph(AgentState)
graph.add_node("process", process)
graph.set_entry_point("process")
graph.add_edge("process", END)
# 에이전트 레이어에서 상태 지속성 활성화.
agent = GraphAgent(
name="persistent_agent",
graph=graph, # 컴파일되지 않은 StateGraph
preserve_state=True, # 핵심 설정
memory_path="./memory",
)
async def main() -> None:
# 첫 번째 실행
print(await agent.run("나의 최애 토큰이 SOL임을 기억해줘"))
# 나중 실행 (상태가 보존됨)
print(await agent.run("나의 최애 토큰이 뭐야?"))
if __name__ == "__main__":
asyncio.run(main())
from spoon_ai.graph import InMemoryCheckpointer
from typing import TypedDict
from spoon_ai.graph import END, StateGraph, GraphAgent
# 그래프 체크포인팅과 에이전트 메모리 결합
checkpointer = InMemoryCheckpointer(max_checkpoints_per_thread=50)
class MyState(TypedDict, total=False):
input: str
output: str
async def process(state: MyState) -> dict:
return {"output": f"처리됨: {state.get('input', '')}"}
graph = StateGraph(MyState, checkpointer=checkpointer)
graph.add_node("process", process)
graph.set_entry_point("process")
graph.add_edge("process", END)
agent = GraphAgent(
name="full_memory_agent",
graph=graph,
memory_path="./memory",
preserve_state=True
)
# 이제 다음을 가집니다:
# 1. 그래프 수준의 체크포인트 (실행 내 복구용)
# 2. 에이전트 수준의 메모리 (실행 간 지속성용)
print("체크포인팅 활성화됨:", agent.graph.graph.checkpointer is checkpointer)
실행을 추적하고 문제를 진단합니다.
from typing import TypedDict
from spoon_ai.graph import END, StateGraph
class MonitorState(TypedDict, total=False):
input: str
output: str
async def process(state: MonitorState) -> dict:
return {"output": f"처리됨: {state.get('input', '')}"}
graph = StateGraph(MonitorState)
graph.add_node("process", process)
graph.set_entry_point("process")
graph.add_edge("process", END)
graph.enable_monitoring(["execution_time", "success_rate", "routing_performance", "node_stats"])
app = graph.compile()
async def main():
# 그래프 실행
result = await app.invoke({"input": "test", "output": ""})
print(result)
if __name__ == "__main__":
import asyncio
asyncio.run(main())
import asyncio
from typing import TypedDict
from spoon_ai.graph import END, StateGraph
async def main() -> None:
class MonitorState(TypedDict, total=False):
input: str
output: str
async def process(state: MonitorState) -> dict:
return {"output": f"처리됨: {state.get('input', '')}"}
graph = StateGraph(MonitorState)
graph.enable_monitoring(["execution_time", "success_rate", "routing_performance", "node_stats"])
graph.add_node("process", process)
graph.set_entry_point("process")
graph.add_edge("process", END)
app = graph.compile()
initial_state = {"input": "hello", "output": ""}
# 그래프 실행
result = await app.invoke(initial_state)
print(result)
# 지표 가져오기
metrics = app.get_execution_metrics()
print("실행 요약:")
print(f" 총 실행 수: {metrics['total_executions']}")
print(f" 성공률: {metrics['success_rate']:.1%}")
print(f" 평균 실행 시간: {metrics['avg_execution_time']:.3f}s")
# 노드별 통계
print("노드별 통계:")
for node, stats in metrics.get("node_stats", {}).items():
print(f" {node}:")
print(f" 호출 수: {stats['count']}")
print(f" 평균 시간: {stats['avg_time']:.3f}s")
print(f" 오류율: {stats['error_rate']:.1%}")
if __name__ == "__main__":
asyncio.run(main())
import asyncio
from typing import TypedDict
from spoon_ai.graph import END, StateGraph
class MonitorState(TypedDict, total=False):
input: str
output: str
async def process(state: MonitorState) -> dict:
return {"output": f"처리됨: {state.get('input', '')}"}
graph = StateGraph(MonitorState)
graph.add_node("process", process)
graph.set_entry_point("process")
graph.add_edge("process", END)
app = graph.compile()
async def main() -> None:
await app.invoke({"input": "hello"})
for step in app.execution_history:
print(
f"노드: {step.get('node_name')}\n"
f" 성공: {step.get('success')}\n"
f" 실행 시간: {step.get('execution_time', 0.0):.3f}s\n"
)
if __name__ == "__main__":
asyncio.run(main())
# 상세 로깅 활성화
import logging
logging.getLogger("spoon_ai.graph").setLevel(logging.DEBUG)
import asyncio
from typing import TypedDict
from spoon_ai.graph import END, StateGraph
async def main() -> None:
class DebugState(TypedDict, total=False):
input: str
output: str
async def process(state: DebugState) -> dict:
return {"output": f"처리됨: {state.get('input', '')}"}
graph = StateGraph(DebugState)
graph.add_node("process", process)
graph.set_entry_point("process")
graph.add_edge("process", END)
app = graph.compile()
initial_state = {"input": "hello"}
# 체크포인팅과 함께 실행
result = await app.invoke(
initial_state,
config={"configurable": {"thread_id": "debug_session"}},
)
print(result)
# 체크포인트 기록 검사
config = {"configurable": {"thread_id": "debug_session"}}
for checkpoint in graph.get_state_history(config):
print(f"노드 실행 후: {checkpoint.metadata.get('node')}")
print(f"상태: {checkpoint.values}")
print("---")
if __name__ == "__main__":
asyncio.run(main())
# 실시간 업데이트를 위한 실행 스트리밍
import asyncio
from typing import TypedDict
from spoon_ai.graph import END, StateGraph
async def main() -> None:
class StreamState(TypedDict, total=False):
input: str
output: str
async def process(state: StreamState) -> dict:
return {"output": f"처리됨: {state.get('input', '')}"}
graph = StateGraph(StreamState)
graph.add_node("process", process)
graph.set_entry_point("process")
graph.add_edge("process", END)
app = graph.compile()
initial_state = {"input": "hello"}
async for update in app.stream(initial_state):
node = update.get("__node__", "unknown")
print(f"[{node}] 상태 업데이트: {list(update.keys())}")
# 특정 조건 확인
if update.get("error"):
print(f" 오류: {update['error']}")
if update.get("confidence", 0) < 0.5:
print(f" 낮은 신뢰도: {update.get('confidence')}")
if __name__ == "__main__":
asyncio.run(main())
import asyncio
from typing import Any, TypedDict
from spoon_ai.graph import END, StateGraph, GraphAgent, InMemoryCheckpointer
try:
# 선택 사항: 툴킷 기반 마켓 데이터 도구
from spoon_toolkits.crypto.crypto_powerdata.tools import CryptoPowerDataCEXTool
except Exception: # pragma: no cover - 선택적 종속성
CryptoPowerDataCEXTool = None
class FullStackState(TypedDict, total=False):
input: str
symbol: str
price_data: Any
output: str
async def extract_symbol(state: FullStackState) -> dict:
text = (state.get("input") or "").upper()
if "ETH" in text:
symbol = "ETH"
elif "SOL" in text:
symbol = "SOL"
else:
symbol = "BTC"
return {"symbol": symbol}
async def fetch_price(state: FullStackState) -> dict:
if CryptoPowerDataCEXTool is None:
return {"output": "건너뜀: spoon_toolkits가 설치되지 않음"}
tool = CryptoPowerDataCEXTool()
result = await tool.execute(
exchange="binance",
symbol=f"{state.get('symbol', 'BTC')}/USDT",
timeframe="1h",
limit=10,
)
if getattr(result, "error", None):
return {"output": f"도구 오류: {result.error}", "price_data": {}}
return {"price_data": result.output}
async def format_output(state: FullStackState) -> dict:
if state.get("output"):
return {"output": state["output"]}
preview = str(state.get("price_data", {}))[:200]
return {"output": f"{state.get('symbol')}의 가격 데이터를 가져왔습니다: {preview}..."}
def build_graph() -> StateGraph:
graph = StateGraph(FullStackState, checkpointer=InMemoryCheckpointer())
graph.enable_monitoring(["execution_time", "node_stats"])
graph.add_node("extract_symbol", extract_symbol)
graph.add_node("fetch_price", fetch_price)
graph.add_node("format_output", format_output)
graph.set_entry_point("extract_symbol")
graph.add_edge("extract_symbol", "fetch_price")
graph.add_edge("fetch_price", "format_output")
graph.add_edge("format_output", END)
return graph
async def main() -> None:
agent = GraphAgent(
name="full_stack_agent",
graph=build_graph(), # 중요: 컴파일되지 않은 StateGraph를 전달
memory_path="./memory",
preserve_state=True,
)
result = await agent.run("BTC 분석해줘")
print(result)
print(agent.graph.get_execution_metrics())
if __name__ == "__main__":
asyncio.run(main())
import asyncio
from typing import TypedDict
from spoon_ai.graph import END, StateGraph, GraphAgent
class HandoffState(TypedDict, total=False):
input: str
output: str
def build_research_graph() -> StateGraph:
graph = StateGraph(HandoffState)
async def research(state: HandoffState) -> dict:
return {"output": f"연구 결과: {state.get('input', '')}"}
graph.add_node("research", research)
graph.set_entry_point("research")
graph.add_edge("research", END)
return graph
def build_analysis_graph() -> StateGraph:
graph = StateGraph(HandoffState)
async def analyze(state: HandoffState) -> dict:
return {"output": f"분석: {state.get('input', '')}"}
graph.add_node("analyze", analyze)
graph.set_entry_point("analyze")
graph.add_edge("analyze", END)
return graph
def build_execution_graph() -> StateGraph:
graph = StateGraph(HandoffState)
async def execute(state: HandoffState) -> dict:
return {"output": f"실행 계획: {state.get('input', '')}"}
graph.add_node("execute", execute)
graph.set_entry_point("execute")
graph.add_edge("execute", END)
return graph
async def main() -> None:
# 에이전트는 격리되어 있습니다. 입력을 통해 명시적으로 결과를 전달합니다.
session_id = "handoff_session"
memory_path = "./memory"
research_agent = GraphAgent(
name="researcher",
graph=build_research_graph(),
session_id=session_id,
memory_path=memory_path,
)
analysis_agent = GraphAgent(
name="analyst",
graph=build_analysis_graph(),
session_id=session_id,
memory_path=memory_path,
)
execution_agent = GraphAgent(
name="executor",
graph=build_execution_graph(),
session_id=session_id,
memory_path=memory_path,
)
research_result = await research_agent.run("BTC 시장 연구")
analysis_result = await analysis_agent.run(f"분석: {research_result}")
execution_result = await execution_agent.run(f"실행: {analysis_result}")
print(research_result)
print(analysis_result)
print(execution_result)
if __name__ == "__main__":
asyncio.run(main())
import asyncio
import json
from typing import Any, Dict, TypedDict
from spoon_ai.graph import END, StateGraph, GraphAgent
class EventState(TypedDict, total=False):
input: str
event_type: str
event_data: Dict[str, Any]
output: str
def build_event_graph() -> StateGraph:
graph = StateGraph(EventState)
async def parse_event(state: EventState) -> dict:
payload = json.loads(state.get("input", "{}"))
return {"event_type": payload.get("event_type", ""), "event_data": payload.get("data", {})}
async def process_event(state: EventState) -> dict:
return {"output": f"{state.get('event_type')} 처리됨: {state.get('event_data')}"}
graph.add_node("parse_event", parse_event)
graph.add_node("process_event", process_event)
graph.set_entry_point("parse_event")
graph.add_edge("parse_event", "process_event")
graph.add_edge("process_event", END)
return graph
async def handle_event(event_type: str, data: dict) -> str:
agent = GraphAgent(
name="event_processor",
graph=build_event_graph(),
session_id=f"event_{event_type}",
memory_path="./memory",
)
payload = json.dumps({"event_type": event_type, "data": data})
return await agent.run(payload)
async def main() -> None:
print(await handle_event("price_alert", {"symbol": "BTC", "price": 50000}))
print(await handle_event("trade_signal", {"symbol": "ETH", "side": "buy"}))
if __name__ == "__main__":
asyncio.run(main())