Graph System에 LLM을 연동하면 어떤 걸 만들 수 있을까요?
이번 글에서는 실제로 돌려볼 수 있는 5가지 예제를 통해 핵심 패턴들을 살펴보겠습니다.
모든 예제는 고정된 출력이 아닌 실제 LLM 호출을 사용합니다.
사용자의 질문을 LLM이 의도(intent)별로 분류하고, 각 의도에 맞는 전문 핸들러로 라우팅하는 패턴입니다.
"""
LLM 인텐트 라우터 예제
다루는 내용:
- LLM 기반 의도 분류
- LLM 출력에 따른 조건부 라우팅
- 의도별 전문 핸들러 노드
"""
import asyncio
import json
from typing import TypedDict
from spoon_ai.graph import StateGraph, END
from spoon_ai.llm import LLMManager
from spoon_ai.schema import Message
class RouterState(TypedDict):
    """Shared state flowing through the intent-routing graph."""
    query: str        # raw user question
    intent: str       # classified label: "price" | "news" | "analysis" | "general"
    confidence: float # classifier confidence, expected in [0.0, 1.0]
    result: str       # final response text produced by the selected handler

# Shared LLM client used by every node in this example.
llm = LLMManager()
# Categories the classifier is allowed to return; anything else maps to "general".
_VALID_INTENTS = {"price", "news", "analysis", "general"}


def parse_intent_response(content: str) -> dict:
    """Parse the classifier's reply into {"intent": str, "confidence": float}.

    Internal helper. Tolerates a markdown ```json fence around the payload
    (models often add one despite "JSON only" prompts), rejects unknown
    intent labels, and clamps confidence into [0.0, 1.0]. Any parse failure
    falls back to {"intent": "general", "confidence": 0.5}.
    """
    text = content.strip()
    if text.startswith("```"):
        # Drop the surrounding fence and an optional "json" language tag.
        text = text.strip("`").strip()
        if text[:4].lower() == "json":
            text = text[4:].strip()
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        return {"intent": "general", "confidence": 0.5}
    intent = data.get("intent", "general")
    if intent not in _VALID_INTENTS:
        intent = "general"
    try:
        confidence = float(data.get("confidence", 0.5))
    except (TypeError, ValueError):
        confidence = 0.5
    return {"intent": intent, "confidence": min(max(confidence, 0.0), 1.0)}


async def classify_intent(state: RouterState) -> dict:
    """Classify the user's query with the LLM.

    Returns a partial state update with "intent" and "confidence"; parsing
    failures degrade gracefully to the "general" intent.
    """
    response = await llm.chat([
        Message(role="system", content="""You are an intent classifier for a crypto assistant.
Classify the user query into one of these categories:
- price: asking about cryptocurrency prices
- news: asking about crypto news or updates
- analysis: requesting market analysis or trends
- general: other questions
Respond with JSON only: {"intent": "category", "confidence": 0.0-1.0}"""),
        Message(role="user", content=state["query"])
    ])
    return parse_intent_response(response.content)
async def handle_price(state: RouterState) -> dict:
    """Produce a price-focused answer for the user's query."""
    prompt = [
        Message(role="system", content="""You are a crypto price expert.
Provide helpful information about cryptocurrency prices.
Be concise and include relevant data points."""),
        Message(role="user", content=state["query"]),
    ]
    reply = await llm.chat(prompt)
    return {"result": reply.content}
async def handle_news(state: RouterState) -> dict:
    """Produce a news-focused answer for the user's query."""
    prompt = [
        Message(role="system", content="""You are a crypto news analyst.
Summarize relevant cryptocurrency news and updates.
Focus on recent developments and their implications."""),
        Message(role="user", content=state["query"]),
    ]
    reply = await llm.chat(prompt)
    return {"result": reply.content}
async def handle_analysis(state: RouterState) -> dict:
    """Produce a market-analysis answer for the user's query."""
    prompt = [
        Message(role="system", content="""You are a crypto market analyst.
Provide detailed market analysis including:
- Current trends
- Technical indicators
- Risk assessment
Be analytical and data-driven."""),
        Message(role="user", content=state["query"]),
    ]
    reply = await llm.chat(prompt)
    return {"result": reply.content}
