Apache Camel 사용법 기록

식빵·2023년 12월 20일
0

Java Lab

목록 보기
18/29
post-custom-banner

회사에서 Apache Camel 을 쓸 일이 있어서 공부중인데,
사용했던 테스트 코드를 그냥 버리기엔 아까워서 여기에 기록합니다.
이 글에서는 Apache Camel 이 뭔지에 대한 기초적인 내용은 다루지 않습니다.
사실 아직 완전히 감을 잡은게 아니라서 못쓰는 겁니다 ㅎㅎ...

참고1 : Apache Camel ver.4.2.0 를 사용했습니다.
참고2 : XML DSL 도 원래는 되야하는데 이상하게 에러가 하도 나서
일단 보류했습니다. 대신 가장 대세인 Java DSL 방식으로 테스트를 진행했습니다.

참고: 아파치 카멜(Apache camel)이란?
What is Apache Camel?
저도 아직 말로 설명하는데 어려움이 있어서 참고 링크를 답니다.



💻 테스트 환경 세팅


개발 환경

  • OS : Window 10 Pro
  • IDE : IntelliJ Ultimate
  • jdk : Azul 17
  • framework : spring boot-3.1.6

Maven - pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.1.6</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>me.dailycode</groupId>
	<artifactId>apache-camel</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>apache-camel</name>
	<description>apache-camel</description>
	<properties>
		<java.version>17</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-jdbc</artifactId>
		</dependency>
		<dependency>
			<groupId>org.postgresql</groupId>
			<artifactId>postgresql</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.camel.springboot</groupId>
			<artifactId>camel-spring-boot-starter</artifactId>
			<version>4.2.0</version>
		</dependency>

		<!-- https://camel.apache.org/components/latest/sql-component.html -->
		<dependency>
			<groupId>org.apache.camel.springboot</groupId>
			<artifactId>camel-sql-starter</artifactId>
			<version>4.2.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.camel.springboot</groupId>
			<artifactId>camel-exec-starter</artifactId>
			<version>4.2.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.camel.springboot</groupId>
			<artifactId>camel-jaxb-starter</artifactId>
			<version>4.2.0</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-test-spring-junit5</artifactId>
			<version>3.21.0</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.awaitility</groupId>
			<artifactId>awaitility</artifactId>
			<version>4.2.0</version>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.30</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<image>
						<builder>paketobuildpacks/builder-jammy-base:latest</builder>
					</image>
				</configuration>
			</plugin>
		</plugins>
	</build>

</project>



로깅 포인트 잡아주기


일단 logback 세팅을 좀 하고 가겠습니다.

  • test/resources/application.properties
# 스프링 부트 테스트와 일반적인 junit 테스트 모두 똑같은
# 로그 레벨 사용을 위해서 logback-test.xml 파일을 사용
logging.config=classpath:logback-test.xml

  • test/resources/logback-test.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <timestamp key="BY_DATE" datePattern="yyyy-MM-dd"/>
    <property name="LOG_PATTERN"
              value="[%d{yyyy-MM-dd HH:mm:ss}:%-4relative] %green([%thread]) %highlight(%-5level) %boldWhite([%C.%M:%yellow(%L)]) - %msg%n"/>
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${LOG_PATTERN}</pattern>
        </encoder>
    </appender>
    <logger name="me.dailycode.apachecamel" level="DEBUG" additivity="false">
        <appender-ref ref="CONSOLE" />
    </logger>
    <logger name="org.apache.camel.processor" level="DEBUG" additivity="false">
        <appender-ref ref="CONSOLE" />
    </logger>
    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>





🔥 테스트 코드


direct 즉시 시작하기

아는 분들은 알겠지만 camelContext.start() 를 호출해도 trigger 가 되는
Producer Endpoint 가 있고(ex: file, timer 등)
아닌 게(ex: direct, mock 등) 있습니다.

특히 direct 는 자기만으로는 시작이 절대 안됩니다.
하지만 ProducerTemplate 을 사용하면 강제로 시작시킬 순 있습니다!

package me.dailycode.apachecamel;

import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.Test;

public class DirectStartTest {
    @Test
    void testDirect() throws Exception {
        DefaultCamelContext camelContext = new DefaultCamelContext();

        RouteBuilder.addRoutes(camelContext, in -> {
            in.from("direct:start")
                    .log("wow")
                    .to("mock:done");
        });

        camelContext.start();
        
        // 반드시 camelContext 가 시작된 이후 시점에 template.send 해야 합니다. 
        ProducerTemplate template 
        	= camelContext.createProducerTemplate();
        template.sendBody("direct:start", "This is a test message");
        camelContext.stop();
    }
}

