๐ก ํด๋น ํฌ์คํ ์์ ์๊ฐ๋ ์์ ์ฝ๋๋ค์ ์ฌ๊ธฐ์์ ํ์ธํ ์ ์๋ค.
2๋ ์ ์ ์ดํํฐ๋ธ ์๋ฐ ์คํฐ๋๋ฅผ ํ๊ณ ์ด ํ๋์ ํฌํธํด๋ฆฌ์ค์ ๋ฃ์๋ค. ๊ทธ๋ฌ๋๋ ํ์ฌ ๊ท๋ชจ์ ์๊ด์์ด ๋๋ถ๋ถ์ ์ธํฐ๋ทฐ์์ ์ดํํฐ๋ธ ์๋ฐ์์ ์ธ์ ๊น์๋ ๋ด์ฉ ํน์ ๊ธฐ์ต ๋๋ ๋ด์ฉ์ ๋ํ ์ง๋ฌธ์ ๋ฐ์๋ค. ์ค๋ ์ ์ ํ๋ ์คํฐ๋๋ผ ๊ธฐ์ต ๋๋ ๋ด์ฉ์ด ๊ฑฐ์ ์์๊ณ , ์ด๋ฅผ ๊ณ๊ธฐ๋ก ์ฑ '์ดํํฐ๋ธ ์๋ฐ'๋ฅผ ๋ค์ ํ ๋ฒ ๊ณต๋ถํ ํ์์ฑ์ด ์๋ค๊ณ ์๊ฐํ๋ค.
์ด์ ์คํฐ๋์์๋ ์์ดํ 1๋ถํฐ ์ฐจ๋ก๋ก ๊ณต๋ถํ์ง๋ง, ์ด๋ฒ์๋ ๋ด๊ฐ ๊ณต๋ถํ๊ณ ์๋ ์ฃผ์ ์ ๊ด๋ จ๋ ์์ดํ ์ ๊ณต๋ถํ๊ณ ๋ธ๋ก๊ทธ ๊ธ๋ก ์ ๋ฆฌํ๋ ค ํ๋ค.
synchronized, volatile, atomic object์ ๋ํด ๋ชจ๋ฅธ๋ค๋ฉด Java์์ Thread-Safeํ๊ฒ ๊ตฌํํ๊ธฐ ํฌ์คํ ์ ๋จผ์ ์ฝ๋ ๊ฒ์ ์ถ์ฒํ๋ค.
๋๊ธฐํ๋ ๋ฐฐํ์ ์ธ ์คํ๊ณผ ์ค๋ ๋ ์ฌ์ด์ ์์ ์ ์ธ ํต์ ์ ๊ผญ ํ์ํ๋ค.
๋ฐฐํ์ ์คํ์ ํตํด ๊ฐ์ฒด๋ฅผ ํ๋์ ์ผ๊ด๋ ์ํ์์ ๋ค๋ฅธ ์ผ๊ด๋ ์ํ๋ก ๋ณํ์ํจ๋ค. ํ๋์ ์ค๋ ๋๊ฐ ๊ณต์ ์ค์ธ ๊ฐ๋ณ ๋ฐ์ดํฐ๋ฅผ ๋ณ๊ฒฝํ๋ ์ฌ์ด์ ๋ค๋ฅธ ์ค๋ ๋๊ฐ ํด๋น ๋ฐ์ดํฐ๋ฅผ ํ์ธํ์ง ๋ชปํ๊ฒ ํ๋ค.
๋๊ธฐํ๋ ๋ฉ์๋๋ ๋ธ๋ก์ ๋ค์ด๊ฐ ์ค๋ ๋๊ฐ ๊ฐ์ ๋ฝ์ ๋ณดํธ ํ์ ์ํ๋ ๋ชจ๋ ์ด์ ์์ ์ ์ต์ข ๊ฒฐ๊ณผ๋ฅผ ๋ณด๊ฒ ํด์ค๋ค. ํ๋์ ์ค๋ ๋๊ฐ ๊ณต์ ์ค์ธ ๊ฐ๋ณ ๋ฐ์ดํฐ๋ฅผ ๋ณ๊ฒฝํ๋ฉด ๋ค๋ฅธ ์ค๋ ๋์๊ฒ ๋ณ๊ฒฝ๋ ๋ฐ์ดํฐ๊ฐ ๋ณด์ฌ์ผ ํ๋ค.
๋ฐฐํ์ ์ธ ์คํ์ผ๋ก ์์์ฑ์, ์ค๋ ๋ ๊ฐ ํต์ ์ผ๋ก ๊ฐ์์ฑ์ ํ๋ณดํ ์ ์๋ค.
์๋ฐ ์ธ์ด ๋ช ์ธ์ long๊ณผ double ์ธ์ ๋ณ์๋ฅผ ์ฝ๊ณ ์ฐ๋ ๋์์ ์์์ ์ด๋ค[JLS 17.7]. ์ฌ๋ฌ ์ค๋ ๋๊ฐ ๊ฐ์ ๋ณ์๋ฅผ ๋๊ธฐํ ์์ด ์์ ํ๋ ์ค์ด๋ผ๋, ํญ์ ์ด๋ค ์ค๋ ๋๊ฐ ์ ์์ ์ผ๋ก ์ ์ฅํ ๊ฐ์ ์จ์ ํ ์ฝ์ด์ด์ ๋ณด์ฅํ๋ค๋ ๋ป์ด๋ค.
๊ทธ๋ ๋ค๋ฉด ์์์ ๋ฐ์ดํฐ๋ฅผ ์ฝ๊ณ ์ธ ๋ ๋๊ธฐํ๊ฐ ํ์ ์์๊น? ์๋ ์ฝ๋์ ์คํ ๊ฒฐ๊ณผ๋ฅผ ์์ธกํด๋ณด์.
public class StopThread {
private static boolean stopRequested;
public static void main(String[] args) throws InterruptedException {
Thread backgroundThread = new Thread(() -> {
int i = 0;
while (!stopRequested) {
i++;
}
});
backgroundThread.start();
TimeUnit.SECONDS.sleep(1);
stopRequested = true;
}
}
ํ๋ก๊ทธ๋จ์ while ๋ฌธ์ ๋น ์ ธ๋์ค์ง ๋ชปํ๊ณ ๊ณ์ ์ํ๋๋ค. JLS 17.3์ ์ปดํ์ผ๋ฌ๋ non-volatile boolean ํ๋๋ฅผ ํ ๋ฒ ์ฝ์ด์จ ๋ค ์บ์ํ ๊ฐ์ ์ฌ์ฌ์ฉํ๊ธฐ ๋๋ฌธ์ ๋ค๋ฅธ ์ค๋ ๋๊ฐ ํ๋๊ฐ์ ๋ณ๊ฒฝํ๋๋ผ๋ ๋ฐ๋ณต๋ฌธ์ ์์ํ ์ข ๋ฃ๋์ง ์๋๋ค๊ณ ๋์ ์๋ค.
์ด์ฒ๋ผ ๊ณต์ ์ค์ธ ๊ฐ๋ณ ๋ฐ์ดํฐ๋ฅผ ๋น๋ก ์์์ ์ผ๋ก ์ฝ๊ณ ์ธ ์ ์์์ง๋ผ๋ ๋๊ธฐํ์ ์คํจํ๋ฉด ์์์น ๋ชปํ ๋ฌธ์ ๊ฐ ๋ฐ์ํ ์ ์๋ค. ์์ ์ธ๊ธํ๋ฏ ๋๊ธฐํ์ ์ญํ ์ค ํ๋๋ "์์ ์ ์ธ ์ค๋ ๋ ๊ฐ ํต์ "์ด๋ค. ๋๊ธฐํํ์ง ์์ผ๋ฉด ๋ฉ์ธ ์ค๋ ๋๊ฐ ์์ ํ ๊ฐ์ ๋ฐฑ๊ทธ๋ผ์ด๋ ์ค๋ ๋๊ฐ ์ธ์ ๋ณด๊ฒ ๋ ์ง ์ ์ ์๋ค.
๊ทธ๋ ๋ค๋ฉด ์ด๋ป๊ฒ ํ๋ก๊ทธ๋จ์ ๊ฐ์ ํ ์ ์์๊น?
synchronized ํ์ ์๋ฅผ ์ฌ์ฉํด stopRequested ํ๋๋ฅผ ๋๊ธฐํํ๋ค๋ฉด ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ ์ ์๋ค.
public class StopThread {
private static boolean stopRequested;
private static synchronized void requestStop() {
stopRequested = true;
}
private static synchronized boolean stopRequested() {
return stopRequested;
}
public static void main(String[] args) throws InterruptedException {
Thread backgroundThread = new Thread(() -> {
int i = 0;
while (!stopRequested()) {
i++;
}
});
backgroundThread.start();
TimeUnit.SECONDS.sleep(1);
requestStop();
}
}
ํ๋ก๊ทธ๋จ์ ์์ํ๋ ๋๋ก 1์ด ํ ์ข ๋ฃ๋๋ค. ์ฐ๊ธฐ ๋ฉ์๋(requestStop)์ ์ฝ๊ธฐ ๋ฉ์๋(stopRequested)๋ ์์์ ์ผ๋ก ๋์ํ๊ธฐ ๋๋ฌธ์ ์ด ์ฝ๋์์ ๋๊ธฐํ๋ ์ค๋ ๋ ๊ฐ ํต์ ๋ชฉ์ ์ผ๋ก๋ง ์ฌ์ฉ๋์๋ค๊ณ ๋ณผ ์ ์๋ค.
stopRequested ํ๋๋ฅผ volatile๋ก ์ ์ธํด ๋๊ธฐํ๋ฅผ ์๋ตํ ์ ์๋ค. volatile ํ์ ์๋ ๋ฐฐํ์ ์ํ๊ณผ๋ ์๊ด์์ง๋ง ํญ์ ๊ฐ์ฅ ์ต๊ทผ์ ๊ธฐ๋ก๋ ๊ฐ์ ์ฝ๊ฒ ๋จ์ ๋ณด์ฅํ๋ค.
public class StopThread {
private static volatile boolean stopRequested;
public static void main(String[] args) throws InterruptedException {
Thread backgroundThread = new Thread(() -> {
int i = 0;
while (!stopRequested) {
i++;
}
});
backgroundThread.start();
TimeUnit.SECONDS.sleep(1);
stopRequested = true;
}
}
synchronized ์์ ์ ๋ง์ฐฌ๊ฐ์ง๋ก ํ๋ก๊ทธ๋จ์ 1์ด ํ ์ข ๋ฃ๋๋ค.
volatile์ ์ฃผ์ํด์ ์ฌ์ฉํด์ผ ํ๋ค. ์๋ ์ฝ๋๋ฅผ ์คํํ๋ฉด ์ด๋ค ๊ฐ์ด ์ถ๋ ฅ๋ ๊น?
public class SerialNumber {
private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(10);
private static volatile int nextSerialNumber = 0;
private static int generateNextSerialNumber() throws InterruptedException {
Thread.sleep(1);
return nextSerialNumber++;
}
public static void main(String[] args) throws InterruptedException {
int N = 30;
CountDownLatch latch = new CountDownLatch(N);
for (int i = 0; i < N; i++) {
THREAD_POOL.execute(() -> {
try {
generateNextSerialNumber();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
latch.countDown();
});
}
latch.await();
System.out.println(nextSerialNumber);
}
}
30์ด ์ถ๋ ฅ๋์ด์ผ ํ ๊ฒ ๊ฐ์ง๋ง ํ๋ก๊ทธ๋จ์ ์คํํด๋ณด๋ฉด 30์ ๋ชป ๋ฏธ์น๋ ๊ฐ์ด ์ถ๋ ฅ๋๋ค. volatile ํ์ ์๋ ํญ์ ๊ฐ์ฅ ์ต๊ทผ์ ๊ธฐ๋ก๋ ๊ฐ์ ์ฝ๊ฒ ๋จ์ ๋ณด์ฅํ๋๋ฐ ์ ์ด๋ฐ ๊ฒฐ๊ณผ๊ฐ ๋์จ๊ฑธ๊น?
๋ฌธ์ ๋ ์ฆ๊ฐ ์ฐ์ฐ์(++)๋ค. ์ด ์ฐ์ฐ์๋ ์ฝ๋ ์์ผ๋ก๋ ํ๋์ง๋ง ์ค์ ๋ก๋ nextSerialNumber ํ๋์ ๋ ๋ฒ ์ ๊ทผํ๋ค. ๋จผ์ ๊ฐ์ ์ฝ๊ณ , ๊ทธ๋ฐ ๋ค์ 1 ์ฆ๊ฐํ ์๋ก์ด ๊ฐ์ ์ ์ฅํ๋ ๊ฒ์ด๋ค. ๋ง์ฝ ๋ ๋ฒ์งธ ์ค๋ ๋๊ฐ ์ด ๋ ์ ๊ทผ ์ฌ์ด๋ฅผ ๋น์ง๊ณ ๋ค์ด์ ๊ฐ์ ์ฝ์ด๊ฐ๋ฉด ์ฒซ ๋ฒ์งธ ์ค๋ ๋์ ๋์ผํ ๊ฐ์ ๋๋ ค๋ฐ๊ฒ ๋๋ค.
IntelliJ์์๋ ์๋์ ๊ฐ์ด nextSerialNumber++๊ฐ atomicํ์ง ์์ ์ฐ์ฐ์ด๋ผ๋ warning์ ๋์์ค๋ค.
volatile์ ๊ฐ์์ฑ์ ๋ณด์ฅํ์ง๋ง ์์์ฑ์ ๋ณด์ฅํ์ง ์๊ธฐ ๋๋ฌธ์ ์ฃผ์ํด์ ์ฌ์ฉํด์ผ ํ๋ค.
generateSerialNumber ๋ฉ์๋์ synchronized ํ์ ์๋ฅผ ๋ถ์ด๋ฉด ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ ์ ์๋ค. ๋์์ ํธ์ถํด๋ ์๋ก ๊ฐ์ญํ์ง ์์ผ๋ฉฐ ์ด์ ํธ์ถ์ด ๋ณ๊ฒฝํ ๊ฐ์ ์ฝ๊ฒ ๋๋ค.
public class SerialNumberSynchronized {
private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(10);
private static int nextSerialNumber = 0;
private static synchronized int generateNextSerialNumber() throws InterruptedException {
Thread.sleep(1);
return nextSerialNumber++;
}
public static void main(String[] args) throws InterruptedException {
int N = 30;
CountDownLatch latch = new CountDownLatch(N);
for (int i = 0; i < N; i++) {
THREAD_POOL.execute(() -> {
try {
generateNextSerialNumber();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
latch.countDown();
});
}
latch.await();
System.out.println(nextSerialNumber);
}
}
synchronized๋ ์์์ฑ๊ณผ ๊ฐ์์ฑ์ ๋ชจ๋ ๋ณด์ฅํ๋ค.
๋ ๋ค๋ฅธ ๋ฐฉ๋ฒ์ผ๋ก๋ java.util.concurrent.atomic ํจํค์ง์ AtomicInteger๋ฅผ ์ฌ์ฉํ ์ ์๋ค. synchronized๋ Lock์ ์ฌ์ฉํด ๋๊ธฐํํ์ง๋ง ์ด ํจํค์ง์ ํด๋์ค๋ ๋ฝ ์์ด ์ค๋ ๋ ์์ ํ ํ๋ก๊ทธ๋๋ฐ์ ์ง์ํ๊ธฐ ๋๋ฌธ์ ์ฑ๋ฅ๋ ๋ฐ์ด๋๋ค.
public class SerialNumberAtomic {
private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(10);
private static final AtomicInteger nextSerialNumber = new AtomicInteger();
private static int generateNextSerialNumber() throws InterruptedException {
Thread.sleep(1);
return nextSerialNumber.getAndIncrement();
}
public static void main(String[] args) throws InterruptedException {
int N = 30;
CountDownLatch latch = new CountDownLatch(N);
for (int i = 0; i < N; i++) {
THREAD_POOL.execute(() -> {
try {
generateNextSerialNumber();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
latch.countDown();
});
}
latch.await();
System.out.println(nextSerialNumber);
}
}
atomic object๋ ์์์ฑ๊ณผ ๊ฐ์์ฑ์ ๋ชจ๋ ๋ณด์ฅํ๋ค.
์ด๋ฒ ์์ดํ ์์ ์ธ๊ธ๋ ๋ฌธ์ ๋ค์ ํผํ๋ ๊ฐ์ฅ ์ข์ ๋ฐฉ๋ฒ์ ์ ์ด์ ๊ฐ๋ณ ๋ฐ์ดํฐ๋ฅผ ๊ณต์ ํ์ง ์๋ ๊ฒ์ด๋ค. ๋ถ๋ณ ๋ฐ์ดํฐ๋ง ๊ณต์ ํ๊ฑฐ๋ ์๋ฌด๊ฒ๋ ๊ณต์ ํ์ง ๋ง์. ๋๋๋ก ๊ฐ๋ณ ๋ฐ์ดํฐ๋ ๋จ์ผ ์ค๋ ๋์์๋ง ์ฐ๋๋ก ํ์.
๐ก ์ฌ๋ฌ ์ค๋ ๋๊ฐ ๊ฐ๋ณ ๋ฐ์ดํฐ๋ฅผ ๊ณต์ ํ๋ค๋ฉด ๊ทธ ๋ฐ์ดํฐ๋ฅผ ์ฝ๊ณ ์ฐ๋ ๋์์ ๋ฐ๋์ ๋๊ธฐํํด์ผ ํ๋ค.
์์ดํ 78์์ ์ถฉ๋ถํ์ง ์์ ๋๊ธฐํ์ ๋ฌธ์ ๋ฅผ ๋ค๋ค๋ค๋ฉด, ์ด๋ฒ ์์ดํ ์์๋ ๋ฐ๋ ์ํฉ์ ๋ค๋ฃฌ๋ค. ๊ณผ๋ํ ๋๊ธฐํ๋ ์ฑ๋ฅ์ ์ ํ์ํค๊ณ , ๊ต์ฐฉ ์ํ์ ๋น ๋จ๋ฆฌ๊ฑฐ๋ ๋ฐ์ดํฐ๋ฅผ ํผ์ํ ์ ์๋ค.
์ด๋ฌํ ๋ฌธ์ ๊ฐ ๋ฐ์ํ๋ ๊ฒ์ ๋ง์ผ๋ ค๋ฉด ๋๊ธฐํ ๋ธ๋ก ์์์๋ ์ ์ด๋ฅผ ์ ๋๋ก ํด๋ผ์ด์ธํธ์ ์๋ํ๋ฉด ์๋๋ค. ์๋ฅผ ๋ค์ด ๋๊ธฐํ๋ ์์ญ ์์์๋ ์ฌ์ ์ํ ์ ์๋ ๋ฉ์๋๋ ํธ์ถํ๋ฉด ์ ๋๋ฉฐ, ํด๋ผ์ด์ธํธ๊ฐ ๋๊ฒจ์ค ํจ์ ๊ฐ์ฒด๋ฅผ ํธ์ถํด์๋ ์ ๋๋ค. ์ด๋ฐ ๋ฉ์๋๋ ์ธ๊ณ์ธ ๋ฉ์๋
๋ผ ํ๋ค.
์๋์ ๋์ค๋ ์์๋ค์ ์ ๋๋ก ์ดํดํ๊ณ ์ถ๋ค๋ฉด ํฌ์คํ ์๋จ์ ๊ฑธ์ด๋ ๋งํฌ๋ฅผ ํตํด ์ ์ฒด ์ฝ๋๋ฅผ ํ์ธํ๋ ๊ฒ์ ์ถ์ฒํ๋ค.
์๋ ObservableSet์ ์ด๋ค ์งํฉ(Set)์ ๊ฐ์ผ ๋ํผ ํด๋์ค์ด๊ณ , ์ด ํด๋์ค์ ํด๋ผ์ด์ธํธ๋ ์งํฉ์ ์์๊ฐ ์ถ๊ฐ๋๋ฉด ์๋ฆผ์ ๋ฐ์ ์ ์๋ค. ๊ด์ฐฐ์ ํจํด
public class ObservableSet<E> extends ForwardingSet<E> {
public ObservableSet(Set<E> set) {
super(set);
}
private final List<SetObserver<E>> observers = new ArrayList<>();
public void addObserver(SetObserver<E> observer) {
synchronized (observers) {
observers.add(observer);
}
}
public boolean removeObserver(SetObserver<E> observer) {
synchronized (observers) {
return observers.remove(observer);
}
}
private void notifyElementAdded(E element) {
synchronized (observers) {
for (SetObserver<E> observer : observers) {
observer.added(this, element);
}
}
}
public boolean add(E element) {
boolean added = super.add(element);
if (added) {
notifyElementAdded(element);
}
return added;
}
}
๊ด์ฐฐ์๋ค์ addObserver์ removeObserver ๋ฉ์๋๋ฅผ ํธ์ถํด ๊ตฌ๋ ์ ์ ์ฒญํ๊ฑฐ๋ ํด์งํ๋ค. ๋ ๊ฒฝ์ฐ ๋ชจ๋ ์๋ ์ฝ๋ฐฑ ์ธํฐํ์ด์ค์ ์ธ์คํด์ค๋ฅผ ์ธ์๋ก ๋ฐ๋๋ค.
@FunctionalInterface
public interface SetObserver<E> {
// ObservableSet์ ์์๊ฐ ๋ํด์ง๋ฉด ํธ์ถ๋๋ค.
void added(ObservableSet<E> set, E element);
}
์๋ ํ๋ก๊ทธ๋จ์ ์งํฉ์ ์ถ๊ฐ๋ ์ ์๊ฐ์ ์ถ๋ ฅํ๋ค๊ฐ ๊ทธ ๊ฐ์ด 23์ด๋ฉด ๊ตฌ๋ ์ ํด์งํ๋ ๊ด์ฐฐ์(SetObserver)๋ฅผ ์ถ๊ฐํ๋ค. ์ด ํ๋ก๊ทธ๋จ์ ์คํํ๋ฉด ์ด๋ค ๊ฒฐ๊ณผ๊ฐ ๋์ฌ๊น?
public static void main(String[] args) {
ObservableSet<Integer> set = new ObservableSet<>(new HashSet<>());
set.addObserver(new SetObserver<Integer>() {
@Override
public void added(ObservableSet<Integer> s, Integer e) {
System.out.println(e);
if (e == 23) {
s.removeObserver(this);
}
}
});
for (int i = 0; i < 100; i++) {
set.add(i);
}
}
0๋ถํฐ 23๊น์ง ์ถ๋ ฅํ ํ ๊ด์ฐฐ์ ์์ ์ ๊ตฌ๋ ํด์งํ ๋ค์ ์กฐ์ฉํ ์ข ๋ฃ๋๋ ๊ฒ์ด ์๋๋ผ ConcurrentModificationException ์์ธ๊ฐ ๋ฐ์ํ๋ค!
ํ๋ก๊ทธ๋จ ์คํ ์์๋ฅผ ์ดํด๋ณด์. main ํจ์์ for ๋ฌธ์์ i = 23์ผ ๋
// (1) main
set.add(i);
// (2) ObservableSet
public boolean add(E element) {
boolean added = super.add(element);
if (added) {
notifyElementAdded(element);
}
return added;
}
// (3) ObservableSet
private void notifyElementAdded(E element) {
synchronized (observers) {
for (SetObserver<E> observer : observers) {
observer.added(this, element);
}
}
}
// (4) main
public void added(ObservableSet<Integer> s, Integer e) {
System.out.println(e);
if (e == 23) {
s.removeObserver(this);
}
}
// (5) ObservableSet
public boolean removeObserver(SetObserver<E> observer) {
synchronized (observers) {
return observers.remove(observer);
}
}
๊ฒฐ๊ตญ ๋๊ธฐํ ๋ธ๋ก ์์์ observers ๋ฆฌ์คํธ๋ฅผ ์ํํ๋ ๋์ค์ ์์๋ฅผ ์ญ์ ํ๊ธฐ ๋๋ฌธ์ ์์ธ๊ฐ ๋ฐ์ํ ๊ฒ์ด๋ค. ์ด๋ ํ์ฉ๋์ง ์์ ๋์์ด๋ฉฐ ConcurrentModificationException์๋ ๋ช ์๋์ด ์๋ค.
๋๊ธฐํ ๋ฉ์๋(notifyElementAdded) ์์์ ํด๋ผ์ด์ธํธ๊ฐ ๋๊ฒจ์ค ํจ์ ๊ฐ์ฒด(added)๋ฅผ ํธ์ถํ๊ธฐ ๋๋ฌธ์ ๋ฌธ์ ๊ฐ ๋ฐ์ํ๋ค!
์ด๋ฒ์ removeObserver๋ฅผ ๋ฉ์ธ ์ค๋ ๋์์ ์ง์ ํธ์ถํ์ง ์๊ณ ์คํ์ ์๋น์ค๋ฅผ ์ฌ์ฉํด ๋ค๋ฅธ ์ค๋ ๋๊ฐ ํธ์ถํ๋๋ก ํ๋ค. ์คํ ๊ฒฐ๊ณผ๋ ์ด๋จ๊น?
public class ObservableTest3 {
public static void main(String[] args) {
ObservableSet<Integer> set = new ObservableSet<>(new HashSet<>());
set.addObserver(new SetObserver<Integer>() {
@Override
public void added(ObservableSet<Integer> s, Integer e) {
System.out.println(e);
if (e == 23) {
// removeObserver ํธ์ถ์ ๋ฉ์ธ ์ค๋ ๋๊ฐ ์๋ ๋ค๋ฅธ ์ค๋ ๋์์ ํธ์ถํ๋๋ก ํ๋ค.
ExecutorService exec = Executors.newSingleThreadExecutor();
try {
exec.submit(() -> s.removeObserver(this)).get();
} catch (ExecutionException | InterruptedException ex) {
throw new AssertionError(ex);
} finally {
exec.shutdown();
}
}
}
});
for (int i = 0; i < 100; i++) {
set.add(i);
}
}
}
์ด ํ๋ก๊ทธ๋จ์ ์คํํ๋ฉด ์์ธ๊ฐ ๋ฐ์ํ์ง ์์ง๋ง ๊ต์ฐฉ์ํ์ ๋น ์ง๋ค. ์์ ์ดํด๋ณธ ํ๋ก๊ทธ๋จ ์คํ ์์ ์ค ์ผ๋ถ์ด๋ค. ๋ฉ์ธ ์ค๋ ๋๊ฐ notifyElementAdded ๋ฉ์๋๋ฅผ ํธ์ถํ๋ฉฐ ๋ฝ์ ๊ฐ๊ณ ์๊ณ , ๊ทธ ๋ค์์ ๋ค๋ฅธ ์ค๋ ๋๊ฐ ์์ฑ๋์ด removeObserver๋ฅผ ์คํํ์ง๋ง ์ด๋ฏธ ๋ฉ์ธ ์ค๋ ๋๊ฐ ๋ฝ์ ๊ฐ๊ณ ์๊ธฐ ๋๋ฌธ์ ๋ฝ์ ์ป์ ์ ์๋ค. ์ด์ ๋์์ ๋ฉ์ธ ์ค๋ ๋๋ ๋ฐฑ๊ทธ๋ผ์ด๋ ์ค๋ ๋๊ฐ ๊ด์ฐฐ์๋ฅผ ์ ๊ฑฐํ๊ธฐ๋ง์ ๊ธฐ๋ค๋ฆฐ๋ค.
// (3) ObservableSet
private void notifyElementAdded(E element) {
synchronized (observers) {
for (SetObserver<E> observer : observers) {
observer.added(this, element);
}
}
}
// (4) main
public void added(ObservableSet<Integer> s, Integer e) {
System.out.println(e);
if (e == 23) {
ExecutorService exec = Executors.newSingleThreadExecutor();
try {
exec.submit(() -> s.removeObserver(this)).get();
} catch (ExecutionException | InterruptedException ex) {
throw new AssertionError(ex);
} finally {
exec.shutdown();
}
}
}
// (5) ObservableSet
public boolean removeObserver(SetObserver<E> observer) {
synchronized (observers) {
return observers.remove(observer);
}
}
์ธ๊ณ์ธ ๋ฉ์๋ ํธ์ถ์ ๋๊ธฐํ ๋ธ๋ก ๋ฐ๊นฅ์ผ๋ก ์ฎ๊ธฐ๋ฉด ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ ์ ์๋ค! notifyElementAdded ๋ฉ์๋์์ ๊ด์ฐฐ์ ๋ฆฌ์คํธ๋ฅผ ๋ณต์ฌํด ์ฐ๋ฉด ๋ฝ ์์ด๋ ์์ ํ๊ฒ ์ํํ ์ ์๋ค.
private void notifyElementAdded(E element) {
List<SetObserver<E>> snapShot = null;
synchronized (observers) {
snapShot = new ArrayList<>(observers);
}
for (SetObserver<E> observer : snapShot) {
observer.added(this, element);
}
}
์ด ๋ฐฉ์์ ์ ์ฉํ๋ฉด ์์ ์ดํด๋ณธ ๋ ์์ ์์ ๋ฐ์ํ๋ ์์ธ์ ๊ต์ฐฉ์ํ๊ฐ ๋ํ๋์ง ์๋ ๊ฒ์ ํ์ธํ ์ ์๋ค. ์ด๋ ๊ฒ ๋๊ธฐํ ์์ญ ๋ฐ๊นฅ์์ ํธ์ถ๋๋ ์ธ๊ณ์ธ ๋ฉ์๋๋ฅผ ์ด๋ฆฐ ํธ์ถ(open call)
์ด๋ผ ํ๋ค. ์ธ๊ณ์ธ ๋ฉ์๋๋ ์ผ๋ง๋ ์ค๋ ์คํ๋ ์ง ์ ์ ์์ผ๋ฏ๋ก ๋๊ธฐํ ์์ญ ์์์ ํธ์ถ๋๋ค๋ฉด ๊ทธ๋์ ๋ค๋ฅธ ์ค๋ ๋๋ ๋ณดํธ๋ ์์์ ์ฌ์ฉํ์ง ๋ชปํ๊ณ ๋๊ธฐํด์ผ ํ๋ค. ๋ฐ๋ผ์ ์ด๋ฆฐ ํธ์ถ์ ์คํจ ๋ฐฉ์ง ํจ๊ณผ ์ธ์๋ ๋์์ฑ ํจ์จ์ ํฌ๊ฒ ๊ฐ์ ํด์ค๋ค.
์ธ๊ณ์ธ ๋ฉ์๋ ํธ์ถ์ ๋๊ธฐํ ๋ธ๋ก ๋ฐ๊นฅ์ผ๋ก ์ฎ๊ธฐ๋ ๊ฒ๋ณด๋ค ๋ ๋์ ๋ฐฉ๋ฒ์ ์๋ฐ์ ๋์์ฑ ์ปฌ๋ ์ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ CopyOnWriteArrayList๋ฅผ ์ฌ์ฉํ๋ ๊ฒ์ด๋ค.
CopyOnWriteArrayList๋ ArrayList๋ฅผ ๊ตฌํํ ํด๋์ค๋ก, ๋ด๋ถ๋ฅผ ๋ณ๊ฒฝํ๋ ์์ ์ ํญ์ ๋ณต์ฌ๋ณธ์ ๋ง๋ค์ด ์ํํ๋๋ก ๊ตฌํ๋์๋ค. ๋ฐ๋ผ์ ๋ฐฐ์ด์ ์์ ํ ์ผ์ด ๋ง๋ค๋ฉด ์ฑ๋ฅ ์ด์๊ฐ ๋ฐ์ํ๊ฒ ์ง๋ง, ์ํํ ๋ ๋ฝ์ด ํ์ ์์ด ๋งค์ฐ ๋น ๋ฅด๋ค.
public class ObservableSet<E> extends ForwardingSet<E> {
public ObservableSet(Set<E> set) {
super(set);
}
private final List<SetObserver<E>> observers = new CopyOnWriteArrayList<>();
public void addObserver(SetObserver<E> observer) {
observers.add(observer);
}
public boolean removeObserver(SetObserver<E> observer) {
return observers.remove(observer);
}
private void notifyElementAdded(E element) {
for (SetObserver<E> observer : observers) {
observer.added(this, element);
}
}
public boolean add(E element) {
boolean added = super.add(element);
if (added) {
notifyElementAdded(element);
}
return added;
}
}
๋ง์ฐฌ๊ฐ์ง๋ก ์์ ์ดํด๋ณธ ๋ ์์ ์์ ๋ฐ์ํ๋ ์์ธ์ ๊ต์ฐฉ์ํ๊ฐ ๋ํ๋์ง ์๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
๊ธฐ๋ณธ ๊ท์น์ ๋๊ธฐํ ์์ญ์์๋ ๊ฐ๋ฅํ ์ผ์ ์ ๊ฒ ํ๋ ๊ฒ์ด๋ค. ๋ํ, ๊ต์ฐฉ์ํ์ ๋ฐ์ดํฐ ํผ์์ ํผํ๋ ค๋ฉด ๋๊ธฐํ ์์ญ ์์์ ์ธ๊ณ์ธ ๋ฉ์๋๋ฅผ ์ ๋ ํธ์ถํ์ง ๋ง์.