오늘(11.30) 접수 마감인 원티드의 프리온보딩 11월 챌린지를 알게 되어서 급하게 작성하는 사전과제!
조금 어려워 보이지만 사전과제가 공부용으로도 좋아보여서 후다닥 해 보려고 한다.
동기와 비동기는 어떤 작업 혹은 연관된 작업을 처리하고자 하는 시각의 차이이지 어느 한 쪽이 절대적으로 좋은 것은 아니다.
때문에 구현해야 하는 프로그램의 목적에 따라 적절한 방식을 선택하면 된다.
예:
JDBC를 이용해 DB에 쿼리 질의를 날린다
메서드에서 다른 메서드를 호출하여 결과값을 즉시 받아온다
예:
Polling - 컨텍스트 스위칭이 지속적으로 발생해 지연이 발생한다.
예:
비동기, 논블로킹 작업을 호출하고 자신의 작업을 하던 도중 호출한 작업의 결과 값을 조회하려고 했을 때(블로킹 메서드 실행)
예:
AJAX 요청 / JS 비동기 콜백
대규모 사용자에게 푸시메세지 전송
다양한 외부 API를 한번에 호출할 때
race condition
이 발생할 수 있으므로, 반드시 synchronized 블록
등의 기법을 통해 자원을 동기화
해서 사용해야 한다는 것이 있다.import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CallbackExample1 {
private static ExecutorService executorService;
// CompletionHandler를 구현한다.
private static final CompletionHandler<String, Void> completionHandler = new CompletionHandler<>() {
// 작업 1이 성공적으로 종료된 경우 불리는 콜백 (작업 2)
@Override
public void completed(String result, Void attachment) {
log("작업 2 시작 (작업 1의 결과: " + result + ")");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log("작업 2 종료");
}
// 작업 1이 실패했을 경우 불리는 콜백
@Override
public void failed(Throwable exc, Void attachment) {
log("작업 1 실패: " + exc.toString());
}
};
public static void main(String[] args) {
executorService = Executors.newCachedThreadPool();
// 작업 1
executorService.submit(() -> {
log("작업 1 시작");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log("작업 1 종료");
String result = "Alice";
if (result.equals("Alice")) { // 작업 성공
completionHandler.completed(result, null);
} else { // 작업 실패
completionHandler.failed(new IllegalStateException(), null);
}
});
// 별개로 돌아가는 작업 3
log("작업 3 시작");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
log("작업 3 종료");
}
private static void log(String content) {
System.out.println(Thread.currentThread().getName() + "> " + content);
}
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
public class CallbackExample2 {
private static ExecutorService executorService;
public static void main(String[] args) {
executorService = Executors.newCachedThreadPool();
// execute 함수의 인자로 callback의 구현체를 넣는다.
execute(parameter -> {
log("작업 2 시작 (작업 1의 결과: " + parameter + ")");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log("작업 2 종료");
});
// 별개로 돌아가는 작업 3
log("작업 3 시작");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
log("작업 3 종료");
}
public static void execute(Consumer<String> callback) {
executorService.submit(() -> {
log("작업 1 시작");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String result = "Alice";
log("작업 1 종료");
// 작업을 마친 후 인자로 받아온 callback의 구현체를 비동기로 실행한다.
callback.accept(result);
});
}
private static void log(String content) {
System.out.println(Thread.currentThread().getName() + "> " + content);
}
}
함수형 인터페이스 참고
Runnable
인자와 리턴값이 모두 없다.Supplier<R>, Callable<R>
인자는 없고, R 타입의 객체를 리턴한다.Consumer<T>
T 타입의 인자를 받고, 아무것도 리턴하지 않는다.Function<T, R>
T 타입의 인자를 받고, R 타입의 객체를 리턴한다.@FunctionalInterface
를 통해 커스텀 함수형 인터페이스를 만들 수도 있다.
Future
객체를 사용한 비동기 처리 방식은 다른 주체에게 작업을 맡긴 상태에서 본 주체 쪽에서 작업이 끝났는지 물어보면서 직접 확인하는 방식이다.isDone()
이나 isCanceled()
메소드로 블로킹 없이 작업을 완료했는지의 여부만 확인하는 방법get()
으로 작업이 완료될 때까지 블로킹된 상태로 대기하는 방법get()
을 호출하기 전까지 이 쪽에서 할 일을 하다가, 작업을 마치면 get()
을 호출해 작업의 결과를 받아오는 식으로 사용한다. get()
메소드를 통해 Future 객체
에 담긴(담길) 작업 결과를 얻을 수 있다.참고:
Future
에 작업을 등록할 때, 등록되는 작업이Runnable
인지Callable
인지 잘 확인해야 한다.Runnable
은 아무것도 리턴하지 않기 때문에get()
을 호출했을 때null
이 나올 수 있다.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class FutureExample {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
// 작업1 Callable이 리턴한 값을 future에 담는다.
Future<String> future = executorService.submit(() -> {
log("작업 1 시작");
Thread.sleep(1000);
log("작업 1 종료");
return "Alice";
});
log("작업 2 시작 (작업 1 종료 대기)");
String result = "";
try {
// 논블로킹으로 작업 1이 종료되었는지 확인한다.
log("작업 1 종료 여부: " + future.isDone());
// 블로킹 상태에서 작업 1이 끝날 때까지 대기한다.
result = future.get();
// 논블로킹으로 작업 1이 종료되었는지 확인한다.
log("작업 1 종료 여부: " + future.isDone());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
log("작업 1의 결과: " + result);
log("작업 2 종료");
}
private static void log(String content) {
System.out.println(Thread.currentThread().getName() + "> " + content);
}
}
// FutureTask를 생성한다. 비동기로 수행할 작업을 짜 넣는다.
FutureTask<String> futureTask = new FutureTask<>(() -> {
System.out.println(Thread.currentThread().getName() + "> 작업 1 시작");
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "> 작업 1 종료");
return "Alice";
});
// FutureTask를 수행하는 스레드를 시작한다.
executorService.submit(futureTask);
// 작업 결과는 Future와 같은 방식으로 얻어온다.
futureTask.get();
Future
는 결국 다른 주체의 작업 결과를 얻어오려면 잠시라도 블로킹 상태에 들어갈 수밖에 없기 때문에 사용하는 데 한계가 있다. 그래서 등장한 게 CompletableFuture
이다.CompletableFuture
를 사용하면 이전 작업의 결과를 get()
을 통해 블로킹으로 가져올 필요 없이, then...()
함수를 통해 논블로킹을 유지하며 바로 사용할 수 있다.import java.util.concurrent.*;
public class FutureExample {
public static void main(String[] args) {
new Thread(() -> {
try {
CompletableFuture
.supplyAsync(FutureExample::work1)
.thenAccept(FutureExample::work2)
.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}).start();
work3();
}
private static String work1() {
log("작업 1 시작");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log("작업 1 종료");
return "Alice";
}
private static void work2(String result) {
log("작업 1의 결과: " + result);
log("작업 2 시작");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log("작업 2 종료");
}
private static void work3() {
log("작업 3 시작");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
log("작업 3 종료");
}
private static void log(String content) {
System.out.println(Thread.currentThread().getName() + "> " + content);
}
}
메시지 큐(Message Queue)
는 프로세스 또는 프로그램 간에 데이터를 교환할 때 사용하는 통신 방법 중에 하나로, 메시지 지향 미들웨어(Message Oriented Middleware:MOM)
를 구현한 시스템을 의미한다.메시지 지향 미들웨어 :
비동기 메시지를 사용하는 응용 프로그램들 사이에서 데이터를 송수신하는 것을 의미한다. 여기서 메시지란 요청, 응답, 오류 메시지 혹은 단순한 정보 등의 작은 데이터가 될 수 있다.
생산자(Producer)
로 취급되는 컴포넌트가 메시지를 메시지 큐에 추가한다. 해당 메시지는 소비자(Consumer)
로 취급되는 또 다른 컴포넌트가 메시지를 검색하고 이를 사용해 어떤 작업을 수행할 때까지 메시지 큐에 저장된다. 각 메시지는 하나의 소비자에 의해 한 번만 처리
될 수 있는데, 이러한 이유로 메시지 큐를 이용하는 방식을 일대일 통신
이라고 부른다.
- 비밀번호 재설정을 위해 이메일을 발급하는 서비스, 회원가입을 위해 이메일을 발급하는 서비스 등은 메시지(이메일)를 큐에 넣을 수 있다.
- 이메일 전송 전용 서비스는 이메일이 어느 서비스로부터 생산되었는지와는 관계없이, 메시지 큐의 메시지를 하나씩 소비하고, 그저 이메일이 전송되어야 할 곳으로 이메일을 전송한다.
- 이와 같은 접근 방식은 메시지 큐에 들어오는 메시지 수가 너무 많아지는 경우 이메일 전송 전용 서비스 인스턴스를 더 둠으로써 확장할 수 있으므로 확장성이 뛰어나다.
- 사용자가 고용량의 이미지가 포함된 블로그 포스팅을 한다.
이미지는 저장소에 전송된다.- 업로드된 이미지에 대한 정보가 포함된 메시지를 이미지 최적화 서비스의 메시지 큐에 담는다.
- 이미지 최적화 서비스는 저장소에서 이미지를 가져와 최적화하고, 2번에서 저장해놨던 이미지를 대체한다.
여기서 얻을 수 있는 결론은 서버가 사용자에게 얼마나 빠르고 안정적으로 정보를 전달할 수 있는 지에 초점을 맞춘 기술이라고 볼 수 있다.