동시성이란 흔히 말하는 멀티태스킹입니다. 우리가 어떠한 일을 할 때 '나는 멀티가 안돼'라는 말을 종종 하는 사람을 볼 수 있을 것이다. 맞는 얘기다 인간은 멀티, 즉 멀티태스킹이 될 수가 없다. 그러나 컴퓨터는 멀티태스킹을 지원한다. 예를 들어 노래를 들으면서 인터넷 쇼핑을 할 수 있는 것처럼 말이다. 동시성이 한순간에 하나의 일만을 처리하지만 별령성은 진짜로 동시에 여러 개의 일을 처리하는 것을 의미한다. 마치 '일꾼'이 여럿이어서 여러 개의 일을 처리하는 것을 별령성이라고 한다. 지금까지의 설명을 들어보면 별령성의 단점을 찾아볼 수 없어 보이지만 별령성은 물리적인 코어의 개수에 제한이 있다. 보통의 사용자들은 쿼드코어를 사용하고 있으면 cpu는 많아야 8개 정도이다. 즉 한순간에 처리할 수 있는 양이 제한적이게 된다.
futures를 써서 동시성 프로그래밍을 하는 방법은 2가지가 있는데 첫 번째는 concurrent.futures의 map 함수를 쓰는 것이고, 두 번째는 wait과 as_completed를 쓰는 것이다.
import os
import time
from concurrent import futures
WORK_LIST = [100000, 1000000, 10000000, 10000000]
# 동시성 합계 계산 메인함수
# 누적 합계 함수(제네레이터)
def sum_generator(n):
return sum(n for n in range(1, n+1))
def main():
# Worker Count
worker = min(10, len(WORK_LIST))
# 시작 시간
start_tm = time.time()
# 결과 건수
# ProcessPoolExecutor
with futures.ThreadPoolExecutor() as excutor:
# map -> 작업 순서 유지, 즉시 실행
result = excutor.map(sum_generator, WORK_LIST)
# 종료 시간
end_tm = time.time() - start_tm
# 출력 포멧
msg = '\n Result -> {} Time : {:.2f}s'
# 최종 결과 출력
print(msg.format(list(result), end_tm))
# 실행 (시작점이 있어야 futures 정상 사용이 가능하기 때문에 따로 함수를 만듬)
if __name__ == '__main__':
main()
Result -> [5000050000, 500000500000, 50000005000000, 50000005000000] Time : 1.33s
상위 코드에서 ThreadPoolExecutor라고 된 부분을 ProcessPoolExecutor로 바꿔주면 프로세스를 이용해서 작업할 수도 있다. 쉽게 생각해서, 스레드를 이용해서 동시성 프로그래밍을 하려면 ThreadPoolExecutor를, 프로세스를 이용하려면 ProcessPoolExecutor를 사용하면 된다.
import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed
WORK_LIST = [10000, 100000, 1000000, 10000000]
# 동시성 합계 계산 메인 함수
# 누적 합계 함수(제레네이터)
def sum_generator(n):
return sum(n for n in range(1, n+1))
# wait
# as_completed
def main():
# Worker Count
worker = min(10, len(WORK_LIST)) #10보다 것으로 스레드 만들기
# 시작 시간
start_tm = time.time()
# Futures
futures_list = []
# 결과 건수
# ProcessPoolExecutor
with ThreadPoolExecutor() as excutor:
for work in WORK_LIST:
# future 반환 (실제로 일을 하지는 않음)
future = excutor.submit(sum_generator, work)
# 스케쥴링
futures_list.append(future)
# wait 결과 출력 7초까지만 기다리고 그 때까지 못 하는 일은 실패로 간주하고 중단
result = wait(futures_list, timeout=7)
# wait 결과 출력
result = wait(futures_list, timeout=7)
# 성공
print('Completed Tasks : ' + str(result.done))
# 실패
print('Pending ones after waiting for 7seconds : ' + str(result.not_done))
# 결과 값 출력
print([future.result() for future in result.done])
# 종료 시간
end_tm = time.time() - start_tm
# 출력 포멧
msg = '\n Time : {:.2f}s'
# 최종 결과 출력
print(msg.format(end_tm))
# 실행
if __name__ == '__main__':
main()
Pending ones after waiting for 7seconds : set()
[500000500000, 50005000, 50000005000000, 5000050000]
Time : 0.53s
첫 번째 방법을 사용했던 것보다 코드가 길어졌다. futures_list라는 빈 리스트를 만들어서 미래에 할 일들을 담아 놓는다. with 문을 지나 for 문으로 executor.submit(작업 함수, 각개 일) 메서드를 실행해 반환된 작업을 future에 할랑한 후 다시 futures_list에 넣는다. result에 timeout 7초를 정해서 7초가 넘으면 실패로 간주하고 wait의 결과를 출력한다.
import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed
WORK_LIST = [10000, 100000, 1000000, 10000000]
# 동시성 합계 계산 메인 함수
# 누적 합계 함수(제레네이터)
def sum_generator(n):
return sum(n for n in range(1, n+1))
# wait
# as_completed
def main():
# Worker Count
worker = min(10, len(WORK_LIST)) #10보다 것으로 스레드 만들기
# 시작 시간
start_tm = time.time()
# Futures
futures_list = []
# 결과 건수
# ProcessPoolExecutor
with ThreadPoolExecutor() as excutor:
for work in WORK_LIST:
# future 반환 (실제로 일을 하지는 않음)
future = excutor.submit(sum_generator, work)
# 스케쥴링
futures_list.append(future)
# as_completed 결과 출력
for future in as_completed(futures_list):
result = future.result()
done = future.done()
cancelled = future.cancelled
# future 결과 확인
print('Future Result : {}, Done : {}'.format(result, done))
print('Future Cancelled : {}'.format(cancelled))
# 종료 시간
end_tm = time.time() - start_tm
# 출력 포멧
msg = '\n Time : {:.2f}s'
# 최종 결과 출력
print(msg.format(end_tm))
# 실행
if __name__ == '__main__':
main()
Future Result : 5000050000, Done : True
Future Cancelled : <bound method Future.cancelled of <Future at 0x7feaed4cfe20 state=finished returned int>>
Future Result : 50005000, Done : True
Future Cancelled : <bound method Future.cancelled of <Future at 0x7feaed4cfca0 state=finished returned int>>
Future Result : 500000500000, Done : True
Future Cancelled : <bound method Future.cancelled of <Future at 0x7feaef058670 state=finished returned int>>
Future Result : 50000005000000, Done : True
Future Cancelled : <bound method Future.cancelled of <Future at 0x7feaef058970 state=finished returned int>>
Time : 0.56s
as_completed 결과도 출력도 가능하다. as_completed 호출 결과로 for 문으로 돌려 추출된 결괏값을 result에 할당하고 future.done()으로 작업이 완료됐는지(True/False), future.cancelled()는 작업이 취소됐는지(True/False)를 확인할 수 있다.
결과적으로 한 번에 한 개의 작업이 가능한 프로그램을 사용하는 것보다 더 효율적으로 사용이 가능하다.