async def handle_general(state: RouterState) -> dict:
    """Answer any query that did not match a specialized intent."""
    prompt = [
        Message(role="system", content="""You are a helpful crypto assistant.
Answer questions clearly and accurately.
If you're unsure, say so."""),
        Message(role="user", content=state["query"]),
    ]
    reply = await llm.chat(prompt)
    return {"result": reply.content}
def route_by_intent(state: RouterState) -> str:
    """Select the conditional-edge branch matching the classified intent."""
    # A missing "intent" key falls back to the general handler.
    fallback = "general"
    return state.get("intent", fallback)
# Graph wiring: classify first, then branch to the matching specialist handler.
graph = StateGraph(RouterState)
graph.add_node("classify", classify_intent)
graph.add_node("price_handler", handle_price)
graph.add_node("news_handler", handle_news)
graph.add_node("analysis_handler", handle_analysis)
graph.add_node("general_handler", handle_general)
graph.set_entry_point("classify")
# route_by_intent returns the intent string; this map resolves it to a node name.
graph.add_conditional_edges(
    "classify",
    route_by_intent,
    {
        "price": "price_handler",
        "news": "news_handler",
        "analysis": "analysis_handler",
        "general": "general_handler",
    }
)
# Every handler terminates the run.
graph.add_edge("price_handler", END)
graph.add_edge("news_handler", END)
graph.add_edge("analysis_handler", END)
graph.add_edge("general_handler", END)
app = graph.compile()
async def main():
    """Exercise the LLM router against a set of sample queries."""
    samples = (
        "What is the current price of Bitcoin?",
        "Any news about Ethereum updates?",
        "Analyze the SOL market trend",
        "What is a blockchain?",
    )
    bar = "=" * 60
    for sample in samples:
        print(f"\n{bar}")
        print(f"Query: {sample}")
        print(bar)
        outcome = await app.invoke({
            "query": sample,
            "intent": "",
            "confidence": 0.0,
            "result": "",
        })
        print(f"Intent: {outcome['intent']} (confidence: {outcome['confidence']:.0%})")
        print(f"\nResponse:\n{outcome['result']}")
    return True


if __name__ == "__main__":
    asyncio.run(main())
여러 LLM 호출을 순차적으로 연결해서, 컨텍스트를 점점 쌓아가며 깊이 있는 분석을 수행하는 패턴입니다.
"""
다단계 LLM 분석 파이프라인
다루는 내용:
- 컨텍스트를 누적하는 순차 LLM 호출
- 노드 간 상태 축적
- 분석의 점진적 정제
"""
import asyncio
from typing import TypedDict, List, Dict, Any
from spoon_ai.graph import StateGraph, END
from spoon_ai.llm import LLMManager
from spoon_ai.schema import Message
class AnalysisState(TypedDict):
    """Accumulating state for the sequential analysis pipeline."""
    symbol: str               # asset ticker, e.g. "BTC"
    user_question: str        # original question from the user
    market_context: str       # filled by gather_context
    technical_analysis: str   # filled by analyze_technicals
    risk_assessment: str      # filled by assess_risk
    final_recommendation: str # filled by generate_recommendation
    confidence: float         # parsed from the risk assessment, 0.0-1.0

# Shared LLM client used by every pipeline stage.
llm = LLMManager()
async def gather_context(state: AnalysisState) -> dict:
    """Collect market context for the requested symbol (pipeline stage 1)."""
    system_msg = Message(role="system", content="""You are a market context specialist.
Provide relevant market context for the cryptocurrency including:
- Current market sentiment
- Recent price movements
- Key events affecting the asset
Be factual and concise.""")
    user_msg = Message(role="user", content=f"Provide market context for {state['symbol']}")
    reply = await llm.chat([system_msg, user_msg])
    return {"market_context": reply.content}
