java concurrent 패키지 내용 정리

ksngh·2024년 11월 27일

자바

목록 보기
3/3
post-thumbnail

참조 :https://docs.oracle.com/javase/8/docs/api/index.html?java/util/concurrent/package-summary.html

동시성에 관련되어 문제를 해결하다가 java.util.concurrent 패키지에서 자주 쓰이는 클래스를 정리하게 되었다.

  • Concurrent 컬렉션:
    • 데이터를 저장, 검색, 수정하는 데 사용되는 스레드 안전한 자료구조.
    • 예: 여러 스레드가 동시에 데이터를 삽입하거나 읽는 작업을 안전하게 수행.

패키지에는 컬렉션을 포함하여 다음과 같은 도구들도 포함이 되어있다.

  • 비동기 작업 실행 및 결과를 관리하는 비동기 프로그래밍 도구.
  • 데이터를 저장하거나 관리하지 않고, 작업의 상태와 결과를 처리.

1. 스레드 관리: ThreadPoolExecutor


1.1 언제 사용하는가?

  • 다수의 작업을 동시에 처리해야 하지만 스레드 생성과 소멸의 오버헤드를 줄이고 싶을 때.
  • 서버 애플리케이션에서 클라이언트 요청을 병렬로 처리할 때.

1.2 주요 특징

  • 스레드 풀(Thread Pool): 미리 생성된 스레드가 작업을 기다리며 대기.
    • 기본 스레드 갯수 (corePoolSize)
  • 동적으로 스레드를 관리
    • 추가 스레드는 작업이 끝난 뒤 유휴 상태로 일정 시간(keepAliveTime) 대기하다가 제거됨.
    • 추가 스레드 갯수 (maxPoolSize)
  • 작업 큐(Queue)와 결합해 처리 속도 제어 가능.
    • 기본 스레드가 바쁘고, 작업이 추가되면 작업 큐에 대기
    • 큐가 가득 차면 추가 스레드를 생성하거나 작업이 거부 됨.

1.3 주요 메서드

  • execute(Runnable task): 작업을 스레드 풀에 제출. → 결과 반환 x
  • submit(Callable<T> task / Runnable task): 작업 제출 후 Future로 결과 반환.
    • Future 객체란 : 비동기 작업의 결과를 나타내는 객체이다. 작업이 완료 되었는지 확인하거나, 완료될 때까지 기다렸다가 결과를 가져올 수 있다.
  • shutdown(): 새로운 작업 수락 중단.
    • 스레드 풀을 정상 종료
    • 이미 제출된 작업은 모두 실행된다.
  • shutdownNow(): 현재 실행 중인 작업 중단.
    • 스레드 풀 강제 종료
    • 실행 중인 작업은 interrupt(중단 요청)을 받는다.
    • 큐에 대기 중인 작업은 실행되지 않고 반환.

1.4 작동 원리

  1. 작업 제출: 작업이 execute 또는 submit으로 제출됨.
    1. submit 으로 제출된 객체는 Future 객체에 결과가 반환
  2. 작업 큐에 추가: 스레드 풀이 가득 차 있으면 작업 큐에 추가.
  3. 스레드 생성: 사용 가능한 스레드가 없으면 새 스레드 생성(최대 스레드 수까지).
  4. 작업 실행: 스레드가 작업 큐에서 작업을 가져와 실행.
    1. 작업을 완료하면 Future 객체에 결과를 저장한다.
      (Q. Future 객체는 어떻게 이렇게 결과를 저장할 수 있을까?)
      (A. Future 객체는 내부적으로 result 변수, done 플래그를 저장한다.
      클라이언트가 get()메서드를 호출하면 done 플래그에 따라 결과를 반환하거나 대기한다.)
  5. 스레드 재사용: 작업 완료 후 스레드는 재사용 대기.

1.5 사용 예시


ExecutorService executor = new ThreadPoolExecutor(
    2,// corePoolSize4,// maximumPoolSize60,// keepAliveTime
    TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(10)// 작업 큐
);

executor.execute(() -> {
    System.out.println("Task executed by thread: " + Thread.currentThread().getName());
});
executor.shutdown();

2. 비동기 작업: CompletableFuture


2.1 언제 사용하는가?

  • 비동기적으로 작업을 실행하고 결과를 조합하거나 처리할 때.
  • 네트워크 요청, I/O 작업처럼 오래 걸리는 작업에 사용.
  • 병렬 작업의 결과를 조합하거나 콜백 지옥을 피하고 싶을 때.