참고: https://stackoverflow.com/questions/9636651/apache-camel-directstart-endpoint-what-does-it-mean




핵심적인 4가지 요소 세팅법

제가 생각하는 가장 핵심이 되는 4가지 세팅법을 이 목차에 작성해봅니다.

  • Registry 등록법
  • Property 세팅법
  • Body 세팅법
  • Processor 세팅법

package me.dailycode.apachecamel;

import lombok.extern.slf4j.Slf4j;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.Test;

import java.util.Map;
import java.util.Map.Entry;

@Slf4j
public class CamelBasicTests {

    // https://camel.apache.org/components/latest/languages/simple-language.html
    // https://camel.apache.org/components/next/eips/message-translator.html
    @Test
    public void test() throws Exception {

        DefaultCamelContext camelContext = new DefaultCamelContext();

        // Camel Registry 설정
        
        camelContext.getRegistry().bind("ProcessorInRegistry", new ProcessorInRegistry());
        // 참고: ProcessorInRegistry 는 제가 만든 단순 테스트용 
        //      org.apache.camel.Processor 를 implement 한 클래스입니다.

        // 중간중간 보면 "${}" 처럼 무슨 표현식을 사용하는데, 
        // 이건 simple expression(또는 Language) 라는 것이다.
        // 참고: https://camel.apache.org/components/latest/languages/simple-language.html

        RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {

            builder
            	.from("timer://scheduleStart?repeatCount=1")
                
                // ** 프로퍼티 세팅법 **
                .setProperty("tableName", () -> "half_and_half")
                .setProperty("srid", () -> 5186)
                .setProperty("charset", () -> "UTF-8")
                .setProperty("my_camelId")
                	.simple("${camelId}") // use simple lang


				// ** body 세팅법 **
                //: 참고로 body 는 from 에서 사용하는 Componenet 가 뭐냐에 따라 default 로 있을 수도, 
                //: 없을 수도 있다.

                // --> constant 를 사용한 방법
				// .setBody().constant(new HashMap<String, Object>())
				// --> simple lang 을 사용한 방법 3가지, 모두 같은 효과임.
				// .setBody(new SimpleExpression("${empty(map)}"))
				// .setBody(builder.simple("${empty(map)}"))
                .setBody().simple("${empty(map)}")


                // ** process 세팅법 **
                // --> 람다식 사용
                .process(exchange -> {
                     Map<String, Object> properties = exchange.getProperties();
                        Map<String, Object> body = exchange.getIn().getBody(Map.class);
                     log.info("first route body state : {}", body);
                     body.put("more", "info");
                })
                // --> 클래스 타입 String 사용. 
                // 여기서 쓰는 JustClass 는 Processor 를 implement 한 클래스입니다.
                .process("#class:me.dailycode.apachecamel.JustClass")
                
                // --> camelContext.getRegistry().bind 를 통한 binding 방법!
                .process("ProcessorInRegistry") // registry 로 등록한 것을 사용
                .to("direct:aaa");
        });


        // 이렇게 .to("mock:???"); 으로 끝내주지 않으면
        // "Waiting as there are still 1 inflight and pending exchanges 
        // to complete, timeout in ?? seconds. Inflights per route"
        // 와 같은 경고문이 나오는데, 이건 아무래도 from 에서 시작하는 게 
        // 가끔은 계속 쓰레드를 물고 있는 경우가 있어서 그렇다.
        // 특히 timer 가 그렇다. timer 를 한번만(repeatCount=1) 사용해도 상황은 같다.
        // 억지로라도 mock endpoint 를 줌으로써 이를 방지할 수 있다.
        RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
            builder
            	.from("direct:aaa")
                .log("We received ${body}")
                .process(exchange -> {
                	Map<String, Object> properties = exchange.getProperties();
                    Object body = exchange.getIn().getBody();
                    log.info("second route body state : {}", body);
                    for (Entry<String, Object> entry : properties.entrySet()) {
                    	log.info("[ key: {} | value: {} ]", entry.getKey(), entry.getValue());
                    }
                })
                .to("mock:done");
        });

        camelContext.start();
        Thread.sleep(2000);
        camelContext.stop();
    }
}




classPath 에 있는 클래스의 method 호출




Loop 사용 + Simple Lang 응용

package me.dailycode.simple_lang;

import lombok.extern.slf4j.Slf4j;
import org.apache.camel.LoggingLevel;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;

