Kohei Nozaki's blog 

Chunk-oriented Processing with Hibernate's StatelessSession


Posted on Tuesday Mar 11, 2014 at 08:24AM in Technology


  • Bulk fetching is difficult with standard JPA API, but provider-specific functions make it easy.
  • This is a example of such provider-specific bulk fetching function of Hibernate with JSR352.

According to JSR 352 Implementation - Connection close probl… | Community, this example seems to be wrong. every cursors need to be opened and closed in a transaction. so another approach is required, e.g. fetch primary keys in open(), then fetch each rows in readItem().

Environment

  • WildFly8.0.0.Final
  • Hibernate 4.3.1.Final
  • Oracle JDK7u51
  • PostgreSQL 9.2.4

Example

ItemReader

package org.nailedtothex.jbatch.example.chunk;

@Named
public class ChunkItemReader extends AbstractItemReader {
    @PersistenceContext
    EntityManager em;
    ScrollableResults scroll;
    StatelessSession ss;
    Session session;

    @Override
    public void open(Serializable checkpoint) throws Exception {
        session = em.unwrap(Session.class);
        ss = session.getSessionFactory().openStatelessSession();
        scroll = ss.createQuery("SELECT c FROM ChunkInputItem c ORDER BY c.id").scroll(ScrollMode.FORWARD_ONLY);
    }

    @Override
    public Object readItem() throws Exception {
        if (!scroll.next()) {
            return null;
        }

        return scroll.get(0);
    }

    @Override
    public void close() throws Exception {
        try {
            scroll.close();
        } catch (Exception e) {
        }
        try {
            ss.close();
        } catch (Exception e) {
        }
        try {
            session.close();
        } catch (Exception e) {
        }
    }
}
  • I'm not sure that close() procedures are mandatory or not.

ItemWriter

package org.nailedtothex.jbatch.example.chunk;

@Named
public class ChunkItemWriter extends AbstractItemWriter {
    private static final Logger log = Logger.getLogger(ChunkItemWriter.class.getName());

    @Override
    public void writeItems(List<Object> items) throws Exception {
        log.log(Level.FINE, "chunkItemWriter#writeItems(): {0}", new Object[] { items });
    }
}

persistence.xml

<?xml version="1.0" encoding="UTF-8"?>
<persistence version="2.1"
    xmlns="http://xmlns.jcp.org/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd">
    <persistence-unit name="jbatchsls">
        <provider>org.hibernate.ejb.HibernatePersistence</provider>
        <jta-data-source>java:jboss/jdbc/JBatchTestDS</jta-data-source>
        <properties>
            <property name="javax.persistence.schema-generation.database.action" value="none" />
            <property name="hibernate.connection.release_mode" value="on_close"/>  
        </properties>
    </persistence-unit>
</persistence>
  • “hibernate.connection.release_mode” property is necessary.
    • NOTE: use of this declaration is highly discouraged. see the additional note in the bottom of this article which explains the another way to realize it.
  • If this is not set, then you will get:
15:24:59,047 INFO  [org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl] (batch-batch - 19) HHH000106: Forcing container resource cleanup on transaction completion
15:24:59,048 WARN  [org.hibernate.engine.jdbc.spi.SqlExceptionHelper] (batch-batch - 19) SQL Error: 0, SQLState: null
15:24:59,048 ERROR [org.hibernate.engine.jdbc.spi.SqlExceptionHelper] (batch-batch - 19) The result set is closed.
15:24:59,048 ERROR [org.jberet] (batch-batch - 19) JBERET000007: Failed to run job chunk, doChunk, org.jberet.job.model.Chunk@16d1364: org.hibernate.exception.GenericJDBCException: could not advance using next()
    at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:54) [hibernate-core-4.3.1.Final.jar:4.3.1.Final]
    at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:126) [hibernate-core-4.3.1.Final.jar:4.3.1.Final]
    at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:112) [hibernate-core-4.3.1.Final.jar:4.3.1.Final]
    at org.hibernate.internal.ScrollableResultsImpl.next(ScrollableResultsImpl.java:125) [hibernate-core-4.3.1.Final.jar:4.3.1.Final]
    at org.nailedtothex.jbatch.example.chunk.ChunkItemReader.readItem(ChunkItemReader.java:31) [classes:]
    at org.jberet.runtime.runner.ChunkRunner.readItem(ChunkRunner.java:343) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT]
    at org.jberet.runtime.runner.ChunkRunner.readProcessWriteItems(ChunkRunner.java:288) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT]
    at org.jberet.runtime.runner.ChunkRunner.run(ChunkRunner.java:190) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT]
    at org.jberet.runtime.runner.StepExecutionRunner.runBatchletOrChunk(StepExecutionRunner.java:204) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT]
    at org.jberet.runtime.runner.StepExecutionRunner.run(StepExecutionRunner.java:131) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT]
    at org.jberet.runtime.runner.CompositeExecutionRunner.runStep(CompositeExecutionRunner.java:162) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT]
    at org.jberet.runtime.runner.CompositeExecutionRunner.runFromHeadOrRestartPoint(CompositeExecutionRunner.java:88) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT]
    at org.jberet.runtime.runner.JobExecutionRunner.run(JobExecutionRunner.java:58) [jberet-core-1.0.1.Beta-SNAPSHOT.jar:1.0.1.Beta-SNAPSHOT]
    at org.wildfly.jberet.services.BatchEnvironmentService$WildFlyBatchEnvironment$1.run(BatchEnvironmentService.java:149) [wildfly-jberet-8.0.0.Final.jar:8.0.0.Final]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [rt.jar:1.7.0_51]
    at java.util.concurrent.FutureTask.run(FutureTask.java:262) [rt.jar:1.7.0_51]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [rt.jar:1.7.0_51]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [rt.jar:1.7.0_51]
    at java.lang.Thread.run(Thread.java:744) [rt.jar:1.7.0_51]
    at org.jboss.threads.JBossThread.run(JBossThread.java:122)
