아쉽게도 대부분 ai 모델이 위와 같이 고급(?)기능을 제공하는 경우 유료임.
네이버 조차 정식 서비스 launching 후 신청해야
데모가 아닌 실제 활용가능한 모델 쓸 수 있었음..
매번 테스트 앱 키고 끄는게 번거로워 작동 테스트만 진행후 종료
프롬프트를 받아와서 이미 한번 model에서 promt 에 맞게 작동함
ai 모델이 생각보다 양질의 데이터를 프롬프트를 써도 잘뽑아내진 못한다고 한다 (현직자 말씀)
전처리를 해도 + 후처리를 통해 한번더 필터링하는 듯.
(이게 fine tuning인가)
그룹 아이디 추가하는 이유 : 컨슈머 그룹 내에서 컨슈머 간 메시지 분배 관리
ERR ) consumer가 offset을 못 읽는 에러 발생.
for topic_partition in self.consumer.assignment():
current_offset = self.consumer.position(topic_partition)
end_offset = self.consumer.end_offsets([topic_partition])
if current_offset < end_offset:
all_offsets_reached = False
break
if all_offsets_reached:
self.logger.info("모든 메시지 처리 완료: poll loop 종료")
break
else:
self.logger.info("poll 대기 중")
await asyncio.sleep(1)
continue
all_offsets_reached = True
auto.offset.reset = earliest(처음부터 읽는다) / latest/ none
모든 조건 통과시 메시지 전달 (polling time , message data가 실제할 경우등)
중복이 아닌 메시지만 처리 예약(목표)
topic, record.offset 두 키로 현재 처리 중인 메시지 기록 후 중복 메시지 스킵
OrderDict로 처리 순서 유지하며 관리
record.value를 직접 task에 append 시 processed_offset에서 중복 확인 어려움
중복 여부는 topic pattern, offset 기준으로 확인
record.value로는 중복 확인 어렵고, 처리 완료 후 커밋할 오프셋 특정 어려움 (value 만 나오므로)
consumer commit 메시지와 처리된 레코드(pre_record) 간 정확한 일치 불가능
비동기 작업과 프로듀서 작동 간 타이밍 문제 발생 가능성 높음 (asyncio, kafkaproduce간)
process_offsets 크기 관리 실패 시 메모리 누수 가능성 있음
이러한 이유로 별도의 method 필요
logger에 디버그 용 코드 추가 exc_info=True
async def process_record(self, data):
try:
value =data.value
self.logger.info(f"Processing record: {value}")
await asyncio.sleep(0.1) 처리 시간 예제
return value
except Exception as e:
self.logger.error(f"Error processing record: {e}")
return None
코루틴 객체를 프로듀서에 전달
task는 튜플임으로 언패킹 연산자 이용하여 개별인수로 분리후 함수에 전달
1. create_tasks
사용해보니 create_tasks 사용시 중복관리 세밀한 디버그 가능 대신 예외처리및 각 코드가 길어짐
create_tasks 사용시 중복관리 세밀한 디버그 가능 대신 예외처리및 각 코드가 길어짐
특징
장점
단점
tasks = [self.process_record(record) for record in records]
results = await asyncio.gather(tasks)
특징
장점
단점
Gather , unpacking comprehension 사용
Orderdict :
Output
([('key','value'),('key','value'),('key','value')])