allow-restart-if-completeの挙動を確かめてみる
TweetPosted on Monday Feb 10, 2014 at 06:48PM in Technology
jbatchでは異常終了したジョブの再実行が可能である。再実行の際、正常終了したStepを再度実行するかどうかはジョブXML内のstep要素のallow-restart-if-complete属性で制御できる。これを実験してみる
環境
- WildFly8.0.0.Final
- その他はArquillianでジョブのテストをしてみると同じ
環境・前提条件
- Arquillianでジョブのテストをしてみるで作ったプロジェクトが普通に動いているものとする
準備
必要な資源の詳細はこのへんを参照
バッチ資源
aric.xml (ジョブXML)
- Stepが3つある
- あるジョブ引数を与えるとStep3で例外が起こりジョブが死ぬ
- Step1のみallow-restart-if-complete=trueになっている(デフォルトはfalse[1])
- Step1は再実行時も動く
- Step2は再実行時は動かない
ExceptionBatchlet.java
- 3つのStepは全部これを使っている
- @BatchPropertyで与えられる引数がtrueの場合例外を投げる
テスト用資源
AbstractJobTest.java
- 再実行用のメソッドがなかったので追加してある
ARICJobTest.java (テストクラス)
- テスト内容は以下
- 一旦ジョブを実行した後にexecutionIdを使って再実行する
- 初回実行時は異常終了(Step3で例外を出させる)
- 再実行時は正常終了(Step3で例外を出させない)
テストクラスを実行してみる
ログ
17:09:14,004 FINE [org.nailedtothex.jbatch.example.aric.ExceptionBatchlet] (batch-batch - 4) process(): stepName=step1, exception=null 17:09:14,008 FINE [org.nailedtothex.jbatch.example.aric.ExceptionBatchlet] (batch-batch - 4) process(): stepName=step2, exception=null 17:09:14,012 FINE [org.nailedtothex.jbatch.example.aric.ExceptionBatchlet] (batch-batch - 4) process(): stepName=step3, exception=true 17:09:14,013 WARN [org.jberet] (batch-batch - 4) JBERET000001: Failed to run batchlet org.jberet.job.model.RefArtifact@694eb7ca: java.lang.RuntimeException: true at org.nailedtothex.jbatch.example.aric.ExceptionBatchlet.process(ExceptionBatchlet.java:28) [classes:] at org.jberet.runtime.runner.BatchletRunner.run(BatchletRunner.java:61) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:207) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:131) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:162) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.CompositeExecutionRunner.runJobElement(CompositeExecutionRunner.java:126) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:177) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:162) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.CompositeExecutionRunner.runJobElement(CompositeExecutionRunner.java:126) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:177) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:162) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:58) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.wildfly.jberet.services.BatchEnvironmentService$WildFlyBatchEnvironment$1.run(BatchEnvironmentService.java:149) [wildfly-jberet-8.0.0.Final.jar:8.0.0.Final] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [rt.jar:1.7.0_51] at java.util.concurrent.FutureTask.run(FutureTask.java:262) [rt.jar:1.7.0_51] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [rt.jar:1.7.0_51] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [rt.jar:1.7.0_51] at java.lang.Thread.run(Thread.java:744) [rt.jar:1.7.0_51] at org.jboss.threads.JBossThread.run(JBossThread.java:122) 17:09:15,013 FINE [org.nailedtothex.jbatch.example.aric.ExceptionBatchlet] (batch-batch - 8) process(): stepName=step1, exception=null 17:09:15,017 FINE [org.nailedtothex.jbatch.example.aric.ExceptionBatchlet] (batch-batch - 8) process(): stepName=step3, exception=false
Repository
job_execution
jbatch=# select * from job_execution where jobexecutionid in (21, 22); jobexecutionid | jobinstanceid | version | createtime | starttime | endtime | lastupdatedtime | batchstatus | exitstatus | jobparameters | restartposition ----------------+---------------+---------+-------------------------+-------------------------+-------------------------+-------------------------+-------------+------------+---------------------+----------------- 21 | 20 | | 2014-02-13 17:09:14.001 | 2014-02-13 17:09:14.001 | 2014-02-13 17:09:14.016 | 2014-02-13 17:09:14.016 | FAILED | FAILED | failAtStep3 = true +| | | | | | | | | | | 22 | 20 | | 2014-02-13 17:09:15.01 | 2014-02-13 17:09:15.01 | 2014-02-13 17:09:15.019 | 2014-02-13 17:09:15.019 | COMPLETED | COMPLETED | failAtStep3 = false+| | | | | | | | | | | (2 rows) jbatch=#
step_execution
jbatch=# select * from step_execution where jobexecutionid in (21, 22) order by jobexecutionid, stepexecutionid; stepexecutionid | jobexecutionid | version | stepname | starttime | endtime | batchstatus | exitstatus | executionexception | persistentuserdata | readcount | writecount | commitcount | rollbackcount | readskipcount | processskipcount | filtercount | writeskipcount | readercheckpointinfo | writercheckpointinfo -----------------+----------------+---------+----------+-------------------------+-------------------------+-------------+------------+--------------------------------------------------------------------------------------------------------------------------------+--------------------+-----------+------------+-------------+---------------+---------------+------------------+-------------+----------------+----------------------+---------------------- 34 | 21 | | step1 | 2014-02-13 17:09:14.002 | 2014-02-13 17:09:14.005 | COMPLETED | SUCCESS | | | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | 35 | 21 | | step2 | 2014-02-13 17:09:14.007 | 2014-02-13 17:09:14.008 | COMPLETED | SUCCESS | | | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | 36 | 21 | | step3 | 2014-02-13 17:09:14.011 | 2014-02-13 17:09:14.014 | FAILED | FAILED | java.lang.RuntimeException: true +| | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | | | | | | | | | at org.nailedtothex.jbatch.example.aric.ExceptionBatchlet.process(ExceptionBatchlet.java:28) +| | | | | | | | | | | | | | | | | | | at org.jberet.runtime.runner.BatchletRunner.run(BatchletRunner.java:61) +| | | | | | | | | | | | | | | | | | | at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:207) +| | | | | | | | | | | | | | | | | | | at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:131) +| | | | | | | | | | | | | | | | | | | at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:162) +| | | | | | | | | | | | | | | | | | | at org.jberet.runtime.runner.CompositeExecutionRunner.runJobElement(CompositeExecutionRunner.java:126) +| | | | | | | | | | | | | | | | | | | at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:177) +| | | | | | | | | | | | | | | | | | | at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:162) +| | | | | | | | | | | | | | | | | | | at org.jberet.runtime.runner.CompositeExecutionRunner.runJobElement(CompositeExecutionRunner.java:126) +| | | | | | | | | | | | | | | | | | | at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:177) +| | | | | | | | | | | | | | | | | | | at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:162) +| | | | | | | | | | | | | | | | | | | at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88) +| | | | | | | | | | | | | | | | | | | at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:58) +| | | | | | | | | | | | | | | | | | | at org.wildfly.jberet.services.BatchEnvironmentService$WildFlyBatchEnvironment$1.run(BatchEnvironmentService.java:149)+| | | | | | | | | | | | | | | | | | | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) +| | | | | | | | | | | | | | | | | | | at java.util.concurrent.FutureTask.run(FutureTask.java:262) +| | | | | | | | | | | | | | | | | | | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) +| | | | | | | | | | | | | | | | | | | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) +| | | | | | | | | | | | | | | | | | | at java.lang.Thread.run(Thread.java:744) +| | | | | | | | | | | | | | | | | | | at org.jboss.threads.JBossThread.run(JBossThread.java:122) +| | | | | | | | | | | | | | | | | | | | | | | | | | | | | | 37 | 22 | | step1 | 2014-02-13 17:09:15.011 | 2014-02-13 17:09:15.013 | COMPLETED | SUCCESS | | | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | 38 | 22 | | step3 | 2014-02-13 17:09:15.016 | 2014-02-13 17:09:15.017 | COMPLETED | SUCCESS | | | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | (5 rows) jbatch=#
- 問題無さげ
- 初回実行時はstep1, step2, step3が動いて異常終了
- 再実行時はstep1, step3が動いて正常終了
参考文献
Tags: jbatch
CLIからリモートEJB経由でジョブを操作してみる
TweetPosted on Monday Feb 10, 2014 at 06:44PM in Technology
JAX-RSなどを使ってREST APIを作る方が良いのだろうと思われるが、大変なのでリモートEJB経由でやってみる。さらにコマンドラインから叩きたいので、シェルスクリプトから蹴れるように少し作り込んでみる
WildFly - リモートEJB呼び出しを使ってみる を併せて読むと多少参考になるかも
環境
- WildFly8.0.0.CR1
- Oracle JDK7u51
- OS X 10.9.1
準備
プロジェクトを作る
下の3つを作る。ソースはGitHubに置いた
- JSR352APIのSerializableなJavaBeansを含むプロジェクト(jbatchif)
- リモートEJBとバッチの実装を含むプロジェクト(jbatch)
- クライアント側プロジェクト(jbatchcli)
JSR352APIのSerializableなJavaBeansを含むプロジェクト(jbatchif)
SimpleJobInstanceImpl.java, SimpleJobExecutionImpl.java, SimpleStepExecutionImpl.java, SimpleMetricImpl.java
JobInstance, JobExecution, StepExecution, MetricインタフェースのSerializableな実装。WildFlyのJSR352実装のこれらのクラスをそのまま使うとNotSerializableExceptionが出てしまうのでしかたなく作った(org.jberet.runtime.metric.StepMetricsが非Serializableらしい)。リモートEJB側とクライアント側の両方で使うので切り出しておく。フィールドとアクセサしかないただのJavaBeans。
リモートEJBとバッチの実装を含むプロジェクト(jbatch)
MyJobOperatorImpl.java
こいつをリモートEJB経由で呼び出す。throws宣言が若干気になるけど面倒なのでそのままにしておく
TestBatchlet.java, job001.xml
動かしてみる で使ったのと同じ
クライアント側プロジェクト(jbatchcli)
Main.java
メインクラス。引数でリモートEJBのJNDI名、ジョブ名、ジョブパラメータを受け取る。細かい処理はprocessorパッケージの下のクラスにやらせる。Commons CLIとか使って作り込んだ方がいいんだろうけど、現状ではかなり適当。
config.sh
シェルスクリプトの環境依存っぽいところを外だししてある
jbatch.sh
javaコマンドをたたくところ
jndi.properties
リモートEJBのlookupに必要な設定をする。ここを適当に書き換えればWildFly以外のAPサーバでもいけるはず
logging.properties
JBossのEJB呼び出し用ライブラリがデバッグ的な文言をINFOレベルで出力してくるのでログレベル上げて出ないようにしとく
デプロイする
CLIから叩く前にjbatchプロジェクトをデプロイする
CLIから叩いてみる
ジョブ一覧を表示してみる
kyle-no-MacBook:resources kyle$ ./jbatch.sh list-job-names Picked up _JAVA_OPTIONS: -Dfile.encoding=UTF-8 jobNames: [job001] kyle-no-MacBook:resources kyle$
jbatchランタイムに認識されたジョブXMLの一覧的なのが出る。job001しかないので1つだけだけど。
ジョブを起動してみる
kyle-no-MacBook:resources kyle$ ./jbatch.sh start job001 Picked up _JAVA_OPTIONS: -Dfile.encoding=UTF-8 executionId: 63 kyle-no-MacBook:resources kyle$
普通に動いている。WildFly側にもログが出てる。こんな感じ
まだジョブXML一覧と起動のところしか作ってない。停止とか再実行とかexecutionIdから情報見るとかはそのうち作り込む。
何故かなかなかjavaプロセスが終了しないことがあるのが気になる。1分位待ってると終わるんだけど。
参考文献
Tags: jbatch
Chunk方式のStepを使ってみる
TweetPosted on Monday Feb 10, 2014 at 06:42PM in Technology
jbatchでバルク処理を行う場合、Chunk-orientedという方式に合わせたインタフェース群を実装することが推奨されている。若干複雑なので、簡単な実装を作ってみて、ログや仕様を見つつ処理の流れを確認してみる。
JSR 352 Implementation - Connection close probl… | Communityによると、ここに書いてあるような実装はよろしくないようです。SELECT文の発行などは全て同一トランザクション内で完結する必要があるらしい。open()で取得するのはPKだけにして、実際のデータはreadItem()で別途取得するようにするなどした方が良いかも
トランザクション境界を跨いでJDBCの資源(Connection, Statement, ResultSet等)を使う場合は,非JTAデータソースを使いましょう.詳細はこちらを参照.
環境
- WildFly8.0.0.Final
- その他はArquillianでジョブのテストをしてみると同じ
前提条件
Arquillianでジョブのテストをしてみるで作ったプロジェクトが普通に動いているものとする
Chunk-oriented Processingとは
以下の3つのインタフェースで1つのStepが構成される。開発者はこれらのインタフェースを実装したクラスを作って1つのStepを作る
- ItemReader
- ItemProcessor
- ItemWriter
データの入力元と出力先が両方ともDBの場合、それぞれの役割と実装の仕方はざっくり以下のような感じになるかと思われる
ItemReader
SELECT文を実行して処理したい単位でデータを取り出しItemProcessorに渡す。
void open(Serializable)
SELECT文を発行するのが普通の使い方かと思われる。
バッチが初回実行の場合、引数はnullだが、途中でバッチが死んだ後の再実行の場合、checkpointInfo()で返したカウンタが引数で渡されるので、これをうまく使えば処理済みのデータをスキップして途中から処理を再開させられる。
JPAを使う場合
データ量が少ない場合はEntityManagerを使いQueryを作ってgetResultList()を呼ぶ。データ量が多い場合、[4][5][8]あたりを読むと良いかも
JDBCを使う場合
DataSourceを使いConnection, Statement, ResultSetを作る。データ量が多い場合は[6]あたりを読むとよいかも。
Object readItem()
データを1件取り出す処理。データがなくなったらnullを返すと全件処理終了の意味になる。
JPAを使う場合
open()で作ったListからget()でデータを取り出して返す。
JDBCを使う場合
ResultSet#next()を呼んでカーソルを制御しつつ、簡単なものならResultSetをそのまま返すか、ResultSet#get()でデータを取り出して、適当なDTO等につめて返す
void close()
open()で開いた資源を解放する。Connection, Statement, ResultSetを使った場合はここでcloseする。
Serializable checkpointInfo()
カウンタを返す。返したカウンタはRepositoryに保存され、どこまで処理が完了したかが記録される。
ItemProcessor
ItemReaderが取り込んだデータを適当に処理して処理結果を作る。省略可。
Object process(Object)
ItemReader#readItem()が返したオブジェクトがここでの引数になる。これを使って何らかの処理をする。ここで返した処理結果はItemWriter#writeItems()の引数になる
ItemWriter
ItemProcessorが作った処理結果をどこかのテーブルに書き込む。
void open(Serializable)
JDBCを使う場合、ここでConnectionとINSERT/UPDATE/DELETE文のPreparedStatementを開いておいたりすると良いのかも たぶん何もしない.
void writeItems(List)
ItemProcessorが作った処理結果が1トランザクションで書き込む単位でまとめてListに詰めて引数で渡されるので、ループ等を使って要素をどこかのテーブルに書き込む。
JPAを使う場合
EntityManager#persist()かmerge()を呼び出す。
JDBCを使う場合
open()で開いておいたConnection, PreparedStatementを作って,データを書き込む。
void close()
Connection, Statementを使った場合はここで閉じる たぶん何もしない.
Serializable checkpointInfo()
ItemReaderの同メソッドと同じ。
準備
単純なchunk方式のStepが1つだけのバッチをJPAを使って作ってみる。資源の詳細はこのあたり参照
バッチを構成する資源
ChunkInputItem.java, ChunkOutputItem.java
超単純なエンティティ。
- ItemReaderがChunkInputItemを取ってくる
- ItemProcessorがChunkInputItemをChunkOutputItemに変換する
- ItemWriterがChunkOutputItemをDBに書き込む
chunk.xml
- ジョブXML。chunk方式のStep1個だけの単純なもの
- 3件処理したらコミット
- processorにジョブパラメータを渡してみる
ChunkItemReader.java
- データ量が多い場合はよろしくないが、簡単のためgetResultList()でデータを取ってくる
- 一応カウンタ付き
ChunkItemProcessor.java
- @BatchPropertyで受け取ったプロパティを使って、@PostConstructで初期化的な処理をやってみる
- ChunkInputItemからChunkOutputItemを作って返す
ChunkItemWriter.java
- EntityManager#persist()を呼ぶ
- 一応カウンタ付き
テスト用資源
ChunkJobTest.java
- テストクラス
- ジョブプロパティを渡してみる
ChunkInputItem.yml, ChunkOutputItem.yml
- テストデータ
- ChunkInputItemは10件
- ChunkOutputItemは0件
expected.yml
- ChunkOutputItemへの出力の期待結果
実行してみる
テスト結果は普通に緑色になる。ログはこんな感じ
22:00:29,478 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#postConstruct(): divide=2 22:00:29,478 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#open(): checkpoint=null, index starts at=0 22:00:29,481 DEBUG [org.hibernate.SQL] (batch-batch - 3) select chunkinput0_.id as id1_0_, chunkinput0_.input as input2_0_, chunkinput0_.processed as processe3_0_ from ChunkInputItem chunkinput0_ order by chunkinput0_.id 22:00:29,491 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#open(): checkpoint=null, index starts at=0 22:00:29,492 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=0 22:00:29,492 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=ChunkInputItem [id=0, input=0, processed=false] 22:00:29,492 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#processItem(): input=ChunkInputItem [id=0, input=0, processed=false], output=ChunkOutputItem [id=0, result=0] 22:00:29,492 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=1 22:00:29,492 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=1 22:00:29,492 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=ChunkInputItem [id=1, input=10, processed=false] 22:00:29,492 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#processItem(): input=ChunkInputItem [id=1, input=10, processed=false], output=ChunkOutputItem [id=1, result=5] 22:00:29,492 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=2 22:00:29,493 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=2 22:00:29,493 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=ChunkInputItem [id=2, input=20, processed=false] 22:00:29,493 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#processItem(): input=ChunkInputItem [id=2, input=20, processed=false], output=ChunkOutputItem [id=2, result=10] 22:00:29,493 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=3 22:00:29,493 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): index=0 22:00:29,493 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): item=ChunkOutputItem [id=0, result=0] 22:00:29,493 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): item=ChunkOutputItem [id=1, result=5] 22:00:29,494 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): item=ChunkOutputItem [id=2, result=10] 22:00:29,494 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=3 22:00:29,494 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#checkpointInfo(): returns=1 22:00:29,494 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=3 22:00:29,497 DEBUG [org.hibernate.SQL] (batch-batch - 3) insert into ChunkOutputItem (result, id) values (?, ?) 22:00:29,505 DEBUG [org.hibernate.SQL] (batch-batch - 3) insert into ChunkOutputItem (result, id) values (?, ?) 22:00:29,506 DEBUG [org.hibernate.SQL] (batch-batch - 3) insert into ChunkOutputItem (result, id) values (?, ?) 22:00:29,507 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=3 22:00:29,507 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=ChunkInputItem [id=3, input=30, processed=false] 22:00:29,507 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#processItem(): input=ChunkInputItem [id=3, input=30, processed=false], output=ChunkOutputItem [id=3, result=15] 22:00:29,507 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=4 22:00:29,507 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=4 22:00:29,508 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=ChunkInputItem [id=4, input=40, processed=false] 22:00:29,508 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#processItem(): input=ChunkInputItem [id=4, input=40, processed=false], output=ChunkOutputItem [id=4, result=20] 22:00:29,508 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=5 22:00:29,508 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=5 22:00:29,508 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=ChunkInputItem [id=5, input=50, processed=false] 22:00:29,508 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#processItem(): input=ChunkInputItem [id=5, input=50, processed=false], output=ChunkOutputItem [id=5, result=25] 22:00:29,508 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=6 22:00:29,508 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): index=1 22:00:29,508 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): item=ChunkOutputItem [id=3, result=15] 22:00:29,509 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): item=ChunkOutputItem [id=4, result=20] 22:00:29,509 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): item=ChunkOutputItem [id=5, result=25] 22:00:29,509 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=6 22:00:29,509 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#checkpointInfo(): returns=2 22:00:29,509 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=6 22:00:29,510 DEBUG [org.hibernate.SQL] (batch-batch - 3) insert into ChunkOutputItem (result, id) values (?, ?) 22:00:29,512 DEBUG [org.hibernate.SQL] (batch-batch - 3) insert into ChunkOutputItem (result, id) values (?, ?) 22:00:29,513 DEBUG [org.hibernate.SQL] (batch-batch - 3) insert into ChunkOutputItem (result, id) values (?, ?) 22:00:29,514 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=6 22:00:29,514 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=ChunkInputItem [id=6, input=60, processed=false] 22:00:29,514 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#processItem(): input=ChunkInputItem [id=6, input=60, processed=false], output=ChunkOutputItem [id=6, result=30] 22:00:29,514 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=7 22:00:29,514 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=7 22:00:29,514 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=ChunkInputItem [id=7, input=70, processed=false] 22:00:29,514 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#processItem(): input=ChunkInputItem [id=7, input=70, processed=false], output=ChunkOutputItem [id=7, result=35] 22:00:29,514 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=8 22:00:29,514 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=8 22:00:29,514 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=ChunkInputItem [id=8, input=80, processed=false] 22:00:29,514 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#processItem(): input=ChunkInputItem [id=8, input=80, processed=false], output=ChunkOutputItem [id=8, result=40] 22:00:29,515 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=9 22:00:29,515 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): index=2 22:00:29,515 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): item=ChunkOutputItem [id=6, result=30] 22:00:29,515 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): item=ChunkOutputItem [id=7, result=35] 22:00:29,515 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): item=ChunkOutputItem [id=8, result=40] 22:00:29,515 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=9 22:00:29,516 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#checkpointInfo(): returns=3 22:00:29,516 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=9 22:00:29,516 DEBUG [org.hibernate.SQL] (batch-batch - 3) insert into ChunkOutputItem (result, id) values (?, ?) 22:00:29,517 DEBUG [org.hibernate.SQL] (batch-batch - 3) insert into ChunkOutputItem (result, id) values (?, ?) 22:00:29,517 DEBUG [org.hibernate.SQL] (batch-batch - 3) insert into ChunkOutputItem (result, id) values (?, ?) 22:00:29,518 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=9 22:00:29,519 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=ChunkInputItem [id=9, input=90, processed=false] 22:00:29,519 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#processItem(): input=ChunkInputItem [id=9, input=90, processed=false], output=ChunkOutputItem [id=9, result=45] 22:00:29,519 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=10 22:00:29,519 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=10 22:00:29,519 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=null 22:00:29,519 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): index=3 22:00:29,519 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): item=ChunkOutputItem [id=9, result=45] 22:00:29,519 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=10 22:00:29,520 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#checkpointInfo(): returns=4 22:00:29,520 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=10 22:00:29,520 DEBUG [org.hibernate.SQL] (batch-batch - 3) insert into ChunkOutputItem (result, id) values (?, ?) 22:00:29,522 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#close() 22:00:29,522 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#close() 22:00:29,522 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#close() 22:00:29,522 FINE [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#close()
処理の流れを見てみる
[2]の「11.6 Regular Chunk Processing」に詳細が書いてあるが、以下のようなイメージの順序で処理が行われる
まずopenが呼ばれる(ログの2-4行目)
- トランザクション開始
- ItemReader#open()
- ItemWriter#open()
- コミット
データがなくなる(readItem()がnullを返す)まで以下ループ(ログの5-26行目)
- トランザクション開始
- ItemReader#readItem()
- ItemProcessor#processItem()
- ItemReader#readItem()
- ItemProcessor#processItem()
- ItemReader#readItem()
- ItemProcessor#processItem()
- itemWriter#writeItems() ※ItemProcessor#processItem()の返り値3件分がListに詰められたものが引数
- ItemReader#checkpointInfo()をRepositoryに記録
- ItemWriter#checkpointInfo()をRepositoryに記録
- コミット
ItemReader#readItem()で件数カウンタをインクリメントしつつデータを1件返す。ItemProcessor#processItem()にデータが渡ってきたら、処理を行い、結果をreturnで返す。これがitem-countに指定した数繰り返され、処理結果がListに詰められてItemWriter#writeItems()に渡される。ここでも件数カウンタをインクリメントしつつ書き込む。コミット前にItemReaderとItemWriterのcheckpointInfo()が呼ばれ、件数カウンタがRepositoryに書き込まれる。
件数カウンタを実装しておくと、処理中に問題が起きた後の再実行時に処理済みのデータをスキップさせることが可能になる。再実行時のスキップが必要ない場合は、AbstractItemReaderを継承して、Iteratorを使うと楽に実装できる。
なお、アプリで使っているものとRepositoryのものとは通常別のデータソースになるので、基本的にXAトランザクションが必要になるようだ.または,XAトランザクションを使いたくない場合は,Repository用のデータソースを非JTAデータソースにするといいかもしれない.
最後にcloseが呼ばれる(ログの83-86行目)
- トランザクション開始
- ItemReader#close()
- ItemWriter#close()
- コミット
備考
- ItemReader#close()とItemWriter#close()の呼ばれる順序が仕様[2]と逆な気がする。仕様だとItemWriter#close()→ItemReader#close()の順だが、この実装だとItemReader#close()→ItemWriter#close()の順に呼ばれている
- ItemReaderとItemWriterのclose()が2回呼ばれるのが気になる。[3]を見るとsafeClose()ってとこで例外を握りつぶしつつ、しつこくclose()するようになってる。まあこれはこれでいいのかも
- checkpointInfo()も無駄に呼ばれている気がする。けど別に害は無いだろうから気にしない
- JDBCを使う場合はDbUtils.closeQuietly()があると便利かも。ResultSetHandlerは使い辛そう
続き
参考文献
- Batch Application for the Java Platform – JSR352
- JSR-000352 Batch Applications for the Java Platform - Final Release
- jsr352/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java at master · jberet/jsr352
- java - How to handle large dataset with JPA (or at least with Hibernate)? - Stack Overflow
- java - JPA: what is the proper pattern for iterating over large result sets? - Stack Overflow
- java - JDBC: How to read all rows from huge table? - Stack Overflow
- 調査メモ: DbUtilsのサンプル
- JPAからフェッチサイズを変更したかった - kagamihogeの日記
Tags: jbatch
別プロセスからリモートEJBを呼び出してみる
TweetPosted on Sunday Feb 09, 2014 at 03:18PM in Technology
WildFly8にデプロイしてあるEJB(WARファイル内に存在)を、別のJavaプロセスからリモート呼び出ししてみる
環境
- Eclipse Kepler SR1
- WildFly8.0.0.CR1
- Oracle JDK7u51
前提条件
WildFly8.0.0CR1でサーブレットを動かしてみる あたりでEclipseに加えた設定が終わっているものとする
準備
プロジェクトを作る
以下の3つを作ります
- リモートEJBインタフェースを含むプロジェクト(remoteejbif)
- リモートEJBの実装を含むプロジェクト(remoteejbwar)
- クライアント側プロジェクト(remoteejbclient)
リモートEJBインタフェースを含むプロジェクト(remoteejbif)
- 「remoteejbif」という名称のJavaプロジェクトを作る
- ソースを追加
package foo; public interface HogeBean { String hoge(); }
リモートEJBの実装を含むプロジェクト(remoteejbwar)
- 「remoteejbwar」という名称のMavenプロジェクトをarchetype「javaee7-essentials」から作る
- プロジェクトの設定を開く
- Project Referencesでremoteejbifにチェックを入れる
- Java Build PathのProjectsタブでAddを押してremoteejbifを追加する
- Deployment Assembly→Add→Projectでremoteejbifを追加する。追加した後、再度設定内容を確認すると、追加されていなかったりするので注意(もう一度追加するとOKっぽい。なんだこれ)
- ソースを追加
- ソースを追加したらJava EE Perspective→Servers→WildFly→右クリック→Add and Removeからデプロイ
package foo; import javax.ejb.Remote; import javax.ejb.Stateless; @Stateless @Remote(HogeBean.class) public class HogeBeanImpl implements HogeBean{ @Override public String hoge() { return "hoge from remote"; } }
クライアント側プロジェクト(remoteejbclient)
- 「remoteejbclient」という名称のJavaプロジェクトを作る
- プロジェクトの設定を開く
- Project Referencesでremoteejbifにチェックを入れる
- Java Build PathのProjectsタブでAddを押してremoteejbifを追加する
- Java Build PathのLibrariesタブでAdd External Jarsを押して$WILDFLY_HOME/bin/client/jboss-client.jarを追加する
- ソースを追加
package foo; import java.util.Properties; import javax.naming.Context; import javax.naming.InitialContext; public class HogeBeanInvoker2 { public static void main(String[] args) throws Exception { Properties p = new Properties(); p.put("remote.connections", "default"); p.put("remote.connection.default.port", "8080"); p.put("remote.connection.default.host", "localhost"); p.put("remote.connectionprovider.create.options.org.xnio.Options.SSL_ENABLED", false); p.put(Context.URL_PKG_PREFIXES, "org.jboss.ejb.client.naming"); p.put("org.jboss.ejb.client.scoped.context", true); Context c = null; try { c = new InitialContext(p); HogeBean hogeBean = (HogeBean) c.lookup("ejb:/remoteejbwar//HogeBeanImpl!foo.HogeBean"); System.out.println(hogeBean.hoge()); } finally { c.close(); } } }
実行
「remoteejbclient」プロジェクトに作ったHogeBeanInvoker2を普通に実行する(右クリック→Run As→Java Application)。こんな感じの出力が出る。わりと速い
2 10, 2014 5:24:54 午後 org.xnio.Xnio <clinit> INFO: XNIO version 3.2.0.Beta4 2 10, 2014 5:24:54 午後 org.xnio.nio.NioXnio <clinit> INFO: XNIO NIO Implementation Version 3.2.0.Beta4 2 10, 2014 5:24:54 午後 org.jboss.remoting3.EndpointImpl <clinit> INFO: JBoss Remoting version (unknown) 2 10, 2014 5:24:54 午後 org.jboss.ejb.client.remoting.VersionReceiver handleMessage INFO: EJBCLIENT000017: Received server version 2 and marshalling strategies [river] 2 10, 2014 5:24:54 午後 org.jboss.ejb.client.remoting.RemotingConnectionEJBReceiver associate INFO: EJBCLIENT000013: Successful version handshake completed for receiver context EJBReceiverContext{clientContext=org.jboss.ejb.client.EJBClientContext@1f1ee981, receiver=Remoting connection EJB receiver [connection=org.jboss.ejb.client.remoting.ConnectionPool$PooledConnection@2674241d,channel=jboss.ejb,nodename=kyle-no-macbook]} on channel Channel ID bb6edca7 (outbound) of Remoting connection 3aa7fab1 to localhost/127.0.0.1:8080 2 10, 2014 5:24:54 午後 org.jboss.ejb.client.EJBClient <clinit> INFO: JBoss EJB Client version 2.0.0.Beta5 hoge from remote
備考
デプロイ時に出力されるJNDI名は別プロセスからのリモート呼び出しには使えない
EJBをデプロイするとWildFlyの出力に以下のようなJNDI名が出力されるが、別プロセスからのリモート呼び出しでは使えなかった。別プロセスからlookupする場合は「ejb:」で始まるJNDI名を使わないといけないっぽい。詳細は参考文献を参照。
java:global/remoteejbwar/HogeBeanImpl!foo.HogeBean java:app/remoteejbwar/HogeBeanImpl!foo.HogeBean java:module/HogeBeanImpl!foo.HogeBean java:jboss/exported/remoteejbwar/HogeBeanImpl!foo.HogeBean java:global/remoteejbwar/HogeBeanImpl java:app/remoteejbwar/HogeBeanImpl java:module/HogeBeanImpl
同じAPサーバにデプロイされたアプリケーション間なら「java:global」で始まるJNDI名でlookup可能。
ユーザの作成は不要
参考文献を見るとユーザの作成が必要な旨の記述が多く見つかるが、どうやらこの環境では特に必要ない模様
standalone.xmlでOK
参考文献を見ると設定ファイルstandalone-full.xmlで起動している例が見つかるが、この記事のような単純なものの場合は特に必要ない模様。JMSとか使う場合に必要になるのかも
その他
レガシーでもはやなるべく使うべきではないと言われることもあるリモートEJB呼び出しだが、局面によっては依然使えると思われる
参考文献
- Tanmoy's Blog: WildFly: EJB invocations from a remote client
- java - WildFly: EJB invocations from a remote client - Stack Overflow
- EJB invocations from a remote client using JNDI - WildFly 8 - Project Documentation Editor
- JNDI Lookup Failing | Community
- Remote EJB invocations via JNDI - EJB client API or remote-naming project - WildFly 8 - Project Documentation Editor
- Java Web: Accessing remote EJB on JBoss AS 7.1 from web application
- JBoss EAP 6.2のリモートEJB呼び出し - nekopの日記
Tags: wildfly
WildFly8でJDBCジョブレポジトリを使ってみる
TweetPosted on Saturday Feb 08, 2014 at 04:36PM in Technology
WildFly8.0.0.CR1ではジョブレポジトリがデフォルトではインメモリH2になっているが、スタンドアロンのDBにしてみる。現状H2しかサポートしてないようなのでサーバモードのH2を立ててやってみる
この例では普通のデータソースを使ってますが、バッチから他のデータベースも参照する場合は、XAデータソースにしておかないと問題が起こると思われるので注意
環境
- WildFly8.0.0.CR1
- H2 1.3.175 (2014-01-18)
- Oracle JDK7u51
- OS X 10.9.1
準備
プロジェクト
動かしてみるで作ったプロジェクトをそのまま使う。
H2を立ち上げる
- H2のzipファイルをとってきて展開する
- h2/binに存在するh2.shを実行する
- ブラウザにH2管理コンソールへのログイン画面が表示される。デフォルトでTCP接続のH2にログインする内容になっているのでそのままログインする
- 適当にSQLを実行してみて普通に動いている事を確認する
データソースを作る
jboss-cliで叩くコマンドはこんな感じ。この環境だとH2は最初から入っているようなので特にJDBCドライバのデプロイ等は不要。
data-source add \ --name=JBatchDS \ --driver-name=h2 \ --connection-url=jdbc:h2:tcp://localhost/~/test \ --jndi-name=java:jboss/jdbc/JBatchDS \ --user-name=sa \ --password=
一応接続チェック
[standalone@localhost:9990 /] /subsystem=datasources/data-source=JBatchDS:test-connection-in-pool { "outcome" => "success", "result" => [true] }
ジョブレポジトリを変更
JDBCレポジトリのデータソース名を設定
[standalone@localhost:9990 /] /subsystem=batch/job-repository=jdbc:write-attribute(name=jndi-name, value=java:jboss\/jdbc\/JBatchDS) { "outcome" => "success", "response-headers" => { "operation-requires-reload" => true, "process-state" => "reload-required" } }
job-repository-typeをjdbcに設定
[standalone@localhost:9990 /] /subsystem=batch:write-attribute(name=job-repository-type, value=jdbc) { "outcome" => "success", "response-headers" => { "operation-requires-restart" => true, "process-state" => "restart-required" } }
restart-requiredだそうなので再起動する。
実行してみる
TestServletにアクセス
普通に動いているようです
DBを見てみる
事前にDDLを流したりは一切していないが、H2のWebコンソールから見てみると自動的に必要なテーブル等が作られている
備考
GlassFishにあるlist-batch-jobs的なコマンドがあれば良いのだけどまだ見つけられていない。また調べる
参考文献
Tags: jbatch