async def analyze_technicals(state: AnalysisState) -> dict:
    """Run technical analysis on top of the gathered context (stage 2)."""
    prompt = [
        Message(role="system", content="""You are a technical analyst.
Based on the market context, provide technical analysis including:
- Support and resistance levels
- Key indicators (RSI, MACD trends)
- Chart patterns
Be specific and analytical."""),
        Message(role="user", content=f"""
Symbol: {state['symbol']}
Market Context: {state['market_context']}
Provide technical analysis:"""),
    ]
    reply = await llm.chat(prompt)
    return {"technical_analysis": reply.content}
def extract_confidence(content: str, default: float = 0.7) -> float:
    """Extract the trailing "Confidence: XX%" score from *content* as a fraction.

    Internal helper. Returns *default* when the marker is missing or the
    number cannot be parsed; the parsed value is clamped into [0.0, 1.0].
    """
    if "Confidence:" not in content:
        return default
    tail = content.split("Confidence:")[-1].strip().replace("%", "")
    tokens = tail.split()
    if not tokens:
        return default
    try:
        # Only the first token matters; trailing prose would break float().
        value = float(tokens[0]) / 100
    except ValueError:
        # Targeted handling replaces the previous bare `except:`, which
        # silently swallowed everything (including KeyboardInterrupt).
        return default
    return min(max(value, 0.0), 1.0)


async def assess_risk(state: AnalysisState) -> dict:
    """Assess risk from the accumulated context and analysis (stage 3).

    Returns the raw assessment text plus a confidence score parsed from the
    model's "Confidence: XX%" trailer (0.7 when absent/unparseable).
    """
    response = await llm.chat([
        Message(role="system", content="""You are a risk assessment specialist.
Based on the context and technical analysis, assess:
- Risk level (Low/Medium/High)
- Key risk factors
- Potential downside scenarios
Also provide a confidence score (0-100) for your assessment.
End your response with: Confidence: XX%"""),
        Message(role="user", content=f"""
Symbol: {state['symbol']}
Market Context: {state['market_context']}
Technical Analysis: {state['technical_analysis']}
Provide risk assessment:""")
    ])
    content = response.content
    return {
        "risk_assessment": content,
        "confidence": extract_confidence(content),
    }
async def generate_recommendation(state: AnalysisState) -> dict:
    """Produce the final recommendation from every earlier stage (stage 4)."""
    system_msg = Message(role="system", content="""You are a senior investment advisor.
Based on all the analysis, provide a clear recommendation.
Structure your response as:
1. Summary recommendation (Buy/Hold/Sell)
2. Key reasoning points
3. Suggested actions
4. Caveats and disclaimers""")
    # Earlier stages are truncated to 500 chars each to keep the prompt compact.
    user_msg = Message(role="user", content=f"""
User Question: {state['user_question']}
Symbol: {state['symbol']}
Analysis Summary:
- Market Context: {state['market_context'][:500]}...
- Technical: {state['technical_analysis'][:500]}...
- Risk: {state['risk_assessment'][:500]}...
- Confidence: {state['confidence']:.0%}
Generate final recommendation:""")
    reply = await llm.chat([system_msg, user_msg])
    return {"final_recommendation": reply.content}
# Graph wiring: strictly sequential pipeline —
# context gathering -> technical analysis -> risk assessment -> recommendation.
graph = StateGraph(AnalysisState)
graph.add_node("gather_context", gather_context)
graph.add_node("analyze_technicals", analyze_technicals)
graph.add_node("assess_risk", assess_risk)
graph.add_node("generate_recommendation", generate_recommendation)
graph.set_entry_point("gather_context")
graph.add_edge("gather_context", "analyze_technicals")
graph.add_edge("analyze_technicals", "assess_risk")
graph.add_edge("assess_risk", "generate_recommendation")
graph.add_edge("generate_recommendation", END)
app = graph.compile()
async def main():
    """Run the analysis pipeline end to end and print each stage's output."""
    banner = "=" * 60
    print(banner)
    print("MULTI-STEP LLM ANALYSIS PIPELINE")
    print(banner)
    final_state = await app.invoke({
        "symbol": "BTC",
        "user_question": "Should I buy Bitcoin now?",
        "market_context": "",
        "technical_analysis": "",
        "risk_assessment": "",
        "final_recommendation": "",
        "confidence": 0.0
    })
    divider = "-" * 40
    previews = [
        ("\n📊 MARKET CONTEXT:", final_state["market_context"][:500] + "..."),
        ("\n📈 TECHNICAL ANALYSIS:", final_state["technical_analysis"][:500] + "..."),
        ("\n⚠️ RISK ASSESSMENT:", final_state["risk_assessment"][:500] + "..."),
    ]
    for heading, excerpt in previews:
        print(heading)
        print(divider)
        print(excerpt)
    print(f"\n🎯 CONFIDENCE: {final_state['confidence']:.0%}")
    print("\n💡 FINAL RECOMMENDATION:")
    print(divider)
    print(final_state["final_recommendation"])
    return True