@Slf4j
public class SimpleLangStringTests {
  @Test
  @DisplayName(
      "property 로 전달된 문자열을 자르고 편집하는게 simple lang 만으로 될까?")
  void
  stringSubConcatTest() throws InterruptedException {
    try (DefaultCamelContext camelContext = new DefaultCamelContext()) {
      RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
        builder.from("direct:start")
            .process(exchange -> {
              exchange.setProperty("targetFilePath",
                  "D:\\shape_files\\appending_test_data\\copied_dir\\half_and_half.zip");
            })
            .to("direct:goto1");
      });

      RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
        builder
            .from("direct:goto1")
            // .setProperty("shapeFilePath")
            	//.simple("${exchangeProperty[targetFilePath].substring(0,10)}")
            // .setProperty("shapeFilePath")
            	//.simple("${exchangeProperty[targetFilePath].lastIndexOf('\\')}")
            .setProperty("shapeFilePath")
            .simple("${exchangeProperty[targetFilePath]"
                + ".substring("
                + "0,"
                + "${exchangeProperty[targetFilePath].lastIndexOf('.')}"
                + ")}"
                + ".shp") // zip 파일의 확장자를 shp 으로 변경하고 다음 router 에 전달
            .to("direct:goto2");
      });

      RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
        builder.from("direct:goto2")
            .process(exchange -> {
              System.out.println("targetFilePath : "
                  + exchange.getProperty("targetFilePath", String.class));
              System.out.println("shapeFilePath : "
                  + exchange.getProperty("shapeFilePath", String.class));
            })
            .to("mock:end");
      });

      camelContext.start();
      ProducerTemplate template = camelContext.createProducerTemplate();
      template.sendBody("direct:start", new HashMap<String, Object>());
    } catch (IOException e) {
      log.error("IOException occurred : {}", e.getMessage());
    } catch (Exception e) {
      log.error("Exception occurred : {}", e.getMessage());
    }
  }

  @Test
  @DisplayName(
      "property 로 전달된 문자열을 자르고 편집하는게 loop 문 내부에서도 가능할까?")
  void
  stringSubConcatWhileLoopTest() {
    try (DefaultCamelContext camelContext = new DefaultCamelContext()) {
      RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
        builder.from("direct:start")
            .process(exchange -> {
              List<Object> listOfShapeZips = List.of(
                  "D:\\shape_files\\appending_test_data\\copied_dir\\one.zip",
                  "D:\\shape_files\\appending_test_data\\copied_dir\\two.zip",
                  "D:\\shape_files\\appending_test_data\\copied_dir\\three.zip");
              exchange.setProperty("listOfShapeZips", listOfShapeZips);
            })
            .to("direct:goto1");
      });

      // https://camel.apache.org/components/latest/eips/loop-eip.html
      RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
        builder.from("direct:goto1")
            .log("total Loop Count => ${exchangeProperty[listOfShapeZips].size}")
            .loop()
            .simple("${exchangeProperty[listOfShapeZips].size}").copy()
                .setProperty("loop_idx")
                .simple("${exchangeProperty.CamelLoopIndex}")
                .setProperty("currentZipFile")
                .simple(
                    "${exchangeProperty.listOfShapeZips[${exchangeProperty.loop_idx}]}")
                .log(
                    "${date:now:yyyy/MM/dd HH:mm:ss.SSS} - Loop Idx = ${exchangeProperty.loop_idx}")
                .process(exchange -> {
                  String currentZipFile =
                      exchange.getProperty("currentZipFile", String.class);
                  System.out.println("currentZipFile = " + currentZipFile);
                })
                // .to("direct:whileScope") ==> endpoint 도 (당연히) 사용할 수 있다.
            .end()
            .log(LoggingLevel.INFO,
                "camel Context done! ${camelId}"); // loop 를 다 끝내고 갈 곳.
      });

      camelContext.start();
      ProducerTemplate template = camelContext.createProducerTemplate();
      template.sendBody("direct:start", new HashMap<String, Object>());
    } catch (IOException e) {
      log.error("IOException occurred : {}", e.getMessage());
    } catch (Exception e) {
      log.error("Exception occurred : {}", e.getMessage());
    }
  }
}




SQL 단건 INSERT


@Slf4j
public class InsertOnlyOneRowTests {

    // https://camel.apache.org/components/latest/eips/loop-eip.html

    static HikariDataSource hikariDataSource;

    @BeforeAll
    static void beforeAll() {
        // SQL 컴포넌트를 위한 DataSource 생성
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:postgresql://localhost:5432/postgres");
        config.setUsername("postgres");
        config.setPassword("postgres");
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        hikariDataSource = new HikariDataSource(config);
    }

    @AfterAll
    static void afterAll() {
        hikariDataSource.close();
    }

