DIFY API로 VectorStore 업데이트 자동화하기

문정현·2024년 11월 22일

DIFY

목록 보기
4/4

DIFY 사용 전과 후

이전에는 Pinecone Cloud의 VectorStore를 이용해 임베딩한 데이터를 저장하고 관리했다. 하지만 이제 DIFY를 사용하면서, 외부 Cloud VectorStore 대신 DIFY Docker 컨테이너에 포함된 VectorStore를 활용해 데이터를 저장해야 하는 상황이 되었다.

DIFY는 GUI 기반으로 문서를 드래그 앤 드롭하여 저장할 수 있지만, 수작업으로 추가하거나 갱신해야 하기 때문에 자동화가 필요하다. DIFY 공식문서를 참고하면 API를 통해 자동화할 수 있다.

DIFY의 저장소 조회 API 사용

먼저, POSTMAN으로 DIFY의 저장소(VectorStore)를 조회해보자. CURL 예제는 다음과 같다.

curl --location --request GET 'http://{DIFY_IP}/v1/datasets?page=1&limit=20' \\
--header 'Authorization: Bearer {api_key}'

{
    "data": [
        {
            "id": "983559c1-****-****-****-cc6f397fac96",
            "name": "DIREA 홈페이지",
            "description": "useful for when you want to answer queries about the http://www.direa.co.kr/ ",
            // 중략
            "external_retrieval_model": {
                "top_k": 4,
                "score_threshold": 0.0
            }
        },
        {
            "id": "d9545719-****-****-****-28175b78ebc0",
            "name": "WikiDocument",
            "description": "wiki.direa.synology Data",
            // 중략
            "external_retrieval_model": {
                "top_k": 5,
                "score_threshold": 0.0
            }
        }
    ],
    "has_more": false,
    "limit": 20,
    "total": 2,
    "page": 1
}

조회 결과, DIFY에 생성된 여러 저장소 목록이 보인다. 이제 위키 문서를 저장하거나 갱신하기 위해 저장소의 ID(예: d9545719-****-****-****-28175b78ebc0)를 기억해두자.

Python 코드로 저장소 조회

아래는 파이썬 코드로 저장소를 조회하는 함수이다.

import requests
from typing import Dict, Any

def get_documents(page) -> Dict[str, Any]:
    # API 엔드포인트 설정
    url = f"{api_url}/v1/datasets/{dataset_id}/documents?page={page}&limit=100"

    # API 호출
    response = requests.get(url, headers=common_headers)

    # 응답 처리
    if response.status_code == 200:
        return response.json()
    else:
        return {
            "status": "error",
            "status_code": response.status_code,
            "error": response.text
        }

조회 결과는 아래와 같이 나타난다.

{
    "data": [
        // 데이터 생략
    ],
    "has_more": true,
    "limit": 100,
    "total": 327,
    "page": 1
}

has_more 값이 true인 경우 더 많은 데이터가 있다는 뜻이다. 이를 활용해 전체 문서를 순회하며 조회할 수 있다.

cnt = 1
has_more = True
exist_doc = {}
while has_more:
    documents_response = get_documents(cnt)
    has_more = documents_response.get("has_more", False)
    documents = documents_response.get("data", [])
    for document in documents:
        exist_doc[document["name"]] = document["id"]
    cnt += 1
print(f"총 {len(exist_doc)}개의 문서 확인")

has_moreFalse가 될 때까지 문서를 순회하며, 저장소 내 문서 제목을 exist_doc에 저장한다. exist_doc 배열은 위키 크롤링 시 이미 존재하는 문서인지 확인하고, 업데이트 또는 스킵을 결정하는 데 사용된다.

문서 생성 및 업데이트

이제 저장소에 문서를 생성하거나 업데이트하는 요청 코드를 작성하자. dataset_id는 위에서 조회한 ID를 사용한다.

import os
from dotenv import load_dotenv
load_dotenv()

api_url = os.environ.get("API_URL")
dataset_id = os.environ.get("DATASET_ID")
api_key = os.environ.get("API_KEY")

# 생성
def create_by_text(main_title, text_data):
    # API 엔드포인트 설정
    url = f"{api_url}/v1/datasets/{dataset_id}/document/create_by_text"

    # 요청 데이터 설정
    data = {
        "name": main_title,
        "text": text_data,
        "indexing_technique": "high_quality",
        "process_rule": {
            "mode": "automatic"
        }
    }

# 수정
def update_by_text(main_title, text_data, document_id):
    # API 엔드포인트 설정
    url = f"{api_url}/v1/datasets/{dataset_id}/documents/{document_id}/update_by_text"

    # 요청 데이터 설정
    data = {
        "name": main_title,
        "text": text_data,
        "indexing_technique": "high_quality",
        "process_rule": {
            "mode": "automatic"
        }
    }

문서를 생성하거나 수정할 때는 파일을 통으로 전송하는 방식도 가능하지만, 기존에 구현된 크롤링을 그대로 활용하기 위해 텍스트 기반 요청 방식을 선택했다.

문서 저장 함수 작성

최종적으로 사용할 메인 함수 save_doc는 다음과 같다.

def save_doc(main_title, text_data, last_modified, exist_doc):
    log_msg = f"EXIST >>> 문서 '{main_title}'은(는) 이미 최신 버전입니다. 업데이트를 건너뜁니다."

    if main_title in exist_doc.keys() and last_modified.startswith("지난주"): # (1)
        update_by_text(main_title, text_data, exist_doc[main_title])
        log_msg = f"UPDATE >>> 문서 '{main_title}'의 버전이 {last_modified} 일자로 변경되었습니다. 최신화를 진행합니다."

    if main_title not in exist_doc.keys(): # (2)
        create_by_text(main_title, text_data)
        log_msg = f"CREATE >>> 문서 '{main_title}'은(는) 새로운 문서입니다. 추가를 진행합니다."

    create_wiki_log("crawl", log_msg)
  • main_title: 저장할 문서의 제목
  • text_data: 저장할 문서의 내용
  • last_modified: 문서 수정 기록
  • exist_doc: 저장소에 존재하는 문서들의 제목 배열

조건 설명

  • (1): 제목이 같은 문서가 이미 존재하고, 최근에 수정된 경우 업데이트 진행
  • (2): 존재하지 않는 문서라면 새로 생성

로거는 이전에 구현한 것으로, 해당 포스트를 참고하면 된다.

profile
기록 == 성장

0개의 댓글