[JAVA] 자바의 스레드와 동시성 (feat. Executor Service)

FE.1·2024년 4월 11일
3
post-thumbnail

멀티 스레딩의 필요성


기본적인 프로그래밍의 방식

  • 출력 결과의 흐름을 보기 쉽게 구분하기 위해 반복문을 1xx, 2xx, 3xx로 구성하였다.
public static void main(String[] args) {
	// Task1
	for (int i = 101; i < 199; i++) {
		System.out.print(i + " ");
	}
	System.out.print("\nTask1 Done");

	// Task2
	for (int i = 201; i < 299; i++) {
		System.out.print(i + " ");
	}
	System.out.print("\nTask2 Done");

	// Task3
	for (int i = 301; i < 399; i++) {
		System.out.print(i + " ");
	}
	System.out.print("\nTask3 Done");
	
	System.out.print("\nMain Done");
}
  • CPU는 오로지 한 가지의 Task만 수행하기 때문에, 다른 Task2, Task3은 아무리 Task1과 유사한 실행과정을 거친다고 해도, CPU는 유사성을 인지하지 못하고, Task1의 작업을 마칠 때 까지 대기해야 한다.
  • 출력 결과는 순차적으로 진행될 것이다.

Threads


  • 스레드는 이렇게 유사성을 가진 모든 수행문을 동시에 실행하도록 해준다.
  • 외부 서비스나 데이터 저장소로부터 데이터 입력 등을 기다리며 동시에 지속해서 다른 Task에 대한 정보를 얻을 수 있기 때문에 CPU 효율성을 극대화할 수 있다.

Thread를 생성하는 두 가지 방법

  1. Thread 클래스를 확장하는 방법

    class Task1 extends Thread {
    	public void run() {
    		// 작업할 내용 작성
    	}
    }
    • 스레드를 실행할 때는 run()이 아닌, start()로 호출해야 한다.
  2. Runnable 인터페이스를 실행하는 방법

    class Task2 implements Runnable {
    	@Override
    	public void run() {
    		// 작업할 내용 작성
    	}
    }
    • 생성 후 Thread 클래스 생성을 위한 추가적인 과정을 거쳐야 한다.

Thread 클래스를 확장하는 방법 및 Runnable 인터페이스를 실행하는 방법

// Thread 클래스를 확장하는 방법 
class Task1 extends Thread {
	public void run() {
		System.out.print("Task1 Started");
		for (int i = 101; i < 199; i++) {
			System.out.print(i + " ");
		}
		System.out.print("\nTask1 Done");
	}
}

// Runnable 인터페이스를 실행하는 방법
class Task2 implements Runnable {

	@Override
	public void run() {
		System.out.print("\nTask2 Started");
		for (int i = 201; i < 299; i++) {
			System.out.print(i + " ");
		}
		System.out.print("\nTask2 Done");
	}
}

public class ThreadBasicsRunner {

	public static void main(String[] args) {
		// Task1
		System.out.print("\nTask1 Kicked Off");
		Task1 task1 = new Task1();
		task1.start(); // 여기서 task1.run()를 호출하면 Thread가 아닌 일반적인 메서드 호출이 되므로 병렬 처리가 되지 않는다. .start()로 호출해야 한다!

		// Task2
		System.out.print("\nTask2 Kicked Off");
		Task2 task2 = new Task2();
		Thread task2Thread = new Thread(task2);
		task2Thread.start();

		// Task3
		System.out.print("\nTask3 Kicked Off");
		for (int i = 301; i < 399; i++) {
			System.out.print(i + " ");
		}
		System.out.print("\nTask3 Done");

		System.out.print("\nMain Done");
	}
}

출력 결과

Task1 Kicked Off
Task2 Kicked OffTask1 Started
Task3 Kicked Off
Task2 Started301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 201 202 203 204 205 206 207 208 209 210 211 212 213 214 101 362 215 102 103 104 105 106 107 108 109 110 111 112 113 114 115 363 364 365 366 367 368 369 370 371 372 373 216 116 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 
Task3 Done
Main Done217 218 219 220 221 222 223 224 225 226 227 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 
Task1 Done228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 
Task2 Done
  • Task1의 출력이 시작되기 전에 이미 Task2의 실행이 시작되었고, 동시에 Task3의 작업은 끝 마쳤고 이후 차례로 Task1Task2의 작업이 완료된 것을 확인할 수 있다.(병렬 처리 성공)

