parallelStream() 사용 시 LazyInitializationException이 발생하는 현상

Garam·2024년 11월 26일
post-thumbnail

문제상황

  • 코드
@Transactional(propagation = Propagation.REQUIRES_NEW)
public List<ChatFlowListResponse> getChatFlows(User user, boolean isShared, boolean test, int page, int limit, boolean executable) {
    // 순회할 ChatFlow 리스트
    List<ChatFlow> chatFlows;
    
    // 디버깅 위한 출력문
    boolean actualTransactionActive = TransactionSynchronizationManager.isActualTransactionActive();
    String currentTransactionName = TransactionSynchronizationManager.getCurrentTransactionName();
    System.out.println("actualTransactionActive = " + actualTransactionActive);
    System.out.println("currentTransactionName = " + currentTransactionName);
    System.out.println("main entityManager open = " + entityManager.isOpen());
    System.out.println("main entityManager hash code = " + entityManager.hashCode());
    
    // parallelStream 시 Session이 열려 있지 않은 상황
    if (executable) {
        chatFlows = chatFlows.parallelStream().filter(
	        // 같은 클래스 내부의 precheck 메서드 호출
	        chatFlow -> precheck(chatFlow.getId()).isExecutable()
        ).toList();
    }

    return ...;
}

public PreCheckResponse precheck(Long chatFlowId) {
		 // 디버깅 위한 출력문
    Thread currentThread = Thread.currentThread();
    System.out.println("currentThread = " + currentThread.getId());
    System.out.println("is entityManager open = " + entityManager.isOpen());
    System.out.println("entityManager hash code = " + entityManager.hashCode());
    boolean actualTransactionActive = TransactionSynchronizationManager.isActualTransactionActive();
    String currentTransactionName = TransactionSynchronizationManager.getCurrentTransactionName();
    System.out.println("actualTransactionActive = " + actualTransactionActive);
    System.out.println("currentTransactionName = " + currentTransactionName);
    System.out.println("===========================");
		
		// 메서드 로직 시작
    ChatFlow chatFlow = chatFlowRepository.findById(chatFlowId).orElseThrow(() ->
            new BaseException(ErrorCode.CHAT_FLOW_NOT_FOUND)
    );

    List<Node> nodes = nodeRepository.findByChatFlowId(chatFlowId);
    List<Edge> edges = edgeRepository.findByChatFlowId(chatFlowId);

    // ... 기타 Lazy Loading으로 설정된 여러 연관된 객체를 다루는 로직

    return PreCheckResponse.createTrue();
}

  • 에러
org.hibernate.LazyInitializationException: failed to lazily initialize a collection of role: com.ssafy.flowstudio.domain.node.entity.Node.outputEdges: could not initialize proxy - no Session
    at org.hibernate.collection.spi.AbstractPersistentCollection.throwLazyInitializationException(AbstractPersistentCollection.java:636) ~[hibernate-core-6.5.3.Final.jar:6.5.3.Final]
    at org.hibernate.collection.spi.AbstractPersistentCollection.withTemporarySessionIfNeeded(AbstractPersistentCollection.java:219) ~[hibernate-core-6.5.3.Final.jar:6.5.3.Final]
    at org.hibernate.collection.spi.AbstractPersistentCollection.readSize(AbstractPersistentCollection.java:150) ~[hibernate-core-6.5.3.Final.jar:6.5.3.Final]
    at org.hibernate.collection.spi.PersistentBag.isEmpty(PersistentBag.java:358) ~[hibernate-core-6.5.3.Final.jar:6.5.3.Final]
    at com.ssafy.flowstudio.api.service.chatflow.ChatFlowService.precheck(ChatFlowService.java:452) ~[main/:na]
    at com.ssafy.flowstudio.api.service.chatflow.ChatFlowService.lambda$getChatFlows$3(ChatFlowService.java:93) ~[main/:na]
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:178) ~[na:na]
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) ~[na:na]
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[na:na]
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[na:na]
    at java.base/java.util.stream.Nodes$CollectorTask.doLeaf(Nodes.java:2183) ~[na:na]
    at java.base/java.util.stream.Nodes$CollectorTask.doLeaf(Nodes.java:2149) ~[na:na]
    at java.base/java.util.stream.AbstractTask.compute(AbstractTask.java:327) ~[na:na]
    at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754) ~[na:na]
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) ~[na:na]
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) ~[na:na]
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) ~[na:na]
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) ~[na:na]
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) ~[na:na]

  • 터미널 출력문
