이 글은 GraphRAG 인덱싱 프로세스의 결과를 Neo4j 그래프 데이터베이스에 가져와 추가 처리, 분석 또는 시각화를 수행하는 절차를 작성하였습니다.
인덱싱 프로세스의 출력 폴더에서 Parquet 파일을 로드하여 Pandas 데이터프레임에 불러옵니다. 그런 다음, 데이터를 Neo4j에 일괄 처리 방식으로 전송하여 노드와 관계를 생성하고 관련 속성을 추가합니다. 대부분의 엔터티에 대한 id 배열은 관계로 변환됩니다.
(예를 들어, 하나의 엔터티(노드)가 여러 다른 엔터티들과 연결될 때, 이 연결을 id 배열로 표현하여 각 엔터티(entity) 간의 관계를 명확하게 표현한다.)
모든 작업은 MERGE를 사용하므로 멱등성을 유지하며, 스크립트를 여러 번 실행해도 안전합니다.
(특정 노드나 관계를 데이터베이스에 존재하는지 확인하고, 존재하지 않으면 새로 생성하는 작업)
데이터베이스를 정리해야 하는 경우, 다음 명령문을 실행하면 됩니다:
MATCH (n)
CALL { WITH n DETACH DELETE n } IN TRANSACTIONS OF 25000 ROWS;
"""DB 업데이트 메서드: 데이터프레임을 배치로 Neo4j에 삽입합니다."""
def batched_import(self, statement: str, df: pd.DataFrame, batch_size: int = 1000) -> int:
total = len(df)
start_time = time.time()
for start in range(0, total, batch_size):
batch = df.iloc[start:min(start + batch_size, total)]
with self.driver.session(database=os.getenv("NEO4J_DATABASE")) as session:
session.run("UNWIND $rows AS row " + statement, {"rows": batch.to_dict('records')})
elapsed_time = time.time() - start_time
print(f'Imported {total} rows in {elapsed_time:.2f} seconds.')
return total
Neo4j에서 인덱스는 그래프 쿼리의 시작점을 찾는 데만 사용됩니다. 예를 들어, 두 노드를 빠르게 찾아 연결하는 데 사용됩니다. 제약 조건은 중복을 방지하기 위해 존재하며, 주로 엔터티 유형의 ID에 대해 생성합니다.
우리는 일부 유형(메타데이터 유형, 마커 유형, 관계 유형을 나타내는 마커)을 실제 엔터티 유형과 구분하기 위해 앞뒤에 두 개의 밑줄(__)을 사용하여 마커로 사용합니다.
여기서 기본 관계 유형은 RELATED이지만, 설명이나 시작 및 끝 노드의 유형에서 실제 관계 유형을 추론할 수도 있습니다.
__Entity____Document____Chunk____Community____Covariate__"""제약 조건 생성 메서드: 데이터베이스에 필요한 제약 조건을 생성합니다."""
def create_constraints(self):
statements = """
create constraint chunk_id if not exists for (c:__Chunk__) require c.id is unique;
create constraint document_id if not exists for (d:__Document__) require d.id is unique;
create constraint entity_id if not exists for (c:__Community__) require c.community is unique;
create constraint entity_id if not exists for (e:__Entity__) require e.id is unique;
create constraint entity_title if not exists for (e:__Entity__) require e.name is unique;
create constraint entity_title if not exists for (e:__Covariate__) require e.title is unique;
create constraint related_id if not exists for ()-[rel:RELATED]->() require rel.id is unique;
""".split(";")
for statement in statements:
if len((statement or "").strip()) > 0:
with self.driver.session(database=os.getenv("NEO4J_DATABASE")) as session:
session.run(statement)
문서에 대한 Parquet 파일을 로드하고, 각 문서의 ID를 사용하여 노드를 생성하며 제목 속성을 추가합니다. text_unit_ids를 저장할 필요는 없습니다. 왜냐하면 관계를 생성할 수 있고, 텍스트 내용이 청크에 포함되어 있기 때문입니다.
doc_df = pd.read_parquet(f'{GRAPHRAG_FOLDER}/create_final_documents.parquet', columns=["id", "title"])
doc_df.head(2)
import os
from dotenv import load_dotenv
from neo4j import GraphDatabase
import pandas as pd
from typing import Dict, Any
import time
class EnvironmentLoader:
@staticmethod
def load_env_variables():
load_dotenv()
return {
"OPENAI_API_KEY": os.getenv("GRAPHRAG_API_KEY"),
"NEO4J_URI": os.getenv("NEO4J_URI"),
"NEO4J_USER": os.getenv("NEO4J_USER"),
"NEO4J_PASSWORD": os.getenv("NEO4J_PASSWORD"),
"NEO4J_DATABASE": os.getenv("NEO4J_DATABASE")
}
class Neo4jClient:
"""초기화 메서드: 환경 변수를 로드하고 Neo4j 드라이버를 설정합니다."""
def __init__(self, config: Dict[str, Any]):
self.driver = GraphDatabase.driver(
config["NEO4J_URI"],
auth=(config["NEO4J_USER"], config["NEO4J_PASSWORD"])
)
self.database = os.getenv("NEO4J_DATABASE")
"""제약 조건 생성 메서드: 데이터베이스에 필요한 제약 조건을 생성합니다."""
def create_constraints(self):
statements = """
create constraint chunk_id if not exists for (c:__Chunk__) require c.id is unique;
create constraint document_id if not exists for (d:__Document__) require d.id is unique;
create constraint entity_id if not exists for (c:__Community__) require c.community is unique;
create constraint entity_id if not exists for (e:__Entity__) require e.id is unique;
create constraint entity_title if not exists for (e:__Entity__) require e.name is unique;
create constraint entity_title if not exists for (e:__Covariate__) require e.title is unique;
create constraint related_id if not exists for ()-[rel:RELATED]->() require rel.id is unique;
""".strip().split(";")
with self.driver.session(database=self.database) as session:
for statement in statements:
if statement.strip(): # 빈 문자열이 아닌 경우에만 실행
session.run(statement.strip())
"""DB 쿼리 메서드: Cypher 쿼리를 실행하고 결과를 DataFrame으로 반환합니다."""
def db_query(self, cypher: str, params: Dict[str, Any] = {}) -> pd.DataFrame:
with self.driver.session(database=self.database) as session:
result = session.run(cypher, parameters=params)
return result.to_df()
"""DB 업데이트 메서드: 데이터프레임을 배치로 Neo4j에 삽입합니다."""
def batched_import(self, statement: str, df: pd.DataFrame, batch_size: int = 1000) -> int:
total = len(df)
records = df.to_dict('records')
start_time = time.time()
with self.driver.session(database=self.database) as session:
for start in range(0, total, batch_size):
batch = records[start:min(start + batch_size, total)]
session.run("UNWIND $rows AS row " + statement, {"rows": batch})
elapsed_time = time.time() - start_time
print(f'Imported {total} rows in {elapsed_time:.2f} seconds.')
return total
def import_doc(self, doc_df: pd.DataFrame) -> int:
statement = """
MERGE (d:__Document__ {id: row.id})
SET d += row {.title}
"""
return self.batched_import(statement, doc_df)
def __del__(self):
if hasattr(self, 'driver'):
self.driver.close()
"""그래프 데이터를 Neo4j로 가져오는 클래스"""
class GraphRAGImporter:
GRAPHRAG_FOLDER = "/Users/jiyoon/Downloads/기술면접_KG/langGraphRAG_Neo/ragtest/output/20240904-215250/artifacts"
def __init__(self):
config = EnvironmentLoader.load_env_variables()
self.client = Neo4jClient(config)
def import_documents(self):
doc_df = pd.read_parquet(f'{self.GRAPHRAG_FOLDER}/create_final_documents.parquet', columns=["id", "title"])
return self.client.import_doc(doc_df)
# __Chunk__ 노드의 토큰 수와 노드 수를 반환하는 쿼리
def query_chunks(self):
query = "MATCH (n:__Chunk__) RETURN n.n_tokens as token_count, count(*) AS count"
return self.client.db_query(query)
def import_text_units(self):
text_df = pd.read_parquet(f'{self.GRAPHRAG_FOLDER}/create_final_text_units.parquet',
columns=["id","text","n_tokens","document_ids"])
statement = """
MERGE (c:__Chunk__ {id: row.id})
SET c += row {.text, .n_tokens}
WITH c, row
UNWIND row.document_ids AS document
MATCH (d:__Document__ {id: document})
MERGE (c)-[:PART_OF]->(d)
"""
return self.client.batched_import(statement, text_df)
def import_entities(self):
entity_df = pd.read_parquet(f'{self.GRAPHRAG_FOLDER}/create_final_entities.parquet',
columns=["name","type","description","human_readable_id","id","description_embedding","text_unit_ids"])
entity_statement = """
MERGE (e:__Entity__ {id:row.id})
SET e += row {.human_readable_id, .description, name:replace(row.name,'"','')}
WITH e, row
CALL db.create.setNodeVectorProperty(e, "description_embedding", row.description_embedding)
CALL apoc.create.addLabels(e, case when coalesce(row.type,"") = "" then [] else [apoc.text.upperCamelCase(replace(row.type,'"',''))] end) yield node
UNWIND row.text_unit_ids AS text_unit
MATCH (c:__Chunk__ {id:text_unit})
MERGE (c)-[:HAS_ENTITY]->(e)
"""
return self.client.batched_import(entity_statement, entity_df)
def import_relationships(self):
rel_df = pd.read_parquet(f'{self.GRAPHRAG_FOLDER}/create_final_relationships.parquet',
columns=["source","target","id","rank","weight","human_readable_id","description","text_unit_ids"])
rel_statement = """
MATCH (source:__Entity__ {name:replace(row.source,'"','')})
MATCH (target:__Entity__ {name:replace(row.target,'"','')})
// not necessary to merge on id as there is only one relationship per pair
MERGE (source)-[rel:RELATED {id: row.id}]->(target)
SET rel += row {.rank, .weight, .human_readable_id, .description, .text_unit_ids}
RETURN count(*) as createdRels
"""
return self.client.batched_import(rel_statement, rel_df)
def import_communities(self):
community_df = pd.read_parquet(f'{self.GRAPHRAG_FOLDER}/create_final_communities.parquet',
columns=["id","level","title","text_unit_ids","relationship_ids"])
statement = """
MERGE (c:__Community__ {community:row.id})
SET c += row {.level, .title}
WITH *
UNWIND row.relationship_ids as rel_id
MATCH (start:__Entity__)-[:RELATED {id:rel_id}]->(end:__Entity__)
MERGE (start)-[:IN_COMMUNITY]->(c)
MERGE (end)-[:IN_COMMUNITY]->(c)
RETURN count(distinct c) as createdCommunities
"""
return self.client.batched_import(statement, community_df)
def import_community_reports(self):
community_report_df = pd.read_parquet(f'{self.GRAPHRAG_FOLDER}/create_final_community_reports.parquet',
columns=["id","community","level","title","summary", "findings","rank","rank_explanation","full_content"])
community_statement = """
MERGE (c:__Community__ {community:row.community})
SET c += row {.level, .title, .rank, .rank_explanation, .full_content, .summary}
WITH c, row
UNWIND range(0, size(row.findings)-1) AS finding_idx
WITH c, row, finding_idx, row.findings[finding_idx] as finding
MERGE (c)-[:HAS_FINDING]->(f:Finding {id:finding_idx})
SET f += finding
"""
return self.client.batched_import(community_statement, community_report_df)
def import_covariates(self):
cov_df = pd.read_parquet(f'{self.GRAPHRAG_FOLDER}/create_final_covariates.parquet')
cov_statement = """
MERGE (c:__Covariate__ {id:row.id})
SET c += apoc.map.clean(row, ["text_unit_id", "document_ids", "n_tokens"], [NULL, ""])
WITH c, row
MATCH (ch:__Chunk__ {id: row.text_unit_id})
MERGE (ch)-[:HAS_COVARIATE]->(c)
"""
return self.client.batched_import(cov_statement, cov_df)
def main():
importer = GraphRAGImporter()
importer.client.create_constraints()
imported_count = importer.import_documents()
print(f"Imported {imported_count} documents")
# Importing other entities and relationships
importer.import_text_units()
importer.import_entities()
importer.import_relationships()
importer.import_communities()
importer.import_community_reports()
# importer.import_covariates()
result_df = importer.query_chunks()
print(result_df)
if __name__ == "__main__":
main()