Apache Camel은 통합 프레임워크
로, 여러 시스템 간의 메시지 라우팅 및 처리 로직을 정의하고 실행하기 위해 사용된다.
서로 다른 시스템 간 데이터를 통합하고 처리할 수 있도록 Component
와 Java APIs
들의 집합으로 구성되어 있다. 즉, Camel 은 서로 다른 애플리케이션을 연결하는 매게체 역할을 한다.
약 320개 이상의 Component
를 제공하며 이를 통해 웹 서비스, 파일 읽기 및 쓰기, 타 앱과 상호작용 등이 가능하다. Camel은 Java 용 파이프라인을 생성할 수 있는 도구라고 비유할 수 있다. 특정 지점에서 데이터를 가져와 다른 지점으로 연결하며, 그 과정에서 데이터를 변경 및 변환하거나 또 다른 파이프를 통해 전송할 수도 있다.
데이터를 A에서 B로 이동해야 하는 거의 모든 경우에 Camel이 사용될 수 있다. 가령 다음과 같은 예시가 있다.
APIM(API 관리) 솔루션에 조금 더 초점을 맞추자면 다음과 같은 예시가 있다.
다음 다이어그램은 Camel의 핵심 개념 중 일부를 보여준다.
주요 키워드
Enterprise Integration Patterns
에서 영감을 많이 받아, 재사용 가능한 패턴을 차용해 개발되었다. 시스템 간 메시지 라우팅 및 처리를 다루는 디자인 패턴의 모음이다.Camel의 가장 기본적인 개념은 Route
이다. Camel을 구성하는 객체로, 데이터를 A에서 B로 이동시킬 수 있다. 이때, 각 지점인 A, B를 Endpoint
이라 지칭한다. Route는 각 Endpoint 경로를 정의할 때 사용된다. Java 또는 Xml 구문을 통해 Route를 생성할 수 있다.
from("sourceEndpoint")
.to("destinationEndpoint");
다음은 이전 폴더에 위치한 파일들을 특정 폴더로 이동시키는 예제코드이다.
from("file:home/customers/new")
.to("file:home/customers/old");
Producer & Consumer (생산자와 소비자)
Route, Endpoint, Component에서 Producer와 Consumer 개념이 사용된다.
Producer
Consumer
DSL
Apache Camel은 다양한 형태의 DSL(Domain Specific Language)을 이용해서 라우트들을 정의한다. Spring 애플리케이션에서 DSL의 주요 두 가지 형식은 Java DSL과 Spring XML DSL로 정의된다. 다음은 RouteBuilder 클래스를 사용하는 Java DSL로 정의된 라우트 예제이다.
RouteBuilder builder = new RouteBuilder(){
@Override
public void configure() throws Exception{
// Route Definition in Java DSL for
// moving file from jms queue to file system.
from("jms:queue:mySQueue").to("file://mysrc");
}
}
Endpoint는 Camel 이 다른 시스템과 메시지를 교환하는 인터페이스 이다. 라우트에서 이동하는 단계이다.(?)
여러 방법으로 엔드포인트를 선언할 수 있찌만, 가장 대표적인 방법은 다음과 같이 URI 처럼 보이는 구문을 사용해 선언한다.
URI 형식: <component>:<specific-uri-options>
예제
prefix:mainpart?option1=xxx&option2=xxx...
file:inputFolder
(파일 디렉토리)http://example.com
(HTTP 요청)jms:queue:orders
(JMS 큐)from("timer:foo?period=1000") // 1초마다 타이머 이벤트 생성
.setBody(constant("Hello, Camel"))
.to("log:info"); // 로그 출력
Component는 특정 프로토콜이나 기술을 다루기 위해 Camel에 플러그인처럼 추가되는 모듈입니다. 엔드포인트를 생성할 수 있도록 컴포넌트 라이브러리를 제공한다. 컴포넌트는 디스크에 있는 파일, 사서함, Dropbox나 트위터 같은 앱 등 외부 시스템과의 통신(연결)할 수 있는 플러그
와 같은 역할이다.
예를들어, 애플리케이션에 데이터를 저장하거나 가져오는 작업이 필요하다고 가정하자. Component는 이미 이러한 작업을 대신 해주는 기능을 제공한다. 따라서 파일을 읽거나 웹 서비스를 호출하기 위해 직접 코드를 작성하여 구현하는데 시간을 소모할 필요가 없다. 단지, 적절한 컴포넌트를 찾아서 사용하면 된다. 가장 일반적인 컴포넌트들은 다음과 같다.
Component | 목적 | Endpoint URI |
---|---|---|
HTTP | HTTP 요청 처리 | http: |
File | 파일 읽기 및 쓰기 | file: |
REST | RESTful 서비스 구현 | rest: |
JMS | 메시지 큐와의 통신 (예: ActiveMQ, RabbitMQ) | jms: |
Direct | for joining your Camel routes together | direct: |
Salesforce | for getting data in and out of Salesforce | salesforce: |
from("file:inputFolder")
.to("http://example.com/api/upload");
먼저, Enterprise Integration
가 무엇인지 용어 정리를 하자면, 조직이 핵심 비즈니스 프로세스를 관리하고 통합하는 데 사용하는 포괄적인 소프트웨어 플랫폼으로 정의된다. 쉽게 말하면 기업 내의 다양한 시스템들을 효과적으로 연결하고 통신하게 만드는 방법론 이다.
실무적 상황을 예로 들어보자면, 금융권 기업은 예금 업무 시스템, 대출 상품 관리 시스템, 인터넷뱅킹 시스템 등 다양한 시스템을 가지고 있다. 이러한 시스템들이 서로 실시간으로 정보를 주고받아야 원활한 은행 업무가 가능하다. 이를 위해 중앙 허브 역할을 하는 통합 계층(E.g. APIM)을 두는데, 이때, Apache Camel과 같은 통합 엔진을 활용한다. 기업은 이를 통해 안정성과 확정성을 유지하면서, 다양한 애플리케이션과 시스템을 연결할 수 있는 일관된 방법을 제공받을 수 있다.
앞서 언급한 엔터프라이즈 통합 패턴이라는 책에 정의된 패턴에 따라 메시지를 처리한다. 메시지에서 변환, 분할 및 로깅과 같은 몇 가지 일반적인 작업을 수행하려는 경우 EIP를 사용한다. 다음은 Camel의 몇 가지 일반적인 EIP이다.
EIP | 역할 | Java 문법 |
---|---|---|
Splitter | Splits a message into multiple parts | .split() |
Aggregator | Combines several messages into one message | .aggregate() |
Log | Writes a simple log message | .log() |
Marshal | Converts an object into a text or binary format | .marshal() |
From* | Receives a message from an endpoint | .from() |
To* | Sends a message to an endpoint | .to() |
예제
Content-Based Routing: 메시지 내용에 따라 다른 라우트를 선택
from("direct:start")
.choice()
.when(simple("${body} contains 'urgent'"))
.to("jms:queue:urgent")
.otherwise()
.to("jms:queue:normal");
Message Transformation: 메시지 변환
from("direct:start")
.transform().simple("Modified: ${body}");
Split/Aggregate: 메시지를 분할하거나 합침
from("file:inputFolder")
.split(body().tokenize("\n"))
.to("direct:processLine");
Processor
메시지의 내용을 직접적으로 처리하여 비지니스로직을 구현할 수 있는 인터페이스이다.
메시지가 Camel 라우트를 통해 이동할 때, 중간 단계에서 Processor를 처리하여 메시지를 변환하거나 조작을 할 수 있다.
processor의 역할
Processor 또한 RouterBuilder처럼 클래스에서 상속을 받아 사용하며 apiCreator의 경우 apiService에서 제공하는 기능들 별로 processor를 상속하여 기능들을 구현하였다.
ex) parsingProcessor, convertProcessor
Transformer
모든 Camel 생성자의 런타임 컨테이너이며, 라우팅 규칙에 따라 수행된다.
주요 구성 요소들을 종합적으로 정리하자면, Camel의 키워드들은 아래와 같은 관계로 동작한다.
참고 링크
Udemy - Learn Apache Camel 강의를 통해 학습한 내용을 정리했다. 2024년 연말 기준 가장 최신의 학습자료이다.
public class CustomRouter extends RouteBuilder {
@Override
public void configure() throws Exception {
from("timer:first-timer")
.transform().constant("Time now is " + java.time.LocalTime.now()) // 상수를 사용하고 있기 때문에 동일한 메시지를 계속 전송
.to("log:first-timer"); // log endpoint
}
from()
: 메시지를 수신하는 endpointtransform()
: Camel은 EIP(Enterprise Integration Patterns)의 메시지 변형을 지원한다.constant()
: 상수 사용LocalTime.now()
메서드를 호출하더라도, 최초 호출 시 사용된 동일한 메시지가 전송된다.to()
: 메시지를 송신하는 endpoint위 예제에서는 상수를 사용하기 때문에, 메시지를 정적으로 전달한다. 이를 동적으로 관리하기 위해 Spring Bean 을 사용해 리팩토링 해보자.
@Component
class GetCurrentTimeBean {
public String getCurrentTime() {
return "Time now is " + java.time.LocalTime.now();
}
}
@Override
public void configure() throws Exception {
from("timer:second-timer")
.bean("getCurrentTime")
.to("log:second-timer");
}
그러나, 위와 같이 Bean 메서드 이름을 직접 문자열로 지정하는 것은 권장되지 않는 방법이다. 메서드 이름을 문자열로 지정하면 IDE에서 메서드 이름을 변경할 때 참조를 찾지 못해 오류 발생의 원인이 되며, 여러 개의 오버로드된 메서드가 있을 경우, Camel이 어떤 메서드를 호출할지 모호하다. 따라서 다음과 같은 방식이 권장된다.
.bean(MyBean.class, "getCurrentTime")
// 또는 의존성 주입을 받은 경우
.bean(myBean, "getCurrentTime")
만약, Bean에 단일 메서드만 있는 경우, @Handler
어노테이션을 사용한 방식을 권장한다. 가장 명확하며, 메서드 이름을 문자열로 하드코딩하지 않아도 되며, 메서드 명이 변경될 때 자동으로 모든 참조를 찾아 변경된다.
@Component
class GetCurrentTimeBean {
@Handler
public String getCurrentTime() {
return "Time now is " + java.time.LocalTime.now();
}
}
from("timer:second-timer")
.bean(MyBean.class) // 메서드 이름 지정 불필요
.to("log:second-timer");
Processing
Transformation
중요한 차이점은 메서드의 반환 타입이 있는가 이다.
bean을 사용해 둘다 구현 가능
또는 processor(), transform()을 사용할 것.
// Processor를 상속받아 사용하는 방법
class SimpleLoggingProcessor implements Processor {
private Logger logger = LoggerFactory.getLogger(SimpleLoggingProcessor.class);
@Override
public void process(Exchange exchange) throws Exception {
logger.info("SimpleLoggingProcessor {}", exchange.getMessage().getBody());
}
}