Thread의 상태

  • NEW
    • Task를 생성하고, 아직 실행(start)은 하지 않은 상태
  • RUNNABLE
    • Thread 실행이 가능한 준비 상태(같은 시점 다른 Thread가 이미 작업 실행중)
  • RUNNING
    • Thread가 실행중인 상태
  • BLOCKED/WAITING
    • 외부 인터페이스나 데이터베이스 등으로부터 어떤 입력을 대기하고 있거나 실행이 완료되지 않은 다른 Thread로부터 데이터를 입력받아야 하는 상태
  • TERMINATED/DEAD
    • 상단의 출력 결과를 보면 Task1 Done이 출력되었다.
    • Task1이라는 Thread 실행이 완료된 것을 의미한다.

Thread를 실행(start)하면 RUNNABLE, RUNNING, BLOCKED/WAITING 중 하나의 상태를 띄게 된다.

... 214 101 362 ...
  • Task2의 수행인 214 출력에는 현재 Task2의 상태는 RUNNING

  • Task1의 수행인 101 출력 시점에는 현재 Task1의 상태는 RUNNING

    • Task1의 시점에서 Task2는 준비는 되어있지만 실행은 되고 있지 않은 RUNNABLE 상태
    • RUNNALBE은 실행을 하려고 하지만, 다른 Thread가 우선적으로 실행되고 있는 상태를 뜻한다.
  • Task3의 수행인 362는 메인 메서드에 의해 실행되어 정확히 말하면, Thread가 아닌 execution에 해당하는데 메인 메서드에 의해 실행된 것이다. 이때 다른 Thread인 Task1, Task2는 RUNNALBE 상태로 대기하고 있는다.

  • 예) 외부 서비스의 응답을 기다리거나 데이터베이스를 이용하고 있는데, 데이터베이스가 느리다면 작업을 실행할 상황이 되지않기 때문에 외부 서비스로부터 데이터가 들어오기를 기다리고 있어 사용자는 차단당하게 된다. 사용자가 필요한 데이터를 Thread가 제공하기를 기다리는 상태가 BLOCKED이다.

Thread에 우선순위 부여하기

  • 우선순위(1~10)는 추천에 불과하다. 항상 상위의 우선순위를 보장하는 것을 의미하는 것은 아니다. (힌트 느낌)

  • 반영이 될 수도 안 될 수도 있다.

  • .setPriority(1~10)

// Task1
System.out.print("\nTask1 Kicked Off");
Task1 task1 = new Task1();
**task1.setPriority(1);**
task1.start();

출력 결과

Task1 Kicked Off
Task2 Kicked OffTask1 Started
Task3 Kicked Off
Task2 Started
Task1 Done345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 234 393 394 395 396 397 398 
Task3 Done
Main Done235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 
Task2 Done

Task1 DoneTask2보다 먼저 출력이 되는 것처럼 우선순위를 부여할 수 있다.

그러나 예제를 여러 번 실행해보면 Task1이나 Task3가 먼저 종료 될 때도 있는 것을 확인 할 수 있다.

여기서 주의해야 할 점은 우선 순위 설정은 단지 추천 순위 정도이란 것이다. 우선 순위를 제일 높게 설정한다고 항상 먼저 실행됨을 보장 할 수는 없다.

Thread와 소통 - join 메서드

메인 메서드인 Task3가 실행되기 전에 Task1Task2가 실행이 완료되도록 하고 싶다.

  • 이때 join() 를 통해 이전 작업이 완료된 것을 보장하고 다음 작업을 수행하도록 할 수 있다.
  • 하단의 코드는 상단의 코드와 변경 사항이 존재한다. (Task1, Task2 둘 다한테 우선순위를 부여함)
class Task1 extends Thread {
	public void run() {
		System.out.print("Task1 Started");
		for (int i = 101; i < 199; i++) {
			System.out.print(i + " ");
		}
		System.out.print("\nTask1 Done");
	}
}

class Task2 implements Runnable {

	@Override
	public void run() {
		System.out.print("\nTask2 Started");
		for (int i = 201; i < 299; i++) {
			System.out.print(i + " ");
		}
		System.out.print("\nTask2 Done");
	}
}

public class ThreadBasicsRunner {

	public static void main(String[] args) throws InterruptedException {

		// Task1
		System.out.print("\nTask1 Kicked Off");
		Task1 task1 = new Task1();
		task1.setPriority(1);
		task1.start();

		// Task2
		System.out.print("\nTask2 Kicked Off");
		Task2 task2 = new Task2();
		Thread task2Thread = new Thread(task2);
		task2Thread.setPriority(10);
		task2Thread.start();

		// wait for task1, task2 to complete
		task1.join();
		task2Thread.join();
		
		// Task3
		System.out.print("\nTask3 Kicked Off");
		for (int i = 301; i < 399; i++) {
			System.out.print(i + " ");
		}
		System.out.print("\nTask3 Done");

		System.out.print("\nMain Done");
	}
}

