Pulog

Spring BootでSpring BatchのChunkモデルを試す

Spring Batchの挙動が微塵も理解できなくて本気で焦っています。

というよりかは、トランザクションの挙動や、中間コミットされた状態でバッチが再開しようとした際に、どのような挙動になるのかがつかめず、その辺りの知識がない状態で案件で使用したくないというだけなのかもしれないですが……

もう少し、手を動かして頭の中で整理できるようにしてみたいと思います。

読者想定

  • Spring BatchのChunkモデルとTaskletモデルの違いがわかっている方
  • Spring BatchのChunkモデルでのほぼ最小限の実装例を知りたい方
  • JavaConfigを用いたSpring Batchの実装を知りたい方

※ほぼ最小限というのは、Spring Bootを用いないSpring Batchの実装だったり、DataResourceを利用しないInMemoryなバッチ起動等があるので、ほぼという言い回しにしています。

公式のサンプル

Getting Started | Creating a Batch Service

今回は上記リンクの公式サンプル(チャンクモデル)実装を元に、CSVのデータを読み込んで、そのデータをDBにインサートする処理を作っていきます。

JPAなどを用いていますが、それ以外は必要最低限の実装しかしていないので、もう少し実践的な実装方法については別途記事を作成しようと思います。

Spring initializerでひな形作成

Spring Initializr

Dependenciesには以下を追加

  • Lombok
  • H2 Database
  • Spring Batch
  • Spring Data JPA

Spring Bootの各種設定

DBに格納する処理があるので、DBの指定などを行います。

resource/application.properties を以下のように追記します。

spring.datasource.url=jdbc:h2:./.data/h2/db;MODE=MySQL
spring.jpa.hibernate.ddl-auto=update
spring.h2.console.enabled=true
# 2021/02/12 追記
spring.batch.initialize-schema=always

resource/application.yml にするのであれば以下のように。

spring:
  datasource:
    url: jdbc:h2:./.data/h2/db;MODE=MySQL
  jpa:
    hibernate:
      ddl-auto: update
  h2:
    console:
      enabled: true
# 2021/02/12 追記
  batch:
    initialize-schema: always

データの格納手段をh2dbにし、データの格納先を .data ディレクトリを指定。更に標準だとSpring Bootを再起動するたびにdbの値が消えてしまうので、永続化の設定も追加した。

また、実際に格納されている値を確認するためにh2dbに付属しているconsosle機能も有効にしている。

これを有効にすることで localhost:8080/h2-console にアクセスすると実際にdbに格納された値を確認することができる。

2021/02/12 追記 spring.batch.initialize-schema プロパティ

少なくとも、初回起動時は spring.batch.initialize-schema=always を記載しないと正常に Spring Batch が起動しないようです。
上記プロパティを加えた状態で一度 Spring Batch を起動した状態にすれば以降はこの記述が無くても Spring Batch は起動するようになります。

なお、上記のプロパティが Spring Batch 初回起動時に指定がないと以下のようなエラーが出て、バッチが始動しないのを確認しました。

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-02-12 20:51:04.645 ERROR 10932 --- [           main] o.s.boot.SpringApplication               : Application run failed

java.lang.IllegalStateException: Failed to execute ApplicationRunner
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:798) ~[spring-boot-2.4.2.jar:2.4.2]
	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:785) ~[spring-boot-2.4.2.jar:2.4.2]

~中略~

	at com.example.demo.DemoApplication.main(DemoApplication.java:10) ~[main/:na]
Caused by: org.springframework.jdbc.BadSqlGrammarException: PreparedStatementCallback; bad SQL grammar [SELECT JOB_INSTANCE_ID, JOB_NAME from BATCH_JOB_INSTANCE where JOB_NAME = ? and JOB_KEY = ?]; nested exception is org.h2.jdbc.JdbcSQLSyntaxErrorException: テーブル "BATCH_JOB_INSTANCE" が見つかりません
Table "BATCH_JOB_INSTANCE" not found; SQL statement:
SELECT JOB_INSTANCE_ID, JOB_NAME from BATCH_JOB_INSTANCE where JOB_NAME = ? and JOB_KEY = ? [42102-200]
	at org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator.doTranslate(SQLErrorCodeSQLExceptionTranslator.java:239) ~[spring-jdbc-5.3.3.jar:5.3.3]
	at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:70) ~[spring-jdbc-5.3.3.jar:5.3.3]

