jBatch(Java EE 7)をNetBeans+GlassFish+Mavenではじめました

jBatchとは?

Java EE 7から新たに加わった、バッチ処理フレームワークの仕様です。
EE 6までは、Java EE標準仕様にバッチは入っていなかったのですが、EE 7から加わりました。
正式名称は「Batch Application for the Java Platform」で、公式?の略称が「jBatch」です。
Spring Frameworkには、以前から「Spring Batch」というものがあったのですが、これを基にjBatchの仕様が策定されたようです。
また、Spring Batch自身も、jBatch準拠になっているようです。

jBatchの実装

jBatchに限らず、Java EEは仕様のみ(ほとんどインターフェイスアノテーション・例外・enum)なので、jBatchを使うためには、それに対応する「実装」が必要です。
僕が調べた限りでは、今のところ実装は下記の3つです。

  1. RI (GlassFishに内包。特別な名前はついていないようです)
  2. JBeret (WildFlyに内包)
  3. Spring Batch

jBatchに関する情報

まず最初に見るべき情報

Java Day Tokyo 2015の上妻さんの発表「jBatch実践入門」を見ることをお勧めします。
PDFおよびYouTubeで見ることができます。とても分かりやすくまとまっている、素晴らしい資料・発表です。
http://www.oracle.com/technetwork/jp/ondemand/online2015-javaday-2511676-ja.html

公式情報(英語)

僕もまだしっかりとは目を通せていないのですが、JSRとチュートリアルは読んだ方がよいでしょう。
JSR 352 JSR-000352 Batch Applications for the Java Platform 1.0, Revision A Maintenance Release for Evaluation
Java EE 7 Tutorial 55 Batch Processing (Release 7)
あと、洋書だとArun Guptaさんの「Java EE 7 Essentials」にjBatchの章があります。僕はKindle版を持ってます。
Amazon.co.jp: Java EE 7 Essentials 電子書籍: Arun Gupta: Kindleストア

jBatchをはじめよう!

