얼마 전, 회사 서버에 fastapi로 서빙중인 특정 모델을 langchain의 custom tool(RunnableLambda) 내에서 호출하려 requests 라이브러리를 사용했지만 자꾸만 에러가 발생한 적이 있습니다. 후의 저를 위해서 이 문제를 해결한 기록을 남겨두려고 합니다 :)
langchain 공식문서에는 따로 requests라는 이름으로 tool 내부에서 requests를 사용하는 방법을 정리해 두었습니다(링크). 이 글에서는 TextRequestsWrapper를 사용하는 방법, 내부 코드를 설명하도록 하겠습니다.
from langchain_community.utilities import TextRequestsWrapper
requests = TextRequestsWrapper()
# '<!doctype html><html itemscope="" itemtype="http://schema.org/WebPage"...
TextRequestsWraper를 사용하는 방법은 위와 같습니다. 즉, 우리는 langchain tool 내부에서 requests 라이브러리 대신 langchain.utilities.TextRequestsWrapper를 사용하면 되는 것입니다.
한번 TextRequestsWrapper class 코드를 한번 뜯어보겠습니다.
# langchain_community.utilities.requests.py
class TextRequestsWrapper(GenericRequestsWrapper):
"""Lightweight wrapper around requests library, with async support.
The main purpose of this wrapper is to always return a text output."""
response_content_type: Literal["text", "json"] = "text"
GenericRequestsWrapper를 상속받았으며 response_content_type이 "text"로 선언되어 있음을 알 수 있습니다. 아무래도 GenericRequestsWrapper 코드를 살펴봐야 내부 동작을 확인할 수 있겠군요 🫠🫠
# langchain_community.utilities.requests.py
class GenericRequestsWrapper(BaseModel):
"""Lightweight wrapper around requests library."""
headers: Optional[Dict[str, str]] = None
aiosession: Optional[aiohttp.ClientSession] = None
auth: Optional[Any] = None
response_content_type: Literal["text", "json"] = "text"
class Config:
"""Configuration for this pydantic object."""
extra = Extra.forbid
arbitrary_types_allowed = True
@property
def requests(self) -> Requests:
return Requests(
headers=self.headers, aiosession=self.aiosession, auth=self.auth
)
def _get_resp_content(self, response: Response) -> Union[str, Dict[str, Any]]:
if self.response_content_type == "text":
return response.text
elif self.response_content_type == "json":
return response.json()
else:
raise ValueError(f"Invalid return type: {self.response_content_type}")
async def _aget_resp_content(
self, response: aiohttp.ClientResponse
) -> Union[str, Dict[str, Any]]:
if self.response_content_type == "text":
return await response.text()
elif self.response_content_type == "json":
return await response.json()
else:
raise ValueError(f"Invalid return type: {self.response_content_type}")
def get(self, url: str, **kwargs: Any) -> Union[str, Dict[str, Any]]:
"""GET the URL and return the text."""
return self._get_resp_content(self.requests.get(url, **kwargs))
def post(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> Union[str, Dict[str, Any]]:
"""POST to the URL and return the text."""
return self._get_resp_content(self.requests.post(url, data, **kwargs))
def patch(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> Union[str, Dict[str, Any]]:
"""PATCH the URL and return the text."""
return self._get_resp_content(self.requests.patch(url, data, **kwargs))
def put(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> Union[str, Dict[str, Any]]:
"""PUT the URL and return the text."""
return self._get_resp_content(self.requests.put(url, data, **kwargs))
def delete(self, url: str, **kwargs: Any) -> Union[str, Dict[str, Any]]:
"""DELETE the URL and return the text."""
return self._get_resp_content(self.requests.delete(url, **kwargs))
async def aget(self, url: str, **kwargs: Any) -> Union[str, Dict[str, Any]]:
"""GET the URL and return the text asynchronously."""
async with self.requests.aget(url, **kwargs) as response:
return await self._aget_resp_content(response)
async def apost(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> Union[str, Dict[str, Any]]:
"""POST to the URL and return the text asynchronously."""
async with self.requests.apost(url, data, **kwargs) as response:
return await self._aget_resp_content(response)
async def apatch(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> Union[str, Dict[str, Any]]:
"""PATCH the URL and return the text asynchronously."""
async with self.requests.apatch(url, data, **kwargs) as response:
return await self._aget_resp_content(response)
async def aput(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> Union[str, Dict[str, Any]]:
"""PUT the URL and return the text asynchronously."""
async with self.requests.aput(url, data, **kwargs) as response:
return await self._aget_resp_content(response)
async def adelete(self, url: str, **kwargs: Any) -> Union[str, Dict[str, Any]]:
"""DELETE the URL and return the text asynchronously."""
async with self.requests.adelete(url, **kwargs) as response:
return await self._aget_resp_content(response)
GenericRequestsWrapper에는 get, post, patch, aget, apost, ... requests method가 선언되어 있으며 공통적으로 self._get_resp_content 또는 self._aget_resp_content를 호출하고 있습니다.
이때, self._get_resp_content는 response로 self.requests.{method}(url, ...)의 값을 받아 text 또는 json 타입으로 값을 반환합니다.
@property
def requests(self) -> Requests:
return Requests(
headers=self.headers, aiosession=self.aiosession, auth=self.auth
)
위 코드는 GenericRequestsWrapper의 requests 부분입니다. 즉, self._get_resp_content를 설명할 때 보았던 self.requests...는 Requests(headers=...였다는 것을 알 수 있었습니다.
넵 맞습니다. 최종적으로 저희는 Requests 클래스를 확인해야 합니다.
# langchain_community.utilities.requests.py
import requests
...
class Requests(BaseModel):
"""Wrapper around requests to handle auth and async.
The main purpose of this wrapper is to handle authentication (by saving
headers) and enable easy async methods on the same base object.
"""
headers: Optional[Dict[str, str]] = None
aiosession: Optional[aiohttp.ClientSession] = None
auth: Optional[Any] = None
class Config:
"""Configuration for this pydantic object."""
extra = Extra.forbid
arbitrary_types_allowed = True
def get(self, url: str, **kwargs: Any) -> requests.Response:
"""GET the URL and return the text."""
return requests.get(url, headers=self.headers, auth=self.auth, **kwargs)
def post(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
"""POST to the URL and return the text."""
return requests.post(
url, json=data, headers=self.headers, auth=self.auth, **kwargs
)
def patch(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
"""PATCH the URL and return the text."""
return requests.patch(
url, json=data, headers=self.headers, auth=self.auth, **kwargs
)
def put(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
"""PUT the URL and return the text."""
return requests.put(
url, json=data, headers=self.headers, auth=self.auth, **kwargs
)
def delete(self, url: str, **kwargs: Any) -> requests.Response:
"""DELETE the URL and return the text."""
return requests.delete(url, headers=self.headers, auth=self.auth, **kwargs)
@asynccontextmanager
async def _arequest(
self, method: str, url: str, **kwargs: Any
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""Make an async request."""
if not self.aiosession:
async with aiohttp.ClientSession() as session:
async with session.request(
method, url, headers=self.headers, auth=self.auth, **kwargs
) as response:
yield response
else:
async with self.aiosession.request(
method, url, headers=self.headers, auth=self.auth, **kwargs
) as response:
yield response
@asynccontextmanager
async def aget(
self, url: str, **kwargs: Any
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""GET the URL and return the text asynchronously."""
async with self._arequest("GET", url, **kwargs) as response:
yield response
@asynccontextmanager
async def apost(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""POST to the URL and return the text asynchronously."""
async with self._arequest("POST", url, json=data, **kwargs) as response:
yield response
@asynccontextmanager
async def apatch(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""PATCH the URL and return the text asynchronously."""
async with self._arequest("PATCH", url, json=data, **kwargs) as response:
yield response
@asynccontextmanager
async def aput(
self, url: str, data: Dict[str, Any], **kwargs: Any
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""PUT the URL and return the text asynchronously."""
async with self._arequest("PUT", url, json=data, **kwargs) as response:
yield response
@asynccontextmanager
async def adelete(
self, url: str, **kwargs: Any
) -> AsyncGenerator[aiohttp.ClientResponse, None]:
"""DELETE the URL and return the text asynchronously."""
async with self._arequest("DELETE", url, **kwargs) as response:
yield response
깊게 보지는 않고 저희가 필요했던 정보인 http method들을 살펴보겠습니다.
확인 가능한 것은 조금 식상할 수 있으나 Requests class에서 구현되어있는 http method들은 python의 requests 라이브러리들의 각 http method들을 활용하는 식으로 구현이 되어 있었다는 것입니다.
원래 이 글의 주제는 langchain custom tool(RunnableLambda) 내부에서 requests를 사용하는 방법에 대한 글이였지만 본의 아니게 조금 돌아간 것 같습니다 😅😅. 다시 본 목적에 넘어와서 @chain을 활용한 custom RunnableLambda에서 requests를 사용하는 코드를 작성해 보겠습니다.
from langchain_community.utilities.requests import TextRequestsWrapper
def generate_claim(user_input:str, url:str) -> str:
try:
data = {
"inputs": [
user_input
],
"generate_args": None
}
langchain_requests = TextRequestsWrapper()
# langchain_requests는 str이므로 eval을 통해 결과 값을 Dict로 변환해 주었습니다.
result = eval(langchain_requests.post(url=url, data=data))
result_text = result["result"][0]
return result_text
except Exception as E:
print(E)
raise E