[Spring Batch] Step 3 - Reading a CSV file and saving it to the DB
두리공장
2022. 3. 21. 22:54
In Step 2 we read a CSV file and printed its contents with System.out.println.
This time, let's insert the data into a DB.
To do that, we first need to create a table.
The data to be loaded looks like this:
rank,prev_rank,name,league,off,def,spi
1,1,Manchester City,Barclays Premier League,2.9,0.2,93.73
2,3,Liverpool,Barclays Premier League,2.92,0.25,93.15
3,2,Bayern Munich,German Bundesliga,3.62,0.56,93.01
4,4,Chelsea,Barclays Premier League,2.39,0.29,88.69
5,6,Real Madrid,Spanish Primera Division,2.58,0.5,86.63
6,5,Ajax,Dutch Eredivisie,3.02,0.74,86.49
7,9,Barcelona,Spanish Primera Division,2.48,0.54,84.77
8,7,Paris Saint-Germain,French Ligue 1,2.6,0.67,83.52
Let's create the table.
CREATE TABLE test_db.spi_global_rankings (
    rank NUMERIC NULL,
    prev_rank NUMERIC NULL,
    name varchar(100) NULL,
    league varchar(100) NULL,
    off float NULL,
    def float NULL,
    spi float NULL
)
ENGINE=InnoDB
DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_unicode_ci;
Now let's create the ItemWriter.
Spring Batch provides a variety of ItemWriters for writing data to a DB; here we will use JdbcBatchItemWriter.
@Bean
public JdbcBatchItemWriter<Ranking> jdbcBatchItemWriter(DataSource dataSource) throws Exception {
    return new JdbcBatchItemWriterBuilder<Ranking>()
            .dataSource(dataSource)
            .sql("INSERT INTO spi_global_rankings (rank,prev_rank,name,league,off,def,spi) VALUES ("
                    + ":rank, :prev_rank, :name, :league, :off, :def, :spi)")
            .beanMapped()
            .build();
}
Calling .beanMapped() binds each named parameter in the SQL (for example :prev_rank) to the property of the same name on the domain object.
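To make the binding more concrete, here is a minimal plain-Java sketch of the idea: every `:name` placeholder in the SQL is looked up as a getter on the item. This is only an illustration, not Spring's actual implementation; the class `NamedParamSketch`, its `resolve` method, and the trimmed-down `Ranking` stand-in (with hand-written getters instead of Lombok) are all hypothetical.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of name-based parameter binding (not Spring code).
final class NamedParamSketch {

    // Trimmed-down stand-in for the Ranking domain class.
    public static final class Ranking {
        private final int rank;
        private final int prev_rank;
        private final String name;

        public Ranking(int rank, int prev_rank, String name) {
            this.rank = rank;
            this.prev_rank = prev_rank;
            this.name = name;
        }

        public int getRank() { return rank; }
        public int getPrev_rank() { return prev_rank; }
        public String getName() { return name; }
    }

    // Matches named placeholders such as :rank or :prev_rank.
    private static final Pattern PLACEHOLDER = Pattern.compile(":(\\w+)");

    // Returns the values for the placeholders, in the order they appear in the SQL.
    static List<Object> resolve(String sql, Object item) {
        List<Object> values = new ArrayList<>();
        Matcher matcher = PLACEHOLDER.matcher(sql);
        while (matcher.find()) {
            String param = matcher.group(1);
            // :prev_rank -> getPrev_rank()
            String getter = "get" + Character.toUpperCase(param.charAt(0)) + param.substring(1);
            try {
                Method method = item.getClass().getMethod(getter);
                values.add(method.invoke(item));
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("No readable property for :" + param, e);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        String sql = "INSERT INTO spi_global_rankings (rank,prev_rank,name) VALUES (:rank, :prev_rank, :name)";
        // prints [2, 3, Liverpool]
        System.out.println(resolve(sql, new Ranking(2, 3, "Liverpool")));
    }
}
```

The practical consequence is that the placeholder names in the SQL have to match the property names of Ranking exactly, which is why the field is called prev_rank rather than prevRank.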
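Now let's run it.
In the Program arguments field of Run/Debug Configurations, enter "filename=file:d:\spi_global_rankings.csv foo=2". The foo=2 argument is there because Spring Batch will not run a job again once it has completed with the same argument values, so an arbitrary value has to be changed on each run. (Parameters are tracked this way so that an interrupted batch can be resumed where it stopped.)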
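This rule can be pictured as a lookup keyed by the job name plus its parameters. The plain-Java sketch below mimics the check with an in-memory set. `JobInstanceSketch` and `launch` are hypothetical names; real Spring Batch keeps this state in its metadata tables and also tracks execution status, which the sketch ignores.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical in-memory model of "same job + same parameters = same job instance".
final class JobInstanceSketch {

    private final Set<String> completedInstances = new HashSet<>();

    // Returns true if the job may run, false if this name/parameter combination already ran.
    boolean launch(String jobName, Map<String, String> params) {
        // TreeMap sorts the keys so the order of the arguments does not matter.
        String key = jobName + new TreeMap<String, String>(params).toString();
        return completedInstances.add(key);
    }

    public static void main(String[] args) {
        JobInstanceSketch repository = new JobInstanceSketch();
        Map<String, String> params = new HashMap<>();
        params.put("filename", "file:d:\\spi_global_rankings.csv");
        params.put("foo", "2");

        System.out.println(repository.launch("job1", params)); // true: first run
        System.out.println(repository.launch("job1", params)); // false: same parameters again
        params.put("foo", "3");
        System.out.println(repository.launch("job1", params)); // true: foo was changed
    }
}
```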
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.6.4)
2022-03-21 22:38:39.148 INFO 12736 --- [ main] com.sunnier.batch.BatchApplication : Starting BatchApplication using Java 1.8.0_202 on DESKTOP-AHN11RT with PID 12736 (C:\git_repo\batch\target\classes started by sunni in C:\git_repo\batch)
2022-03-21 22:38:39.163 INFO 12736 --- [ main] com.sunnier.batch.BatchApplication : No active profile set, falling back to 1 default profile: "default"
2022-03-21 22:38:40.054 INFO 12736 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2022-03-21 22:38:40.126 INFO 12736 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2022-03-21 22:38:40.343 INFO 12736 --- [ main] o.s.b.c.r.s.JobRepositoryFactoryBean : No database type set, using meta data indicating: MYSQL
2022-03-21 22:38:40.484 INFO 12736 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
2022-03-21 22:38:40.593 INFO 12736 --- [ main] com.sunnier.batch.BatchApplication : Started BatchApplication in 2.124 seconds (JVM running for 3.99)
2022-03-21 22:38:40.593 INFO 12736 --- [ main] o.s.b.a.b.JobLauncherApplicationRunner : Running default command line with: [filename=file:d:\spi_global_rankings.csv, foo=2]
2022-03-21 22:38:40.710 INFO 12736 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job1]] launched with the following parameters: [{filename=file:d:\spi_global_rankings.csv, foo=2}]
2022-03-21 22:38:40.773 INFO 12736 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
2022-03-21 22:38:41.605 INFO 12736 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 829ms
2022-03-21 22:38:41.626 INFO 12736 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job1]] completed with the following parameters: [{filename=file:d:\spi_global_rankings.csv, foo=2}] and the following status: [COMPLETED] in 882ms
2022-03-21 22:38:41.631 INFO 12736 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2022-03-21 22:38:41.642 INFO 12736 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
Disconnected from the target VM, address: '127.0.0.1:50928', transport: 'socket'
Process finished with exit code 0
The job finished without any errors. Let's check whether the data was saved to the DB.
select * from spi_global_rankings sgr;
rank|prev_rank|name               |league                  |off |def |spi  |
----|---------|-------------------|------------------------|----|----|-----|
   1|        1|Manchester City    |Barclays Premier League | 2.9| 0.2|93.73|
   2|        3|Liverpool          |Barclays Premier League |2.92|0.25|93.15|
   3|        2|Bayern Munich      |German Bundesliga       |3.62|0.56|93.01|
   4|        4|Chelsea            |Barclays Premier League |2.39|0.29|88.69|
   5|        6|Real Madrid        |Spanish Primera Division|2.58| 0.5|86.63|
   6|        5|Ajax               |Dutch Eredivisie        |3.02|0.74|86.49|
   7|        9|Barcelona          |Spanish Primera Division|2.48|0.54|84.77|
   8|        7|Paris Saint-Germain|French Ligue 1          | 2.6|0.67|83.52|
   9|       10|RB Leipzig         |German Bundesliga       |2.63|0.72|82.91|
  10|       11|Arsenal            |Barclays Premier League |2.16|0.46|82.65|
  11|        8|Internazionale     |Italy Serie A           |2.42|0.62|82.46|
The data was saved correctly.
Finally, let's go over the full source once more.
Java package structure
└─batch
    │  BatchApplication.java
    ├─domain
    │      Ranking.java
    ├─mapper
    │      RankingFieldSetMapper.java
    └─tokenizer
            RankingLineTokenizer.java
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.sunnier</groupId>
<artifactId>batch</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>batch</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
application.yml
spring:
  datasource:
    url: jdbc:mariadb://localhost:3306/test_db
    username: {id}
    password: {password}
    driver-class-name: org.mariadb.jdbc.Driver
  batch:
    jdbc:
      initialize-schema: always
BatchApplication.java
package com.sunnier.batch;
import com.sunnier.batch.domain.Ranking;
import com.sunnier.batch.mapper.RankingFieldSetMapper;
import com.sunnier.batch.tokenizer.RankingLineTokenizer;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.mapping.PatternMatchingCompositeLineMapper;
import org.springframework.batch.item.file.transform.LineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
// Required annotation when creating a batch application
@EnableBatchProcessing
@SpringBootApplication
public class BatchApplication {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    // Declare the ItemReader that reads the file.
    @Bean
    @StepScope
    public FlatFileItemReader<Ranking> rankingItemReader(@Value("#{jobParameters['filename']}") Resource inputFile) {
        return new FlatFileItemReaderBuilder<Ranking>()
                .name("rankingItemReader")
                .lineMapper(lineTokenizer())
                .linesToSkip(1) // skip the first (header) line of the CSV file (default is 0)
                .resource(inputFile)
                .build();
    }
    // Console writer from Step 2; the step below no longer uses it.
    @Bean
    public ItemWriter<Ranking> itemWriter() {
        return (items) -> items.forEach(System.out::println);
    }
    @Bean
    public JdbcBatchItemWriter<Ranking> jdbcBatchItemWriter(DataSource dataSource) throws Exception {
        return new JdbcBatchItemWriterBuilder<Ranking>()
                .dataSource(dataSource)
                .sql("INSERT INTO spi_global_rankings (rank,prev_rank,name,league,off,def,spi) VALUES ("
                        + ":rank, :prev_rank, :name, :league, :off, :def, :spi)")
                .beanMapped()
                .build();
    }
    @Bean
    public PatternMatchingCompositeLineMapper<Ranking> lineTokenizer() {
        // Register the tokenizer that holds the parsing rules
        Map<String, LineTokenizer> lineTokenizers = new HashMap<>(1);
        lineTokenizers.put("*", new RankingLineTokenizer());

        // Register the field set mapper that maps the fields to the domain object
        Map<String, FieldSetMapper<Ranking>> fieldSetMappers = new HashMap<>(1);
        fieldSetMappers.put("*", new RankingFieldSetMapper());

        // Pass the tokenizers and field set mappers to the pattern-matching line mapper
        PatternMatchingCompositeLineMapper<Ranking> lineMappers = new PatternMatchingCompositeLineMapper<>();
        lineMappers.setTokenizers(lineTokenizers);
        lineMappers.setFieldSetMappers(fieldSetMappers);
        return lineMappers;
    }
    // Configure the parsing information. (Implemented in RankingLineTokenizer.java instead.)
    /* @Bean
    public DelimitedLineTokenizer rankingLineTokenizer() {
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setNames("rank","prev_rank","name","league","off","def","spi");
        return lineTokenizer;
    }*/
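    // Build the step.
    // The null arguments are placeholders: these calls go through the Spring configuration
    // proxy, which returns the managed beans (the reader's filename is bound late via @StepScope).
    @Bean
    public Step step() throws Exception {
        return this.stepBuilderFactory.get("step1")
                .<Ranking, Ranking>chunk(10)
                .reader(rankingItemReader(null))
                .writer(jdbcBatchItemWriter(null))
                .build();
    }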
    // Build the job.
    @Bean
    public Job job() throws Exception {
        return this.jobBuilderFactory.get("job1")
                .start(step())
                .build();
    }
    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }
}
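The `chunk(10)` setting in `step()` means that items are read one at a time but handed to the writer in groups of up to 10, with one transaction per group. A rough plain-Java sketch of that loop follows. The `ChunkSketch.process` helper is hypothetical and stands in for the framework; it has no transactions, skip handling, or retry handling.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of chunk-oriented processing (not Spring Batch code).
final class ChunkSketch {

    // Reads items one by one and passes them to the writer in lists of at most chunkSize.
    // Returns the number of times the writer was called.
    static <T> int process(Iterator<T> reader, int chunkSize, Consumer<List<T>> writer) {
        int writes = 0;
        List<T> buffer = new ArrayList<>(chunkSize);
        while (reader.hasNext()) {
            buffer.add(reader.next());
            if (buffer.size() == chunkSize) {
                writer.accept(new ArrayList<>(buffer)); // full chunk
                buffer.clear();
                writes++;
            }
        }
        if (!buffer.isEmpty()) {
            writer.accept(new ArrayList<>(buffer)); // last, partial chunk
            writes++;
        }
        return writes;
    }

    public static void main(String[] args) {
        List<Integer> ranks = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            ranks.add(i);
        }
        // 25 items with a chunk size of 10 give writes of 10, 10 and 5 items.
        int writes = process(ranks.iterator(), 10,
                chunk -> System.out.println("write " + chunk.size() + " items"));
        System.out.println(writes + " writes");
    }
}
```

With JdbcBatchItemWriter as the writer, each of those writer calls corresponds to one JDBC batch of INSERT statements.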
Ranking.java
package com.sunnier.batch.domain;
import lombok.Data;
@Data
public class Ranking {
    //rank,prev_rank,name,league,off,def,spi
    private int rank;
    private int prev_rank;
    private String name;
    private String league;
    private float off;
    private float def;
    private float spi;
}
RankingFieldSetMapper.java
package com.sunnier.batch.mapper;
import com.sunnier.batch.domain.Ranking;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;
public class RankingFieldSetMapper implements FieldSetMapper<Ranking> {
    @Override
    public Ranking mapFieldSet(FieldSet fieldSet) throws BindException {
        Ranking ranking = new Ranking();
        //rank,prev_rank,name,league,off,def,spi
        ranking.setRank(fieldSet.readInt("rank"));
        ranking.setPrev_rank(fieldSet.readInt("prev_rank"));
        ranking.setName(fieldSet.readString("name"));
        ranking.setLeague(fieldSet.readString("league"));
        ranking.setOff(fieldSet.readFloat("off"));
        ranking.setDef(fieldSet.readFloat("def"));
        ranking.setSpi(fieldSet.readFloat("spi"));
        return ranking;
    }
}
RankingLineTokenizer.java
package com.sunnier.batch.tokenizer;
import org.springframework.batch.item.file.transform.*;
import java.util.ArrayList;
import java.util.List;
public class RankingLineTokenizer implements LineTokenizer {
    private String delimiter = ",";
    private String[] names = new String[]{
            "rank","prev_rank","name","league","off","def","spi"
    };
    private FieldSetFactory fieldSetFactory = new DefaultFieldSetFactory();

    @Override
    public FieldSet tokenize(String record) {
        String[] fields = record.split(delimiter);
        List<String> parsedFields = new ArrayList<>();
        for (int i = 0; i < fields.length; i++) {
            parsedFields.add(fields[i]);
        }
        return fieldSetFactory.create(parsedFields.toArray(new String[0]), names);
    }
}
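One limitation worth noting: `record.split(delimiter)` treats every comma as a separator, so a quoted field that itself contains a comma would be split in two. The sample data has no such fields, so the tokenizer works here. For other files, a quote-aware split along the following lines could be used. `QuoteAwareSplitSketch` is a hypothetical helper and not part of Spring Batch, whose own DelimitedLineTokenizer already handles quoted fields.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a CSV line while respecting double-quoted fields.
final class QuoteAwareSplitSketch {

    static String[] split(String line, char delimiter) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '"') {
                if (inQuotes && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                    current.append('"'); // "" inside quotes is an escaped quote
                    i++;
                } else {
                    inQuotes = !inQuotes; // opening or closing quote
                }
            } else if (c == delimiter && !inQuotes) {
                fields.add(current.toString()); // end of field
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString()); // last field
        return fields.toArray(new String[0]);
    }

    public static void main(String[] args) {
        // The team name below is made up to show a comma inside a quoted field.
        String[] fields = split("9,9,\"Example, FC\",Some League,1.0,1.0,50.0", ',');
        System.out.println(fields.length); // prints 7
        System.out.println(fields[2]);     // prints Example, FC
    }
}
```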
The end.