인기검색어 도출 프로세스

서아로·2023년 12월 27일
0

사내에서 마지막 프로젝트로 ES기반의 검색엔진 구축을 진행하였고 프로젝트 기록용으로 블로그에 작성하려고 한다. 오늘은 여러 파트 중 인기검색어 관해 얘기해보려고 한다.

현재 인기검색어는 두가지로 분리된다.

  1. 순수하게 사용자들이 검색한 것을 통계 낸 것
  2. 제외검색어와 상위고정 검색어를 필터링하여 인기검색어를 통계 낸것

이 중 1번에 대해 인기검색어를 어떻게 추출하고 엘라스틱서치에 색인하는지 알아보자.

로그파일에 찍히는 정보들 중 INFO 로그를 가져와 필터링 하는 부분이 필요하여 로그스태시를 이용하여 log파일을 필터링하고 엘라스틱서치에 적재한다.

logstash

input {
  file {
    path => "/home/ec2-user/logs/*.log"
    start_position => "end"
  }
}
filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:log_timestamp} \[INFO\] - \{word: %{DATA:word}, total: %{NUMBER:total}, category: %{WORD:category}\}"
    }
  }
  date {
    match => [ "log_timestamp", "yyyy-MM-dd HH:mm:ss,SSS" ]
    target => "@timestamp"
  }
}


output {
  # Elasticsearch에 전송
  elasticsearch {
    hosts => ["http://localhost:9200"]
    user => "elastic"
    password => "elastic"
     index => "search-query-log--%{+YYYY.MM.dd}"
  }
  stdout { codec => rubydebug }
}

설명

pathLogstash가 데이터를 읽을 로그 파일을 선택
start_position로그 파일을 처음 읽을 때 Logstash가 파일의 어느 위치에서부터 데이터를 읽을지를 지정
input {
  file {
    path => "/home/ec2-user/logs/*.log"
    start_position => "end"
  }
}

[filter]

filter로그 데이터를 파싱하고 필요한 필드를 추출
grokLogstash에서 로그 데이터를 구문 분석하고 필드로 추출하기 위해 사용되는 패턴 매칭 도구
💡 ‘grok’필터를 사용하여 로그 메시지를 정규 표현식으로 분석하고 ‘date’필터를 사용하여 ‘log_timestamp’ 필드에서 날짜 및 시간 정보를 추출하여 ‘@timestamp’ 필드에 저장
filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:log_timestamp} \[INFO\] - \{word: %{DATA:word}, total: %{NUMBER:total}, category: %{WORD:category}\}"
    }
  }
  date {
    match => [ "log_timestamp", "yyyy-MM-dd HH:mm:ss,SSS" ]
    target => "@timestamp"
  }
}

[output]

elasticsearchelasticsearch 출력 설정
hostselasticsearch 클러스터에 연결한 호스트를 지정
indexelasticsearch에 전송할 데이터의 인덱스 이름을 지정
stdoutLogstash가 처리한 데이터를 터미널에 출력하는데 사용

💡 Elasticsearch로 데이터를 전송하고, 동시에 터미널에 데이터를 출력한다. index 부분에서
index ⇒ “search-query-log—%{+YYYY.MM.dd}” 는 날짜 형식을 사용하여 현재 날짜를 기반으로 동적으로 인덱스 이름을 생성하는데 사용. 예를 들어, 오늘 날짜가 “2023-10-05”이면 인덱스 이름은 “search-query-log—2023.10.05”가 된다.

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    user => "elastic"
    password => "elastic"
    index => "search-query-log--%{+YYYY.MM.dd}"
  }
  stdout { codec => rubydebug }
}

logstash를 이용하여 elasticsearch의 search-query-log—{해당 일자}로 인덱싱이 된다.

인기검색어를 도출하기 위한 데이터 추출 과정

“search-query-log” 인덱스 접근 후 “top_query_temp” 템플릿 사용하여 특정 기간별 인기검색어 도출

  • top_query_temp 템플릿
    "top_query_temp" : {
            "lang" : "mustache",
            "source" : """{"query":{"bool":{"must_not":[{"exists":{"field":"tags"}}],"filter":{"range":{"@timestamp":{"gte":"{{from}}","lte":"{{to}}"}}}}},"size":0,"aggs":{"word_aggregation":{"terms":{"field":"word.keyword","size":"{{size}}"}}}}""",
            "options" : {
              "content_type" : "application/json; charset=UTF-8"
            }
          }

popular.py

from fastapi import APIRouter, HTTPException
from engine.elasticsearch_client import es
from elasticsearch.exceptions import ConnectionError, RequestError
from datetime import datetime

router = APIRouter()

def validate_date(date_text):
    try:
        datetime.strptime(date_text, '%Y-%m-%d')
        return True
    except ValueError:
        return False

# 제외검색어와 우선순위가 적용되어있지 않은 순수 top query 
@router.get('/popular')
async def popular(size:int, from_date:str, to:str):
    if not validate_date(from_date) or not validate_date(to):
        raise HTTPException(status_code=400, detail="Invalid date format. Use 'YYYY-MM-DD' format.")
    try:
        if size <= 0:
            raise ValueError("Size parameter must be a positive integer")
        results = es.search_template(
        index="search-query-log-*",
        body={"id": "top_query_temp",
              "params":{
                  "from": from_date,
                  "to": to,
                  "size":size
              }}
        )

        popular_keywords=[]
        for i in range(0, len(results['aggregations']['word_aggregation']['buckets'])):
            popular_keywords.append({'keyword':results['aggregations']['word_aggregation']['buckets'][i]['key'], 'priority': i})

        resp = popular_keywords[0:10]

        return resp
    except ValueError as ve:
        raise HTTPException(status_code=400, detail=str(ve))
    except ConnectionError as ce:
        raise HTTPException(status_code=500, detail="Elasticsearch Connection Error")
    except RequestError as re:
        raise HTTPException(status_code=500, detail="Elasticsearch Request Error")
    except Exception as e:
        raise HTTPException(status_code=500, detail="Internal Server Error")

0개의 댓글