Runnable 하위 클래스 종류

­강지윤·2025년 6월 11일

Runnable Sequence

1. RunnableLambda함수

from langchain_core.runnables import RunnableLambda
# RunnableLambda(함수) -> 함수를 실행하는 Runnable을 생성.
my_runnable2 = RunnableLambda(lambda input_data : f"{input_data}를 한문장으로 설명해줘.")  # lambda 말고 함수 정의한거 넣을 수 있음

my_runnable2.invoke("LLM")

2. RunnablePassthrough

1) 앞 Runnable이 처리한 결과를 다음 Runnable에 그대로 전달

from langchain_core.runnables import RunnablePassthrough

RunnablePassthrough().invoke("안녕하세요")   # 받은 값을 그대로 전달
RunnablePassthrough().invoke({"Key":"value"})
--> {'Key': 'value'}

2) RunnablePassthrough에서 그대로 전달하는 게 아닌 값을 추가해서 전달하고 싶은 경우

# 2) 앞 Runnable이 처리한 결과에 Item을 추가해서 다음 Runnable에 그대로 전달
# ->입력으로 dictionary 받아서 거기에 item을 추가. 
#RunnablePassthrough.assign(key1=Runnable, key2=Runnable, ...)
# - 받은 dictionary에 "key1": Runnable 반환값, "key2": Runnable 반환값 ..추가해서 다음으로 전달.

address_runnable = RunnableLambda(lambda x: "서울시 금천구")  # "서울시 금천구"를 반환.
phone_runnable = RunnableLambda(lambda x: "010-1111-2222")

RunnablePassthrough().assign(address=address_runnable, phone=phone_runnable).invoke({"name":"홍길동"})
--> {'name': '홍길동', 'address': '서울시 금천구', 'phone': '010-1111-2222'}

3. RunnableParallel -> Dictionary로 묶어서 반환

1) 일반적인 RunnableParrallel 표현

from langchain_core.runnables import RunnableParallel, RunnableLambda

run1 = RunnableLambda(lambda x: x+1)
run2  =RunnableLambda(lambda x: x*2)
run3 = RunnableLambda(lambda x: x//3)

runnable = RunnableParallel(
    {
        "result1":run1,
        "result2":run2,
        "result3":run3,
        "result4": RunnablePassthrough()  #앞에서 받은 값을 그대로 다음에 전달.
    }
)
# Runnable들을 각각 실행하고 그 결과를 key에 할당한 Dictionary를 반환
runnable.invoke(20)
--> {'result1': 21, 'result2': 40, 'result3': 6, 'result4': 20}

2) LCEL 형태 안에 Runnableparallel을 넣을 때는 딕셔너리 형태로 표현할 수 있다!

run0 = RunnableLambda(lambda x: x+100)
# run0 -> {run1, run2, run3} -> 
chain = run0 |{"result1":run1,
                "result2":run2,
                "result3":run3,}
# lcel 안에서만 runnableparallel을 딕셔너리 형태로 표현할 수 있다, 
profile
Proverbs 2:20

0개의 댓글