Kohei Nozaki's blog 

CLIからリモートEJB経由でジョブを操作してみる


Posted on Monday Feb 10, 2014 at 06:44PM in Technology


JAX-RSなどを使ってREST APIを作る方が良いのだろうと思われるが、大変なのでリモートEJB経由でやってみる。さらにコマンドラインから叩きたいので、シェルスクリプトから蹴れるように少し作り込んでみる

WildFly - リモートEJB呼び出しを使ってみる を併せて読むと多少参考になるかも

環境

  • WildFly8.0.0.CR1
  • Oracle JDK7u51
  • OS X 10.9.1

準備

プロジェクトを作る

下の3つを作る。ソースはGitHubに置いた

  1. JSR352APIのSerializableなJavaBeansを含むプロジェクト(jbatchif)
  2. リモートEJBとバッチの実装を含むプロジェクト(jbatch)
  3. クライアント側プロジェクト(jbatchcli)

JSR352APIのSerializableなJavaBeansを含むプロジェクト(jbatchif)

SimpleJobInstanceImpl.java, SimpleJobExecutionImpl.java, SimpleStepExecutionImpl.java, SimpleMetricImpl.java

JobInstance, JobExecution, StepExecution, MetricインタフェースのSerializableな実装。WildFlyのJSR352実装のこれらのクラスをそのまま使うとNotSerializableExceptionが出てしまうのでしかたなく作った(org.jberet.runtime.metric.StepMetricsが非Serializableらしい)。リモートEJB側とクライアント側の両方で使うので切り出しておく。フィールドとアクセサしかないただのJavaBeans。

リモートEJBとバッチの実装を含むプロジェクト(jbatch)

MyJobOperatorImpl.java

こいつをリモートEJB経由で呼び出す。throws宣言が若干気になるけど面倒なのでそのままにしておく

TestBatchlet.java, job001.xml

動かしてみる で使ったのと同じ

クライアント側プロジェクト(jbatchcli)

Main.java

メインクラス。引数でリモートEJBのJNDI名、ジョブ名、ジョブパラメータを受け取る。細かい処理はprocessorパッケージの下のクラスにやらせる。Commons CLIとか使って作り込んだ方がいいんだろうけど、現状ではかなり適当。

config.sh

シェルスクリプトの環境依存っぽいところを外だししてある

jbatch.sh

javaコマンドをたたくところ

jndi.properties

リモートEJBのlookupに必要な設定をする。ここを適当に書き換えればWildFly以外のAPサーバでもいけるはず

logging.properties

JBossのEJB呼び出し用ライブラリがデバッグ的な文言をINFOレベルで出力してくるのでログレベル上げて出ないようにしとく

デプロイする

CLIから叩く前にjbatchプロジェクトをデプロイする

CLIから叩いてみる

ジョブ一覧を表示してみる

kyle-no-MacBook:resources kyle$ ./jbatch.sh list-job-names
Picked up _JAVA_OPTIONS: -Dfile.encoding=UTF-8
jobNames: [job001]
kyle-no-MacBook:resources kyle$ 

jbatchランタイムに認識されたジョブXMLの一覧的なのが出る。job001しかないので1つだけだけど。

ジョブを起動してみる

kyle-no-MacBook:resources kyle$ ./jbatch.sh start job001
Picked up _JAVA_OPTIONS: -Dfile.encoding=UTF-8
executionId: 63
kyle-no-MacBook:resources kyle$ 

普通に動いている。WildFly側にもログが出てる。こんな感じ

まだジョブXML一覧と起動のところしか作ってない。停止とか再実行とかexecutionIdから情報見るとかはそのうち作り込む。

何故かなかなかjavaプロセスが終了しないことがあるのが気になる。1分位待ってると終わるんだけど。

参考文献

  1. David Blevins' Blog: @ApplicationException is evil… sort of


Chunk方式のStepを使ってみる


Posted on Monday Feb 10, 2014 at 06:42PM in Technology


jbatchでバルク処理を行う場合、Chunk-orientedという方式に合わせたインタフェース群を実装することが推奨されている。若干複雑なので、簡単な実装を作ってみて、ログや仕様を見つつ処理の流れを確認してみる。

JSR 352 Implementation - Connection close probl… | Communityによると、ここに書いてあるような実装はよろしくないようです。SELECT文の発行などは全て同一トランザクション内で完結する必要があるらしい。open()で取得するのはPKだけにして、実際のデータはreadItem()で別途取得するようにするなどした方が良いかも

