멀티스레딩, 멀티프로세싱

Hyeseong·2022년 1월 27일
0

TIL

목록 보기
10/13

Multithreading - Difference between Process and Thread
Keyword - Process, Thread

Process

  • 운영체제 -> 할당 받는 자원 단위(실행 중인 프로그램)
  • CPU동작 시간, 주소공간(독립적)
  • Code, Data, Stack, Heap -> 독립적
  • 최소 1개의 메인 스레드 보유
  • 파이프, 파일, 소켓 등을 사용해서 프로세스간 통신(Cost 높음) -> Context Switching

Thread

  • 프로세스 내에 실행 흐름 단위
  • 프로세스 자원 사용
  • Stack만 별도 할당 나머지는 공유(Code, Data, Heap)
  • 한 스레드의 결과가 다른 스레드의 영향 끼침
  • 동기화 문제는 정말 주의(디버깅 어려움)

Multi Thread

  • 한 개의 단일 어플리케이션(응용프로그램) -> 여러 쓰레드로 구성 후 작업 처리
  • 시스템 자원 소모 감소(효율성), 처리량 증가(Cost 감소)
  • 통신 부담 감소, 디버깅 어려움, 단일 프로세스에느 효과 미약, 자원 공유 문제(교착 상태), 프로세스 영향 준다

Multil Process

  • 한 개의 단일 어플리케이션(응용프로그램) -> 여러 프로세스로 구성 후 작업 처리
  • 한 개의 프로세스 문제 발생은 확산 없음(프로세스 Kill)
  • 캐시 체인지, Cost 비용 매우 높음(오버헤드), 복잡한 통신 방법 사용

Multithreading - Python's GIL
Keyword - CPython, 메모리 관리, GIL, 사용 이유

GIL(Global Interpreter Lock)

  • CPython -> Python(bytecode) 실행 시 여러 쓰레드 사용할 경우

    • 단일 쓰레드만이 파이썬 오브젝트에 접근하게 제한하는 뮤텍스
  • CPython 메모리 관리가 취약(즉, Thread-safe)

  • Single Thread로 충분히 빠르다

  • 프로세스 사용 가능(Numpy/Scipy)등 GIL 외부 영역에서 효율적인 코딩

  • 병렬 처리는 Multiprocessing, asyncio 선택지 다양함.

  • thread 동시성 완벽 처리를 위해 -> Jython, IronPython, Stackless Python 등이 존재


    쓰레드 생성

    Multithreading - Thread(1) - Basic
    Keyword - Threading Basic

import logging
import threading
import time

# 메인 영역

# 쓰레드 실행 함수
def thread_func(name):
    logging.info("Sub-Thread %s: starting", name)
    time.sleep(3)
    logging.info("Sub-Thread %s: finished", name)


if __name__ == "__main__":
    # Logging
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
    logging.info("Main-Thread: before creating thread")

    # 함수 인자 확인
    x = threading.Thread(target=thread_func, args=('First',))

    logging.info("Main-Thread: before running thread")

    # 서브 스레드 시작
    x.start()

    # 주석 전후 결과 확인
    x.join()

    logging.info("Main-Thread: wait for the thread to finish")

    logging.info("Main-Thread: All Clear")

데몬쓰레드, 조인

Multithreading - Thread(2) - Daemon, Join
Keyword - DaemonThread, Join

  • 백그라운드에 실행
  • 메인스레드 종료시 즉시 종료
  • 주로 백그라우드 무한 대기 이벤트 발생 실행하는 부분 담당 ->JVM(가비지 컬렉션), 자동 저장
  • 일반 스레드는 작업 종료시 까지 실행
import logging
import threading
import time

# 메인 영역

# 쓰레드 실행 함수
def thread_func(name, d):
    logging.info("Sub-Thread %s: starting", name)
    for i in d:
        print(i)
    logging.info("Sub-Thread %s: finished", name)


if __name__ == "__main__":
    # Logging
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
    logging.info("Main-Thread: before creating thread")

    # 함수 인자 확인
    x = threading.Thread(target=thread_func, args=('First', range(20000)), daemon=True)
    y = threading.Thread(target=thread_func, args=('Two', range(10000)), daemon=True)

    logging.info("Main-Thread: before running thread")

    # 서브 스레드 시작
    # x.start()
    # y.start()

    print(x.isDaemon())  
    print(y.isDaemon())
    # 주석 전후 결과 확인
    #x.join()

    logging.info("Main-Thread: wait for the thread to finish")

    logging.info("Main-Thread: All Clear") 