Caused by: java.sql.SQLException: The result set is closed.
    at org.jboss.jca.adapters.jdbc.WrappedResultSet.checkState(WrappedResultSet.java:4081)
    at org.jboss.jca.adapters.jdbc.WrappedResultSet.next(WrappedResultSet.java:1855)
    at org.hibernate.internal.ScrollableResultsImpl.next(ScrollableResultsImpl.java:120) [hibernate-core-4.3.1.Final.jar:4.3.1.Final]
    ... 16 more

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>jbatchsls</groupId>
    <artifactId>jbatchsls</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>war</packaging>
    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <failOnMissingWebXml>false</failOnMissingWebXml>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.nailedtothex</groupId>
            <artifactId>jbatchif</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-api</artifactId>
            <version>7.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-core</artifactId>
            <version>4.3.1.Final</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-core</artifactId>
            <version>1.3</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.jboss</groupId>
            <artifactId>jboss-remote-naming</artifactId>
            <version>2.0.0.Final</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.jboss.xnio</groupId>
            <artifactId>xnio-nio</artifactId>
            <version>3.2.0.Final</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

ChunkInputItem

package org.nailedtothex.jbatch.example.chunk;

import java.io.Serializable;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

@Entity
public class ChunkInputItem implements Serializable {
    private static final long serialVersionUID = 1L;

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    @Column(nullable = false)
    private Long id;

    @Column
    private Integer input;

    @Column
    private Boolean processed;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public Integer getInput() {
        return input;
    }

    public void setInput(Integer input) {
        this.input = input;
    }

    public Boolean getProcessed() {
        return processed;
    }

    public void setProcessed(Boolean processed) {
        this.processed = processed;
    }

    @Override
    public String toString() {
        return "ChunkInputItem [id=" + id + ", input=" + input + ", processed=" + processed + "]";
    }
}

Test data

jbatcharts=# select * from chunkinputitem ;
 id | input | processed 
----+-------+-----------
  0 |     0 | f
  1 |    10 | f
  2 |    20 | f
  3 |    30 | f
  4 |    40 | f
  5 |    50 | f
  6 |    60 | f
  7 |    70 | f
  8 |    80 | f
  9 |    90 | f
(10 rows)

jbatcharts=# 

Log

15:29:03,916 DEBUG [org.hibernate.SQL] (batch-batch - 20) select chunkinput0_.id as id1_0_, chunkinput0_.input as input2_0_, chunkinput0_.processed as processe3_0_ from ChunkInputItem chunkinput0_ order by chunkinput0_.id
15:29:03,918 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 20) chunkItemWriter#writeItems(): [ChunkInputItem [id=0, input=0, processed=false], ChunkInputItem [id=1, input=10, processed=false], ChunkInputItem [id=2, input=20, processed=false]]
15:29:03,920 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 20) chunkItemWriter#writeItems(): [ChunkInputItem [id=3, input=30, processed=false], ChunkInputItem [id=4, input=40, processed=false], ChunkInputItem [id=5, input=50, processed=false]]
15:29:03,922 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 20) chunkItemWriter#writeItems(): [ChunkInputItem [id=6, input=60, processed=false], ChunkInputItem [id=7, input=70, processed=false], ChunkInputItem [id=8, input=80, processed=false]]
15:29:03,925 FINE  [org.nailedtothex.jbatch.example.chunk.ChunkItemWriter] (batch-batch - 20) chunkItemWriter#writeItems(): [ChunkInputItem [id=9, input=90, processed=false]]

A way to avoid “hibernate.connection.release_mode=on_close”

  • ItemReader like this works with no declaration of “hibernate.connection.release_mode=on_close” which highly discouraged[2].
public abstract class AbstractHibernateItemReader extends AbstractItemReader {
    @PersistenceContext
    EntityManager em;
    @Resource
    DataSource ds;
    Connection cn;
    ScrollableResults scroll;
    StatelessSession ss;
    Session session;

    @Override
    public void open(Serializable checkpoint) throws Exception {
        cn = ds.getConnection();
        cn.setHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT);
        session = em.unwrap(Session.class);
        ss = session.getSessionFactory().openStatelessSession(cn);
        scroll = ss.createQuery("SELECT c FROM ChunkInputItem c ORDER BY c.id").scroll(ScrollMode.FORWARD_ONLY);
    }
...
    @Override
    public void close() throws Exception {
        try {
            scroll.close();
        } catch (Exception e) {
        }
        try {
            ss.close();
        } catch (Exception e) {
        }
        try {
            session.close();
        } catch (Exception e) {
        }
        try {
            cn.close();
        } catch (Exception e) {
        }
    }
...
  • Ugly but I guess that it would be better for some occasions.
  • There's no proper way to set holdability of resultset with StatelessSession[3].
  • Configure through Session#doWork() and SessionImpl#connection() didn't worked as expectedly.
    • Both of them brings “The result set is closed”.

References

  1. Bulk fetching with Hibernate | Java Code Geeks
  2. 11.5. Connection Release Modes
  3. [HHH-2820] Can not set HOLD_CURSORS_OVER_COMMIT when creating ScrollableResults - Hibernate JIRA



No one has commented yet.

Leave a Comment

HTML Syntax: NOT allowed