~以下略~

データ読込元のCSVファイルを作成する

プロジェクトフォルダに sample.csv という名前でとりあえずcsvファイルを以下のように作成しておきます。

ジョン, スミス
田中, 太郎
ジョセフ, ジョースター
Erik, Satie

※Érik Alfred Leslie Satieと、アクセント記号が本来付きます。

Model, Repositoryクラスを作成

公式のサンプルだと resources/schema-all.sql を用意してテーブル作成、JDBCで値の書き込みをしていますが、今回はJPAを用いてデータの格納をしてみます。

Modelクラス作成

公式サンプルの resources/schema-all.sql で作成するテーブルと近いように作ります。

package com.example.demo.domain.user;

import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

@Data
@Entity(name = "user")
@NoArgsConstructor
public class User {
    public User(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    public Integer id;
    public String firstName;
    public String lastName;
}

Repositoryクラス

DBとデータのやり取りを行うReposirotyクラスを作成します。

JpaRepository を継承してデータのやり取りをするモデルクラスと主キーの型を指定しています。

package com.example.demo.domain.user;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface UserRepository extends JpaRepository<User, Integer> {
}

Reader, Writer, Processor, Job, Stepを実装するConfigurationクラスの実装

Config用のクラスを作成(今回は BatchConfiguration クラス)する。

Readerはファイルを読み取る FlatFileItemReader を用いて実装します。

ProcessorはReaderで読み取ったデータを変換する処理を実装します。特に変換せずに保存するだけだったら省略可能です。

WriterはDBに値を書き込む RepositoryItemWriter を用いて実装をします。

今回は全てを BatchConfiguration に実装していくが、それぞれクラスを分けて実装することももちろん可能です。

package com.example.demo.batchprocessing;

import com.example.demo.domain.user.User;
import com.example.demo.domain.user.UserRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.data.RepositoryItemWriter;
import org.springframework.batch.item.data.builder.RepositoryItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

import java.text.Normalizer;

@Configuration
@EnableBatchProcessing
@Slf4j
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public UserRepository userRepository;

    /**
     * sample.csv ファイルを読み込む
     * @return バッチ処理用 Reader item
     */
    @Bean
    public FlatFileItemReader<User> reader() {
        return new FlatFileItemReaderBuilder<User>()
                .name("userItemReader")
                .resource(new FileSystemResource("sample.csv"))
                .delimited()
                .names("firstName", "lastName")
                .fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
                    setTargetType(User.class);
                }})
                .build();
    }

    /**
     * Reader から渡されたオブジェクトを保存する形に変換する
     * 今回はアルファベットの大文字変換と、半角カナを全角カナに変換している
     * @return 変換後に保存するアイテム
     */
    @Bean
    public ItemProcessor<User, User> processor() {
        return user -> {
            final var firstName = Normalizer.normalize(user.getFirstName().toUpperCase(), Normalizer.Form.NFKC);
            final var lastName = Normalizer.normalize(user.getLastName().toUpperCase(), Normalizer.Form.NFKC);
            final var transformedUser = new User(firstName, lastName);

            log.info("before converted : " + user);
            log.info("transformed user : " + transformedUser);
            return transformedUser;
        };
    }

    /**
     * Processor から渡されたオブジェクトを書き込む(保存処理をかける)
     * @return 保存処理用のリポジトリ
     */
    @Bean
    public RepositoryItemWriter<User> writer() {
        return new RepositoryItemWriterBuilder<User>()
                .repository(userRepository)
                .methodName("save")
                .build();
    }

    /**
     * 1つのバッチ処理の処理順やエラーハンドリング等の定義を行う
     * @return Jobを返却
     */
    @Bean
    public Job importUserJob() {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .flow(step1())
                .end()
                .build();
    }

    /**
     * Jobの中に含まれるステップを定義する
     * @return ステップを返却する
     */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<User, User>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

}

