⚡ HBase의 region
- HBase의 region은 여러개의 row key 범위를 나타내는 단위이며 HBase에서는 데이터를 저장할 때 row key를 기준으로 파티셔닝을 수행하며, 각 파티션이 region으로 나뉘어진다.
- region은 서로 겹치지 않는 연속적인 row key 범위를 가지고 각 region은 메모리와 디스크에 캐시되며, 사용자는 region 수준에서 데이터를 스캔하거나 읽고 쓸 수 있다.
Server | Language |
---|---|
Centos7 | Python3.7 |
import concurrent.futures
import time
import happybase
from datetime import datetime
import random
import socket
import sys
socket.setdefaulttimeout(10)
# HBase 정보
HOST = ["host1", "host2"] # Host
table_name = "테이블 이름" # 테이블 이름
dt = datetime(2020, 1, 1) # 예) 2022년 1월 1일 이전의 데이터를 삭제
target_timestamp = int(dt.timestamp() * 1000)
scan_batch_size = 1000 # 한 번에 스캔할 데이터 수를 설정
# HBase Connect
def getHConnection(num=None):
while 1:
try:
if num == None:
num = random.randint(0, len(HOST) - 1)
server = HOST[num]
connection = happybase.Connection(server, port=9090, timeout=60000)
connection.open()
return connection
except Exception as msg:
print("error:", msg)
time.sleep(1)
num = None
# start_row와, end_row를 받아 범위 내의 timestamp 이하의 데이터 삭제
def delete_data(worker_id, start_row, end_row):
print("현재 region: ",worker_id)
st_row = start_row
while True:
connection = getHConnection()
table = connection.table(table_name)
row_list = []
last_row_key = None
for row_key, data in table.scan(row_start=st_row, row_stop=end_row, include_timestamp=True):
for column, (value, timestamp) in data.items():
if timestamp < target_timestamp:
row_list.append(row_key)
break
else:
break
last_row_key = row_key
if len(row_list) >= scan_batch_size:
break
for row_key in row_list:
table.delete(row_key)
print(f"Deleted row_key: {row_key}")
if last_row_key:
st_row = last_row_key # 체크포인트를 마지막 row_key로 설정
else:
break # 스캔이 완료되면 루프를 종료
connection.close()
time.sleep(1) # 연결이 끊어진 후 다음 스캔을 시작하기 전에 일시 중지
if __name__ == '__main__':
start_row = None # 초기 시작 행을 None으로 설정
end_row = None # end_row 초기화
# 리전 정보 가져오기
connection = getHConnection()
table = connection.table(table_name)
regions = connection.client.getTableRegions(table.name) # 해당 테이블의 모든 리전 정보를 가져온다.
connection.close()
# 리전 수 별로 멀티 스레딩
num_processes = len(regions) # 이 부분이 중요!!!!!
print("regions number:", num_processes) # 리전 갯수 출력!
with concurrent.futures.ThreadPoolExecutor(max_workers=num_processes) as executor:
# 각 리전의 start row 와 end row 구하기
for i, region in enumerate(regions):
start_row = region.startKey
end_row = region.endKey
try:
future = executor.submit(delete_data, i, start_row, end_row)
except Exception as msg:
print("error:", msg)
sys.exit()
getHConnection
: 주어진 호스트 중 하나에 연결을 시도하고, 성공한 경우 HappyBase Connection 객체를 반환, 연결에 실패하면 다른 호스트로 시도한다.dt = datetime(2022, 1, 1)
: 삭제할 날짜를 지정한다. 이 예에서는 2022년 1월 1일 이전의 데이터를 삭제한다.scan_batch_size, start_row
: 한 번에 스캔할 데이터 수를 지정하고, 초기 시작 행을 None으로 설정for row_key, data in table.scan(row_start=start_row, include_timestamp=True)
table.delete(row_key)
: row_list에 저장된 행 키에 대해 삭제 작업을 수행하고, 삭제된 행 키를 출력한다.if __name__ == '__main__'
regions = connection.client.getTableRegions(table.name)
: HBase 서버에 연결하고 테이블의 리전 정보를 가져온다.num_processes = len(regions)
: 멀티 스레딩을 실행시킬 갯수로 만약 이대로 실행을 시키면 하나의 thread에 하나의 region이 할당되어 실행된다. 하지만 이 말은 만약 데이터양이 많아 ex) 8000개의 region 갯수하면 스레드도 8000개가 실행됨
스레드가 너무 많이 실행된다면 서버에 부담이되어 timeout을 내뱉을수도 있으니 적당히 한번에 실행시킬 스레드의 갯수를 조절해야한다. (필자는 100개의 스레드를 실행하도록 맞췄다.) 1. ProcessPoolExecutor
- 멀티 프로세싱을 지원한다. 여러 개의 독립적인 프로세스를 생성하여 작업을 병렬로 실행한다.
- 각 프로세스는 독립적인 메모리 공간을 가진다. 따라서 전역 인터프리터 락(Global Interpreter Lock, GIL)에 영향을 받지 않으며, CPU 바운드 작업에서 성능 향상을 얻을 수 있다.
- 프로세스간 통신은 피클링(pickling)을 통해 진행, 작업에 전달되는 인자와 결과는 피클링 가능해야 한다.
- 작업을 생성하고 관리하는 오버헤드가 상대적으로 크기 때문에, I/O 바운드 작업에는 적합하지 않을 수 있다.
2. ThreadPoolExecutor
- 멀티 스레딩을 지원한다. 하나의 프로세스 내에서 여러 개의 스레드를 생성하여 작업을 병렬로 실행한다.
- 모든 스레드는 같은 메모리 공간을 공유한다. 전역 인터프리터 락(GIL) 때문에, 한 번에 하나의 스레드만 실행될 수 있어, CPU 바운드 작업에서 성능 향상을 기대하기 어렵다.
- 작업을 생성하고 관리하는 오버헤드가 상대적으로 작기 때문에, I/O 바운드 작업에 더 적합하다.
- 스레드간 통신이 별도의 직렬화 과정이 필요하지 않는다. 작업에 전달되는 인자와 결과가 피클링 가능할 필요가 없습니다.
1. I/O 바운드 (I/O-bound)
- I/O 바운드는 프로그램의 실행 성능이 입력/출력(I/O) 작업, 예를 들어 디스크, 네트워크, 사용자 입력 등에 의해 제한되는 경우를 의미한다.
- I/O 바운드 작업에서는 대부분의 시간이 데이터를 주고받는 데 사용되며, CPU의 처리 능력이 프로그램 성능에 큰 영향을 미치지 않는다.
- 이러한 작업에서는 CPU가 대기 상태로 많은 시간을 보내게 되어, 다른 작업을 처리할 수 있는 여유가 생긴다.
- I/O 바운드 작업을 최적화하는 일반적인 방법 중 하나는 비동기 처리나 병렬 처리를 사용하여 I/O 작업을 최소화하는 것이다.
- I/O 바운드의 주된 제약 요소는 네트워크와 디스크I/O 성능
2. CPU 바운드 (CPU-bound)
- CPU 바운드는 프로그램의 실행 성능이 CPU의 처리 능력에 의해 제한되는 경우를 의미한다.
- CPU 바운드 작업에서는 대부분의 시간이 CPU가 계산을 수행하는 데 사용되며, 입력/출력 작업이 병목 현상을 일으키지 않는다.
- CPU 바운드 작업의 성능은 CPU의 처리 속도와 코어 수에 크게 영향을 받는다.
- CPU 바운드 작업을 최적화하는 일반적인 방법 중 하나는 알고리즘을 개선하거나, 병렬 처리를 사용하여 여러 CPU 코어에서 동시에 실행되도록 하는 것이다.
- CPU 바운드 작업의 주된 제약 요소는 CPU 자체의 처리 능력이며, 메모리 성능이 큰 영향을 미치지는 않는다.
간단히 말해, GIL이 있더라도 I/O 바운드 작업에서는 스레드를 효과적으로 사용할 수 있지만, CPU 바운드 작업에서는 성능 제약이 있다. 이 때문에 CPU 바운드 작업에서는 멀티 프로세스를 사용하는 것이 좋다.
$ nohup python 스크립트.py &