トランザクション境界を跨いでJDBCの資源(Connection, Statement, ResultSet等)を使う場合は,非JTAデータソースを使いましょう詳細はこちらを参照

環境

前提条件

Arquillianでジョブのテストをしてみるで作ったプロジェクトが普通に動いているものとする

Chunk-oriented Processingとは

以下の3つのインタフェースで1つのStepが構成される。開発者はこれらのインタフェースを実装したクラスを作って1つのStepを作る

  • ItemReader
  • ItemProcessor
  • ItemWriter

データの入力元と出力先が両方ともDBの場合、それぞれの役割と実装の仕方はざっくり以下のような感じになるかと思われる

ItemReader

SELECT文を実行して処理したい単位でデータを取り出しItemProcessorに渡す。

void open(Serializable)

SELECT文を発行するのが普通の使い方かと思われる。

バッチが初回実行の場合、引数はnullだが、途中でバッチが死んだ後の再実行の場合、checkpointInfo()で返したカウンタが引数で渡されるので、これをうまく使えば処理済みのデータをスキップして途中から処理を再開させられる。

JPAを使う場合

データ量が少ない場合はEntityManagerを使いQueryを作ってgetResultList()を呼ぶ。データ量が多い場合、[4][5][8]あたりを読むと良いかも

JDBCを使う場合

DataSourceを使いConnection, Statement, ResultSetを作る。データ量が多い場合は[6]あたりを読むとよいかも。

Object readItem()

データを1件取り出す処理。データがなくなったらnullを返すと全件処理終了の意味になる。

JPAを使う場合

open()で作ったListからget()でデータを取り出して返す。

JDBCを使う場合

ResultSet#next()を呼んでカーソルを制御しつつ、簡単なものならResultSetをそのまま返すか、ResultSet#get()でデータを取り出して、適当なDTO等につめて返す

void close()

open()で開いた資源を解放する。Connection, Statement, ResultSetを使った場合はここでcloseする。

Serializable checkpointInfo()

カウンタを返す。返したカウンタはRepositoryに保存され、どこまで処理が完了したかが記録される。

ItemProcessor

ItemReaderが取り込んだデータを適当に処理して処理結果を作る。省略可。

Object process(Object)

ItemReader#readItem()が返したオブジェクトがここでの引数になる。これを使って何らかの処理をする。ここで返した処理結果はItemWriter#writeItems()の引数になる

ItemWriter

ItemProcessorが作った処理結果をどこかのテーブルに書き込む。

void open(Serializable)

JDBCを使う場合、ここでConnectionとINSERT/UPDATE/DELETE文のPreparedStatementを開いておいたりすると良いのかも たぶん何もしない.

void writeItems(List)

ItemProcessorが作った処理結果が1トランザクションで書き込む単位でまとめてListに詰めて引数で渡されるので、ループ等を使って要素をどこかのテーブルに書き込む。

JPAを使う場合

EntityManager#persist()かmerge()を呼び出す。

JDBCを使う場合

open()で開いておいたConnection, PreparedStatementを作って,データを書き込む。

void close()

Connection, Statementを使った場合はここで閉じる たぶん何もしない.

Serializable checkpointInfo()

ItemReaderの同メソッドと同じ。

準備

単純なchunk方式のStepが1つだけのバッチをJPAを使って作ってみる。資源の詳細はこのあたり参照

バッチを構成する資源

ChunkInputItem.java, ChunkOutputItem.java

超単純なエンティティ。

  • ItemReaderがChunkInputItemを取ってくる
  • ItemProcessorがChunkInputItemをChunkOutputItemに変換する
  • ItemWriterがChunkOutputItemをDBに書き込む

chunk.xml

  • ジョブXML。chunk方式のStep1個だけの単純なもの
  • 3件処理したらコミット
  • processorにジョブパラメータを渡してみる

ChunkItemReader.java

  • データ量が多い場合はよろしくないが、簡単のためgetResultList()でデータを取ってくる
  • 一応カウンタ付き

ChunkItemProcessor.java

  • @BatchPropertyで受け取ったプロパティを使って、@PostConstructで初期化的な処理をやってみる
  • ChunkInputItemからChunkOutputItemを作って返す

