[python] concurrent.futures로 동시성 프로그래밍을 구현하기

김동욱·2024년 5월 22일

python

목록 보기
6/6
post-thumbnail

파이썬에서의 동시성 구현

파이썬에서 동시성 프로그래밍을 구현하는 방법에는 몇 가지가 있다. 코루틴을 사용하여 단일 스레드에서 비동기 방식으로 구현하는 방법과 멀티 스레딩이나 멀티 프로세싱을 활용하는 방법이다. 멀티 스레딩이나 멀티 프로세싱은 threadingmultiprocessing 모듈을 import하여 구현할 수 있다. 이러한 모듈들은 저수준에서 동시성을 제어할 수 있지만 구현이 복잡할 수 있다. 좀 더 고수준의 인터페이스를 제공하며 간결하게 구현할 수 있는 concurrent.futures 모듈을 사용하여 동시성 프로그래밍을 하는 방법에 대해 알아보자.

파이썬에서 동시성을 구현하는 주요 방법들

멀티프로세싱: GIL을 우회하여 CPU 바운드 작업을 병렬로 처리할 수 있다.
멀티스레딩: GIL의 영향을 받지만 I/O 바운드 작업에 적합하다.
비동기 프로그래밍(코루틴): 단일 스레드에서 코루틴을 사용하여 효율적으로 I/O 바운드 작업을 처리할 수 있다.

※ GIL(Global Interpreter Lock)이란 두 개 이상의 스레드가 동시에 실행될 때 하나의 자원에 접근하는 경우 발생할 수 있는 문제를 방지하기 위해 리소스 전체에 걸리는 락이다.

concurrent.futures 사용하기

앞서 말했듯 concurrent.futures 모듈을 사용하면 멀티스레딩과 멀티프로세싱을 쉽게 구현할 수 있다. 멀티스레딩과 멀티프로세싱 각각 ThreadPoolExecutor, ProcessPoolExecutor 메서드가 사용된다. 구현 방식은 동일하나 구현한 코드가 수행하는 내용에 따라 수행 시간에서 차이가 발생한다.

concurrent.futures을 통해 사용할 수 있는 몇 가지 기능들을 살펴보자.

map

map 메서드는 주어진 함수와 반복 가능한 인자들을 받아서, 여러 작업을 병렬로 실행하고 결과를 순서대로 반환한다. 이는 호출 시점에서 동기적으로 동작하지만, 내부적으로는 병렬로 작업을 처리하는 메커니즘이다.

import os
import time
from concurrent import futures

# 작업 리스트: 각 작업은 n까지의 합을 계산함
WORK_LIST = [1000000, 10000000, 100000000, 100000000]


def sum_generator(n):
    # 1부터 n까지의 합을 계산하는 함수
    return sum(range(1, n + 1))


def main(executor_type):
	# 시스템의 CPU 코어 수를 기준으로 worker 수를 설정
    worker = min(len(WORK_LIST), os.cpu_count())

    # 동시성 작업을 실행하고 결과와 시간을 출력하는 함수
    start_time = time.time()

    with executor_type(max_workers=worker) as executor:
        # map 메서드 사용: 작업 순서를 유지하고, 즉시 실행
        result = executor.map(sum_generator, WORK_LIST)

    end_time = time.time() - start_time
    print(f'\nResult -> {list(result)} Time: {end_time:.2f}s')


if __name__ == '__main__':
    print("Using ThreadPoolExecutor:")
    main(futures.ThreadPoolExecutor)

    print("\nUsing ProcessPoolExecutor:")
    main(futures.ProcessPoolExecutor)

실행 결과는 다음과 같다. 단순히 숫자를 누적하여 더하는 CPU 바운드 작업이기 때문에 멀티프로세싱이 더 나은 성능을 보인다.

Using ThreadPoolExecutor:

Result -> [500000500000, 50000005000000, 5000000050000000, 5000000050000000] Time: 3.43s

Using ProcessPoolExecutor:

Result -> [500000500000, 50000005000000, 5000000050000000, 5000000050000000] Time: 1.69s

wait

wait 메서드는 주어진 Future 객체들이 완료될 때까지 기다린다. 또한 return_when 파라미터를 통해 모든 작업이 완료될 때까지 기다리거나, 시간을 지정하거나 첫 번째 작업이 완료될 때까지 기다리는 등의 동작을 설정할 수 있다. 이 메서드는 완료된 Future와 완료되지 않은 Future를 두 그룹으로 나누어 반환한다.

wait 메서드를 사용한 예시를 살펴보자.

import os
import time
from concurrent.futures import ProcessPoolExecutor, wait