    // https://camel.apache.org/components/latest/sql-component.html#_component_options
    // https://camel.apache.org/components/latest/sql-component.html#_uri_format
    // https://camel.apache.org/components/latest/sql-component.html#_using_named_parameters
    @Test
    void testLoop1() throws Exception {

        try (DefaultCamelContext camelContext = new DefaultCamelContext()) {

            // datasource 를 route 에서 자유롭게 사용하기 위해서 레지스트리 등록
            camelContext.getRegistry().bind("MyDataSource", hikariDataSource);

            RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
                builder
                    .from("timer://scheduleStart_2?repeatCount=1")
                .process(exchange -> {
                    Integer property = exchange.getProperty(Exchange.LOOP_INDEX, Integer.class);
                    log.info("index inside processor - {}", property);
                    HashMap<String, Object> body = new HashMap<>();
                    ArrayList<Map<String, Object>> list = new ArrayList<>();
                    Map<String, Object> single = Collections.singletonMap("today", "2024-01-01");
                    list.add(single);
                    body.put("data", list);
                    body.put("mySeq", 300);
                    exchange.getMessage().setBody(body);
                })
                .log("log:${body}") // CamelLogger 를 console 창에서 검색
                .to("sql:insert into public.somesome(id,name) " +
                        "values(:#${body[mySeq]}, :#${body[data][0][today]})" +
                        "?dataSource=#MyDataSource");
            });
            camelContext.start();
            Thread.sleep(3000);
            camelContext.stop();
        }
    }
}
  • 참고 (1)

    • sql component 에서 simple lang 을 통해서 Propertes 또는 Body 의 데이터를 읽어 오는데, 이때 실제 데이터의 타입을 체크하여 데이터를 넣습니다.
    • ex:
      • insert into some(name) values (:#${body[name]})
      • insert into some(name) values (':#${body[name]}')
  • 참고 (2)

    • 위 예시처럼 Body 의 특정 값을 가져오기 위해서는 웬만해서는 [?]
      통해서 가져오시기 바랍니다.
    • 위 예시 코드에서body[data][0].today 처럼하면 에러가 날겁니다.
    • 위 예시 처럼 body[data][0][today] 사용하면 문제없습니다.




Loop + SQL Insert

package me.dailycode.apachecamel;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;

@Slf4j
public class CamelLoopingAndSqlTests {

    // https://camel.apache.org/components/latest/eips/loop-eip.html

    static HikariDataSource hikariDataSource;

    @BeforeAll
    static void beforeAll() {
        // SQL 컴포넌트를 위한 DataSource 생성
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:postgresql://localhost:5432/postgres");
        config.setUsername("postgres");
        config.setPassword("postgres");
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        hikariDataSource = new HikariDataSource(config);
    }

    @Test
    void testLoop1() throws Exception {

        DefaultCamelContext camelContext = new DefaultCamelContext();

        // datasource 를 route 에서 자유롭게 사용하기 위해서 레지스트리 등록
        camelContext.getRegistry().bind("MyDataSource", hikariDataSource);

        // 하고자 하는 일,
        // create table public.somesome(id int4, name varchar(255)); 라는 테이블을 미리 만들어 두고,
        // 해당 테이블에 SQL 을 총 5번 날려서 insert 할 거임
        // 이때 id 는 Camel Loop 의 index 를 사용할 것이며,
        // 명칭은 그냥 dailyCode 으로 고정한다.

        RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
            builder
            .from("timer://scheduleStart_2?repeatCount=1")
            .loop(8).copy()
            .log("This Is - ${exchangeProperty.CamelLoopIndex}") 
            .process(exchange -> {
                Integer property = exchange.getProperty(Exchange.LOOP_INDEX, Integer.class);
                log.info("index inside processor - {}", property);
            })
            .to("sql:insert into public.somesome(id,name) " +
                    "values(:#${exchangeProperty.CamelLoopIndex}, 'dailyCode')" +
                    "?dataSource=#MyDataSource");
        });
        camelContext.start();
        Thread.sleep(3000);
        camelContext.stop();
    }


    @Test
    void testListOfMap() throws Exception {
        DefaultCamelContext camelContext = new DefaultCamelContext();

        // datasource 를 route 에서 자유롭게 사용하기 위해서 레지스트리 등록
        camelContext.getRegistry().bind("MyDataSource", hikariDataSource);

        // 먼저 테이블 생성하시고~
        /*
        create table public.somebody
        (
            id   int4,
            name varchar(255),
            address varchar(255),
            age int4
        );
        */

        var listOfSomebody = List.of(
            Map.of(
            	"id", 1,
            	"name", "dailyCode",
            	"address", "서울시 구로동",
            	"age", 20
            ),
            Map.of(
            	"id", 2,
            	"name", "codingToastBread",
            	"address", "식빵나라 식빵시",
            	"age", 25
            )
        );

        RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
            builder.from("timer://scheduleStart_1?repeatCount=1")
                    .setBody().constant(listOfSomebody)
                    .log("initial body - ${body}")
                    .loop().simple("${body.size}").copy()
                    .log("${body[${exchangeProperty.CamelLoopIndex}]}")
                    .to("mock:done");
        });

        camelContext.start();
        Thread.sleep(2000);
        camelContext.stop();
    }

}
  • 참고
    • sql: select now() as today 같이 어떤 결과값이 생길 수 있는데, 이러한 결과들은
      body 에 List 형태로 담겨서 다음으로 보내집니다.
    • 그러니 다음 router 에서 simple lang 로 데이터를 보려면?
    • ${body[0][today]} , 여기서 today 는 select 문에서 지정한 alias 입니다.