バッチの実行

ここまで作成し、Spring Bootを起動したタイミングでバッチが走るようになる。

Gradleプロジェクトであれば、 bootRun タスクでSpring Bootが立ち上がる。

11:11:28: Executing task 'DemoApplication.main()'...

> Task :compileJava
> Task :processResources
> Task :classes

> Task :DemoApplication.main()

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.3.5.RELEASE)

2020-11-20 11:11:32.295  INFO 10590 --- [           main] com.example.demo.DemoApplication         : Starting DemoApplication on YukinoMacBook.local with PID 10590 (/Users/yuki/demo/build/classes/java/main started by yuki in /Users/yuki/demo)
2020-11-20 11:11:32.298  INFO 10590 --- [           main] com.example.demo.DemoApplication         : No active profile set, falling back to default profiles: default
2020-11-20 11:11:33.172  INFO 10590 --- [           main] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data JPA repositories in DEFERRED mode.
2020-11-20 11:11:33.275  INFO 10590 --- [           main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 88ms. Found 1 JPA repository interfaces.
2020-11-20 11:11:33.889  INFO 10590 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-11-20 11:11:33.975  INFO 10590 --- [         task-1] o.hibernate.jpa.internal.util.LogHelper  : HHH000204: Processing PersistenceUnitInfo [name: default]
2020-11-20 11:11:34.095  INFO 10590 --- [         task-1] org.hibernate.Version                    : HHH000412: Hibernate ORM core version 5.4.22.Final
2020-11-20 11:11:34.249  INFO 10590 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2020-11-20 11:11:34.719  INFO 10590 --- [         task-1] o.hibernate.annotations.common.Version   : HCANN000001: Hibernate Commons Annotations {5.1.0.Final}
2020-11-20 11:11:34.754  INFO 10590 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2020-11-20 11:11:34.887  WARN 10590 --- [           main] o.s.b.a.batch.JpaBatchConfigurer         : JPA does not support custom isolation levels, so locks may not be taken when launching Jobs
2020-11-20 11:11:34.892  INFO 10590 --- [           main] o.s.b.c.r.s.JobRepositoryFactoryBean     : No database type set, using meta data indicating: H2
2020-11-20 11:11:35.061  INFO 10590 --- [         task-1] org.hibernate.dialect.Dialect            : HHH000400: Using dialect: org.hibernate.dialect.H2Dialect
2020-11-20 11:11:35.332  INFO 10590 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
2020-11-20 11:11:35.520  INFO 10590 --- [           main] DeferredRepositoryInitializationListener : Triggering deferred initialization of Spring Data repositories…
2020-11-20 11:11:36.355  INFO 10590 --- [         task-1] o.h.e.t.j.p.i.JtaPlatformInitiator       : HHH000490: Using JtaPlatform implementation: [org.hibernate.engine.transaction.jta.platform.internal.NoJtaPlatform]
2020-11-20 11:11:36.367  INFO 10590 --- [         task-1] j.LocalContainerEntityManagerFactoryBean : Initialized JPA EntityManagerFactory for persistence unit 'default'
2020-11-20 11:11:36.690  INFO 10590 --- [           main] DeferredRepositoryInitializationListener : Spring Data repositories initialized!
2020-11-20 11:11:36.702  INFO 10590 --- [           main] com.example.demo.DemoApplication         : Started DemoApplication in 5.163 seconds (JVM running for 5.823)
2020-11-20 11:11:36.704  INFO 10590 --- [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2020-11-20 11:11:36.785  INFO 10590 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=importUserJob]] launched with the following parameters: [{run.id=14}]
2020-11-20 11:11:36.849  INFO 10590 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
2020-11-20 11:11:36.952  INFO 10590 --- [           main] c.e.d.b.BatchConfiguration               : before converted : User(id=null, firstName=ジョン, lastName=スミス)
2020-11-20 11:11:36.953  INFO 10590 --- [           main] c.e.d.b.BatchConfiguration               : transformed user : User(id=null, firstName=ジョン, lastName=スミス)
2020-11-20 11:11:36.954  INFO 10590 --- [           main] c.e.d.b.BatchConfiguration               : before converted : User(id=null, firstName=田中, lastName=太郎)
2020-11-20 11:11:36.955  INFO 10590 --- [           main] c.e.d.b.BatchConfiguration               : transformed user : User(id=null, firstName=田中, lastName=太郎)
2020-11-20 11:11:36.955  INFO 10590 --- [           main] c.e.d.b.BatchConfiguration               : before converted : User(id=null, firstName=ジョセフ, lastName=ジョースター)
2020-11-20 11:11:36.955  INFO 10590 --- [           main] c.e.d.b.BatchConfiguration               : transformed user : User(id=null, firstName=ジョセフ, lastName=ジョースター)
2020-11-20 11:11:36.956  INFO 10590 --- [           main] c.e.d.b.BatchConfiguration               : before converted : User(id=null, firstName=Erik, lastName=Satie)
2020-11-20 11:11:36.956  INFO 10590 --- [           main] c.e.d.b.BatchConfiguration               : transformed user : User(id=null, firstName=ERIK, lastName=SATIE)
2020-11-20 11:11:37.026  INFO 10590 --- [           main] o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 176ms
2020-11-20 11:11:37.039  INFO 10590 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=importUserJob]] completed with the following parameters: [{run.id=14}] and the following status: [COMPLETED] in 204ms

