[Java] 동시성 문제 공부 및 정리 (2)

한호성·2023년 8월 4일

Introduction

현재 우리 회사가 메일 발송을 위해 사용하는 서비스는 , Microsoft office365이다.

한 계정으로 메일 발송을 할 수 있는 Thread의 개수가 정해져 있다고 공식문서에 나와 있어서, 메일을 발송하는 코드에서도, 비슷한 정도의 Thread를 통해 요청을 보내기를 해야겠다 생각이 들어서, 기존의 메일서버를 수정하게 되었다.

내가 생각한 방법은 Multi-Thread 기반으로 들어가는 Request 요청을 하나의 Event Queue에 담고, 일괄적으로 전송하는 방법을 생각했다.

메일을 발송하는 워크 쓰레드를, 공식문서에 나온 쓰레드 개수만큼 두고, office 365 단일계정을 통해 메일 발송을 요청을 할 생각이다.

이전에 구축했던 메일 서비스에 많은 요청이 들어올 때, 비동기적으로 여러 Thread에서 메일 요청을 보내게 구현이 되어있었다.

우선 개념적으로 여러 Thread 에서 데이터를 한곳에 모으기 위해 Thread-safe한 큐가 필요했고, 몇 가지 조사를 해보니, Java.util.concurrent package에 제공하는 LinkedBlockingQueue를 사용하였다.

내가 생각한대로 구현이 끝난 후, 내가 사용한 자료구조에 대해 좀 더 구체적인 조사가 필요한 거 같아서, LinkedBlocking Queue에 대해 공부해보았더니, 들어는 봤지만, 확실하게 이해하고 있지 않은 많은 부분들이 존재했다.
또한 동시성 이슈에 관해 깊은 이해가 부족하다고 생각이 들어 그와 관련해서 공부를 깊게 해보도록하자..

Index

  • 동시성 문제에 관한 용어정리 (1)
  • 공유 자원처리 방법 (1)
  • 자바에서 프로그래밍에서 동시성 처리하는 법 (2)
  • 자료구조를 직접 간단하게 만들어보자 (2)

자바 언어에서, 동시성 처리하는 방법

Synchoronized

Synchroized 키워드를 통해 해당 블럭의 엑세스를 동기화 할 수 있다. 즉 {}으로 가둬진 내용을 임계영역으로 만드는 방법이다.

자바 언어에서는 모든 개체가, Monitor를 갖고 있어서, 동기화 처리를 할 때, 활용할 수 있다.

즉 thread가 개체 모니터를 가지면, lock을 걸어서, 그 개체를 다른 thread가 접근하게 하지 못하는 것이다.

간단한 자바 코드를 살펴보자



class Solution {
    public static int staticNum = 0;

    public static synchronized void add() {
        staticNum++;
    }

    public void solution() throws InterruptedException {

        Thread test1 = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++) {

                    System.out.println("static num :" + staticNum);
                    add();
                }

            }
        });

        Thread test2 = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++) {
                    System.out.println("static num :" + staticNum);
                    add();
                }

            }
        });

        test1.start();
        test2.start();
        test1.join();
        test2.join();
        System.out.println(staticNum);

    }


}

코드를 간단하게 설명하면,
공유 자원을 늘리는 메소드에 synchronized를 걸어서 한 thread만 공유자원에 접근하도록 했다.

기존의 동시성 문제가 발생하는 것을, 잘 해결되는 것을 확인할 수 있었다.

Atomic class, Concurrent package

Atomic과 Concurrent package class들은 CAS 알고리즘과 volatile 키워드를 통해 구현되었다.

#cf) CAS : Compare And Swap의 줄임말로, 비교하고 변경한다는 의미를 갖고, 내가 갖고 있는 값과 메모리에 위치한 값이 비교한지 비교하고, 일치하면 내가 원하는 값으로 변경하는 것이다.

#cf) Volatile : 값을 캐싱하지 않고, 메인 메모리에서 직접 가져오게 하는 키워드이다.

java.util.concurrent.locks

concurrent pacakge는 동기화가 필요한 상황에서 사용할 수 있는 다양한 유틸리티 클래스들을 제공한다. 주요 기능을 정리해보자

  • Locks : 상호 배제를 사용할 수 있는 클래스 제공
  • Atomic : 동기화가 되어 있는 변수를 제공
  • Executors : 쓰레드 풀 생성, 쓰레드 생명주기 관리, Task 등록 실행을 처리할 수 있다.
  • Synchronizers : 특수한 목적의 동기화 처리하는 5개 클래스 제공