참고




File 옮기기/복사하기

package me.dailycode.apachecamel;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.Test;

public class FileCopyTest {

    private static final String SOURCE_FOLDER = "D:\\ndata\\dummy_files";
    private static final String DESTINATION_FOLDER = 
    					"D:\\ndata\\dummy_files_destination";
    @Test
    void testCopy() throws Exception {
        DefaultCamelContext camelContext = new DefaultCamelContext();

        RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
        	
            // 복사하라면 이걸 사용.
            builder.from("file://" + SOURCE_FOLDER + "?noop=true")
                    .to("direct:goto1");
                    
            // 옮기려는 거면 아래 걸 사용
            // builder.from("file://" + SOURCE_FOLDER + "?delete=true")
            //        .to("direct:goto1");
        });

        // ~중간 로깅. 특별한 의미는 없음. 굳이 한번 나눠봄 ㅎㅎ;; ~
        RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
            builder.from("direct:goto1")
                    .log("file:ext = ${file:ext}")
                    .log("file:onlyname.noext = ${file:onlyname.noext}")
                    .log("file:name = ${file:name}")
                    .to("direct:goto2");
        });

        RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
            builder.from("direct:goto2")
                     .setHeader("Exchange.FILE_NAME").simple("${file:onlyname.noext}-done.${file:ext}")
                    .to("file://" + DESTINATION_FOLDER);
        });

        camelContext.start();
        Thread.sleep(2000);
        camelContext.stop();
    }
}

noop=true 옵션을 안 주면, file component 의 기본 동작이 발동합니다.
file component 는 파일을 옮기고 나서, 파일 출처지 에서 camel/ 이라는
디렉토리를 생성하고 그 안에 작업했던 파일들을 옮깁니다.
저는 이 동작이 싫어서 일부러 noop=true 처리했습니다.

주의!
참고로 file (producer) 컴포넌트에는 fileName 이라는 옵션이 있는데,
해당 옵션은 Wild Card 기능을 제공하지 않습니다! 주의하시기 바랍니다.

// 동작이 정상적으로 안될 겁니다.
builder.from("file://" + SOURCE_FOLDER + "?fileName=*.txt&noop=true")`

또한 아래처럼 fileName 에 아무 값도 안 넣어줘도 동작을 안할겁니다.

builder.from("file://" + SOURCE_FOLDER + "?fileName=")`




Exec 사용해서 명령어 날리기

stdout 결과만 받기

아래처럼 하면 stdout 스트림만 받습니다

package me.dailycode.apachecamel.exec_camel;

import lombok.extern.slf4j.Slf4j;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.Test;

import java.util.HashMap;

@Slf4j
public class ExecCamelTests {
    @Test
    void testExecCamel() throws Exception {
        try (DefaultCamelContext camelContext = new DefaultCamelContext()) {
            RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
                builder
                    .from("direct:start")
                    .to("exec:ipconfig?args=/all")
                    .convertBodyTo(String.class, "EUC-KR")
                    .process(exchange -> {
                        String body = exchange.getIn().getBody(String.class);
                        log.info(body);
                    });
            });

            camelContext.start();
            ProducerTemplate template = camelContext.createProducerTemplate();
            template.sendBody("direct:start", new HashMap<String, Object>());
            camelContext.stop();
        }
    }
}

stdout, stderr 둘 다 받기

앞서 본 코드에서는 안탑깝게도 stderr 스트림을 받지 못합니다.
stdout, stderr 둘 다 받고 싶다면 코딩 작업이 더 수반됩니다.

package me.dailycode.apachecamel.exec_camel;

import lombok.extern.slf4j.Slf4j;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.exec.ExecResult;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.Test;
import org.springframework.util.StreamUtils;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;