2.2 주요 특징

  • Future의 확장: 결과를 처리하는 콜백 제공.
    • Future는 작업 완료 여부를 기다리기 위해 get()을 호출해야 하지만, CompletableFuture는 작업 완료 후 콜백을 등록하여 비동기로 처리할 수 있다.

      // Future 방식
      Future<String> future = executor.submit(() -> "Hello");
      String result = future.get(); // 결과를 기다리며 블로킹 발생
      
      // CompletableFuture 방식
      CompletableFuture.supplyAsync(() -> "Hello")
          .thenAccept(result -> System.out.println(result)); // 결과 준비 시 실행
  • 비동기 실행 및 체이닝 지원thenApplythenAcceptwhenComplete.
  • 동기/비동기 혼합 가능.

2.3 주요 메서드

  • supplyAsync(Supplier<T>): 비동기 작업 실행 후 값을 반환.
  • thenApply(Function<T, R>): 결과를 변환.
  • thenAccept(Consumer<T>): 결과를 소비.
  • exceptionally(Function<Throwable, ? extends T>): 예외 처리.
  • thenCombine: 두 작업의 결과를 조합.
  • allOf: 여러 작업이 모두 완료될 때 실행.
  • anyOf: 여러 작업 중 하나라도 완료되면 실행.
  • exceptionally: 예외 발생 시 대체 결과를 제공.
  • handle: 성공/실패 여부에 따라 각각 처리.

2.4 작동 원리

  1. 비동기 작업 실행ForkJoinPool.commonPool()을 사용해 작업 실행
    1. Java 8에서 도입된 ForkJoinPool의 공유 스레드 풀(shared thread pool)
  • CompletableFuture는 내부적으로 스레드 안전한 방식으로 상태를 관리합니다.
  • 작업 완료 시 결과를 저장하거나, 작업이 중복으로 실행되지 않도록 CAS(Compare-And-Swap) 메커니즘을 사용합니다.
  1. CompletableFuture가 처리하는 로직에서 동시성을 고려하지 않은 공유 데이터를 수정한다면 동시성 문제가 발생할 수 있습니다.

예시: 동시성 문제

java
코드 복사
List<String> sharedList = Collections.synchronizedList(new ArrayList<>());

CompletableFuture.runAsync(() -> sharedList.add("Task1"));
CompletableFuture.runAsync(() -> sharedList.add("Task2"));

해결 방법:

  • 공유 데이터의 수정 작업을 동기화하거나, Concurrent 컬렉션을 사용하는 방식으로 동시성 문제를 해결해야 합니다.

2.5 사용 예시

java
CompletableFuture.supplyAsync(() -> {
    return "Hello";
}).thenApply(result -> {
    return result + " World!";
}).thenAccept(finalResult -> {
    System.out.println(finalResult);
});

3. 동시 데이터 관리: ConcurrentHashMap


3.1 언제 사용하는가?

  • 다수의 스레드가 동시에 읽기 및 쓰기 작업을 수행할 때.
  • 캐싱, 세션 관리처럼 고성능 동시 접근이 필요한 경우.