Lock interface를 구현한 구현체로는 ReentrantLock 이 존재한다. ( 임계영역의 시작지점과 종료지점을 직접 명시할 수 있게 해준다.)

내가 동기화를 위해 사용했던 LinkedBlockingQueue 자료구조는 Lock class를 사용했었다.

이전에 공부했던 OS 동기화 알고리들 중 , 뮤텍스 방식을 사용하고 있다고 생각이 들었다.
(Lock을 갖고 있는 Thread만 Lock을 해제할 수 있다.)

synchronized vs Lock

위의 먼저 설명된 Synchronized를 활용한 동기화 방법과 Lock을 활용한 동기화 방법에 대해 비교해보자.

1 가장 큰 차이점은, thread들의 starvation이 control할 수 있냐 없냐이다.

Synchronized 키워드를 사용해서, 동기화 처리 할 경우 thread들 간의 우선순위는 존재하지 않고, 경합을 하게 되고 , 이 때, 계속해서 경쟁에 밀리는 thread들은 일을 처리하지 못하는 경우가 있다.

반면에 Lock의 같은 fariness 옵션을 줄 경우, 먼저 대기한 thread가 먼저 자원을 점유할 수 있도록 해준다.

2 그 다음으로 차이점은, Lock 상태의 Condition 을 여러개 둘 수 있다는 점이다. 어떤 자원에 접근해야하는 여러 thread들이 있는데, 다른 큐에 각각 대기 시켜야하는 경우 사용 할 수 있다. 아래의 간단하게 작성해둔 코드를 보면 이해할 수 있을 것이다.

간단하게 자료구조 구현해보기


public class MyblockingQueue {

    public Queue<Integer> queue;

    public final int size;
    private ReentrantLock lock = new ReentrantLock();

    private final Condition fullCondition = lock.newCondition();
    private final Condition emptyCondition = lock.newCondition();

    public MyblockingQueue(int size) {
        this.size = size;
        queue = new ArrayDeque<>(size);
    }

    public void put(Integer value) {
        lock.lock();
        while (isFull()) {
            try {
                System.out.println(Thread.currentThread() + "Put 대기 ");
                fullCondition.await();
                System.out.println(Thread.currentThread() + " Put 대기해제 ");
            } catch (InterruptedException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }

        queue.add(value);
        System.out.println("put 한후 queue 상태 " );
        for(Integer data : queue){
            System.out.print(" "+ data);
        }
        System.out.println("");
        System.out.println(Thread.currentThread() + " Queue 값 넣은 후, 추가했다고 signaling");
        emptyCondition.signalAll();
        lock.unlock();
    }

    public Integer poll() {
        lock.lock();
        while (isEmpty()) {
            try {
                System.out.println(Thread.currentThread() + "poll 대기 ");
                emptyCondition.await();
                System.out.println(Thread.currentThread() + "poll 대기해제");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        final Integer poll = queue.poll();
        System.out.println("poll 한후 queue 상태 " );
        for(Integer value : queue){
            System.out.print(" "+ value);
        }
        System.out.println("");
        fullCondition.signal();
        System.out.println(Thread.currentThread() + " Queue 값 뺸 후,넣을 자리 있다고 signaling");
        lock.unlock();
        return poll;
    }

    private boolean isFull() {
        return queue.size() == size;
    }

    private boolean isEmpty() {
        return queue.size() == 0;
    }

}

개인적으로 Blocking queue를 만들어 봤다.

  • Lock 구현체 ReentrantLock
  • 2개의 Condition을 두어서,
    • Queue 에 add 할 thread
    • Queue에 poll한 thread

위의 2가지 포인트를 고려해서 만들었고, 잘 작동하는 것도 확인 했다.

기존의 만들어진 Concurrent package를 잘 활용한다면, 동기화를 잘 할 수 있을거라 판단한다.

Reference

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html
https://zion830.tistory.com/57
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Lock.html

profile
개발자 지망생입니다.

1개의 댓글

comment-user-thumbnail
2023년 8월 4일

글 잘 봤습니다.

답글 달기