WORK_LIST = [1000000, 10000000, 100000000, 1000000000]


# 누적 합계 함수(제너레이터)
def sum_generator(n):
    return sum(n for n in range(1, n + 1))


def main():
    # 시스템의 CPU 코어 수를 기준으로 worker 수를 설정
    worker = min(len(WORK_LIST), os.cpu_count())

    # 시작 시간
    start_tm = time.time()
    # Futures 객체를 담기 위한 리스트
    futures_list = []

    # ThreadPoolExecutor 사용
    with ProcessPoolExecutor(max_workers=worker) as executor:
        for work in WORK_LIST:
            # future 객체만 반환(실행x)
            future = executor.submit(sum_generator, work)
            # 스케쥴링
            futures_list.append(future)
            # 스케쥴링 확인
            print('Scheduled for {} : {}'.format(work, future))

        # wait 결과 출력
        done, not_done = wait(futures_list, timeout=7)

        # 완료된 작업
        print('\nCompleted Tasks:')
        for future in done:
            print(f'Future Result: {future.result()}')

        # 완료되지 않은 작업
        print('\nPending Tasks after waiting for 7 seconds:')
        for future in not_done:
            print(f'Future: {future}')

    # 종료 시간
    end_tm = time.time() - start_tm
    # 출력 포맷
    msg = '\nTime: {:.2f}s'
    # 최종 결과 출력
    print(msg.format(end_tm))


# 실행
if __name__ == '__main__':
    main()

실행 결과를 보면 지정한 시간 내에 끝낸 작업과 끝내지 못한 작업들을 확인할 수 있다.

Scheduled for 1000000 : <Future at 0x104b5a580 state=running>
Scheduled for 10000000 : <Future at 0x104b63490 state=running>
Scheduled for 100000000 : <Future at 0x104b63820 state=running>
Scheduled for 1000000000 : <Future at 0x104b63be0 state=running>

Completed Tasks:
Future Result: 500000500000
Future Result: 50000005000000

Pending Tasks after waiting for 7 seconds:
Future: <Future at 0x104b63820 state=running>
Future: <Future at 0x104b63be0 state=running>

Time: 42.76s

as_completed

as_completed 메서드는 주어진 Future 객체들이 완료되는 순서대로 iterator를 반환한다.

as_completed 메서드를 사용한 예시를 살펴보자.

import os
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

WORK_LIST = [100000, 1000000, 10000000, 100000000]


def sum_generator(n):
    return sum(n for n in range(1, n + 1))


def main():
    # 시스템의 CPU 코어 수를 기준으로 worker 수를 설정
    worker = min(len(WORK_LIST), os.cpu_count())

    # 시작 시간
    start_tm = time.time()
    # Futures
    futures_list = []

    # ThreadPoolExecutor 사용
    with ProcessPoolExecutor(max_workers=worker) as executor:
        for work in WORK_LIST:
            # future 객체만 반환(실행x)
            future = executor.submit(sum_generator, work)
            # 스케쥴링
            futures_list.append(future)
            # 스케쥴링 확인
            print('Scheduled for {} : {}'.format(work, future))

        # as_completed 결과 출력
        for future in as_completed(futures_list):
            result = future.result()  # 작업이 완료될 때까지 기다린 후 결과를 반환
            done = future.done()  # 작업이 완료되었는지 여부를 확인
            cancelled = future.cancelled()  # 작업이 취소되었는지 여부를 확인

            # future 결과 확인
            print()
            print('Future Result: {}, Done: {}'.format(result, done))
            print('Future Cancelled: {}'.format(cancelled))

    # 종료 시간
    end_tm = time.time() - start_tm
    # 출력 포맷
    msg = '\nTime: {:.2f}s'
    # 최종 결과 출력
    print(msg.format(end_tm))


# 실행
if __name__ == '__main__':
    main()

실행 결과는 다음과 같다.

Scheduled for 100000 : <Future at 0x104bf5880 state=running>
Scheduled for 1000000 : <Future at 0x104c129d0 state=running>
Scheduled for 10000000 : <Future at 0x104c12c40 state=running>
Scheduled for 100000000 : <Future at 0x104c12f40 state=pending>

Future Result: 5000050000, Done: True
Future Cancelled: False

Future Result: 500000500000, Done: True
Future Cancelled: False

Future Result: 50000005000000, Done: True
Future Cancelled: False

Future Result: 5000000050000000, Done: True
Future Cancelled: False

Time: 4.22s
profile
안녕하세요! 질문과 피드백은 언제든지 환영입니다:)

0개의 댓글