지난 스택/큐 구조에 이어 이제는 힙 구조에 대해 알아볼 차례이다 !
힙은 특정한 규칙(최대 or 최소)을 가지는 완전이진트리로 "전체 정렬된 트리"가 아니라
"부분 정렬된 트리의 집합" 구조를 말한다.
트리 전체가 아닌, 부모와 자식간의 관계에 대해서만 특정한 규칙을 확인하는 구조로 간단한 구현도를 가지게 된다.

만일 주어진 조건이 "부모는 자식보다 언제나 크다" 라면 자식들 간의 대소는 모르겠지만 최상위 루트 노드의 값은 트리 내 누구보다 큰 최댓값을 가지게 된다. 어찌됐든 큰 수는 계속해서 위로 올라갈 것이기에.
반대로 "부모는 자식보다 언제나 작다"라는 조건이 주어진다면 최상위 루트 노드의 값은 트리 내 누구보다 작은 최솟값을 가지게 될 것이다.
이를 통해 전체 자료 중 오로지 "최댓값" 혹은 "최솟값"에만 특화된 희귀한 자료 구조를 생성해 낼 수 있는 것이다.
이러한 힙 구조를 어떻게 구현할 수 있을까 ?
1) 리스트 이용
역시나 가장 기본적인 방법으로 파이썬의 리스트만을 이용한 구현 방식이 존재한다.
주어진 트리를 완전이진트리라고 가정, 루트 노드를 리스트의 0번 째 값으로 시작하여 각각의 노드가 리스트를 일렬로 채워나간 상태라고 생각해보면 된다.
예를 들어 위 사진을 리스트로 나타내면 [35, 15, 20, 5, 7, 13] 처럼 나타낼 수 있는 것이다.
이것만을 이용하여 추가적인 처리를 통해 데이터의 추출과 삽입을 구현할 수도 있지만... 그 자세한 방법은 아래와 동일하므로 아래에서 확인하자.
2) 클래스 이용
두 번째 방법은 바로 힙 구조의 개념을 클래스로 그대로 나타내는 것이다. 파이썬의 클래스는 모든 것을 나타낼 수 있기에...
import math
class Node:
def __init__(self, value):
self.value = value
class Heap:
"""
index : 완전 이진트리
value : 우선순위 값
"""
def __init__(self):
self.heapq = [] # 실제 힙 구조
self.is_max = True # 최대 힙 여부
self.debug = True # 디버깅 여부
우선 기본이 되는 각각의 노드와 힙 트리를 정의해주었다.
이후 힙 트리에 노드를 추가하는 경우에는 완전이진트리를 유지하며 내부 리스트의 가장 마지막에 추가, 자신의 부모와의 비교를 통해 조건을 만족하지 않을 때까지 부모와의 스위칭 과정을 겪도록 만들어주면 된다.
자식이 두 개로 고정된 완전이진트리 구조이기에 자신의 부모는 (index-1)//2 를 통해 찾아갈 수 있다는 점이 재밌는 점이다.
def rearrange_upward(self, node, index):
# if self.debug:
# print(index, node.value, [v.value for v in self.heapq])
if index <= 0:
return
if node.value > self.heapq[(index-1)//2].value:
tmp_node = self.heapq[(index-1)//2]
self.heapq[(index-1)//2] = node
self.heapq[index] = tmp_node
self.rearrange_upward(node, (index-1)//2) # 그 윗 부모와 또 다시 비교
else:
return
def add_node(self, node):
self.heapq.append(node)
self.rearrange_upward(node, len(self)-1)
보다시피 지금 구현된 힙 구조는 "부모는 자식보다 항상 크다"라는 조건을 달고 있다. 즉, 최댓값을 뽑아내기 위한 힙 구조라는 뜻.
그렇다면 최댓값을 뽑아내는 경우에는 어떻게 구현될까 ?
내부 리스트의 가장 첫 번째 요소가 최댓값으로 고정되어 있으므로 이를 뽑아낸 뒤, 자신의 두 자식 중 더 큰 자식을 끌어올리는 작업을 반복해나가면 된다.
이는 자신의 두 자식의 인덱스가 각각 index*2+1과 index*2+2로 고정되어 있기에 간단히 구현이 가능하다.
def rearrange_downward(self, index):
if index*2+1 >= len(self): # 자식 노드 0개
# if self.debug:
# print("self: ", self)
# print("index: ",index)
self.heapq.pop(index) # 가장 하위 레벨까지 내려간 뒤 요소가 삭제 돼
return
elif (index+1)*2 >= len(self): # 자식 노드 1개
if self.debug:
print(f"변경된 노드 값 : {self.heapq[index].value}, {self.heapq[(index)*2-1].value}")
self.heapq[index] = self.heapq[(index)*2-1]
self.rearrange_downward(index*2+1)
else:
if self.heapq[index*2+1].value >= self.heapq[(index+1)*2].value:
if self.debug:
print(f"변경된 노드 값 : {self.heapq[index].value}, {self.heapq[index*2+1].value}")
self.heapq[index] = self.heapq[index*2+1]
self.rearrange_downward(index*2+1)
else:
if self.debug:
print(f"변경된 노드 값 : {self.heapq[index].value}, {self.heapq[(index+1)*2].value}")
self.heapq[index] = self.heapq[(index+1)*2]
self.rearrange_downward((index+1)*2)
def pop_node(self):
if len(self) <= 0:
return "더 이상 뺴낼 노드가 없습니다. 바보야."
target_node = self.heapq[0]
print(f"추출된 노드값 : {target_node.value}")
self.rearrange_downward(0)
return target_node
코드를 작성할 때는 눈치채지 못했는데... 내부 리스트의 "가장 마지막"에 확정적으로 추가하는 요소 삽입과 다르게 요소 삭제의 경우에는 가장 하위 레벨의 어느 위치의 요소가 끌려 올라올 지 모른다는 점이 존재했다.
내부 리스트로 구현된 힙 트리는 항상 완전이진트리를 만족할 수 밖에 없으므로 만일 가장 하위 레벨의 앞 부분 자식이 끌려 올라간다면 뒤이은 동일 레벨 자식들은 한 칸 씩 당겨질 것이고 부모와의 규칙이 깨지게 될 것이다....
별도로 구현하지는 않겠지만 추가적으로 뒤이은 동일 레벨 자식들에 대해 rearrange_upward() 를 다시 시켜주는 작업을 하게 되면 시간적인 소요가 커 보이지만 시간이 지남에 따라 힙 구조 자체가 부분이 아닌 전체 정렬에 가까운 모습이 될 것이므로 무리없는 구현이 될 듯 싶다.
이제 결과를 한 번 살펴보자.
def __len__(self):
return len(self.heapq)
def __str__(self):
answer = "Heap Tree : "
idx = 0
max_lev = math.floor(math.log2(len(self)))
lev = 0
while idx <= len(self):
splitter = "," + " "*(2**(max_lev-lev)-1)
answer += "\n" + " "*(2**(max_lev-lev)-1) + splitter.join([str(v.value).rjust(3, " ") for v in self.heapq[idx:idx+2**(lev)]])
idx = idx+2**lev
lev += 1
return answer
---------------------------------------------------------------------------------------------
import random
class Gen:
def __init__(self, n):
self.n = n
def __iter__(self):
for _ in range(self.n):
yield random.randint(1, 100)
gen = Gen(30)
heap = Heap()
# 힙 구조 생성
for value in gen:
node = Node(value)
heap.add_node(node)
print(heap)
>>> Heap Tree :
100
91, 97
90, 83, 89, 96
60, 71, 82, 48, 71, 83, 71, 51
8, 33, 9, 49, 29, 63, 16, 20, 6, 51, 34, 47, 36, 64, 40
한 노드의 글자 수 크기를 임의로 4로 설정, 레벨이 높아짐에 따라 시작 공백과 사이 공백을 두 배로 높여감으로써 힙 구조를 시각적으로 표현할 수 있었다.
이제 요소 추출과 요소 삽입 시 힙 구조가 어떻게 작동하는지 테스트해보자.
# 노드 추출 (최댓값)
for _ in range(1):
print(f"노드 추출 {_+1}번 째")
heap.pop_node()
print(heap)
>>> 노드 추출 1번 째
추출된 노드값 : 100
변경된 노드 값 : 100, 97
변경된 노드 값 : 97, 96
변경된 노드 값 : 96, 71
변경된 노드 값 : 71, 64
Heap Tree :
97
91, 96
90, 83, 89, 71
60, 71, 82, 48, 71, 83, 64, 51
8, 33, 9, 49, 29, 63, 16, 20, 6, 51, 34, 47, 36, 40
# 노드 삽입 (랜덤값)
for _ in range(1):
value = random.randint(1, 100)
print(f"삽입된 노드 값 {value}")
node = Node(value)
heap.add_node(node)
print(heap)
>>> 삽입된 노드 값 76
Heap Tree :
97
91, 96
90, 83, 89, 76
60, 71, 82, 48, 71, 83, 64, 71
8, 33, 9, 49, 29, 63, 16, 20, 6, 51, 34, 47, 36, 40, 51
노드 추출 시 최상위 루트 노드의 값 100이 뽑히고 자식들의 지속적인 비교를 통해 내부 자체 정렬이 됨을 알 수 있었다.
이에 반해 노드 삽입 시 가장 마지막 요소로 추가된 뒤 부모 96보다 작을 때까지 올라간 76 을 확인할 수 있었다.
3) Heapq 패키지 이용
파이썬의 기본 자료형인 리스트, 딕셔너리로 구현이 가능한 스택/큐/해시 구조와 다르게 힙 구조의 경우 리스트를 이용하기만 할 뿐 별도의 처리가 필요하기에 파이썬은 기본 패키지 heapq로 이를 제공해준다.
heapq 패키지는 별도의 자료 구조가 아닌, 일반 리스트를 힙 구조에 맞게 처리해주는 역할이다.
collections에서 본 여타 클래스와 다르게 heapq 패키지는 리스트를 입력으로 받아 원본을 수정해주는 매커니즘으로 작동한다.
import heapq
# heapq 자료구조 생성
## heapify 메소드를 이용하여 기존 리스트를 초기 정렬시킬 수 있다.
## 기본은 ascending, 최솟값, 만일 최댓값이 필요하다면 음수화하여 자료구조를 만들어야 한다.
print("자료 구조 생성")
lst = [1, 4, 3, 2, 7, 34, 23, -234, 14, -3, -6, -36]
lst = [1.0, 2.0, -1.0]
lst = [[1, 2, 3], [0, 4, 5]]
print(lst)
heap = heapq.heapify(lst) # 반환 안 돼. inplace가 기본이야.
print(lst, heap)
>>> 자료 구조 생성
[[1, 2, 3], [0, 4, 5]]
[[0, 4, 5], [1, 2, 3]] None
# 원소 제거
print("\n원소 제거")
print(heapq.heappop(lst))
print(lst)
>>> 원소 제거
[0, 4, 5]
[[1, 2, 3]]
# 원소 삽입
print("\n원소 삽입")
heapq.heappush(lst, [1, 8, 9])
print(lst)
>>> 원소 삽입
[[1, 2, 3], [1, 8, 9]]
큰 틀은 일반 리스트를 힙 구조로 바꾸어주는 heapq.heapify(), 원소 삽입의 heapq.heappush(), 원소 추출의 heapq.heappop() 세 가지로 이루어져 있다.
heapq.heapify()은 본래 리스트를 변환할 뿐 반환값으로는 None을 내놓는걸 확인할 수 있다.
또한, 힙 구조는 실수나 정수값 뿐 아니라 리스트 역시 받기에 이를 이용해 다중 우선순위 큐 구조를 구현할 수 있으며 각 요소에 임의로 마이너스 부호를 붙여 최댓값 힙 구조를 구현할 수 있다.
추가적인 응용으로는 양방향, 즉 최솟값과 최댓값이 동시에 필요한 이중우선순위 큐 구조가 존재한다.
이중우선순위 큐 구조는 최솟값 힙 구조와 최댓값 힙 구조를 각각 만들어 이들을 동기화함으로써 구현이 가능하다.
동기화란 ? 하나의 요소를 각각의 힙 구조에 넣고 필요에 따라 원소를 추출, 고유한 ID의 해시 테이블을 만들어 "visited 여부"를 관리하는 것이다. 마지막 문제 예시에서 확인해보자.
1) 더 맵게 Lv.2
각 음식을 섞어 모든 음식의 스코빌 지수가 특정 수치 이상이 되도록 만드는 문제이다.
# 더 맵게 - 기존 풀이 - 시간 초과
def solution_bad(scoville, K):
answer = 0
foods = sorted([s for s in scoville], reverse=True)
while foods and len(foods) > 1 and foods[-1] < K:
food = foods.pop() + foods.pop()*2
idx = len(foods)-1
answer += 1
while True:
if idx < 0 or food <= foods[idx]:
foods.insert(idx+1, food)
break
else:
idx -= 1
if len(foods) == 1 and foods[0] < K:
return -1
return answer
solution_bad([1, 2, 3, 9, 10, 12], 7) # 2번
>>> 2
파이썬의 리스트를 이용, 가장 작은 두 요소를 섞고 적합한 위치 idx를 찾아 새 음식을 넣어주는 방식을 사용한 첫 풀이이다.
결과는 시간 초과. 100만 개의 음식이 주어지는 환경에서 위 방법으로는 전혀 효율성을 낼 수 없었다.
시간 복잡도를 생각해보면 idx 를 찾는데에 O(N), 이후 연속된 메모리인 리스트의 특성 상 나머지 요소를 뒤로 밀어야하기에 insert(idx+1, food) 삽입에 O(N)이 소요되기 때문이다.
# 더 맵게 - 보완 풀이
import heapq
def solution_good(scoville, K):
before = len(scoville)
heapq.heapify(scoville)
try:
while scoville[0] < K:
new_sc = heapq.heappop(scoville) + heapq.heappop(scoville)*2
heapq.heappush(scoville, new_sc)
except:
return -1
return before - len(scoville)
solution_good([1, 2, 3, 9, 10, 12], 7) # 2번
결국 heapq 패키지를 이용하여 풀어내었다.
이 경우 초기 정렬을 제외하면 시간복잡도 O(logN)의 heappop()과 heappush()를 사용하기에 높은 효율성이 가능한 것이다.
자료 구조를 짜는 것이 중요한 이유.....
2) 디스크 컨트롤러 Lv.3
작업의 소요시간이 짧은 것, 작업의 요청 시각이 빠른 것, 작업의 번호가 작은 것의 기준을 가진 다중우선순위 큐를 구현하는 문제이다.
# 디스크 컨트롤러
import heapq
def solution(jobs):
cur_time = 0; answer = 0; i = 0; n = len(jobs)
jobs = sorted(jobs, key=lambda x: x[0], reverse=True)
print("초기 jobs : ", jobs)
queue = []
while jobs or queue:
# 1) 현재 시간에 밀린 jobs들 불러오기
while jobs and jobs[-1][0] <= cur_time:
s, l = jobs.pop()
print("들어온 job의 l, s, i : ", l, s, i)
heapq.heappush(queue, [l, s, i])
i += 1
print("\n남은 jobs : ", jobs)
print("현재 queue : ", queue)
print("현재 cur_time : ", cur_time)
# 2) 우선순위 제일 높은 jobs 실행하기
if queue:
long, start, idx = heapq.heappop(queue)
print("실행된 jobs의 long, start, idx : ", long, start, idx)
cur_time += long
# 3) 반환시간 계산하기
answer += cur_time - start
else:
cur_time += 1
continue
return answer // n
solution([[0, 3], [1, 9], [3, 5]])
같은 부분의 CS의 SJT(Shortest Job First) 등을 공부하던 와중 만난 문제라 굉장히 반가웠던 문제.
알고리즘 자체가 한 번 시행된 job은 CPU를 뺏기지 않는 비선점형 알고리즘이기에 각 job의 시행이 끝나면 밀린 요청을 우선순위에 맞게 힙 구조로 전부 받고, 우선순위가 가장 높은 다음 요청을 시행하는 방식으로 풀어내었다.
파이썬의 heapq 패키지가 애초부터 리스트를 입력으로 받기에 Lv.3 인 것치고 생각보다 쉽게 풀린 문제.
그 실제 코드 작동을 살펴보면 아래와 같다.
# 1) jobs 목록
>>> 초기 jobs : [[3, 5], [1, 9], [0, 3]]
--------------------------------------------
# 2) 밀린 요청 받기, time = 0
>>> 들어온 job의 l, s, i : 3 0 0
>>> 남은 jobs : [[3, 5], [1, 9]]
# 3) 우선순위 큐 구현, time = 0
>>> 현재 queue : [[3, 0, 0]]
# 4) 최우선 job 실행, time = 0 -> time = 3
>>> 실행된 jobs의 long, start, idx : 3 0 0
--------------------------------------------
# 5) 밀린 요청 받기, time = 3
>>> 들어온 job의 l, s, i : 9 1 1
>>> 들어온 job의 l, s, i : 5 3 2
>>> 남은 jobs : []
# 6) 우선순위 큐 구현, time = 3
>>> 현재 queue : [[5, 3, 2], [9, 1, 1]]
# 7) 최우선 job 실행, time = 3 -> time = 8
>>> 실행된 jobs의 long, start, idx : 5 3 2
--------------------------------------------
# 8) 밀린 요청 받기, time = 8
>>> 남은 jobs : []
# 9) 우선순위 큐 구현, time = 8
>>> 현재 queue : [[9, 1, 1]]
# 10) 최우선 job 실행, time = 8 -> time = 17
>>> 실행된 jobs의 long, start, idx : 9 1 1
만일 중간에 CPU를 빼앗기는 선점형 SRTF(Shortest Remaining-Time First)의 경우 위 코드에서 밀린 요청을 받고 실행하는 과정을 매초 진행함으로써 구현할 수 있겠다.
3) 이중우선순위큐 Lv.3
최솟값과 최댓값을 골고루 빼내는 이중우선순위 큐를 구현하는 문제이다.
"I"는 숫자 삽입, "D 1"은 최댓값 추출, "D -1"은 최솟값 추출을 의미한다.
# 이중우선순위 큐
# 최솟값 기준 힙 구조 -> 최댓값은 remove 이용
import heapq
def solution(operations):
answer = []
queue = []
for oper in operations:
inst, value = oper.split(" ")
if inst == "I":
heapq.heappush(queue, int(value))
else:
if not queue: continue
if value == "-1":
heapq.heappop(queue)
else:
queue.remove(max(queue))
heapq.heapify(queue)
return [0, 0] if not queue else [max(queue), min(queue)]
solution(["I -45", "I 653", "D 1", "I -642", "I 45", "I 97", "D 1", "D -1", "I 333"])
>>> [333, -45]
힙 구조를 이용하여 최솟값을 추출하고 최댓값의 경우 max() + heapq.heapify() 를 이용하여 풀어내었다.
최댓값 추출 방식에 대해 많은 고민을 해보았는데 최소 힙 구조에서 최댓값은 가장 하위 레벨, 혹은 그 윗 레벨에 존재하게 되고 이 값의 추출 시 뒤 이은 자식 노드들이 전부 한 칸 씩 당겨진다는 문제가 발생하였다.
이러나저러나 구현이 복잡해지고 효율이 안나오기에 max() 를 이용하게 되었다.
그치만 매번 자료 구조를 재정렬하는 것은 너무나 마음에 들지 않는 것... 문제 요점에 맞게 이중우선순위 큐 구조를 직접 구현해서 풀어보자.
from heapq import heappop, heappush
from uuid import uuid4
def solution(operations):
popped = {} # 해당 값이 뽑혔는지 여부를 나타내는 해시 테이블 / uuid : bool 꼴
min_heap = [] # (value, uuid) 꼴로 보관
max_heap = [] # (-value, uuid) 꼴로 보관
def _pop_max_value():
while max_heap:
_, max_id = heappop(max_heap)
if max_id in popped: continue # 동기화 : popped 해시 테이블 확인
else:
popped[max_id] = True
return
def _pop_min_value():
while min_heap:
_, min_id = heappop(min_heap)
if min_id in popped: # 동기화 : popped 해시 테이블 확인
continue
else:
popped[min_id] = True
return
def _clean(heap): # 이미 추출된 요소 제거
heap = [v for v, id in heap if id not in popped]
return heap
for oper in operations:
_, value = oper.split(" ")
# "D 1"의 경우 최대 힙에서 제거
if oper == "D 1":
_pop_max_value()
# "D -1"의 경우 최소 힙에서 제거
elif oper == "D -1":
_pop_min_value()
# "I 숫자"의 경우 양쪽에 모두 push
else:
new_id = int(uuid4())
heappush(min_heap, (int(value), new_id))
heappush(max_heap, (-int(value), new_id))
min_heap_v_only = _clean(min_heap)
if not min_heap_v_only: return [0, 0]
return [max(min_heap_v_only), min(min_heap_v_only)]
solution(["I 5", "I 3", "D 1", "I 4", "D -1"])
이중우선순위 큐 구조의 경우 최소 힙 구조와 최대 힙 구조, 동기화 테이블을 구현함으로써 만들어 낼 수 있다.
동기화 테이블의 경우(uuid, bool)를 값으로 가지는 해시 테이블을 이용할 수 있다.
그렇게 각각의 힙 구조를 이용하며 요소 삽입 시 양쪽 모두에 삽입, 요소 추출 시 한 쪽에서 추출한 뒤 동기화 테이블에 기록해놓는 식이다.
위 풀이에서는 uuid4() 함수를 이용하여 각 값의 ID를 구현하였으며, 이미 해당 ID가 동기화 테이블에 기록되어 있는 경우 heappop() 을 통한 추출을 다시 진행하는 방식으로 구현해 내었다.
최종적으로 실제 값인 value가 양수로 들어 있는 min_heap 을 동기화 테이블을 고려하여 _clean() 한 뒤, 최댓값과 최솟값을 구해내었다.
고유 ID를 이용하여 여러 중복 값을 처리할 수 있다는 점에서 유익한 풀이였던 것 같다.
+) 앞선 비효율적인 방식보다 동기화 테이블을 이용한 풀이에서 절반의 시간 효율을 보였다 !!