さて、説明はこれくらいにして、具体的なサンプルコードの説明に入ります。
上記のように、既に何人かの方がブログに書いていらっしゃるのですが、意外にも一番スタンダード(だと僕は思ってるのですが・・・(^^; )なNetBeans+Maven+GlassFishという組み合わせが無かったため、今回はコレで行きたいと思います。ちなみに、IntelliJ IDEA 14.1.3+Gradle 2.4+Payara 4.1.152.1でも、同様のコードで動作確認済みです。

サンプルコードはGitHubにアップしています。
MasatoshiTada/jBatch-sample · GitHub

今回の環境

OS XだとGlassFish 4.1が動かないという情報もあったのですが、今のところ問題なく動作しています。

やりたいこと

下記のような「test1.csv」を読み込んで、JPAエンティティに変換し、DBにINSERTする。

1,suzuki
2,tanaka
3,takahashi
4,aoyama
5,yamashiro
6,ishida
7,naoki
8,ochiai
9,kubota
10,kaneda
11,hino
12,kobayakawa
13,uehara
14,fujita
15,takeshita

プロジェクトの作成

NetBeansで[Maven]-[Webアプリケーション]で作成します。
pom.xmlには、下記のような依存性を追加します。プロジェクト作成直後は「javaee-web-api」になっていますが、今回はjBatchを使うので下記のように「javaee-api」に修正してください。
これは、jBatchがJava EE Web Profileではなく、Full Profileに含まれているためです。

    <dependencies>
        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-api</artifactId>
            <version>7.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.35</version>
        </dependency>
    </dependencies>

DB・JPAの準備

テーブルの作成

データベースに「test」というテーブルを作成してください。

CREATE TABLE TEST (
    ID INTEGER NOT NULL, 
    NAME VARCHAR(255), 
    PRIMARY KEY (ID))

もし、僕のサンプルコードを「git clone」してお使いになる場合は、後述のpersistence.xmlにエンティティクラスからDBにテーブルを自動生成する設定を記述していますので、この手順は不要です。

GlassFishのコネクションプーリング設定

詳しい手順はこちらへ→GlassFish 4.1でのJNDIコネクションプーリングの設定方法 - Java EE 事始め!
今回は「jdbc/mysqlTest」という名前にします。

JPAエンティティ

NetBeansを使えば、テーブルからエンティティクラスおよびpersistence.xmlを自動生成することが可能です。
詳しい手順はこちらへ→NetBeansとMavenでJPAプロジェクトの作成~エンティティ自動生成まで - Java EE 事始め!

エンティティ

こんなエンティティが自動生成されます。

@Entity
public class Test implements Serializable {
    @Id
    @Basic(optional = false)
    @NotNull
    private Integer id;
    @Size(max = 20)
    private String name;

    public Test() {
    }

    public Test(Integer id, String name) {
        this.id = id;
        this.name = name;
    }
    // setter/getter/equals/hashCode省略
}
persistence.xml

大半の記述は自動生成されますが、ログ出力などの設定を追記してください。

<?xml version="1.0" encoding="UTF-8"?>
<persistence version="2.1" xmlns="http://xmlns.jcp.org/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd">
  <persistence-unit name="testPU" transaction-type="JTA">
    <provider>org.eclipse.persistence.jpa.PersistenceProvider</provider>
    <!-- GlassFishに設定したJDBCリソース名 -->
    <jta-data-source>jdbc/mysqlTest</jta-data-source>
    <exclude-unlisted-classes>false</exclude-unlisted-classes>
    <shared-cache-mode>NONE</shared-cache-mode>
    <validation-mode>NONE</validation-mode>
    <properties>
        <!-- エンティティからのテーブル自動生成 -->
        <property name="javax.persistence.schema-generation.database.action" value="create"/>
        <!-- ログ出力 -->
        <property name="eclipselink.logging.level" value="ALL"/>
        <!-- DB製品の指定(必須ではない) -->
        <property name="eclipselink.target-database" value="MySQL"/>
        <!-- 一括追加の設定 -->
        <property name="eclipselink.jdbc.batch-writing" value="JDBC"/>
        <!-- 一括追加サイズの設定 -->
        <property name="eclipselink.jdbc.batch-writing.size" value="5"/>
    </properties>
  </persistence-unit>
</persistence>

Producerクラスの作成

JPAのEntityManagerを他のクラスにインジェクトできるように、Producerクラスを作成します。

import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Produces;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

@Dependent
public class JpaProducer {
    @PersistenceContext(unitName = "testPU")
    private EntityManager em;
    
    @Produces
    public EntityManager getEntityManager() {
        return em;
    }
}

バッチクラスの作成

お待たせしました。ここからがjBatchのコードです。
今回は「chunk方式」という、「読み込み」「処理」「書き込み」の3つで処理を行う方式でやります。

読み込みクラス

前述の「test1.csv」を読み込みます。

import java.io.BufferedReader;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import javax.batch.api.chunk.ItemReader;
import javax.batch.runtime.context.JobContext;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import javax.inject.Named;

@Named
@Dependent
public class MyItemReader implements ItemReader {

    @Inject
    private JobContext jobContext;
    
    private BufferedReader br;
    
    @Override
    public void open(Serializable checkpoint) throws Exception {
        System.out.println("ItemReader open.");
        String dirName = jobContext.getProperties().getProperty("dir");
        String fileName = jobContext.getProperties().getProperty("input_file");
        br = Files.newBufferedReader(Paths.get(dirName, fileName), StandardCharsets.UTF_8);
    }

    @Override
    public void close() throws Exception {
        System.out.println("ItemReader close.");
        br.close();
    }

    @Override
    public Object readItem() throws Exception {
        String data = br.readLine();
        System.out.println("Reader readItem : " + data);
        return data;
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        System.out.println("ItemReader checkpoint.");
        return null;
    }
    
}

後述しますが、@Namedと@Dependentを忘れないようにしてください。

処理クラス

読み込みクラスでCSVから読んだ1行1行を、Testエンティティに変換します。

import javax.annotation.PostConstruct;
import javax.batch.api.chunk.ItemProcessor;
import javax.enterprise.context.Dependent;
import javax.inject.Named;
import sample.entity.Test;

@Named
@Dependent
public class MyItemProcessor implements ItemProcessor {

    @PostConstruct
    public void init() {
        System.out.println("ItemProcessor init.");
    }
    
    @Override
    public Object processItem(Object item) throws Exception {
        String line = (String) item;
        System.out.println("Processor processItem : " + line);
        String[] values = line.split(",");
        int id = Integer.parseInt(values[0]);
        String name = values[1];
        Test test = new Test(id, name);
        return test;
    }
    
}

こちらも、@Namedと@Dependentを忘れないようにしてください。

書き込みクラス

変換したTestエンティティを、DBにINSERTします。

import java.io.Serializable;
import java.util.List;
import javax.batch.api.chunk.ItemWriter;
import javax.batch.runtime.context.JobContext;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import javax.inject.Named;
import javax.persistence.EntityManager;

@Named
@Dependent
public class MyItemWriter implements ItemWriter {

    @Inject
    private JobContext jobContext;
    
    @Inject
    private EntityManager em;
    
    @Override
    public void open(Serializable checkpoint) throws Exception {
        System.out.println("ItemWriter open.");
    }

    @Override
    public void close() throws Exception {
        System.out.println("ItemWriter close.");
    }

    @Override
    public void writeItems(List<Object> items) throws Exception {
        System.out.println("ItemWriter writeItem : " + items);
        items.forEach(item -> em.persist(item));
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        System.out.println("ItemWriter checkpoint.");
        return null;
    }
    
}

くどいようですが、@Namedと@Dependentを忘れないようにしてください。

ジョブXMLの作成

バッチジョブを定義するXMLを書きます。
XMLを置くディレクトリは、「src/main/resources/META-INF/batch-jobs」です。
ここに、「my-batch-job.xml」という名前で下記のようなXMLを作成します。

<?xml version="1.0" encoding="UTF-8"?>
<job id="my-batch-job"
     xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <properties>
        <property name="dir" value="CSVを置いているディレクトリのパス"/>
        <property name="input_file" value="test1.csv"/>
    </properties>

    <step id="first-step">
        <chunk item-count="5">
            <reader    ref="myItemReader"/>
            <processor ref="myItemProcessor"/>
            <writer    ref="myItemWriter"/>
        </chunk>
    </step>
</job>

「job」がルート要素で、その中に1つ以上の「step」があります。1つのstepは、chunk方式では「reader」「processor」「writer」から成り立ちます。
ref属性には、「各クラスの完全修飾名」(パッケージ付きクラス名:FQCN)または「@Namedで指定した名前」を記述しますが、後者にしないとDIできずにNullPointerExceptionが発生します。
僕はここで詰まっていたのですが、前述のかずひらさんのブログで解決しました。
はじめてのjBatch - CLOVER


ちなみに@Namedアノテーションに何も属性を指定しなかった場合は、「クラスの単純名の頭文字を小文字にしたもの」が名前になります。
すなわち、下記のコードは全て同じ意味になります。

@Named
public class HogeClass {}
@Named("hogeClass")
public class HogeClass {}
@Named(value = "hogeClass")
public class HogeClass {}

もちろん、明示的に任意の名前を指定することも可能です。

@Named("hoge")
public class HogeClass {}


今回はitem-count属性に「5」を指定しています。これにより、「CSVを1行読み込み→それをTestエンティティに変換」を5回繰り返した後、その5つのエンティティを一気にDBへ書き込み、という流れになります。これの流れを、読み込みレコードが無くなるまで(=読み込みクラスのreadItem()がnullを返すまで)繰り返します。

ジョブを起動する

ジョブを起動する方法は色々あり、EJBタイマー・JAX-RSリソース・サーブレット・SE環境ではmain()メソッドで可能です。
今回は手軽さ重視でサーブレットから起動します。

import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;

@WebServlet(urlPatterns = "/batch-start")
public class BatchStartServlet extends HttpServlet {

    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        // ジョブの起動
        JobOperator jobOperator = BatchRuntime.getJobOperator();
        long executionId = jobOperator.start("my-batch-job", null);

        PrintWriter out = response.getWriter();
        out.println("start!!!");
    }
    
}