Spring Bootの各種設定 でDBの値を見れるコンソールを有効にしているので、実際に格納された値を localhost:8080/h2-console で確認をしてみる。

Batch終了後にSpringApplicationも終了する

2021/02/12 追記

以前のバージョンでは Batch 終了後にビルドタスクは初期状態だと終了しなかったのですが、Spring Batch のバージョンを新しくして確認したところ、Batch 処理の終了後にビルドタスクも終了することを確認しました。

恐らく Spring Batch 4.3.x がリリースされたタイミングで挙動が変わったようですが、リリースノートにそれらしき記述は見つからず……

以下の記述をしても引き続き正常に動作するので、心配であれば以下記述を試してくださればと思います。

また、それに伴い h2db のコンソールが確認できなくなってしまいました。

H2DB のコンソールを確認する方法として、 spring-boot-starter-webbuild.gradledependencies に追記していただいて、再度 bootRun を実行すれば勝手にビルドタスクが終了することはなくなります。

spring-boot-starter-web を追加後は勝手にビルドタスクは終了しないので、以下の記述を追記してもらうことで、引き続きバッチの終了のタイミングで SpringApplication も終了させることができます。

以下、追記前の内容となります。


これで実装は完成したが、Batch処理が完了してもビルドタスクが止まらないことに気づくと思います。

そこで、main関数でSpringApplicationを呼んでいる箇所に以下関数でくくってあげることで、Batch処理が終了した際に自動的にSpringApplicationも終了してくれるようになります。

※バッチの処理が短い場合、h2DBのコンソールが見れなくなってしまうので、実際に案件などでBatch処理を作る際にはこちらを思い出していただければと思います。また、本番環境ではh2DBのコンソールは無効にするのをお忘れなく。

package com.example.batchDemo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {

	public static void main(String[] args) {
-		SpringApplication.run(DemoApplication.class, args);
+		System.exit(SpringApplication.exit(SpringApplication.run(DemoApplication.class, args)));
	}

}

終わりに

ここまでできたら実際にJARにまとめるなどして、実際にバッチとして使い始めることができます。

基本的なバッチであればChunkモデルで実装できると思うのですが、処理が予めReader, Processor, Writerに分かれているので、設計時点でそれを考慮した上で実装することが必要になります。

上記で実装したものを以下リポジトリにて公開しています。また、Gradle, java, docker コマンドを利用した Batch の実行方法も参考程度に載せているので、そちらも確認いただければと思います。
https://git.pu10g.com/ykato/spring-batch-chunk-example

次回のSpring Batchの記事ではTaskletモデルを用いたバッチ実装を紹介しようと思います。

それでわ。

参考にした記事・資料