@Slf4j
public class ExecCamelComponentTests {

    @Test
    void testExecCamel() throws Exception {
        try (DefaultCamelContext camelContext = new DefaultCamelContext()) {
            // window 의 경우 대다수가 기본 인코딩이 window-949 ( EUC-KR 호환되는 거 ) 입니다.
            // 그래서 아래처럼 charset 을 생성했습니다.
            Charset charset
                    = System.getProperty("os.name").toLowerCase().contains("win") ?
                    Charset.forName("EUC-KR") : StandardCharsets.UTF_8;

            RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
                builder
                .from("direct:start")
                .to("exec:ogr2ogr?args=-append --config PG_USE_COPY YES -progress -f \"PostgreSQL\" \"PG:host=localhost port=5432 user=postgres password=postgres dbname=postgres\" \"PG:host=localhost port=5432 user=postgres password=postgres dbname=postgres\" -nln \"coding_toast.half_and_half2\" -sql \"SELECT A.id AS id, A.name AS name, A.geom AS geom FROM coding_toast.half_and_half A\" -nlt point --config SHAPE_ENCODING UTF-8 -lco GEOMETRY_NAME=geom")
                .process(exchange -> {
                    // stderr, stdout 모두를 담고 있는 ExecResult 를 추출한다.
                    ExecResult execResult = exchange.getIn().getBody(ExecResult.class);

                    // stdOut 처리
                    String stdOutString = streamToString(charset, execResult.getStdout());
                    loggingResultMessage(stdOutString, "Success");

                    // stdErr 처리
                    String stdErrString = streamToString(charset, execResult.getStderr());
                    loggingResultMessage(stdErrString, "Error(Or Warning)");

                    // 참고(중요): stdOut, stdErr 둘 다 있을 수 있습니다!
                })
                .to("direct:goto1");
            });

            RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
                builder.from("direct:goto1")
                        .log("done")
                        .to("mock:end");
            });

            camelContext.start();
            ProducerTemplate template = camelContext.createProducerTemplate();
            template.sendBody("direct:start", new HashMap<String, Object>());
        }
    }

    private static String streamToString(Charset charset, InputStream stream) throws IOException {
        if (stream != null) {
            try (stream) {
                return StreamUtils.copyToString(stream, charset);
            }
        }
        return "";
    }

    /**
     * 로그 이쁘게 찍기 위한 Helper 메소드
     *
     * @param returnMessage stream 을 통해서 받은 메세지
     * @param result        메세지와 관련된 결과가 Success 인지 아닌지를 판단하는 문자열
     */
    private static void loggingResultMessage(String returnMessage, String result) {
        if (StringUtils.hasText(returnMessage)) {
            log.info("""
                        {} return Message :\s
                        ========================================
                        {}
                        ========================================\
                        """,
                    result, returnMessage);
        }
    }
}





Endpoint URI 동적 세팅

아마 아래와 같은 코드를 원할 수도 있다.

RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
builder
    .from("direct:start")
    .setProperty("host").constant("localhost")
    .to("exec:ogr2ogr?" +
            "args=${exchangeProperty.host}")
    .process(exchange -> {
       /* 생략 */
    })
    .to("direct:goto1");
});

하지만 이러면 ${} 이 어떤 치환작업이 일어나지 않고,
그대로 ${} 형태로 uri 에 세팅된다.


만약 ${} 가 치환이 되어서 동적으로 Endpoint URI 가 생성되게 하고 싶다면
to() 대신에 toD() 를 사용하면 된다.

RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
builder
    .from("direct:start")
    .setProperty("args").constant("/all")
    .toD("exec:ipconfig?args=${exchangeProperty.args}")
    .process(exchange -> {
        /* 생략 */
    })
    .to("direct:goto1");
});

다만 중간중간에 "(쌍따옴표)를 사용하면 아주 높은 확률로 위와 같은 방식에서
에러가 날 가능성이 높습니다!

이럴 때는 apache camel exec 에서 기본으로 제공되는 방법을 응용하는 게 좋습니다.

package me.dailycode.camel._01;

import lombok.extern.slf4j.Slf4j;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.List;

@Slf4j
public class SimpleDirectTest {

