ch09 스프링 통합

ChapJun·2022년 2월 11일
0

9.1 간단한 통합 플로우 선언하기

  • 파일에 데이터를 쓰는 메세지 게이트웨이 선언
@MessagingGateway(defaultRequestChannel = "textInChannel")
public interface FileWriterGateway {

    void writeToFile(@Header(FileHeaders.FILENAME) String filename, String data);
}
  • @MessagingGateway : FileWriterGateway 인터페이스의 구현체(클래스)를 런타임 시에 생성하라고 스프링 통합에 알려준다.
  • defaultRequestChannel : 해당 인터페이스의 메서드 호출로 생성된 메시지가 이 속성에 지정된 메시지 채널로 전송된다는 것을 나타낸다.
  • writeToFile()의 호출로 생긴 메시지가 textInChannel이라는 이름의 채널로 전송된다.
  • @Header : filename에 전달되는 값이 메시지 페이로드가 아닌 메시지 헤더에 있다는 것을 나타낸다.

9.2 통합플로우 구성하기 (XML 방식, 자바 방식, DSL 방식)

  • DSL 방식
@Configuration
public class FileWriterIntegrationConfig {

    @Bean
    public IntegrationFlow fileWriterFlow() {
        return IntegrationFlows.from(MessageChannels.direct("textInChannel")) // 인바운드채널
            .<String, String>transform(t -> t.toUpperCase()) // 변환기
            .handle(Files
                .outboundAdapter(new File("/tmp/sia5/files"))
                .fileExistsMode(FileExistsMode.APPEND)
                .appendNewLine(true)) // 서비스액티베이터
            .get();
    }
}

9.3 스프링 통합 컴포넌트 구성

스프링 통합은 다수의 통합 시나리오를 갖는 많은 영역을 포함한다. 따라서 그 모든 것을 하나의 챕터에 포함시키려고 하는 것은 마치 코끼리를 봉투에 맞춰 넣으려고 하는 것과 같다. 통합 플로우는 하나 이상의 컴포넌트로 구성되며 다음과 같다.

  1. 채널(Channel) : 한 요소로부터 다른 요소로 메시지를 전달한다.
  2. 필터(Filter) : 조건에 맞는 메시지가 플로우를 통과하게 해준다.
  3. 변환기(Transaformer) : 메시지 값을 변경하거나 메시지 페이로드의 타입을 다른 타입으로 변환한다.
  4. 라우터(Router) : 여러 채널 중 하나로 메시지를 전달하며, 대개 메시지 헤더를 기반으로 한다.
  5. 분배기(Splitter) : 들어오는 메시지를 두 개 이상의 메시지로 분할하며, 분할된 각 메시지는 다른 채널로 전송된다.
  6. 집적기(Aggregator) : 분배기와 상반된 것으로 별개의 채널로부터 전달되는 다수의 메시지를 하나의 메시지로 결합한다.
  7. 서비스 액티베이터(Service activator) : 메시지를 처리하도록 자바 메서드에 메시지를 넘겨준 후 메서드의 반환값을 출력 채널로 전송한다.
  8. 채널 어댑터(Channel adapter) : 외부 시스템에 채널을 연결한다. 외부 시스템으로부터 입력을 받거나 쓸 수 있다.
  9. 게이트웨이(Gateway) : 인터페이스를 통해 통합 플로우로 데이터를 전달한다.

메시지 채널

메시지 채널은 통합 파이프라인을 통해서 메시지가 이동하는 수단이다. 즉, 채널은 스프링 통합의 다른 부분을 연결하는 통로다. 스프링 통합은 다음을 포함해서 여러 채널 구현체(클래스)를 제공한다.

  • PublishSubscribeChannel : 전송되는 메시지는 하나 이상의 컨슈머로 전달된다. 컨슈머가 여럿일 때는 모든 컨슈머가 해당 메시지를 수신한다.
  • QueueChannel : 전송되는 메시지는 FIFO 방식으로 컨슈머가 가져갈 때까지 큐에 저장된다. 컨슈머가 여럿일 때는 그중 하나의 컨슈머만 해당 메시지를 수신한다.
  • PriorityChannel : QueueChannel과 유사하지만, FIFO 방식 대신 메시지의 priority 헤더를 기반으로 컨슈머가 메시지를 가져간다.
  • RendezvousChannel : QueueChannel과 유사하지만, 전송자와 동일한 스레드로 실행되는 컨슈머를 호출하여 단일 컨슈머에게 메시지를 전송한다. 이 채널은 트랜잭션을 지원한다.
  • DirectChannel : 전송자와 동일한 스레드로 실행되는 컨슈머를 호출하여 단일 컨슈머에게 메세지를 전송한다. (트랜잭션 지원)
  • ExecutorChannel : DirectChannel과 유사하지만, TaskExecutor를 통해서 메시지가 전송된다. (전송자와 다른 스레드에서 처리된다) 이 채널 타입은 트랜잭션을 지원하지 않는다.
  • FluxMessageChannel : 프로젝트 리액터(Product Reactor)의 플럭스(Flux)를 기반으로 하는 리액티브 스트림즈 퍼블리셔(Reactive Streams Publisher) 채널이다.