ChunkItemWriter.java

  • EntityManager#persist()を呼ぶ
  • 一応カウンタ付き

テスト用資源

ChunkJobTest.java

  • テストクラス
  • ジョブプロパティを渡してみる

ChunkInputItem.yml, ChunkOutputItem.yml

  • テストデータ
  • ChunkInputItemは10件
  • ChunkOutputItemは0件

expected.yml

  • ChunkOutputItemへの出力の期待結果

実行してみる

テスト結果は普通に緑色になる。ログはこんな感じ

22:00:29,478 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#postConstruct(): divide=2
22:00:29,478 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#open(): checkpoint=null, index starts at=0
22:00:29,481 DEBUG [org.hibernate.SQL] (batch-batch - 3) select chunkinput0_.id as id1_0_, chunkinput0_.input as input2_0_, chunkinput0_.processed as processe3_0_ from ChunkInputItem chunkinput0_ order by chunkinput0_.id
22:00:29,491 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#open(): checkpoint=null, index starts at=0
22:00:29,492 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=0
22:00:29,492 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=ChunkInputItem [id=0, input=0, processed=false]
22:00:29,492 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#processItem(): input=ChunkInputItem [id=0, input=0, processed=false], output=ChunkOutputItem [id=0, result=0]
22:00:29,492 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=1
22:00:29,492 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=1
22:00:29,492 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=ChunkInputItem [id=1, input=10, processed=false]
22:00:29,492 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#processItem(): input=ChunkInputItem [id=1, input=10, processed=false], output=ChunkOutputItem [id=1, result=5]
22:00:29,492 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=2
22:00:29,493 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=2
22:00:29,493 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=ChunkInputItem [id=2, input=20, processed=false]
22:00:29,493 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#processItem(): input=ChunkInputItem [id=2, input=20, processed=false], output=ChunkOutputItem [id=2, result=10]
22:00:29,493 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=3
22:00:29,493 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): index=0
22:00:29,493 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): item=ChunkOutputItem [id=0, result=0]
22:00:29,493 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): item=ChunkOutputItem [id=1, result=5]
22:00:29,494 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): item=ChunkOutputItem [id=2, result=10]
22:00:29,494 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=3
22:00:29,494 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#checkpointInfo(): returns=1
22:00:29,494 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=3
22:00:29,497 DEBUG [org.hibernate.SQL] (batch-batch - 3) insert into ChunkOutputItem (result, id) values (?, ?)
22:00:29,505 DEBUG [org.hibernate.SQL] (batch-batch - 3) insert into ChunkOutputItem (result, id) values (?, ?)
22:00:29,506 DEBUG [org.hibernate.SQL] (batch-batch - 3) insert into ChunkOutputItem (result, id) values (?, ?)
22:00:29,507 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=3
22:00:29,507 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=ChunkInputItem [id=3, input=30, processed=false]
22:00:29,507 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#processItem(): input=ChunkInputItem [id=3, input=30, processed=false], output=ChunkOutputItem [id=3, result=15]
22:00:29,507 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=4
22:00:29,507 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=4
22:00:29,508 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=ChunkInputItem [id=4, input=40, processed=false]
22:00:29,508 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#processItem(): input=ChunkInputItem [id=4, input=40, processed=false], output=ChunkOutputItem [id=4, result=20]
22:00:29,508 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=5
22:00:29,508 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=5
22:00:29,508 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=ChunkInputItem [id=5, input=50, processed=false]
22:00:29,508 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#processItem(): input=ChunkInputItem [id=5, input=50, processed=false], output=ChunkOutputItem [id=5, result=25]
22:00:29,508 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=6
22:00:29,508 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): index=1
22:00:29,508 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): item=ChunkOutputItem [id=3, result=15]
22:00:29,509 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): item=ChunkOutputItem [id=4, result=20]
22:00:29,509 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): item=ChunkOutputItem [id=5, result=25]
22:00:29,509 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=6
22:00:29,509 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#checkpointInfo(): returns=2
22:00:29,509 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=6
22:00:29,510 DEBUG [org.hibernate.SQL] (batch-batch - 3) insert into ChunkOutputItem (result, id) values (?, ?)
22:00:29,512 DEBUG [org.hibernate.SQL] (batch-batch - 3) insert into ChunkOutputItem (result, id) values (?, ?)
22:00:29,513 DEBUG [org.hibernate.SQL] (batch-batch - 3) insert into ChunkOutputItem (result, id) values (?, ?)
22:00:29,514 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=6
22:00:29,514 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=ChunkInputItem [id=6, input=60, processed=false]
22:00:29,514 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#processItem(): input=ChunkInputItem [id=6, input=60, processed=false], output=ChunkOutputItem [id=6, result=30]
22:00:29,514 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=7
22:00:29,514 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=7
22:00:29,514 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=ChunkInputItem [id=7, input=70, processed=false]
22:00:29,514 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#processItem(): input=ChunkInputItem [id=7, input=70, processed=false], output=ChunkOutputItem [id=7, result=35]
22:00:29,514 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=8
22:00:29,514 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=8
22:00:29,514 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=ChunkInputItem [id=8, input=80, processed=false]
22:00:29,514 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#processItem(): input=ChunkInputItem [id=8, input=80, processed=false], output=ChunkOutputItem [id=8, result=40]
22:00:29,515 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=9
22:00:29,515 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): index=2
22:00:29,515 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): item=ChunkOutputItem [id=6, result=30]
22:00:29,515 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): item=ChunkOutputItem [id=7, result=35]
22:00:29,515 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): item=ChunkOutputItem [id=8, result=40]
22:00:29,515 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=9
22:00:29,516 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#checkpointInfo(): returns=3
22:00:29,516 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=9
22:00:29,516 DEBUG [org.hibernate.SQL] (batch-batch - 3) insert into ChunkOutputItem (result, id) values (?, ?)
22:00:29,517 DEBUG [org.hibernate.SQL] (batch-batch - 3) insert into ChunkOutputItem (result, id) values (?, ?)
22:00:29,517 DEBUG [org.hibernate.SQL] (batch-batch - 3) insert into ChunkOutputItem (result, id) values (?, ?)
22:00:29,518 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=9
22:00:29,519 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=ChunkInputItem [id=9, input=90, processed=false]
22:00:29,519 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemProcessor] (batch-batch - 3) chunkItemProcessor#processItem(): input=ChunkInputItem [id=9, input=90, processed=false], output=ChunkOutputItem [id=9, result=45]
22:00:29,519 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=10
22:00:29,519 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): index=10
22:00:29,519 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#readItem(): returning=null
22:00:29,519 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): index=3
22:00:29,519 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#writeItems(): item=ChunkOutputItem [id=9, result=45]
22:00:29,519 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=10
22:00:29,520 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#checkpointInfo(): returns=4
22:00:29,520 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#checkpointInfo(): returns=10
22:00:29,520 DEBUG [org.hibernate.SQL] (batch-batch - 3) insert into ChunkOutputItem (result, id) values (?, ?)
22:00:29,522 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#close()
22:00:29,522 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#close()
22:00:29,522 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemReader] (batch-batch - 3) chunkItemReader#close()
22:00:29,522 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 3) chunkItemWriter#close()