    @Test
    void testExecCamel() throws Exception {
        try (DefaultCamelContext camelContext = new DefaultCamelContext()) {
            RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
                builder
                    .from("direct:step1")
                    .process(exchange -> {

                        List<String> commandArgs = Arrays.asList(
                        "-overwrite",
                        "--config",
                        "PG_USE_COPY",
                        "YES",
                        "-progress",
                        "-f", "PostgreSQL",
                        "PG:host=localhost port=5432 dbname=postgres schemas=coding_toast user=postgres password=postgres",
                        "D:\\data\\shape\\EZM_ECO_L\\12.shp",
                        "-sql",
                        "select \\\"식물군락명\\\" as \\\"식물군락명\\\", \\\"군락기호\\\" as \\\"군락기호\\\" from \\\"12\\\"",
                        "-nlt", "MULTILINESTRING",
                        "-lco", "GEOMETRY_NAME=geom",
                        "-nln", "EZM_ECO_L",
                        "--config", "SHAPE_ENCODING", "EUC-KR",
                        "-lco", "precision=NO"
                        );
                        exchange.getIn().setHeader("CamelExecCommandArgs", commandArgs);
                    })
                    .to("exec:ogr2ogr");
            });
            camelContext.start();
            ProducerTemplate template = camelContext.createProducerTemplate();
            template.sendBody("direct:step1", "");
            camelContext.stop();
        }
    }
}

위 코드를 보면 알겠지만 먼저 필요한 파라미터들을 모아서 List 에 저장하고,
이를 Header 에 CamelExecCommandArgs 라는 key 로 값을 넣기만 하면 끝입니다.
이러면 exec:명령어 가 실행될 때 자동으로 명령어의 인자값으로 사용합니다.

그리고 주의사항으로 List 에 지정된 값들을 보면 중간중간 " (쌍따옴표)가 있는
문자열이 있습니다. 여기에는 해당 " 바로 앞에 escape 문자열(\) 를 붙여줘야만 합니다.
그래야 정상적으로 동작합니다.

ex) 안녕"하"세요 ==> 안녕\"하\"세요





다른 클래스의 코드 빌려쓰기

이 예시에서 사용된 클래스는 commons-io:commons-ioFilenameUtils 클래스입니다.

package me.dailycode.camel._01;

import lombok.extern.slf4j.Slf4j;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.commons.io.FilenameUtils;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

@Slf4j
public class SimpleDirectTest {

    @Test
    void testExecCamel() throws Exception {
        try (DefaultCamelContext camelContext = new DefaultCamelContext()) {
            camelContext.getRegistry().bind("fileNameUtils", FilenameUtils.class);

            HashMap<String, Object> messageBody = new HashMap<>();
            messageBody.put("filePath", "D:\\coding\\toast\\test.csv");

            // 아래 3개의 메소드를 simple 에서 호출해보는 것이 목표다.
            FilenameUtils.getFullPathNoEndSeparator(messageBody.get("filePath").toString());
            FilenameUtils.getExtension(messageBody.get("filePath").toString());
            FilenameUtils.getExtension(messageBody.get("filePath").toString());

            // 참고1: 여기서는 static 메소드를 사용했지만, 일반 메소드도 호출이 가능하다!
            // 참고2: 만약에 클래스의 생성자가 private 이면 사용이 불가하다. ex: java.lang.Math 클래스가 대표적이다.

            RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
                builder
                        .from("direct:step1")
                        .setProperty("baseName").simple("${bean:fileNameUtils?method=getBaseName(${body[filePath]})}")
                        .setProperty("extension").simple("${bean:fileNameUtils?method=getExtension(${body[filePath]})}")
                        .setProperty("parentDir").simple("${bean:fileNameUtils?method=getFullPathNoEndSeparator(${body[filePath]})}")
                        .process(exchange ->  {
                            for (Map.Entry<String, Object> ent : exchange.getProperties().entrySet()) {
                                System.out.printf("[ %s  = %s ]%n", ent.getKey(), ent.getValue());
                            }
                        })
                        .to("mock:job_done")
                ;
            });
            camelContext.start();
            ProducerTemplate template = camelContext.createProducerTemplate();
            template.sendBody("direct:step1", messageBody);
            camelContext.stop();
        }
    }
}





ServiceSupport 를 통한 시작/중지 Hook 넣기

org.apache.camel.support.ServiceSupport 를 Processor 클래스 작성 시에
확장해서 사용하면 해당 processor 가 시작/중지(=shutdown) 될 때의 순간을 잡아낼 수 있습니다.

import org.apache.camel.support.ServiceSupport;
import org.apache.camel.Processor;
import org.apache.camel.Exchange;

public class SomeProcessor extends ServiceSupport implements Processor {

    @Override
    public void process(Exchange exchange) throws Exception {
      // 생략...
    }
    

    @Override
    protected void doStart() throws Exception {
        System.out.println("START!!!!");
    }

    @Override
    protected void doStop() throws Exception {
    	System.out.println("Shutdown Call!!!!");
        // 더 이상 필요없는 자원은 해제할 때 아주 유용합니다.
    }
}





