jBatch(Java EE 7)をNetBeans+GlassFish+Mavenではじめました
jBatchとは?
Java EE 7から新たに加わった、バッチ処理用フレームワークの仕様です。
EE 6までは、Java EE標準仕様にバッチは入っていなかったのですが、EE 7から加わりました。
正式名称は「Batch Application for the Java Platform」で、公式?の略称が「jBatch」です。
Spring Frameworkには、以前から「Spring Batch」というものがあったのですが、これを基にjBatchの仕様が策定されたようです。
また、Spring Batch自身も、jBatch準拠になっているようです。
jBatchの実装
jBatchに限らず、Java EEは仕様のみ(ほとんどインターフェイス・アノテーション・例外・enum)なので、jBatchを使うためには、それに対応する「実装」が必要です。
僕が調べた限りでは、今のところ実装は下記の3つです。
jBatchに関する情報
まず最初に見るべき情報
Java Day Tokyo 2015の上妻さんの発表「jBatch実践入門」を見ることをお勧めします。
PDFおよびYouTubeで見ることができます。とても分かりやすくまとまっている、素晴らしい資料・発表です。
http://www.oracle.com/technetwork/jp/ondemand/online2015-javaday-2511676-ja.html
日本語ブログ情報
多くの方がブログに書いていらっしゃいます。
jBatch(JSR-352) on Java SE 環境 | 寺田 佳央 - Yoshio Terada
The Java EE 7 TutorialのjBatchの章をテキトーに訳した - kagamihogeの日記
JSR352-Batch Applicationを試してみた(Batchlet編) - しおしお(´・ω・`)
JSR352-Batch Applicationを試してみた(BatchletでDBアクセス-JPA編) - しおしお(´・ω・`)
はじめてのjBatch - CLOVER
こちらはSpring Batchの情報なのですが、jBatchと似ているので参考になると思います。
01.Spring Batchの基本概念(ステップ) - soracane
公式情報(英語)
僕もまだしっかりとは目を通せていないのですが、JSRとチュートリアルは読んだ方がよいでしょう。
JSR 352 JSR-000352 Batch Applications for the Java Platform 1.0, Revision A Maintenance Release for Evaluation
Java EE 7 Tutorial 55 Batch Processing (Release 7)
あと、洋書だとArun Guptaさんの「Java EE 7 Essentials」にjBatchの章があります。僕はKindle版を持ってます。
Amazon.co.jp: Java EE 7 Essentials 電子書籍: Arun Gupta: Kindleストア
jBatchをはじめよう!
さて、説明はこれくらいにして、具体的なサンプルコードの説明に入ります。
上記のように、既に何人かの方がブログに書いていらっしゃるのですが、意外にも一番スタンダード(だと僕は思ってるのですが・・・(^^; )なNetBeans+Maven+GlassFishという組み合わせが無かったため、今回はコレで行きたいと思います。ちなみに、IntelliJ IDEA 14.1.3+Gradle 2.4+Payara 4.1.152.1でも、同様のコードで動作確認済みです。
サンプルコードはGitHubにアップしています。
MasatoshiTada/jBatch-sample · GitHub
今回の環境
やりたいこと
下記のような「test1.csv」を読み込んで、JPAエンティティに変換し、DBにINSERTする。
1,suzuki 2,tanaka 3,takahashi 4,aoyama 5,yamashiro 6,ishida 7,naoki 8,ochiai 9,kubota 10,kaneda 11,hino 12,kobayakawa 13,uehara 14,fujita 15,takeshita
プロジェクトの作成
NetBeansで[Maven]-[Webアプリケーション]で作成します。
pom.xmlには、下記のような依存性を追加します。プロジェクト作成直後は「javaee-web-api」になっていますが、今回はjBatchを使うので下記のように「javaee-api」に修正してください。
これは、jBatchがJava EE Web Profileではなく、Full Profileに含まれているためです。
<dependencies> <dependency> <groupId>javax</groupId> <artifactId>javaee-api</artifactId> <version>7.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.35</version> </dependency> </dependencies>
DB・JPAの準備
テーブルの作成
データベースに「test」というテーブルを作成してください。
CREATE TABLE TEST ( ID INTEGER NOT NULL, NAME VARCHAR(255), PRIMARY KEY (ID))
もし、僕のサンプルコードを「git clone」してお使いになる場合は、後述のpersistence.xmlにエンティティクラスからDBにテーブルを自動生成する設定を記述していますので、この手順は不要です。
GlassFishのコネクションプーリング設定
詳しい手順はこちらへ→GlassFish 4.1でのJNDIコネクションプーリングの設定方法 - Java EE 事始め!
今回は「jdbc/mysqlTest」という名前にします。
JPAエンティティ
NetBeansを使えば、テーブルからエンティティクラスおよびpersistence.xmlを自動生成することが可能です。
詳しい手順はこちらへ→NetBeansとMavenでJPAプロジェクトの作成~エンティティ自動生成まで - Java EE 事始め!
エンティティ
こんなエンティティが自動生成されます。
@Entity public class Test implements Serializable { @Id @Basic(optional = false) @NotNull private Integer id; @Size(max = 20) private String name; public Test() { } public Test(Integer id, String name) { this.id = id; this.name = name; } // setter/getter/equals/hashCode省略 }
persistence.xml
大半の記述は自動生成されますが、ログ出力などの設定を追記してください。
<?xml version="1.0" encoding="UTF-8"?> <persistence version="2.1" xmlns="http://xmlns.jcp.org/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd"> <persistence-unit name="testPU" transaction-type="JTA"> <provider>org.eclipse.persistence.jpa.PersistenceProvider</provider> <!-- GlassFishに設定したJDBCリソース名 --> <jta-data-source>jdbc/mysqlTest</jta-data-source> <exclude-unlisted-classes>false</exclude-unlisted-classes> <shared-cache-mode>NONE</shared-cache-mode> <validation-mode>NONE</validation-mode> <properties> <!-- エンティティからのテーブル自動生成 --> <property name="javax.persistence.schema-generation.database.action" value="create"/> <!-- ログ出力 --> <property name="eclipselink.logging.level" value="ALL"/> <!-- DB製品の指定(必須ではない) --> <property name="eclipselink.target-database" value="MySQL"/> <!-- 一括追加の設定 --> <property name="eclipselink.jdbc.batch-writing" value="JDBC"/> <!-- 一括追加サイズの設定 --> <property name="eclipselink.jdbc.batch-writing.size" value="5"/> </properties> </persistence-unit> </persistence>
Producerクラスの作成
JPAのEntityManagerを他のクラスにインジェクトできるように、Producerクラスを作成します。
import javax.enterprise.context.Dependent; import javax.enterprise.inject.Produces; import javax.persistence.EntityManager; import javax.persistence.PersistenceContext; @Dependent public class JpaProducer { @PersistenceContext(unitName = "testPU") private EntityManager em; @Produces public EntityManager getEntityManager() { return em; } }
バッチクラスの作成
お待たせしました。ここからがjBatchのコードです。
今回は「chunk方式」という、「読み込み」「処理」「書き込み」の3つで処理を行う方式でやります。
読み込みクラス
前述の「test1.csv」を読み込みます。
import java.io.BufferedReader; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import javax.batch.api.chunk.ItemReader; import javax.batch.runtime.context.JobContext; import javax.enterprise.context.Dependent; import javax.inject.Inject; import javax.inject.Named; @Named @Dependent public class MyItemReader implements ItemReader { @Inject private JobContext jobContext; private BufferedReader br; @Override public void open(Serializable checkpoint) throws Exception { System.out.println("ItemReader open."); String dirName = jobContext.getProperties().getProperty("dir"); String fileName = jobContext.getProperties().getProperty("input_file"); br = Files.newBufferedReader(Paths.get(dirName, fileName), StandardCharsets.UTF_8); } @Override public void close() throws Exception { System.out.println("ItemReader close."); br.close(); } @Override public Object readItem() throws Exception { String data = br.readLine(); System.out.println("Reader readItem : " + data); return data; } @Override public Serializable checkpointInfo() throws Exception { System.out.println("ItemReader checkpoint."); return null; } }
後述しますが、@Namedと@Dependentを忘れないようにしてください。
処理クラス
読み込みクラスでCSVから読んだ1行1行を、Testエンティティに変換します。
import javax.annotation.PostConstruct; import javax.batch.api.chunk.ItemProcessor; import javax.enterprise.context.Dependent; import javax.inject.Named; import sample.entity.Test; @Named @Dependent public class MyItemProcessor implements ItemProcessor { @PostConstruct public void init() { System.out.println("ItemProcessor init."); } @Override public Object processItem(Object item) throws Exception { String line = (String) item; System.out.println("Processor processItem : " + line); String[] values = line.split(","); int id = Integer.parseInt(values[0]); String name = values[1]; Test test = new Test(id, name); return test; } }
こちらも、@Namedと@Dependentを忘れないようにしてください。
書き込みクラス
変換したTestエンティティを、DBにINSERTします。
import java.io.Serializable; import java.util.List; import javax.batch.api.chunk.ItemWriter; import javax.batch.runtime.context.JobContext; import javax.enterprise.context.Dependent; import javax.inject.Inject; import javax.inject.Named; import javax.persistence.EntityManager; @Named @Dependent public class MyItemWriter implements ItemWriter { @Inject private JobContext jobContext; @Inject private EntityManager em; @Override public void open(Serializable checkpoint) throws Exception { System.out.println("ItemWriter open."); } @Override public void close() throws Exception { System.out.println("ItemWriter close."); } @Override public void writeItems(List<Object> items) throws Exception { System.out.println("ItemWriter writeItem : " + items); items.forEach(item -> em.persist(item)); } @Override public Serializable checkpointInfo() throws Exception { System.out.println("ItemWriter checkpoint."); return null; } }
くどいようですが、@Namedと@Dependentを忘れないようにしてください。
ジョブXMLの作成
バッチジョブを定義するXMLを書きます。
XMLを置くディレクトリは、「src/main/resources/META-INF/batch-jobs」です。
ここに、「my-batch-job.xml」という名前で下記のようなXMLを作成します。
<?xml version="1.0" encoding="UTF-8"?> <job id="my-batch-job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <properties> <property name="dir" value="CSVを置いているディレクトリのパス"/> <property name="input_file" value="test1.csv"/> </properties> <step id="first-step"> <chunk item-count="5"> <reader ref="myItemReader"/> <processor ref="myItemProcessor"/> <writer ref="myItemWriter"/> </chunk> </step> </job>
「job」がルート要素で、その中に1つ以上の「step」があります。1つのstepは、chunk方式では「reader」「processor」「writer」から成り立ちます。
ref属性には、「各クラスの完全修飾名」(パッケージ付きクラス名:FQCN)または「@Namedで指定した名前」を記述しますが、後者にしないとDIできずにNullPointerExceptionが発生します。
僕はここで詰まっていたのですが、前述のかずひらさんのブログで解決しました。
はじめてのjBatch - CLOVER
ちなみに@Namedアノテーションに何も属性を指定しなかった場合は、「クラスの単純名の頭文字を小文字にしたもの」が名前になります。
すなわち、下記のコードは全て同じ意味になります。
@Named public class HogeClass {}
@Named("hogeClass") public class HogeClass {}
@Named(value = "hogeClass") public class HogeClass {}
もちろん、明示的に任意の名前を指定することも可能です。
@Named("hoge") public class HogeClass {}
今回はitem-count属性に「5」を指定しています。これにより、「CSVを1行読み込み→それをTestエンティティに変換」を5回繰り返した後、その5つのエンティティを一気にDBへ書き込み、という流れになります。これの流れを、読み込みレコードが無くなるまで(=読み込みクラスのreadItem()がnullを返すまで)繰り返します。
ジョブを起動する
ジョブを起動する方法は色々あり、EJBタイマー・JAX-RSリソース・サーブレット・SE環境ではmain()メソッドで可能です。
今回は手軽さ重視でサーブレットから起動します。
import javax.batch.operations.JobOperator; import javax.batch.runtime.BatchRuntime; @WebServlet(urlPatterns = "/batch-start") public class BatchStartServlet extends HttpServlet { @Override protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { // ジョブの起動 JobOperator jobOperator = BatchRuntime.getJobOperator(); long executionId = jobOperator.start("my-batch-job", null); PrintWriter out = response.getWriter(); out.println("start!!!"); } }
極端に言えば、このコードを書ける場所であれば、どこからでもジョブを起動できるということです。
JobOperator jobOperator = BatchRuntime.getJobOperator(); long executionId = jobOperator.start("my-batch-job", null);
「executionId」は、ジョブを一時停止する時やリスタートする時に使うのですが、今回は特に使っていません。
いざ、実行!
web.xmlに、ウェルカムページの設定を書いておきます。
<web-app version="3.1" xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"> <welcome-file-list> <welcome-file>batch-start</welcome-file> </welcome-file-list> </web-app>
NetBeansからプロジェクトを右クリック→[実行]で、GlassFish起動→WARデプロイ→ブラウザが起動してウェルカムページの表示、までやってくれます。
実行ログ
情報: ItemProcessor init. 情報: JTS5014: Recoverable JTS instance, serverId = [100] 情報: ItemReader open. 情報: ItemWriter open. 情報: Reader readItem : 1,suzuki 情報: Processor processItem : 1,suzuki 情報: Reader readItem : 2,tanaka 情報: Processor processItem : 2,tanaka 情報: Reader readItem : 3,takahashi 情報: Processor processItem : 3,takahashi 情報: Reader readItem : 4,aoyama 情報: Processor processItem : 4,aoyama 情報: Reader readItem : 5,yamashiro 情報: Processor processItem : 5,yamashiro 情報: ItemWriter writeItem : [Test[ id = 1, name = suzuki ], Test[ id = 2, name = tanaka ], Test[ id = 3, name = takahashi ], Test[ id = 4, name = aoyama ], Test[ id = 5, name = yamashiro ]] 詳細: client acquired: 846621209 詳細: TX binding to tx mgr, status=STATUS_ACTIVE 詳細: acquire unit of work: 631199463 最も詳細: persist() operation called on: Test[ id = 1, name = suzuki ]. 最も詳細: persist() operation called on: Test[ id = 2, name = tanaka ]. 最も詳細: persist() operation called on: Test[ id = 3, name = takahashi ]. 最も詳細: persist() operation called on: Test[ id = 4, name = aoyama ]. 最も詳細: persist() operation called on: Test[ id = 5, name = yamashiro ]. 情報: ItemReader checkpoint. 情報: ItemWriter checkpoint. 詳細: TX beforeCompletion callback, status=STATUS_ACTIVE 詳細: begin unit of work commit 詳細: TX beginTransaction, status=STATUS_ACTIVE 最も詳細: Execute query InsertObjectQuery(Test[ id = 5, name = yamashiro ]) 最も詳細: Connection acquired from connection pool [default]. 最も詳細: Execute query InsertObjectQuery(Test[ id = 4, name = aoyama ]) 最も詳細: Execute query InsertObjectQuery(Test[ id = 1, name = suzuki ]) 最も詳細: Execute query InsertObjectQuery(Test[ id = 3, name = takahashi ]) 最も詳細: Execute query InsertObjectQuery(Test[ id = 2, name = tanaka ]) 最も詳細: reconnecting to external connection pool 詳細: Begin batch statements 普通: INSERT INTO TEST (ID, NAME) VALUES (?, ?) 普通: bind => [2 parameters bound] 普通: bind => [2 parameters bound] 普通: bind => [2 parameters bound] 普通: bind => [2 parameters bound] 普通: bind => [2 parameters bound] 詳細: End Batch Statements 最も詳細: Connection released to connection pool [default]. 詳細: TX afterCompletion callback, status=COMMITTED 詳細: end unit of work commit 詳細: release unit of work 詳細: client released 情報: Reader readItem : 6,ishida 情報: Processor processItem : 6,ishida 情報: Reader readItem : 7,naoki 情報: Processor processItem : 7,naoki 詳細: client acquired: 1918580639 情報: Reader readItem : 8,ochiai 詳細: TX binding to tx mgr, status=STATUS_ACTIVE 情報: Processor processItem : 8,ochiai 情報: Reader readItem : 9,kubota 詳細: acquire unit of work: 653950772 情報: Processor processItem : 9,kubota 最も詳細: persist() operation called on: Test[ id = 6, name = ishida ]. 情報: Reader readItem : 10,kaneda 最も詳細: persist() operation called on: Test[ id = 7, name = naoki ]. 情報: Processor processItem : 10,kaneda 最も詳細: persist() operation called on: Test[ id = 8, name = ochiai ]. 情報: ItemWriter writeItem : [Test[ id = 6, name = ishida ], Test[ id = 7, name = naoki ], Test[ id = 8, name = ochiai ], Test[ id = 9, name = kubota ], Test[ id = 10, name = kaneda ]] 最も詳細: persist() operation called on: Test[ id = 9, name = kubota ]. 最も詳細: persist() operation called on: Test[ id = 10, name = kaneda ]. 情報: ItemReader checkpoint. 情報: ItemWriter checkpoint. 詳細: TX beforeCompletion callback, status=STATUS_ACTIVE 詳細: begin unit of work commit 詳細: TX beginTransaction, status=STATUS_ACTIVE 最も詳細: Execute query InsertObjectQuery(Test[ id = 6, name = ishida ]) 最も詳細: Connection acquired from connection pool [default]. 最も詳細: Execute query InsertObjectQuery(Test[ id = 7, name = naoki ]) 最も詳細: Execute query InsertObjectQuery(Test[ id = 10, name = kaneda ]) 最も詳細: Execute query InsertObjectQuery(Test[ id = 9, name = kubota ]) 最も詳細: Execute query InsertObjectQuery(Test[ id = 8, name = ochiai ]) 最も詳細: reconnecting to external connection pool 詳細: Begin batch statements 普通: INSERT INTO TEST (ID, NAME) VALUES (?, ?) 普通: bind => [2 parameters bound] 普通: bind => [2 parameters bound] 普通: bind => [2 parameters bound] 普通: bind => [2 parameters bound] 普通: bind => [2 parameters bound] 詳細: End Batch Statements 最も詳細: Connection released to connection pool [default]. 詳細: TX afterCompletion callback, status=COMMITTED 詳細: end unit of work commit 詳細: release unit of work 詳細: client released 情報: Reader readItem : 11,hino 情報: Processor processItem : 11,hino 情報: Reader readItem : 12,kobayakawa 情報: Processor processItem : 12,kobayakawa 情報: Reader readItem : 13,uehara 詳細: client acquired: 1975411799 情報: Processor processItem : 13,uehara 情報: Reader readItem : 14,fujita 情報: Processor processItem : 14,fujita 情報: Reader readItem : 15,takeshita 情報: Processor processItem : 15,takeshita 情報: ItemWriter writeItem : [Test[ id = 11, name = hino ], Test[ id = 12, name = kobayakawa ], Test[ id = 13, name = uehara ], Test[ id = 14, name = fujita ], Test[ id = 15, name = takeshita ]] 詳細: TX binding to tx mgr, status=STATUS_ACTIVE 詳細: acquire unit of work: 2116586805 最も詳細: persist() operation called on: Test[ id = 11, name = hino ]. 最も詳細: persist() operation called on: Test[ id = 12, name = kobayakawa ]. 最も詳細: persist() operation called on: Test[ id = 13, name = uehara ]. 最も詳細: persist() operation called on: Test[ id = 14, name = fujita ]. 最も詳細: persist() operation called on: Test[ id = 15, name = takeshita ]. 情報: ItemReader checkpoint. 情報: ItemWriter checkpoint. 詳細: TX beforeCompletion callback, status=STATUS_ACTIVE 詳細: begin unit of work commit 詳細: TX beginTransaction, status=STATUS_ACTIVE 最も詳細: Execute query InsertObjectQuery(Test[ id = 14, name = fujita ]) 最も詳細: Connection acquired from connection pool [default]. 最も詳細: Execute query InsertObjectQuery(Test[ id = 13, name = uehara ]) 最も詳細: Execute query InsertObjectQuery(Test[ id = 15, name = takeshita ]) 最も詳細: Execute query InsertObjectQuery(Test[ id = 12, name = kobayakawa ]) 最も詳細: Execute query InsertObjectQuery(Test[ id = 11, name = hino ]) 最も詳細: reconnecting to external connection pool 詳細: Begin batch statements 普通: INSERT INTO TEST (ID, NAME) VALUES (?, ?) 普通: bind => [2 parameters bound] 普通: bind => [2 parameters bound] 普通: bind => [2 parameters bound] 普通: bind => [2 parameters bound] 普通: bind => [2 parameters bound] 詳細: End Batch Statements 最も詳細: Connection released to connection pool [default]. 詳細: TX afterCompletion callback, status=COMMITTED 詳細: end unit of work commit 詳細: release unit of work 詳細: client released 情報: Reader readItem : null 情報: ItemReader checkpoint. 情報: ItemWriter checkpoint. 情報: ItemReader close. 情報: ItemWriter close.
5件ずつ読み込んで、DBにINSERTしているのが分かります。
DB
mysql> select * from test; ID NAME 1 suzuki 2 tanaka 3 takahashi 4 aoyama 5 yamashiro 6 ishida 7 naoki 8 ochiai 9 kubota 10 kaneda 11 hino 12 kobayakawa 13 uehara 14 fujita 15 takeshita
確かに、DBへの追加が出来ています。
他にもjBatchで出来ること
今回は1つのジョブしか作っていませんが、複数のジョブを作成することも可能です。
その場合は、ジョブXMLを同じフォルダ内に複数ファイル作ることになります。
また、今回は1つのジョブに1つのステップしか作っていませんが、通常は1つのジョブは複数のステップから成り立ちます。
それら複数のステップは、条件分岐(Decision)したり、ステップを並列で実行(Split)することもできます。
さらに、例外発生時には指定したチェックポイントから読み込みを再開したり、例外が発生したレコードはスキップしたりすることもできます。
まとめ
jBatchは今まで触ったことが無かったのですが、思いのほか簡単でした。CDIでちょっと詰まりましたが(^^;
今後、もう少し複雑なサンプルを作って、分岐・並列・例外処理なども試してみようと思います!
今日はここまで。