HotPublisher & Cold Publisher
HotPublisher
- subscribe가 없더라도 데이터를 생성하고, stream에 push하는 publisher
- 트위터 게시글 읽기, 공유 리소스 변화 등
- 여러 subscriber에게 동일한 데이터 전달
ColdPublisher
- subscribe가 시작되는 순간ㅂ누터 데이터를 생성하고 전송
- 파일 읽기, 웹 API 요청 등
- subscriber에 따라 독립적인 데이터 스트림 제공
HotPublisherMain
@Slf4j
public class SimpleHotPublisherMain {
@SneakyThrows
public static void main(String[] args) {
var publisher = new SimpleHotPublisher();
var subscriber = new SimpleNamedSubscriber<>("subscriber1");
publisher.subscribe(subscriber);
Thread.sleep(5000);
subscriber.cancel();
var subscriber2 = new SimpleNamedSubscriber<>("subscriber2");
var subscriber3 = new SimpleNamedSubscriber<>("subscriber3");
publisher.subscribe(subscriber2);
publisher.subscribe(subscriber3);
Thread.sleep(5000);
subscriber2.cancel();
subscriber3.cancel();
Thread.sleep(1000);
var subscriber4 = new SimpleNamedSubscriber<>("subscriber4");
publisher.subscribe(subscriber4);
Thread.sleep(5000);
subscriber4.cancel();
publisher.shutdown();
}
}
HotPublisher
@Slf4j
public class HotPublisher implements Flow.Publisher<Integer>{
private final ExecutorService publisherExecutor = Executors.newSingleThreadExecutor();
private final Future<Void> task;
private List<Integer> numbers = new ArrayList<>();
private List<HotSubscription> subscriptions = new ArrayList<>();
public HotPublisher(){
numbers.add(1);
task = publisherExecutor.submit(() ->{
for ( int i = 2 ; Thread.interrupted(); i++){
numbers.add(i);
log.info("numbers: {}", numbers);
subscriptions.forEach(HotSubscription::wakeup);
Thread.sleep(100);
}
return null;
});
}
public void shutdown(){
this.task.cancel(true);
publisherExecutor.shutdown();
}
@Override
public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
HotSubscription hotSubscription = new HotSubscription(subscriber);
subscriber.onSubscribe(hotSubscription);
subscriptions.add(hotSubscription);
}
private class HotSubscription implements Flow.Subscription{
private int offset;
private int requiredOffset;
private final Flow.Subscriber<? super Integer> subscriber;
private final ExecutorService subscriptionExecutorService = Executors.newSingleThreadExecutor();
public HotSubscription(Flow.Subscriber<? super Integer> subscriber){
int lastIndex = numbers.size()-1;
this.offset = lastIndex;
this.requiredOffset = lastIndex;
this.subscriber = subscriber;
}
@Override
public void request(long n) {
requiredOffset += n;
onNextWhilePossible();
}
public void wakeup(){
onNextWhilePossible();
}
private void onNextWhilePossible(){
subscriptionExecutorService.submit(() ->{
while(offset < requiredOffset && offset < numbers.size()){
int item = numbers.get(offset);
subscriber.onNext(item);
offset++;
}
});
}
@Override
public void cancel() {
this.subscriber.onComplete();
if(subscriptions.contains(this)){
subscriptions.remove(this);
}
subscriptionExecutorService.shutdown();
}
}
}