현재 우리 회사가 메일 발송을 위해 사용하는 서비스는 , Microsoft office365이다.
한 계정으로 메일 발송을 할 수 있는 Thread의 개수가 정해져 있다고 공식문서에 나와 있어서, 메일을 발송하는 코드에서도, 비슷한 정도의 Thread를 통해 요청을 보내기를 해야겠다 생각이 들어서, 기존의 메일서버를 수정하게 되었다.
내가 생각한 방법은 Multi-Thread 기반으로 들어가는 Request 요청을 하나의 Event Queue에 담고, 일괄적으로 전송하는 방법을 생각했다.
메일을 발송하는 워크 쓰레드를, 공식문서에 나온 쓰레드 개수만큼 두고, office 365 단일계정을 통해 메일 발송을 요청을 할 생각이다.
이전에 구축했던 메일 서비스에 많은 요청이 들어올 때, 비동기적으로 여러 Thread에서 메일 요청을 보내게 구현이 되어있었다.
우선 개념적으로 여러 Thread 에서 데이터를 한곳에 모으기 위해 Thread-safe한 큐가 필요했고, 몇 가지 조사를 해보니, Java.util.concurrent package에 제공하는 LinkedBlockingQueue를 사용하였다.
내가 생각한대로 구현이 끝난 후, 내가 사용한 자료구조에 대해 좀 더 구체적인 조사가 필요한 거 같아서, LinkedBlocking Queue에 대해 공부해보았더니, 들어는 봤지만, 확실하게 이해하고 있지 않은 많은 부분들이 존재했다.
또한 동시성 이슈에 관해 깊은 이해가 부족하다고 생각이 들어 그와 관련해서 공부를 깊게 해보도록하자..
Index
- 동시성 문제에 관한 용어정리 (1)
- 공유 자원처리 방법 (1)
- 자바에서 프로그래밍에서 동시성 처리하는 법 (2)
- 자료구조를 직접 간단하게 만들어보자 (2)
Synchroized 키워드를 통해 해당 블럭의 엑세스를 동기화 할 수 있다. 즉 {}으로 가둬진 내용을 임계영역으로 만드는 방법이다.
자바 언어에서는 모든 개체가, Monitor를 갖고 있어서, 동기화 처리를 할 때, 활용할 수 있다.
즉 thread가 개체 모니터를 가지면, lock을 걸어서, 그 개체를 다른 thread가 접근하게 하지 못하는 것이다.
간단한 자바 코드를 살펴보자
class Solution {
public static int staticNum = 0;
public static synchronized void add() {
staticNum++;
}
public void solution() throws InterruptedException {
Thread test1 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
System.out.println("static num :" + staticNum);
add();
}
}
});
Thread test2 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
System.out.println("static num :" + staticNum);
add();
}
}
});
test1.start();
test2.start();
test1.join();
test2.join();
System.out.println(staticNum);
}
}
코드를 간단하게 설명하면,
공유 자원을 늘리는 메소드에 synchronized를 걸어서 한 thread만 공유자원에 접근하도록 했다.
기존의 동시성 문제가 발생하는 것을, 잘 해결되는 것을 확인할 수 있었다.
Atomic과 Concurrent package class들은 CAS 알고리즘과 volatile 키워드를 통해 구현되었다.
#cf) CAS : Compare And Swap의 줄임말로, 비교하고 변경한다는 의미를 갖고, 내가 갖고 있는 값과 메모리에 위치한 값이 비교한지 비교하고, 일치하면 내가 원하는 값으로 변경하는 것이다.
#cf) Volatile : 값을 캐싱하지 않고, 메인 메모리에서 직접 가져오게 하는 키워드이다.
concurrent pacakge는 동기화가 필요한 상황에서 사용할 수 있는 다양한 유틸리티 클래스들을 제공한다. 주요 기능을 정리해보자
Lock interface를 구현한 구현체로는 ReentrantLock 이 존재한다. ( 임계영역의 시작지점과 종료지점을 직접 명시할 수 있게 해준다.)
내가 동기화를 위해 사용했던 LinkedBlockingQueue 자료구조는 Lock class를 사용했었다.
이전에 공부했던 OS 동기화 알고리들 중 , 뮤텍스 방식을 사용하고 있다고 생각이 들었다.
(Lock을 갖고 있는 Thread만 Lock을 해제할 수 있다.)
위의 먼저 설명된 Synchronized를 활용한 동기화 방법과 Lock을 활용한 동기화 방법에 대해 비교해보자.
1 가장 큰 차이점은, thread들의 starvation이 control할 수 있냐 없냐이다.
Synchronized 키워드를 사용해서, 동기화 처리 할 경우 thread들 간의 우선순위는 존재하지 않고, 경합을 하게 되고 , 이 때, 계속해서 경쟁에 밀리는 thread들은 일을 처리하지 못하는 경우가 있다.
반면에 Lock의 같은 fariness 옵션을 줄 경우, 먼저 대기한 thread가 먼저 자원을 점유할 수 있도록 해준다.
2 그 다음으로 차이점은, Lock 상태의 Condition 을 여러개 둘 수 있다는 점이다. 어떤 자원에 접근해야하는 여러 thread들이 있는데, 다른 큐에 각각 대기 시켜야하는 경우 사용 할 수 있다. 아래의 간단하게 작성해둔 코드를 보면 이해할 수 있을 것이다.
public class MyblockingQueue {
public Queue<Integer> queue;
public final int size;
private ReentrantLock lock = new ReentrantLock();
private final Condition fullCondition = lock.newCondition();
private final Condition emptyCondition = lock.newCondition();
public MyblockingQueue(int size) {
this.size = size;
queue = new ArrayDeque<>(size);
}
public void put(Integer value) {
lock.lock();
while (isFull()) {
try {
System.out.println(Thread.currentThread() + "Put 대기 ");
fullCondition.await();
System.out.println(Thread.currentThread() + " Put 대기해제 ");
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
queue.add(value);
System.out.println("put 한후 queue 상태 " );
for(Integer data : queue){
System.out.print(" "+ data);
}
System.out.println("");
System.out.println(Thread.currentThread() + " Queue 값 넣은 후, 추가했다고 signaling");
emptyCondition.signalAll();
lock.unlock();
}
public Integer poll() {
lock.lock();
while (isEmpty()) {
try {
System.out.println(Thread.currentThread() + "poll 대기 ");
emptyCondition.await();
System.out.println(Thread.currentThread() + "poll 대기해제");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
final Integer poll = queue.poll();
System.out.println("poll 한후 queue 상태 " );
for(Integer value : queue){
System.out.print(" "+ value);
}
System.out.println("");
fullCondition.signal();
System.out.println(Thread.currentThread() + " Queue 값 뺸 후,넣을 자리 있다고 signaling");
lock.unlock();
return poll;
}
private boolean isFull() {
return queue.size() == size;
}
private boolean isEmpty() {
return queue.size() == 0;
}
}
개인적으로 Blocking queue를 만들어 봤다.
위의 2가지 포인트를 고려해서 만들었고, 잘 작동하는 것도 확인 했다.
기존의 만들어진 Concurrent package를 잘 활용한다면, 동기화를 잘 할 수 있을거라 판단한다.
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html
https://zion830.tistory.com/57
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Lock.html
글 잘 봤습니다.