処理の流れを見てみる

[2]の「11.6 Regular Chunk Processing」に詳細が書いてあるが、以下のようなイメージの順序で処理が行われる

まずopenが呼ばれる(ログの2-4行目)

  1. トランザクション開始
  2. ItemReader#open()
  3. ItemWriter#open()
  4. コミット

データがなくなる(readItem()がnullを返す)まで以下ループ(ログの5-26行目)

  1. トランザクション開始
  2. ItemReader#readItem()
  3. ItemProcessor#processItem()
  4. ItemReader#readItem()
  5. ItemProcessor#processItem()
  6. ItemReader#readItem()
  7. ItemProcessor#processItem()
  8. itemWriter#writeItems() ※ItemProcessor#processItem()の返り値3件分がListに詰められたものが引数
  9. ItemReader#checkpointInfo()をRepositoryに記録 
  10. ItemWriter#checkpointInfo()をRepositoryに記録
  11. コミット

ItemReader#readItem()で件数カウンタをインクリメントしつつデータを1件返す。ItemProcessor#processItem()にデータが渡ってきたら、処理を行い、結果をreturnで返す。これがitem-countに指定した数繰り返され、処理結果がListに詰められてItemWriter#writeItems()に渡される。ここでも件数カウンタをインクリメントしつつ書き込む。コミット前にItemReaderとItemWriterのcheckpointInfo()が呼ばれ、件数カウンタがRepositoryに書き込まれる。