// option1. PublishSubscribeChannel
@Bean
public MessageChannel orderChannel() {
  return new PublishSubscribeChannel();
}

// option2. QueueChannel
@Bean
public MessageChannel orderChannel() {
  return new QueueChannel();
}

@Bean
public IntegrationFlow orderFlow() {
	return IntegrationFlows 
		...
		.channel("orderChannel")
		...
		.get();
}

필터

필터는 통합 파이프라인의 중간에 위치할 수 있으며, 플로우의 전 단계로부터 다음 단계로의 메시지 전달을 허용 또는 불허한다.

@Bean
public IntegrationFlow evenNumberFlow(AtomicInteger integerSource) {
    return IntegrationFlows 
        ...
        .<Integer>filter((p) -> p % 2 == 0) 
        ...
        .get();
}

변환기

변환기는 메시지 값의 변경이나 타입을 변환하는 일을 수행한다.

// option1. 어노테이션 기반 변환기 설정
    @Bean 
    @Transformer(inputChannel = "textInChannel", outputChannel = "fileWriterChannel")
    public GenericTransformer<String, String> upperCaseTransformer() {
        return String::toUpperCase;
    }
    
// option1. DSL 기반 변환기 설정    
.<String, String>transform(String::toUpperCase) // 변환기 선언    

라우터

라우터는 전달 조건을 기반으로 통합 플로우 내부를 분기(서로 다른 채널로 메시지를 전달)한다.

예를 들어, 정수값을 전달하는 numberChannel이라는 이름의 채널이 있다고 하자. 그리고 모든 짝수 메시지를 evenChannel이라는 이름의 채널로 전달하고, 홀수 메시지는 oddChannel이라는 이름의 채널로 전달한다고 가정해보자.

