
특히, shutdown() 메소드를 호출할 때 현재 실행 중인 작업이 모두 완료된 후 종료되도록 하기 위해 awaitTermination()을 사용하는 것이 좋습니다.
이를 통해 실행 중인 작업이 완료되기 전에 쓰레드가 중단되는 상황을 방지할 수 있습니다.
다음은 쓰레드를 여러 개 실행하고, 동시에 처리할 작업에 맞춰 쓰레드를 생성하는 구조입니다.
이 구조는 한정된 쓰레드가 쌓여진 작업을 해소하는 방식입니다.
import java.util.concurrent.BlockingQueue
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
class Tasks(private val workers: Int = Runtime.getRuntime().availableProcessors()) {
@Volatile
private var isRunning = true
private val taskQueue: BlockingQueue<(Tasks) -> Unit> = LinkedBlockingQueue()
private val executor: ExecutorService = Executors.newFixedThreadPool(workers).also { service ->
repeat(workers) {
service.submit {
while (isRunning) {
try {
val task = taskQueue.take()
task(this)
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
break
}
}
}
}
}
fun addTask(task: (Tasks) -> Unit) {
if (isRunning) taskQueue.offer(task)
}
fun shutdown() {
isRunning = false
executor.shutdown()
// awaitTermination()을 사용하여 작업이 완료될 때까지 대기하도록 구현할 수 있음
}
}
이 구조에서는 작업을 생성하고, 동기화 큐에 쌓은 후, 무한 루프를 돌며 큐에서 꺼내 작업을 처리합니다.
동시성은 순서를 보장하지 않으므로, 앞에 작업이 끝날 때 큐에 새로운 작업을 추가하는 방식을 사용할 수 있습니다.
긴 작업을 얼마나 잘 쪼개는가가 전체 동시성의 핵심입니다.
fun main() {
val tasks = Tasks()
repeat(10) {
tasks.addTask(getTask(it, 0, Random.nextInt(3, 6)))
}
Thread.sleep(2000)
tasks.shutdown()
}
fun getTask(taskId: Int, serial: Int, limit: Int): (Tasks) -> Unit {
return { tasks ->
println("$taskId-$serial ${Thread.currentThread().name}")
Thread.sleep(Random.nextLong(100, 301))
if (serial < limit) tasks.addTask(getTask(taskId, serial + 1, limit))
}
}
위 예제는 각 작업이 순차적으로 실행되도록 하여, 각각의 작업이 끝날 때마다 새로운 작업이 큐에 추가됩니다.
리스트를 워커 쓰레드로 처리하는 방법도 있습니다.
다음과 같이 작성할 수 있습니다.
fun list(list: List<String>, tasks: Tasks, index: Int = 0, block: (String) -> Unit) {
tasks.addTask { tasks ->
block(list[index])
if (index < list.size - 1) list(list, tasks, index + 1, block)
}
}
list(listOf("Hello", "World"), tasks) { println("$it ${Thread.currentThread().name}") }
각 태스크는 리스너에게 줄 데이터의 생산자 역할을 하며, 데이터 생산이나 처리를 최대한 잘게 쪼갤수록 동시성이 좋아집니다.
하나의 연결된 작업은 순차적으로 실행되며, 멀티쓰레딩은 여러 개의 연결된 작업을 동시에 처리하는 방식입니다.
긴 트랜잭션 구간을 사용하지 않고, 결과적 일관성을 유지하기 위해 작은 단위의 작업으로 쪼개는 것이 중요합니다.
동기화 비용이 미미하므로, 작업 실행에 집중하여 최적화를 도모해야 합니다.
인덱스 오류를 방지하기 위해 list() 함수에서 인덱스를 체크하는 방법을 개선할 수 있습니다.
index < list.size 조건을 tasks.addTask 호출 이전에 확인하여 인덱스 범위를 벗어난 경우를 방지할 수 있습니다.
이를 통해 안전하게 태스크를 추가할 수 있습니다.
아래는 이를 반영한 코드입니다
fun list(list: List<String>, tasks: Tasks, index: Int = 0, block: (String) -> Unit) {
// 인덱스가 리스트 크기보다 작은 경우에만 태스크를 추가합니다.
if (index < list.size) {
tasks.addTask { tasks ->
block(list[index])
// 다음 인덱스를 처리하기 위해 재귀 호출
list(list, tasks, index + 1, block)
}
}
}
이렇게 수정하면, list() 함수가 index가 리스트의 크기를 초과하는 경우에는
재귀 호출을 하지 않게 되어 인덱스 오류를 방지할 수 있습니다.
list() 함수에서 인덱스 검사를 tasks.addTask 호출 이전에 수행하여, 리스트의 크기를 초과하는 인덱스에 대한 호출을 방지합니다.
각 태스크는 리스트의 각 요소에 대해 안전하게 실행될 수 있도록 합니다.
멀티스레딩 환경에서 각 태스크가 동시에 실행될 때, 서로의 작업에 영향을 주지 않도록 보장합니다.
클로저를 활용하여 각 태스크가 독립적으로 실행될 수 있도록 하며, 리스트의 모든 요소를 안전하게 처리합니다.
이런 방식으로 작업을 추가하고 실행하면, 동시성 문제 없이 안정적으로 태스크를 관리할 수 있습니다.
작업을 적절히 분할하는 것은 동시성의 효율성을 높이는 핵심입니다.
너무 큰 작업은 대기 시간을 증가시키고, 너무 작은 작업은 컨텍스트 스위칭 비용을 증가시켜 성능을 저하시킬 수 있습니다.
작업을 적절하게 분할하는 것이 중요합니다.
작업을 적절히 쪼갤 경우 각 작업이 병렬로 처리되며, 전체 작업의 처리 속도를 향상시킬 수 있습니다.
예를 들어, 대용량 파일 처리 시 파일을 여러 개의 작은 블록으로 나누어 각 블록을 병렬로 처리함으로써 시간을 단축할 수 있습니다.
작업을 너무 많이 쪼개면 오히려 컨텍스트 스위칭 비용이 커질 수 있습니다.
예를 들어, 100개의 작은 작업을 생성하면 각 작업 간의 전환에 드는 비용이 크기 때문에 오히려 전체 처리 시간이 증가할 수 있습니다.
따라서, 작업 크기를 결정하는 것이 중요합니다.
동시성을 처리하면서 작업의 순서를 보장해야 하는 경우, 작업을 큐에 넣고, 앞선 작업이 끝나면 다음 작업을 추가하는 방식으로 구현할 수 있습니다.
getTask 함수는 각 작업이 완료된 후 다음 작업을 추가하는 방식으로 구성되어 있습니다.
이 구조는 순차적으로 작업을 실행하면서도 각 작업의 동시성을 보장합니다.
작업을 쪼갤 때는 최대한 짧은 작업으로 나누는 것이 중요합니다.
예를 들어, 데이터베이스 쿼리나 파일 처리와 같은 작업을 가능한 짧게 분할하면 전체 시스템의 응답성과 효율성을 높일 수 있습니다.
하나의 연결된 작업은 단일 쓰레드에서 순차적으로 실행되며, 이 경우 동시성 관리가 필요하지 않습니다.
하지만 여러 개의 개별 작업이 동시에 실행될 경우, 각 작업의 블록을 최소화해야 합니다.
쪼개진 작업이 완료될 때마다 결과를 보고하는 구조는, 전체적인 처리 시간을 단축할 수 있는 장점이 있습니다.
즉, 작업의 진행 상황을 모니터링하고 필요 시 조정할 수 있습니다.
아래의 코드는 위의 설명을 반영하여 워커 쓰레드 패턴을 구현한 예제입니다
fun main() {
val tasks = Tasks()
repeat(10) {
tasks.addTask(getTask(it, 0, Random.nextInt(3, 6)))
}
Thread.sleep(2000)
tasks.shutdown()
}
fun getTask(taskId: Int, serial: Int, limit: Int): (Tasks) -> Unit {
return { tasks ->
println("$taskId-$serial ${Thread.currentThread().name}")
Thread.sleep(Random.nextLong(100, 301))
if (serial < limit) tasks.addTask(getTask(taskId, serial + 1, limit))
}
}
class Tasks(val workers: Int = Runtime.getRuntime().availableProcessors()) {
@Volatile private var isRunning = true
private val taskQueue: BlockingQueue<(Tasks) -> Unit> = LinkedBlockingQueue()
private var executor: ExecutorService = Executors.newFixedThreadPool(workers).also { service ->
repeat(workers) {
service.submit {
while (isRunning) {
try {
val task = taskQueue.take()
task(this)
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
break
}
}
}
}
}
fun addTask(task: (Tasks) -> Unit) {
if (isRunning) taskQueue.offer(task)
}
fun shutdown() {
isRunning = false
executor.shutdown()
}
}
각 워커가 작업을 완료한 후 결과를 하나의 리스트에 저장하여, 모든 작업이 완료된 후 한 번에 처리하는 방법입니다.
이 방식은 결과를 최종적으로 모아서 처리할 수 있는 장점이 있습니다.
class Tasks(val workers: Int = Runtime.getRuntime().availableProcessors()) {
@Volatile private var isRunning = true
private val taskQueue: BlockingQueue<(Tasks) -> Unit> = LinkedBlockingQueue()
private val results: MutableList<String> = mutableListOf() // 결과 저장 리스트
private var executor: ExecutorService = Executors.newFixedThreadPool(workers).also { service ->
repeat(workers) {
service.submit {
while (isRunning) {
try {
val task = taskQueue.take()
task(this)
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
break
}
}
}
}
}
fun addTask(task: (Tasks) -> Unit) {
if (isRunning) taskQueue.offer(task)
}
fun addResult(result: String) {
results.add(result) // 결과 추가
}
fun shutdown() {
isRunning = false
executor.shutdown()
println("Final results: $results") // 최종 결과 출력
}
}
각 태스크가 실행되면서 중간 결과를 리스너에 전달하여 실시간으로 결과를 처리하는 방법입니다. 이를 통해 작업이 완료될 때마다 즉시 결과를 사용할 수 있습니다.
fun getTask(taskId: Int, serial: Int, limit: Int, listener: (String) -> Unit): (Tasks) -> Unit {
return { tasks ->
val result = "$taskId-$serial ${Thread.currentThread().name}" // 작업 결과 생성
listener(result) // 리스너에 결과 전달
println(result)
Thread.sleep(Random.nextLong(100, 301))
if (serial < limit) tasks.addTask(getTask(taskId, serial + 1, limit, listener))
}
}
결과를 별도의 BlockingQueue에 저장하여 각 작업이 완료될 때마다 그 결과를 큐에 추가하는 방법입니다.
이 방법은 결과 처리를 비동기적으로 수행할 수 있게 해줍니다.
private val resultQueue: BlockingQueue<String> = LinkedBlockingQueue()
fun processResults() {
Thread {
while (isRunning) {
try {
val result = resultQueue.take()
// 결과 처리 로직
println("Processed result: $result")
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
break
}
}
}.start()
}
fun addResult(result: String) {
resultQueue.offer(result) // 결과 큐에 추가
}
아래는 작업이 완료된 후 결과를 리스너에 전달하는 최종 예제입니다.
fun main() {
val tasks = Tasks()
val listener: (String) -> Unit = { result ->
// 결과 처리 로직
println("Result received: $result")
}
repeat(10) {
tasks.addTask(getTask(it, 0, Random.nextInt(3, 6), listener))
}
Thread.sleep(2000)
tasks.shutdown()
}
Kotlin을 활용한 워커 쓰레드 패턴은 효율적인 멀티쓰레드 작업 처리를 가능하게 합니다.
자원 정리를 철저히 하고, 각 작업을 최적화하여 최대한 동시성을 높이는 것이 중요한 요소입니다.
이러한 패턴을 잘 활용한다면 성능 향상과 자원 관리를 효과적으로 수행할 수 있을 것입니다.
작업의 분할과 최적화는 워커 쓰레드 패턴의 핵심입니다.
적절한 크기로 작업을 나누어 동시성을 극대화하면서도, 과도한 컨텍스트 스위칭을 피하는 것이 중요합니다.
위의 내용을 바탕으로 추가적인 설명을 통해 워커 쓰레드 패턴에 대해 조금은 이해가 되었습니다.