Chunk方式のStepで例外発生時にリトライさせてからスキップさせてみる
TweetPosted on Monday Feb 10, 2014 at 06:57PM in Technology
Chunk方式のStepで例外発生時にスキップさせてみる でリトライとスキップをそれぞれ別々に試してみたが、ここではリトライを試み、それでも成功しない場合は、そのデータをスキップして処理を継続させる、というのをやってみる
環境・前提条件
- Chunk方式のStepで例外発生時にスキップさせてみると同じ。この記事で作ったプロジェクトや資源がすでに存在するものとする
JSR352仕様を確認する
[1]に以下のようにある
8.2.1.4.3 Retry and Skip the Same Exception
When the same exception is specified as both retryable and skippable, retryable takes precedence over skippable during regular processing of the chunk. While the chunk is retrying, skippable takes precedence over retryable since the exception is already being retried.
This allows an exception to initially be retried for the entire chunk and then skipped if it recurs. When retrying with default retry behavior (see section 8.2.1.4.4) the skips can occur for individual items, since the retry is done with an item-count of 1.
リトライの回数は1回で固定になるようだ。失敗後は1回はリトライするが、それでも駄目ならスキップとなる。retry-limitを指定してみたが、ここでリトライ回数を指定するために使われる値ではないようだ。
バッチを作る
仕様
- 入力、出力、処理内容は基本的にChunk方式のStepで例外発生時にスキップさせてみるでやったのと同じ
- 6件目のデータ処理時にItemReaderでRuntimeExceptionが起こる。リトライが試みられたあとさらに例外が起こり、スキップされる
- RuntimeExceptionは以下の両方に設定してある
- skippable-exception-classes
- retryable-exception-classes
- RuntimeExceptionは以下の両方に設定してある
資源
資源はこのへんにまとめて全部ある
- バッチ本体
- テスト
動かしてみる
ログ
16:36:59,072 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) open(): checkpoint=null, index starts at=0 16:36:59,076 DEBUG [org.hibernate.SQL] (batch-batch - 5) select chunkinput0_.id as id1_0_, chunkinput0_.input as input2_0_, chunkinput0_.processed as processe3_0_ from ChunkInputItem chunkinput0_ order by chunkinput0_.id 16:36:59,078 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) open(): checkpoint=null, index starts at=0 16:36:59,079 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): index=0, failAt=5 16:36:59,079 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): returning=ChunkInputItem [id=0, input=0, processed=false] 16:36:59,079 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemProcessor] (batch-batch - 5) processItem(): input=ChunkInputItem [id=0, input=0, processed=false], output=ChunkOutputItem [id=0, result=0] 16:36:59,079 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=1 16:36:59,079 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): index=1, failAt=5 16:36:59,080 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): returning=ChunkInputItem [id=1, input=10, processed=false] 16:36:59,080 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemProcessor] (batch-batch - 5) processItem(): input=ChunkInputItem [id=1, input=10, processed=false], output=ChunkOutputItem [id=1, result=5] 16:36:59,080 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=2 16:36:59,080 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): index=2, failAt=5 16:36:59,080 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): returning=ChunkInputItem [id=2, input=20, processed=false] 16:36:59,080 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemProcessor] (batch-batch - 5) processItem(): input=ChunkInputItem [id=2, input=20, processed=false], output=ChunkOutputItem [id=2, result=10] 16:36:59,080 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=3 16:36:59,080 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) writeItems(): index=0 16:36:59,080 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) writeItems(): item=ChunkOutputItem [id=0, result=0] 16:36:59,081 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) writeItems(): item=ChunkOutputItem [id=1, result=5] 16:36:59,081 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) writeItems(): item=ChunkOutputItem [id=2, result=10] 16:36:59,081 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=3 16:36:59,081 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) checkpointInfo(): returns=1 16:36:59,083 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=3 16:36:59,086 DEBUG [org.hibernate.SQL] (batch-batch - 5) insert into ChunkOutputItem (result, id) values (?, ?) 16:36:59,087 DEBUG [org.hibernate.SQL] (batch-batch - 5) insert into ChunkOutputItem (result, id) values (?, ?) 16:36:59,088 DEBUG [org.hibernate.SQL] (batch-batch - 5) insert into ChunkOutputItem (result, id) values (?, ?) 16:36:59,116 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): index=3, failAt=5 16:36:59,117 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): returning=ChunkInputItem [id=3, input=30, processed=false] 16:36:59,117 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemProcessor] (batch-batch - 5) processItem(): input=ChunkInputItem [id=3, input=30, processed=false], output=ChunkOutputItem [id=3, result=15] 16:36:59,117 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=4 16:36:59,117 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): index=4, failAt=5 16:36:59,117 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): returning=ChunkInputItem [id=4, input=40, processed=false] 16:36:59,117 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemProcessor] (batch-batch - 5) processItem(): input=ChunkInputItem [id=4, input=40, processed=false], output=ChunkOutputItem [id=4, result=20] 16:36:59,117 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=5 16:36:59,118 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): index=5, failAt=5 16:36:59,118 WARNING [org.nailedtothex.jbatch.example.chunkretry.MyRetryReadListener] (batch-batch - 5) onRetryReadException(): java.lang.RuntimeException: 5 at org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader.readItem(ChunkRetrySkipItemReader.java:43) [classes:] at org.jberet.runtime.runner.ChunkRunner.readItem(ChunkRunner.java:343) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.ChunkRunner.readProcessWriteItems(ChunkRunner.java:288) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.ChunkRunner.run(ChunkRunner.java:190) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:204) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:131) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:162) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:58) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.wildfly.jberet.services.BatchEnvironmentService$WildFlyBatchEnvironment$1.run(BatchEnvironmentService.java:149) [wildfly-jberet-8.0.0.Final.jar:8.0.0.Final] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [rt.jar:1.7.0_51] at java.util.concurrent.FutureTask.run(FutureTask.java:262) [rt.jar:1.7.0_51] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [rt.jar:1.7.0_51] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [rt.jar:1.7.0_51] at java.lang.Thread.run(Thread.java:744) [rt.jar:1.7.0_51] at org.jboss.threads.JBossThread.run(JBossThread.java:122) 16:36:59,119 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=6 16:36:59,119 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) close() 16:36:59,119 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) close() 16:36:59,119 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) open(): checkpoint=3, index starts at=3 16:36:59,120 DEBUG [org.hibernate.SQL] (batch-batch - 5) select chunkinput0_.id as id1_0_, chunkinput0_.input as input2_0_, chunkinput0_.processed as processe3_0_ from ChunkInputItem chunkinput0_ order by chunkinput0_.id 16:36:59,122 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) open(): checkpoint=1, index starts at=1 16:36:59,122 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=3 16:36:59,122 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): index=3, failAt=5 16:36:59,123 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): returning=ChunkInputItem [id=3, input=30, processed=false] 16:36:59,123 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemProcessor] (batch-batch - 5) processItem(): input=ChunkInputItem [id=3, input=30, processed=false], output=ChunkOutputItem [id=3, result=15] 16:36:59,123 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=4 16:36:59,123 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) writeItems(): index=1 16:36:59,123 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) writeItems(): item=ChunkOutputItem [id=3, result=15] 16:36:59,123 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=4 16:36:59,123 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) checkpointInfo(): returns=2 16:36:59,125 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=4 16:36:59,125 DEBUG [org.hibernate.SQL] (batch-batch - 5) insert into ChunkOutputItem (result, id) values (?, ?) 16:36:59,152 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): index=4, failAt=5 16:36:59,152 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): returning=ChunkInputItem [id=4, input=40, processed=false] 16:36:59,152 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemProcessor] (batch-batch - 5) processItem(): input=ChunkInputItem [id=4, input=40, processed=false], output=ChunkOutputItem [id=4, result=20] 16:36:59,153 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=5 16:36:59,153 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) writeItems(): index=2 16:36:59,153 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) writeItems(): item=ChunkOutputItem [id=4, result=20] 16:36:59,153 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=5 16:36:59,153 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) checkpointInfo(): returns=3 16:36:59,155 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=5 16:36:59,155 DEBUG [org.hibernate.SQL] (batch-batch - 5) insert into ChunkOutputItem (result, id) values (?, ?) 16:36:59,183 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): index=5, failAt=5 16:36:59,183 WARNING [org.nailedtothex.jbatch.example.chunkskip.MySkipReadListener] (batch-batch - 5) onSkipReadItem(): java.lang.RuntimeException: 5 at org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader.readItem(ChunkRetrySkipItemReader.java:43) [classes:] at org.jberet.runtime.runner.ChunkRunner.readItem(ChunkRunner.java:343) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.ChunkRunner.readProcessWriteItems(ChunkRunner.java:288) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.ChunkRunner.run(ChunkRunner.java:190) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:204) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:131) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:162) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:58) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.wildfly.jberet.services.BatchEnvironmentService$WildFlyBatchEnvironment$1.run(BatchEnvironmentService.java:149) [wildfly-jberet-8.0.0.Final.jar:8.0.0.Final] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [rt.jar:1.7.0_51] at java.util.concurrent.FutureTask.run(FutureTask.java:262) [rt.jar:1.7.0_51] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [rt.jar:1.7.0_51] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [rt.jar:1.7.0_51] at java.lang.Thread.run(Thread.java:744) [rt.jar:1.7.0_51] at org.jboss.threads.JBossThread.run(JBossThread.java:122) 16:36:59,185 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=6 16:36:59,185 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): index=6, failAt=5 16:36:59,186 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): returning=ChunkInputItem [id=6, input=60, processed=false] 16:36:59,186 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemProcessor] (batch-batch - 5) processItem(): input=ChunkInputItem [id=6, input=60, processed=false], output=ChunkOutputItem [id=6, result=30] 16:36:59,186 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=7 16:36:59,186 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): index=7, failAt=5 16:36:59,187 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): returning=ChunkInputItem [id=7, input=70, processed=false] 16:36:59,187 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemProcessor] (batch-batch - 5) processItem(): input=ChunkInputItem [id=7, input=70, processed=false], output=ChunkOutputItem [id=7, result=35] 16:36:59,187 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=8 16:36:59,187 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): index=8, failAt=5 16:36:59,187 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): returning=ChunkInputItem [id=8, input=80, processed=false] 16:36:59,187 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemProcessor] (batch-batch - 5) processItem(): input=ChunkInputItem [id=8, input=80, processed=false], output=ChunkOutputItem [id=8, result=40] 16:36:59,187 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=9 16:36:59,188 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) writeItems(): index=3 16:36:59,188 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) writeItems(): item=ChunkOutputItem [id=6, result=30] 16:36:59,188 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) writeItems(): item=ChunkOutputItem [id=7, result=35] 16:36:59,189 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) writeItems(): item=ChunkOutputItem [id=8, result=40] 16:36:59,189 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=9 16:36:59,189 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) checkpointInfo(): returns=4 16:36:59,191 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=9 16:36:59,191 DEBUG [org.hibernate.SQL] (batch-batch - 5) insert into ChunkOutputItem (result, id) values (?, ?) 16:36:59,192 DEBUG [org.hibernate.SQL] (batch-batch - 5) insert into ChunkOutputItem (result, id) values (?, ?) 16:36:59,193 DEBUG [org.hibernate.SQL] (batch-batch - 5) insert into ChunkOutputItem (result, id) values (?, ?) 16:36:59,221 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): index=9, failAt=5 16:36:59,221 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): returning=ChunkInputItem [id=9, input=90, processed=false] 16:36:59,221 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemProcessor] (batch-batch - 5) processItem(): input=ChunkInputItem [id=9, input=90, processed=false], output=ChunkOutputItem [id=9, result=45] 16:36:59,221 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=10 16:36:59,221 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): index=10, failAt=5 16:36:59,221 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) readItem(): returning=null 16:36:59,221 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) writeItems(): index=4 16:36:59,222 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) writeItems(): item=ChunkOutputItem [id=9, result=45] 16:36:59,222 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=10 16:36:59,222 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) checkpointInfo(): returns=5 16:36:59,223 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) checkpointInfo(): returns=10 16:36:59,224 DEBUG [org.hibernate.SQL] (batch-batch - 5) insert into ChunkOutputItem (result, id) values (?, ?) 16:36:59,252 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) close() 16:36:59,269 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) close() 16:36:59,270 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemReader] (batch-batch - 5) close() 16:36:59,270 FINE [org.nailedtothex.jbatch.example.chunkretryskip.ChunkRetrySkipItemWriter] (batch-batch - 5) close()
Repository
job_execution
jbatch=# select * from job_execution order by jobexecutionid desc limit 1; jobexecutionid | jobinstanceid | version | createtime | starttime | endtime | lastupdatedtime | batchstatus | exitstatus | jobparameters | restartposition ----------------+---------------+---------+-------------------------+-------------------------+-------------------------+-------------------------+-------------+------------+----------------------+----------------- 127 | 122 | | 2014-02-15 16:36:59.068 | 2014-02-15 16:36:59.068 | 2014-02-15 16:36:59.272 | 2014-02-15 16:36:59.272 | COMPLETED | COMPLETED | itemReaderFailAt = 5+| | | | | | | | | | divide = 2 +| | | | | | | | | | | (1 row)
step_execution
jbatch=# select * from step_execution where jobexecutionid =127; stepexecutionid | jobexecutionid | version | stepname | starttime | endtime | batchstatus | exitstatus | executionexception | persistentuserdata | readcount | writecount | commitcount | rollbackcount | readskipcount | processskipcount | filtercount | writeskipcount | readercheckpointinfo | writercheckpointinfo| 127 | | doChunk | 2014-02-15 16:36:59.07 | 2014-02-15 16:36:59.27 | COMPLETED | COMPLETED | | | 11 | 9 | 5 | 1 | 1 | 0 | 0 | 0 | \xaced0005737200116a6176612e6c616e672e496e746567657212e2a0a4f781873802000149000576616c7565787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b02000078700000000a | \xaced0005737200116a6176612e6c616e672e496e746567657212e2a0a4f781873802000149000576616c7565787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b020000787000000005 (1 row)
動作の流れを見てみる
ざっくり
- 3件目までは普通に完走
- 6件目で例外が起きて、バッファリングされていた4, 5件目の処理結果は破棄
- item-count=1に設定されリトライが行われる
- 4, 5件目が1件ずつ処理されて結果がDBに書き込まれる
- 6件目で2回目の例外が起きる。6件目はあきらめてスキップされる
- 7件目以降が1件ずつ処理されて完走
備考
- リトライの回数を指定できると嬉しいような気はするがまあこれはこれでいいのかも
- アプリ側でリトライの回数を数えて、あきらめる時はスキップ用例外を投げる、みたいな感じで制御してもいいかも
参考文献
Tags: jbatch
Chunk方式のStepで例外発生時にスキップさせてみる
TweetPosted on Monday Feb 10, 2014 at 06:56PM in Technology
例外発生時のスキップをさせてみてどんな挙動か確認してみる
環境・前提条件
- jBeret 1.0.1.Beta-SNAPSHOT (1.0.0.FinalはSkipに不具合があるため)
- 他はChunk方式のStepを使ってみると同じ。この記事で作ったプロジェクトや資源がすでに存在するものとする
バッチを作る
仕様
- 入力、出力、処理内容は基本的にChunk方式のStepを使ってみるでやったのと同じ
- 6件目のデータ処理時にItemReaderでRuntimeExceptionが起こる
- RuntimeExceptionはskippable-exception-classesに指定されているので、ここだけスキップされる
資源
資源はこのへんにまとめて全部ある
- バッチ本体
- テスト
動かしてみる
ログ
11:18:00,247 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) open(): checkpoint=null, index starts at=0 11:18:00,250 DEBUG [org.hibernate.SQL] (batch-batch - 2) select chunkinput0_.id as id1_0_, chunkinput0_.input as input2_0_, chunkinput0_.processed as processe3_0_ from ChunkInputItem chunkinput0_ order by chunkinput0_.id 11:18:00,253 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemWriter] (batch-batch - 2) open(): checkpoint=null, index starts at=0 11:18:00,254 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): index=0, failAt=5 11:18:00,254 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): returning=ChunkInputItem [id=0, input=0, processed=false] 11:18:00,254 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemProcessor] (batch-batch - 2) processItem(): input=ChunkInputItem [id=0, input=0, processed=false], output=ChunkOutputItem [id=0, result=0] 11:18:00,254 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) checkpointInfo(): returns=1 11:18:00,254 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): index=1, failAt=5 11:18:00,255 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): returning=ChunkInputItem [id=1, input=10, processed=false] 11:18:00,255 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemProcessor] (batch-batch - 2) processItem(): input=ChunkInputItem [id=1, input=10, processed=false], output=ChunkOutputItem [id=1, result=5] 11:18:00,255 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) checkpointInfo(): returns=2 11:18:00,255 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): index=2, failAt=5 11:18:00,255 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): returning=ChunkInputItem [id=2, input=20, processed=false] 11:18:00,255 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemProcessor] (batch-batch - 2) processItem(): input=ChunkInputItem [id=2, input=20, processed=false], output=ChunkOutputItem [id=2, result=10] 11:18:00,255 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) checkpointInfo(): returns=3 11:18:00,255 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemWriter] (batch-batch - 2) writeItems(): index=0 11:18:00,256 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemWriter] (batch-batch - 2) writeItems(): item=ChunkOutputItem [id=0, result=0] 11:18:00,256 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemWriter] (batch-batch - 2) writeItems(): item=ChunkOutputItem [id=1, result=5] 11:18:00,256 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemWriter] (batch-batch - 2) writeItems(): item=ChunkOutputItem [id=2, result=10] 11:18:00,256 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) checkpointInfo(): returns=3 11:18:00,256 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemWriter] (batch-batch - 2) checkpointInfo(): returns=1 11:18:00,258 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) checkpointInfo(): returns=3 11:18:00,261 DEBUG [org.hibernate.SQL] (batch-batch - 2) insert into ChunkOutputItem (result, id) values (?, ?) 11:18:00,262 DEBUG [org.hibernate.SQL] (batch-batch - 2) insert into ChunkOutputItem (result, id) values (?, ?) 11:18:00,263 DEBUG [org.hibernate.SQL] (batch-batch - 2) insert into ChunkOutputItem (result, id) values (?, ?) 11:18:00,292 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): index=3, failAt=5 11:18:00,292 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): returning=ChunkInputItem [id=3, input=30, processed=false] 11:18:00,292 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemProcessor] (batch-batch - 2) processItem(): input=ChunkInputItem [id=3, input=30, processed=false], output=ChunkOutputItem [id=3, result=15] 11:18:00,292 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) checkpointInfo(): returns=4 11:18:00,292 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): index=4, failAt=5 11:18:00,292 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): returning=ChunkInputItem [id=4, input=40, processed=false] 11:18:00,292 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemProcessor] (batch-batch - 2) processItem(): input=ChunkInputItem [id=4, input=40, processed=false], output=ChunkOutputItem [id=4, result=20] 11:18:00,293 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) checkpointInfo(): returns=5 11:18:00,293 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): index=5, failAt=5 11:18:00,293 WARNING [org.nailedtothex.jbatch.example.chunkskip.MySkipReadListener] (batch-batch - 2) onSkipReadItem(): java.lang.RuntimeException: 5 at org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader.readItem(ChunkSkipItemReader.java:43) [classes:] at org.jberet.runtime.runner.ChunkRunner.readItem(ChunkRunner.java:343) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.ChunkRunner.readProcessWriteItems(ChunkRunner.java:288) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.ChunkRunner.run(ChunkRunner.java:190) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:204) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:131) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:162) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:58) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT] at org.wildfly.jberet.services.BatchEnvironmentService$WildFlyBatchEnvironment$1.run(BatchEnvironmentService.java:149) [wildfly-jberet-8.0.0.Final.jar:8.0.0.Final] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [rt.jar:1.7.0_51] at java.util.concurrent.FutureTask.run(FutureTask.java:262) [rt.jar:1.7.0_51] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [rt.jar:1.7.0_51] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [rt.jar:1.7.0_51] at java.lang.Thread.run(Thread.java:744) [rt.jar:1.7.0_51] at org.jboss.threads.JBossThread.run(JBossThread.java:122) 11:18:00,294 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) checkpointInfo(): returns=6 11:18:00,294 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): index=6, failAt=5 11:18:00,294 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): returning=ChunkInputItem [id=6, input=60, processed=false] 11:18:00,294 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemProcessor] (batch-batch - 2) processItem(): input=ChunkInputItem [id=6, input=60, processed=false], output=ChunkOutputItem [id=6, result=30] 11:18:00,294 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) checkpointInfo(): returns=7 11:18:00,294 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemWriter] (batch-batch - 2) writeItems(): index=1 11:18:00,294 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemWriter] (batch-batch - 2) writeItems(): item=ChunkOutputItem [id=3, result=15] 11:18:00,295 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemWriter] (batch-batch - 2) writeItems(): item=ChunkOutputItem [id=4, result=20] 11:18:00,295 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemWriter] (batch-batch - 2) writeItems(): item=ChunkOutputItem [id=6, result=30] 11:18:00,295 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) checkpointInfo(): returns=7 11:18:00,295 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemWriter] (batch-batch - 2) checkpointInfo(): returns=2 11:18:00,297 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) checkpointInfo(): returns=7 11:18:00,297 DEBUG [org.hibernate.SQL] (batch-batch - 2) insert into ChunkOutputItem (result, id) values (?, ?) 11:18:00,298 DEBUG [org.hibernate.SQL] (batch-batch - 2) insert into ChunkOutputItem (result, id) values (?, ?) 11:18:00,299 DEBUG [org.hibernate.SQL] (batch-batch - 2) insert into ChunkOutputItem (result, id) values (?, ?) 11:18:00,328 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): index=7, failAt=5 11:18:00,328 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): returning=ChunkInputItem [id=7, input=70, processed=false] 11:18:00,328 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemProcessor] (batch-batch - 2) processItem(): input=ChunkInputItem [id=7, input=70, processed=false], output=ChunkOutputItem [id=7, result=35] 11:18:00,328 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) checkpointInfo(): returns=8 11:18:00,328 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): index=8, failAt=5 11:18:00,328 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): returning=ChunkInputItem [id=8, input=80, processed=false] 11:18:00,329 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemProcessor] (batch-batch - 2) processItem(): input=ChunkInputItem [id=8, input=80, processed=false], output=ChunkOutputItem [id=8, result=40] 11:18:00,329 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) checkpointInfo(): returns=9 11:18:00,329 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): index=9, failAt=5 11:18:00,329 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): returning=ChunkInputItem [id=9, input=90, processed=false] 11:18:00,329 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemProcessor] (batch-batch - 2) processItem(): input=ChunkInputItem [id=9, input=90, processed=false], output=ChunkOutputItem [id=9, result=45] 11:18:00,329 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) checkpointInfo(): returns=10 11:18:00,330 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemWriter] (batch-batch - 2) writeItems(): index=2 11:18:00,330 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemWriter] (batch-batch - 2) writeItems(): item=ChunkOutputItem [id=7, result=35] 11:18:00,330 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemWriter] (batch-batch - 2) writeItems(): item=ChunkOutputItem [id=8, result=40] 11:18:00,331 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemWriter] (batch-batch - 2) writeItems(): item=ChunkOutputItem [id=9, result=45] 11:18:00,332 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) checkpointInfo(): returns=10 11:18:00,332 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemWriter] (batch-batch - 2) checkpointInfo(): returns=3 11:18:00,334 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) checkpointInfo(): returns=10 11:18:00,334 DEBUG [org.hibernate.SQL] (batch-batch - 2) insert into ChunkOutputItem (result, id) values (?, ?) 11:18:00,335 DEBUG [org.hibernate.SQL] (batch-batch - 2) insert into ChunkOutputItem (result, id) values (?, ?) 11:18:00,336 DEBUG [org.hibernate.SQL] (batch-batch - 2) insert into ChunkOutputItem (result, id) values (?, ?) 11:18:00,364 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): index=10, failAt=5 11:18:00,365 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) readItem(): returning=null 11:18:00,365 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) checkpointInfo(): returns=10 11:18:00,365 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) close() 11:18:00,365 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemWriter] (batch-batch - 2) close() 11:18:00,365 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemReader] (batch-batch - 2) close() 11:18:00,365 FINE [org.nailedtothex.jbatch.example.chunkskip.ChunkSkipItemWriter] (batch-batch - 2) close()
Repository
job_execution
jbatch=# select * from job_execution order by jobexecutionid desc limit 1; jobexecutionid | jobinstanceid | version | createtime | starttime | endtime | lastupdatedtime | batchstatus | exitstatus | jobparameters | restartposition ----------------+---------------+---------+-------------------------+-------------------------+-------------------------+-------------------------+-------------+------------+----------------------+----------------- 124 | 119 | | 2014-02-15 11:18:00.243 | 2014-02-15 11:18:00.243 | 2014-02-15 11:18:00.367 | 2014-02-15 11:18:00.367 | COMPLETED | COMPLETED | itemReaderFailAt = 5+| | | | | | | | | | divide = 2 +| | | | | | | | | | | (1 row) jbatch=#
step_execution
jbatch=# select * from step_execution where jobexecutionid =124; stepexecutionid | jobexecutionid | version | stepname | starttime | endtime | batchstatus | exitstatus | executionexception | persistentuserdata | readcount | writecount | commitcount | rollbackcount | readskipcount | processskipcount | filtercount | writeskipcount | readercheckpointinfo | writercheckpointinfo| 124 | | doChunk | 2014-02-15 11:18:00.244 | 2014-02-15 11:18:00.365 | COMPLETED | COMPLETED | | | 9 | 9 | 4 | 0 | 1 | 0 | 0 | 0 | \xaced0005737200116a6176612e6c616e672e496e746567657212e2a0a4f781873802000149000576616c7565787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b02000078700000000a | \xaced0005737200116a6176612e6c616e672e496e746567657212e2a0a4f781873802000149000576616c7565787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b020000787000000003 (1 row) jbatch=#
テーブル出力
スキップ対象の1件以外は正常に出ている。うむ
jbatcharts=# select * from chunkoutputitem order by id; id | result ----+-------- 0 | 0 1 | 5 2 | 10 3 | 15 4 | 20 6 | 30 7 | 35 8 | 40 9 | 45 (9 rows) jbatcharts=#
参考文献
Tags: jbatch
Chunk方式のStepで例外発生時にリトライさせてみる
TweetPosted on Monday Feb 10, 2014 at 06:56PM in Technology
例外発生時のリトライをさせてみてどんな挙動か確認してみる
環境・前提条件
- Chunk方式のStepを使ってみると同じ。この記事で作ったプロジェクトや資源がすでに存在するものとする
バッチを作る
仕様
- 入力、出力、処理内容は基本的にChunk方式のStepを使ってみるでやったのと同じ
- 6件目のデータ処理時にItemReaderでRuntimeExceptionが起こる
- RuntimeExceptionはretryable-exception-classesに指定されているので、リトライが走る
- リトライしても同じところで例外が合計3回起こるようになっている
- 4回目のリトライでは例外が起こらない。以降は最後まで処理が完走する
資源
資源はこのへんにまとめて全部ある
- バッチ本体
- テスト
動かしてみる
ログ
17:43:59,756 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemProcessor] (batch-batch - 8) postConstruct(): divide=2 17:43:59,757 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) open(): checkpoint=null, index starts at=0 17:43:59,759 DEBUG [org.hibernate.SQL] (batch-batch - 8) select chunkinput0_.id as id1_0_, chunkinput0_.input as input2_0_, chunkinput0_.processed as processe3_0_ from ChunkInputItem chunkinput0_ order by chunkinput0_.id 17:43:59,762 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) open(): checkpoint=null, index starts at=0 17:43:59,763 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): index=0, failAt=5, fails=3, retryCount=0 17:43:59,763 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): returning=ChunkInputItem [id=0, input=0, processed=false] 17:43:59,764 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemProcessor] (batch-batch - 8) processItem(): input=ChunkInputItem [id=0, input=0, processed=false], output=ChunkOutputItem [id=0, result=0] 17:43:59,764 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=1 17:43:59,764 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): index=1, failAt=5, fails=3, retryCount=0 17:43:59,764 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): returning=ChunkInputItem [id=1, input=10, processed=false] 17:43:59,764 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemProcessor] (batch-batch - 8) processItem(): input=ChunkInputItem [id=1, input=10, processed=false], output=ChunkOutputItem [id=1, result=5] 17:43:59,764 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=2 17:43:59,764 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): index=2, failAt=5, fails=3, retryCount=0 17:43:59,764 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): returning=ChunkInputItem [id=2, input=20, processed=false] 17:43:59,764 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemProcessor] (batch-batch - 8) processItem(): input=ChunkInputItem [id=2, input=20, processed=false], output=ChunkOutputItem [id=2, result=10] 17:43:59,764 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=3 17:43:59,764 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) writeItems(): index=0 17:43:59,765 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) writeItems(): item=ChunkOutputItem [id=0, result=0] 17:43:59,765 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) writeItems(): item=ChunkOutputItem [id=1, result=5] 17:43:59,765 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) writeItems(): item=ChunkOutputItem [id=2, result=10] 17:43:59,765 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=3 17:43:59,765 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) checkpointInfo(): returns=1 17:43:59,767 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=3 17:43:59,771 DEBUG [org.hibernate.SQL] (batch-batch - 8) insert into ChunkOutputItem (result, id) values (?, ?) 17:43:59,772 DEBUG [org.hibernate.SQL] (batch-batch - 8) insert into ChunkOutputItem (result, id) values (?, ?) 17:43:59,773 DEBUG [org.hibernate.SQL] (batch-batch - 8) insert into ChunkOutputItem (result, id) values (?, ?) 17:43:59,801 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): index=3, failAt=5, fails=3, retryCount=0 17:43:59,802 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): returning=ChunkInputItem [id=3, input=30, processed=false] 17:43:59,802 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemProcessor] (batch-batch - 8) processItem(): input=ChunkInputItem [id=3, input=30, processed=false], output=ChunkOutputItem [id=3, result=15] 17:43:59,802 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=4 17:43:59,802 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): index=4, failAt=5, fails=3, retryCount=0 17:43:59,802 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): returning=ChunkInputItem [id=4, input=40, processed=false] 17:43:59,802 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemProcessor] (batch-batch - 8) processItem(): input=ChunkInputItem [id=4, input=40, processed=false], output=ChunkOutputItem [id=4, result=20] 17:43:59,802 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=5 17:43:59,802 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): index=5, failAt=5, fails=3, retryCount=0 17:43:59,802 WARNING [org.nailedtothex.jbatch.example.chunkretry.MyRetryReadListener] (batch-batch - 8) onRetryReadException(): java.lang.RuntimeException: 0 at org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader.readItem(ChunkRetryItemReader.java:48) [classes:] at org.jberet.runtime.runner.ChunkRunner.readItem(ChunkRunner.java:346) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.ChunkRunner.readProcessWriteItems(ChunkRunner.java:291) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.ChunkRunner.run(ChunkRunner.java:193) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:204) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:131) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:162) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:58) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.wildfly.jberet.services.BatchEnvironmentService$WildFlyBatchEnvironment$1.run(BatchEnvironmentService.java:149) [wildfly-jberet-8.0.0.Final.jar:8.0.0.Final] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [rt.jar:1.7.0_51] at java.util.concurrent.FutureTask.run(FutureTask.java:262) [rt.jar:1.7.0_51] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [rt.jar:1.7.0_51] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [rt.jar:1.7.0_51] at java.lang.Thread.run(Thread.java:744) [rt.jar:1.7.0_51] at org.jboss.threads.JBossThread.run(JBossThread.java:122) 17:43:59,804 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=5 17:43:59,804 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) close() 17:43:59,804 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) close() 17:43:59,804 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) open(): checkpoint=3, index starts at=3 17:43:59,804 DEBUG [org.hibernate.SQL] (batch-batch - 8) select chunkinput0_.id as id1_0_, chunkinput0_.input as input2_0_, chunkinput0_.processed as processe3_0_ from ChunkInputItem chunkinput0_ order by chunkinput0_.id 17:43:59,807 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) open(): checkpoint=1, index starts at=1 17:43:59,807 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=3 17:43:59,807 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): index=3, failAt=5, fails=3, retryCount=1 17:43:59,807 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): returning=ChunkInputItem [id=3, input=30, processed=false] 17:43:59,807 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemProcessor] (batch-batch - 8) processItem(): input=ChunkInputItem [id=3, input=30, processed=false], output=ChunkOutputItem [id=3, result=15] 17:43:59,807 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=4 17:43:59,807 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) writeItems(): index=1 17:43:59,807 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) writeItems(): item=ChunkOutputItem [id=3, result=15] 17:43:59,808 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=4 17:43:59,808 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) checkpointInfo(): returns=2 17:43:59,809 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=4 17:43:59,810 DEBUG [org.hibernate.SQL] (batch-batch - 8) insert into ChunkOutputItem (result, id) values (?, ?) 17:43:59,839 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): index=4, failAt=5, fails=3, retryCount=1 17:43:59,839 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): returning=ChunkInputItem [id=4, input=40, processed=false] 17:43:59,839 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemProcessor] (batch-batch - 8) processItem(): input=ChunkInputItem [id=4, input=40, processed=false], output=ChunkOutputItem [id=4, result=20] 17:43:59,839 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=5 17:43:59,839 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) writeItems(): index=2 17:43:59,839 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) writeItems(): item=ChunkOutputItem [id=4, result=20] 17:43:59,840 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=5 17:43:59,840 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) checkpointInfo(): returns=3 17:43:59,842 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=5 17:43:59,843 DEBUG [org.hibernate.SQL] (batch-batch - 8) insert into ChunkOutputItem (result, id) values (?, ?) 17:43:59,872 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): index=5, failAt=5, fails=3, retryCount=1 17:43:59,872 WARNING [org.nailedtothex.jbatch.example.chunkretry.MyRetryReadListener] (batch-batch - 8) onRetryReadException(): java.lang.RuntimeException: 1 at org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader.readItem(ChunkRetryItemReader.java:48) [classes:] at org.jberet.runtime.runner.ChunkRunner.readItem(ChunkRunner.java:346) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.ChunkRunner.readProcessWriteItems(ChunkRunner.java:291) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.ChunkRunner.run(ChunkRunner.java:193) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:204) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:131) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:162) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:58) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.wildfly.jberet.services.BatchEnvironmentService$WildFlyBatchEnvironment$1.run(BatchEnvironmentService.java:149) [wildfly-jberet-8.0.0.Final.jar:8.0.0.Final] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [rt.jar:1.7.0_51] at java.util.concurrent.FutureTask.run(FutureTask.java:262) [rt.jar:1.7.0_51] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [rt.jar:1.7.0_51] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [rt.jar:1.7.0_51] at java.lang.Thread.run(Thread.java:744) [rt.jar:1.7.0_51] at org.jboss.threads.JBossThread.run(JBossThread.java:122) 17:43:59,873 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=5 17:43:59,873 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) close() 17:43:59,873 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) close() 17:43:59,873 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) open(): checkpoint=5, index starts at=5 17:43:59,874 DEBUG [org.hibernate.SQL] (batch-batch - 8) select chunkinput0_.id as id1_0_, chunkinput0_.input as input2_0_, chunkinput0_.processed as processe3_0_ from ChunkInputItem chunkinput0_ order by chunkinput0_.id 17:43:59,876 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) open(): checkpoint=3, index starts at=3 17:43:59,876 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=5 17:43:59,876 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): index=5, failAt=5, fails=3, retryCount=2 17:43:59,876 WARNING [org.nailedtothex.jbatch.example.chunkretry.MyRetryReadListener] (batch-batch - 8) onRetryReadException(): java.lang.RuntimeException: 2 at org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader.readItem(ChunkRetryItemReader.java:48) [classes:] at org.jberet.runtime.runner.ChunkRunner.readItem(ChunkRunner.java:346) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.ChunkRunner.readProcessWriteItems(ChunkRunner.java:291) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.ChunkRunner.run(ChunkRunner.java:193) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:204) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:131) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:162) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:58) [jberet-core-1.0.0.Final.jar:1.0.0.Final] at org.wildfly.jberet.services.BatchEnvironmentService$WildFlyBatchEnvironment$1.run(BatchEnvironmentService.java:149) [wildfly-jberet-8.0.0.Final.jar:8.0.0.Final] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [rt.jar:1.7.0_51] at java.util.concurrent.FutureTask.run(FutureTask.java:262) [rt.jar:1.7.0_51] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [rt.jar:1.7.0_51] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [rt.jar:1.7.0_51] at java.lang.Thread.run(Thread.java:744) [rt.jar:1.7.0_51] at org.jboss.threads.JBossThread.run(JBossThread.java:122) 17:43:59,877 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=5 17:43:59,877 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) close() 17:43:59,877 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) close() 17:43:59,877 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) open(): checkpoint=5, index starts at=5 17:43:59,878 DEBUG [org.hibernate.SQL] (batch-batch - 8) select chunkinput0_.id as id1_0_, chunkinput0_.input as input2_0_, chunkinput0_.processed as processe3_0_ from ChunkInputItem chunkinput0_ order by chunkinput0_.id 17:43:59,879 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) open(): checkpoint=3, index starts at=3 17:43:59,879 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=5 17:43:59,879 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): index=5, failAt=5, fails=3, retryCount=3 17:43:59,880 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): returning=ChunkInputItem [id=5, input=50, processed=false] 17:43:59,880 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemProcessor] (batch-batch - 8) processItem(): input=ChunkInputItem [id=5, input=50, processed=false], output=ChunkOutputItem [id=5, result=25] 17:43:59,880 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=6 17:43:59,880 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) writeItems(): index=3 17:43:59,880 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) writeItems(): item=ChunkOutputItem [id=5, result=25] 17:43:59,880 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=6 17:43:59,880 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) checkpointInfo(): returns=4 17:43:59,881 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=6 17:43:59,882 DEBUG [org.hibernate.SQL] (batch-batch - 8) insert into ChunkOutputItem (result, id) values (?, ?) 17:43:59,910 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): index=6, failAt=5, fails=3, retryCount=3 17:43:59,911 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): returning=ChunkInputItem [id=6, input=60, processed=false] 17:43:59,911 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemProcessor] (batch-batch - 8) processItem(): input=ChunkInputItem [id=6, input=60, processed=false], output=ChunkOutputItem [id=6, result=30] 17:43:59,911 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=7 17:43:59,911 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) writeItems(): index=4 17:43:59,911 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) writeItems(): item=ChunkOutputItem [id=6, result=30] 17:43:59,912 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=7 17:43:59,912 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) checkpointInfo(): returns=5 17:43:59,913 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=7 17:43:59,914 DEBUG [org.hibernate.SQL] (batch-batch - 8) insert into ChunkOutputItem (result, id) values (?, ?) 17:43:59,943 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): index=7, failAt=5, fails=3, retryCount=3 17:43:59,944 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): returning=ChunkInputItem [id=7, input=70, processed=false] 17:43:59,944 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemProcessor] (batch-batch - 8) processItem(): input=ChunkInputItem [id=7, input=70, processed=false], output=ChunkOutputItem [id=7, result=35] 17:43:59,944 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=8 17:43:59,944 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) writeItems(): index=5 17:43:59,944 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) writeItems(): item=ChunkOutputItem [id=7, result=35] 17:43:59,945 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=8 17:43:59,945 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) checkpointInfo(): returns=6 17:43:59,947 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=8 17:43:59,947 DEBUG [org.hibernate.SQL] (batch-batch - 8) insert into ChunkOutputItem (result, id) values (?, ?) 17:43:59,977 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): index=8, failAt=5, fails=3, retryCount=3 17:43:59,977 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): returning=ChunkInputItem [id=8, input=80, processed=false] 17:43:59,977 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemProcessor] (batch-batch - 8) processItem(): input=ChunkInputItem [id=8, input=80, processed=false], output=ChunkOutputItem [id=8, result=40] 17:43:59,977 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=9 17:43:59,977 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) writeItems(): index=6 17:43:59,977 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) writeItems(): item=ChunkOutputItem [id=8, result=40] 17:43:59,978 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=9 17:43:59,978 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) checkpointInfo(): returns=7 17:43:59,980 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=9 17:43:59,980 DEBUG [org.hibernate.SQL] (batch-batch - 8) insert into ChunkOutputItem (result, id) values (?, ?) 17:44:00,009 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): index=9, failAt=5, fails=3, retryCount=3 17:44:00,009 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): returning=ChunkInputItem [id=9, input=90, processed=false] 17:44:00,010 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemProcessor] (batch-batch - 8) processItem(): input=ChunkInputItem [id=9, input=90, processed=false], output=ChunkOutputItem [id=9, result=45] 17:44:00,010 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=10 17:44:00,010 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) writeItems(): index=7 17:44:00,010 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) writeItems(): item=ChunkOutputItem [id=9, result=45] 17:44:00,010 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=10 17:44:00,011 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) checkpointInfo(): returns=8 17:44:00,012 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=10 17:44:00,013 DEBUG [org.hibernate.SQL] (batch-batch - 8) insert into ChunkOutputItem (result, id) values (?, ?) 17:44:00,041 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): index=10, failAt=5, fails=3, retryCount=3 17:44:00,042 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) readItem(): returning=null 17:44:00,042 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) checkpointInfo(): returns=10 17:44:00,042 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) close() 17:44:00,042 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) close() 17:44:00,042 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemReader] (batch-batch - 8) close() 17:44:00,042 FINE [org.nailedtothex.jbatch.example.chunkretry.ChunkRetryItemWriter] (batch-batch - 8) close()
Repository
job_execution
jbatch=# select * from job_execution where jobexecutionid =80; jobexecutionid | jobinstanceid | version | createtime | starttime | endtime | lastupdatedtime | batchstatus | exitstatus | jobparameters | restartposition ----------------+---------------+---------+-------------------------+-------------------------+-------------------------+-------------------------+-------------+------------+----------------------+----------------- 80 | 75 | | 2014-02-14 17:43:59.753 | 2014-02-14 17:43:59.753 | 2014-02-14 17:44:00.045 | 2014-02-14 17:44:00.045 | COMPLETED | COMPLETED | itemReaderFailAt = 5+| | | | | | | | | | itemReaderFails = 3 +| | | | | | | | | | divide = 2 +| | | | | | | | | | | (1 row) jbatch=#
step_execution
jbatch=# select * from step_execution where jobexecutionid =80; stepexecutionid | jobexecutionid | version | stepname | starttime | endtime | batchstatus | exitstatus | executionexception | persistentuserdata | readcount | writecount | commitcount | rollbackcount | readskipcount | processskipcount | filtercount | writeskipcount | readercheckpointinfo | writercheckpointinfo| 80 | | doChunk | 2014-02-14 17:43:59.754 | 2014-02-14 17:44:00.042 | COMPLETED | COMPLETED | | | 12 | 10 | 9 | 3 | 0 | 0 | 0 | 0 | \xaced0005737200116a6176612e6c616e672e496e746567657212e2a0a4f781873802000149000576616c7565787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b02000078700000000a | \xaced0005737200116a6176612e6c616e672e496e746567657212e2a0a4f781873802000149000576616c7565787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b020000787000000008 (1 row) jbatch=#
- rollbackcount = 3
- commitcount = 9
- 何故9なのか。謎
- readcount = 12
- 無駄なreadは確かに2回あったけど。
処理の流れを見てみる
最初の例外発生まで (1-34行目)
- ジョブXMLにはitem-count=3に設定してある
- 3件目までは処理が正常に終わり、コミットまで行われ出力がDBに書き込まれる
- 4, 5件目はItemProcessorの処理まで正常終了しているが、結果はバッファリングされており、まだDBに書き込まれていない
1回目の例外発生 (35-52行目)
- 6件目のreadItem()で例外が起きる
- バッファリングされていた4, 5件目の処理結果はDBに書き込まれないまま破棄される
1回目のリトライ (54-98行目)
- ItemReader, ItemWriter共に一旦close()で閉じられ、open()から再開となる
- リトライ時にはitem-count=1に設定される[1]
- item-count=1のため、結果のバッファリングは無効化され、4件目、5件目が1件ずつ処理されて完了
- 6件目で再度例外発生
2, 3回目のリトライ (100-124行目)
- 5件目まではすでに処理が完了しているため、2回目のリトライは6件目からの再開となる
- 2, 3回目のリトライはreadItem()で即死
4回目のリトライ (126行目以降)
- 4回目のリトライでは例外は起こらない
- 後続の処理は全てitem-count=1で処理されていき完走
備考
- Webサイトのスクレイピングする時とかに使える気がする
- リトライ回数の上限はジョブXMLで定義出来るが、デフォルトが無制限なのに注意
- リトライ後はitem-count=1になるのは良い面もあるが、checkpointに達したときの処理は結構重いので、データ量が多い場合のパフォーマンス劣化が若干気になる
参考文献
Tags: jbatch
Arquillianでジョブのテストをしてみる
TweetPosted on Monday Feb 10, 2014 at 06:54PM in Technology
CLIからリモートEJB経由でジョブを操作してみる のところで使ってみたリモートEJB呼び出しとArquillianを使って、ジョブのJUnitテストを書いてみる。ついでにテーブルのデータ検証とかもしてみる
バッチ開発時の雛型プロジェクト的に使えるように若干作り込んでみる
環境
- WildFly8.0.0.CR1
- wildfly-arquillian-container-remote 8.0.0.CR1
- Arquillian Persistence Extension 1.0.0.Alpha6
- Arquillian 1.1.2Final
- Eclipse Kepler SR1
- Oracle JDK7u51
- postgresql-9.3-1100.jdbc41.jar
- PostgreSQL 9.2.6
前提条件
- CLIからリモートEJB経由でジョブを操作してみる で作った資源とかが普通に動いているものとする
- データソースとかJDBCドライバとかも適当にAPサーバ側に定義されて使えるようになっているものとする。ここではArquillian側で定義したデータソースのデプロイとかはやらない。WildFlyでのデータソース定義はこのへんが参考になるかも
準備
資源の配置図
プロジェクト内の資源はこんな感じ。前に作ったjbatchifプロジェクトへの依存が有る。資源の詳細はGitHubのここのjbatchtestプロジェクト参照
MyJobOperatorImpl.java
CLIからリモートEJB経由でジョブを操作してみる で作ったのと同じもの。テストクラスからこのクラスのメソッドがリモートEJBで呼び出され、ジョブの起動やBatchStatusの取得が行われる
FugeEntity.java
IDとfugeっていうフィールドだけの簡単なエンティティ
test.xml
ジョブXML。Stepは1つだけ。fugeUpdateBatchletが走るだけ
FugeUpdateBatchlet.java
FugeEntityをEntityManager#find()で1つ取ってきてfugeフィールドを書き換える超単純なBatchlet
AbstractJobTest.java
テストクラスの親クラス。だいたい以下のような内容が書いてある
- @Deploymentアノテーション付きメソッド。たぶん他のジョブ用のテスト書くときも共通なのでここに置く
- @ResourceアノテーションでリモートEJB経由のJobOperatorの参照を注入させる。JNDI名もここに書かれている
- ジョブの実行が終わるまで1秒間隔でJobExecutionをポーリングするstartJob()メソッドを持つ。テストクラスはこのメソッドを使ってジョブを起動する
TestJobTest.java
テストクラス。Arquillian Persistence Extensionの機能を使う。以下をやる
- テスト実行前のDBへのデータ投入。ファイル名もここに書いてある
- テスト実行後のDB上のデータ検証。ファイル名もここに書いてある
- BatchStatusの検証
arquillian.xml
設定内容は以下
- Arquillian Persistence Extension用のデータソースの指定
- DBのクリーンアップはテスト実行前にやらせる
fugeentity.yml
テスト実行前に投入するデータ
expected.yml
テスト実行後の検証用データ
実行してみる
手順
- APサーバを起動する
- jbatchtestプロジェクトをデプロイ
- TestJobTestを普通にJUnitテストとして実行
テスト結果は普通に緑色になる
APサーバのログ
まあ普通。デプロイする物が少ないせいか割と早い。
16:49:07,091 INFO [org.jboss.as.repository] (management-handler-thread - 223) JBAS014900: Content added at location /Users/kyle/apps/wildfly-8.0.0.CR1/standalone/data/content/1d/b651139b3cf98767bf2bdb8e1cedbfaff7497c/content 16:49:07,092 INFO [org.jboss.as.server.deployment] (MSC service thread 1-4) JBAS015876: Starting deployment of "test.war" (runtime-name: "test.war") 16:49:07,120 WARN [org.jboss.as.dependency.private] (MSC service thread 1-1) JBAS018567: Deployment "deployment.test.war" is using a private module ("org.wildfly.security.manager:main") which may be changed or removed in future versions without notice. 16:49:07,120 INFO [org.jboss.weld.deployer] (MSC service thread 1-5) JBAS016002: Processing weld deployment test.war 16:49:07,126 INFO [org.jboss.weld.deployer] (MSC service thread 1-7) JBAS016005: Starting Services for CDI deployment: test.war 16:49:07,127 INFO [org.jboss.as.arquillian] (MSC service thread 1-7) Arquillian deployment detected: ArquillianConfig[service=jboss.arquillian.config."test.war",unit=test.war,tests=[job.test.TestJobTest]] 16:49:07,127 INFO [org.jboss.weld.deployer] (MSC service thread 1-1) JBAS016008: Starting weld service for deployment test.war 16:49:07,184 INFO [org.wildfly.extension.undertow] (MSC service thread 1-16) JBAS017534: Register web context: /test 16:49:07,191 INFO [org.jboss.as.server] (management-handler-thread - 223) JBAS018559: Deployed "test.war" (runtime-name : "test.war") 16:49:08,638 WARN [org.dbunit.dataset.AbstractTableMetaData] (pool-2-thread-28) Potential problem found: The configured data type factory 'class org.dbunit.dataset.datatype.DefaultDataTypeFactory' might cause problems with the current database 'PostgreSQL' (e.g. some datatypes may not be supported properly). In rare cases you might see this message because the list of supported database products is incomplete (list=[derby]). If so please request a java-class update via the forums.If you are using your own IDataTypeFactory extending DefaultDataTypeFactory, ensure that you override getValidDbProducts() to specify the supported database products. 16:49:08,651 DEBUG [org.hibernate.SQL] (batch-batch - 5) select fugeentity0_.id as id1_0_0_, fugeentity0_.fuge as fuge2_0_0_ from FugeEntity fugeentity0_ where fugeentity0_.id=? 16:49:08,653 DEBUG [org.hibernate.SQL] (batch-batch - 5) update FugeEntity set fuge=? where id=? 16:49:09,660 WARN [org.dbunit.dataset.AbstractTableMetaData] (pool-2-thread-28) Potential problem found: The configured data type factory 'class org.dbunit.dataset.datatype.DefaultDataTypeFactory' might cause problems with the current database 'PostgreSQL' (e.g. some datatypes may not be supported properly). In rare cases you might see this message because the list of supported database products is incomplete (list=[derby]). If so please request a java-class update via the forums.If you are using your own IDataTypeFactory extending DefaultDataTypeFactory, ensure that you override getValidDbProducts() to specify the supported database products. 16:49:09,689 INFO [org.wildfly.extension.undertow] (MSC service thread 1-16) JBAS017535: Unregister web context: /test 16:49:09,690 INFO [org.jboss.weld.deployer] (MSC service thread 1-7) JBAS016009: Stopping weld service for deployment test.war 16:49:09,696 INFO [org.jboss.as.server.deployment] (MSC service thread 1-9) JBAS015877: Stopped deployment test.war (runtime-name: test.war) in 7ms 16:49:09,703 INFO [org.jboss.as.repository] (management-handler-thread - 223) JBAS014901: Content removed from location /Users/kyle/apps/wildfly-8.0.0.CR1/standalone/data/content/1d/b651139b3cf98767bf2bdb8e1cedbfaff7497c/content 16:49:09,703 INFO [org.jboss.as.server] (management-handler-thread - 223) JBAS018558: Undeployed "test.war" (runtime-name: "test.war")
テーブル
まあ普通。
jbatcharts=# select * from fugeentity order by id; id | fuge ----+-------------- 1 | FugeModified 2 | FugeFuge (2 rows)
備考
何か無駄に複雑にしているような気がするが何か他に良いテストの方法ないだろうか。
Tags: jbatch
Partitioned Chunk Processingで並列処理してみる
TweetPosted on Monday Feb 10, 2014 at 06:50PM in Technology
簡単な並列処理をさせて遊ぶ
Partitioned Chunk Processingとは
バルク処理で使うChunk-oriented Processingで並列処理をやるための仕組み。詳細は[1]の11.7 Partitioned Chunk Processingとかを読むと良い(各処理がどのような順序で、どのスレッドで、どのトランザクションで実行されるかが書いてある)。Chunk-oriented ProcessingについてはChunk方式のStepを使ってみるを参照
ここではChunk-oriented Processingで出てきた3つのインタフェースに加えて、以下の4つのインタフェースを使う。
- PartitionMapper
- PartitionReducer
- PartitionCollector
- PartitionAnalyzer
全てが必須ではないようだが、機能の確認のためにここでは4インタフェースとも使ってみる。それぞれの役割と実装の仕方はざっくり以下のような感じになるかと思われる
PartitionMapper
PartitionPlan mapPartitions()
- どのような条件で並列実行させるかの情報をPartitionPlanインタフェースを実装するオブジェクトに詰めて返す
- int partitions: 処理をいくつに分けて実行するか
- int threads: 何パラで実行するか。デフォルトは0で、この場合暗黙的にpartitionsと同じ数とみなされる
- Properties[] partitionProperties: 各並列処理単位(以下ワーカーと呼ぶ)に与えるProperties
- 各ワーカーに番号を割り当てたりとか
- boolean partitionsOverride: 再実行時に前回実行時の結果を捨てるか否か?試してないので不明。デフォルトはfalse
- PartitionPlanImplというクラスがAPIで用意されているのでこいつを使うと楽
- メインスレッドで実行される
- トランザクション外で実行される
PartitionReducer
- 処理開始終了前後の処理とかを書く
- 各メソッドはメインスレッドで実行される
void beginPartitionedStep()
- Partition*系のインタフェース中で一番最初に実行されるメソッド
- 何か並列処理に関連する前処理をしたいときはここに書いたらいいのかも。具体的に何を書いたらいいかはよくわからん
- トランザクション外で実行される
void beforePartitionedStepCompletion()
- 全ワーカーが処理を全件終了した後に実行される
- 全ての処理結果が揃ってはじめて出来る処理とかがあれば、ここに書けば良さげ
- トランザクション終了前(ワーカーのトランザクションとは別)に実行される
void rollbackPartitionedStep()
- 何か問題が起きてロールバックする時、その直前に呼ばれる?
- エラー発生時、並列処理に関連して必要な後処理的なものがあればここに書けばよいのかも
- これが呼ばれるときはbeforePartitionedStepCompletion()は呼ばれないらしい
void afterPartitionedStepCompletion(PartitionStatus)
- Partition*系のインタフェース中で一番最後に実行されるメソッド
- 何か並列処理に関連する後処理をしたいときはここに書いたらいいのかも。具体的に何を書いたらいいかはよくわからん
- トランザクション外で実行される
PartitionCollector
Serializable collectPartitionData()
- 各ワーカーが作る処理結果を返す処理を書く
- ここで返した値はメインスレッドのPartitionAnalyzer#analyzeCollectorData()で受け取れる
- ワーカースレッド(各ワーカーが走るスレッドで、メインスレッドとは別)で実行される
- ワーカーの他のクラス(ItemWriterとか)と情報をやり取りするにはStepContext#getTransientUserData() / setTransientUserData()とかが使える
- チェックポイント到達後のコミットが走った後に実行される。なのでコミット単位ごとに少しずつ処理結果を送れる
- トランザクション外で実行される
- 想像だが、ここで返したSerializableはRepositoryに記録されて再実行時とかに良きに計らってくれたりするのかも。また再実行時の挙動は調べる
PartitionAnalyzer
void analyzeCollectorData(Serializable)
- PartitionAnalyzer#collectPartitionData()で返したオブジェクトが引数で渡ってくる
- メインスレッドで実行される
- トランザクション内(ワーカーのトランザクションとは別)で実行される
- チェックポイントごとに呼ばれるので、受け取ったデータの件数を数えれば進捗状況の出力とか出来そう
各クラス・スレッド間のデータの受け渡しについて
- ワーカースレッドからメインスレッドへデータを受け渡すのはPartitionCollectorとPartitionAnalyzerの役割
- 同一スレッド内でのデータの受け渡しはStepContext#getTransientUserData() / setTransientUserData()を使う
- StepContext, JobContextはそれぞれのスレッドで別のインスタンスになる
- CDIをうまく使えばもう少し楽に書けそうだが、また調べてみる
- バルク処理の性質上、基本的には無理にメモリ上でデータをやり取りしないでテーブルやファイルを介した方が良さげではある
バッチを作ってみる
- Chunk方式のStepを使ってみるで作ったバッチを、並列処理対応させて簡単なバッチを作ってみる。新規か変更を加えた資源について述べる
- 資源の詳細はこのあたり
- エンティティ、テストデータは流用する
環境・前提条件
- Chunk方式のStepを使ってみると同じ。この記事で作ったプロジェクトや資源がすでに存在するものとする
バッチを構成する資源
partition.xml (ジョブXML)
- 入力と出力は前回と全く同じ。Chunk方式のStep1個だけ
- 並列実行用の記述を加えている
- PartitionMapperとItemReaderにパラメータでパラ実行の総数を与える
- ItemReaderにパラメータでワーカーの番号を与える
- ItemProcessorにパラメータでスリープするミリ秒数を与える
PartitionItemReader.java
- SELECT文でパラメータで受け取ったワーカーの番号とパラ実行の総数とChunkInputItemのidを使って、取ってくるレコードを絞っている
- WHERE MOD(c.id, :partitions) = :partitionNumberとかそんな感じ
PartitionItemProcessor.java
- 並列処理っぽさを出すためにパラメータで受け取ったミリ秒数だけThread.sleep()する
PartitionItemWriter.java
- ChunkOutputItemをEntityManager#persist()に渡す替わりに、StepContext#setPersistentUserData()に突っ込む
- 渡した値はMyPartitionCollectorが取り出す
MyPartitionMapper.java
- パラメータで受け取ったパラ実行数と、ワーカーの番号をPropertiesに詰めて、さらにPartitionPlanImplにまとめて入れて返す
MyPartitionReducer.java
- PartitionItemWriter→MyPartitionCollector→MyPartitionAnalyzerの順で渡ってきたChunkOutputItemを全部まとめてここでDBに書き込む
MyPartitionCollector.java
- PartitionItemWriterでStepContextに突っ込んだChunkOutputItemを取り出して返す
- ここで返した値はMyPartitionAnalyzerに渡る
MyPartitionAnalyzer.java
- MyPartitionCollectorから渡ってきたChunkOutputItemをStepContext#transientUserDataに貯めていく
- データ量が多くなるとメモリがいっぱいになるのであくまでサンプル
テスト用資源
- テストデータ、検証用データは前回と同じ
PartitionJobTest.java
- テストメソッドは1パラ実行と2パラ実行の2つ
動かしてみる
1パラ実行時のログ
12:25:45,520 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 9) beginPartitionedStep() 12:25:45,521 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionMapper] (batch-batch - 9) mapPartitions(): partitions=1 12:25:45,521 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionMapper] (batch-batch - 9) mapPartitions(): partitionProperty={partitionNumber=0} 12:25:45,522 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 9) postConstruct(): divide=2, sleep=1000 12:25:45,523 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) open(): checkpoint=null, index starts at=0, partitions=1, partitionNumber=0 12:25:45,524 DEBUG [org.hibernate.SQL] (batch-batch - 1) select chunkinput0_.id as id1_0_, chunkinput0_.input as input2_0_, chunkinput0_.processed as processe3_0_ from ChunkInputItem chunkinput0_ where mod(chunkinput0_.id, ?)=? order by chunkinput0_.id 12:25:45,526 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 1) open(): checkpoint=null, index starts at=0 12:25:45,527 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) readItem(): index=0, returning=ChunkInputItem [id=0, input=0, processed=false] 12:25:45,527 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 1) processItem(): input=ChunkInputItem [id=0, input=0, processed=false], output=ChunkOutputItem [id=0, result=0] 12:25:46,528 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) checkpointInfo(): returns=1 12:25:46,528 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) readItem(): index=1, returning=ChunkInputItem [id=1, input=10, processed=false] 12:25:46,528 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 1) processItem(): input=ChunkInputItem [id=1, input=10, processed=false], output=ChunkOutputItem [id=1, result=5] 12:25:47,529 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) checkpointInfo(): returns=2 12:25:47,530 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) readItem(): index=2, returning=ChunkInputItem [id=2, input=20, processed=false] 12:25:47,530 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 1) processItem(): input=ChunkInputItem [id=2, input=20, processed=false], output=ChunkOutputItem [id=2, result=10] 12:25:48,531 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) checkpointInfo(): returns=3 12:25:48,531 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 1) writeItems(): index=0, item=ChunkOutputItem [id=0, result=0] 12:25:48,531 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 1) writeItems(): index=0, item=ChunkOutputItem [id=1, result=5] 12:25:48,531 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 1) writeItems(): index=0, item=ChunkOutputItem [id=2, result=10] 12:25:48,531 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) checkpointInfo(): returns=3 12:25:48,531 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 1) checkpointInfo(): returns=1 12:25:48,533 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionCollector] (batch-batch - 1) collectPartitionData(): data=[ChunkOutputItem [id=0, result=0], ChunkOutputItem [id=1, result=5], ChunkOutputItem [id=2, result=10]] 12:25:48,533 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) checkpointInfo(): returns=3 12:25:48,533 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionAnalyzer] (batch-batch - 9) analyzeCollectorData(): data=[ChunkOutputItem [id=0, result=0], ChunkOutputItem [id=1, result=5], ChunkOutputItem [id=2, result=10]], receivedDataCount=3 12:25:48,535 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) readItem(): index=3, returning=ChunkInputItem [id=3, input=30, processed=false] 12:25:48,536 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 1) processItem(): input=ChunkInputItem [id=3, input=30, processed=false], output=ChunkOutputItem [id=3, result=15] 12:25:49,537 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) checkpointInfo(): returns=4 12:25:49,537 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) readItem(): index=4, returning=ChunkInputItem [id=4, input=40, processed=false] 12:25:49,537 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 1) processItem(): input=ChunkInputItem [id=4, input=40, processed=false], output=ChunkOutputItem [id=4, result=20] 12:25:50,538 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) checkpointInfo(): returns=5 12:25:50,538 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) readItem(): index=5, returning=ChunkInputItem [id=5, input=50, processed=false] 12:25:50,539 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 1) processItem(): input=ChunkInputItem [id=5, input=50, processed=false], output=ChunkOutputItem [id=5, result=25] 12:25:51,540 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) checkpointInfo(): returns=6 12:25:51,540 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 1) writeItems(): index=1, item=ChunkOutputItem [id=3, result=15] 12:25:51,540 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 1) writeItems(): index=1, item=ChunkOutputItem [id=4, result=20] 12:25:51,540 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 1) writeItems(): index=1, item=ChunkOutputItem [id=5, result=25] 12:25:51,540 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) checkpointInfo(): returns=6 12:25:51,540 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 1) checkpointInfo(): returns=2 12:25:51,542 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionCollector] (batch-batch - 1) collectPartitionData(): data=[ChunkOutputItem [id=3, result=15], ChunkOutputItem [id=4, result=20], ChunkOutputItem [id=5, result=25]] 12:25:51,542 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) checkpointInfo(): returns=6 12:25:51,542 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionAnalyzer] (batch-batch - 9) analyzeCollectorData(): data=[ChunkOutputItem [id=3, result=15], ChunkOutputItem [id=4, result=20], ChunkOutputItem [id=5, result=25]], receivedDataCount=6 12:25:51,543 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) readItem(): index=6, returning=ChunkInputItem [id=6, input=60, processed=false] 12:25:51,543 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 1) processItem(): input=ChunkInputItem [id=6, input=60, processed=false], output=ChunkOutputItem [id=6, result=30] 12:25:52,544 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) checkpointInfo(): returns=7 12:25:52,544 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) readItem(): index=7, returning=ChunkInputItem [id=7, input=70, processed=false] 12:25:52,545 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 1) processItem(): input=ChunkInputItem [id=7, input=70, processed=false], output=ChunkOutputItem [id=7, result=35] 12:25:53,546 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) checkpointInfo(): returns=8 12:25:53,546 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) readItem(): index=8, returning=ChunkInputItem [id=8, input=80, processed=false] 12:25:53,546 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 1) processItem(): input=ChunkInputItem [id=8, input=80, processed=false], output=ChunkOutputItem [id=8, result=40] 12:25:54,547 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) checkpointInfo(): returns=9 12:25:54,547 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 1) writeItems(): index=2, item=ChunkOutputItem [id=6, result=30] 12:25:54,548 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 1) writeItems(): index=2, item=ChunkOutputItem [id=7, result=35] 12:25:54,548 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 1) writeItems(): index=2, item=ChunkOutputItem [id=8, result=40] 12:25:54,548 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) checkpointInfo(): returns=9 12:25:54,548 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 1) checkpointInfo(): returns=3 12:25:54,550 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionCollector] (batch-batch - 1) collectPartitionData(): data=[ChunkOutputItem [id=6, result=30], ChunkOutputItem [id=7, result=35], ChunkOutputItem [id=8, result=40]] 12:25:54,550 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) checkpointInfo(): returns=9 12:25:54,550 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionAnalyzer] (batch-batch - 9) analyzeCollectorData(): data=[ChunkOutputItem [id=6, result=30], ChunkOutputItem [id=7, result=35], ChunkOutputItem [id=8, result=40]], receivedDataCount=9 12:25:54,551 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) readItem(): index=9, returning=ChunkInputItem [id=9, input=90, processed=false] 12:25:54,552 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 1) processItem(): input=ChunkInputItem [id=9, input=90, processed=false], output=ChunkOutputItem [id=9, result=45] 12:25:55,553 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) checkpointInfo(): returns=10 12:25:55,553 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) readItem(): index=10, returning=null 12:25:55,553 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 1) writeItems(): index=3, item=ChunkOutputItem [id=9, result=45] 12:25:55,553 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) checkpointInfo(): returns=11 12:25:55,553 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 1) checkpointInfo(): returns=4 12:25:55,555 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionCollector] (batch-batch - 1) collectPartitionData(): data=[ChunkOutputItem [id=9, result=45]] 12:25:55,555 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) checkpointInfo(): returns=11 12:25:55,555 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionAnalyzer] (batch-batch - 9) analyzeCollectorData(): data=[ChunkOutputItem [id=9, result=45]], receivedDataCount=10 12:25:55,556 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) close() 12:25:55,556 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 1) close() 12:25:55,556 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionCollector] (batch-batch - 1) collectPartitionData(): data=[] 12:25:55,556 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionAnalyzer] (batch-batch - 9) analyzeCollectorData(): data=[], receivedDataCount=10 12:25:55,556 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 1) close() 12:25:55,556 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 1) close() 12:25:55,557 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionAnalyzer] (batch-batch - 9) analyzeStatus(): batchStatus=COMPLETED, exitStatus=COMPLETED 12:25:55,557 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 9) beforePartitionedStepCompletion() 12:25:55,557 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 9) beforePartitionedStepCompletion(): outputItem=ChunkOutputItem [id=0, result=0] 12:25:55,558 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 9) beforePartitionedStepCompletion(): outputItem=ChunkOutputItem [id=1, result=5] 12:25:55,558 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 9) beforePartitionedStepCompletion(): outputItem=ChunkOutputItem [id=2, result=10] 12:25:55,558 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 9) beforePartitionedStepCompletion(): outputItem=ChunkOutputItem [id=3, result=15] 12:25:55,558 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 9) beforePartitionedStepCompletion(): outputItem=ChunkOutputItem [id=4, result=20] 12:25:55,559 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 9) beforePartitionedStepCompletion(): outputItem=ChunkOutputItem [id=5, result=25] 12:25:55,559 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 9) beforePartitionedStepCompletion(): outputItem=ChunkOutputItem [id=6, result=30] 12:25:55,559 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 9) beforePartitionedStepCompletion(): outputItem=ChunkOutputItem [id=7, result=35] 12:25:55,559 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 9) beforePartitionedStepCompletion(): outputItem=ChunkOutputItem [id=8, result=40] 12:25:55,560 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 9) beforePartitionedStepCompletion(): outputItem=ChunkOutputItem [id=9, result=45] 12:25:55,561 DEBUG [org.hibernate.SQL] (batch-batch - 9) insert into ChunkOutputItem (result, id) values (?, ?) 12:25:55,562 DEBUG [org.hibernate.SQL] (batch-batch - 9) insert into ChunkOutputItem (result, id) values (?, ?) 12:25:55,562 DEBUG [org.hibernate.SQL] (batch-batch - 9) insert into ChunkOutputItem (result, id) values (?, ?) 12:25:55,563 DEBUG [org.hibernate.SQL] (batch-batch - 9) insert into ChunkOutputItem (result, id) values (?, ?) 12:25:55,564 DEBUG [org.hibernate.SQL] (batch-batch - 9) insert into ChunkOutputItem (result, id) values (?, ?) 12:25:55,564 DEBUG [org.hibernate.SQL] (batch-batch - 9) insert into ChunkOutputItem (result, id) values (?, ?) 12:25:55,565 DEBUG [org.hibernate.SQL] (batch-batch - 9) insert into ChunkOutputItem (result, id) values (?, ?) 12:25:55,565 DEBUG [org.hibernate.SQL] (batch-batch - 9) insert into ChunkOutputItem (result, id) values (?, ?) 12:25:55,566 DEBUG [org.hibernate.SQL] (batch-batch - 9) insert into ChunkOutputItem (result, id) values (?, ?) 12:25:55,566 DEBUG [org.hibernate.SQL] (batch-batch - 9) insert into ChunkOutputItem (result, id) values (?, ?) 12:25:55,594 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 9) afterPartitionedStepCompletion(): partitionStatus=COMMIT
2パラ実行時のログ
12:25:38,265 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 4) beginPartitionedStep() 12:25:38,265 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionMapper] (batch-batch - 4) mapPartitions(): partitions=2 12:25:38,266 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionMapper] (batch-batch - 4) mapPartitions(): partitionProperty={partitionNumber=0} 12:25:38,266 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionMapper] (batch-batch - 4) mapPartitions(): partitionProperty={partitionNumber=1} 12:25:38,267 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 4) postConstruct(): divide=2, sleep=1000 12:25:38,269 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 2) open(): checkpoint=null, index starts at=0, partitions=2, partitionNumber=0 12:25:38,271 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 4) postConstruct(): divide=2, sleep=1000 12:25:38,273 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 8) open(): checkpoint=null, index starts at=0, partitions=2, partitionNumber=1 12:25:38,273 DEBUG [org.hibernate.SQL] (batch-batch - 2) select chunkinput0_.id as id1_0_, chunkinput0_.input as input2_0_, chunkinput0_.processed as processe3_0_ from ChunkInputItem chunkinput0_ where mod(chunkinput0_.id, ?)=? order by chunkinput0_.id 12:25:38,274 DEBUG [org.hibernate.SQL] (batch-batch - 8) select chunkinput0_.id as id1_0_, chunkinput0_.input as input2_0_, chunkinput0_.processed as processe3_0_ from ChunkInputItem chunkinput0_ where mod(chunkinput0_.id, ?)=? order by chunkinput0_.id 12:25:38,276 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 8) open(): checkpoint=null, index starts at=0 12:25:38,278 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 8) readItem(): index=0, returning=ChunkInputItem [id=1, input=10, processed=false] 12:25:38,279 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 8) processItem(): input=ChunkInputItem [id=1, input=10, processed=false], output=ChunkOutputItem [id=1, result=5] 12:25:38,279 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 2) open(): checkpoint=null, index starts at=0 12:25:38,280 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 2) readItem(): index=0, returning=ChunkInputItem [id=0, input=0, processed=false] 12:25:38,280 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 2) processItem(): input=ChunkInputItem [id=0, input=0, processed=false], output=ChunkOutputItem [id=0, result=0] 12:25:39,280 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 8) checkpointInfo(): returns=1 12:25:39,280 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 8) readItem(): index=1, returning=ChunkInputItem [id=3, input=30, processed=false] 12:25:39,281 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 2) checkpointInfo(): returns=1 12:25:39,281 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 8) processItem(): input=ChunkInputItem [id=3, input=30, processed=false], output=ChunkOutputItem [id=3, result=15] 12:25:39,281 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 2) readItem(): index=1, returning=ChunkInputItem [id=2, input=20, processed=false] 12:25:39,281 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 2) processItem(): input=ChunkInputItem [id=2, input=20, processed=false], output=ChunkOutputItem [id=2, result=10] 12:25:40,282 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 8) checkpointInfo(): returns=2 12:25:40,282 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 8) readItem(): index=2, returning=ChunkInputItem [id=5, input=50, processed=false] 12:25:40,282 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 8) processItem(): input=ChunkInputItem [id=5, input=50, processed=false], output=ChunkOutputItem [id=5, result=25] 12:25:40,282 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 2) checkpointInfo(): returns=2 12:25:40,282 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 2) readItem(): index=2, returning=ChunkInputItem [id=4, input=40, processed=false] 12:25:40,282 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 2) processItem(): input=ChunkInputItem [id=4, input=40, processed=false], output=ChunkOutputItem [id=4, result=20] 12:25:41,282 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 8) checkpointInfo(): returns=3 12:25:41,282 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 8) writeItems(): index=0, item=ChunkOutputItem [id=1, result=5] 12:25:41,282 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 8) writeItems(): index=0, item=ChunkOutputItem [id=3, result=15] 12:25:41,283 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 8) writeItems(): index=0, item=ChunkOutputItem [id=5, result=25] 12:25:41,283 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 8) checkpointInfo(): returns=3 12:25:41,283 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 8) checkpointInfo(): returns=1 12:25:41,283 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 2) checkpointInfo(): returns=3 12:25:41,283 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 2) writeItems(): index=0, item=ChunkOutputItem [id=0, result=0] 12:25:41,283 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 2) writeItems(): index=0, item=ChunkOutputItem [id=2, result=10] 12:25:41,284 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 2) writeItems(): index=0, item=ChunkOutputItem [id=4, result=20] 12:25:41,284 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 2) checkpointInfo(): returns=3 12:25:41,284 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 2) checkpointInfo(): returns=1 12:25:41,285 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionCollector] (batch-batch - 8) collectPartitionData(): data=[ChunkOutputItem [id=1, result=5], ChunkOutputItem [id=3, result=15], ChunkOutputItem [id=5, result=25]] 12:25:41,285 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionCollector] (batch-batch - 2) collectPartitionData(): data=[ChunkOutputItem [id=0, result=0], ChunkOutputItem [id=2, result=10], ChunkOutputItem [id=4, result=20]] 12:25:41,286 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 8) checkpointInfo(): returns=3 12:25:41,286 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 2) checkpointInfo(): returns=3 12:25:41,286 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionAnalyzer] (batch-batch - 4) analyzeCollectorData(): data=[ChunkOutputItem [id=1, result=5], ChunkOutputItem [id=3, result=15], ChunkOutputItem [id=5, result=25]], receivedDataCount=3 12:25:41,286 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionAnalyzer] (batch-batch - 4) analyzeCollectorData(): data=[ChunkOutputItem [id=0, result=0], ChunkOutputItem [id=2, result=10], ChunkOutputItem [id=4, result=20]], receivedDataCount=6 12:25:41,287 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 8) readItem(): index=3, returning=ChunkInputItem [id=7, input=70, processed=false] 12:25:41,287 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 2) readItem(): index=3, returning=ChunkInputItem [id=6, input=60, processed=false] 12:25:41,287 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 8) processItem(): input=ChunkInputItem [id=7, input=70, processed=false], output=ChunkOutputItem [id=7, result=35] 12:25:41,287 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 2) processItem(): input=ChunkInputItem [id=6, input=60, processed=false], output=ChunkOutputItem [id=6, result=30] 12:25:42,288 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 8) checkpointInfo(): returns=4 12:25:42,288 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 2) checkpointInfo(): returns=4 12:25:42,288 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 8) readItem(): index=4, returning=ChunkInputItem [id=9, input=90, processed=false] 12:25:42,288 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 2) readItem(): index=4, returning=ChunkInputItem [id=8, input=80, processed=false] 12:25:42,288 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 8) processItem(): input=ChunkInputItem [id=9, input=90, processed=false], output=ChunkOutputItem [id=9, result=45] 12:25:42,288 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemProcessor] (batch-batch - 2) processItem(): input=ChunkInputItem [id=8, input=80, processed=false], output=ChunkOutputItem [id=8, result=40] 12:25:43,290 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 8) checkpointInfo(): returns=5 12:25:43,290 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 2) checkpointInfo(): returns=5 12:25:43,290 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 8) readItem(): index=5, returning=null 12:25:43,290 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 2) readItem(): index=5, returning=null 12:25:43,290 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 8) writeItems(): index=1, item=ChunkOutputItem [id=7, result=35] 12:25:43,290 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 2) writeItems(): index=1, item=ChunkOutputItem [id=6, result=30] 12:25:43,290 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 8) writeItems(): index=1, item=ChunkOutputItem [id=9, result=45] 12:25:43,290 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 2) writeItems(): index=1, item=ChunkOutputItem [id=8, result=40] 12:25:43,290 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 8) checkpointInfo(): returns=6 12:25:43,291 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 2) checkpointInfo(): returns=6 12:25:43,291 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 8) checkpointInfo(): returns=2 12:25:43,291 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 2) checkpointInfo(): returns=2 12:25:43,293 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionCollector] (batch-batch - 2) collectPartitionData(): data=[ChunkOutputItem [id=6, result=30], ChunkOutputItem [id=8, result=40]] 12:25:43,293 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionCollector] (batch-batch - 8) collectPartitionData(): data=[ChunkOutputItem [id=7, result=35], ChunkOutputItem [id=9, result=45]] 12:25:43,293 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 2) checkpointInfo(): returns=6 12:25:43,293 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 8) checkpointInfo(): returns=6 12:25:43,293 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionAnalyzer] (batch-batch - 4) analyzeCollectorData(): data=[ChunkOutputItem [id=6, result=30], ChunkOutputItem [id=8, result=40]], receivedDataCount=8 12:25:43,293 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionAnalyzer] (batch-batch - 4) analyzeCollectorData(): data=[ChunkOutputItem [id=7, result=35], ChunkOutputItem [id=9, result=45]], receivedDataCount=10 12:25:43,294 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 8) close() 12:25:43,294 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 2) close() 12:25:43,294 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 8) close() 12:25:43,295 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 2) close() 12:25:43,295 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionCollector] (batch-batch - 8) collectPartitionData(): data=[] 12:25:43,295 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionCollector] (batch-batch - 2) collectPartitionData(): data=[] 12:25:43,295 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionAnalyzer] (batch-batch - 4) analyzeCollectorData(): data=[], receivedDataCount=10 12:25:43,295 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 2) close() 12:25:43,295 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemReader] (batch-batch - 8) close() 12:25:43,295 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 2) close() 12:25:43,295 FINE [org.nailedtothex.jbatch.example.partition.PartitionItemWriter] (batch-batch - 8) close() 12:25:43,296 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionAnalyzer] (batch-batch - 4) analyzeStatus(): batchStatus=COMPLETED, exitStatus=COMPLETED 12:25:43,296 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionAnalyzer] (batch-batch - 4) analyzeCollectorData(): data=[], receivedDataCount=10 12:25:43,297 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionAnalyzer] (batch-batch - 4) analyzeStatus(): batchStatus=COMPLETED, exitStatus=COMPLETED 12:25:43,297 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 4) beforePartitionedStepCompletion() 12:25:43,298 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 4) beforePartitionedStepCompletion(): outputItem=ChunkOutputItem [id=1, result=5] 12:25:43,298 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 4) beforePartitionedStepCompletion(): outputItem=ChunkOutputItem [id=3, result=15] 12:25:43,298 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 4) beforePartitionedStepCompletion(): outputItem=ChunkOutputItem [id=5, result=25] 12:25:43,298 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 4) beforePartitionedStepCompletion(): outputItem=ChunkOutputItem [id=0, result=0] 12:25:43,299 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 4) beforePartitionedStepCompletion(): outputItem=ChunkOutputItem [id=2, result=10] 12:25:43,299 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 4) beforePartitionedStepCompletion(): outputItem=ChunkOutputItem [id=4, result=20] 12:25:43,299 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 4) beforePartitionedStepCompletion(): outputItem=ChunkOutputItem [id=6, result=30] 12:25:43,299 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 4) beforePartitionedStepCompletion(): outputItem=ChunkOutputItem [id=8, result=40] 12:25:43,299 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 4) beforePartitionedStepCompletion(): outputItem=ChunkOutputItem [id=7, result=35] 12:25:43,300 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 4) beforePartitionedStepCompletion(): outputItem=ChunkOutputItem [id=9, result=45] 12:25:43,304 DEBUG [org.hibernate.SQL] (batch-batch - 4) insert into ChunkOutputItem (result, id) values (?, ?) 12:25:43,305 DEBUG [org.hibernate.SQL] (batch-batch - 4) insert into ChunkOutputItem (result, id) values (?, ?) 12:25:43,306 DEBUG [org.hibernate.SQL] (batch-batch - 4) insert into ChunkOutputItem (result, id) values (?, ?) 12:25:43,307 DEBUG [org.hibernate.SQL] (batch-batch - 4) insert into ChunkOutputItem (result, id) values (?, ?) 12:25:43,307 DEBUG [org.hibernate.SQL] (batch-batch - 4) insert into ChunkOutputItem (result, id) values (?, ?) 12:25:43,308 DEBUG [org.hibernate.SQL] (batch-batch - 4) insert into ChunkOutputItem (result, id) values (?, ?) 12:25:43,308 DEBUG [org.hibernate.SQL] (batch-batch - 4) insert into ChunkOutputItem (result, id) values (?, ?) 12:25:43,309 DEBUG [org.hibernate.SQL] (batch-batch - 4) insert into ChunkOutputItem (result, id) values (?, ?) 12:25:43,309 DEBUG [org.hibernate.SQL] (batch-batch - 4) insert into ChunkOutputItem (result, id) values (?, ?) 12:25:43,310 DEBUG [org.hibernate.SQL] (batch-batch - 4) insert into ChunkOutputItem (result, id) values (?, ?) 12:25:43,337 FINE [org.nailedtothex.jbatch.example.partition.MyPartitionReducer] (batch-batch - 4) afterPartitionedStepCompletion(): partitionStatus=COMMIT
job_execution
1パラ実行時
jbatch=# select * from job_execution where jobexecutionid=73; jobexecutionid | jobinstanceid | version | createtime | starttime | endtime | lastupdatedtime | batchstatus | exitstatus | jobparameters | restartposition ----------------+---------------+---------+-------------------------+-------------------------+-------------------------+-------------------------+-------------+------------+----------------+----------------- 73 | 68 | | 2014-02-14 12:25:45.517 | 2014-02-14 12:25:45.517 | 2014-02-14 12:25:55.596 | 2014-02-14 12:25:55.596 | COMPLETED | COMPLETED | sleep = 1000 +| | | | | | | | | | partitions = 1+| | | | | | | | | | divide = 2 +| | | | | | | | | | | (1 row)
2パラ実行時
jbatch=# select * from job_execution where jobexecutionid=72; jobexecutionid | jobinstanceid | version | createtime | starttime | endtime | lastupdatedtime | batchstatus | exitstatus | jobparameters | restartposition ----------------+---------------+---------+-------------------------+-------------------------+------------------------+------------------------+-------------+------------+----------------+----------------- 72 | 67 | | 2014-02-14 12:25:38.262 | 2014-02-14 12:25:38.262 | 2014-02-14 12:25:43.34 | 2014-02-14 12:25:43.34 | COMPLETED | COMPLETED | sleep = 1000 +| | | | | | | | | | partitions = 2+| | | | | | | | | | divide = 2 +| | | | | | | | | | | (1 row)
実行時間が半分になっている
step_execution
1パラ実行時
jbatch=# select * from step_execution where jobexecutionid=73; stepexecutionid | jobexecutionid | version | stepname | starttime | endtime | batchstatus | exitstatus | executionexception | persistentuserdata | readcount | writecount | commitcount | rollbackcount | readskipcount | processskipcount | filtercount | writeskipcount | readercheckpointinfo | writercheckpointinfo -----------------+----------------+---------+----------+-------------------------+-------------------------+-------------+------------+--------------------+--------------------+-----------+------------+-------------+---------------+---------------+------------------+-------------+----------------+----------------------+---------------------- 88 | 73 | | doChunk | 2014-02-14 12:25:45.519 | 2014-02-14 12:25:55.595 | COMPLETED | COMPLETED | | | 10 | 10 | 4 | 0 | 0 | 0 | 0 | 0 | | (1 row)
2パラ実行時
jbatch=# select * from step_execution where jobexecutionid=72; stepexecutionid | jobexecutionid | version | stepname | starttime | endtime | batchstatus | exitstatus | executionexception | persistentuserdata | readcount | writecount | commitcount | rollbackcount | readskipcount | processskipcount | filtercount | writeskipcount | readercheckpointinfo | writercheckpointinfo -----------------+----------------+---------+----------+-------------------------+-------------------------+-------------+------------+--------------------+--------------------+-----------+------------+-------------+---------------+---------------+------------------+-------------+----------------+----------------------+---------------------- 87 | 72 | | doChunk | 2014-02-14 12:25:38.263 | 2014-02-14 12:25:43.339 | COMPLETED | COMPLETED | | | 10 | 10 | 4 | 0 | 0 | 0 | 0 | 0 | | (1 row)
partition_execution
1パラ実行時
jbatch=# select * from partition_execution where stepexecutionid = 88; partitionexecutionid | stepexecutionid | version | batchstatus | exitstatus | executionexception | persistentuserdata | readercheckpointinfo | writercheckpointinfo| 88 | | COMPLETED | COMPLETED | | \xaced00057372001f6a6176612e7574696c2e436f6c6c656374696f6e7324456d7074794c6973747ab817b43ca79ede0200007870 | \xaced0005737200116a6176612e6c616e672e496e746567657212e2a0a4f781873802000149000576616c7565787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b02000078700000000b | \xaced0005737200116a6176612e6c616e672e496e746567657212e2a0a4f781873802000149000576616c7565787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b020000787000000004 (1 row) jbatch=#
2パラ実行時
jbatch=# select * from partition_execution where stepexecutionid = 87 order by partitionexecutionid; partitionexecutionid | stepexecutionid | version | batchstatus | exitstatus | executionexception | persistentuserdata | readercheckpointinfo | writercheckpointinfo| 87 | | COMPLETED | COMPLETED | | \xaced00057372001f6a6176612e7574696c2e436f6c6c656374696f6e7324456d7074794c6973747ab817b43ca79ede0200007870 | \xaced0005737200116a6176612e6c616e672e496e746567657212e2a0a4f781873802000149000576616c7565787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b020000787000000006 | \xaced0005737200116a6176612e6c616e672e496e746567657212e2a0a4f781873802000149000576616c7565787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b020000787000000002 1 | 87 | | COMPLETED | COMPLETED | | \xaced00057372001f6a6176612e7574696c2e436f6c6c656374696f6e7324456d7074794c6973747ab817b43ca79ede0200007870 | \xaced0005737200116a6176612e6c616e672e496e746567657212e2a0a4f781873802000149000576616c7565787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b020000787000000006 | \xaced0005737200116a6176612e6c616e672e496e746567657212e2a0a4f781873802000149000576616c7565787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b020000787000000002 (2 rows) jbatch=#
checkpointInfoをパラ実行単位で持っているようなので、再実行時は途中からの処理再開が可能なのだろうか。また試してみる
備考
- もう少し実践的な使い方がわかるようなサンプルが有れば良いのだが見つからない。もしSpring Batchに似たような機能があれば、そのサンプルを参考にできるかも
- そのうち異常終了時の再実行を試してみる
参考文献
Tags: jbatch