[Java] 동시성 프로그래밍 기초 키워드 (2)

yb__char·2024년 11월 22일
0
post-thumbnail

이전 키워드 정리할 때 못다한 키워드들을 추가하여 정리해봤습니다.
Future의 문제점을 보완하기 위한 CompletableFuture, Thread의 안정성을 위한 ThreadLocal, JVM에서 비동기 처리를 위한 Async에 대해 이야기를 더 해보려합니다!


CompletableFuture

CompletableFuture 같은 경우 Future의 단점 및 한계를 극복하기 위해 나온 Java 8의 인터페이스이다.

Future가 추가되면서 비동기 작업에 대한 결과값을 반환 받을 수 있게 되었다. 하지만 Future는 다음과 같은 한계점이 있었다.

  • 외부에서 완료시킬 수 없고, get의 타임아웃 설정으로만 완료 가능
  • 블로킹 코드(get)를 통해서만 이후의 결과를 처리할 수 있음
  • 여러 Future를 조합할 수 없음 ex) 회원 정보를 가져오고, 알림을 발송 등등..
  • 여러 작업을 조합하거나 예외 처리할 수 없음

Future는 외부에서 작업을 완료시킬 수 없고, 작업 완료는 오직 get 호출 시에 타임아웃으로만 가능하다. 또한 비동기 작업의 응답에 추가 작업을 하려면 get을 호출해야 하는데, get은 블로킹 호출이므로 좋지 않다. 또한 여러 Future들을 조합할 수도 없으며, 예외가 발생한 경우에 이를 위한 예외처리도 불가능하다. 그래서 Java 8에서는 이러한 문제를 모두 해결한 CompletableFuture가 등장하게 되었다.

CompletableFuture는 기존의 Future를 기반으로 외부에서 완료시킬 수 있어서 CompletableFuture라는 이름을 갖게 되었다.
Future 외에도 CompletionStage 인터페이스도 구현하고 있는데, CompletionStage는 작업들을 중첩시키거나 완료 후 콜백을 위해 추가되었다. 예를 들어 Future에서는 불가능했던 "몇 초 이내에 응답이 안 오면 기본값을 반환한다." 와 같은 작업이 가능해진 것이다.
즉, Future의 진화된 형태로써 외부에서 작업을 완료시킬 수 있을 뿐만 아니라 콜백 등록 및 Future 조합 등이 가능하다는 것이다.

비동기 작업 실행

  • runAsync

    • 반환값이 없는 경우
    • 비동기로 작업 실행 콜
  • supplyAsync

    • 반환값이 있는 경우
    • 비동기로 작업 실행 콜

runAsync는 반환 값이 없으므로 Void 타입이며, 아래의 코드를 실행해보면 future가 별도의 쓰레드에서 실행됨을 확인할 수 있다.

@Test
void runAsync() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        System.out.println("Thread: " + Thread.currentThread().getName());
    });

    future.get();
    System.out.println("Thread: " + Thread.currentThread().getName());
}

supplyAsync는 runAsync와 달리 반환값이 존재한다. 그래서 비동기 작업의 결과를 받아올 수 있다.

@Test
void supplyAsync() throws ExecutionException, InterruptedException {

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        return "Thread: " + Thread.currentThread().getName();
    });

    System.out.println(future.get());
    System.out.println("Thread: " + Thread.currentThread().getName());
}

runAsync와 supplyAsync는 기본적으로 Java 7에 추가된 ForkJoinPool의 commonPool()을 사용해 작업을 실행할 쓰레드를 쓰레드 풀로부터 얻어 실행시킨다. 만약 원하는 쓰레드 풀을 사용하려면, 이전에 공부한 ExecutorService를 파라미터로 넘겨주면 된다.

작업 콜백

  • thenApply

    • 반환 값을 받아서 다른 값을 반환함
    • 함수형 인터페이스 Function을 파라미터로 받음
  • thenAccpet

    • 반환 값을 받아 처리하고 값을 반환하지 않음
    • 함수형 인터페이스 Consumer를 파라미터로 받음
  • thenRun

    • 반환 값을 받지 않고 다른 작업을 실행함
    • 함수형 인터페이스 Runnable을 파라미터로 받음