실행 중 Route 강제 종료하기

package me.dailycode.apachecamel.exec_camel;

import lombok.extern.slf4j.Slf4j;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.RouteDefinition;
import org.junit.jupiter.api.Test;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Slf4j
public class PreAndPostCompTests {

    @Test
    void testPreAndPost() {

        try (DefaultCamelContext camelContext = new DefaultCamelContext()) {

            RouteBuilder.addRoutes(camelContext, (RouteBuilder builder) -> {
                builder
                .from("timer://scheduleStart?repeatCount=1")
                // 엄청 오래 걸리는 작업
                .process("엄청 오래 걸림")
                .to("mock:done")
                ;
            });
            
            camelContext.start();

			// 2 초 후 route 정지 시키기
            Thread.sleep(2000L);

            List<RouteDefinition> routeDefinitions = camelContext.getRouteDefinitions();
            Iterator<RouteDefinition> iterator = routeDefinitions.iterator();

			// iterator 를 통해서 지워야합니다~ 안 그러면 
            // ConcurrentModificationException 에러가 납니다~
            while (iterator.hasNext()) {
                RouteDefinition routeDefinition = iterator.next();
                iterator.remove(); // 반드시 이 지점에서 remove!

                try {
                	// 최대 1초의 지연시간을 줍니다.
                    camelContext.stopRoute(routeDefinition.getRouteId(), 
                    	1000, TimeUnit.MILLISECONDS);
                } catch (Exception e) {
                    log.error("error on stopping Route: {}, 
                    errorMsg: {}", routeDefinition, e.getMessage());
                }

                try {
                    camelContext.removeRoute(routeDefinition.getRouteId());
                } catch (Exception e) {
					log.error("error on removing Route: {}, errorMsg: {}", 
                    	routeDefinition, e.getMessage());
                }
            }

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

이 방법 찾는데 한달 걸렸네요 ㅋㅋㅋ



주의! Route 강제 종료와 exec 컴포넌트

위에서 보여드린 Route 강제 종료 는 새로운 프로세스를 생성해서 작업을 실행하는
exec component 와 함께 사용할 때 조심해야 합니다.

왜냐하면 프로세스를 생성한 쪽에서 자기가 만든 프로세스의 응답을 받지
않으면, 만들어진 프로세스는 그대로 zombie 가 되거나, 제대로 자원이 해제되지 않을 수 있습니다. 또는 어떠한 버그를 일으킬 수 있습니다.

저 같은 경우에는 DB 에 아주 무거운 쿼리를 수행하는 sql 을 exec component 를 통해서
날렸는데, apache camel app 을 강제로 종료했더니, DB 명령어의 대상이 되는 테이블에 Lock 이 걸리는 이상현상이 일어나더군요... (자세한 내막은 모르겠지만, 아무튼 버그가 나버렸습니다)

그러므로 exec 는 가볍게 동작할 수 있는 것 위주로 하고
무거운 연산의 경우에는 하나의 direct component + Process 를 만들고,
Process 내에서 java 의 ProcessBuilder 같은 걸 사용하면 좋을 듯합니다.
이때 받은 Process ID(=PID)를 routeId 와 매핑해서 어딘가에 저장해놨다가,

camelContext.stopRoute(routeDefinition.getRouteId(), 
                    	1000, TimeUnit.MILLISECONDS);

...를 호출하기 전에 매핑한 저장소에서 routeId 기준으로 PID 가 있는지 찾아내고,
먼저 해당 PID 를 kill 하는 사전작업을 거치도록 하면 됩니다.





📝 Reference


전반적 참고사항




레지스트리 등록법




(내 기준) 주요 컴포넌트 및 EIP




Simple Language 문법




XML DSL To Java DSL

아무래도 회사에서 쓰는 Apache Camel 은 XML DSL 을 쓰다보니,
Java DSL 밖에 못쓰는 제 기준에서는 이걸 수동으로 다 바꿔야 되서
여러모로 귀찮습니다.

이때 사용하면 좋은게 ChatGPT 입니다!
아주 잘 변경해줍니다.

저는 질문을 이렇게 하고 나서, XML 을 복붙하면 알아서 수정해주더군요.

  • apache camel 이 뭔지 알지? 내가 그거 관련해서 계속 질문을 할거야
  • 내가 지금부터 XML DSL 을 여기에 작성하면, 그걸 Java DSL 로 변경하는 작업을 너에게 맡길거야. 할 수 있겠니?
  • 이후 XML DSL 복붙하면 끝




이 게시물에서 다루지 않은 것

profile
백엔드를 계속 배우고 있는 개발자입니다 😊
post-custom-banner

0개의 댓글