件数カウンタを実装しておくと、処理中に問題が起きた後の再実行時に処理済みのデータをスキップさせることが可能になる。再実行時のスキップが必要ない場合は、AbstractItemReaderを継承して、Iteratorを使うと楽に実装できる。

なお、アプリで使っているものとRepositoryのものとは通常別のデータソースになるので、基本的にXAトランザクションが必要になるようだ.または,XAトランザクションを使いたくない場合は,Repository用のデータソースを非JTAデータソースにするといいかもしれない.

最後にcloseが呼ばれる(ログの83-86行目)

  1. トランザクション開始
  2. ItemReader#close()
  3. ItemWriter#close()
  4. コミット

備考

  • ItemReader#close()とItemWriter#close()の呼ばれる順序が仕様[2]と逆な気がする。仕様だとItemWriter#close()→ItemReader#close()の順だが、この実装だとItemReader#close()→ItemWriter#close()の順に呼ばれている
  • ItemReaderとItemWriterのclose()が2回呼ばれるのが気になる。[3]を見るとsafeClose()ってとこで例外を握りつぶしつつ、しつこくclose()するようになってる。まあこれはこれでいいのかも
  • checkpointInfo()も無駄に呼ばれている気がする。けど別に害は無いだろうから気にしない
  • JDBCを使う場合はDbUtils.closeQuietly()があると便利かも。ResultSetHandlerは使い辛そう

続き

Chunk方式のStepでJDBCを使ってみる

参考文献

  1. Batch Application for the Java Platform – JSR352
  2. JSR-000352 Batch Applications for the Java Platform - Final Release
  3. jsr352/jberet-core/src/main/java/org/jberet/runtime/runner/ChunkRunner.java at master · jberet/jsr352
  4. java - How to handle large dataset with JPA (or at least with Hibernate)? - Stack Overflow
  5. java - JPA: what is the proper pattern for iterating over large result sets? - Stack Overflow
  6. java - JDBC: How to read all rows from huge table? - Stack Overflow
  7. 調査メモ: DbUtilsのサンプル
  8. JPAからフェッチサイズを変更したかった - kagamihogeの日記


別プロセスからリモートEJBを呼び出してみる


Posted on Sunday Feb 09, 2014 at 03:18PM in Technology


WildFly8にデプロイしてあるEJB(WARファイル内に存在)を、別のJavaプロセスからリモート呼び出ししてみる

環境

  • Eclipse Kepler SR1
  • WildFly8.0.0.CR1
  • Oracle JDK7u51

前提条件

WildFly8.0.0CR1でサーブレットを動かしてみる あたりでEclipseに加えた設定が終わっているものとする

準備

プロジェクトを作る

以下の3つを作ります

  1. リモートEJBインタフェースを含むプロジェクト(remoteejbif)
  2. リモートEJBの実装を含むプロジェクト(remoteejbwar)
  3. クライアント側プロジェクト(remoteejbclient)

リモートEJBインタフェースを含むプロジェクト(remoteejbif)

  1. 「remoteejbif」という名称のJavaプロジェクトを作る
  2. ソースを追加
package foo;


public interface HogeBean {

    String hoge();
}

リモートEJBの実装を含むプロジェクト(remoteejbwar)

  1. 「remoteejbwar」という名称のMavenプロジェクトをarchetype「javaee7-essentials」から作る
  2. プロジェクトの設定を開く
  3. Project Referencesでremoteejbifにチェックを入れる
  4. Java Build PathのProjectsタブでAddを押してremoteejbifを追加する
  5. Deployment Assembly→Add→Projectでremoteejbifを追加する。追加した後、再度設定内容を確認すると、追加されていなかったりするので注意(もう一度追加するとOKっぽい。なんだこれ)
  6. ソースを追加
  7. ソースを追加したらJava EE Perspective→Servers→WildFly→右クリック→Add and Removeからデプロイ
package foo;

import javax.ejb.Remote;
import javax.ejb.Stateless;

@Stateless
@Remote(HogeBean.class)
public class HogeBeanImpl implements HogeBean{

    @Override
    public String hoge() {
        return "hoge from remote";
    }
}