threading.Thread 클래스의 키워드 파라미터인 daemon을 True로 지정하게 될 경우 메인 쓰레드가 종료될 경우 서브 쓰레드 역시 작업을 종료하게 된다.


ThreadPoolExecutor

Multithreading - Thread(3) - ThreadPoolExecutor
Keyword - Many Threads, concurrent.futures, (xxx).PoolExecutor

그룹스레드

Python 3.2 이상 표준 라이브러리 사용

.concurrent.futures

.with 사용으로 생성, 소멸 라이프 사이클 관리 용이

디버깅하기가 난해함(단점)

대기중인 작업 -> Queue -> 완료 상태 조사 -> 결과 또는 예외 -> 단일화(캡슐화)


import logging
from concurrent.futures import ThreadPoolExecutor
import time

def task(name):
   logging.info(f"Sub-Thread {name} starting")
   result = 0
   for i in range(10001):
      result += i 
   logging.info(f"Sub-Thread {name}: finished result: {result}")

   return result    

def main():
   # logging format  
   format = "%(asctime)s: %(message)s"
   logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

   # 실행 방법1
   # max_workers : 작업의 개수가 넘어가면 직접 설정이 유리
   # executor = ThreadPoolExecutor(max_workers=3)
   # task1 = executor.submit(task, "First") # submit 함수 2번째 위치 파라미터에는 넣고 싶은 값을 넣음(튜플 노!)
   # task2 = executor.submit(task, "Two")
   # task3 = executor.submit(task, "3")
   # task4 = executor.submit(task, "4")
   # task5 = executor.submit(task, "5")
   # task6 = executor.submit(task, "6")

   # print()
   # print(task1.result())
   # print()

   # 실행 방법2
   with ThreadPoolExecutor(max_workers=3) as executor:
       tasks = executor.map(task, ("Firtst", "Second"))
   # 결과 확인
   print(tuple(tasks))

if __name__ == '__main__':
   main()

Lock, DeadLock

"""
Section 1
Multithreading - Thread(4) Lock, DeadLock
Keyword - Lock, DeadLock, Race Condition, Thread synchronization
"""

"""
1. 세마포어(Semaphore): 프로세스간 공유 된 자원에 접근 시 문제 발생 가능성
-> 한 개의 프로세스만 접근 처리 고안(경쟁 상태 예방)

  1. 뮤텍스(Mutex): 공유된 자원의 데이터를 여러 스레드가 접근하는 것을 막는 것.
    -> 경쟁 상태 예방

  2. Lock: 상호 배제를 위한 잠금(Lock)처리
    -> 데이터 경쟁

  3. 데드락(DeadLock): 프로세스가 자원을 획득하지 못해 다음 처리를 못하는 무한 대기 상태(교착상태)

  4. Thread Synchronization(쓰레드 동기화)를 통해서 안정적으로 동작하게 처리한다.(동기화 메소드, 동기화 블럭)

  5. Semaphore와 Mutex 차이점은?
    -> 세마포어와 뮤텍스 개체는 모두 병렬 프로그래밍 환경에서 상호배제를 위해 사용
    -> 뮤텍스 개체는 단일 스레드가 리소스 또는 중요 섹션을 소비 허용
    -> 세마포어는 리소스에 대한 제한된 수의 동시 액세스를 허용
    """

예시1

from concurrent.futures import ThreadPoolExecutor
import logging
import threading
import time

class FakeDataStore:
    # 공유 변수(value)
    def __init__(self):
        self.value = 0

    # 변수 업데이트 함수
    def update(self, n):
        logging.info("Thread %s: starting update", n)

        # Mutex or Lock 등 동기화(Thread synchronization 필요)
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy

        logging.info("Thread %s: finishing update", n)


if __name__ == "__main__":

    # logging format
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    # instance of class
    store = FakeDataStore()

    logging.info(f'Testing update. Starting value is {store.value}')

    # with context 
    with ThreadPoolExecutor(max_workers=2) as executor:
        for n in 'First', 'Second', 'Third':
            executor.sub(store.update, n)
    
    logging.info("Testing update. Ending value is %d", store.value)

예시2

