ThreadPoolExecutor로 실행 된 Thread stop 일기

이건영·2023년 9월 5일
0

부하가 큰 다운로드 요청이 오면 다운로드 완료까지 시간이 오래걸리는데 사용자는 취소하고 바로 요청하는 일이 빈번했다.(사용자는 한번에 1개의 다운로드만 가능하다는 정책 때문에 취소하고 다시 요청하는 일이 빈번 했음)
DB에서 download 상태값을 최소로만 변경하고 실질적으로 thread를 중지 시키는 것이 아니기 때문에 download server의 resource 점유를 계속 차지하고 있는 상황이었다.
해서 thread stop하는 기능을 추가하기로 한다(Thread.interrupt() 함수만 실행하면 되는 줄 알고 싱글벙글 쉽게 접근함....)

기존 다운로드 구조는 다운로드 서버 분리 일기를 참조

사용자가 취소 요청을 날렸을 때 여러대의 다운로드 서버중에서 어느 다운로드 서버에서 해당 다운로드를 실행하고 있는지 모르기 때문에 다운로드 취소 요청도 SQS를 이용해서 개발하기로 설계했다.

@SqsListener(value = "${aws.sqs.download.cancel.name:dev-download-cancel}", deletionPolicy = SqsMessageDeletionPolicy.NEVER)

SQS에서 message 를 가져갈 떄 삭제 정책을 NEVER로 가져가고 취소 성공 시 혹은 현재 download 상태가 생성 완료 인 시점에 Queue에서 삭제되도록 한다.(어느 인스턴스에서 해당 Thread를 실행중인지 모르기 때문에 초기 설계에서는 가져 갈 때 까지 Queue에 남아 있는다.)

download type은 총 3가지 excel, ppt, zip 각 타입별로 thread stop 시 처리 해야 할 로직이 다르기때문에 CancellableTask이라는 interface 선언하여 각 취소 요청 시 마다 구현하여 필요 한 로직 처리
RunningTaskManager를 Component로 Bean에 등록해주고 downloadId(unique한 key값)을 key로 submit()함수로 return되는 Future값을 인자로 저장하여 Map에 등록 해준다.(download 완료 시에는 map에서 지워주는 로직 추가)

public interface CancellableTask {

	void cancel();
}
@RequiredArgsConstructor
public class ExcelDownloadCancellableTask<R> implements CancellableTask {

	private final SXSSFWorkbook workbook;
	private final Future<R> future;

	public void returnValue(){
		try {
			future.get();
		} catch (InterruptedException | ExecutionException e) {
			throw new RuntimeException(e);
		}
	}

	@Override
	public void cancel() {
		if(future.isDone()) {
			return;
		}

		if (future.isCancelled()) {
			return;
		}
        //취소 로직 구현
        
		log.info("complete cancel excel download");
	}

	public static <T> ExcelDownloadCancellableTask<T> of(
		SXSSFWorkbook workbook,
		Future<T> future
	) {
		return new ExcelDownloadCancellableTask<>(workbook, future);
	}

}
@Component
@RequiredArgsConstructor
public class RunningTaskManager {
	private final ThreadPoolTaskExecutor excelDownloadThreadPool;
	private final ThreadPoolTaskExecutor pptDownloadThreadPool;
	private final ThreadPoolTaskExecutor zipDownloadThreadPool;
	private final Map<String, CancellableTask> cancellableTasks = Maps.newHashMap();

	public void start(
		String downloadId,
		SXSSFWorkbook workbook,
		Runnable runnable
	) {
		Future<?> future = excelDownloadThreadPool.submit(runnable);
		cancellableTasks.put(downloadId, ExcelDownloadCancellableTask.of(workbook, future));
	}
}

CancellableTask의 cancel() 함수에서 future.cancel(); 을 날리면 바로 thread가 중지 되겠지 하고 싱글벙글 요청을 날려봤지만.......

죽지 않는다......

죽지않는 thread 여러 시도를 다 해 봤지만 결국 cancel() 함수를 까보기로 한다.

구현체를 까보자

내부에 Thread의 interrupt() 함수를 호출한다. 뭐지 왜 안될까........ 고민하던 도중
interrupt() 함수를 까본다.

내가 원하는 InterruptedException을 발생시키려면 해당 thread에서 wait(), join(), sleep()을 써주어야 한다.

처음 싱글벙글 당차게 도전 했지만 당차게 까인 interrupt
고민하다가 긴 task를 할당 받은 일이 아니기 때문에 우회해서 처리하기로 한다.....
SXSSFWorkbook과 XMLSlideShow을 인자로 받아서 저장중이기 때문에 각각

public interface CancellableTask {
	void cancel();
}
public class ExcelDownloadCancellableTask<R> implements CancellableTask {

	private final SXSSFWorkbook workbook;
	private final Future<R> future;

	@Override
	public void cancel() {
		if(future.isDone()) {
			return;
		}

		if (future.isCancelled()) {
			return;
		}

		try {
			//file과 connection을 끊어 IoException을 발생 시킨다.
			workbook.close();
			workbook.dispose();
		} catch (Exception e) {
			log.error("ERROR LOG========================", e);
		}

		log.info("complete cancel excel download");
	}

	public static <T> ExcelDownloadCancellableTask<T> of(
		SXSSFWorkbook workbook,
		Future<T> future
	) {
		return new ExcelDownloadCancellableTask<>(workbook, future);
	}
}
public class RunningTaskManager {
	private final ThreadPoolTaskExecutor excelDownloadThreadPool;
	private final ThreadPoolTaskExecutor pptDownloadThreadPool;
	//CancellableTask 를 보관하기 위한 변수
	private final Map<String, CancellableTask> cancellableTasks = Maps.newHashMap();

	public void start(
		String downloadId,
		SXSSFWorkbook workbook,
		Runnable runnable
	) {
		Future<?> future = excelDownloadThreadPool.submit(runnable);
		cancellableTasks.put(downloadId, ExcelDownloadCancellableTask.of(workbook, future));
	}

	public void start(
		String downloadId,
		XMLSlideShow ppt,
		Runnable runnable
	) {
		Future<?> future = pptDownloadThreadPool.submit(runnable);
		cancellableTasks.put(downloadId, PptDownloadCancellableTask.of(ppt, future));
	}

	public boolean cancel(String downloadId) {
		if(!cancellableTasks.containsKey(downloadId)) {
			return false;
		}

		CancellableTask task = cancellableTasks.get(downloadId);
		task.cancel();
		removeTask(downloadId);
		return true;
	}

	private void removeTask(String downloadId) {
		cancellableTasks.remove(downloadId);
	}
}

stream connection을 끊어서 강제로 IOException을 발생 시키기로 한다....

profile
일단 해보자

0개의 댓글