クライアント側プロジェクト(remoteejbclient)

  1. 「remoteejbclient」という名称のJavaプロジェクトを作る
  2. プロジェクトの設定を開く
  3. Project Referencesでremoteejbifにチェックを入れる
  4. Java Build PathのProjectsタブでAddを押してremoteejbifを追加する
  5. Java Build PathのLibrariesタブでAdd External Jarsを押して$WILDFLY_HOME/bin/client/jboss-client.jarを追加する
  6. ソースを追加
package foo;

import java.util.Properties;

import javax.naming.Context;
import javax.naming.InitialContext;

public class HogeBeanInvoker2 {

    public static void main(String[] args) throws Exception {
        Properties p = new Properties();
        p.put("remote.connections", "default");
        p.put("remote.connection.default.port", "8080");
        p.put("remote.connection.default.host", "localhost");
        p.put("remote.connectionprovider.create.options.org.xnio.Options.SSL_ENABLED", false);
        p.put(Context.URL_PKG_PREFIXES, "org.jboss.ejb.client.naming");
        p.put("org.jboss.ejb.client.scoped.context", true);

        Context c = null;
        try {
            c = new InitialContext(p);
            HogeBean hogeBean = (HogeBean) c.lookup("ejb:/remoteejbwar//HogeBeanImpl!foo.HogeBean");
            System.out.println(hogeBean.hoge());
        } finally {
            c.close();
        }
    }
}

実行

「remoteejbclient」プロジェクトに作ったHogeBeanInvoker2を普通に実行する(右クリック→Run As→Java Application)。こんな感じの出力が出る。わりと速い

2 10, 2014 5:24:54 午後 org.xnio.Xnio <clinit>
INFO: XNIO version 3.2.0.Beta4
2 10, 2014 5:24:54 午後 org.xnio.nio.NioXnio <clinit>
INFO: XNIO NIO Implementation Version 3.2.0.Beta4
2 10, 2014 5:24:54 午後 org.jboss.remoting3.EndpointImpl <clinit>
INFO: JBoss Remoting version (unknown)
2 10, 2014 5:24:54 午後 org.jboss.ejb.client.remoting.VersionReceiver handleMessage
INFO: EJBCLIENT000017: Received server version 2 and marshalling strategies [river]
2 10, 2014 5:24:54 午後 org.jboss.ejb.client.remoting.RemotingConnectionEJBReceiver associate
INFO: EJBCLIENT000013: Successful version handshake completed for receiver context EJBReceiverContext{clientContext=org.jboss.ejb.client.EJBClientContext@1f1ee981, receiver=Remoting connection EJB receiver [connection=org.jboss.ejb.client.remoting.ConnectionPool$PooledConnection@2674241d,channel=jboss.ejb,nodename=kyle-no-macbook]} on channel Channel ID bb6edca7 (outbound) of Remoting connection 3aa7fab1 to localhost/127.0.0.1:8080
2 10, 2014 5:24:54 午後 org.jboss.ejb.client.EJBClient <clinit>
INFO: JBoss EJB Client version 2.0.0.Beta5
hoge from remote

備考

デプロイ時に出力されるJNDI名は別プロセスからのリモート呼び出しには使えない

EJBをデプロイするとWildFlyの出力に以下のようなJNDI名が出力されるが、別プロセスからのリモート呼び出しでは使えなかった。別プロセスからlookupする場合は「ejb:」で始まるJNDI名を使わないといけないっぽい。詳細は参考文献を参照。

java:global/remoteejbwar/HogeBeanImpl!foo.HogeBean
java:app/remoteejbwar/HogeBeanImpl!foo.HogeBean
java:module/HogeBeanImpl!foo.HogeBean
java:jboss/exported/remoteejbwar/HogeBeanImpl!foo.HogeBean
java:global/remoteejbwar/HogeBeanImpl
java:app/remoteejbwar/HogeBeanImpl
java:module/HogeBeanImpl

同じAPサーバにデプロイされたアプリケーション間なら「java:global」で始まるJNDI名でlookup可能。

ユーザの作成は不要

参考文献を見るとユーザの作成が必要な旨の記述が多く見つかるが、どうやらこの環境では特に必要ない模様

standalone.xmlでOK

参考文献を見ると設定ファイルstandalone-full.xmlで起動している例が見つかるが、この記事のような単純なものの場合は特に必要ない模様。JMSとか使う場合に必要になるのかも

その他

レガシーでもはやなるべく使うべきではないと言われることもあるリモートEJB呼び出しだが、局面によっては依然使えると思われる