if __name__ == "__main__":
    asyncio.run(main())
LLM이 요약을 생성한 뒤, 사람의 승인을 받고 나서 실행하는 워크플로우입니다.
고위험 작업에서 사람의 판단을 끼워넣고 싶을 때 유용합니다.
"""
LLM 기반 사람 승인 워크플로우
다루는 내용:
- LLM이 승인 요약 생성
- 사람의 판단을 위한 인터럽트
- 사용자 입력으로 재개
"""
import asyncio
from typing import TypedDict, Optional, Dict, Any
from spoon_ai.graph import StateGraph, END, interrupt, Command
from spoon_ai.graph import InMemoryCheckpointer
from spoon_ai.llm import LLMManager
from spoon_ai.schema import Message
class ApprovalState(TypedDict):
    """State for the human-approval workflow."""
    request_type: str                # e.g. "transfer" or "trade"
    request_details: Dict[str, Any]  # free-form request payload
    llm_summary: str                 # LLM-written analysis shown to the human
    risk_level: str                  # "LOW" | "MEDIUM" | "HIGH"
    user_approved: Optional[bool]    # None until the human decides
    execution_result: str            # final confirmation/rejection text

# Shared LLM client used by every node.
llm = LLMManager()
async def analyze_request(state: ApprovalState) -> dict:
    """Have the LLM analyze the request and produce a summary plus risk level."""
    reply = await llm.chat([
        Message(role="system", content="""You are a request analyzer.
Analyze the request and provide:
1. A clear summary of what will happen
2. Risk level: LOW, MEDIUM, or HIGH
3. Key points to consider
Format your response as:
SUMMARY: [summary]
RISK: [LOW/MEDIUM/HIGH]
CONSIDERATIONS: [key points]"""),
        Message(role="user", content=f"""
Request Type: {state['request_type']}
Details: {state['request_details']}
Analyze this request:""")
    ])
    summary = reply.content
    # Scan for an explicit risk marker; MEDIUM is the fallback when none matches.
    level = "MEDIUM"
    for marker, tag in (("RISK: LOW", "LOW"), ("RISK: HIGH", "HIGH")):
        if marker in summary:
            level = tag
            break
    return {"llm_summary": summary, "risk_level": level}
async def request_approval(state: ApprovalState) -> dict:
    """Pause the graph until a human supplies an approval decision."""
    # Only interrupt while no decision has been recorded; on the resume pass
    # user_approved is set and this node becomes a no-op.
    if state.get("user_approved") is None:
        interrupt({
            "type": "approval_required",
            "summary": state["llm_summary"],
            "risk_level": state["risk_level"],
            "request_type": state["request_type"],
            "details": state["request_details"],
            "requires_response": ["user_approved"]
        })
    return {}
async def execute_request(state: ApprovalState) -> dict:
    """Execute or reject the request according to the human decision."""
    approved = bool(state.get("user_approved"))
    if not approved:
        reply = await llm.chat([
            Message(role="system", content="Generate a professional rejection acknowledgment."),
            Message(role="user", content=f"The user rejected the {state['request_type']} request.")
        ])
        return {"execution_result": f"❌ REJECTED\n{reply.content}"}
    reply = await llm.chat([
        Message(role="system", content="Generate a confirmation message for the executed request."),
        Message(role="user", content=f"""
Request Type: {state['request_type']}
Details: {state['request_details']}
The user approved this request. Generate execution confirmation:""")
    ])
    return {"execution_result": f"✅ APPROVED\n{reply.content}"}