極端に言えば、このコードを書ける場所であれば、どこからでもジョブを起動できるということです。

JobOperator jobOperator = BatchRuntime.getJobOperator();
long executionId = jobOperator.start("my-batch-job", null);

「executionId」は、ジョブを一時停止する時やリスタートする時に使うのですが、今回は特に使っていません。

いざ、実行!

web.xmlに、ウェルカムページの設定を書いておきます。

<web-app version="3.1" xmlns="http://xmlns.jcp.org/xml/ns/javaee" 
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee 
                http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd">
    <welcome-file-list>
        <welcome-file>batch-start</welcome-file>
    </welcome-file-list>
</web-app>

NetBeansからプロジェクトを右クリック→[実行]で、GlassFish起動→WARデプロイ→ブラウザが起動してウェルカムページの表示、までやってくれます。

実行ログ

情報:   ItemProcessor init.
情報:   JTS5014: Recoverable JTS instance, serverId = [100]
情報:   ItemReader open.
情報:   ItemWriter open.
情報:   Reader readItem : 1,suzuki
情報:   Processor processItem : 1,suzuki
情報:   Reader readItem : 2,tanaka
情報:   Processor processItem : 2,tanaka
情報:   Reader readItem : 3,takahashi
情報:   Processor processItem : 3,takahashi
情報:   Reader readItem : 4,aoyama
情報:   Processor processItem : 4,aoyama
情報:   Reader readItem : 5,yamashiro
情報:   Processor processItem : 5,yamashiro
情報:   ItemWriter writeItem : [Test[ id = 1, name = suzuki ], Test[ id = 2, name = tanaka ], Test[ id = 3, name = takahashi ], Test[ id = 4, name = aoyama ], Test[ id = 5, name = yamashiro ]]
詳細:   client acquired: 846621209
詳細:   TX binding to tx mgr, status=STATUS_ACTIVE
詳細:   acquire unit of work: 631199463
最も詳細:   persist() operation called on: Test[ id = 1, name = suzuki ].
最も詳細:   persist() operation called on: Test[ id = 2, name = tanaka ].
最も詳細:   persist() operation called on: Test[ id = 3, name = takahashi ].
最も詳細:   persist() operation called on: Test[ id = 4, name = aoyama ].
最も詳細:   persist() operation called on: Test[ id = 5, name = yamashiro ].
情報:   ItemReader checkpoint.
情報:   ItemWriter checkpoint.
詳細:   TX beforeCompletion callback, status=STATUS_ACTIVE
詳細:   begin unit of work commit
詳細:   TX beginTransaction, status=STATUS_ACTIVE
最も詳細:   Execute query InsertObjectQuery(Test[ id = 5, name = yamashiro ])
最も詳細:   Connection acquired from connection pool [default].
最も詳細:   Execute query InsertObjectQuery(Test[ id = 4, name = aoyama ])
最も詳細:   Execute query InsertObjectQuery(Test[ id = 1, name = suzuki ])
最も詳細:   Execute query InsertObjectQuery(Test[ id = 3, name = takahashi ])
最も詳細:   Execute query InsertObjectQuery(Test[ id = 2, name = tanaka ])
最も詳細:   reconnecting to external connection pool
詳細:   Begin batch statements
普通:   INSERT INTO TEST (ID, NAME) VALUES (?, ?)
普通:   	bind => [2 parameters bound]
普通:   	bind => [2 parameters bound]
普通:   	bind => [2 parameters bound]
普通:   	bind => [2 parameters bound]
普通:   	bind => [2 parameters bound]
詳細:   End Batch Statements
最も詳細:   Connection released to connection pool [default].
詳細:   TX afterCompletion callback, status=COMMITTED
詳細:   end unit of work commit
詳細:   release unit of work
詳細:   client released
情報:   Reader readItem : 6,ishida
情報:   Processor processItem : 6,ishida
情報:   Reader readItem : 7,naoki
情報:   Processor processItem : 7,naoki
詳細:   client acquired: 1918580639
情報:   Reader readItem : 8,ochiai
詳細:   TX binding to tx mgr, status=STATUS_ACTIVE
情報:   Processor processItem : 8,ochiai
情報:   Reader readItem : 9,kubota
詳細:   acquire unit of work: 653950772
情報:   Processor processItem : 9,kubota
最も詳細:   persist() operation called on: Test[ id = 6, name = ishida ].
情報:   Reader readItem : 10,kaneda
最も詳細:   persist() operation called on: Test[ id = 7, name = naoki ].
情報:   Processor processItem : 10,kaneda
最も詳細:   persist() operation called on: Test[ id = 8, name = ochiai ].
情報:   ItemWriter writeItem : [Test[ id = 6, name = ishida ], Test[ id = 7, name = naoki ], Test[ id = 8, name = ochiai ], Test[ id = 9, name = kubota ], Test[ id = 10, name = kaneda ]]
最も詳細:   persist() operation called on: Test[ id = 9, name = kubota ].
最も詳細:   persist() operation called on: Test[ id = 10, name = kaneda ].
情報:   ItemReader checkpoint.
情報:   ItemWriter checkpoint.
詳細:   TX beforeCompletion callback, status=STATUS_ACTIVE
詳細:   begin unit of work commit
詳細:   TX beginTransaction, status=STATUS_ACTIVE
最も詳細:   Execute query InsertObjectQuery(Test[ id = 6, name = ishida ])
最も詳細:   Connection acquired from connection pool [default].
最も詳細:   Execute query InsertObjectQuery(Test[ id = 7, name = naoki ])
最も詳細:   Execute query InsertObjectQuery(Test[ id = 10, name = kaneda ])
最も詳細:   Execute query InsertObjectQuery(Test[ id = 9, name = kubota ])
最も詳細:   Execute query InsertObjectQuery(Test[ id = 8, name = ochiai ])
最も詳細:   reconnecting to external connection pool
詳細:   Begin batch statements
普通:   INSERT INTO TEST (ID, NAME) VALUES (?, ?)
普通:   	bind => [2 parameters bound]
普通:   	bind => [2 parameters bound]
普通:   	bind => [2 parameters bound]
普通:   	bind => [2 parameters bound]
普通:   	bind => [2 parameters bound]
詳細:   End Batch Statements
最も詳細:   Connection released to connection pool [default].
詳細:   TX afterCompletion callback, status=COMMITTED
詳細:   end unit of work commit
詳細:   release unit of work
詳細:   client released
情報:   Reader readItem : 11,hino
情報:   Processor processItem : 11,hino
情報:   Reader readItem : 12,kobayakawa
情報:   Processor processItem : 12,kobayakawa
情報:   Reader readItem : 13,uehara
詳細:   client acquired: 1975411799
情報:   Processor processItem : 13,uehara
情報:   Reader readItem : 14,fujita
情報:   Processor processItem : 14,fujita
情報:   Reader readItem : 15,takeshita
情報:   Processor processItem : 15,takeshita
情報:   ItemWriter writeItem : [Test[ id = 11, name = hino ], Test[ id = 12, name = kobayakawa ], Test[ id = 13, name = uehara ], Test[ id = 14, name = fujita ], Test[ id = 15, name = takeshita ]]
詳細:   TX binding to tx mgr, status=STATUS_ACTIVE
詳細:   acquire unit of work: 2116586805
最も詳細:   persist() operation called on: Test[ id = 11, name = hino ].
最も詳細:   persist() operation called on: Test[ id = 12, name = kobayakawa ].
最も詳細:   persist() operation called on: Test[ id = 13, name = uehara ].
最も詳細:   persist() operation called on: Test[ id = 14, name = fujita ].
最も詳細:   persist() operation called on: Test[ id = 15, name = takeshita ].
情報:   ItemReader checkpoint.
情報:   ItemWriter checkpoint.
詳細:   TX beforeCompletion callback, status=STATUS_ACTIVE
詳細:   begin unit of work commit
詳細:   TX beginTransaction, status=STATUS_ACTIVE
最も詳細:   Execute query InsertObjectQuery(Test[ id = 14, name = fujita ])
最も詳細:   Connection acquired from connection pool [default].
最も詳細:   Execute query InsertObjectQuery(Test[ id = 13, name = uehara ])
最も詳細:   Execute query InsertObjectQuery(Test[ id = 15, name = takeshita ])
最も詳細:   Execute query InsertObjectQuery(Test[ id = 12, name = kobayakawa ])
最も詳細:   Execute query InsertObjectQuery(Test[ id = 11, name = hino ])
最も詳細:   reconnecting to external connection pool
詳細:   Begin batch statements
普通:   INSERT INTO TEST (ID, NAME) VALUES (?, ?)
普通:   	bind => [2 parameters bound]
普通:   	bind => [2 parameters bound]
普通:   	bind => [2 parameters bound]
普通:   	bind => [2 parameters bound]
普通:   	bind => [2 parameters bound]
詳細:   End Batch Statements
最も詳細:   Connection released to connection pool [default].
詳細:   TX afterCompletion callback, status=COMMITTED
詳細:   end unit of work commit
詳細:   release unit of work
詳細:   client released
情報:   Reader readItem : null
情報:   ItemReader checkpoint.
情報:   ItemWriter checkpoint.
情報:   ItemReader close.
情報:   ItemWriter close.