参考文献

  1. Tanmoy's Blog: WildFly: EJB invocations from a remote client
  2. java - WildFly: EJB invocations from a remote client - Stack Overflow
  3. EJB invocations from a remote client using JNDI - WildFly 8 - Project Documentation Editor
  4. JNDI Lookup Failing | Community
  5. Remote EJB invocations via JNDI - EJB client API or remote-naming project - WildFly 8 - Project Documentation Editor
  6. Java Web: Accessing remote EJB on JBoss AS 7.1 from web application
  7. JBoss EAP 6.2のリモートEJB呼び出し - nekopの日記


WildFly8でJDBCジョブレポジトリを使ってみる


Posted on Saturday Feb 08, 2014 at 04:36PM in Technology


WildFly8.0.0.CR1ではジョブレポジトリがデフォルトではインメモリH2になっているが、スタンドアロンのDBにしてみる。現状H2しかサポートしてないようなのでサーバモードのH2を立ててやってみる

この例では普通のデータソースを使ってますが、バッチから他のデータベースも参照する場合は、XAデータソースにしておかないと問題が起こると思われるので注意

環境

  • WildFly8.0.0.CR1
  • H2 1.3.175 (2014-01-18)
  • Oracle JDK7u51
  • OS X 10.9.1

準備

プロジェクト

動かしてみるで作ったプロジェクトをそのまま使う。

H2を立ち上げる

  1. H2のzipファイルをとってきて展開する
  2. h2/binに存在するh2.shを実行する
  3. ブラウザにH2管理コンソールへのログイン画面が表示される。デフォルトでTCP接続のH2にログインする内容になっているのでそのままログインする
  4. 適当にSQLを実行してみて普通に動いている事を確認する

データソースを作る

jboss-cliで叩くコマンドはこんな感じ。この環境だとH2は最初から入っているようなので特にJDBCドライバのデプロイ等は不要。

data-source add \
      --name=JBatchDS \
      --driver-name=h2 \
      --connection-url=jdbc:h2:tcp://localhost/~/test \
      --jndi-name=java:jboss/jdbc/JBatchDS \
      --user-name=sa \
      --password=

一応接続チェック

[standalone@localhost:9990 /] /subsystem=datasources/data-source=JBatchDS:test-connection-in-pool
{
    "outcome" => "success",
    "result" => [true]
}

ジョブレポジトリを変更

JDBCレポジトリのデータソース名を設定

[standalone@localhost:9990 /] /subsystem=batch/job-repository=jdbc:write-attribute(name=jndi-name, value=java:jboss\/jdbc\/JBatchDS)
{
    "outcome" => "success",
    "response-headers" => {
        "operation-requires-reload" => true,
        "process-state" => "reload-required"
    }
}

job-repository-typeをjdbcに設定

[standalone@localhost:9990 /] /subsystem=batch:write-attribute(name=job-repository-type, value=jdbc)
{
    "outcome" => "success",
    "response-headers" => {
        "operation-requires-restart" => true,
        "process-state" => "restart-required"
    }
}

restart-requiredだそうなので再起動する。

実行してみる

TestServletにアクセス

普通に動いているようです

DBを見てみる

事前にDDLを流したりは一切していないが、H2のWebコンソールから見てみると自動的に必要なテーブル等が作られている

備考

GlassFishにあるlist-batch-jobs的なコマンドがあれば良いのだけどまだ見つけられていない。また調べる

参考文献

  1. H2 Database Engine
  2. Wildfly Model Reference


@ResourceアノテーションでDataSourceを注入してみる


Posted on Sunday Feb 02, 2014 at 05:23PM in Technology


直接JDBCを使う必要がでてきた時用。

環境

  • WildFly8.0.0.CR1
  • Oracle JDK7u51
  • PostgreSQL 9.2.4

準備

データソースを確認

こんなデータソースを使ってみることにする

[standalone@localhost:9990 /] /subsystem=datasources/data-source=TestDS:read-attribute(name=jndi-name)
{
    "outcome" => "success",
    "result" => "java:jboss/jdbc/TestDS"
}
[standalone@localhost:9990 /] 

HogeServlet.java

このサーブレットにDataSourceを注入します。try-with-resourcesが便利ですね。@Resourceアノテーションのlookup属性には、“java:comp/env/“の後にXMLファイルで定義した論理名を書く

package com.example;

import java.io.IOException;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import javax.annotation.Resource;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.sql.DataSource;

