์ด์ ํฌ์คํธ ์์ ETL์ ๊ธฐ๋ณธ์ค์ ์ ์์๋ณด์๋ค.
์ด์ Spring batch์ Job์ ์์ฑํ๊ณ ItemReader, ItemProcessor, ItemWriter
๋ฅผ ์ปค์คํฐ๋ง์ด์งํ ๊ฒ์ด๋ค.
๋ณด๋ค ์์ธํ Spring batch๋ฅผ ์๊ณ ์ถ์ผ๋ฉด ๊ด๋ จ ํฌ์คํธ๋ฅผ ์ฐธ๊ณ ํ์์ค
๋จผ์ , Job์ ์์ฑํ๋ค.
Job์ ์ฌ๋ฌ ๊ฐ์ Step๋ค๋ก ๊ตฌ์ฑ๋์ด์๋ค.
job/ETLJob.java
@Slf4j
@RequiredArgsConstructor
@Configuration
public class ETLJob {
private static final String JOB_NAME = "etlJob";
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Qualifier("dataSource-dbsource")
@Autowired
private DataSource dataSourceDbSource;
@Qualifier("dataSource-dbtarget")
@Autowired
private DataSource dataSourceDbTarget;
@Value("${chunkSize:100}")
private int chunkSize;
@Bean(JOB_NAME)
public Job job() {
return jobBuilderFactory.get(JOB_NAME)
.preventRestart()
.start(step())
.build();
}
@Bean(JOB_NAME + "_step")
public Step step() {
}
@Bean
public JdbcCursorItemReader<Product> reader() {
}
@Bean
public ItemProcessor<Product, TransProduct> processor() {
}
@Bean
public JdbcBatchItemWriter<TransProduct> writer() {
}
}
preventRestart()
dataSourceDbSource
db-source
dataSourcedataSourceDbTarget
db-target
dataSourcechunkSize
ItemReader
๊ฐ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด๋ค์ฌ ResultSet
์ ์ ์ฅํ๋ค.Step์ ์ฌ๋ฌ ๊ฐ์ tasklet์ผ๋ก ๊ตฌ์ฑ๋์ด ์๋ค.
tasklet์ ItemReader, ItemProcessor, ItemWriter
๋ก ์ธ๋ถํ๋๋ค.
@Bean(JOB_NAME + "_step")
public Step step() {
return stepBuilderFactory.get(JOB_NAME + "_step")
.<Product, TransProduct>chunk(chunkSize)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
<Product, TransProduct>chunk(chunkSize)
<Product, >
: Reader์์ ๋ฐํํ ํ์
<, TransProduct>
: Writer์ ๋์ด์ฌ ํ์
ItemReader
Database์์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ฌ ItemReader
๋ฅผ ๋ง๋ค์ด๋ณด์.
๋ค์ ItemReader
๋ DB์ ๋ฐ์ดํฐ๋ฅผ Cursor๋ฐฉ์์ผ๋ก ๊ฐ์ ธ์จ๋ค.
ETL์์ E(Extraction)์ ๊ณผ์ ์ด๋ค.
@Bean public JdbcCursorItemReader<Product> reader() { return new JdbcCursorItemReaderBuilder<Product>() .fetchSize(chunkSize) .dataSource(dataSourceDbSource) .rowMapper(new BeanPropertyRowMapper<>(Product.class)) .sql("SELECT id, name, price, created FROM product") .name("JdbcCursorItemReader") .build(); }
dataSource(dataSourceDbSource)
- DB(
db-source
)์ ์ ๊ทผํ๊ธฐ ์ํด ์ฌ์ฉํ datasource ๊ฐ์ฒด ํ ๋นrowMapper(new BeanPropertyRowMapper<>(Product.class))
- ์ฟผ๋ฆฌ ๊ฒฐ๊ณผ๋ฅผ Java Instance๋ก ๋งคํํ๊ธฐ ์ํ Mapper
ItemProcessor
ItemProcessor
๋ ItemReader
์์ ๊ฐ์ ธ์จ ๋ฐ์ดํฐ๋ฅผ ๋ณํ/ํํฐ๋ฅผ ํด์ฃผ๋ ์ญํ ์ด๋ค.
Product
-> TransProduct
Product
์ ๊ฐ๊ฒฉ์ด 2500๋ณด๋ค ํฌ๋ฉด discount๋ฅผ ํด์ฃผ๋ ItemProcessor
์ด๋ค.ETL์์ T(Transformation)์ ๊ณผ์ ์ด๋ค.
@Bean
public ItemProcessor<Product, TransProduct> processor() {
return product -> {
String name = product.getName();
Long price = product.getPrice();
LocalDateTime created = product.getCreated();
if( price > 2500 ) {
return new TransProduct(name, price - 500L, created, true);
} else {
return new TransProduct(name, price, created, false);
}
};
}
ItemWriter
ItemProcessor
๋ฅผ ๊ฑฐ์น ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฅธ DB(db-target
)์ ์ ์ฅํ๋ค.
Datasource๋ db-target
์ datasource๋ฅผ ์ฌ์ฉํ๋ค.
ETL์์ L(Loading)์ ๊ณผ์ ์ด๋ค.
@Bean public JdbcBatchItemWriter<TransProduct> writer() { return new JdbcBatchItemWriterBuilder<TransProduct>() .dataSource(dataSourceDbTarget) .sql("INSERT INTO trans_product(name, price, created, discount) values (:name, :price, :created, :discount)") .beanMapped() .build(); }
dataSource(dataSourceDbTarget)
db-target
์ datasource ๊ฐ์ฒด๋ฅผ ํ ๋นํ๋ค.beanMapped()
JdbcBatchItemWriter
์์ POJO๋ฅผ ์ฌ์ฉํ ๋@Bean
๊ณผ ํจ๊ป ์ฌ์ฉํ๋ค.- POJO๊ฐ ์๋
Map<Key, Value>
๋ฅผ ์ฌ์ฉํ๋ฉด ์ฌ์ฉํ์ง ์์.
--job.name
์ ์ ์ด spring batch๊ฐ ์คํํ job์ ๋ช
์ํ๋ค.
ํด๋น ์ค์ ์ application.yml
์ ์๋ค.
์ฃผ์: Spring batch์ ๋ฉํ๋ฐ์ดํฐ ํ ์ด๋ธ์์๋ ์ฑ๊ณตํ job์ ๋ํด ์ค๋ณต์คํ ๋ฐฉ์ง๋ฅผ ํ๊ณ ์๋ค.
๋ค๋ฅธ๋ง๋ก ์๋์ ๋ช ๋ น์ด๋ก๋etlJob
์ ์ฌ๋ฌ๋ฒ ์คํ ํ ์ ์๋ค๋ ๋ง์ด๋ค.
ํธ๋ฒ์ด์ง๋งjava -jar *.jar --job.name=etlJob version=1
๊ณผ ๊ฐ์ด ์๋ฌด ์์คํ ํ๋ผ๋ฏธํฐ๋ฅผ ๋ฃ์ด์ฃผ๋ฉด ์ค๋ณต์คํ์ด ๊ฐ๋ฅํ๋ค../gradlew build -x test
java -jar ./build/libs/*.jar --job.name=etlJob
## 7. Check
```sh
docker exec -it db-target bash
mysql -u rivernine -p
# Enter password
use etl;
select * from trans_product;
db-target
etl database์ trans_product ํ
์ด๋ธ์ ์กฐํํ๋ฉด ๋ค์๊ณผ ๊ฐ์ด ๋ฐ์ดํฐ๊ฐ ์ฝ์
๋ ๊ฒ์ ๋ณผ ์ ์๋ค.
๋ชจ๋ ์์ค๋ Github์ ์ฌ๋ ค๋์๋ค.