Java8에는 다양한 함수형 인터페이스들이 추가되었는데, CompletableFuture 역시 이들을 콜백으로 등록할 수 있게 한다. 그래서 비동기 실행이 끝난 후에 전달 받은 작업 콜백을 실행시켜주는데, thenApply는 값을 받아서 다른 값을 반환시켜주는 콜백이다.

@Test
void thenApply() throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        return "Thread: " + Thread.currentThread().getName();
    }).thenApply(s -> {
        return s.toUpperCase();
    });

    System.out.println(future.get());
}

thenAccept는 반환 값을 받아서 사용하고, 값을 반환하지는 않는 콜백이다.

@Test
void thenAccept() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
        return "Thread: " + Thread.currentThread().getName();
    }).thenAccept(s -> {
        System.out.println(s.toUpperCase());
    });

    future.get();
}

thenRun은 반환 값을 받지 않고, 그냥 다른 작업을 실행하는 콜백이다.

@Test
void thenRun() throws ExecutionException, InterruptedException {
    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
        return "Thread: " + Thread.currentThread().getName();
    }).thenRun(() -> {
        System.out.println("Thread: " + Thread.currentThread().getName());
    });

    future.get();
}

작업 조합

  • thenCompose

    • 두 작업이 이어서 실행하도록 조합하며, 앞선 작업의 결과를 받아서 사용할 수 있음
    • 함수형 인터페이스 Function을 파라미터로 받음
  • thenCombine

    • 두 작업을 독립적으로 실행하고, 둘 다 완료되었을 때 콜백을 실행함
    • 함수형 인터페이스 Function을 파라미터로 받음
  • allOf

    • 여러 작업들을 동시에 실행하고, 모든 작업 결과에 콜백을 실행함
  • anyOf

    • 여러 작업들 중에서 가장 빨리 끝난 하나의 결과에 콜백을 실행함

