알고리즘 문제 중에서 힙(heap), 우선순위큐(prioriry queue)를 사용해야 할 때가 종종 있다. 그럴 때 굳이 힙 자료구조를 구현할 필요없이, python에 내장된 heapq 모듈을 사용하면 정말 편리하다. 변수 heap를 생성하고 heapq.heappush(heap, item)
또는 heapq.heappop(heap)
하면 그냥 힙이다. 속도도 빨라서 직접 구현한 힙 클래스로는 시간초과가 나던 문제가 heapq 모듈로 바꾸면 무난히 통과된다.
어째서 구현한 힙 클래스보다 heapq 모듈이 빠를까? heapq 모듈도 똑같이 cpython으로 동작한다. 내가 구현한 힙 클래스에 불필요한 연산이 많았기 때문일까? 아니면 heapq 모듈 내에 숨겨진 비밀이 있는걸까? 궁금즘을 해소하기 위해 cpython의 heapq.py를 직접 보며 이유를 찾아보았다.
class Heapq:
def __init__(self, n):
self.heapq = [None] + [0] * n
self.length = 0
def print(self):
print(self.heapq[1:self.length+1])
def shiftdown(self, left, right):
parent = left
temp = self.heapq[parent]
while parent < right//2+1:
cl = parent*2
cr = cl+1
child = cr if cr <= right and self.heapq[cr] > self.heapq[cl] else cl
if temp >= self.heapq[child]:
break
self.heapq[parent] = self.heapq[child]
parent = child
self.heapq[parent] = temp
def heappush(self, item):
self.length += 1
self.heapq[self.length] = item
child = self.length
temp = self.heapq[child]
while child//2 > 0:
parent = child//2
if temp <= self.heapq[parent]:
break
self.heapq[child] = self.heapq[parent]
child = parent
self.heapq[child] = temp
def heappop(self):
if self.length == 0:
return 0
root = self.heapq[1]
self.heapq[1] = self.heapq[self.length]
self.length -= 1
if self.length > 0:
self.shiftdown(1, self.length)
return root
heapq.py를 뜯어보고 직접 구현한 힙 클래스를 다시 보니, 정말 비효율적인 연산이 많다. length 변수는 왜 만들었지..
"""Heap queue algorithm (a.k.a. priority queue).
Heaps are arrays for which a[k] <= a[2*k+1] and a[k] <= a[2*k+2] for
all k, counting elements from 0. For the sake of comparison,
non-existing elements are considered to be infinite. The interesting
property of a heap is that a[0] is always its smallest element.
"""
(우선순위큐라고도 하는)힙큐 알고리즘. 힙은 인덱스가 0부터 시작하며, 모든 인덱스 k에 대해 부모 노드가 자식 노드보다 값이 작은 배열이다(a[k] <= a[2*k+1] and a[k] <= a[2*k+2]
). 비교 연산을 위해 존재하지 않는 노드의 값은 무한으로 둔다. 힙의 재미있는 특징은 루트인 a[0]는 항상 가장 작은 값이라는 것이다.
heappush
def heappush(heap, item):
"""Push item onto heap, maintaining the heap invariant."""
heap.append(item)
_siftdown(heap, 0, len(heap)-1)
함수 heappush(heap, item)
은 인자로 heap과 item을 받는다. heap은 list 타입이고, item은 리스트에 들어갈 수 있는 어떤 타입이던 상관없다. 따라서 heap.append(item)
은 단순히 list에 item을 append하는 것이다. 중요한 것은 _siftdown(heap, 0, len(heap)-1)
으로 보인다.
_siftdown
# 'heap' is a heap at all indices >= startpos, except possibly for pos. pos
# is the index of a leaf with a possibly out-of-order value. Restore the
# heap invariant.
def _siftdown(heap, startpos, pos):
newitem = heap[pos]
# Follow the path to the root, moving parents down until finding a place
# newitem fits.
while pos > startpos:
parentpos = (pos - 1) >> 1
parent = heap[parentpos]
if newitem < parent:
heap[pos] = parent
pos = parentpos
continue
break
heap[pos] = newitem
함수 _siftdown(heap, startpos, pos)
는 인자로 heap, startpos, pos를 받는다. 요약하자면 _shiftdown()
함수는 startpos를 루트로 가지는 서브트리를 힙으로 재구성하는 함수이다. heap[pos]에는 힙에 새로 들어온 item이 들어있다. 이 item을 적절한 자기 위치로 찾아가게끔 한다.
로직은 간단하다. 새로 들어온 newitem을 부모 노드와 비교해서 자신이 더 작으면 부모와 자리를 바꾸며 자리를 찾아 올라가는 로직이다. 특이한 점은 부모 노드 인덱스 계산 시, (pos-1) >> 1
로 시프트 연산자를 썼다. 일반적으로 나누기 연산(//
)보다 비트 시프트 연산(>>
)이 빠르다고 하니 그런 듯 하다.
heappop
def heappop(heap):
"""Pop the smallest item off the heap, maintaining the heap invariant."""
lastelt = heap.pop() # raises appropriate IndexError if heap is empty
if heap:
returnitem = heap[0]
heap[0] = lastelt
_siftup(heap, 0)
return returnitem
return lastelt
함수 heappop(heap)
은 heap에서 루트 노드의 값을 추출해서 리턴한다. 루트 노드를 추출한 후, 힙의 가장 마지막 노드의 값을 루트 노드에 저장하면 최소 힙의 조건이 깨진다. 이후, _shiftup(heap, 0)
를 통해 루트 노드의 값이 적절한 위치를 찾아가도록 하며 힙으로 재구성한다.
_siftup
def _siftup(heap, pos):
endpos = len(heap)
startpos = pos
newitem = heap[pos]
# Bubble up the smaller child until hitting a leaf.
childpos = 2*pos + 1 # leftmost child position
while childpos < endpos:
# Set childpos to index of smaller child.
rightpos = childpos + 1
if rightpos < endpos and not heap[childpos] < heap[rightpos]:
childpos = rightpos
# Move the smaller child up.
heap[pos] = heap[childpos]
pos = childpos
childpos = 2*pos + 1
# The leaf at pos is empty now. Put newitem there, and bubble it up
# to its final resting place (by sifting its parents down).
heap[pos] = newitem
_siftdown(heap, startpos, pos)
_shiftdown()
과 로직은 똑같다. 다만 _shiftup()
함수는 루트 노드가 자식 노드와 비교하며 자리를 찾아 아래로 내려간다. 그런데 특이한 것은 맨 마지막에 _shiftdown(heap, startpos, pos)
를 해준다는 점이다. 주석을 보면 이유를 알 수 있는데, newitem이 제 위치를 찾아 내려가며 힙에 버블이 일어났다고 표현한다. newitem이 제 위치를 찾아오며 노드 간의 우선순위가 엉망이 되어있을 수 있으니, _shiftdown()
으로 재구성하기 위함이다.
heapify
def heapify(x):
"""Transform list into a heap, in-place, in O(len(x)) time."""
n = len(x)
# Transform bottom-up. The largest index there's any point to looking at
# is the largest with a child index in-range, so must have 2*i + 1 < n,
# or i < (n-1)/2. If n is even = 2*j, this is (2*j-1)/2 = j-1/2 so
# j-1 is the largest, which is n//2 - 1. If n is odd = 2*j+1, this is
# (2*j+1-1)/2 = j so j-1 is the largest, and that's again n//2-1.
for i in reversed(range(n//2)):
_siftup(x, i)
번외로, 함수 heapify(x)
는 리스트 x를 인자로 받아, 힙으로 재구성해주는 함수이다. 로직은 다음과 같다. 가장 아랫단보다 한 칸 위의 가장 오른쪽 노드를 루트로 하는 서브트리를 _siftup(x, i)
함수로 힙으로 재구성하며, 이것을 루트까지 반복한다. 마지막 서브트리부터 차근차근 올라가며 전체 트리를 힙으로 구성하는 분할정복의 기법이라 볼 수 있다.
def heappush(heap, item):
"""Push item onto heap, maintaining the heap invariant."""
heap.append(item)
_siftdown(heap, 0, len(heap)-1)
def heappop(heap):
"""Pop the smallest item off the heap, maintaining the heap invariant."""
lastelt = heap.pop() # raises appropriate IndexError if heap is empty
if heap:
returnitem = heap[0]
heap[0] = lastelt
_siftup(heap, 0)
return returnitem
return lastelt
def heapify(x):
"""Transform list into a heap, in-place, in O(len(x)) time."""
n = len(x)
# Transform bottom-up. The largest index there's any point to looking at
# is the largest with a child index in-range, so must have 2*i + 1 < n,
# or i < (n-1)/2. If n is even = 2*j, this is (2*j-1)/2 = j-1/2 so
# j-1 is the largest, which is n//2 - 1. If n is odd = 2*j+1, this is
# (2*j+1-1)/2 = j so j-1 is the largest, and that's again n//2-1.
for i in reversed(range(n//2)):
_siftup(x, i)
# 'heap' is a heap at all indices >= startpos, except possibly for pos. pos
# is the index of a leaf with a possibly out-of-order value. Restore the
# heap invariant.
def _siftdown(heap, startpos, pos):
newitem = heap[pos]
# Follow the path to the root, moving parents down until finding a place
# newitem fits.
while pos > startpos:
parentpos = (pos - 1) >> 1
parent = heap[parentpos]
if newitem < parent:
heap[pos] = parent
pos = parentpos
continue
break
heap[pos] = newitem
def _siftup(heap, pos):
endpos = len(heap)
startpos = pos
newitem = heap[pos]
# Bubble up the smaller child until hitting a leaf.
childpos = 2*pos + 1 # leftmost child position
while childpos < endpos:
# Set childpos to index of smaller child.
rightpos = childpos + 1
if rightpos < endpos and not heap[childpos] < heap[rightpos]:
childpos = rightpos
# Move the smaller child up.
heap[pos] = heap[childpos]
pos = childpos
childpos = 2*pos + 1
# The leaf at pos is empty now. Put newitem there, and bubble it up
# to its final resting place (by sifting its parents down).
heap[pos] = newitem
_siftdown(heap, startpos, pos)
이외에 heapq.py에는 추가적으로
heappushpop(heap, item)
heapreplace(heap, item)
_heappop_max(heap)
_heapify_max(heap, x)
와 같이 여러 함수가 더 존재하지만, 그 중 자주 사용하는 함수만 살펴보았다.
heapq.py를 보고나니 왜 heapq 모듈이 빠른지 조금 알 수 있었다. 직접 구현한 코드에서 불필요한 연산을 일체 제거하고 로직은 단순화시켰다. 특수한 알고리즘을 사용했나 싶었지만 딱히 그런 것은 아니었다. 이제 로직을 파악하며 왜 빠른지 알게 되었으니, 마음놓고 heapq 모듈을 사용할 수 있을 것 같다.