
두리공장

[Spring batch] Step 5 - Reading from a DB and writing to a DB

java


두리공장 2022. 3. 24. 00:41

Now let's build a batch that reads from a DB and then writes to a DB.
The batch runs against the table created earlier.
The ItemWriter is already implemented with JdbcBatchItemWriter, so only the ItemReader needs to be created.

First, create a PagingQueryProvider.

    @Bean
    public PagingQueryProvider pagingQueryProvider(DataSource dataSource) throws Exception {
        SqlPagingQueryProviderFactoryBean factoryBean = new SqlPagingQueryProviderFactoryBean();
        factoryBean.setSelectClause("select *");
        factoryBean.setFromClause("from spi_global_rankings");
        //factoryBean.setWhereClause("where rank = :rank");
        factoryBean.setSortKey("rank");
        factoryBean.setDataSource(dataSource);
        return factoryBean.getObject();
    }

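The factoryBean is given the clauses used to query the table: the select clause, the from clause, and the sort key that paging is based on.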
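
To make the role of the sort key more concrete, the following is a minimal plain-Java simulation of sort-key based paging. It is not Spring Batch's actual implementation; the class `KeysetPagingSketch` and its methods are hypothetical names, and the in-memory list stands in for the table. The assumption is that each page after the first is fetched with a condition like "rank greater than the last rank already read", which is also why the sort key should be unique.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Plain-Java simulation of sort-key based paging; hypothetical names, not Spring Batch code.
class KeysetPagingSketch {

    // Stand-in for a query like "... where rank > :lastRank order by rank limit :pageSize".
    // The input list is assumed to be sorted by rank already.
    static List<Integer> fetchPage(List<Integer> sortedRanks, Integer lastRank, int pageSize) {
        return sortedRanks.stream()
                .filter(rank -> lastRank == null || rank > lastRank)
                .limit(pageSize)
                .collect(Collectors.toList());
    }

    // Fetches page after page and returns the size of each page that was read.
    static List<Integer> pageSizes(List<Integer> sortedRanks, int pageSize) {
        List<Integer> sizes = new ArrayList<>();
        Integer lastRank = null; // no sort key seen yet, so the first page starts at the beginning
        while (true) {
            List<Integer> page = fetchPage(sortedRanks, lastRank, pageSize);
            if (page.isEmpty()) {
                break; // an empty page means there is nothing left to read
            }
            sizes.add(page.size());
            lastRank = page.get(page.size() - 1); // remember the last sort key for the next page
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<Integer> ranks = IntStream.rangeClosed(1, 25).boxed().collect(Collectors.toList());
        System.out.println(pageSizes(ranks, 10)); // prints [10, 10, 5]
    }
}
```

If two rows shared the same sort key value and fell on a page boundary, the "greater than the last key" condition in this sketch would skip one of them, which is the reason for preferring a unique column as the sort key.
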
Next, create a RowMapper that maps each ResultSet row to the domain object.

public class RankingRowMapper implements RowMapper<Ranking> {

    @Override
    public Ranking mapRow(ResultSet rs, int rowNum) throws SQLException {
        Ranking ranking = new Ranking();

        ranking.setRank(rs.getInt("rank"));
        ranking.setPrev_rank(rs.getInt("prev_rank"));
        ranking.setLeague(rs.getString("league"));
        ranking.setName(rs.getString("name"));
        ranking.setOff(rs.getFloat("off"));
        ranking.setDef(rs.getFloat("def"));
        ranking.setSpi(rs.getFloat("spi"));

        return ranking;
    }
}


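The JdbcPagingItemReader then uses the query provider. For the row mapper it can take either the RankingRowMapper above or, as in the code below, a BeanPropertyRowMapper.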

    // ItemReader for reading from the DB.
    @Bean
    @StepScope
    public JdbcPagingItemReader<Ranking> rankingItemReader2(
            DataSource dataSource
    //        @Value("#{jobParameters['rank']}") String rank
    ) throws Exception {
        //Map<String, Object> parameterValues = new HashMap<>(1);

        //parameterValues.put("rank", rank);
        return new JdbcPagingItemReaderBuilder<Ranking>()
                .name("rankingItemReader")
                .dataSource(dataSource)
                .queryProvider(pagingQueryProvider(dataSource))
                //.parameterValues(parameterValues)
                .pageSize(10)
                //.rowMapper(new RankingRowMapper())
                .rowMapper(new BeanPropertyRowMapper<>(Ranking.class)) // Maps rows directly to the domain object without a custom RowMapper.
                .build();
    }

The ItemWriter is reused as-is from the earlier step.

    @Bean
    public JdbcBatchItemWriter<Ranking> jdbcBatchItemWriter(DataSource dataSource) throws Exception {
        return new JdbcBatchItemWriterBuilder<Ranking>()
                .dataSource(dataSource)
                .sql( "INSERT INTO spi_global_rankings2 (rank,prev_rank,name,league,off,def,spi) VALUES ("
                        +":rank, :prev_rank, :name, :league, :off, :def, :spi)")
                .beanMapped()
                .build();
    }

Next, create the step and the job.

    // Create the step.
    @Bean
    public Step stepForRank2() throws Exception {
        return this.stepBuilderFactory.get("stepForRank2")
                .<Ranking, Ranking>chunk(10)
                .reader(rankingItemReader2(null))
                .writer(jdbcBatchItemWriter(null))
                .build();
    }

    // Create the job.
    @Bean
    public Job jobForRank2() throws Exception {
        return this.jobBuilderFactory.get("jobForRank2")
                .start(stepForRank2())
                .build();
    }
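
As a rough illustration of what `chunk(10)` in the step above means, here is a simplified plain-Java simulation of a chunk-oriented loop. It is only a sketch under assumptions, not Spring Batch's actual implementation: `ChunkLoopSketch` and `runStep` are hypothetical names, an `Iterator` stands in for the ItemReader, and a `Consumer` stands in for the ItemWriter. Items are read one at a time and handed to the writer as a list once ten of them have been collected.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Simplified simulation of a chunk-oriented step; hypothetical names, not Spring Batch code.
class ChunkLoopSketch {

    // Reads items one at a time and passes them to the writer in chunks of chunkSize.
    // Returns the number of chunks that were written.
    static <T> int runStep(Iterator<T> reader, Consumer<List<T>> writer, int chunkSize) {
        int chunksWritten = 0;
        List<T> chunk = new ArrayList<>(chunkSize);
        while (reader.hasNext()) {
            chunk.add(reader.next());
            if (chunk.size() == chunkSize) {
                writer.accept(new ArrayList<>(chunk)); // one write call per full chunk
                chunksWritten++;
                chunk.clear();
            }
        }
        if (!chunk.isEmpty()) {
            writer.accept(new ArrayList<>(chunk)); // flush the final, partially filled chunk
            chunksWritten++;
        }
        return chunksWritten;
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int rank = 1; rank <= 25; rank++) {
            source.add(rank);
        }
        List<Integer> target = new ArrayList<>();
        int chunks = runStep(source.iterator(), target::addAll, 10);
        System.out.println(chunks + " chunks, " + target.size() + " rows written"); // prints "3 chunks, 25 rows written"
    }
}
```

In the real step, the reader's page size and the chunk size are both 10, so in this simplified picture each page read from the source table lines up with one batched write to the target table.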

Then run the job.

 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.4)

2022-03-24 00:29:48.585  INFO 14516 --- [           main] com.sunnier.batch.BatchApplication       : Starting BatchApplication using Java 1.8.0_202 on DESKTOP-AHN11RT with PID 14516 (C:\git_repo\batch\target\classes started by sunni in C:\git_repo\batch)
2022-03-24 00:29:48.600  INFO 14516 --- [           main] com.sunnier.batch.BatchApplication       : No active profile set, falling back to 1 default profile: "default"
2022-03-24 00:29:49.444  INFO 14516 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2022-03-24 00:29:49.516  INFO 14516 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2022-03-24 00:29:49.743  INFO 14516 --- [           main] o.s.b.c.r.s.JobRepositoryFactoryBean     : No database type set, using meta data indicating: MYSQL
2022-03-24 00:29:49.868  INFO 14516 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
2022-03-24 00:29:49.978  INFO 14516 --- [           main] com.sunnier.batch.BatchApplication       : Started BatchApplication in 1.948 seconds (JVM running for 3.013)
2022-03-24 00:29:49.978  INFO 14516 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: [foo=1]
2022-03-24 00:29:50.075  INFO 14516 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=jobForRank2]] launched with the following parameters: [{foo=1}]
2022-03-24 00:29:50.134  INFO 14516 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [stepForRank2]
2022-03-24 00:29:51.138  INFO 14516 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [stepForRank2] executed in 1s0ms
2022-03-24 00:29:51.158  INFO 14516 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=jobForRank2]] completed with the following parameters: [{foo=1}] and the following status: [COMPLETED] in 1s66ms
2022-03-24 00:29:51.165  INFO 14516 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2022-03-24 00:29:51.175  INFO 14516 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.
Disconnected from the target VM, address: '127.0.0.1:54553', transport: 'socket'

The data has been copied. (The job was run twice.)

select count(*) from spi_global_rankings2;

count(*)|
--------|
    1280|