3.2 주요 특징

  • 읽기 동작은 잠금 없이 수행.
    • 읽기 작업은 데이터 변경을 일으키지 않음:
      • 데이터를 단순히 조회하는 작업(get)은 동시성 문제를 일으키지 않습니다.
      • Java 메모리 모델(Java Memory Model, JMM)에서 volatile 키워드와 Happens-Before 관계를 활용해 최신 데이터를 읽을 수 있습니다.
    • Q. volatile 키워드와 Happens-Before 관계가 뭘까요?
    • A. Happens-Before 관계는 스레드 간 동작의 순서를 정의하고, 메모리 가시성을 보장하는 규칙. volatile 변수를 선언하면 Happens-Before 관계를 자동으로 설정. 즉, volatile 변수에 값을 쓰는 작업은 해당 값을 읽는 작업 이전에 반드시 수행됩니다.
  • 쓰기 동작은 세그먼트 잠금(Lock Striping)으로 처리.
    • 쓰기 작업(put,remove)은 데이터 변경을 동반하므로 부분 잠금(세그먼트 잠금)이 적용됩니다. 이는 전체 맵을 잠그지 않고 충돌 가능성이 있는 특정 버킷(bucket)만 잠금 처리하여 동시성을 높입니다.
    • 부분 잠금의 동작 방식
      • 해시 기반 버킷 위치 계산:
        • 키의 해시 값을 기반으로 해당 데이터가 저장될 버킷(bucket)을 결정합니다.
        • Q. 버킷은 뭔가요?
        • A. 해시 기반 자료구조에서 데이터를 저장하는 단위 공간입니다. 충돌(해시값이 같은 키)을 처리하기 위해 체인 형태(연결 리스트) 또는 트리 구조로 데이터를 관리.
      • 버킷 수준에서 잠금:
        • 변경 작업(쓰기, 삭제)은 해당 버킷에만 영향을 미치므로, 해당 버킷만 잠급니다.
        • 다른 버킷은 독립적으로 동작하므로 병렬 처리가 가능합니다.
      • CAS와 synchronized의 조합:
        • 데이터 변경 시 Compare-And-Swap(CAS) 연산을 사용해 락 없이 빠르게 시도.
        • Q.CAS가 뭔가요?
        • A. 락 없이 데이터의 상태를 안전하게 업데이트하는 비블로킹 알고리즘
          1. 현재 값 확인: 특정 메모리 위치의 값을 읽음.
          2. 비교: 읽은 값과 기대한 값이 같은지 비교.
          3. 교체: 두 값이 같다면 새로운 값으로 변경.
          4. 실패 처리: 값이 예상과 다르면 연산을 재시도.
        • 충돌이 발생할 경우, 해당 버킷에서만 synchronized 키워드를 통해 잠금 수행.
        • Q.synchronized 가 뭔가요?
        • A. Java의 동기화 키워드로, 여러 스레드가 동시에 하나의 자원에 접근하는 것을 제어하기 위해 사용.
    • 부분 잠금(Partial Locking):
      • 전체 맵을 잠그는 대신, 특정 버킷에만 잠금을 걸어 다른 키-값 작업에는 영향을 주지 않음.
      • 여러 스레드가 동시에 다른 버킷에서 작업할 수 있으므로 병렬 처리 효율이 높음.
    • 락 경합 감소:
      • 데이터가 충돌하지 않으면(해시값 충돌이 적으면) 쓰기 작업 간의 경합이 거의 없음.
      • 읽기 작업은 락이 필요 없으므로 쓰기와 병렬로 동작 가능.

3.3 주요 메서드

  • put: 지정된 키와 값을 삽입. 키가 이미 존재하면 값을 덮어씀.
  • putIfAbsent: 지정된 키가 없을 때만 값을 삽입.
  • computeIfAbsent: 키가 없으면 연산을 수행한 결과를 삽입.
  • computeIfPresent: 키가 존재하면 연산을 수행하여 값을 업데이트.
  • replace: 기존 값을 새로운 값으로 대체.
  • replace(K key, V oldValue, V newValue): 기존 값이 특정 값일 때만 대체.
  • merge: 기존 값과 새 값을 병합 후 저장.
  • getOrDefault: 키가 없을 경우 기본값 반환.
  • remove: 지정된 키와 값을 제거.
  • replaceAll: 모든 값을 연산 결과로 대체.

3.4 사용 예시

java
코드 복사
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key", 1);
map.computeIfAbsent("key", k -> 0);
System.out.println(map.get("key"));// 출력: 1

4. 동시 데이터 관리: ConcurrentLinkedQueue


4.1 언제 사용하는가?

  • 작업 큐처럼 동시 추가 및 제거가 필요한 경우.
  • 생산자-소비자 패턴을 구현하지만, 대기가 필요 없는 경우.
    • Q. 생산자 - 소비자 패턴이 뭔가요?
    • A. 생산자-소비자 패턴(Producer-Consumer Pattern)은 멀티스레드 환경에서 작업 생산(producer)과 작업 소비(consumer)를 서로 독립적으로 처리하도록 설계된 동시성 패턴입니다. 생산자 스레드는 데이터를 생성하고, 소비자 스레드는 데이터를 처리하며, 버퍼(공유 자원)로 데이터를 전달합니다.
    1. 생산자(Producer):
      • 공유 자원에 데이터를 삽입.
      • 버퍼가 가득 찼으면, wait()를 호출하여 대기.
      • 데이터 삽입 후, 소비자를 깨우기 위해 notify() 호출.
    2. 소비자(Consumer):
      • 공유 자원에서 데이터를 꺼내 처리.
      • 버퍼가 비었으면, wait()를 호출하여 대기.
      • 데이터 소비 후, 생산자를 깨우기 위해 notify() 호출.

4.2 주요 특징

  • 락-프리 설계: CAS 알고리즘으로 동작.
  • FIFO(First In First Out) 구조.
  • 크기 제한 없음.

4.3 주요 메서드

  • offer: 큐의 끝에 요소 추가.
  • poll: 큐의 앞에서 요소 제거.
  • peek: 큐의 앞 요소 반환(제거하지 않음).

4.4 작동 원리

  1. CAS 기반 작업:
    • 삽입 시 tail 포인터를 원자적으로 이동.
    • 제거 시 head 포인터를 원자적으로 이동.
  2. 연결 리스트 기반:
    • 각 노드는 다음 노드를 가리킴.

