HBase 데이터 삭제

Q·2023년 4월 14일
0

✅ 개요

  • 운영하는 하둡 클러스터에 디스크가 모두 90% 이상 차는 일이 발생했다.
  • 따라서 과거 데이터 부터 삭제, 각 서버 용량이 80%가 확보되는 시점까지 데이터를 삭제를 하도록 지시가 내려졌으면 데이터 삭제후 노드를 살펴보고 디스크 교체해야하는 노드들 산정하기로 했다.
  • 대규모의 데이터를 삭제해야하고 과거 데이터 부터 삭제해야하므로 HBase의 shell보다는 스크립트를 작성하여 작업을 진행하였으며 Timestamp를 기준으로 데이터를 삭제하였다.

✅ 사용한 Server 및 언어

ServerLanguage
Centos7Python3.7

✅ Python 설치

✅ happybase 설치

pip install happybase

happybase?

  • HappyBase는 Apache HBase의 사용자 친화적인 파이썬 클라이언트 라이브러리

happyBase의 주요 특징

  • 사용자 친화적인 API : HappyBase는 직관적이고 사용하기 쉬운 API를 제공하여, 개발자가 HBase의 핵심 기능에 쉽게 접근할 수 있다.
  • 커넥션 풀링 : HappyBase는 커넥션 풀링을 지원하므로, 여러 개의 HBase 클라이언트를 사용하여 동시에 연결하고 작업을 수행할 수 있다.
  • 자동 시리얼라이제이션 및 역시리얼라이제이션 : HappyBase는 데이터를 저장하거나 조회할 때 자동으로 시리얼라이제이션 및 역시리얼라이제이션을 처리하여, 개발자가 각 데이터 유형에 대해 별도의 변환 작업을 수행할 필요가 없다.
  • 배치 작업 지원 : HappyBase는 HBase의 배치 작업을 지원하여, 여러 개의 데이터를 한 번에 읽거나 쓸 수 있어 효율성과 성능을 높일수 있다.
  • 데이터 모델 추상화 : HappyBase는 HBase의 로우 키, 컬럼 패밀리, 컬럼, 타임스탬프 등의 데이터 모델 개념을 추상화하여, 간단한 코드로 데이터를 저장하고 관리할 수 있다.
  • 파이썬 2와 3 모두 지원 : HappyBase는 파이썬 2와 3 모두를 지원하여, 여러 버전의 파이썬 프로젝트에 적용할 수 있다.

✅ HBase Thrift Server start

cd path to/hbase/bin
./hbase-daemon.sh start thrift
  • HBase Thrift Server는 Apache HBase의 Thrift 기반 서버로, Thrift 프로토콜을 사용해 HBase를 사용할 수 있는 다양한 프로그래밍 언어로 작성된 클라이언트와 통신할 수 있게 해준다.

  • Thrift는 Facebook에서 개발한 고성능, 확장 가능한 소프트웨어 프레임워크로서, 서로 다른 프로그래밍 언어로 작성된 서비스 간의 통신을 가능하게 해주는 인터페이스 정의 언어(IDL)와 바이너리 통신 프로토콜을 제공

  • 이를 통해 HBase를 사용하는 클라이언트 애플리케이션은 자바 뿐만 아니라 파이썬, Ruby, PHP, C++ 등 여러 프로그래밍 언어로 개발할 수 있다.

✅ Python Code

  • HBase 테이블에서 특정 시간 이전의 데이터를 삭제하는 스크립트 작성
import time
import happybase
from datetime import datetime
import random
import socket

socket.setdefaulttimeout(10)

HOST = ["host1", "host2"]

def getHConnection(num=None):
    while 1:
        try:
            if num == None:
                num = random.randint(0, len(HOST) - 1)
            server = HOST[num]
            connection = happybase.Connection(server, port=9090, timeout=30000)
            connection.open()
            return connection
        except Exception as msg:
            print("error:", msg)
            time.sleep(1)
            num = None

dt = datetime(2022, 1, 1) # 예) 2022년 1월 1일 이전의 데이터를 삭제
target_timestamp = int(dt.timestamp() * 1000)

scan_batch_size = 1000  # 한 번에 스캔할 데이터 수를 설정
start_row = None  # 초기 시작 행을 None으로 설정

row_list = []

while True:
    connection = getHConnection()
    table = connection.table("테이블 이름")

    row_count = 0
    last_row_key = None

    for row_key, data in table.scan(row_start=start_row, include_timestamp=True):
        for column, (value, timestamp) in data.items():
            if timestamp < target_timestamp:
                row_list.append(row_key)
                break
            else:
                break

        row_count += 1
        last_row_key = row_key

        if row_count >= scan_batch_size:
            break

    for row_key in row_list:
        table.delete(row_key)
        print(f"Deleted row_key: {row_key}")

    if last_row_key:
        start_row = last_row_key  # 체크포인트를 마지막 row_key로 설정
    else:
        break  # 스캔이 완료되면 루프를 종료

    row_list = []
    connection.close()
    time.sleep(1)  # 연결이 끊어진 후 다음 스캔을 시작하기 전에 일시 중지
  • getHConnection: 주어진 호스트 중 하나에 연결을 시도하고, 성공한 경우 HappyBase Connection 객체를 반환, 연결에 실패하면 다른 호스트로 시도한다.
  • dt = datetime(2022, 1, 1): 삭제할 날짜를 지정한다. 이 예에서는 2022년 1월 1일 이전의 데이터를 삭제한다.
  • scan_batch_size, start_row: 한 번에 스캔할 데이터 수를 지정하고, 초기 시작 행을 None으로 설정
  • for row_key, data in table.scan(row_start=start_row, include_timestamp=True)
    • for문을 사용하여 테이블을 스캔하고, 특정 시간 이전의 데이터를 찾는다. 찾은 데이터의 행 키를 row_list에 추가한다.
    • 스캔이 끝날 때까지 행을 스캔하고, scan_batch_size에 도달하면 루프를 멈춘다.(1000개씩 데이터를 처리하기 위함)
  • table.delete(row_key): row_list에 저장된 행 키에 대해 삭제 작업을 수행하고, 삭제된 행 키를 출력한다.
  • 스캔이 끝나지 않았다면, 마지막으로 스캔한 행 키를 시작 행으로 설정하고, 다음 스캔을 시작하기 전에 1초간 대기한다.

✅ python 백그라운드 실행

$ nohup python 스크립트.py &

✅ 트러블 슈팅

  • 적절한 데이터 처리량을 찾는 것이 너무 힘들었다ㅠㅠ
  • 작은 배치로 시작하여 더 큰 배치로 조정해 가면서 적절한 데이터처리양을 찾아야하는데 나는 2000에서 커넥션이 끊기길래 그의 반인 1000으로 설정하여 데이터를 처리했다.
  • 1000개 단위로 HBase connection을 다시 맺어주며 full scan을 하여 데이터를 삭제했다.
  • 여기서 가장 중요했던건 connection을 끊고 다시 맺어주어도 그 시점부터 scan이 이어져야 하는데 난 이걸 row_key로 해결했다.
  • row_key를 체크포인트와 같은 개념으로 계속 갱신시켜서 해당 row_key 부터 scan을 시작하게끔 만들어 끝까지 데이터를 처리하였다.
profile
Data Engineer

0개의 댓글