5件ずつ読み込んで、DBにINSERTしているのが分かります。

DB

mysql> select * from test;
 ID   NAME       
   1   suzuki     
   2   tanaka     
   3   takahashi  
   4   aoyama     
   5   yamashiro  
   6   ishida     
   7   naoki      
   8   ochiai     
   9   kubota     
  10   kaneda     
  11   hino       
  12   kobayakawa 
  13   uehara     
  14   fujita     
  15   takeshita  

確かに、DBへの追加が出来ています。

他にもjBatchで出来ること

今回は1つのジョブしか作っていませんが、複数のジョブを作成することも可能です。
その場合は、ジョブXMLを同じフォルダ内に複数ファイル作ることになります。
また、今回は1つのジョブに1つのステップしか作っていませんが、通常は1つのジョブは複数のステップから成り立ちます。
それら複数のステップは、条件分岐(Decision)したり、ステップを並列で実行(Split)することもできます。
さらに、例外発生時には指定したチェックポイントから読み込みを再開したり、例外が発生したレコードはスキップしたりすることもできます。

まとめ

jBatchは今まで触ったことが無かったのですが、思いのほか簡単でした。CDIでちょっと詰まりましたが(^^;
今後、もう少し複雑なサンプルを作って、分岐・並列・例外処理なども試してみようと思います!
今日はここまで。