4.5 사용 예시

java
코드 복사
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("Task1");
System.out.println(queue.poll());// 출력: Task1

5. 동기화: Semaphore


5.1 언제 사용하는가?

  • 리소스에 대한 접근을 제한해야 할 때.
  • 네트워크 연결, 데이터베이스 커넥션 풀과 같은 한정된 리소스 관리.

5.2 주요 특징

  • Semaphore의 역할:
    • 일정 수의 스레드만 동시에 공유 자원에 접근할 수 있도록 제어.
    • 허용된 스레드 수(permit)를 초과한 스레드는 대기 상태로 진입.
  • Permit(허가):
    • Semaphore 내부적으로 관리되는 카운트 값으로, 자원 접근 가능 횟수를 나타냄.
    • Permit이 0이면 추가 스레드는 대기해야 함.
  • Semaphore의 종류:
    • 공정(Fair) Semaphore: 대기 중인 스레드가 FIFO(First-In-First-Out) 방식으로 자원에 접근.
    • 비공정 Semaphore: 스레드가 임의 순서로 자원에 접근.
  • acquire로 허가 요청, release로 반환.

5.3 주요 메서드

  • acquire: 허가를 얻기 위해 대기.
  • release: 허가 반환.
  • availablePermits: 남은 허가 수 반환.

5.4 작동 원리

  • 내부적으로 카운트를 감소/증가하며 접근 제어.
  • 허가가 없을 경우 스레드는 대기 상태로 전환.

5.5 사용 예시

java
코드 복사
Semaphore semaphore = new Semaphore(2);

for (int i = 0; i < 5; i++) {
    new Thread(() -> {
        try {
            semaphore.acquire();
            System.out.println("Accessing resource");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            semaphore.release();
        }
    }).start();
}

6. 큐 및 작업 대기열: BlockingQueue


6.1 언제 사용하는가?

  • 생산자-소비자 패턴에서 작업 대기열이 필요할 때.
  • 대기 상태를 통해 작업의 균형을 맞추고 싶을 때.
    • 한 쪽(생산자)이 데이터를 생성하고 다른 쪽(소비자)이 데이터를 처리하는 구조에서, 생산 속도와 소비 속도가 다를 경우 대기 상태를 통해 균형을 맞출 수 있습니다.
    • 예를 들어, 생산자는 데이터를 계속 생성하지만 소비자가 데이터를 처리하지 못하면 데이터가 과부하 상태가 될 수 있습니다.

6.2 주요 특징

  • FIFO 구조.
  • 크기 제한 가능.

6.3 주요 메서드

  • put: 큐가 가득 차면 대기 후 추가.
  • take: 큐가 비어 있으면 대기 후 제거.
  • offer: 큐가 가득 차면 실패.

6.4 작동 원리

  • 내부적으로 ReentrantLock과 Condition을 사용. Java의 ReentrantLock은 synchronized를 대체하거나 보완하기 위한 고급 동기화 도구로, 더 정교한 제어와 기능을 제공합니다. Condition은 ReentrantLock과 함께 사용되는 객체로, 대기(wait)와 알림(notify) 메커니즘을 구현합니다. Condition 은 synchronized의 wait()와 notify()를 대체하며, 더 유연한 대기-알림 구조를 제공합니다.
  • 생산자가 추가하려 할 때 큐가 가득 차 있으면 대기 상태로 전환.
  • 소비자가 가져가면 대기 중인 생산자에게 신호를 보냄.

6.5 사용 예시

java
코드 복사
BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

new Thread(() -> {
    try {
        queue.put("Task1");
        System.out.println("Task1 added");
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}).start();

new Thread(() -> {
    try {
        System.out.println(queue.take());// Task1 출력
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}).start();

정리

  • ThreadPoolExecutor: 스레드 관리를 최적화하고 작업 큐와 결합해 병렬 작업 처리.
  • CompletableFuture: 비동기 작업과 결과 처리를 유연하게 구현.
  • ConcurrentHashMap: 동시 데이터 읽기/쓰기 환경에서 안전하고 빠름.
  • ConcurrentLinkedQueue: 락-프리로 고성능 큐 구현.
  • Semaphore: 리소스 접근을 제한적으로 제어.
  • BlockingQueue: 생산자-소비자 패턴에서 작업 대기와 동기화 지원.

각 상황에 맞는 도구를 사용하면 안정적이고 효율적인 동시성 프로그래밍을 구현할 수 있습니다.

profile
백엔드 개발자입니다.

0개의 댓글