Process VS Thread
(1) Process
A unit of resource allocation in the operating system (a running program).
Has independent CPU execution time and address space.
Code, Data, Stack, and Heap are independent.
Must have at least one main thread.
Inter-process communication uses pipes, files, and sockets (high cost), and switching between processes requires a full context switch.
(2) Thread
A unit of execution flow within a process.
Uses process resources.
Only the stack is allocated separately, while Code, Data, and Heap are shared.
Shares memory (variables).
The result of one thread can affect others.
Synchronization issues require great caution (difficult to debug).
(3) Multi-Threading
A single application consists of multiple threads for task processing.
Reduces system resource consumption (efficiency) and increases throughput (cost reduction).
Reduces communication overhead but is difficult to debug.
Has limited effectiveness in single-process environments.
May cause resource-sharing issues (deadlocks) and affect the entire process.
(4) Multi-Processing
A single application consists of multiple processes for task processing.
If one process encounters an issue, it does not affect others (can be terminated individually).
Context switching between processes flushes caches and carries very high overhead.
Uses complex communication methods.
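The memory-sharing difference above can be seen directly: a thread mutates the parent's variable, while a child process only mutates its own copy. A minimal sketch (`bump` and `counter` are illustrative names):

```python
import threading
from multiprocessing import Process

counter = {'value': 0}

def bump():
    counter['value'] += 1

if __name__ == "__main__":
    # Thread: shares the parent's memory, so the change is visible here
    t = threading.Thread(target=bump)
    t.start(); t.join()
    print(counter['value'])  # 1

    # Process: gets its own copy of the address space,
    # so its change never reaches the parent
    p = Process(target=bump)
    p.start(); p.join()
    print(counter['value'])  # still 1
```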
Global Interpreter Lock
GIL (Global Interpreter Lock)
In CPython, a mutex allows only one thread at a time to execute Python bytecode, restricting concurrent access to Python objects.
It exists because CPython's memory management (reference counting) is not thread-safe; the GIL is what guarantees thread safety.
A single thread is sufficiently fast for most tasks.
Processes can be used instead, and libraries such as NumPy/SciPy run efficiently because their C extensions release the GIL.
For parallel processing, various options exist, such as multiprocessing and asyncio.
To achieve true thread parallelism, alternative implementations such as Jython, IronPython, and Stackless Python are available.
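Because of the GIL, spreading a pure-Python CPU-bound loop across threads does not make it faster: the threads take turns holding the lock. A minimal sketch (timings vary by machine, so only the computed results are compared; `count_squares` is an illustrative name):

```python
import threading
import time

def count_squares(n, out, idx):
    # Pure-Python CPU-bound work; the thread holds the GIL while computing
    out[idx] = sum(i * i for i in range(n))

N = 2_000_000
serial_results = [0, 0]
threaded_results = [0, 0]

# Serial: one thread does both chunks
start = time.time()
count_squares(N, serial_results, 0)
count_squares(N, serial_results, 1)
serial = time.time() - start

# Threaded: two threads, but the GIL lets only one execute bytecode at a time
start = time.time()
threads = [threading.Thread(target=count_squares, args=(N, threaded_results, i))
           for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded = time.time() - start

print(f'serial: {serial:.2f}s, threaded: {threaded:.2f}s')
```

On CPython the two durations come out roughly equal; with true parallelism the threaded run would approach half the serial time.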
Thread
Thread, Daemon Thread, Join
"""
Section 1
Multithreading - Thread(2) - Daemon, Join
Keyword - Daemon Thread, Join
DaemonThread(데몬스레드)
(1). 백그라운드에 실행
(2). 메인스레드 종료시 즉시 종료
(3). 주로 백그라운드 무한 대기 이벤트 발생 실행하는 부분 담당 -> JVM(가비지 컬렉션), 자동 저장
(4). 일반 스레드는 작업 종료시 까지 실행
"""
import logging
import threading
import time

# Thread function
def thread_func(name, d):
    logging.info("Sub-Thread %s: starting", name)
    for i in d:
        print(i)
    logging.info("Sub-Thread %s: finishing", name)

# Main
if __name__ == "__main__":
    # Logging format
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
    logging.info("Main-Thread: before creating thread")

    # Function
    # daemon: default is False
    x = threading.Thread(target=thread_func, args=('First', range(20000)), daemon=True)
    y = threading.Thread(target=thread_func, args=('Two', range(10000)), daemon=True)

    logging.info("Main-Thread: before running thread")

    # Sub-threads
    x.start()
    y.start()

    # Daemon thread check (isDaemon() is deprecated; use the daemon attribute)
    # print(x.daemon)

    # Join
    # x.join()

    logging.info("Main-Thread: wait for the thread to finish")
    logging.info("Main-Thread: all done")
ThreadPoolExecutor
"""
Section 1
Multithreading - Thread(3) - ThreadPoolExecutor
Keyword - Many Threads, concurrent.futures, (xxx)PoolExecutor
Group Thread
(1). Python 3.2 이상 표준 라이브러리 사용
(2). concurrent.futures
(3). with 사용으로 생성, 소멸 라이프사이클 관리 용이
(4). 디버깅하기가 난해함(단점)
(5). 대기중인 작업 -> Queue -> 완료 상태 조사 -> 결과 또는 예외 -> 단일화(캡슐화)
"""
import logging
from concurrent.futures import ThreadPoolExecutor
import time

def task(name):
    logging.info("Sub-Thread: %s: starting", name)
    result = 0
    for i in range(10001):
        result += i
    logging.info("Sub-Thread: %s: finishing result: %d", name, result)
    return result

def main():
    # Logging format
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
    logging.info("Main-Thread: before creating and running thread")

    # Execution 1
    # max_workers: setting it explicitly pays off once the number of tasks grows
    # executor = ThreadPoolExecutor(max_workers=3)
    # task1 = executor.submit(task, 'First')
    # task2 = executor.submit(task, 'Two')
    # print(task1.result())
    # print(task2.result())

    # Execution 2
    with ThreadPoolExecutor(max_workers=3) as executor:
        tasks = executor.map(task, ['First', 'Second', 'Third'])
        print(list(tasks))

# Main
if __name__ == "__main__":
    main()
Lock, Deadlock
"""
Section 1
Multithreading - Thread(4) - Lock, DeadLock
Keyword - Lock, DeadLock, Race Conditoin, Thread synchronization
Terms
(1). Semaphore: 프로세스간 공유 된 자원에 접근 시 문제 발생 가능성
-> 한 개의 프로세스만 접근 처리 고안(경쟁 상태 예방)
(2). Mutex: 공유된 자원의 데이터를 여러 스레드가 접근하는 것을 막는것
-> 경쟁 상태 예방
(3). Lock: 상호 배제를 위한 잠금(Lock) 처리
-> 데이터 경쟁
(4). Deadlock: 프로세스가 자원을 획득하지 못해 다음 처리를 못하는 무한 대기 상황
-> 교착 상태
(5). Thread synchronization: 스레드 동기화를 통해서 안정적으로 동작하게 처리한다.
-> 동기화 메소드, 동기화 블럭
(6). Semaphore VS Mutex
-> 세모포어와 뮤텍스 개체는 모두 병렬 프로그래밍 환경에서 상호배제를 위해 사용
-> 뮤텍스 개체는 단일 스레드가 리소스 또는 중요 섹션을 소비 허용
-> 세마포어는 리소스에 대한 제한된 수의 동시 액세스를 허용
"""
import logging
from concurrent.futures import ThreadPoolExecutor
import time
import threading

class FakeDataStore:
    # Shared variable
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    # Update
    def update(self, n):
        logging.info('Thread %s: starting update', n)

        # Mutex & Lock for synchronization
        # Acquire Lock #1 (explicit acquire/release)
        # self._lock.acquire()
        # logging.info('Thread %s: has lock', n)
        # local_copy = self.value
        # local_copy += 1
        # time.sleep(0.1)
        # self.value = local_copy
        # logging.info('Thread %s about to release lock', n)
        # Release Lock #1
        # self._lock.release()
        # logging.info('Thread %s: finishing update', n)

        # Acquire Lock #2 (context manager releases automatically)
        with self._lock:
            logging.info('Thread %s: has lock', n)
            local_copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy
            logging.info('Thread %s about to release lock', n)
        logging.info('Thread %s: finishing update', n)

# Main
if __name__ == "__main__":
    # Logging format
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    # Class instance
    store = FakeDataStore()
    logging.info("Testing update. Starting value is %d", store.value)

    # With context
    with ThreadPoolExecutor(max_workers=2) as executor:
        for n in ['First', 'Second', 'Third']:
            executor.submit(store.update, n)

    logging.info("Testing update. Ending value is %d", store.value)
Producer / Consumer
"""
Section 1
Multithreading - Thread(5) - Prod vs Cons Using Queue
Keyword - Producer/Consumer Pattern
Producer-Consumer Pattern
(1). 멀티스레드 디자인 패턴의 정석
(2). 서버측 프로그래밍의 핵심
(3). 주로 허리역할 중요
Python Event 객체
(1). Flag 초기값 0
(2). Set() -> 1, Clear() -> 0, Wait(1 -> 리턴, - 0> 대기), is_set() -> 현 플래그 상태
"""
import concurrent.futures
import logging
import queue
import random
import threading
import time

def producer(queue, event):
    """ Simulates waiting on the network (server side) """
    while not event.is_set():
        message = random.randint(1, 11)
        logging.info('Producer got message: %s', message)
        queue.put(message)
    logging.info('Producer received event. Exiting')

def consumer(queue, event):
    """ Simulates consuming the response, e.g., saving to a DB """
    # Note: get() can still block if the queue is empty when the event is set
    while not event.is_set() or not queue.empty():
        message = queue.get()
        logging.info('Consumer storing message: %s (size=%d)', message, queue.qsize())
    logging.info('Consumer received event. Exiting')

# Main
if __name__ == "__main__":
    # Logging format
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    # Size is important
    pipeline = queue.Queue(maxsize=10)

    # Event flag
    event = threading.Event()

    # With context
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        # Execution time
        time.sleep(0.5)
        logging.info("Main: about to set event")

        # Terminate
        event.set()
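The Event semantics listed in the docstring can be verified directly (a minimal sketch):

```python
import threading

event = threading.Event()
print(event.is_set())         # False - internal flag starts at 0
event.set()                   # flag -> 1; wait() now returns immediately
print(event.is_set())         # True
print(event.wait(timeout=1))  # True - returns at once because the flag is set
event.clear()                 # flag -> 0
print(event.is_set())         # False
```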
Parallelism
Parallelism
Executes tasks at the exact same time.
Divides tasks into multiple parts (e.g., summing partial results and aggregating).
Cannot be achieved on a single-core CPU, even with multiprocessing.
Commonly used in deep learning, Bitcoin mining, etc.
Process vs. Thread (Key Differences)
| Feature | Process | Thread |
|---|---|---|
| Memory | Independent memory | Shared memory |
| Memory Usage | Requires more memory | Requires less memory |
| Zombie State | Possible to create zombie processes | Hard to create zombie threads |
| Overhead | High overhead | Low overhead |
| Creation/Termination Speed | Slower | Faster |
| Code Complexity & Debugging | Easier to write but harder to debug | Harder to write and debug |
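As a minimal sketch of the process column above, `multiprocessing.Pool` spreads independent tasks across worker processes (the function name `square` is illustrative):

```python
from multiprocessing import Pool

def square(n):
    # Each call may run in a different worker process
    return n * n

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        # map preserves input order even though the work runs in parallel
        print(pool.map(square, range(5)))  # [0, 1, 4, 9, 16]
```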
Multiprocessing
Join, is_alive
"""
Section 2
Parallelism with Multiprocessing - Multiprocessing(1) - Join, is_alive
Keyword - multiprocessing, processing state
"""
from multiprocessing import Process
import time
import logging

def proc_func(name):
    print('Sub-Process {}: starting'.format(name))
    time.sleep(3)
    print('Sub-Process {}: finishing'.format(name))

def main():
    # Logging format
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")

    # Function variable
    p = Process(target=proc_func, args=('First',))
    logging.info('Main-Process: before creating Process')

    # Process
    p.start()
    logging.info('Main-Process: during Process')

    # logging.info('Main-Process: terminated Process')
    # p.terminate()

    logging.info('Main-Process: joined Process')
    p.join()

    # Process state
    print(f'Process p is alive: {p.is_alive()}')

# Main
if __name__ == "__main__":
    main()
Naming, Parallel
"""
Section 2
Parallelism with Multiprocessing - multiprocessing(2) - Naming
Keyword - Naming, parallel processing
"""
from multiprocessing import Process, current_process
import os
import random
import time

def square(n):
    time.sleep(random.randint(1, 3))
    process_id = os.getpid()
    process_name = current_process().name
    result = n * n
    print(f'Process ID: {process_id}, Process Name: {process_name}')
    print(f'Result of {n} square: {result}')

def main():
    # Parent
    parent_process_id = os.getpid()
    print(f'Parent process ID {parent_process_id}')

    processes = list()
    for i in range(1, 10):
        t = Process(name=str(i), target=square, args=(i,))
        processes.append(t)
        t.start()

    for process in processes:
        process.join()

    print('Main-Processing Done!')

# Main
if __name__ == "__main__":
    main()
ProcessPoolExecutor
"""
Section 2
Parallelism with Multiprocessing - multiprocessing(3) - ProcessPoolExecutor
Keyword - ProcessPoolExecutor, as_completed, futures, timeout, dict
"""
from concurrent.futures import ProcessPoolExecutor, as_completed
import urllib.request

# URLs (the last two will fail, exercising the exception path)
URLS = [
    'http://www.daum.net',
    'http://www.cnn.com',
    'http://naver.com',
    'http://ruliweb',
    'http://some-made-up-domain.com'
]

def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

def main():
    # Process context
    with ProcessPoolExecutor(max_workers=5) as executor:
        # Map each future to its URL
        future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}

        # Collect results as they complete
        for future in as_completed(future_to_url):
            # Key: future, Value: URL
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                print('%r page is %d bytes' % (url, len(data)))

# Main
if __name__ == "__main__":
    main()
Sharing State
"""
Section 2
Parallelism with Multiprocessing - multiprocessing(4) - Sharing State
Keyword - memory sharing, array, value
"""
from multiprocessing import Process, current_process, Value, Array
import os

def generate_update_number(v):
    for _ in range(50):
        # Note: += on a Value is not atomic; wrap it in v.get_lock() for an exact count
        v.value += 1
    print(current_process().name, "data", v.value)

def main():
    # Parent
    parent_process_id = os.getpid()
    print(f'Parent process ID {parent_process_id}')

    processes = list()

    # Other sharing options:
    # from multiprocessing import shared_memory
    # from multiprocessing import Manager
    # share_numbers = Array('i', range(50))
    share_value = Value('i', 0)

    for _ in range(1, 10):
        p = Process(target=generate_update_number, args=(share_value,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print('Final Data in parent process', share_value.value)

# Main
if __name__ == "__main__":
    main()
Queue, Pipe
"""
Section 2
Parallelism with Multiprocessing - multiprocessing(5) - Queue, Pipe
Keyword - Queue, Pipe, Communications between processes
"""
# Pipe
from multiprocessing import Process, current_process, Pipe
import time
import os

def worker(id, baseNum, conn):
    process_id = os.getpid()
    process_name = current_process().name

    sub_total = 0
    for i in range(baseNum):
        sub_total += 1

    # Send the result back through the pipe
    conn.send(sub_total)
    conn.close()

    print(f'Process ID: {process_id}, Process Name: {process_name} ID: {id}')
    print(f'Result: {sub_total}')

def main():
    # Parent
    parent_process_id = os.getpid()
    print(f'Parent process ID {parent_process_id}')

    start_time = time.time()

    parent_conn, child_conn = Pipe()

    t = Process(name=str(1), target=worker, args=(1, 100000000, child_conn))
    t.start()
    t.join()

    print('--- %s seconds ---' % (time.time() - start_time))
    print()
    print('Main-processing total count={}'.format(parent_conn.recv()))
    print('Main-processing done!')

# Main
if __name__ == "__main__":
    main()
Concurrency
Concurrency
A software-level solution to mitigate the drawbacks and challenges of parallelism while maximizing CPU utilization.
Uses a multi-threaded pattern on a single-core processor to handle tasks efficiently.
Switches between tasks after processing a certain portion, rather than executing them simultaneously.
Works by passing control between tasks, making it appear parallel but not truly parallel.
Concurrency vs. Parallelism
| Feature | Concurrency | Parallelism |
|---|---|---|
| Nature | Logical simultaneous execution pattern | Physical simultaneous execution |
| Execution Environment | Works on both single-core and multi-core CPUs | Requires multi-core CPUs |
| Task Handling | Shares execution of a single task among multiple threads | Handles separate tasks independently |
| Debugging | Extremely difficult (requires handling Mutex, Deadlocks) | Difficult (uses OpenMP, MPI, CUDA) |
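The "passing control between tasks" idea can be sketched with asyncio, where each `await` is an explicit switch point: two coroutines on one thread interleave deterministically (a minimal sketch):

```python
import asyncio

order = []

async def task(name):
    for step in range(2):
        order.append(f'{name}-{step}')
        # await hands control back to the event loop so the other task can run
        await asyncio.sleep(0)

async def main():
    # Both coroutines run on one thread, interleaving at each await
    await asyncio.gather(task('A'), task('B'))

asyncio.run(main())
print(order)  # ['A-0', 'B-0', 'A-1', 'B-1']
```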
Blocking VS Non-Blocking IO
Blocking I/O vs. Non-blocking I/O
Blocking I/O
When a system call is made, the process waits until the kernel completes the I/O operation.
The kernel owns the execution control during the I/O operation.
The process blocks (waits) until a response is received, making it unable to perform other tasks.
Non-blocking I/O
When a system call is made, it immediately returns without waiting for the I/O operation to complete.
The user process retains execution control and can continue performing other tasks.
The process periodically checks the system call to determine if the I/O operation has completed.
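The non-blocking behavior above can be demonstrated with a pipe: once the descriptor is non-blocking, a read with no data available returns immediately instead of waiting (a minimal POSIX-style sketch; works on Linux/macOS):

```python
import os

r, w = os.pipe()           # r: read end, w: write end
os.set_blocking(r, False)  # make the read end non-blocking

try:
    os.read(r, 1024)       # no data yet -> returns control immediately
except BlockingIOError:
    print('read would block; free to do other work')

os.write(w, b'hello')
print(os.read(r, 1024))    # b'hello' - data was ready, so the read succeeds
```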
Async vs. Sync
| Feature | Asynchronous (Async) | Synchronous (Sync) |
|---|---|---|
| Notification Flow | Kernel (callee) → User process (caller) | User process (caller) → Kernel (callee) |
| Execution | Non-blocking, event-driven | Blocking, sequential execution |
| Control Handling | Kernel notifies when I/O is complete | Caller waits for completion |
| Efficiency | More efficient for I/O-heavy tasks | Less efficient, but simpler |
Multiprocessing VS Threading VS Async IO
CPU Bound vs. I/O Bound
CPU Bound
The process speed is limited by CPU performance.
Used for heavy computations such as matrix multiplication, high-speed calculations, file compression, and set operations.
CPU-intensive tasks dominate execution time.
I/O Bound
Performance is limited by I/O operations, such as file writing, disk operations, network communication, and serial port transmission.
CPU performance does not significantly reduce execution time in I/O-bound tasks.
Memory Bound & Cache Bound
Performance bottlenecks can also occur due to memory access speeds (Memory Bound) or cache efficiency (Cache Bound).
Choosing the right concurrency model depends on the workload type.
Final Comparison: Choosing the Right Model
| Model | Description | Best for | Analogy |
|---|---|---|---|
| Multiprocessing | Uses multiple processes | CPU-bound tasks | 10 kitchens, 10 chefs, 10 meals |
| Threading | Multiple threads in a single/multi-process, OS handles task switching | Fast I/O-bound tasks | 1 kitchen, 10 chefs, 10 meals |
| Async I/O | Single process, single thread, tasks cooperatively decide switching | Slow I/O-bound tasks | 1 kitchen, 1 chef, 10 meals |
I/O Bound
Synchronous
"""
Section 3
Concurrency, CPU Bound vs I/O Bound - I/O Bound(1) - Synchronous
Keyword - I/O Bound, requests
"""
import requests
import time

# Download
def request_site(url, session):
    # print(session)
    # print(session.headers)
    with session.get(url) as response:
        print(f'[Read Contents : {len(response.content)}, Status Code: {response.status_code}] from {url}')

# Request
def request_all_sites(urls):
    with requests.Session() as session:
        for url in urls:
            request_site(url, session)

def main():
    urls = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
        "https://realpython.com"
    ] * 3

    start_time = time.time()
    request_all_sites(urls)
    duration = time.time() - start_time

    print()
    print(f'Downloaded {len(urls)} sites in {duration} seconds')

if __name__ == "__main__":
    main()
Threading
"""
Section 3
Concurrency, CPU Bound vs I/O Bound - I/O Bound(2) - Threading VS Asyncio Vs Multiprocessing
Keyword - I/O Bound, requests
"""
import concurrent.futures
import threading
import requests
import time

# One object per thread (independent namespace)
thread_local = threading.local()

def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session

# Download
def request_site(url):
    session = get_session()
    # print(session)
    # print(session.headers)
    with session.get(url) as response:
        print(f'[Read Contents: {len(response.content)}, Status Code: {response.status_code} from {url}]')

# Request
def request_all_sites(urls):
    # Multithread
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        executor.map(request_site, urls)

def main():
    urls = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
        "https://realpython.com"
    ] * 3

    start_time = time.time()
    request_all_sites(urls)
    duration = time.time() - start_time

    print()
    print(f'Downloaded {len(urls)} sites in {duration} seconds')

if __name__ == "__main__":
    main()
Multiprocessing
"""
Section 3
Concurrency, CPU Bound vs I/O Bound - I/O Bound(2) - Threading VS Asyncio Vs Multiprocessing
Keyword - I/O Bound, requests
"""
import multiprocessing
import requests
import time

# One session object per process (process-local global)
session = None

def set_global_session():
    global session
    if not session:
        session = requests.Session()

# Download
def request_site(url):
    # print(session)
    # print(session.headers)
    with session.get(url) as response:
        print(f'[Read Contents: {len(response.content)}, Status Code: {response.status_code} from {url}]')

# Request
def request_all_sites(urls):
    # Multiprocessing: each worker gets its own session via the initializer
    with multiprocessing.Pool(initializer=set_global_session, processes=4) as pool:
        pool.map(request_site, urls)

def main():
    urls = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
        "https://realpython.com"
    ] * 3

    start_time = time.time()
    request_all_sites(urls)
    duration = time.time() - start_time

    print()
    print(f'Downloaded {len(urls)} sites in {duration} seconds')

if __name__ == "__main__":
    main()
Async I/O
"""
Section 3
Concurrency, CPU Bound vs I/O Bound - I/O Bound(2) - Threading VS Asyncio Vs Multiprocessing
Keyword - Asyncio
동시 프로그래밍 패러다임 변화
싱글 코어 -> 처리향상 미미, 저하 -> 비동기 프로그래밍 -> CPU연산, DB연동, API호출 대기 시간 늘어남
파이썬 3.4 -> 비동기(asyncio) 표준라이브러리 등장
"""
import time
import asyncio

async def exe_calculate_async(name, n):
    for i in range(1, n + 1):
        print(f'{name} -> {i} of {n} is calculating...')
        await asyncio.sleep(1)
    print(f'{name} - {n} working done!')

async def process_async():
    start = time.time()
    # asyncio.wait() no longer accepts bare coroutines (removed in Python 3.11),
    # so gather the coroutines instead
    await asyncio.gather(
        exe_calculate_async('One', 3),
        exe_calculate_async('Two', 2),
        exe_calculate_async('Three', 1),
    )
    end = time.time()
    print(f'>>> total seconds : {end - start}')

if __name__ == "__main__":
    # Sync
    # process_sync()

    # Async, Python 3.7+
    asyncio.run(process_async())

    # Below 3.7
    # asyncio.get_event_loop().run_until_complete(process_async())
I/O Bound Asyncio
"""
Section 3
Concurrency, CPU Bound vs I/O Bound - I/O Bound(2) - Threading VS Asyncio Vs Multiprocessing
Keyword - I/O Bound, asyncio
"""
import asyncio
import aiohttp
import time

# I/O Bound Asyncio

# Download
async def request_site(session, url):
    # print(session)
    # print(session.headers)
    async with session.get(url) as response:
        print('[Read Contents {0}, from {1}]'.format(response.content_length, url))

# Request
async def request_all_sites(urls):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            task = asyncio.ensure_future(request_site(session, url))
            tasks.append(task)
        await asyncio.gather(*tasks, return_exceptions=True)

def main():
    urls = [
        "https://www.jython.org",
        "http://olympus.realpython.org/dice",
        "https://realpython.com"
    ] * 3

    start_time = time.time()
    # Python 3.7+
    asyncio.run(request_all_sites(urls))
    # Below 3.7
    # asyncio.get_event_loop().run_until_complete(request_all_sites(urls))
    duration = time.time() - start_time

    print()
    print(f'Downloaded {len(urls)} sites in {duration} seconds')

if __name__ == "__main__":
    main()
CPU Bound
Synchronous
"""
Section 3
Concurrency, CPU Bound vs I/O Bound - CPU Bound(1) - Synchronous
Keyword - CPU Bound
"""
import time

def cpu_bound(number):
    return sum(i * i for i in range(number))

def find_sums(numbers):
    result = []
    for number in numbers:
        result.append(cpu_bound(number))
    return result

def main():
    numbers = [3_000_000 + x for x in range(10)]

    # Time
    start_time = time.time()
    total = find_sums(numbers)
    print(f'Total list : {total}')
    print(f'Sum : {sum(total)}')
    duration = time.time() - start_time

    print()
    print(f'Duration : {duration} seconds')

if __name__ == "__main__":
    main()
Multiprocessing
"""
Section 3
Concurrency, CPU Bound vs I/O Bound - CPU Bound(2) - Multiprocessing
Keyword - CPU Bound
"""
from multiprocessing import current_process, Manager, Process
import time
import os

def cpu_bound(number, total_list):
    process_id = os.getpid()
    process_name = current_process().name
    print(f'Process ID : {process_id}, Process Name : {process_name}')

    total_list.append(sum(i * i for i in range(number)))

def main():
    numbers = [3_000_000 + x for x in range(10)]
    processes = list()

    # Manager list: shared between processes
    manager = Manager()
    total_list = manager.list()

    # Time
    start_time = time.time()

    for i in numbers:
        t = Process(name=str(i), target=cpu_bound, args=(i, total_list))
        processes.append(t)
        t.start()

    for process in processes:
        process.join()

    print(f'Total list : {total_list}')
    print(f'Sum : {sum(total_list)}')
    duration = time.time() - start_time

    print()
    print(f'Duration : {duration} seconds')

if __name__ == "__main__":
    main()