PHP는 기본적으로 단일 스레드 언어이지만, 병렬 처리가 필요한 상황에서 다양한 해결책을 활용할 수 있습니다. 멀티스레딩과 유사한 효과를 얻는 방법들을 알아보겠습니다.
PHP의 pthreads 확장을 사용한 실제 멀티스레딩입니다.
class WorkerThread extends Thread {
private $data;
public function __construct($data) {
$this->data = $data;
}
public function run() {
// 무거운 작업 처리
$result = $this->processData($this->data);
echo "Thread completed: " . $result . "\n";
}
private function processData($data) {
// 시간이 오래 걸리는 작업
sleep(2);
return "Processed: " . $data;
}
}
// 사용 예시
$threads = [];
for ($i = 0; $i < 5; $i++) {
$threads[$i] = new WorkerThread("Data " . $i);
$threads[$i]->start();
}
foreach ($threads as $thread) {
$thread->join();
}
Swoole을 사용한 비동기 병렬 처리입니다.
use Swoole\Coroutine;
use Swoole\Runtime;
Runtime::enableCoroutine();
function processTask($id) {
echo "Task $id started\n";
Coroutine::sleep(2); // 비동기 대기
echo "Task $id completed\n";
return "Result $id";
}
// 병렬 실행
Coroutine\run(function () {
$results = [];
for ($i = 1; $i <= 5; $i++) {
go(function () use ($i, &$results) {
$results[$i] = processTask($i);
});
}
// 모든 코루틴 완료 대기
Coroutine::sleep(3);
print_r($results);
});
이벤트 루프 기반의 비동기 처리입니다.
use React\EventLoop\Factory;
use React\Promise\Promise;
$loop = Factory::create();
function asyncTask($id, $loop) {
return new Promise(function ($resolve) use ($id, $loop) {
$loop->addTimer(2, function () use ($resolve, $id) {
$resolve("Task $id completed");
});
});
}
// 병렬 실행
$promises = [];
for ($i = 1; $i <= 5; $i++) {
$promises[] = asyncTask($i, $loop);
}
React\Promise\all($promises)->then(function ($results) {
foreach ($results as $result) {
echo $result . "\n";
}
});
$loop->run();
프로세스를 포크하여 병렬 처리를 구현합니다.
function forkProcess($taskId) {
$pid = pcntl_fork();
if ($pid == -1) {
die('Could not fork');
} elseif ($pid) {
// 부모 프로세스
return $pid;
} else {
// 자식 프로세스
echo "Child process $taskId started\n";
sleep(2); // 작업 시뮬레이션
echo "Child process $taskId completed\n";
exit(0);
}
}
// 여러 프로세스 생성
$pids = [];
for ($i = 1; $i <= 5; $i++) {
$pids[] = forkProcess($i);
}
// 모든 자식 프로세스 완료 대기
foreach ($pids as $pid) {
pcntl_waitpid($pid, $status);
}
echo "All processes completed\n";
HTTP 요청을 병렬로 처리합니다.
use GuzzleHttp\Client;
use GuzzleHttp\Promise;
$client = new Client();
function makeParallelRequests($urls) {
$promises = [];
foreach ($urls as $index => $url) {
$promises[$index] = $client->getAsync($url);
}
// 모든 요청을 병렬로 실행
$responses = Promise\settle($promises)->wait();
foreach ($responses as $index => $response) {
if ($response['state'] === 'fulfilled') {
echo "Request $index completed successfully\n";
} else {
echo "Request $index failed\n";
}
}
}
$urls = [
'https://api.example1.com/data',
'https://api.example2.com/data',
'https://api.example3.com/data'
];
makeParallelRequests($urls);
Redis나 RabbitMQ를 사용한 작업 큐입니다.
class RedisQueue {
private $redis;
public function __construct() {
$this->redis = new Redis();
$this->redis->connect('127.0.0.1', 6379);
}
public function addJob($queueName, $jobData) {
$this->redis->lpush($queueName, json_encode($jobData));
}
public function processJobs($queueName, $workerCount = 3) {
for ($i = 0; $i < $workerCount; $i++) {
$pid = pcntl_fork();
if ($pid == 0) {
// 워커 프로세스
$this->worker($queueName);
exit(0);
}
}
// 워커들 완료 대기
for ($i = 0; $i < $workerCount; $i++) {
pcntl_wait($status);
}
}
private function worker($queueName) {
while (true) {
$job = $this->redis->brpop($queueName, 5);
if ($job) {
$jobData = json_decode($job[1], true);
$this->processJob($jobData);
}
}
}
private function processJob($jobData) {
echo "Processing job: " . $jobData['id'] . "\n";
sleep(2); // 작업 시뮬레이션
echo "Job completed: " . $jobData['id'] . "\n";
}
}
PHP 7.2+에서 사용 가능한 parallel 확장입니다.
use parallel\Runtime;
use parallel\Future;
function createParallelTasks($tasks) {
$runtimes = [];
$futures = [];
foreach ($tasks as $index => $task) {
$runtimes[$index] = new Runtime();
$futures[$index] = $runtimes[$index]->run(function($data) {
// 무거운 작업 처리
sleep(2);
return "Processed: " . $data;
}, [$task]);
}
// 결과 수집
$results = [];
foreach ($futures as $index => $future) {
$results[$index] = $future->value();
$runtimes[$index]->close();
}
return $results;
}
$tasks = ['Task1', 'Task2', 'Task3', 'Task4'];
$results = createParallelTasks($tasks);
print_r($results);
작업자 스레드 풀을 관리하는 클래스입니다.
class ThreadPool {
private $workers = [];
private $tasks = [];
private $maxWorkers;
public function __construct($maxWorkers = 4) {
$this->maxWorkers = $maxWorkers;
}
public function addTask(callable $task, $data) {
$this->tasks[] = ['task' => $task, 'data' => $data];
}
public function execute() {
$chunks = array_chunk($this->tasks, ceil(count($this->tasks) / $this->maxWorkers));
foreach ($chunks as $chunk) {
$pid = pcntl_fork();
if ($pid == 0) {
// 워커 프로세스
foreach ($chunk as $taskInfo) {
call_user_func($taskInfo['task'], $taskInfo['data']);
}
exit(0);
} else {
$this->workers[] = $pid;
}
}
// 모든 워커 완료 대기
foreach ($this->workers as $pid) {
pcntl_waitpid($pid, $status);
}
}
}
// 사용 예시
$pool = new ThreadPool(3);
for ($i = 1; $i <= 10; $i++) {
$pool->addTask(function($data) {
echo "Processing: $data\n";
sleep(1);
echo "Completed: $data\n";
}, "Task $i");
}
$pool->execute();