출력 결과

Task1 Kicked Off
Task2 Kicked OffTask1 Started
Task2 Started
Task2 Done197 198 
Task1 Done
Task3 Kicked Off301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 
Task3 Done
Main Done
  • Task1 Done , Task2 Done 이후에 Task3가 실행되는 것을 확인할 수 있다.

Thread 상태 제어 메서드, 동시성 키워드 - sleep, yield, synchronized

  • sleep() : 밀리초 동안 대기 상태에 들어간다.

  • yield() : CPU 점유를 포기하고 대기 상태에 들어간다. (힌트 정도 → 반영이 안 될 수도 있음)

    • 현재 Thread가 현재의 이용 가능한 이 상태를 다른 Thread에게 양보하거나 양도하겠다
  • synchronized : 동기화 작업을 의미

    • 동기화는 오버헤드가 많이 생긴다.
    • 예를 들어 어떤 작업을 위한 1000줄의 코드가 단 한 개의 Thread에 의해서만 실행이 가능하므로, 다른 Thread는 무조건 대기를 해야 하기 때문이다.

Thread 실행 제어의 필요성

  • Thread를 실행했을 때 생기는 중요한 결핍 중 하나는 Thread의 실행에 대해서 그 어떤 통제도 불가능하다는 점이다.

    • 어느 특정 시점에서 얼마나 많은 Thread가 실행되고 있는지 알 수 없다.
    • 예를 들어 세 개의 Thread만 어떤 시점에서 실행하게 만들고 싶다고 했을 때, 이러한 조작은 start() 메서드를 이용하여 확립할 경우에는 매우 어렵다.
  • 또 다른 문제는 Task1, Task2 중 하나만 완료될 때까지 기다리고 싶다고 했을 때, Thread의 기본적인 기능으로는 분기 상황에 맞는 코드를 작성하는 것이 불가능하다.

  • Thread 생성과 실행의 중복 코드 발생

    • 만약 100개의 Task를 실행하고 싶다면? 🤯
    • 코드를 묶음의 형식으로 처리하는 방법이 없다.
  • Thread가 실행한 Task의 결과를 반환받고 싶은데 Thread의 기본적인 기능으로는 불가능하다.

Executor 서비스


Executor Service는 지금까지 말한 Thread의 불편함을 모두 해결해준다.

  • 다수의 Thread를 한 번에 실행 가능

  • Thread의 상태 확인 가능

  • Thread의 논리 실행 가능

    • Thread1 혹은 Thread2 혹은 Thread3가 완료되면 알려주도록 하거나, 이중 어떤 Thread가 먼저 완료되었는지도 알 수 있고 또한 모든 Thread가 완료될 때까지 기다리도록 단 한 문장으로 제어할 수 있다.

단일 스레드 제어하기

newSingleThreadExecutor() : 한 번에 하나의 Thread를 실행하도록 하는 기능 제공

public class ExecutorServiceRunner {

	public static void main(String[] args) {
		// SingleThreadExecutor : 한 번에 하나의 Thread를 실행하도록 하는 기능 제공
		ExecutorService executorService = Executors.newSingleThreadExecutor();
		executorService.execute(new Task1());
		executorService.execute(new Thread(new Task2()));

		// Task3
		System.out.print("\nTask3 Kicked Off");
		for (int i = 301; i < 399; i++) {
			System.out.print(i + " ");
		}
		System.out.print("\nTask3 Done");

		System.out.print("\nMain Done");
		
		executorService.shutdown();
	}
}

출력 결과

Task1 Started
Task3 Kicked Off
Task3 Done
Main Done187 188 189 190 191 192 193 194 195 196 197 198 
Task1 Done
Task2 Started201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 
Task2 Done
  • Task1, Task2는 executorService를 통해 실행하고, Task3는 메인 메서드에서 실행이 되고 있다.
  • SingleThreadExecutor로 지정했기 때문에 어느 시점에서든 하나의 작업만 수행 가능하다.

Thread 수의 사용자 정의를 통해 복수의 Thread를 이용하는 방법

class Task extends Thread {

	private int number;

	public Task(int number) {
		this.number = number;
	}

	public void run() {
		System.out.print("\nTask" + number + " Started");
		for (int i = number * 100; i <= number * 100 + 99; i++) {
			System.out.print(i + " ");
		}
		System.out.print("\nTask" + number + " Done");
	}
}

public class ExecutorServiceRunner {

	public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(2);

