
[Spring batch] Step 3 - Reading a CSV file and saving it to a DB

두리공장 2022. 3. 21. 22:54

In Step 2 we read a CSV file and printed each row with System.out.println.
This time, let's load the data into a DB.
Before any data can be inserted, we need a table to hold it.
The data to be loaded looks like this.

rank,prev_rank,name,league,off,def,spi
1,1,Manchester City,Barclays Premier League,2.9,0.2,93.73
2,3,Liverpool,Barclays Premier League,2.92,0.25,93.15
3,2,Bayern Munich,German Bundesliga,3.62,0.56,93.01
4,4,Chelsea,Barclays Premier League,2.39,0.29,88.69
5,6,Real Madrid,Spanish Primera Division,2.58,0.5,86.63
6,5,Ajax,Dutch Eredivisie,3.02,0.74,86.49
7,9,Barcelona,Spanish Primera Division,2.48,0.54,84.77
8,7,Paris Saint-Germain,French Ligue 1,2.6,0.67,83.52

Let's create the table.

CREATE TABLE test_db.spi_global_rankings (
	rank NUMERIC NULL,
	prev_rank NUMERIC NULL,
	name varchar(100) NULL,
	league varchar(100) NULL,
	off float NULL,
	def float NULL,
	spi float NULL	
)
ENGINE=InnoDB
DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_unicode_ci;

Now let's create the ItemWriter.
Spring Batch provides a variety of ItemWriters for writing data to a DB; of these, we will use JdbcBatchItemWriter.

@Bean
public JdbcBatchItemWriter<Ranking> jdbcBatchItemWriter(DataSource dataSource) throws Exception {
    return new JdbcBatchItemWriterBuilder<Ranking>()
            .dataSource(dataSource)
            .sql("INSERT INTO spi_global_rankings (rank,prev_rank,name,league,off,def,spi) VALUES ("
                    + ":rank, :prev_rank, :name, :league, :off, :def, :spi)")
            .beanMapped()
            .build();
}

Calling .beanMapped() binds each named parameter in the SQL to the domain object's property with the matching name.
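To make the name matching concrete, here is a minimal sketch that uses only the JDK's java.beans.Introspector. It is a simplified model, not Spring's actual code: the class BeanMappedSketch, its nested Ranking stand-in, and the namedParameters helper are all hypothetical names introduced for illustration. It shows that a getter named getPrev_rank() is exposed as the property prev_rank, which is why :prev_rank in the SQL can be resolved from the domain object.

```java
import java.beans.BeanInfo;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.LinkedHashMap;
import java.util.Map;

public class BeanMappedSketch {

    // Hypothetical stand-in for the Ranking domain class, with hand-written getters.
    public static class Ranking {
        private final int rank;
        private final int prev_rank;
        private final String name;

        public Ranking(int rank, int prev_rank, String name) {
            this.rank = rank;
            this.prev_rank = prev_rank;
            this.name = name;
        }

        public int getRank() { return rank; }
        public int getPrev_rank() { return prev_rank; }
        public String getName() { return name; }
    }

    // Collects every readable bean property into a "property name -> value" map,
    // which is the shape a named-parameter INSERT needs.
    static Map<String, Object> namedParameters(Object bean) {
        try {
            BeanInfo info = Introspector.getBeanInfo(bean.getClass(), Object.class);
            Map<String, Object> params = new LinkedHashMap<>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() != null) {
                    params.put(pd.getName(), pd.getReadMethod().invoke(bean));
                }
            }
            return params;
        } catch (Exception e) {
            throw new IllegalStateException("Could not read bean properties", e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> params = namedParameters(new Ranking(2, 3, "Liverpool"));
        // Each key corresponds to a :name placeholder in the INSERT statement.
        System.out.println(params);
    }
}
```

If a placeholder in the SQL had no matching property (for example :prevRank against a getPrev_rank() getter), the lookup in this model would return nothing, which mirrors why the placeholder names and the field names have to agree.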

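Now let's run it.
In the Program arguments field of Run/Debug Configurations, enter "filename=file:d:\spi_global_rankings.csv foo=2". The foo=2 value is there because Spring Batch identifies a job instance by its parameters and will not rerun an instance that has already completed with the same parameter values, so some arbitrary value has to be changed on each run. (This is the same mechanism that lets an interrupted batch resume where it left off.)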
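The rule about identical arguments can be pictured with a small model. The sketch below is plain Java and only imitates the idea; it is not how Spring Batch is implemented, and the class JobInstanceSketch and its methods are hypothetical. It assumes a job instance is identified by the job name plus its parameters, and that an instance which has already completed is not run again.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class JobInstanceSketch {

    // Keys of job instances that already completed (stand-in for the job repository tables).
    private final Set<String> completed = new HashSet<>();

    // Identity of a job instance: job name plus parameters, sorted so that order does not matter.
    static String instanceKey(String jobName, Map<String, String> params) {
        return jobName + new TreeMap<String, String>(params).toString();
    }

    // Returns true if the job runs, false if the same instance has already completed.
    public boolean launch(String jobName, Map<String, String> params) {
        return completed.add(instanceKey(jobName, params));
    }

    public static void main(String[] args) {
        JobInstanceSketch repository = new JobInstanceSketch();

        Map<String, String> params = new LinkedHashMap<>();
        params.put("filename", "file:d:\\spi_global_rankings.csv");
        params.put("foo", "1");

        System.out.println(repository.launch("job1", params)); // true: first run
        System.out.println(repository.launch("job1", params)); // false: same parameters again

        params.put("foo", "2");
        System.out.println(repository.launch("job1", params)); // true: a parameter changed
    }
}
```

In this model, changing foo from 1 to 2 produces a new key, so the job runs again, which is the effect the extra argument has in the run configuration above.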

.   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.4)

2022-03-21 22:38:39.148  INFO 12736 --- [           main] com.sunnier.batch.BatchApplication       : Starting BatchApplication using Java 1.8.0_202 on DESKTOP-AHN11RT with PID 12736 (C:\git_repo\batch\target\classes started by sunni in C:\git_repo\batch)
2022-03-21 22:38:39.163  INFO 12736 --- [           main] com.sunnier.batch.BatchApplication       : No active profile set, falling back to 1 default profile: "default"
2022-03-21 22:38:40.054  INFO 12736 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2022-03-21 22:38:40.126  INFO 12736 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2022-03-21 22:38:40.343  INFO 12736 --- [           main] o.s.b.c.r.s.JobRepositoryFactoryBean     : No database type set, using meta data indicating: MYSQL
2022-03-21 22:38:40.484  INFO 12736 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
2022-03-21 22:38:40.593  INFO 12736 --- [           main] com.sunnier.batch.BatchApplication       : Started BatchApplication in 2.124 seconds (JVM running for 3.99)
2022-03-21 22:38:40.593  INFO 12736 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: [filename=file:d:\spi_global_rankings.csv, foo=2]
2022-03-21 22:38:40.710  INFO 12736 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job1]] launched with the following parameters: [{filename=file:d:\spi_global_rankings.csv, foo=2}]
2022-03-21 22:38:40.773  INFO 12736 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
2022-03-21 22:38:41.605  INFO 12736 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 829ms
2022-03-21 22:38:41.626  INFO 12736 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=job1]] completed with the following parameters: [{filename=file:d:\spi_global_rankings.csv, foo=2}] and the following status: [COMPLETED] in 882ms
2022-03-21 22:38:41.631  INFO 12736 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2022-03-21 22:38:41.642  INFO 12736 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.
Disconnected from the target VM, address: '127.0.0.1:50928', transport: 'socket'

Process finished with exit code 0

The job simply finished without printing anything. Let's check whether the data was saved to the DB.

select * from spi_global_rankings sgr;

rank|prev_rank|name                               |league                                  |off |def |spi  |
----|---------|-----------------------------------|----------------------------------------|----|----|-----|
   1|        1|Manchester City                    |Barclays Premier League                 | 2.9| 0.2|93.73|
   2|        3|Liverpool                          |Barclays Premier League                 |2.92|0.25|93.15|
   3|        2|Bayern Munich                      |German Bundesliga                       |3.62|0.56|93.01|
   4|        4|Chelsea                            |Barclays Premier League                 |2.39|0.29|88.69|
   5|        6|Real Madrid                        |Spanish Primera Division                |2.58| 0.5|86.63|
   6|        5|Ajax                               |Dutch Eredivisie                        |3.02|0.74|86.49|
   7|        9|Barcelona                          |Spanish Primera Division                |2.48|0.54|84.77|
   8|        7|Paris Saint-Germain                |French Ligue 1                          | 2.6|0.67|83.52|
   9|       10|RB Leipzig                         |German Bundesliga                       |2.63|0.72|82.91|
  10|       11|Arsenal                            |Barclays Premier League                 |2.16|0.46|82.65|
  11|        8|Internazionale                     |Italy Serie A                           |2.42|0.62|82.46|

The data was saved correctly.

Finally, let's go over the full source once more.
Java package structure

└─batch
    │  BatchApplication.java
    ├─domain
    │      Ranking.java
    ├─mapper
    │      RankingFieldSetMapper.java
    └─tokenizer
            RankingLineTokenizer.java

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.sunnier</groupId>
    <artifactId>batch</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>batch</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>

        <dependency>
            <groupId>org.mariadb.jdbc</groupId>
            <artifactId>mariadb-java-client</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

application.yml

spring:
  datasource:
    url: jdbc:mariadb://localhost:3306/test_db
    username: {id}
    password: {password}
    driver-class-name: org.mariadb.jdbc.Driver
  batch:
    jdbc:
      initialize-schema: always

BatchApplication.java

package com.sunnier.batch;

import com.sunnier.batch.domain.Ranking;
import com.sunnier.batch.mapper.RankingFieldSetMapper;
import com.sunnier.batch.tokenizer.RankingLineTokenizer;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.mapping.PatternMatchingCompositeLineMapper;
import org.springframework.batch.item.file.transform.LineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

// Annotation required when creating a batch application
@EnableBatchProcessing
@SpringBootApplication
public class BatchApplication {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    // Declare the ItemReader that reads the file.
    @Bean
    @StepScope
    public FlatFileItemReader<Ranking> rankingItemReader(@Value("#{jobParameters['filename']}") Resource inputFile) {
        return new FlatFileItemReaderBuilder<Ranking>()
                .name("rankingItemReader")
                .lineMapper(lineTokenizer())
                .linesToSkip(1) // skip the first line (header) of the CSV file (default is 0)
                .resource(inputFile)
                .build();
    }

    @Bean
    public ItemWriter<Ranking> itemWriter(){
        return (items) -> items.forEach(System.out::println);
    }


    @Bean
    public JdbcBatchItemWriter<Ranking> jdbcBatchItemWriter(DataSource dataSource) throws Exception {
        return new JdbcBatchItemWriterBuilder<Ranking>()
                .dataSource(dataSource)
                .sql( "INSERT INTO spi_global_rankings (rank,prev_rank,name,league,off,def,spi) VALUES ("
                        +":rank, :prev_rank, :name, :league, :off, :def, :spi)")
                .beanMapped()
                .build();
    }


    @Bean
    public PatternMatchingCompositeLineMapper<Ranking> lineTokenizer() {

        // Register the tokenizer (parsing rule) for every line pattern
        Map<String, LineTokenizer> lineTokenizers = new HashMap<>(1);
        lineTokenizers.put("*", new RankingLineTokenizer());

        // Register the field set mapper that maps fields to the domain object
        Map<String, FieldSetMapper<Ranking>> fieldSetMappers = new HashMap<>(1);
        fieldSetMappers.put("*", new RankingFieldSetMapper());

        // Hand the tokenizers and field set mappers to the pattern-matching line mapper
        PatternMatchingCompositeLineMapper<Ranking> lineMappers = new PatternMatchingCompositeLineMapper<>();
        lineMappers.setTokenizers(lineTokenizers);
        lineMappers.setFieldSetMappers(fieldSetMappers);

        return lineMappers;
    }

    // Configure the parsing rule. (Implemented in RankingLineTokenizer.java instead)
    /* @Bean
    public DelimitedLineTokenizer rankingLineTokenizer() {
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setNames("rank","prev_rank","name","league","off","def","spi");
        return lineTokenizer;
    }*/

    // Create the step.
    @Bean
    public Step step() throws Exception {
        return this.stepBuilderFactory.get("step1")
                .<Ranking, Ranking>chunk(10)
                .reader(rankingItemReader(null))
                .writer(jdbcBatchItemWriter(null))
                .build();
    }

    // Create the job.
    @Bean
    public Job job() throws Exception {
        return this.jobBuilderFactory.get("job1")
                .start(step())
                .build();
    }

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }

}

Ranking.java

package com.sunnier.batch.domain;

import lombok.Data;

@Data
public class Ranking {
    //rank,prev_rank,name,league,off,def,spi
    private int rank;
    private int prev_rank;
    private String name;
    private String league;
    private float off;
    private float def;
    private float spi;
}

RankingFieldSetMapper.java

package com.sunnier.batch.mapper;

import com.sunnier.batch.domain.Ranking;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

public class RankingFieldSetMapper implements FieldSetMapper<Ranking> {

    @Override
    public Ranking mapFieldSet(FieldSet fieldSet) throws BindException {
        Ranking ranking = new Ranking();
        //rank,prev_rank,name,league,off,def,spi
        ranking.setRank(fieldSet.readInt("rank"));
        ranking.setPrev_rank(fieldSet.readInt("prev_rank"));
        ranking.setName(fieldSet.readString("name"));
        ranking.setLeague(fieldSet.readString("league"));
        ranking.setOff(fieldSet.readFloat("off"));
        ranking.setDef(fieldSet.readFloat("def"));
        ranking.setSpi(fieldSet.readFloat("spi"));

        return ranking;
    }
}

RankingLineTokenizer.java

package com.sunnier.batch.tokenizer;

import org.springframework.batch.item.file.transform.*;

public class RankingLineTokenizer implements LineTokenizer {
    private final String delimiter = ",";

    // Column names, in the same order as the CSV header
    private final String[] names = new String[]{
            "rank","prev_rank","name","league","off","def","spi"
    };
    private final FieldSetFactory fieldSetFactory = new DefaultFieldSetFactory();

    @Override
    public FieldSet tokenize(String record) {
        // Plain split on the delimiter; quoted fields that contain commas are not handled
        String[] fields = record.split(delimiter);
        return fieldSetFactory.create(fields, names);
    }
}
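
One caveat about the tokenizer above: it splits on every comma, so a quoted field that itself contains a comma would be cut in two. The sample data has no such field, so this does not affect the result here. As a rough sketch of what quote-aware splitting could look like, assuming double quotes as the quote character, the plain-Java example below uses a hypothetical class QuoteAwareSplit; the second input row is made up purely for illustration. Spring Batch's own DelimitedLineTokenizer (the commented-out variant in BatchApplication.java) handles quoted fields itself, so using it is likely the simpler route.

```java
import java.util.ArrayList;
import java.util.List;

public class QuoteAwareSplit {

    // Splits one CSV record on commas, treating commas inside double quotes as data.
    static List<String> split(String record) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < record.length(); i++) {
            char c = record.charAt(i);
            if (c == '"') {
                if (inQuotes && i + 1 < record.length() && record.charAt(i + 1) == '"') {
                    current.append('"'); // a doubled quote inside a quoted field is a literal quote
                    i++;
                } else {
                    inQuotes = !inQuotes; // opening or closing quote, not part of the value
                }
            } else if (c == ',' && !inQuotes) {
                fields.add(current.toString()); // field boundary
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString()); // last field
        return fields;
    }

    public static void main(String[] args) {
        // A row from the sample data: 7 fields either way.
        System.out.println(split("1,1,Manchester City,Barclays Premier League,2.9,0.2,93.73").size());
        // A made-up row with a comma inside a quoted name: still 7 fields.
        System.out.println(split("9,9,\"Example, FC\",Some League,1.0,1.0,50.0").size());
    }
}
```

Unlike String.split, this sketch also keeps trailing empty fields, so a record ending in a comma still yields the expected number of columns.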

The end.