OpenManus는 "초대 코드 없이도 모든 아이디어를 실현할 수 있는" AI 에이전트 프레임워크입니다. MetaGPT 기여자들이 개발한 이 프로젝트는 AI 에이전트를 쉽게 구축하고 활용할 수 있게 해주는 오픈소스 솔루션입니다.
GitHub에서 프로젝트를 클론하고 코드를 살펴보면서 OpenManus의 구조를 파악해봤습니다.
import asyncio
from app.agent.manus import Manus
from app.logger import logger
async def main():
agent = Manus()
try:
prompt = input("Enter your prompt: ")
if not prompt.strip():
logger.warning("Empty prompt provided.")
return
logger.warning("Processing your request...")
await agent.run(prompt)
logger.info("Request processing completed.")
except KeyboardInterrupt:
logger.warning("Operation interrupted.")
if __name__ == "__main__":
asyncio.run(main())
이 코드는 매우 단순하지만 OpenManus의 기본 작동 방식을 보여줍니다:
Manus
에이전트 인스턴스를 생성합니다.run
메서드를 호출하여 프롬프트를 처리합니다.async/await
) 패턴을 사용하여 효율적인 처리를 지원합니다.from app.agent.base import BaseAgent
from app.agent.planning import PlanningAgent
from app.agent.react import ReActAgent
from app.agent.swe import SWEAgent
from app.agent.toolcall import ToolCallAgent
__all__ = [
"BaseAgent",
"PlanningAgent",
"ReActAgent",
"SWEAgent",
"ToolCallAgent",
]
이 코드에서 OpenManus가 다양한 유형의 에이전트를 제공하는 것을 알 수 있습니다:
BaseAgent
클래스는 OpenManus 프레임워크에서 모든 에이전트의 기본 골격을 제공합니다. 상태 관리, 메모리 처리, 실행 루프 등 에이전트의 핵심 동작을 정의하고 있습니다.
from abc import ABC, abstractmethod
from contextlib import asynccontextmanager
from typing import List, Optional
from pydantic import BaseModel, Field, model_validator
from app.llm import LLM
from app.logger import logger
from app.schema import ROLE_TYPE, AgentState, Memory, Message
class BaseAgent(BaseModel, ABC):
"""Abstract base class for managing agent state and execution.
Provides foundational functionality for state transitions, memory management,
and a step-based execution loop. Subclasses must implement the `step` method.
"""
# Core attributes
name: str = Field(..., description="Unique name of the agent")
description: Optional[str] = Field(None, description="Optional agent description")
# Prompts
system_prompt: Optional[str] = Field(
None, description="System-level instruction prompt"
)
next_step_prompt: Optional[str] = Field(
None, description="Prompt for determining next action"
)
# Dependencies
llm: LLM = Field(default_factory=LLM, description="Language model instance")
memory: Memory = Field(default_factory=Memory, description="Agent's memory store")
state: AgentState = Field(
default=AgentState.IDLE, description="Current agent state"
)
# Execution control
max_steps: int = Field(default=10, description="Maximum steps before termination")
current_step: int = Field(default=0, description="Current step in execution")
duplicate_threshold: int = 2
class Config:
arbitrary_types_allowed = True
extra = "allow" # Allow extra fields for flexibility in subclasses
BaseModel
(Pydantic): 데이터 유효성 검증과 타입 안전성 제공ABC
(Abstract Base Class): 추상 메서드 정의를 통한 인터페이스 보장이 두 가지를 상속받음으로써 견고한 데이터 모델과 인터페이스를 동시에 보장합니다.
@model_validator(mode="after")
def initialize_agent(self) -> "BaseAgent":
"""Initialize agent with default settings if not provided."""
if self.llm is None or not isinstance(self.llm, LLM):
self.llm = LLM(config_name=self.name.lower())
if not isinstance(self.memory, Memory):
self.memory = Memory()
return self
Pydantic의 model_validator
를 사용하여 객체 생성 후 유효성을 검사하고 적절한 기본값을 설정합니다. 특히 LLM과 메모리 인스턴스를 자동으로 생성해주는 부분이 중요합니다.
@asynccontextmanager
async def state_context(self, new_state: AgentState):
"""Context manager for safe agent state transitions."""
if not isinstance(new_state, AgentState):
raise ValueError(f"Invalid state: {new_state}")
previous_state = self.state
self.state = new_state
try:
yield
except Exception as e:
self.state = AgentState.ERROR # Transition to ERROR on failure
raise e
finally:
self.state = previous_state # Revert to previous state
비동기 컨텍스트 매니저를 사용한 우아한 상태 전환 관리가 인상적입니다:
with
구문과 유사한 간결한 사용 패턴 제공def update_memory(
self,
role: ROLE_TYPE, # type: ignore
content: str,
**kwargs,
) -> None:
"""Add a message to the agent's memory."""
message_map = {
"user": Message.user_message,
"system": Message.system_message,
"assistant": Message.assistant_message,
"tool": lambda content, **kw: Message.tool_message(content, **kw),
}
if role not in message_map:
raise ValueError(f"Unsupported message role: {role}")
msg_factory = message_map[role]
msg = msg_factory(content, **kwargs) if role == "tool" else msg_factory(content)
self.memory.add_message(msg)
팩토리 패턴을 사용한 메시지 생성 및 저장:
async def run(self, request: Optional[str] = None) -> str:
"""Execute the agent's main loop asynchronously."""
if self.state != AgentState.IDLE:
raise RuntimeError(f"Cannot run agent from state: {self.state}")
if request:
self.update_memory("user", request)
results: List[str] = []
async with self.state_context(AgentState.RUNNING):
while (
self.current_step < self.max_steps and self.state != AgentState.FINISHED
):
self.current_step += 1
logger.info(f"Executing step {self.current_step}/{self.max_steps}")
step_result = await self.step()
# Check for stuck state
if self.is_stuck():
self.handle_stuck_state()
results.append(f"Step {self.current_step}: {step_result}")
if self.current_step >= self.max_steps:
self.current_step = 0
self.state = AgentState.IDLE
results.append(f"Terminated: Reached max steps ({self.max_steps})")
return "\n".join(results) if results else "No steps executed"
비동기 실행 루프의 주요 특징:
@abstractmethod
async def step(self) -> str:
"""Execute a single step in the agent's workflow.
Must be implemented by subclasses to define specific behavior.
"""
모든 하위 클래스가 구현해야 하는 핵심 메서드입니다. 이 메서드는 에이전트의 실제 동작 로직을 정의합니다.
def handle_stuck_state(self):
"""Handle stuck state by adding a prompt to change strategy"""
stuck_prompt = "\
Observed duplicate responses. Consider new strategies and avoid repeating ineffective paths already attempted."
self.next_step_prompt = f"{stuck_prompt}\n{self.next_step_prompt}"
logger.warning(f"Agent detected stuck state. Added prompt: {stuck_prompt}")
def is_stuck(self) -> bool:
"""Check if the agent is stuck in a loop by detecting duplicate content"""
if len(self.memory.messages) < 2:
return False
last_message = self.memory.messages[-1]
if not last_message.content:
return False
# Count identical content occurrences
duplicate_count = sum(
1
for msg in reversed(self.memory.messages[:-1])
if msg.role == "assistant" and msg.content == last_message.content
)
return duplicate_count >= self.duplicate_threshold
에이전트가 동일한 응답을 반복하는 경우를 감지하고 처리하는 기능:
@property
def messages(self) -> List[Message]:
"""Retrieve a list of messages from the agent's memory."""
return self.memory.messages
@messages.setter
def messages(self, value: List[Message]):
"""Set the list of messages in the agent's memory."""
self.memory.messages = value
메모리 내 메시지 접근을 위한 속성 래퍼:
BaseAgent 클래스에서 볼 수 있는 주요 설계 패턴과 원칙:
run
메서드가 실행 흐름을 정의하고, 하위 클래스가 step
메서드를 구현Manus
클래스는 ToolCallAgent
를 상속받고 있으며, 이는 앞서 본 구조에서 BaseAgent
의 하위 클래스입니다. 이 클래스는 다양한 도구들을 활용하여 범용적인 작업을 처리할 수 있는 AI 에이전트를 구현합니다.
상속 관계:
class Manus(ToolCallAgent):
ToolCallAgent
를 상속받아 도구 호출 기능을 확장합니다.
기본 정보:
name: str = "Manus"
description: str = "A versatile agent that can solve various tasks using multiple tools"
에이전트의 이름과 설명을 정의합니다.
프롬프트 설정:
system_prompt: str = SYSTEM_PROMPT
next_step_prompt: str = NEXT_STEP_PROMPT
app.prompt.manus
모듈에서 가져온 미리 정의된 프롬프트를 사용합니다.
실행 제한:
max_observe: int = 2000
max_steps: int = 20
관찰 데이터 길이와 최대 실행 단계 수를 제한합니다.
도구 모음:
available_tools: ToolCollection = Field(
default_factory=lambda: ToolCollection(
PythonExecute(), WebSearch(), BrowserUseTool(), FileSaver(), Terminate()
)
)
에이전트가 사용할 수 있는 도구 모음을 정의합니다:
PythonExecute
: Python 코드 실행WebSearch
: 웹 검색BrowserUseTool
: 웹 브라우저 조작FileSaver
: 파일 저장Terminate
: 작업 종료특별 도구 처리:
async def _handle_special_tool(self, name: str, result: Any, **kwargs):
if not self._is_special_tool(name):
return
else:
await self.available_tools.get_tool(BrowserUseTool().name).cleanup()
await super()._handle_special_tool(name, result, **kwargs)
특별 도구를 처리하는 메서드를 오버라이드하여, 특별 도구 실행 전에 브라우저 도구의 클린업을 수행합니다.
다양한 기능을 갖춘 범용 에이전트:
Manus
클래스는 코드 실행, 웹 검색, 브라우저 조작, 파일 저장 등 다양한 도구를 결합한 범용 에이전트입니다.
도구 추상화:
ToolCollection
을 통해 여러 도구를 관리하고, 각 도구는 독립적인 클래스로 구현되어 확장성이 뛰어납니다.
특별 도구 처리 메커니즘:
특정 도구들에 대한 특별 처리 로직을 구현하여 자원 관리(브라우저 정리 등)를 자동화합니다.
상위 클래스 활용:
super()._handle_special_tool()
를 호출하여 상위 클래스의 기능을 재사용하는 좋은 OOP 관행을 보여줍니다.
PlanningAgent
는 작업을 해결하기 위한 구조화된 계획을 생성하고 관리하는 에이전트입니다. 계획의 각 단계를 추적하고 완료할 때까지 진행합니다.
기본 속성 설정:
PlanningTool
과 Terminate
도구를 기본적으로 사용계획 관리 속성:
active_plan_id
: 현재 활성화된 계획의 IDstep_execution_tracker
: 도구 호출별로 단계 실행 상태를 추적하는 딕셔너리current_step_index
: 현재 실행 중인 단계의 인덱스초기화 및 유효성 검증:
@model_validator(mode="after")
def initialize_plan_and_verify_tools(self) -> "PlanningAgent":
self.active_plan_id = f"plan_{int(time.time())}"
if "planning" not in self.available_tools.tool_map:
self.available_tools.add_tool(PlanningTool())
return self
사고 과정 (think 메서드):
async def think(self) -> bool:
# 현재 계획 상태를 기반으로 다음 행동 결정
prompt = f"CURRENT PLAN STATUS:\n{await self.get_plan()}\n\n{self.next_step_prompt}"
# ...
행동 실행 (act 메서드):
async def act(self) -> str:
# 단계 실행 및 완료 상태 추적
# ...
계획 관리 메서드:
get_plan()
: 현재 계획 상태 조회update_plan_status()
: 도구 실행 후 계획 상태 업데이트_get_current_step_index()
: 첫 번째 미완료 단계 인덱스 찾기create_initial_plan()
: 초기 요청을 기반으로 계획 생성실행 메서드 (run):
async def run(self, request: Optional[str] = None) -> str:
if request:
await self.create_initial_plan(request)
return await super().run()
create_initial_plan
을 통해 계획 생성run
메서드에서 계획 실행 시작think
단계에서 현재 계획 상태를 기반으로 다음 행동 결정act
단계에서 행동 실행 및 결과 처리이 코드는 AI 에이전트가 복잡한 작업을 여러 단계로 나누어 체계적으로 해결하는 방식을 잘 보여줍니다. 특히 계획 생성, 추적, 업데이트를 자동화하여 구조화된 문제 해결 접근법을 구현한 점이 인상적입니다.
ReActAgent
는 BaseAgent
를 상속하고 추가적인 추상 메서드를 정의하는 추상 클래스(ABC)입니다.
name: str # 에이전트 이름
description: Optional[str] = None # 에이전트 설명
system_prompt: Optional[str] = None # 시스템 프롬프트
next_step_prompt: Optional[str] = None # 다음 단계 프롬프트
llm: Optional[LLM] = Field(default_factory=LLM) # 언어 모델 인스턴스
memory: Memory = Field(default_factory=Memory) # 메모리 객체
state: AgentState = AgentState.IDLE # 현재 상태
max_steps: int = 10 # 최대 단계 수
current_step: int = 0 # 현재 단계
이 속성들은 앞서 본 BaseAgent
의 속성과 유사하며, ReActAgent
는 특히 ReAct 패턴을 구현하기 위한 기반을 제공합니다.
추상 메서드 think
@abstractmethod
async def think(self) -> bool:
"""Process current state and decide next action"""
추상 메서드 act
@abstractmethod
async def act(self) -> str:
"""Execute decided actions"""
step 메서드
async def step(self) -> str:
"""Execute a single step: think and act."""
should_act = await self.think()
if not should_act:
return "Thinking complete - no action needed"
return await self.act()
BaseAgent
의 추상 메서드 step
을 구현한 메서드think
메서드를 호출하여 행동 여부 결정think
가 True를 반환하면 act
메서드를 호출하여 행동 수행이 클래스는 ReAct(Reasoning + Acting) 패턴을 구현한 좋은 예입니다:
think
메서드는 현재 상황을 분석하고 행동 계획을 세우는 추론 과정을 담당act
메서드는 계획에 따라 실제 행동을 수행step
메서드를 통해 이 두 단계를 반복적으로 수행추상화와 템플릿 메서드 패턴:
think
와 act
는 추상 메서드로, 구체적인 구현은 하위 클래스에 위임step
은 템플릿 메서드로, 전체 흐름은 정의하되 세부 구현은 하위 클래스에서 결정유연성:
단순성:
비동기 프로그래밍:
async
로 선언되어 비동기 작업(예: API 호출)을 효율적으로 처리이 ReActAgent
클래스는 다양한 목적의 구체적인 에이전트를 구현하기 위한 기반을 제공하며, BaseAgent
에서 정의한 에이전트의 기본 골격에 ReAct 패턴을 접목시킨 구조를 보여줍니다.
SWEAgent
는 ToolCallAgent
를 상속받은 클래스로, 프로그래밍 및 소프트웨어 엔지니어링 작업을 수행하기 위한 도구들을 갖추고 있습니다.
name: str = "swe"
description: str = "an autonomous AI programmer that interacts directly with the computer to solve tasks."
system_prompt: str = SYSTEM_PROMPT
next_step_prompt: str = NEXT_STEP_TEMPLATE
available_tools: ToolCollection = ToolCollection(
Bash(), StrReplaceEditor(), Terminate()
)
special_tool_names: List[str] = Field(default_factory=lambda: [Terminate().name])
max_steps: int = 30
bash: Bash = Field(default_factory=Bash)
working_dir: str = "."
기본 정보:
프롬프트:
SYSTEM_PROMPT
: 소프트웨어 엔지니어링 작업에 특화된 시스템 프롬프트NEXT_STEP_TEMPLATE
: 작업 디렉토리 정보를 포함할 수 있는 템플릿 형태의 프롬프트도구 모음:
Bash()
: 셸 명령어 실행 도구StrReplaceEditor()
: 텍스트/코드 수정 도구Terminate()
: 작업 종료 도구실행 제어:
max_steps
: 30 (다른 에이전트보다 많은 단계 허용)bash
: Bash 도구의 인스턴스working_dir
: 현재 작업 디렉토리 (기본값: ".")async def think(self) -> bool:
"""Process current state and decide next action"""
# Update working directory
self.working_dir = await self.bash.execute("pwd")
self.next_step_prompt = self.next_step_prompt.format(
current_dir=self.working_dir
)
return await super().think()
think
메서드는 부모 클래스의 메서드를 오버라이드하여 다음과 같은 기능을 추가합니다:
pwd
명령어를 실행하여 현재 작업 디렉토리를 가져옴next_step_prompt
업데이트think
메서드 호출개발 환경 인식:
코드 조작 도구:
소프트웨어 개발 맥락:
긴 작업 허용:
max_steps
가 30으로 설정되어 있어 복잡한 프로그래밍 작업을 수행할 수 있는 충분한 단계 제공이 SWEAgent
클래스는 코드 작성, 디버깅, 소프트웨어 개발과 같은 프로그래밍 관련 작업을 수행하는 데 특화된 에이전트입니다. Bash 명령어와 텍스트 편집 도구를 통해 파일 시스템과 코드를 직접 조작할 수 있는 능력을 가지고 있으며, 이를 통해 복잡한 개발 작업을 자동화할 수 있습니다.
ToolCallAgent
는 도구/함수 호출을 처리하기 위한 향상된 추상화를 제공하는 기본 에이전트 클래스입니다.
name: str = "toolcall"
description: str = "an agent that can execute tool calls."
system_prompt: str = SYSTEM_PROMPT
next_step_prompt: str = NEXT_STEP_PROMPT
available_tools: ToolCollection = ToolCollection(
CreateChatCompletion(), Terminate()
)
tool_choices: TOOL_CHOICE_TYPE = ToolChoice.AUTO
special_tool_names: List[str] = Field(default_factory=lambda: [Terminate().name])
tool_calls: List[ToolCall] = Field(default_factory=list)
max_steps: int = 30
max_observe: Optional[Union[int, bool]] = None
max_input_tokens: Optional[int] = None
기본 도구 설정:
CreateChatCompletion()
: 대화 완성을 위한 도구Terminate()
: 작업 종료 도구도구 선택 모드:
tool_choices
: 도구 선택 모드 (AUTO, REQUIRED, NONE)특수 도구 처리:
special_tool_names
: 특별히 처리되는 도구 이름 목록도구 호출 저장:
tool_calls
: 현재 처리 중인 도구 호출 목록 제한 설정:
max_observe
: 도구 출력 결과 길이 제한max_input_tokens
: 최대 입력 토큰 수 제한async def think(self) -> bool:
"""Process current state and decide next actions using tools"""
# 다음 단계 프롬프트 추가
if self.next_step_prompt:
user_msg = Message.user_message(self.next_step_prompt)
self.messages += [user_msg]
try:
# 도구 옵션으로 응답 얻기
response = await self.llm.ask_tool(
messages=self.messages,
system_msgs=[Message.system_message(self.system_prompt)]
if self.system_prompt
else None,
tools=self.available_tools.to_params(),
tool_choice=self.tool_choices,
)
except Exception as e:
# 토큰 제한 오류 처리
# ...
self.tool_calls = response.tool_calls
# 다양한 도구 선택 모드 처리
# ...
이 메서드는 다음과 같은 역할을 합니다:
async def act(self) -> str:
"""Execute tool calls and handle their results"""
if not self.tool_calls:
if self.tool_choices == ToolChoice.REQUIRED:
raise ValueError(TOOL_CALL_REQUIRED)
# 도구 호출이 없으면 마지막 메시지 내용 반환
return self.messages[-1].content or "No content or commands to execute"
results = []
for command in self.tool_calls:
result = await self.execute_tool(command)
if self.max_observe:
result = result[: self.max_observe]
# 도구 응답을 메모리에 추가
tool_msg = Message.tool_message(
content=result, tool_call_id=command.id, name=command.function.name
)
self.memory.add_message(tool_msg)
results.append(result)
return "\n\n".join(results)
이 메서드는 다음과 같은 역할을 합니다:
async def execute_tool(self, command: ToolCall) -> str:
"""Execute a single tool call with robust error handling"""
if not command or not command.function or not command.function.name:
return "Error: Invalid command format"
name = command.function.name
if name not in self.available_tools.tool_map:
return f"Error: Unknown tool '{name}'"
try:
# 인자 파싱
args = json.loads(command.function.arguments or "{}")
# 도구 실행
result = await self.available_tools.execute(name=name, tool_input=args)
# 표시용 결과 형식 지정
observation = f"Observed output of cmd `{name}` executed:\n{str(result)}"
# `finish`와 같은 특수 도구 처리
await self._handle_special_tool(name=name, result=result)
return observation
except json.JSONDecodeError:
# JSON 파싱 오류 처리
except Exception as e:
# 일반 오류 처리
이 메서드는 다음과 같은 역할을 합니다:
async def _handle_special_tool(self, name: str, result: Any, **kwargs):
"""Handle special tool execution and state changes"""
if not self._is_special_tool(name):
return
if self._should_finish_execution(name=name, result=result, **kwargs):
# 에이전트 상태를 완료로 설정
self.state = AgentState.FINISHED
@staticmethod
def _should_finish_execution(**kwargs) -> bool:
"""Determine if tool execution should finish the agent"""
return True
def _is_special_tool(self, name: str) -> bool:
"""Check if tool name is in special tools list"""
return name.lower() in [n.lower() for n in self.special_tool_names]
이 메서드들은 특수 도구(예: Terminate
)를 처리하는 방법을 정의합니다:
이 ToolCallAgent
클래스는 OpenManus의 핵심 구성 요소로, LLM이 다양한 도구를 활용하여 복잡한 작업을 수행할 수 있게 하는 기반을 제공합니다. 이전에 본 SWEAgent
, PlanningAgent
, Manus
클래스들은 모두 이 ToolCallAgent
를 상속받아 특정 용도에 맞게 확장한 것입니다.
BaseFlow
는 여러 에이전트를 지원하는 실행 흐름의 기본 클래스입니다.
agents: Dict[str, BaseAgent]
tools: Optional[List] = None
primary_agent_key: Optional[str] = None
에이전트 관리:
agents
: 이름을 키로 하는 에이전트 딕셔너리primary_agent_key
: 기본 에이전트를 식별하는 키도구 설정:
tools
: 선택적으로 사용 가능한 도구 목록def __init__(
self, agents: Union[BaseAgent, List[BaseAgent], Dict[str, BaseAgent]], **data
):
# 다양한 방식의 에이전트 제공 처리
if isinstance(agents, BaseAgent):
agents_dict = {"default": agents}
elif isinstance(agents, list):
agents_dict = {f"agent_{i}": agent for i, agent in enumerate(agents)}
else:
agents_dict = agents
# 기본 에이전트 지정
primary_key = data.get("primary_agent_key")
if not primary_key and agents_dict:
primary_key = next(iter(agents_dict))
data["primary_agent_key"] = primary_key
# 에이전트 딕셔너리 설정
data["agents"] = agents_dict
# BaseModel의 초기화 사용
super().__init__(**data)
이 메서드는 다음과 같은 역할을 합니다:
@property
def primary_agent(self) -> Optional[BaseAgent]:
"""Get the primary agent for the flow"""
return self.agents.get(self.primary_agent_key)
def get_agent(self, key: str) -> Optional[BaseAgent]:
"""Get a specific agent by key"""
return self.agents.get(key)
def add_agent(self, key: str, agent: BaseAgent) -> None:
"""Add a new agent to the flow"""
self.agents[key] = agent
이 메서드들은 다음과 같은 역할을 합니다:
@abstractmethod
async def execute(self, input_text: str) -> str:
"""Execute the flow with given input"""
이 추상 메서드는 하위 클래스에서 구현해야 하며, 흐름의 실행 로직을 정의합니다.
PlanStepStatus
는 계획 단계의 가능한 상태를 정의하는 열거형 클래스입니다.
class PlanStepStatus(str, Enum):
NOT_STARTED = "not_started"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
BLOCKED = "blocked"
상태 값:
NOT_STARTED
: 시작되지 않은 단계IN_PROGRESS
: 진행 중인 단계COMPLETED
: 완료된 단계BLOCKED
: 차단된 단계유틸리티 메서드:
get_all_statuses()
: 모든 가능한 상태 값 반환get_active_statuses()
: 활성 상태 값 반환 (시작되지 않았거나 진행 중)get_status_marks()
: 상태에 대한 마커 기호 매핑 반환async
)로 정의됨이 BaseFlow
클래스는 OpenManus의 중요한 구성 요소로, 여러 에이전트를 조율하여 복잡한 작업을 수행하는 프레임워크를 제공합니다. 이 클래스를 상속받아 다양한 흐름 유형(예: 계획 흐름)을 구현할 수 있습니다.
FlowFactory
는 다양한 유형의 흐름을 생성하는 팩토리 클래스로, 여러 에이전트를 지원합니다.
@staticmethod
def create_flow(
flow_type: FlowType,
agents: Union[BaseAgent, List[BaseAgent], Dict[str, BaseAgent]],
**kwargs,
) -> BaseFlow:
flows = {
FlowType.PLANNING: PlanningFlow,
}
flow_class = flows.get(flow_type)
if not flow_class:
raise ValueError(f"Unknown flow type: {flow_type}")
return flow_class(agents, **kwargs)
이 메서드는 다음과 같은 역할을 합니다:
흐름 유형 매핑:
FlowType.PLANNING
만 PlanningFlow
클래스에 매핑됨유효성 검사:
유연한 에이전트 처리:
BaseFlow
의 생성자에서 처리됨추가 매개변수 지원:
이 FlowFactory
클래스는 OpenManus 시스템에서 다양한 유형의 에이전트 흐름을 생성하기 위한 중앙화된 방법을 제공합니다. 현재는 PlanningFlow
유형만 지원하지만, 필요에 따라 새로운 흐름 유형을 쉽게 추가할 수 있도록 설계되어 있습니다.
PlanningFlow
는 에이전트를 사용하여 작업 계획을 수립하고 실행하는 흐름을 관리하는 클래스입니다.
llm: LLM = Field(default_factory=lambda: LLM())
planning_tool: PlanningTool = Field(default_factory=PlanningTool)
executor_keys: List[str] = Field(default_factory=list)
active_plan_id: str = Field(default_factory=lambda: f"plan_{int(time.time())}")
current_step_index: Optional[int] = None
LLM 및 계획 도구:
llm
: 언어 모델 인스턴스planning_tool
: 계획 관리를 위한 도구실행 관리:
executor_keys
: 단계 실행에 사용할 에이전트 키 목록active_plan_id
: 현재 활성화된 계획의 IDcurrent_step_index
: 현재 실행 중인 단계의 인덱스def __init__(
self, agents: Union[BaseAgent, List[BaseAgent], Dict[str, BaseAgent]], **data
):
# 설정 처리 및 부모 클래스 초기화
# executor_keys 설정
# planning_tool 초기화
이 메서드는 다음과 같은 역할을 합니다:
def get_executor(self, step_type: Optional[str] = None) -> BaseAgent:
"""단계 유형에 따라 적절한 실행자 에이전트를 선택"""
# 단계 유형이 에이전트 키와 일치하면 해당 에이전트 사용
# 그렇지 않으면 사용 가능한 첫 번째 실행자 또는 기본 에이전트 사용
이 메서드는 단계 유형에 따라 적절한 에이전트를 선택합니다:
async def execute(self, input_text: str) -> str:
"""에이전트를 사용하여 계획 흐름 실행"""
# 입력이 제공되면 초기 계획 생성
# 현재 단계 가져오기 및 실행
# 모든 단계 완료 시 계획 마무리
이 메서드는 전체 계획 실행 흐름을 관리합니다:
async def _create_initial_plan(self, request: str) -> None:
"""LLM과 PlanningTool을 사용하여 요청에 기반한 초기 계획 생성"""
# 계획 생성을 위한 시스템 및 사용자 메시지 준비
# LLM에 PlanningTool을 사용하여 계획 생성 요청
# 도구 호출 처리 또는 기본 계획 생성
이 메서드는 초기 계획을 생성합니다:
async def _get_current_step_info(self) -> tuple[Optional[int], Optional[dict]]:
"""첫 번째 미완료 단계의 인덱스와 정보를 식별"""
# 유효한 계획 확인
# 단계 데이터 및 상태 가져오기
# 첫 번째 활성 단계 찾기 및 상태 업데이트
이 메서드는 다음을 수행합니다:
async def _execute_step(self, executor: BaseAgent, step_info: dict) -> str:
"""지정된 에이전트를 사용하여 현재 단계 실행"""
# 현재 계획 상태로 컨텍스트 준비
# 에이전트에 단계 실행 프롬프트 제공
# 단계 완료로 표시
이 메서드는 단계를 실행합니다:
async def _finalize_plan(self) -> str:
"""계획을 마무리하고 LLM을 사용하여 요약 제공"""
# 최종 계획 상태 가져오기
# LLM을 사용하여 요약 생성
# 실패 시 에이전트를 사용한 대체 요약
이 메서드는 계획을 마무리합니다:
이 PlanningFlow
클래스는 복잡한 작업을 계획하고 적절한 에이전트를 사용하여 단계별로 실행하는 강력한 프레임워크를 제공합니다. 각 단계마다 진행 상황을 추적하고 최종적으로 결과를 요약하는 흐름을 관리합니다.