		executorService.execute(new Task(1));
		executorService.execute(new Task(2));
		executorService.execute(new Task(3));
		executorService.execute(new Task(4));

		executorService.shutdown();
	}
}

출력 결과

Task1 Started
Task2 Started
Task2 Done
Task3 Started300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 
Task1 Done
Task4 Started400 401 402 403 359 360 361 362 363 364 404 405 365 406 407 408 409 410 411 412 413 414 415 416 417 366 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 367 368 369 370 371 372 373 374 375 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 
Task3 Done480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 
Task4 Done
  • 현재 스레드 풀이 2개이므로 Task1Task2 2가지 모두 실행되었고, 이후 Task2가 실행 완료됨 동시에 Task3 실행 시작, 이어서 Task1의 실행 완료됨 동시에 Task4의 실행이 시작된 것을 확인할 수 있다.

Callable로 후처리 스레드에서 돌아오기

Thread로부터 값을 반환받기

class CallableTask implements Callable<String> {

	private String name;

	public CallableTask(String name) {
		this.name = name;
	}

	@Override
	public String call() throws Exception {
		Thread.sleep(1000); // 일련의 과정을 sleep으로 대체
		return "Hello " + name;
	}
}

public class CallableRunner {

	public static void main(String[] args) throws ExecutionException, InterruptedException {
		ExecutorService executorService = Executors.newFixedThreadPool(1);

		Future<String> welcomeFuture = executorService.submit(new CallableTask("in28Minutes"));
		System.out.print("\n new CallableTask(\"in28Minutes\") executed");

		String welcomeMessage = welcomeFuture.get();
		// 해당 줄 부터는 get() 실행이 완료될 때까지 대기 상태에 머물러 있다.
		System.out.print("\n" + welcomeMessage);

		System.out.print("\n Main completed");
		executorService.shutdown();
	}
}
  • 반환값을 받아올 때는 Future라는 객체를 통해서 반환되므로 꼭 .get()를 통해 값을 가져와야 한다.

출력 결과

new CallableTask("in28Minutes") executed
Hello in28Minutes
Main completed

invoke를 사용한 동시에 다중 처리하기

모든 Task의 실행을 완료할 때까지 대기하고 한 번에 출력하기

class CallableTask implements Callable<String> {

	private String name;

	public CallableTask(String name) {
		this.name = name;
	}

	@Override
	public String call() throws Exception {
		Thread.sleep(1000); // 일련의 과정을 sleep으로 대체
		return "Hello " + name;
	}
}

public class MultipleCallableRunner {

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		ExecutorService executorService = Executors.newFixedThreadPool(3);

		List<CallableTask> tasks = List.of(
			new CallableTask("in28Minutes"),
			new CallableTask("Ranga"),
			new CallableTask("Adam")
		);

		List<Future<String>> results = executorService.invokeAll(tasks);

		for (Future<String> result : results) {
			System.out.println(result.get());
		}

		executorService.shutdown();
	}
}

출력 결과

Hello in28Minutes
Hello Ranga
Hello Adam
  • 스레드 풀 수를 13으로 변경하면 조금 더 빠르게 동시에 출력이 되는 것을 확인할 수 있다.

invokeAny로 가장 빠른 Task만 처리 기다리기

Task중 하나가 끌날 때까지만 기다려서 실행이 첫 번째로 끝난 것의 값을 결과로 얻고 싶은 경우

...
public class MultipleAnyCallableRunner {

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		ExecutorService executorService = Executors.newFixedThreadPool(1);

		List<CallableTask> tasks = List.of(new CallableTask("in28Minutes"),
			new CallableTask("Ranga"),
			new CallableTask("Adam"));

		String result = executorService.invokeAny(tasks);
		System.out.println(result);

		executorService.shutdown();
	}
}

출력 결과

Hello Ranga
  • 결과가 계속 바뀔 것이다. 사용되는 Thread가 3개이기 때문에 실행할 때마다 다른 결과가 나오는 것이 가능하다
  • Thread 수를 1개로 지정하면 같은 결과만 나올 것이다. (Thread 수가 곧 사용자 수이다.)

마치며


멀티 스레딩의 흐름제어에 대해서 알 수 있었으며, Thread 제어에 있어서 ExecutorService의 용이성에 대해서 알 수 있었던 좋은 경험이었다. Task 작업 속도를 증가시키고, 병렬 구조로 가능한 많은 Task를 실행하고자 할 때 멀티 스레딩을 고려해보자

참고

profile
공부하자!

1개의 댓글

comment-user-thumbnail
2024년 4월 11일

와아! 이렇게 정리하신다니! 배우고 갑니다^^

답글 달기