mainThread = 205
actualTransactionActive = true
currentTransactionName = com.ssafy.flowstudio.api.service.chatflow.ChatFlowService.getChatFlows
main entityManager open = true
main entityManager hash code = 764256713
-------------
currentThread = 205
is entityManager open = true
entityManager hash code = 764256713
currentThread = 233
is entityManager open = true
entityManager hash code = 764256713
actualTransactionActive = true
actualTransactionActive = false
currentTransactionName = null
currentTransactionName = com.ssafy.flowstudio.api.service.chatflow.ChatFlowService.getChatFlows
===========================
===========================
currentThread = 234
is entityManager open = true
entityManager hash code = 764256713
actualTransactionActive = false
currentTransactionName = null
===========================

해결

  • parallelStream 대신 Sequential한 stream 사용
  • 트랜잭션은 기본적으로 스레드 간에 전파되지 않음
  • 메인 스레드에서 트랜잭션이 시작되더라도, 다른 스레드에서 작업을 수행할 때는 트랜잭션을 공유하지 않는다
chatFlows = chatFlows.stream().filter(chatFlow -> this.precheck(chatFlow.getId()).isExecutable()).toList();

실패한 해결방안과 이유

@Transactional(propagation = Propagation.REQUIRES_NEW

  1. 멀티 스레딩과 Propagation의 관련 여부
  • Propagation은 @Transactional 애노테이션이 적용된 메서드가 호출될 때 기존 트랜잭션을 사용할지, 새 트랜잭션을 생성할지, 트랜잭션 없이 실행할지를 정의하는 전파 규칙
  • Propagation은 트랜잭션 전파 방식에 관련된 설정으로, 멀티스레딩에서 트랜잭션을 새로 만들지 여부와는 직접적인 관계가 없다
  • Propagation은 주로 하나의 스레드 내에서, 여러 메서드 간에 트랜잭션을 어떻게 처리할지를 정의하는 규칙이다
  1. 클래스 내부의 메소드를 직접 호출하면 AOP를 우회한다.
  • precheck()에 @Propagation.REQUIRES_NEW 를 설정하더라도 클래스 내부의 호출이므로 AOP를 우회하여 애노테이션이 적용되지 않는다.
@Service
public class MyService {

	// 자가자신이 다른 메소드에 의해 호출될 때 REQUIRES_NEW로 하겠다.
	@Transactional(propagation = Propagation.REQUIRES_NEW) 
	public void outerMethod() {
	    innerMethod();
	}
	
	// this.innerMethod로 직접 호출하면 프록시를 거치지 않고 직접 호출되므로 AOP를 우회하여 해당 애노테이션은 적용되지 않는다.
	~~@Transactional~~
	public void innerMethod() {
	}
}

# 메인 메소드 내부의 출력문
currentTransactionName = com.ssafy.flowstudio.api.service.chatflow.ChatFlowService.getChatFlows
---------------------------
# 자식 메소드(precheck) 내부의 출력문 ()
# 싱글 스레드 & `@Transactional(propagation = Propagation.REQUIRES_NEW`가 precheck 메소드에 적용된 환경
# 모두 부모 메소드의 transaction 사용중으로 확인, REQUIRES_NEW가 적용되지 않았음을 알 수 있음
currentThread = 206
currentTransactionName = com.ssafy.flowstudio.api.service.chatflow.ChatFlowService.getChatFlows
===========================
currentThread = 206
currentTransactionName = com.ssafy.flowstudio.api.service.chatflow.ChatFlowService.getChatFlows
===========================
currentThread = 206
currentTransactionName = com.ssafy.flowstudio.api.service.chatflow.ChatFlowService.getChatFlows
===========================


https://stackoverflow.com/questions/23266866/java-parallelstream-with-spring-annotated-methods

0개의 댓글