Multithreading - Difference between Process and Thread
Keyword - Process, Thread
Multithreading - Python's GIL
Keyword - CPython, 메모리 관리, GIL, 사용 이유
CPython -> Python(bytecode) 실행 시 여러 쓰레드 사용할 경우
CPython 메모리 관리가 취약(즉, Thread-safe)
Single Thread로 충분히 빠르다
프로세스 사용 가능(Numpy/Scipy)등 GIL 외부 영역에서 효율적인 코딩
병렬 처리는 Multiprocessing, asyncio 선택지 다양함.
thread 동시성 완벽 처리를 위해 -> Jython, IronPython, Stackless Python 등이 존재
Multithreading - Thread(1) - Basic
Keyword - Threading Basic
import logging
import threading
import time
# 메인 영역
# 쓰레드 실행 함수
def thread_func(name):
logging.info("Sub-Thread %s: starting", name)
time.sleep(3)
logging.info("Sub-Thread %s: finished", name)
if __name__ == "__main__":
# Logging
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
logging.info("Main-Thread: before creating thread")
# 함수 인자 확인
x = threading.Thread(target=thread_func, args=('First',))
logging.info("Main-Thread: before running thread")
# 서브 스레드 시작
x.start()
# 주석 전후 결과 확인
x.join()
logging.info("Main-Thread: wait for the thread to finish")
logging.info("Main-Thread: All Clear")
Multithreading - Thread(2) - Daemon, Join
Keyword - DaemonThread, Join
import logging
import threading
import time
# 메인 영역
# 쓰레드 실행 함수
def thread_func(name, d):
logging.info("Sub-Thread %s: starting", name)
for i in d:
print(i)
logging.info("Sub-Thread %s: finished", name)
if __name__ == "__main__":
# Logging
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
logging.info("Main-Thread: before creating thread")
# 함수 인자 확인
x = threading.Thread(target=thread_func, args=('First', range(20000)), daemon=True)
y = threading.Thread(target=thread_func, args=('Two', range(10000)), daemon=True)
logging.info("Main-Thread: before running thread")
# 서브 스레드 시작
# x.start()
# y.start()
print(x.isDaemon())
print(y.isDaemon())
# 주석 전후 결과 확인
#x.join()
logging.info("Main-Thread: wait for the thread to finish")
logging.info("Main-Thread: All Clear")
threading.Thread 클래스의 키워드 파라미터인 daemon을 True로 지정하게 될 경우 메인 쓰레드가 종료될 경우 서브 쓰레드 역시 작업을 종료하게 된다.
Multithreading - Thread(3) - ThreadPoolExecutor
Keyword - Many Threads, concurrent.futures, (xxx).PoolExecutor
import logging
from concurrent.futures import ThreadPoolExecutor
import time
def task(name):
logging.info(f"Sub-Thread {name} starting")
result = 0
for i in range(10001):
result += i
logging.info(f"Sub-Thread {name}: finished result: {result}")
return result
def main():
# logging format
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
# 실행 방법1
# max_workers : 작업의 개수가 넘어가면 직접 설정이 유리
# executor = ThreadPoolExecutor(max_workers=3)
# task1 = executor.submit(task, "First") # submit 함수 2번째 위치 파라미터에는 넣고 싶은 값을 넣음(튜플 노!)
# task2 = executor.submit(task, "Two")
# task3 = executor.submit(task, "3")
# task4 = executor.submit(task, "4")
# task5 = executor.submit(task, "5")
# task6 = executor.submit(task, "6")
# print()
# print(task1.result())
# print()
# 실행 방법2
with ThreadPoolExecutor(max_workers=3) as executor:
tasks = executor.map(task, ("Firtst", "Second"))
# 결과 확인
print(tuple(tasks))
if __name__ == '__main__':
main()
"""
Section 1
Multithreading - Thread(4) Lock, DeadLock
Keyword - Lock, DeadLock, Race Condition, Thread synchronization
"""
"""
1. 세마포어(Semaphore): 프로세스간 공유 된 자원에 접근 시 문제 발생 가능성
-> 한 개의 프로세스만 접근 처리 고안(경쟁 상태 예방)
뮤텍스(Mutex): 공유된 자원의 데이터를 여러 스레드가 접근하는 것을 막는 것.
-> 경쟁 상태 예방
Lock: 상호 배제를 위한 잠금(Lock)처리
-> 데이터 경쟁
데드락(DeadLock): 프로세스가 자원을 획득하지 못해 다음 처리를 못하는 무한 대기 상태(교착상태)
Thread Synchronization(쓰레드 동기화)를 통해서 안정적으로 동작하게 처리한다.(동기화 메소드, 동기화 블럭)
Semaphore와 Mutex 차이점은?
-> 세마포어와 뮤텍스 개체는 모두 병렬 프로그래밍 환경에서 상호배제를 위해 사용
-> 뮤텍스 개체는 단일 스레드가 리소스 또는 중요 섹션을 소비 허용
-> 세마포어는 리소스에 대한 제한된 수의 동시 액세스를 허용
"""
from concurrent.futures import ThreadPoolExecutor
import logging
import threading
import time
class FakeDataStore:
# 공유 변수(value)
def __init__(self):
self.value = 0
# 변수 업데이트 함수
def update(self, n):
logging.info("Thread %s: starting update", n)
# Mutex or Lock 등 동기화(Thread synchronization 필요)
local_copy = self.value
local_copy += 1
time.sleep(0.1)
self.value = local_copy
logging.info("Thread %s: finishing update", n)
if __name__ == "__main__":
# logging format
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
# instance of class
store = FakeDataStore()
logging.info(f'Testing update. Starting value is {store.value}')
# with context
with ThreadPoolExecutor(max_workers=2) as executor:
for n in 'First', 'Second', 'Third':
executor.sub(store.update, n)
logging.info("Testing update. Ending value is %d", store.value)
"""
Section 1
Multithreading - Thread(4) Lock, DeadLock
Keyword - Lock, DeadLock, Race Condition, Thread synchronization
"""
"""
1. 세마포어(Semaphore): 프로세스간 공유 된 자원에 접근 시 문제 발생 가능성
-> 한 개의 프로세스만 접근 처리 고안(경쟁 상태 예방)
2. 뮤텍스(Mutex): 공유된 자원의 데이터를 여러 스레드가 접근하는 것을 막는 것.
-> 경쟁 상태 예방
3. Lock: 상호 배제를 위한 잠금(Lock)처리
-> 데이터 경쟁
4. 데드락(DeadLock): 프로세스가 자원을 획득하지 못해 다음 처리를 못하는 무한 대기 상태(교착상태)
5. Thread Synchronization(쓰레드 동기화)를 통해서 안정적으로 동작하게 처리한다.(동기화 메솓, 동기화 블럭)
6. Semaphore와 Mutex 차이점은?
-> 세마포어와 뮤텍스 개체는 모두 병렬 프로그래밍 환경에서 상호배제를 위해 사용
-> 뮤텍스 개체는 단일 스레드가 리소스 또는 중요 섹션을 소비 허용
-> 세마포어는 리소스에 대한 제한된 수의 동시 액세스를 허용
"""
from concurrent.futures import ThreadPoolExecutor
import logging
import threading
import time
class FakeDataStore:
# 공유 변수(value)
def __init__(self):
self.value = 0
self._lock = threading.Lock()
# 변수 업데이트 함수
def update(self, n):
logging.info("Thread %s: starting update", n)
# Mutex or Lock 등 동기화(Thread synchronization 필요)
# Lock 획득 - 방법1
# self._lock.acquire()
# logging.info("Thread %s has lock",n)
local_copy = self.value
local_copy += 1
time.sleep(0.1)
self.value = local_copy
# logging.info("Thread %s about to release lock", n)
# # Lock 반환
# self._lock.release()
# Lock 획득 - 방법2
with self._lock:
logging.info("Thread %s has lock", n)
copy = self.value
local_copy += 1
time.sleep(0.1)
self.value = local_copy
logging.info("Thread %s about to release lock", n)
logging.info("Thread %s: finishing update", n)
if __name__ == "__main__":
# logging format
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
# instance of class
store = FakeDataStore()
logging.info(f'Testing update. Starting value is {store.value}')
# with context
with ThreadPoolExecutor(max_workers=2) as executor:
for n in 'First', 'Second', 'Third':
executor.submit(store.update, n)
logging.info("Testing update. Ending value is %d", store.value)
"""
Section 1
Multithreading - Thread(5) Producer vs Consumer using Queue
Keyword - 생산자 소비자 패턴(Producer/Consumer Pattern)
"""
"""
Procuer - Consumer Pattern
1) 멀티 스레드 디자인 패턴의 정석
2) 서버측 프로그래밍의 핵심
3) 주로 허리 역할 중요
Python Event 객체
1) Flag 초기값(0)
2) 여러 메서드
- Set() -> 1
- Clear() -> 0
- Wait(1-> 리턴, 0-> 대기)
- isSet() -> 현 플래그 상태
"""
import concurrent.futures
import logging
import queue
import random
import threading
import time
# 생산자
def producer(queue, event):
'''네트워크 대기 상태라 가정(서버)'''
while not event.is_set():
message = random.randint(1, 11)
logging.info('Producer producing message: %s', message)
queue.put(message)
logging.info('Producer produced event then Exited')
# 소비자
def consumer(queue, event):
'''응답 받고 소비하는 것으로 가정 or DB 저장'''
while not event.is_set() or not queue.empty():
message = queue.get()
logging.info('Consumer storing message: %s (size=%d)', message, queue.qsize())
logging.info('COnsumer received event Exiting')
if __name__ == '__main__':
# logging format
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
# 사이즈가 중요
# A(생산자) -> Queue -> B(소비자)
# Queue의 사이즈가 만약 무한대라면 B가 제때에 소비하지 못한다면
pipeline = queue.Queue(maxsize=10)
# 이벤트 플래그 초기 값 0
event = threading.Event()
# With Context 시작
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer, pipeline, event)
executor.submit(consumer, pipeline, event)
# 실행 시간 조정
time.sleep(1)
logging.info('Main : about to set event')
# 프로그램 종료
event.set()