아래에서 살펴볼 thenCompose와 thenCombine 예제의 실행 결과는 같지만 동작 과정은 다르다. 먼저 thenCompose를 살펴보면 hello Future가 먼저 실행된 후에 반환된 값을 매개변수로 다음 Future를 실행한다.

    @Test
    void thenCompose() throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            return "Hello";
        });

        // Future 간에 연관 관계가 있는 경우
        CompletableFuture<String> future = hello.thenCompose(this::mangKyu);
        System.out.println(future.get());
    }

    private CompletableFuture<String> mangKyu(String message) {
        return CompletableFuture.supplyAsync(() -> {
            `return message + " " + "MangKyu";
        });
    }

하지만 thenCombine은 각각의 작업들이 독립적으로 실행되고, 얻어진 두 결과를 조합해서 작업을 처리한다.

    @Test
    void thenCombine() throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
        return "Hello";
    });

    CompletableFuture<String> mangKyu = CompletableFuture.supplyAsync(() -> {
        return "MangKyu";
    });

    CompletableFuture<String> future = hello.thenCombine(mangKyu, (h, w) -> h + " " + w);
    System.out.println(future.get());
}

그 다음은 allOf와 anyOf를 살펴볼 차례이다. 아래의 코드를 실행해보면 모든 결과에 콜백이 적용됨을 확인할 수 있다.

    @Test
    void allOf() throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
        return "Hello";
    });

    CompletableFuture<String> mangKyu = CompletableFuture.supplyAsync(() -> {
        return "MangKyu";
    });

    List<CompletableFuture<String>> futures = List.of(hello, mangKyu);

    CompletableFuture<List<String>> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
            .thenApply(v -> futures.stream().
                    map(CompletableFuture::join).
                    collect(Collectors.toList()));

    result.get().forEach(System.out::println);

}

반면에 anyOf의 경우에는 가장 빨리 끝난 1개의 작업에 대해서만 콜백이 실행됨을 확인할 수 있다.

    @Test
    void anyOf() throws ExecutionException, InterruptedException {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        return "Hello";
    });

    CompletableFuture<String> mangKyu = CompletableFuture.supplyAsync(() -> {
        return "MangKyu";
    });

    CompletableFuture<Void> future = CompletableFuture.anyOf(hello, mangKyu).thenAccept(System.out::println);
    future.get();

}

예외 처리

  • exeptionally

    • 발생한 에러를 받아서 예외를 처리함
    • 함수형 인터페이스 Function을 파라미터로 받음
  • handle, handleAsync

    • (결과값, 에러)를 반환받아 에러가 발생한 경우와 아닌 경우 모두를 처리할 수 있음
    • 함수형 인터페이스 BiFunction을 파라미터로 받음

각각에 대해 throw하는 경우와 throw하지 않는 경우를 모두 실행시켜보도록 하자. 아래의 @ParameterizedTest는 동일한 테스트를 다른 파라미터로 여러 번 실행할 수 있도록 도와주는데, 실행해보면 throw 여부에 따라 실행 결과가 달라짐을 확인할 수 있다.

    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void exceptionally(boolean doThrow) throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (doThrow) {
        throw new IllegalArgumentException("Invalid Argument");
    }

        return "Thread: " + Thread.currentThread().getName();
    }).exceptionally(e -> {
        return e.getMessage();
    });

    System.out.println(future.get());

}

java.lang.IllegalArgumentException: Invalid Argument
// Thread: ForkJoinPool.commonPool-worker-19

마찬가지로 handle을 실행해보면  throw 여부에 따라 실행 결과가 달라짐을 확인할 수 있다.

    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void handle(boolean doThrow) throws ExecutionException, InterruptedException {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (doThrow) {
        throw new IllegalArgumentException("Invalid Argument");
    }

        return "Thread: " + Thread.currentThread().getName();
    }).handle((result, e) -> {
        return e == null
                ? result
                : e.getMessage();
    });

    System.out.println(future.get());

}

그 외에도 아직 완료되지 않았으면 get을 바로 호출하고, 실패 시에 주어진 exception을 던지게 하는 completeExceptionally와 강제로 예외를 발생시키는 obtrudeException과 예외적으로 완료되었는지를 반환하는 isCompletedExceptionally 등과 같은 기능들도 있다.


ThreadLocal

ThreadLocal이란 Java에서 지원하는 Thread safe한 기술로 멀티 스레드 환경에서 각각의 스레드에게 별도의 저장공간을 할당하여 별도의 상태를 갖을 수 있게끔 도와준다. (ThreadLocal은 기본적으로 Thread의 정보를 Key 값으로 하여 값을 저장하는 Map의 구조(ThreadLocalMap)를 가지고 있습니다.)

ThreadLocal이 필요한 이유
예를들어 Spring의 tomcat을 보면 매 요청마다 생성해놓은 Thread pool에서 Thread를 할당하여 유저의 요청을 처리하도록 되어있다.
여기서 문제가 발생하는데 Spring에서 bean을 등록하게 되면 해당 객체는 단 1개만 만들어져서 모든 Thread가 공유하여 사용하도록 되어있다. 이때 해당 인스턴스의 특정 필드를 모든 Thread가 공유하게 되는 것인데 여기서 Thread 동기화 문제가 발생하게 된다.

동시성 문제
여러 쓰레드가 동시에 같은 인스턴스의 필드 값을 변경하면서 발생하는 문제를 동시성 문제라 한다. 이런 동시성 문제는 여러 쓰레드가 같은 인스턴스의 필드에 접근해야 하기 때문에 트래픽이 적은 상황에서는 확률상 잘 나타나지 않고, 트래픽이 점점 많아질 수 록 자주 발생한다. 특히 Spring Bean처럼 싱글톤 객체의 필드를 변경하며 사용할 때 이러한 동시성 문제를 조심해야 한다.

내부 구현

public class Thread implements Runnable {
	//...logics
	ThreadLocal.ThreadLocalMap threadLocals = null;
}
public class ThreadLocal<T> {
	ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }

    void createMap(Thread t, T firstValue) { 
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }


    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t); 
        if (map != null)                                   
             map.set(this, value);
        else
            createMap(t, value);                      
    }

    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
    }


    public void remove() {
        ThreadLocalMap m = getMap(Thread.currentThread());
        if (m != null)
            m.remove(this);
   }

	static class ThreadLocalMap {
		static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }
	}
}

ThreadLocal클래스를 이용해 ThreadLocal 내부의 ThreadLocalMap이라는 클래스를 이용해 key/value로 데이터를 보관하고 있다.
그리고 ThreadLocal의 get, set등의 메서드들의 원리도 Thread에서 현재 수행중인 thread를 currentThread() 메서드를 통해 꺼낸 뒤 해당 Thread에서 ThreadLocalMap을 찾아 리턴하는 것이다.

ThreadLocal의 사용 사례

로그인/회원가입 기능을 개발하다 보면 스프링 시큐리티(Spring Security)에서는 SecurityContextHolder에 SecurityContext 안에 Authentication을 보관하도록 개발할 것이다. 여기서 SecurityContextHolder는 SecurityContext를 저장하는 방식을 전략 패턴으로 유연하게 대응하는데, 이 중 기본 전략이 MODE_THREADLOCAL로 ThreadLocal을 사용하여 SecurityContext를 보관하는 방식이였다.

public class SecurityContextHolder {

	//...
    
    //SecurityContextHolderStrategy 안에 SecurityContext가 보관된다.
    private static SecurityContextHolderStrategy strategy; 
    
	private static void initialize() {
		if (!StringUtils.hasText(strategyName)) {
			// Set default
			strategyName = MODE_THREADLOCAL; //기본 전략이 ThreadLocal을 사용한다.
		}

		if (strategyName.equals(MODE_THREADLOCAL)) {
			strategy = new ThreadLocalSecurityContextHolderStrategy();
		}
		else if (strategyName.equals(MODE_INHERITABLETHREADLOCAL)) {
			strategy = new InheritableThreadLocalSecurityContextHolderStrategy();
		}
		else if (strategyName.equals(MODE_GLOBAL)) {
			strategy = new GlobalSecurityContextHolderStrategy();
		}
		else {
			// Try to load a custom strategy
			try {
				Class<?> clazz = Class.forName(strategyName);
				Constructor<?> customStrategy = clazz.getConstructor();
				strategy = (SecurityContextHolderStrategy) customStrategy.newInstance();
			}
			catch (Exception ex) {
				ReflectionUtils.handleReflectionException(ex);
			}
		}

		initializeCount++;
	}
}
package org.springframework.security.core.context;

import org.springframework.util.Assert;

final class ThreadLocalSecurityContextHolderStrategy implements SecurityContextHolderStrategy {

	private static final ThreadLocal<SecurityContext> contextHolder = new ThreadLocal<>();

	@Override
	public void clearContext() {
		contextHolder.remove();
	}

	@Override
	public SecurityContext getContext() {
		SecurityContext ctx = contextHolder.get();
		if (ctx == null) {
			ctx = createEmptyContext();
			contextHolder.set(ctx);
		}
		return ctx;
	}

	@Override
	public void setContext(SecurityContext context) {
		Assert.notNull(context, "Only non-null SecurityContext instances are permitted");
		contextHolder.set(context);
	}

	@Override
	public SecurityContext createEmptyContext() {
		return new SecurityContextImpl();
	}

}

SecurityContextHolderStrategy의 구현체중 하나인 ThreadLocalSecurityContextHolderStrategy를 들여다보면 실제로 SecurityContext가 ThreadLocal안에 담겨있는 것을 볼 수 있다.

ThreadLocal 사용 시 주의점
ThreadLocal을 사용할 때 반드시 인지해야할 주의할 점이 있다. 앞서 이야기했듯이 우리가 사용하는 WAS(tomcat)은 Thread pool 기반으로 동작한다. 따라서 ThreadLocal을 사용할 때 사용 후에 비워주지 않는다면 해당 Thread를 부여받게 되는 다른 사용자가 기존에 세팅된 ThreadLocal의 데이터를 공유하게 될 수도 있다.

그렇기에, Thread 의 사용이 끝나는 시점에 Thread Pool에 반환을 하기 직전! 반드시 ThreadLocal을 초기화시켜주는 작업을 해줘야 한다.

Github Repository

profile
안녕하세요 백엔드 개발자 차윤범입니다 :)

0개의 댓글