// 자바 DSL 구성 라우터 설정
@Bean
public IntegrationFlow numberRoutingFlow(AtomicInteger source) {
    return IntegrationFlows
        ...
        .<Integer, String>route(n -> n%2==0 ? "EVEN":"ODD", mapping -> mapping 
            .subFlowMapping("EVEN", 
                sf -> sf.<Integer, Integer>transform(n -> n * 10) 
                    .handle((i,h) -> { ... })
                )
            .subFlowMapping("ODD", sf -> sf 
                .transform(RomanNumbers::toRoman) 
                .handle((i,h) -> { ... })
             ) 
            .get();
}

분배기

때로는 통합 플로우에서 하나의 메시지를 여러 개로 분할하여 독립적으로 처리하는 것이 유용할 수 있다.

분배기를 사용할 수 있는 중요한 두 가지 경우가 있다.

  • 메시지 페이로드가 같은 타입의 컬렉션 항목들을 포함하며, 각 메시지 페이로드 별로 처리하고자 할 때다
  • 연관된 정보를 함께 전달하는 하나의 메시지 페이로드는 두 개 이상의 서로 다른 타입 메시지로 분할될 수 있다.

예를 들어, 주문 데이터를 전달하는 메시지는 대금 청구 정보와 주문 항목 리스트의 두 가지 메시지로 분할할 수 있다.


public class OrderSplitter {
    public Collection<Object> splitOrderIntoParts(PurchaseOrder po) {
				ArrayList<Object> parts = new ArrayList<>(); 
				parts.add(po.getBillingInfo()); 
				parts.add(po.getLineItems());
				
				return parts;
		}
}

return IntegrationFlows...
    .split(orderSplitter()).<Object, String> route(
        p->{
            if(p.getClass().isAssignableFrom(BillingInfo.class)){
                return"BILLING_INFO";
            }else{
                return"LINE_ITEMS";
            }
        }, mapping->mapping
            .subFlowMapping("BILLING_INFO",
                sf->sf.<BillingInfo> handle((billingInfo,h)->{
                    ...
                }))
            .subFlowMapping("LINE_ITEMS",
                sf->sf.split()
                        .<LineItem> handle((lineItem,h)->{
                    ...
                }))
            )
    .get();

서비스 액티베이터

서비스 액티베이터는 입력 채널로부터 메시지를 수신하고 이 메시지를 MessageHandler 인터페이스를 구현한 클래스(빈)에 전달한다.

public IntegrationFlow someFlow() {
    return IntegrationFlows
            ...
            .handle(msg -> {
                System.out.println("Message payload: " + msg.getPayload());
            }) 
        .get();
}

public IntegrationFlow orderFlow(OrderRepository orderRepo) {
    return IntegrationFlows
        ...
        .<Order>handle((payload, headers) -> { 
            return orderRepo.save(payload);
        }) 
        ...
        .get();
}

public IntegrationFlow fileWriterFlow() {
      return IntegrationFlows
             ...
             .handle(Files.outboundAdapter(new File("C:\\tmp\\sia5\\files"))
             .fileExistsMode(FileExistsMode.APPEND)
             .appendNewLine(true))
             .get();
}

게이트웨이

게이트웨이는 애플리케이션이 통합 플로우로 데이터를 제출(submit)하고 선택적으로 플로우의 처리 결과인 응답을 받을 수 있는 수단이다.

이전의 본 FileWriterGateway는 단방향 게이트웨이며, 파일에 쓰기 위해 문자열을 인자로 받고 void를 반환하는 메서드를 갖고 있다. 양방향 게이트웨이의 작성도 어렵지 않으며, 이때는 게이트웨이 인터페이스를 작성할 때 통합 플로우로 전송할 값을 메서드에서 반환해야 한다.
예를 들어, 문자열을 받아서 모두 대문자로 변환하는 통합 플로우의 게이트웨이를 생각해 보자.

// option1. 어노테이션 기반 게이트웨이 설정
@Component
@MessagingGateway(defaultRequestChannel="inChannel",
    defaultReplyChannel="outChannel")
public interface UpperCaseGateway {
    String uppercase(String in);
}

// option2. 자바 DSL 구성 게이트웨이 설정
@Bean
public IntegrationFlow uppercaseFlow() {
    return IntegrationFlows
        .from("inChannel")
        .<String, String> transform(s -> s.toUpperCase()) 
				.channel("outChannel")
        .get();
}

채널 어댑터

채널 어댑터는 통합 플로우의 입구와 출구를 나타낸다. 데이터는 인바운드(inbound) 채널 어댑터를 통해 통합 플로우로 들어오고, 아웃바운드(outbound) 채널 어댑터를 통해 통합 플로우에서 나간다.

인바운드 채널 어댑터는 플로우에 지정된 데이터 소스에 따라 여러 가지 형태를 갖는다. 예를 들어, 증가되는 숫자를 AtomicInteger로부터 플로우로 넣는 인바운드 채널 어댑터를 선언할 수 있다.

@Bean
public IntegrationFlow someFlow(AtomicInteger integerSource) {
    return IntegrationFlows 
        .from(integerSource, "getAndIncrement",
          c -> c.poller(Pollers.fixedRate(1000))) 
        ...
        .get();
}

아웃바운드 채널 어댑터는 통합 플로우의 끝단이며, 최종 메시지를 어플리케이션이나 다른 시스템에 넘겨준다.

엔드포인트 모듈

스프링 통합은 우리 나름의 채널 어댑터를 생성할 수 있게 해준다. 아래 표에 있는 것을 포함해서 다양한 외부 시스템과의 통합을 위해 채널 어댑터가 포함된 24개 이상의 엔드포인트 모듈(인바운드, 아웃바운드 모두)을 스프링 통합이 제공한다.

외부 시스템과의 통합을 위한 24개 이상의 엔드포인트 모듈

AMQP -> spring-integration-amqp
Spring application events -> spring-integration-event
RSS and Atom -> spring-integration-feed
Filesystem -> spring-integration-file
FTP/FTPS -> spring-integration-ftp
GemFire -> spring-integration-gemfire
HTTP -> spring-integration-http
JDBC -> spring-integration-jdbc
JPA -> spring-integration-jpa
JMS -> spring-integration-jms
Email -> spring-integration-mail
MongoDB -> spring-integration-mongodb
MQTT -> spring-integration-mqtt
Redis -> spring-integration-redis
RMI -> spring-integration-rmi
SFTP -> spring-integration-sftp
STOMP -> spring-integration-stomp
Stream -> spring-integration-stream
Syslog -> spring-integration-syslog
TCP/UDP -> spring-integration-ip
Twitter -> spring-integration-twitter
Web Services -> spring-integration-ws
WebFlux -> spring-integration-webflux
WebSocket -> spring-integration-websocket
XMPP -> spring-integration-xmpp
ZooKeeper -> spring-integration-zookeeper

9.4 요약

  • 스프링 통합은 플로우를 정의할 수 있게 해준다. 데이터는 애플리케이션으로 들어오거나 나갈 때 플로우를 통해 처리할 수 있다.
  • 통합 플로우는 XML, Java, Java DSL을 사용해서 정의할 수 있다.
  • 메시지 게이트웨이와 채널 어댑처는 통합 플로우의 입구나 출구의 역할을 한다.
  • 메시지는 플로우 내부에서 변환, 분할, 집적, 전달될 수 있으며, 서비스 액티베이터에의해 처리될 수 있다.
  • 메시지 채널은 통합 플로우의 컴포넌트들을 연결한다.
@SpringBootApplication
public class TacoCloudApplication {

    public static void main(String[] args) {
        SpringApplication.run(TacoCloudApplication.class, args);
    }

    @Bean
    public CommandLineRunner writeData(FileWriterGateway gateway) {
        return args -> {
            gateway.writeToFile("simple.txt", "Hello, Spring Integration! (test)");
        };
    }
}

    
@Configuration
public class FileWriterIntegrationConfig {

    @Bean
    public IntegrationFlow fileWriterFlow() {
        return IntegrationFlows
                .from(MessageChannels.direct("textInChannel")) // 인바운드 채널
                .<String, String>transform(String::toUpperCase) // 변환기 선언
                .handle(Files.outboundAdapter(new File("C:\\tmp\\sia5\\files"))  // 서비스액티베이터
                .fileExistsMode(FileExistsMode.APPEND)
                .appendNewLine(true))
                .get();
    }

}

@MessagingGateway(defaultRequestChannel = "textInChannel")
public interface FileWriterGateway {
    void writeToFile(@Header(FileHeaders.FILENAME) String filename, String data);
}

참고

스프링 통합은 Enterprise Integration Patterns에 나오는 패턴들을 스프링 프레임워크에 구현해놓았다. Enterprise Integration Patterns은 엔터프라이즈 환경에서 사용하고 있는 다양한 분야(예. 결제, 메일, 각 부서에 필요한 서비스)의 애플리케이션을 통합, 즉 유기적으로 연결해서 효율적으로 적절하게 통합하는 방법을 여러 패턴을 통해 제시했다.

Enterprise Integration Pattern도 앞서 설명했다시피 엔터프라이즈 환경에서 사용하고있는 다양한 서비스를 필요에 따라 잘 융화시키기 위해 파일 전송, 데이터베이스 공유, 원격 프로시저, 메세징을 핵심 과제로 다양한 방법, 즉 패턴을 제시하고 있는 것이다.

예를들어 누군가가 회사 대표메일로 이메일을 보내면 메일을 읽고(a) 분석해서(b) 고객응대가 필요할 경우 CS팀과 홍보팀에 메일을 전달(c)하고 정규화된 양식으로 변경(c)한 다음, 해당 정보를 데이터베이스에 저장(d)하는 플로우를 구성한다고 하면, a 단계에선 메일을 읽을 수 있는 파트, b에선 메일을 분석하는 엔진으로 보내는 파트, c에선 정규화된 양식으로 변환하는 파트, 나머지 d에선 데이터베이스에 저장하는 파트로 각 모듈을 구성하고 내/외부 서비스들과 접촉하는 부분을 쉽게 구현할 수 있도록 spring integration이 제공한다고 보면 된다.

profile
Chap Chap

0개의 댓글