스레드는 일련의 명령어를 실행하는 메커니즘. 보통 운영체제가 제공.
태스크와 스레드 사이를 일대일 관례로 만드는 것은 바람직하지 않다.
짧은시간 실행되는 태스크는 스레드를 시작하는 시간을 낭비하지 말고 같은 스레드에서 다수를 실행하는게 낫다.
강도 높은 계산을 수행하는 태스크일 때는 태스크별로 스레드를 사용하는 대신, 프로세서별로 스레드를 하나씩 사용해서 스레드 스위칭 오버헤드를 피하는게 좋다.
public static void main(String[] args) {
Runnable hellos = () -> {
for (int i = 1; i <= 1000; i++) {
System.out.println("Hello " + i);
}
}
Runnable goodbyes = () -> {
for (int i = 1; i <= 1000; i++) {
System.out.println("Goodbye " + i);
}
}
Executor executor = Executors.newCachedThreadPool();
executor.execute(hellos);
executor.execute(goodbyes);
}
서브태스크는 각각 부분 결과를 계산하고 모든 태스크가 완료되면 결과들을 결합한다. 서브태스크에 Callable 사용 가능.
public interface Callable<V> {
V call() throws Exception;
}
Callable 실행 위해 Executor의 서브인터페이스 ExecutorService 인터페이스가 필요. newCachedThreadPool 와 같은 메서드가 반환해준다.
ExecutorService exec = Executors.newCachedThreadPool();
Callable<V> task = ...;
Future<V> result = exec.submit(task);
태스크를 제출하면 Future 클래스 객체를 얻는다. 언젠간 결과를 얻게 되는 계산을 표현한다.
각 서브태스크를 별도로 제출(submit) 하는 대신에 Callable 인스턴스의 Collection을 전달하여 invokeAll 메서드를 호출할 수 있다.
파일 집합에서 파일별로 단어가 나오는 횟수를 세는 예제.
String word = ...;
Set<Path> paths = ...;
List<Callable<Long>> tasks = new ArrayList<>();
for (Path p : paths) {
tasks.add(
() -> {return p에서 단어가 나타난 횟수}
);
}
List<Future<Long>> results = executor.invokeAll(tasks);
long total = 0;
for (Future<Long> result : results) {
total += result.get();
}
ExecutorService::invokeAny 메서드는 제출한 태스크 중 하나가 예외를 던지지 않고 완료하면 즉시 반환. 일치하는 대상을 발견한 즉시 결론을 내릴 수 있는 검색에 유용.
여러 스레드들이 스태틱 변수 등 공유되는 변수에 접근할 때 변수의 변화가 스레드간에 보이지 않을 수 있다.
캐싱, 명령어 재배치 등 여러 이유가 있다.
공유 변수를 volatile 제어자로 선언하면 문제가 사라지긴 한다.
이는 일반적인 해결책이 되지는 못한다.
병행 태스크 여러 개가 volatile로 선언된 공유 변수 counter를 업데이트한다. (counter++ 수행)
volatile 제어자 덕분에 업데이트가 보이긴 한다.
counter++ 는 원자적이지 않다.
count++ 가
register = count + 1
count = register
이렇게 두 명령어로 이루어진다고 할 때,
count = 0 // init
register1 = count + 1 // thread 1
register2 = count + 1 // thread 2
count = register2 // thread 2
count = register1 // thread 1
두 스레드가 위와 같은 순서로 명령어를 수행하게 스케줄링되면 결과값이 2가 아니라 1이다.
count 같은 공유 변수를 업데이트하는 부분을 경쟁 조건이라고 한다.
lock(잠금)을 이용해서 경쟁 조건이 일어나는 코드를 잠그면 해결이 되지만 이것도 일반적인 해결책이 될 수 없다.
퍼포먼스를 현저하게 떨어뜨리거나, deadlock을 야기하는 실수를 저지르기 쉽다.
가두기(confinement)
예를들어 태스크에서 뭔가를 세야할 때 공유 카운터 업데이트 하지 말고 각 태스크에 비공개 카운터를 제공. 그 태스크가 완료될 때 결과들을 결합하는 또 다른 태스크에 각각의 결과를 전달하도록 하기.
불변성(immutability)
불변 객체를 공유하도록 한다.
잠금 설정(locking)
한번에 한 태스크만 공유 자료구조에 접근하도록 하기.
병렬성 기회를 줄이므로 많은 비용이 들 수 있다.
// 병렬스트림
long result = coll.parallelStream().filter(s -> s.startsWith("A")).count();
// 병렬 배열 연산
Arrays.parallelSetAll(values, i -> i % 10);
Arrays.parallelSort(words, Comparator.comparing(String::length));
여러 스레드에서 동시에 큐나 해시 테이블과 같은 일반 자료구조를 수정하면 해당 자료구조의 내부가 손상되기 쉽다.
java.util.concurrent 패키지에 있는 컬렉션은 영리하게 구현되어있다.
String word = "test word";
ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();
map.compute(word, (k, v) -> v == null ? 1 : v + 1); // compute 메서드는 원자적
map.putIfAbsent(word, 0L); // 원자적
BlockingQueue<E>
큐와 비슷하게 사용 가능.
한 태스크에서 다른 태스크로 데이터를 안전하게 전달 가능.
AtomicLong nextNumber = new AtomicLong();
// 어떤 스레드에서
long id = nextNumber.incrementAndGet();
AtomicLong::incrementAndGet 메서드는 원자성이 보장된다.
largest.updateAndGet(x -> Math.max(x, observed));
등 여러 원자성을 보장하는 메서드들이 있다.
또 AtomicLong 뿐 아니라 AtomicInteger, AtomicIntegerArray 등 여러 클래스들이 있다.
Lock countLock = new ReentrantLock();
int count;
countLock.lock();
try {
count++; // critical section
} finially {
countLock.unlock();
}
// synchronized 사용
synchronized (obj) {
// critical section
}
// 위와 본질적으로 같다
obj.intrinsicLock.lock();
try {
// critical section
} finally {
obj.intrinsicLock.unlock();
}
public synchronized void method() {
}
ProcessBuilder builder = new ProcessBuilder("gcc", "myapp.c");
builder = builder.directory(path.toFile()); // 작업 디렉토리 변경
프로세스의 표준 출력/입력/오류 스트림 접근
OutputStream processIn = p.getOutputStream();
InputStream processOut = p.getInputStream();
InputStream processErr = p.getErrorStream();
Process p = new ProcessBuilder("/bin/ls", "-l")
.directory(Paths.get("/tmp").toFile())
.start();
try (Scanner in = new Scanner(p.getInputStream())) {
while (in.hasNextLine()) {
System.out.println(in.nextLine());
}
}