@WebServlet("/HogeServlet")
public class HogeServlet extends HttpServlet {

    @Resource(lookup = "java:comp/env/jdbc/hoge")
    DataSource dataSource;

    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        try(PrintWriter pw = response.getWriter();
            Connection cn = dataSource.getConnection();
            Statement st = cn.createStatement();
            ResultSet rs = st.executeQuery("select now()")){
            while(rs.next()){
                pw.write(String.valueOf(rs.getTimestamp(1)));
            }
        }catch(SQLException e){
            throw new ServletException(e);
        }
    }
}

web.xml

この環境の場合はなくても動くようだが一応。

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
         version="3.1">
    <resource-ref>
        <res-ref-name>jdbc/hoge</res-ref-name>
        <res-type>javax.sql.DataSource</res-type>
    </resource-ref>
</web-app>

jboss-web.xml

WEB-INFの下に置く。ここでデータソースのJNDI名とResourceアノテーションのlookup属性で指定した名前のひも付けを行う。

<?xml version="1.0" encoding="UTF-8"?>
<jboss-web>
    <resource-ref>
        <res-ref-name>jdbc/hoge</res-ref-name>
        <res-type>javax.sql.DataSource</res-type>
        <jndi-name>java:jboss/jdbc/TestDS</jndi-name>
    </resource-ref>
</jboss-web>

動かしてみる

テストの際はどうするか

jboss-web.xmlを別のものに差し替えればOK。HogeServletには手を入れずに参照するDBを切り替えることができる。

備考

JPAのpersistence.xmlからはweb.xmlとjboss-web.xmlで定義した論理名を参照できない

persistence.xmlからもweb.xmlとjboss-web.xmlで定義した論理名 “jdbc/hoge” を使う事が出来れば、jboss-web.xmlを差し替えるだけでJPAから参照するデータソースも切り替えられるので楽だなと思ったのだが、どうやらそれは出来ないらしい[2]。“java:comp/env/jdbc/hoge” とかも試してみたが駄目だった。persistence.xmlにはAPサーバに定義してあるJNDI名を直接指定してやる必要があるみたい。残念。WebSphereだとできたりするみたいだけど一般的には駄目っぽい[5]。

今回の例だとこう書いてやる必要がある。

<?xml version="1.0" encoding="UTF-8"?>
<persistence version="2.1" xmlns="http://xmlns.jcp.org/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd">
    <persistence-unit name="datasource">
        <provider>org.hibernate.ejb.HibernatePersistence</provider>
        <jta-data-source>java:jboss/jdbc/TestDS</jta-data-source>
    </persistence-unit>
</persistence>

ただアプリケーションローカルでweb.xmlかアノテーションで定義したデータソースを使う場合は共用が可能らしい[3]。アプリの資源に接続情報べた書きになるので避けたい気もするが用途によっては良いのかもしれない。

@Resourceで直接JNDI名を参照する

特に論理名を通じて利用する必要がないなら、HogeServlet.javaで以下のようにすれば直接JNDI名からデータソースを拾って来れる。jboss-web.xmlもweb.xmlも不要になる。

ただし複数箇所でJNDI名の直書きが発生するのはあまりよろしくないので、CDIのProducerを使うか、前述の手段で論理名を通して参照した方が良いような気はする。

package com.example;

import java.io.IOException;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import javax.annotation.Resource;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.sql.DataSource;

@WebServlet("/HogeServlet")
public class HogeServlet extends HttpServlet {

    @Resource(lookup="java:jboss/jdbc/TestDS")
    DataSource dataSource;

    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        try(PrintWriter pw = response.getWriter();
            Connection cn = dataSource.getConnection();
            Statement st = cn.createStatement();
            ResultSet rs = st.executeQuery("select now()")){
            while(rs.next()){
                pw.write(String.valueOf(rs.getTimestamp(1)));
            }
        }catch(SQLException e){
            throw new ServletException(e);
        }
    }
}

参考文献

  1. JBoss でのデータソースの定義のしかた (DB2/400)
  2. Beware of soapy frogs: Java EE 6 - Traps, pitfalls and warts list
  3. DataSource Resource Definition in Java EE 6 (Enterprise Tech Tips)
  4. Java EE 7 Deployment Descriptors | Antonio's Blog
  5. WebSphere Application Server Version 8 Information Center