# Graph wiring with a checkpointer so the run can pause at the interrupt
# in "approve" and later resume from the saved state.
checkpointer = InMemoryCheckpointer()
graph = StateGraph(ApprovalState, checkpointer=checkpointer)
graph.add_node("analyze", analyze_request)
graph.add_node("approve", request_approval)
graph.add_node("execute", execute_request)
graph.set_entry_point("analyze")
graph.add_edge("analyze", "approve")
graph.add_edge("approve", "execute")
graph.add_edge("execute", END)
app = graph.compile()
async def main():
    """Drive the approval workflow over a couple of sample requests."""
    requests = [
        {"type": "transfer", "details": {"amount": 500, "to": "wallet_abc", "currency": "USDT"}},
        {"type": "trade", "details": {"action": "buy", "amount": 0.5, "symbol": "ETH", "price": "market"}},
    ]
    hashes = "#" * 60
    for idx, req in enumerate(requests):
        print(f"\n{hashes}")
        print(f"Processing Request {idx + 1}: {req['type']}")
        print(hashes)
        run_config = {"configurable": {"thread_id": f"approval_session_{idx}"}}
        # First pass: the LLM analyzes the request, then the graph interrupts.
        outcome = await app.invoke(
            {
                "request_type": req["type"],
                "request_details": req["details"],
                "llm_summary": "",
                "risk_level": "",
                "user_approved": None,
                "execution_result": ""
            },
            config=run_config
        )
        if "__interrupt__" in outcome:
            payload = outcome["__interrupt__"][0]["value"]
            rule = "=" * 50
            print("\n🔔 APPROVAL REQUIRED")
            print(rule)
            print(f"Risk Level: {payload['risk_level']}")
            print(f"\nLLM Analysis:\n{payload['summary']}")
            print(rule)
            # Simulate the human decision based on the reported risk.
            decision = payload["risk_level"] != "HIGH"
            print(f"\nSimulated Decision: {'APPROVED' if decision else 'REJECTED'}")
            # Second pass: resume the checkpointed run with the decision.
            outcome = await app.invoke(
                Command(resume={"user_approved": decision}),
                config=run_config
            )
        print(f"\n📋 Final Result:\n{outcome.get('execution_result', 'Unknown')}")
    return True


if __name__ == "__main__":
    asyncio.run(main())
interrupt / resume 패턴으로 비동기 사람 상호작용이 가능합니다.
여러 LLM 호출을 동시에 실행하고, 결과를 모아서 종합하는 패턴입니다.
서로 다른 관점의 분석을 한꺼번에 돌려서 시간도 아끼고 다양성도 확보할 수 있습니다.
"""
병렬 LLM 분석 예제
다루는 내용:
- 여러 LLM 호출의 동시 실행
- 서로 다른 분석 관점
- LLM을 활용한 결과 종합
"""
import asyncio
from typing import TypedDict, Dict, Any
from spoon_ai.graph import StateGraph, END
from spoon_ai.graph.config import ParallelGroupConfig
from spoon_ai.llm import LLMManager
from spoon_ai.schema import Message
class ParallelAnalysisState(TypedDict):
    """State shared by the three parallel analysts and the aggregator."""
    symbol: str              # asset ticker, e.g. "ETH"
    query: str               # user's question about the asset
    bullish_view: str        # written by bullish_analyst
    bearish_view: str        # written by bearish_analyst
    neutral_view: str        # written by neutral_analyst
    aggregated_analysis: str # written by aggregate_analysis

