ETL (2)

잭잭이 · May 10, 2021

๐ŸŽ Contents

0. Summary

The previous post covered the basic ETL setup.
This post creates a Spring Batch Job and customizes its ItemReader, ItemProcessor, and ItemWriter.

For a closer look at Spring Batch itself, see the related post.

1. Create Job

First, create the Job.
A Job is made up of one or more Steps.

  • job/ETLJob.java
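import javax.sql.DataSource;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
// Product and TransProduct are the project's own classes; import them from wherever they live.

@Slf4j
@RequiredArgsConstructor
@Configuration
public class ETLJob {
  private static final String JOB_NAME = "etlJob";

  private final JobBuilderFactory jobBuilderFactory;
  private final StepBuilderFactory stepBuilderFactory;

  // Source database (read side)
  @Qualifier("dataSource-dbsource")
  @Autowired
  private DataSource dataSourceDbSource;

  // Target database (write side)
  @Qualifier("dataSource-dbtarget")
  @Autowired
  private DataSource dataSourceDbTarget;

  // Falls back to 100 when the chunkSize property is not set
  @Value("${chunkSize:100}")
  private int chunkSize;

  @Bean(JOB_NAME)
  public Job job() {
    return jobBuilderFactory.get(JOB_NAME)
            .preventRestart()
            .start(step())
            .build();
  }

  @Bean(JOB_NAME + "_step")
  public Step step() {
    // Body shown in section 2 (Create Step)
  }

  @Bean
  public JdbcCursorItemReader<Product> reader() {
    // Body shown in section 3 (Create ItemReader)
  }

  @Bean
  public ItemProcessor<Product, TransProduct> processor() {
    // Body shown in section 4 (Create ItemProcessor)
  }

  @Bean
  public JdbcBatchItemWriter<TransProduct> writer() {
    // Body shown in section 5 (Create ItemWriter)
  }
}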
  • preventRestart()
    • Marks the Job as non-restartable, so a Job that has already been executed cannot be restarted.
  • dataSourceDbSource
    • The DataSource for db-source.
  • dataSourceDbTarget
    • The DataSource for db-target.
  • chunkSize
    • The number of items read from the database per chunk; it defaults to 100 when the chunkSize property is not set.
    • The ItemReader reads the data into a ResultSet.

2. Create Step

Step์€ ์—ฌ๋Ÿฌ ๊ฐœ์˜ tasklet์œผ๋กœ ๊ตฌ์„ฑ๋˜์–ด ์žˆ๋‹ค.
tasklet์€ ItemReader, ItemProcessor, ItemWriter๋กœ ์„ธ๋ถ„ํ™”๋œ๋‹ค.

@Bean(JOB_NAME + "_step")
public Step step() {
  return stepBuilderFactory.get(JOB_NAME + "_step")            
          .<Product, TransProduct>chunk(chunkSize)
          .reader(reader())
          .processor(processor())
          .writer(writer())
          .build();
}
  • <Product, TransProduct>chunk(chunkSize)
    • <Product, > : the type returned by the Reader
    • <, TransProduct> : the type passed to the Writer
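
To make the two type parameters concrete, here is a minimal plain-Java sketch of a chunk-oriented loop. It is a simplified model, not Spring Batch's actual implementation, and the class name ChunkLoopSketch and its run method are made up for illustration. The first type parameter is what the reader yields, the second is what the writer receives.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Simplified model of chunk-oriented processing (hypothetical, not Spring Batch code).
class ChunkLoopSketch {

    // Reads up to chunkSize items, transforms each one, then hands the whole chunk to the writer.
    // Returns the number of chunks written.
    static <I, O> int run(Iterator<I> reader,
                          Function<I, O> processor,
                          Consumer<List<O>> writer,
                          int chunkSize) {
        int chunks = 0;
        while (reader.hasNext()) {
            // Read phase: collect at most chunkSize items.
            List<I> items = new ArrayList<>();
            while (items.size() < chunkSize && reader.hasNext()) {
                items.add(reader.next());
            }
            // Process phase: a null result drops (filters) the item.
            List<O> outputs = new ArrayList<>();
            for (I item : items) {
                O out = processor.apply(item);
                if (out != null) {
                    outputs.add(out);
                }
            }
            // Write phase: the writer sees the chunk as one list.
            writer.accept(outputs);
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<List<String>> written = new ArrayList<>();
        int chunks = ChunkLoopSketch.<Integer, String>run(
                List.of(1, 2, 3, 4, 5).iterator(),
                n -> "item-" + n,
                written::add,
                2);
        // prints: 3 chunks: [[item-1, item-2], [item-3, item-4], [item-5]]
        System.out.println(chunks + " chunks: " + written);
    }
}
```

The explicit `<Integer, String>` witness in main mirrors the `<Product, TransProduct>` written before chunk(chunkSize) in the Step definition.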

3. Create ItemReader

Database์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์˜ฌ ItemReader๋ฅผ ๋งŒ๋“ค์–ด๋ณด์ž.
๋‹ค์Œ ItemReader๋Š” DB์˜ ๋ฐ์ดํ„ฐ๋ฅผ Cursor๋ฐฉ์‹์œผ๋กœ ๊ฐ€์ ธ์˜จ๋‹ค.

ETL์—์„œ E(Extraction)์˜ ๊ณผ์ •์ด๋‹ค.

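@Bean
public JdbcCursorItemReader<Product> reader() {
  return new JdbcCursorItemReaderBuilder<Product>()
          .fetchSize(chunkSize)                                  // hint for how many rows the driver fetches per round trip
          .dataSource(dataSourceDbSource)
          .rowMapper(new BeanPropertyRowMapper<>(Product.class)) // maps columns to Product properties by name
          .sql("SELECT id, name, price, created FROM product")
          .name("JdbcCursorItemReader")                          // name under which the reader stores its state
          .build();
}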
  • dataSource(dataSourceDbSource)
    • Assigns the DataSource object used to access the DB (db-source).
  • rowMapper(new BeanPropertyRowMapper<>(Product.class))
    • The mapper that turns each row of the query result into a Java instance.
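
BeanPropertyRowMapper needs a class with a no-argument constructor and setters whose property names match the selected columns. The Product class is not shown in this post, so the following is only a guess at its shape, written in plain Java: the field types for price and created follow the getters used in the ItemProcessor, while the type of id is an assumption.

```java
import java.time.LocalDateTime;

// Hypothetical shape of Product, matching the columns id, name, price, created.
class Product {
    private Long id;               // assumed type; the post does not show it
    private String name;
    private Long price;
    private LocalDateTime created;

    // The row mapper instantiates the class through a no-argument constructor.
    public Product() { }

    // One getter/setter pair per selected column, matched by property name.
    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public Long getPrice() { return price; }
    public void setPrice(Long price) { this.price = price; }

    public LocalDateTime getCreated() { return created; }
    public void setCreated(LocalDateTime created) { this.created = created; }
}
```

If the real project uses Lombok for this class, annotations such as @Getter, @Setter, and @NoArgsConstructor would generate the same members.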

4. Create ItemProcessor

The ItemProcessor transforms or filters the data that the ItemReader fetched.

  • Product -> TransProduct
    • This ItemProcessor applies a discount when the Product's price is greater than 2500.

      This is the T (Transformation) step of ETL.

@Bean
public ItemProcessor<Product, TransProduct> processor() {
  return product -> {
    String name = product.getName();
    Long price = product.getPrice();
    LocalDateTime created = product.getCreated();
    // Prices above 2500 get a flat 500 discount and are flagged as discounted.
    if (price > 2500) {
      return new TransProduct(name, price - 500L, created, true);
    }
    // Otherwise the price is carried over unchanged.
    return new TransProduct(name, price, created, false);
  };
}
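
The discount rule is easy to check on its own, without starting a batch job. The sketch below restates it in plain Java; the class DiscountRuleSketch and its nested Product and TransProduct types are hypothetical stand-ins for the project's classes, kept only as small as the rule requires.

```java
import java.time.LocalDateTime;

// Standalone restatement of the discount rule (hypothetical helper, not part of the project).
class DiscountRuleSketch {

    // Minimal stand-ins for the post's Product and TransProduct.
    static final class Product {
        final String name;
        final Long price;
        final LocalDateTime created;

        Product(String name, Long price, LocalDateTime created) {
            this.name = name;
            this.price = price;
            this.created = created;
        }
    }

    static final class TransProduct {
        final String name;
        final Long price;
        final LocalDateTime created;
        final boolean discount;

        TransProduct(String name, Long price, LocalDateTime created, boolean discount) {
            this.name = name;
            this.price = price;
            this.created = created;
            this.discount = discount;
        }
    }

    static final long THRESHOLD = 2500L;
    static final long DISCOUNT = 500L;

    // Strictly greater than the threshold: a price of exactly 2500 is not discounted.
    static TransProduct transform(Product p) {
        boolean discounted = p.price > THRESHOLD;
        long price = discounted ? p.price - DISCOUNT : p.price;
        return new TransProduct(p.name, price, p.created, discounted);
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.now();
        System.out.println(transform(new Product("keyboard", 3000L, now)).price); // 2500
        System.out.println(transform(new Product("pen", 2500L, now)).discount);   // false
    }
}
```

Note the boundary: because the comparison is strict, an item priced at 2500 passes through unchanged, while one priced at 2501 ends up at 2001.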

5. Create ItemWriter

The data that has passed through the ItemProcessor is stored in a different DB (db-target).
The writer uses the DataSource of db-target.

This is the L (Loading) step of ETL.

@Bean
public JdbcBatchItemWriter<TransProduct> writer() {
  return new JdbcBatchItemWriterBuilder<TransProduct>()
              .dataSource(dataSourceDbTarget)
              .sql("INSERT INTO trans_product(name, price, created, discount) values (:name, :price, :created, :discount)")
              .beanMapped()
              .build();
}
  • dataSource(dataSourceDbTarget)
    • Assigns the DataSource object for db-target.
  • beanMapped()
    • Used when the JdbcBatchItemWriter writes POJOs: each named parameter in the SQL (:name, :price, :created, :discount) is filled from the item's property of the same name.
    • Not used when the items are Map<Key, Value> instead of POJOs.
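
Since beanMapped() binds by name, TransProduct has to expose a readable property for every named parameter in the INSERT statement. As a rough aid, the sketch below lists those parameter names with a regular expression. NamedParamsSketch is a hypothetical helper and the regex is deliberately naive (it ignores string literals and similar edge cases); it is not how Spring parses SQL.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Lists the :named parameters of a SQL string (naive, illustration only).
class NamedParamsSketch {
    private static final Pattern NAMED_PARAM = Pattern.compile(":(\\w+)");

    static List<String> namedParams(String sql) {
        List<String> names = new ArrayList<>();
        Matcher m = NAMED_PARAM.matcher(sql);
        while (m.find()) {
            names.add(m.group(1)); // the name without the leading colon
        }
        return names;
    }

    public static void main(String[] args) {
        String sql = "INSERT INTO trans_product(name, price, created, discount) "
                   + "values (:name, :price, :created, :discount)";
        // prints: [name, price, created, discount]
        System.out.println(namedParams(sql));
    }
}
```

Each name in that list should correspond to a property on TransProduct, presumably the four values passed to its constructor in the ItemProcessor.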

6. Execute

Pass --job.name to tell Spring Batch which job to run.
The corresponding setting is in application.yml.

Caution: Spring Batch's metadata tables prevent a job that has already succeeded from being run again.
In other words, the command below cannot run etlJob more than once.
As a workaround, adding an arbitrary job parameter, as in java -jar *.jar --job.name=etlJob version=1, and changing its value on each run makes repeated runs possible.

./gradlew build -x test

java -jar ./build/libs/*.jar --job.name=etlJob
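
Why the extra parameter helps can be pictured with a small model: a job run is identified by the job name together with its parameters, and an identity that has already completed is rejected. The sketch below is a deliberately simplified illustration in plain Java; JobInstanceSketch is a made-up class and does not reproduce Spring Batch's actual repository logic.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Simplified model of job-instance identity (hypothetical, not Spring Batch code).
class JobInstanceSketch {
    private final Set<String> completed = new HashSet<>();

    // Returns true if the launch is accepted, false if the same instance already completed.
    boolean launch(String jobName, Map<String, String> params) {
        // Sort the parameters so their order does not change the identity.
        String key = jobName + new TreeMap<>(params);
        return completed.add(key);
    }

    public static void main(String[] args) {
        JobInstanceSketch repo = new JobInstanceSketch();
        System.out.println(repo.launch("etlJob", Map.of("version", "1"))); // true
        System.out.println(repo.launch("etlJob", Map.of("version", "1"))); // false
        System.out.println(repo.launch("etlJob", Map.of("version", "2"))); // true
    }
}
```

In this model, rerunning with version=1 is rejected, while version=2 counts as a new instance, which is the effect the workaround relies on.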


7. Check

```sh
docker exec -it db-target bash

mysql -u rivernine -p
# Enter password

# At the mysql prompt:
use etl;

select * from trans_product;
```

Querying the trans_product table in the etl database on db-target shows that the data has been inserted.


All of the source code is available on GitHub.

0๊ฐœ์˜ ๋Œ“๊ธ€