"""
Section 1
Multithreading - Thread(4) Lock, DeadLock
Keyword - Lock, DeadLock, Race Condition, Thread synchronization
"""

"""
1. 세마포어(Semaphore): 프로세스간 공유 된 자원에 접근 시 문제 발생 가능성
    -> 한 개의 프로세스만 접근 처리 고안(경쟁 상태 예방)

2. 뮤텍스(Mutex): 공유된 자원의 데이터를 여러 스레드가 접근하는 것을 막는 것. 
    -> 경쟁 상태 예방

3. Lock: 상호 배제를 위한 잠금(Lock)처리 
    -> 데이터 경쟁

4. 데드락(DeadLock): 프로세스가 자원을 획득하지 못해 다음 처리를 못하는 무한 대기 상태(교착상태)

5. Thread Synchronization(쓰레드 동기화)를 통해서 안정적으로 동작하게 처리한다.(동기화 메솓, 동기화 블럭)

6. Semaphore와 Mutex 차이점은?
    -> 세마포어와 뮤텍스 개체는 모두 병렬 프로그래밍 환경에서 상호배제를 위해 사용
    -> 뮤텍스 개체는 단일 스레드가 리소스 또는 중요 섹션을 소비 허용
    -> 세마포어는 리소스에 대한 제한된 수의 동시 액세스를 허용
"""

from concurrent.futures import ThreadPoolExecutor
import logging
import threading
import time


class FakeDataStore:
    # 공유 변수(value)
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    # 변수 업데이트 함수
    def update(self, n):
        logging.info("Thread %s: starting update", n)

        # Mutex or Lock 등 동기화(Thread synchronization 필요)

        # Lock 획득 - 방법1

        # self._lock.acquire()
        # logging.info("Thread %s has lock",n)

        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy

        # logging.info("Thread %s about to release lock", n)

        # # Lock 반환
        # self._lock.release()

        # Lock 획득 - 방법2
        with self._lock:
            logging.info("Thread %s has lock", n)
            copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy

            logging.info("Thread %s about to release lock", n)

        logging.info("Thread %s: finishing update", n)


if __name__ == "__main__":

    # logging format
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    # instance of class
    store = FakeDataStore()

    logging.info(f'Testing update. Starting value is {store.value}')

    # with context 
    with ThreadPoolExecutor(max_workers=2) as executor:
        for n in 'First', 'Second', 'Third':
            executor.submit(store.update, n)
    
    logging.info("Testing update. Ending value is %d", store.value)

Producer and Consumer Using Queue


"""
Section 1
Multithreading - Thread(5) Producer vs Consumer using Queue
Keyword - 생산자 소비자 패턴(Producer/Consumer Pattern)

"""

"""
Procuer - Consumer Pattern
1) 멀티 스레드 디자인 패턴의 정석
2) 서버측 프로그래밍의 핵심
3) 주로 허리 역할 중요

Python Event 객체
1) Flag 초기값(0)
2) 여러 메서드 
    - Set() -> 1
    - Clear() -> 0
    - Wait(1-> 리턴, 0-> 대기)
    - isSet() -> 현 플래그 상태

"""


import concurrent.futures
import logging
import queue
import random 
import threading
import time

# 생산자
def producer(queue, event):
    '''네트워크 대기 상태라 가정(서버)'''

    while not event.is_set():
        message = random.randint(1, 11)
        logging.info('Producer producing message: %s', message)
        queue.put(message)
    logging.info('Producer produced event then Exited')

# 소비자
def consumer(queue, event):
    '''응답 받고 소비하는 것으로 가정 or DB 저장'''
    while not event.is_set() or not queue.empty():
        message = queue.get()
        logging.info('Consumer storing message: %s (size=%d)', message, queue.qsize())
    logging.info('COnsumer received event Exiting')

if __name__ == '__main__':
    # logging format
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    # 사이즈가 중요
    # A(생산자) -> Queue -> B(소비자) 
    # Queue의 사이즈가 만약 무한대라면 B가 제때에 소비하지 못한다면 
    pipeline = queue.Queue(maxsize=10)

    # 이벤트 플래그 초기 값 0
    event = threading.Event()

    # With Context 시작
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        # 실행 시간 조정
        time.sleep(1)

        logging.info('Main : about to set event')

        # 프로그램 종료
        event.set()
profile
어제보다 오늘 그리고 오늘 보다 내일...

0개의 댓글