# Shared LLM client used by every analyst node.
llm = LLMManager()
async def bullish_analyst(state: ParallelAnalysisState) -> dict:
    """Argue the bullish case for the symbol."""
    system_msg = Message(role="system", content="""You are a bullish crypto analyst.
Find and present the positive aspects and upside potential.
Focus on growth catalysts, adoption metrics, and bullish indicators.
Be optimistic but grounded in facts.""")
    user_msg = Message(role="user", content=f"Analyze {state['symbol']}: {state['query']}")
    reply = await llm.chat([system_msg, user_msg])
    return {"bullish_view": reply.content}
async def bearish_analyst(state: ParallelAnalysisState) -> dict:
    """Argue the bearish case for the symbol."""
    system_msg = Message(role="system", content="""You are a bearish crypto analyst.
Find and present the risks and downside potential.
Focus on threats, competition, and bearish indicators.
Be cautious but fair in your assessment.""")
    user_msg = Message(role="user", content=f"Analyze {state['symbol']}: {state['query']}")
    reply = await llm.chat([system_msg, user_msg])
    return {"bearish_view": reply.content}
async def neutral_analyst(state: ParallelAnalysisState) -> dict:
    """Give a balanced, fact-focused view of the symbol."""
    system_msg = Message(role="system", content="""You are a neutral crypto analyst.
Provide balanced analysis considering both sides.
Focus on factual data and objective metrics.
Present pros and cons equally.""")
    user_msg = Message(role="user", content=f"Analyze {state['symbol']}: {state['query']}")
    reply = await llm.chat([system_msg, user_msg])
    return {"neutral_view": reply.content}
async def aggregate_analysis(state: ParallelAnalysisState) -> dict:
    """Merge the three perspectives into one comprehensive analysis."""
    system_msg = Message(role="system", content="""You are a senior analyst aggregating multiple perspectives.
Synthesize the bullish, bearish, and neutral views into a comprehensive analysis.
Structure your response:
1. Executive Summary
2. Key Bullish Points
3. Key Bearish Points
4. Balanced Assessment
5. Final Verdict""")
    user_msg = Message(role="user", content=f"""
Symbol: {state['symbol']}
Query: {state['query']}
BULLISH VIEW:
{state['bullish_view']}
BEARISH VIEW:
{state['bearish_view']}
NEUTRAL VIEW:
{state['neutral_view']}
Synthesize these perspectives:""")
    reply = await llm.chat([system_msg, user_msg])
    return {"aggregated_analysis": reply.content}
# Graph wiring: three analyst nodes run as a parallel group, then converge.
graph = StateGraph(ParallelAnalysisState)
graph.add_node("bullish", bullish_analyst)
graph.add_node("bearish", bearish_analyst)
graph.add_node("neutral", neutral_analyst)
graph.add_node("aggregate", aggregate_analysis)
# Parallel group: wait for all three perspectives (up to 60s) before joining.
graph.add_parallel_group(
    "perspectives",
    nodes=["bullish", "bearish", "neutral"],
    config=ParallelGroupConfig(
        join_strategy="all",
        timeout=60.0,
    )
)
# All parallel nodes converge on the aggregation node.
graph.add_edge("bullish", "aggregate")
graph.add_edge("bearish", "aggregate")
graph.add_edge("neutral", "aggregate")
graph.add_edge("aggregate", END)
# NOTE(review): the entry point is a single member of the parallel group;
# presumably the group config fans execution out to all three — verify
# against the spoon_ai graph documentation.
graph.set_entry_point("bullish")
app = graph.compile()
async def main():
    """Run the three analysts in parallel and print the synthesis."""
    banner = "=" * 60
    print(banner)
    print("PARALLEL LLM ANALYSIS")
    print(banner)
    outcome = await app.invoke({
        "symbol": "ETH",
        "query": "What's the outlook for Ethereum in the next year?",
        "bullish_view": "",
        "bearish_view": "",
        "neutral_view": "",
        "aggregated_analysis": ""
    })
    divider = "-" * 40
    previews = [
        ("\n🐂 BULLISH VIEW:", outcome["bullish_view"][:400] + "..."),
        ("\n🐻 BEARISH VIEW:", outcome["bearish_view"][:400] + "..."),
        ("\n⚖️ NEUTRAL VIEW:", outcome["neutral_view"][:400] + "..."),
    ]
    for heading, excerpt in previews:
        print(heading)
        print(divider)
        print(excerpt)
    print("\n📊 AGGREGATED ANALYSIS:")
    print(divider)
    print(outcome["aggregated_analysis"])
    return True


