회사에서 Apache Camel 을 쓸 일이 있어서 공부중인데,
사용했던 테스트 코드를 그냥 버리기엔 아까워서 여기에 기록합니다.
이 글에서는 Apache Camel 이 뭔지에 대한 기초적인 내용은 다루지 않습니다.
사실 아직 완전히 감을 잡은게 아니라서 못쓰는 겁니다 ㅎㅎ...
참고1
:Apache Camel ver.4.2.0
를 사용했습니다.
참고2
:XML DSL
도 원래는 되야하는데 이상하게 에러가 하도 나서
일단 보류했습니다. 대신 가장 대세인Java DSL
방식으로 테스트를 진행했습니다.
참고: 아파치 카멜(Apache camel)이란?
What is Apache Camel?
저도 아직 말로 설명하는데 어려움이 있어서 참고 링크를 답니다.
OS
: Window 10 Pro
IDE
: IntelliJ Ultimate
jdk
: Azul 17
framework
: spring boot-3.1.6
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.6</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>me.dailycode</groupId>
<artifactId>apache-camel</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>apache-camel</name>
<description>apache-camel</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-starter</artifactId>
<version>4.2.0</version>
</dependency>
<!-- https://camel.apache.org/components/4.0.x/sql-component.html -->
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-sql-starter</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-exec</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-jaxb-starter</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test-spring-junit5</artifactId>
<version>3.21.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<image>
<builder>paketobuildpacks/builder-jammy-base:latest</builder>
</image>
</configuration>
</plugin>
</plugins>
</build>
</project>
일단 logback 세팅을 좀 하고 가겠습니다.
test/resources/application.properties
# 스프링 부트 테스트와 일반적인 junit 테스트 모두 똑같은
# 로그 레벨 사용을 위해서 logback-test.xml 파일을 사용
logging.config=classpath:logback-test.xml
test/resources/logback-test.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<timestamp key="BY_DATE" datePattern="yyyy-MM-dd"/>
<property name="LOG_PATTERN"
value="[%d{yyyy-MM-dd HH:mm:ss}:%-4relative] %green([%thread]) %highlight(%-5level) %boldWhite([%C.%M:%yellow(%L)]) - %msg%n"/>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${LOG_PATTERN}</pattern>
</encoder>
</appender>
<logger name="me.dailycode.apachecamel" level="DEBUG" additivity="false">
<appender-ref ref="CONSOLE" />
</logger>
<logger name="org.apache.camel.processor" level="DEBUG" additivity="false">
<appender-ref ref="CONSOLE" />
</logger>
<root level="INFO">
<appender-ref ref="CONSOLE"/>
</root>
</configuration>
아는 분들은 알겠지만 camelContext.start()
를 호출해도 trigger 가 되는
Producer Endpoint
가 있고(ex: file
, timer
등)
아닌 게(ex: direct
, mock
등) 있습니다.
특히 direct 는 자기만으로는 시작이 절대 안됩니다.
하지만 ProducerTemplate
을 사용하면 강제로 시작시킬 순 있습니다!
package me.dailycode.apachecamel;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.Test;
public class DirectStartTest {
@Test
void testDirect() throws Exception {
DefaultCamelContext camelContext = new DefaultCamelContext();
RouteBuilder.addRoutes(camelContext, in -> {
in.from("direct:start")
.log("wow")
.to("mock:done");
});
camelContext.start();
// 반드시 camelContext 가 시작된 이후 시점에 template.send 해야 합니다.
ProducerTemplate template
= camelContext.createProducerTemplate();
template.sendBody("direct:start", "This is a test message");
camelContext.stop();
}
}
참고: https://stackoverflow.com/questions/9636651/apache-camel-directstart-endpoint-what-does-it-mean
제가 생각하는 가장 핵심이 되는 4가지 세팅법을 이 목차에 작성해봅니다.
Registry
등록법Property
세팅법Body
세팅법Processor
세팅법package me.dailycode.apachecamel;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.Test;
import java.util.Map;
import java.util.Map.Entry;
@Slf4j
public class CamelBasicTests {
// https://camel.apache.org/components/4.0.x/languages/simple-language.html
// https://camel.apache.org/components/next/eips/message-translator.html
@Test
public void test() throws Exception {
DefaultCamelContext camelContext = new DefaultCamelContext();
// Camel Registry 설정
camelContext.getRegistry().bind("ProcessorInRegistry", new ProcessorInRegistry());
// 참고: ProcessorInRegistry 는 제가 만든 단순 테스트용
// org.apache.camel.Processor 를 implement 한 클래스입니다.
// 중간중간 보면 "${}" 처럼 무슨 표현식을 사용하는데,
// 이건 simple expression(또는 Language) 라는 것이다.
// 참고: https://camel.apache.org/components/4.0.x/languages/simple-language.html
RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
builder
.from("timer://scheduleStart?repeatCount=1")
// ** 프로퍼티 세팅법 **
.setProperty("tableName", () -> "half_and_half")
.setProperty("srid", () -> 5186)
.setProperty("charset", () -> "UTF-8")
.setProperty("my_camelId")
.simple("${camelId}") // use simple lang
// ** body 세팅법 **
//: 참고로 body 는 from 에서 사용하는 Componenet 가 뭐냐에 따라 default 로 있을 수도,
//: 없을 수도 있다.
// --> constant 를 사용한 방법
// .setBody().constant(new HashMap<String, Object>())
// --> simple lang 을 사용한 방법 3가지, 모두 같은 효과임.
// .setBody(new SimpleExpression("${empty(map)}"))
// .setBody(builder.simple("${empty(map)}"))
.setBody().simple("${empty(map)}")
// ** process 세팅법 **
// --> 람다식 사용
.process(exchange -> {
Map<String, Object> properties = exchange.getProperties();
Map<String, Object> body = exchange.getIn().getBody(Map.class);
log.info("first route body state : {}", body);
body.put("more", "info");
})
// --> 클래스 타입 String 사용.
// 여기서 쓰는 JustClass 는 Processor 를 implement 한 클래스입니다.
.process("#class:me.dailycode.apachecamel.JustClass")
// --> camelContext.getRegistry().bind 를 통한 binding 방법!
.process("ProcessorInRegistry") // registry 로 등록한 것을 사용
.to("direct:aaa");
});
// 이렇게 .to("mock:???"); 으로 끝내주지 않으면
// "Waiting as there are still 1 inflight and pending exchanges
// to complete, timeout in ?? seconds. Inflights per route"
// 와 같은 경고문이 나오는데, 이건 아무래도 from 에서 시작하는 게
// 가끔은 계속 쓰레드를 물고 있는 경우가 있어서 그렇다.
// 특히 timer 가 그렇다. timer 를 한번만(repeatCount=1) 사용해도 상황은 같다.
// 억지로라도 mock endpoint 를 줌으로써 이를 방지할 수 있다.
RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
builder
.from("direct:aaa")
.log("We received ${body}")
.process(exchange -> {
Map<String, Object> properties = exchange.getProperties();
Object body = exchange.getIn().getBody();
log.info("second route body state : {}", body);
for (Entry<String, Object> entry : properties.entrySet()) {
log.info("[ key: {} | value: {} ]", entry.getKey(), entry.getValue());
}
})
.to("mock:done");
});
camelContext.start();
Thread.sleep(2000);
camelContext.stop();
}
}
package me.dailycode.simple_lang;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.LoggingLevel;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
@Slf4j
public class SimpleLangStringTests {
@Test
@DisplayName(
"property 로 전달된 문자열을 자르고 편집하는게 simple lang 만으로 될까?")
void
stringSubConcatTest() throws InterruptedException {
try (DefaultCamelContext camelContext = new DefaultCamelContext()) {
RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
builder.from("direct:start")
.process(exchange -> {
exchange.setProperty("targetFilePath",
"D:\\shape_files\\appending_test_data\\copied_dir\\half_and_half.zip");
})
.to("direct:goto1");
});
RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
builder
.from("direct:goto1")
// .setProperty("shapeFilePath")
//.simple("${exchangeProperty[targetFilePath].substring(0,10)}")
// .setProperty("shapeFilePath")
//.simple("${exchangeProperty[targetFilePath].lastIndexOf('\\')}")
.setProperty("shapeFilePath")
.simple("${exchangeProperty[targetFilePath]"
+ ".substring("
+ "0,"
+ "${exchangeProperty[targetFilePath].lastIndexOf('.')}"
+ ")}"
+ ".shp") // zip 파일의 확장자를 shp 으로 변경하고 다음 router 에 전달
.to("direct:goto2");
});
RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
builder.from("direct:goto2")
.process(exchange -> {
System.out.println("targetFilePath : "
+ exchange.getProperty("targetFilePath", String.class));
System.out.println("shapeFilePath : "
+ exchange.getProperty("shapeFilePath", String.class));
})
.to("mock:end");
});
camelContext.start();
ProducerTemplate template = camelContext.createProducerTemplate();
template.sendBody("direct:start", new HashMap<String, Object>());
} catch (IOException e) {
log.error("IOException occurred : {}", e.getMessage());
} catch (Exception e) {
log.error("Exception occurred : {}", e.getMessage());
}
}
@Test
@DisplayName(
"property 로 전달된 문자열을 자르고 편집하는게 loop 문 내부에서도 가능할까?")
void
stringSubConcatWhileLoopTest() {
try (DefaultCamelContext camelContext = new DefaultCamelContext()) {
RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
builder.from("direct:start")
.process(exchange -> {
List<Object> listOfShapeZips = List.of(
"D:\\shape_files\\appending_test_data\\copied_dir\\one.zip",
"D:\\shape_files\\appending_test_data\\copied_dir\\two.zip",
"D:\\shape_files\\appending_test_data\\copied_dir\\three.zip");
exchange.setProperty("listOfShapeZips", listOfShapeZips);
})
.to("direct:goto1");
});
// https://camel.apache.org/components/4.0.x/eips/loop-eip.html
RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
builder.from("direct:goto1")
.log("total Loop Count => ${exchangeProperty[listOfShapeZips].size}")
.loop()
.simple("${exchangeProperty[listOfShapeZips].size}").copy()
.setProperty("loop_idx")
.simple("${exchangeProperty.CamelLoopIndex}")
.setProperty("currentZipFile")
.simple(
"${exchangeProperty.listOfShapeZips[${exchangeProperty.loop_idx}]}")
.log(
"${date:now:yyyy/MM/dd HH:mm:ss.SSS} - Loop Idx = ${exchangeProperty.loop_idx}")
.process(exchange -> {
String currentZipFile =
exchange.getProperty("currentZipFile", String.class);
System.out.println("currentZipFile = " + currentZipFile);
})
// .to("direct:whileScope") ==> endpoint 도 (당연히) 사용할 수 있다.
.end()
.log(LoggingLevel.INFO,
"camel Context done! ${camelId}"); // loop 를 다 끝내고 갈 곳.
});
camelContext.start();
ProducerTemplate template = camelContext.createProducerTemplate();
template.sendBody("direct:start", new HashMap<String, Object>());
} catch (IOException e) {
log.error("IOException occurred : {}", e.getMessage());
} catch (Exception e) {
log.error("Exception occurred : {}", e.getMessage());
}
}
}
@Slf4j
public class InsertOnlyOneRowTests {
// https://camel.apache.org/components/4.0.x/eips/loop-eip.html
static HikariDataSource hikariDataSource;
@BeforeAll
static void beforeAll() {
// SQL 컴포넌트를 위한 DataSource 생성
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:postgresql://localhost:5432/postgres");
config.setUsername("postgres");
config.setPassword("postgres");
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
hikariDataSource = new HikariDataSource(config);
}
@AfterAll
static void afterAll() {
hikariDataSource.close();
}
// https://camel.apache.org/components/4.0.x/sql-component.html#_component_options
// https://camel.apache.org/components/4.0.x/sql-component.html#_uri_format
// https://camel.apache.org/components/4.0.x/sql-component.html#_using_named_parameters
@Test
void testLoop1() throws Exception {
try (DefaultCamelContext camelContext = new DefaultCamelContext()) {
// datasource 를 route 에서 자유롭게 사용하기 위해서 레지스트리 등록
camelContext.getRegistry().bind("MyDataSource", hikariDataSource);
RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
builder
.from("timer://scheduleStart_2?repeatCount=1")
.process(exchange -> {
Integer property = exchange.getProperty(Exchange.LOOP_INDEX, Integer.class);
log.info("index inside processor - {}", property);
HashMap<String, Object> body = new HashMap<>();
ArrayList<Map<String, Object>> list = new ArrayList<>();
Map<String, Object> single = Collections.singletonMap("today", "2024-01-01");
list.add(single);
body.put("data", list);
body.put("mySeq", 300);
exchange.getMessage().setBody(body);
})
.log("log:${body}") // CamelLogger 를 console 창에서 검색
.to("sql:insert into public.somesome(id,name) " +
"values(:#${body[mySeq]}, :#${body[data][0][today]})" +
"?dataSource=#MyDataSource");
});
camelContext.start();
Thread.sleep(3000);
camelContext.stop();
}
}
}
참고 (1)
sql
component 에서 simple lang
을 통해서 Propertes
또는 Body
의 데이터를 읽어 오는데, 이때 실제 데이터의 타입을 체크하여 데이터를 넣습니다.insert into some(name) values (:#${body[name]})
✅insert into some(name) values (':#${body[name]}')
❌참고 (2)
[?]
를body[data][0].today
처럼하면 에러가 날겁니다.body[data][0][today]
사용하면 문제없습니다.package me.dailycode.apachecamel;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Map;
@Slf4j
public class CamelLoopingAndSqlTests {
// https://camel.apache.org/components/4.0.x/eips/loop-eip.html
static HikariDataSource hikariDataSource;
@BeforeAll
static void beforeAll() {
// SQL 컴포넌트를 위한 DataSource 생성
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:postgresql://localhost:5432/postgres");
config.setUsername("postgres");
config.setPassword("postgres");
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
hikariDataSource = new HikariDataSource(config);
}
@Test
void testLoop1() throws Exception {
DefaultCamelContext camelContext = new DefaultCamelContext();
// datasource 를 route 에서 자유롭게 사용하기 위해서 레지스트리 등록
camelContext.getRegistry().bind("MyDataSource", hikariDataSource);
// 하고자 하는 일,
// create table public.somesome(id int4, name varchar(255)); 라는 테이블을 미리 만들어 두고,
// 해당 테이블에 SQL 을 총 5번 날려서 insert 할 거임
// 이때 id 는 Camel Loop 의 index 를 사용할 것이며,
// 명칭은 그냥 dailyCode 으로 고정한다.
RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
builder
.from("timer://scheduleStart_2?repeatCount=1")
.loop(8).copy()
.log("This Is - ${exchangeProperty.CamelLoopIndex}")
.process(exchange -> {
Integer property = exchange.getProperty(Exchange.LOOP_INDEX, Integer.class);
log.info("index inside processor - {}", property);
})
.to("sql:insert into public.somesome(id,name) " +
"values(:#${exchangeProperty.CamelLoopIndex}, 'dailyCode')" +
"?dataSource=#MyDataSource");
});
camelContext.start();
Thread.sleep(3000);
camelContext.stop();
}
@Test
void testListOfMap() throws Exception {
DefaultCamelContext camelContext = new DefaultCamelContext();
// datasource 를 route 에서 자유롭게 사용하기 위해서 레지스트리 등록
camelContext.getRegistry().bind("MyDataSource", hikariDataSource);
// 먼저 테이블 생성하시고~
/*
create table public.somebody
(
id int4,
name varchar(255),
address varchar(255),
age int4
);
*/
var listOfSomebody = List.of(
Map.of(
"id", 1,
"name", "dailyCode",
"address", "서울시 구로동",
"age", 20
),
Map.of(
"id", 2,
"name", "codingToastBread",
"address", "식빵나라 식빵시",
"age", 25
)
);
RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
builder.from("timer://scheduleStart_1?repeatCount=1")
.setBody().constant(listOfSomebody)
.log("initial body - ${body}")
.loop().simple("${body.size}").copy()
.log("${body[${exchangeProperty.CamelLoopIndex}]}")
.to("mock:done");
});
camelContext.start();
Thread.sleep(2000);
camelContext.stop();
}
}
sql: select now() as today
같이 어떤 결과값이 생길 수 있는데, 이러한 결과들은List
형태로 담겨서 다음으로 보내집니다.${body[0][today]}
, 여기서 today
는 select 문에서 지정한 alias 입니다.
참고
package me.dailycode.apachecamel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.Test;
public class FileCopyTest {
private static final String SOURCE_FOLDER = "D:\\ndata\\dummy_files";
private static final String DESTINATION_FOLDER = "D:\\ndata\\dummy_files_destination";
@Test
void testCopy() throws Exception {
DefaultCamelContext camelContext = new DefaultCamelContext();
RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
// 복사하라면 이걸 사용.
builder.from("file://" + SOURCE_FOLDER + "?noop=true")
.to("direct:goto1");
// 옮기려는 거면 아래 걸 사용
// builder.from("file://" + SOURCE_FOLDER + "?delete=true")
// .to("direct:goto1");
});
// ~중간 로깅. 특별한 의미는 없음. 굳이 한번 나눠봄 ㅎㅎ;; ~
RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
builder.from("direct:goto1")
.log("file:ext = ${file:ext}")
.log("file:onlyname.noext = ${file:onlyname.noext}")
.log("file:name = ${file:name}")
.to("direct:goto2");
});
RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
builder.from("direct:goto2")
.setHeader("Exchange.FILE_NAME").simple("${file:onlyname.noext}-done.${file:ext}")
.to("file://" + DESTINATION_FOLDER);
});
camelContext.start();
Thread.sleep(2000);
camelContext.stop();
}
}
noop=true
옵션을 안 주면, file component
의 기본 동작이 발동합니다.
file component
는 파일을 옮기고 나서, 파일 출처지
에서 camel/
이라는
디렉토리를 생성하고 그 안에 작업했던 파일들을 옮깁니다.
저는 이 동작이 싫어서 일부러 noop=true 처리했습니다.
주의!
참고로 file (producer) 컴포넌트에는 fileName 이라는 옵션이 있는데,
해당 옵션은 Wild Card 기능을 제공하지 않습니다! 주의하시기 바랍니다.// 동작이 정상적으로 안될 겁니다. builder.from("file://" + SOURCE_FOLDER + "?fileName=*.txt&noop=true")`
또한 아래처럼 fileName 에 아무 값도 안 넣어줘도 동작을 안할겁니다.
builder.from("file://" + SOURCE_FOLDER + "?fileName=")`
아래처럼 하면 stdout 스트림만 받습니다
package me.dailycode.apachecamel.exec_camel;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
@Slf4j
public class ExecCamelTests {
@Test
void testExecCamel() throws Exception {
try (DefaultCamelContext camelContext = new DefaultCamelContext()) {
RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
builder
.from("direct:start")
.to("exec:ipconfig?args=/all")
.convertBodyTo(String.class, "EUC-KR")
.process(exchange -> {
String body = exchange.getIn().getBody(String.class);
log.info(body);
});
});
camelContext.start();
ProducerTemplate template = camelContext.createProducerTemplate();
template.sendBody("direct:start", new HashMap<String, Object>());
camelContext.stop();
}
}
}
앞서 본 코드에서는 안탑깝게도 stderr 스트림을 받지 못합니다.
stdout, stderr 둘 다 받고 싶다면 코딩 작업이 더 수반됩니다.
package me.dailycode.apachecamel.exec_camel;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.exec.ExecResult;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.Test;
import org.springframework.util.StreamUtils;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
@Slf4j
public class ExecCamelComponentTests {
@Test
void testExecCamel() throws Exception {
try (DefaultCamelContext camelContext = new DefaultCamelContext()) {
// window 의 경우 대다수가 기본 인코딩이 window-949 ( EUC-KR 호환되는 거 ) 입니다.
// 그래서 아래처럼 charset 을 생성했습니다.
Charset charset
= System.getProperty("os.name").toLowerCase().contains("win") ?
Charset.forName("EUC-KR") : StandardCharsets.UTF_8;
RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
builder
.from("direct:start")
.to("exec:ogr2ogr?args=-append --config PG_USE_COPY YES -progress -f \"PostgreSQL\" \"PG:host=localhost port=5432 user=postgres password=postgres dbname=postgres\" \"PG:host=localhost port=5432 user=postgres password=postgres dbname=postgres\" -nln \"coding_toast.half_and_half2\" -sql \"SELECT A.id AS id, A.name AS name, A.geom AS geom FROM coding_toast.half_and_half A\" -nlt point --config SHAPE_ENCODING UTF-8 -lco GEOMETRY_NAME=geom")
.process(exchange -> {
// stderr, stdout 모두를 담고 있는 ExecResult 를 추출한다.
ExecResult execResult = exchange.getIn().getBody(ExecResult.class);
// stdOut 처리
String stdOutString = streamToString(charset, execResult.getStdout());
loggingResultMessage(stdOutString, "Success");
// stdErr 처리
String stdErrString = streamToString(charset, execResult.getStderr());
loggingResultMessage(stdErrString, "Error(Or Warning)");
// 참고(중요): stdOut, stdErr 둘 다 있을 수 있습니다!
})
.to("direct:goto1");
});
RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
builder.from("direct:goto1")
.log("done")
.to("mock:end");
});
camelContext.start();
ProducerTemplate template = camelContext.createProducerTemplate();
template.sendBody("direct:start", new HashMap<String, Object>());
}
}
private static String streamToString(Charset charset, InputStream stream) throws IOException {
if (stream != null) {
try (stream) {
return StreamUtils.copyToString(stream, charset);
}
}
return "";
}
/**
* 로그 이쁘게 찍기 위한 Helper 메소드
*
* @param returnMessage stream 을 통해서 받은 메세지
* @param result 메세지와 관련된 결과가 Success 인지 아닌지를 판단하는 문자열
*/
private static void loggingResultMessage(String returnMessage, String result) {
if (StringUtils.hasText(returnMessage)) {
log.info("""
{} return Message :\s
========================================
{}
========================================\
""",
result, returnMessage);
}
}
}
아마 아래와 같은 코드를 원할 수도 있다.
RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
builder
.from("direct:start")
.setProperty("host").constant("localhost")
.to("exec:ogr2ogr?" +
"args=${exchangeProperty.host}")
.process(exchange -> {
/* 생략 */
})
.to("direct:goto1");
});
하지만 이러면 ${}
이 어떤 치환작업이 일어나지 않고,
그대로 ${}
형태로 uri 에 세팅된다.
만약 ${}
가 치환이 되어서 동적으로 Endpoint URI 가 생성되게 하고 싶다면
to()
대신에 toD()
를 사용하면 된다.
RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
builder
.from("direct:start")
.setProperty("args").constant("/all")
.toD("exec:ipconfig?args=${exchangeProperty.args}")
.process(exchange -> {
/* 생략 */
})
.to("direct:goto1");
});
package me.dailycode.apachecamel.exec_camel;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.RouteDefinition;
import org.junit.jupiter.api.Test;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Slf4j
public class PreAndPostCompTests {
@Test
void testPreAndPost() {
try (DefaultCamelContext camelContext = new DefaultCamelContext()) {
RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
builder
.from("timer://scheduleStart?repeatCount=1")
// 엄청 오래 걸리는 작업
.process("엄청 오래 걸림")
.to("mock:done")
;
});
camelContext.start();
// 2 초 후 route 정지 시키기
Thread.sleep(2000L);
List<RouteDefinition> routeDefinitions = camelContext.getRouteDefinitions();
Iterator<RouteDefinition> iterator = routeDefinitions.iterator();
// iterator 를 통해서 지워야합니다~ 안 그러면
// ConcurrentModificationException 에러가 납니다~
while (iterator.hasNext()) {
RouteDefinition routeDefinition = iterator.next();
iterator.remove(); // 반드시 이 지점에서 remove!
try {
// 최대 1초의 지연시간을 줍니다.
camelContext.stopRoute(routeDefinition.getRouteId(),
1000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.error("error on stopping Route: {},
errorMsg: {}", routeDefinition, e.getMessage());
}
try {
camelContext.removeRoute(routeDefinition.getRouteId());
} catch (Exception e) {
log.error("error on removing Route: {}, errorMsg: {}",
routeDefinition, e.getMessage());
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
이 방법 찾는데 한달 걸렸네요 ㅋㅋㅋ
위에서 보여드린 Route 강제 종료
는 새로운 프로세스를 생성해서 작업을 실행하는
exec component 와 함께 사용할 때 조심해야 합니다.
왜냐하면 프로세스를 생성한 쪽에서 자기가 만든 프로세스의 응답을 받지
않으면, 만들어진 프로세스는 그대로 zombie 가 되거나, 제대로 자원이 해제되지 않을 수 있습니다. 또는 어떠한 버그를 일으킬 수 있습니다.
저 같은 경우에는 DB 에 아주 무거운 쿼리를 수행하는 sql 을 exec component 를 통해서
날렸는데, apache camel app 을 강제로 종료했더니, DB 명령어의 대상이 되는 테이블에 Lock 이 걸리는 이상현상이 일어나더군요... (자세한 내막은 모르겠지만, 아무튼 버그가 나버렸습니다)
그러므로 exec 는 가볍게 동작할 수 있는 것 위주로 하고
무거운 연산의 경우에는 하나의 direct component + Process
를 만들고,
Process 내에서 java 의 ProcessBuilder 같은 걸 사용하면 좋을 듯합니다.
이때 받은 Process ID(=PID)를 routeId 와 매핑해서 어딘가에 저장해놨다가,
camelContext.stopRoute(routeDefinition.getRouteId(),
1000, TimeUnit.MILLISECONDS);
...를 호출하기 전에 매핑한 저장소에서 routeId 기준으로 PID 가 있는지 찾아내고,
먼저 해당 PID 를 kill 하는 사전작업을 거치도록 하면 됩니다.
아무래도 회사에서 쓰는 Apache Camel 은 XML DSL 을 쓰다보니,
Java DSL 밖에 못쓰는 제 기준에서는 이걸 수동으로 다 바꿔야 되서
여러모로 귀찮습니다.
이때 사용하면 좋은게 ChatGPT 입니다!
아주 잘 변경해줍니다.
저는 질문을 이렇게 하고 나서, XML 을 복붙하면 알아서 수정해주더군요.