if __name__ == "__main__":
    asyncio.run(main())
멀티턴 대화에서 컨텍스트를 유지하는 패턴입니다.
이전 대화 내용을 기억하면서 자연스러운 대화를 이어갈 수 있습니다.
"""
메모리 기반 대화형 LLM 에이전트
다루는 내용:
- 멀티턴 대화 상태 관리
- 메시지 히스토리 축적
- 컨텍스트를 인식하는 응답
"""
import asyncio
from typing import TypedDict, List
from spoon_ai.graph import StateGraph, END, InMemoryCheckpointer
from spoon_ai.llm import LLMManager
from spoon_ai.schema import Message
class ConversationState(TypedDict):
    """State carried across conversation turns."""
    messages: List[dict]     # history kept as plain dicts so it stays JSON-serializable
    user_input: str          # latest user message
    assistant_response: str  # latest assistant reply
    turn_count: int          # number of completed turns

# Shared LLM client for the chat node.
llm = LLMManager()
async def process_message(state: ConversationState) -> dict:
    """Answer the latest user message with the full history in context."""
    history = state.get("messages", [])
    latest = state["user_input"]
    system = Message(role="system", content="""You are a helpful crypto trading assistant.
You have memory of the entire conversation.
Reference previous messages when relevant.
Be conversational and helpful.""")
    # Rebuild Message objects from the serialized history, then append the new turn.
    thread = [system]
    for entry in history:
        thread.append(Message(role=entry["role"], content=entry["content"]))
    thread.append(Message(role="user", content=latest))
    reply = await llm.chat(thread)
    # History stays as plain dicts so the checkpointer can JSON-serialize it.
    return {
        "assistant_response": reply.content,
        "messages": history + [
            {"role": "user", "content": latest},
            {"role": "assistant", "content": reply.content},
        ],
        "turn_count": state.get("turn_count", 0) + 1,
    }
# Single-node graph; the checkpointer persists conversation state per thread_id.
checkpointer = InMemoryCheckpointer()
graph = StateGraph(ConversationState, checkpointer=checkpointer)
graph.add_node("chat", process_message)
graph.set_entry_point("chat")
graph.add_edge("chat", END)
app = graph.compile()
async def main():
    """Simulate a multi-turn conversation against the graph."""
    banner = "=" * 60
    print(banner)
    print("CONVERSATIONAL LLM AGENT WITH MEMORY")
    print(banner)
    thread_id = "conversation_demo"
    prompts = [
        "Hi! I'm interested in Bitcoin.",
        "What's its current market situation?",
        "You mentioned Bitcoin earlier - what about Ethereum compared to it?",
        "Based on our conversation, what would you recommend for a beginner?",
    ]
    current = {
        "messages": [],
        "user_input": "",
        "assistant_response": "",
        "turn_count": 0
    }
    for prompt in prompts:
        print(f"\n👤 User: {prompt}")
        current["user_input"] = prompt
        # Carry the returned state into the next turn so memory accumulates.
        current = await app.invoke(
            current,
            config={"configurable": {"thread_id": thread_id}}
        )
        print(f"\n🤖 Assistant: {current['assistant_response']}")
        print(f" [Turn {current['turn_count']}, History: {len(current['messages'])} messages]")
    return True


if __name__ == "__main__":
    asyncio.run(main())
모든 예제는 레포지토리에서 바로 실행할 수 있습니다.
cd spoon-core/examples/docs
# 개별 예제 실행
python llm_router.py # 예제 1: 인텐트 라우터
python analysis_pipeline.py # 예제 2: 다단계 분석
python llm_approval.py # 예제 3: Human-in-the-Loop
python parallel_analysis.py # 예제 4: 병렬 LLM
python conversational.py # 예제 5: 대화형 에이전트