読者です 読者をやめる 読者になる 読者になる

jBatch RI を Java SE でまともに(JPA,CDI,Transactionalつきで)動かす #javaee

この記事は、Java EE Advent Calendar 4日目の記事です。

昨日は @lbtc_xxx さんの Java EE ブログエンジン Apache Roller のご紹介でした。

明日は、making@github さんです。

概要

背景を説明すると、既存の Java 製のオレオレバッチフレームワークがあるんだけど、 設計が古いので、改善したいと。

標準の jBatch(JSR-352)ってのがあるけど、これアプリケーションサーバーいるんでしょ? ちょっと面倒だなーとかと思ってたら、 Java SE 環境でも動くらしい。

yoshio3.com

が、色々と足りないというか、Java EE 向けに書いたソースが、以下のようにそのままでは使えないこともわかりました。

  • CDI が使えない
    • ジョブXML の ref 属性に、 @Named の名称で指定できない
  • Batchlet で @Transactional が使えない
    • JTA 入ってないしね
  • チャンク方式における分割コミットが動作しない
  • JPA も使いたいよね

というわけで、この辺を改善して、SE環境で使う際の制限を取っ払うことができないかというお話です。

内容的に、Java EE でいんだろうかと思わないわけでもないですが、タイミングがよかったのと、色々と知見を得たのでまとめてみたいと思いました。

最終的なソース一式は Github にあります。

前提として、CDI, JPA, jBatch を一通り触ったことがあるくらい人向けの内容です。 知らない人でもそれぞれの入門記事を見てもらえればわかるんじゃないかと思います。

環境準備

上で引用した記事ですと、jBatch の参照実装と derby なんかのjarファイルを取得してクラスパスに設定するようになっていましたが、 調べると Maven リポジトリがあったので、ちゃんと依存性を管理するようします。

やりことを全部含めた 依存性定義はこんな感じになりました。

      <!-- Java EE API もろもろ -->
      <dependency>
         <groupId>javax</groupId>
         <artifactId>javaee-api</artifactId>
         <version>7.0</version>
      </dependency>
      <!-- glassfish に載ってる jBatch の参照実装 -->
      <dependency>
         <groupId>com.ibm.jbatch</groupId>
         <artifactId>com.ibm.jbatch-runtime</artifactId>
         <version>1.0</version>
      </dependency>
      <!-- jBatch RI がジョブ実行状況の記録に使うDB -->
      <dependency>
         <groupId>org.apache.derby</groupId>
         <artifactId>derby</artifactId>
         <version>10.12.1.1</version>
      </dependency>
      <!-- CDI 実装 -->
      <dependency>
         <groupId>org.jboss.weld.se</groupId>
         <artifactId>weld-se</artifactId>
         <version>2.3.1.Final</version>
      </dependency>
      <!-- JPA 実装 -->
      <dependency>
         <groupId>org.eclipse.persistence</groupId>
         <artifactId>org.eclipse.persistence.jpa</artifactId>
         <version>2.5.2</version>
      </dependency>
      <!-- お好きなJDBCドライバ -->
      <dependency>
         <groupId>org.postgresql</groupId>
         <artifactId>postgresql</artifactId>
         <version>9.3-1102-jdbc41</version>
      </dependency>

jBatch RI の概要

  • ジョブのステータス管理として Derby を使っている
  • ジョブの実行はConcurrency Utility のスレッドプールで動いている
    • このスレッドプールは、Java SE 環境でも有効なため、ジョブの再投入や並列実行もOK
    • 既存バッチは、1ジョブごとにJVMを起動するので起動時間がネックとなっていたため、これは大きな利点
  • 設定やカスタマイズは、 META-INF/servicesの下に batch-configやbatch-services というプロパティファイルで行う。
    • 前者はジョブ管理用の DerbyのURLとかを設定
    • 後者は、カスタマイズ用の定義ファイルで、jBatch RI はサービスという形式で拡張ポイントを用意している
      • 今回は、CDIを使ってジョブインスタンスを取ってきたり、チャンクトランザクション制御のカスタマイズとかに使っている
      • プロパティのキーにサービス名、値に所定のクラスを継承して作ったクラスの FQDNを書いておく

Batchlet でJPAと @Transactional を使う

さて、では実際にどのような拡張を行ったかを紹介します。

まずは今回動かす Batchlet と job xml です。

Batchlet は JPA でテーブルのデータを消すものです。

package batchlet;

import javax.batch.api.AbstractBatchlet;
import javax.batch.api.BatchProperty;
import javax.batch.runtime.context.JobContext;\
import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import javax.inject.Named;
import javax.persistence.EntityManager;
import javax.transaction.Transactional;

@Dependent
@Named // これで、"dataDeleteProcess" だけで、JOB XML に記述できる。 
@Transactional
public class DataDeleteProcess extends AbstractBatchlet{

    @Inject
    private JobContext ctx;

    @Inject // プロデューサーメソッド経由で取得
    private EntityManager em;
    
    @Override
    public String process() throws Exception {
        
        int deleted = em.createQuery("delete from Employee e").executeUpdate();
        
        System.out.println(deleted + " rows deleted.");
       
        
        // 終了ステータス
        return "COMPLETE";
    }
}

次は job xml。 Batchlet に @Named アノテーションが付いているので、ref 属性は CDI の bean 名称で指定可能です。

<job id="cleanup-job"
     xmlns="http://xmlns.jcp.org/xml/ns/javaee" 
     version="1.0">
      
  <step id="clean">
    <batchlet ref="dataDeleteProcess">
        
      <properties>
        <property name="text" value="TEXT"/>
      </properties>
    </batchlet>
  </step>
</job>

ついでに、 EntityManger を @Inject できるようにするプロデューサーフィールド

@Dependent
public class EMProducer {
    @PersistenceContext(unitName = "default")
    @Produces
    private EntityManager em;
    
}

これは、 Java EE 環境では普通に動きますが、SE環境では、以下の理由で動きません。

  • @PersitenceContext は Java EE環境のみ(だと思う) なので、対応できない
  • JTA が無いので、@Transactional が動かない
  • CDI も有効でないので、ref属性にはクラスの FQDN を書かなくてはいけない

というわけで1つ1つ解消していきます。

JPAJava SE環境で使う

JPAJava SE 環境で動かすには、JTA ではなくローカルトランザクションで動かします。 (JTA 詳しくないです。 SE で動かすことできるの?)

というわけで、 persistence.xml を書き直し、データソースではなく JDBC経由とする。

<?xml version="1.0" encoding="UTF-8"?>
<persistence version="2.1" xmlns="http://xmlns.jcp.org/xml/ns/persistence"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd">
  <persistence-unit name="default" transaction-type="RESOURCE_LOCAL">
    <provider>org.eclipse.persistence.jpa.PersistenceProvider</provider>
    <exclude-unlisted-classes>false</exclude-unlisted-classes>
    <properties>
        <property name="javax.persistence.jdbc.driver" value="org.postgresql.Driver"/>
        <property name="javax.persistence.jdbc.url"
             value="jdbc:postgresql://localhost:5432/postgres"/> <!--好きなDBを設定 -->
        <property name="javax.persistence.jdbc.user" value="postgres"/>
        <property name="javax.persistence.jdbc.password" value="postgres"/>
      <property name="eclipselink.logging.level" value="FINE"/>
      <property name="javax.persistence.schema-generation.database.action" value="drop-and-create"/>
    </properties>
  </persistence-unit>
</persistence>

で、 EntityManger をインジェクションするためのプロデューサーメソッドも書き直す。

package db;

import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

@Dependent
public class EMProducer {
    
    EntityManagerFactory emf = Persistence.createEntityManagerFactory("default");
    
    @Inject
    private EMHolder holder;
    
    @Produces
    public EntityManager getEm() {
        // ホルダーにEMが無い場合やあるけどトランザクションが終了している場合(※1)は新規取得
        // ※1 ジョブの複数起動でスレッドが再利用された場合
        if(holder.getEM()== null || !holder.getEM().isOpen()) {
            EntityManager em = emf.createEntityManager();
            holder.setEM(em);
        }
        
        EntityManager em = holder.getEM();
        
        return em;
    }
}

ここに出てくる EMHolder とはこんなの

/**
 * スレッドローカルでEntityMangerを保持するBean
 */
@ApplicationScoped
public class EMHolder {

    private ThreadLocal<EntityManager> holder = new ThreadLocal<>();
    
    public void setEM(EntityManager em) {
        holder.set(em);
    }
    
    public EntityManager getEM() {
        return holder.get();
    }
    
    public void removeEM() {
        holder.remove();
    }
}

これは何をやっているかというと、スレッド単位で EntityManger(=コネクション) を1つだけ生成したいために、 スレッドローカルをアプリケーションスコープで作っている。

jBatch で CDI を使うと、 Bean は全て @Dependent なのだが、Java SE環境では、@Dependent は都度インスタンスを生成してしまう。 こうすると、EntityManger のインジェクションごとに、EntityManger を生成していまうので、 複数 bean 間で EntityManger の共有ができないため、スレッドローカルを用いている。

@Transactional に反応するインターセプタを作る

JTA を使わないので、@Transactional によるトランザクション制御は自前のインターセプタを作って行う。

package se;

import db.EMHolder;
import javax.inject.Inject;
import javax.interceptor.AroundInvoke;
import javax.interceptor.Interceptor;
import javax.interceptor.InvocationContext;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import javax.transaction.Transactional;

@Transactional
@Interceptor
public class SeTransactionInterceptor {

    @Inject
    private EntityManager em;
    
    @Inject
    private EMHolder holder;

    @AroundInvoke
    public Object doTransaction(InvocationContext ic) throws Exception {
        EntityTransaction tx = em.getTransaction();
        Object result = null;
        try {
            if (!tx.isActive()) {
                tx.begin();
            }
            
            System.out.println("tx interceptor start");
            result = ic.proceed();
            if (tx.isActive()) {
                tx.commit();
            } 
        } catch (Exception e) {
            try {
                if (tx.isActive()) {
                    tx.rollback();
                }
            } catch (Exception e2) {
            }
            throw e;
        } finally{
            holder.removeEM();
            if(em.isOpen()) {
                em.close();
            }
        }
        return result;
    }
}

やっていることは、メソッド実行前にトランザクションを開始して、成功すればコミット、例外が起きればロールバックというもの。 本当だと、例外の種類によってはコミットしたり、トランザクション属性なんかもあるけど、とりあえず割愛。 ここで、インターセプタとBatchlet で EntityManger を共有するためにも、前述の EMHolder が生きてくる。

DB周りはこれでOKなので、次は CDI

CDI を有効にする

上で述べたとおり、サービスによってjBatch RI は拡張可能になっている。 JOB XML から、 ジョブインスタンスを取得する箇所は、 batch-services.properties に、 CONTAINER_ARTIFACT_FACTORY_SERVICE を設定すると拡張できる。

また、jBatch RI は、 Weld SE(Weld は glassFishでも使っている CDI 実装。SEはそのJava SE版) を使った 上記のサービス実装が存在しているため、

CONTAINER_ARTIFACT_FACTORY_SERVICE=com.ibm.jbatch.container.services.impl.WeldSEBatchArtifactFactoryImpl

のように書けば、CDI が使用できるし、ref属性に Bean の名前が書けるようになる。

よかったこれで解決ですね。と思ったのだけど、ジョブを2回投入すると2回目のジョブが起動できない。。。

WeldSEBatchArtifactFactoryImpl をみていると、ジョブインスタンスを取得するたびに、 Weldコンテナ(DIコンテナ)を生成しているため、 この辺に原因が有りそうだと踏んだ。

実際、このソースを参考に、Weldコンテナを1回だけ生成するようなサービスを作ったら、この問題は解消した。

package se;

import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
import javax.inject.Named;

import org.jboss.weld.environment.se.WeldContainer;

import com.ibm.jbatch.container.exception.BatchContainerServiceException;
import com.ibm.jbatch.spi.services.IBatchArtifactFactory;
import com.ibm.jbatch.spi.services.IBatchConfig;
import org.jboss.weld.environment.se.Weld;

@Named("MyWeldBean")
public class WeldContainerFactory implements IBatchArtifactFactory {

    // 1回だけ生成
    WeldContainer weld = new Weld().initialize();
    @Override
    public Object load(String batchId) {
        Object loadedArtifact = getArtifactById(batchId);

        if (loadedArtifact == null) {
            return loadedArtifact;
        }

        return loadedArtifact;
    }

    private Object getArtifactById(String id) {
        Object artifactInstance = null;

        try {
            // 元の実装はここで毎回 Weld().initialize();
            BeanManager bm = weld.getBeanManager();

            Bean bean = bm.getBeans(id).iterator().next();

            Class clazz = bean.getBeanClass();

            artifactInstance = bm.getReference(bean, clazz, bm.createCreationalContext(bean));
        } catch (Exception e) {
        }
        return artifactInstance;
        
    }
//(割愛)
}

このサービスを有効にするには、batch-sevices.properties にこう書く。

CONTAINER_ARTIFACT_FACTORY_SERVICE=se.WeldContainerFactory

これで、Java SE 環境でも Java EE の要領で記述した batchlet が使えるようになった。

チャンク形式のトランザクション制御を実現する

jBatch のチャンク形式は、大量データの処理に向いていて、job xml のパラメータ設定だけで、 データのコミットを行う間隔を変更できたりします。

例えば、以下は読込んだデータについて2件ごとにコミットする設定になります。

<?xml version="1.0" encoding="UTF-8"?>
<job id="chunk-sample" xmlns="http://xmlns.jcp.org/xml/ns/javaee"  version="1.0">
    <step id="insert">
        <chunk item-count="2"><!-- item-count がコミットする件数 -->
            <reader    ref="employeeFileReader">
                <properties>
                  <property name="input" value="c:/temp/input.txt"/>
                </properties>
            </reader>
            <processor ref="employeeProcessor"/>
            <writer    ref="employeeWriter"/>
        </chunk>
    </step>
</job>

ですが、これも Java EE の場合のみです。 jBatch RI は Java SE環境だと、トランザクション制御を行ってくれません。 といわけでこれも対応させてみます。

トランザクションサービスの作成

トランザクション制御に関する処理も、TRANSACTION_SERVICE というサービスで拡張可能になっています。

ここでは、既存のトランザクション制御用のサービスを上書きし、EntityManger のトランザクションを利用してトランザクション制御を行うように自作サービスを作成します。

package se;

import db.EMProducer;
import com.ibm.jbatch.container.exception.TransactionManagementException;
import com.ibm.jbatch.container.services.impl.BatchTransactionServiceImpl;
import com.ibm.jbatch.spi.services.TransactionManagerAdapter;
import javax.batch.runtime.context.StepContext;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
import javax.enterprise.inject.spi.CDI;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;

public class SeTransactionManagerImpl extends BatchTransactionServiceImpl {

    @Override
    public TransactionManagerAdapter getTransactionManager(StepContext stepContext) throws TransactionManagementException {

        return new TransactionManagerAdapter() {
            EntityTransaction tx;

            @Override
            public void begin() {

                BeanManager beanManager = CDI.current().getBeanManager();
                Bean<?> empBean = beanManager.getBeans(EMProducer.class).iterator().next();

                EMProducer emp = (EMProducer) beanManager.getReference(empBean, 
                        EMProducer.class, beanManager.createCreationalContext(empBean));
                EntityManager em = emp.getEm();
                tx = em.getTransaction();
                tx.begin();
            }
            @Override
            public void rollback() {
                tx.rollback();
            }
            @Override
            public void setRollbackOnly() {
                tx.setRollbackOnly();
            }
            @Override
            public void commit() {
                System.out.println("tx commit");
                tx.commit();
            }
            @Override
            public int getStatus() {
                return 0;
            }
            @Override
            public void setTransactionTimeout(int arg0) {
            }
        };
    }
}

TransactionManagerAdapter というのが、トランザクション制御用のインターフェースです。 ここではかなり無理やりですが、 CDI の BeanManger から Batchlet の項で作成した EMProducer を取得して、EntityManger を取得し、 トランザクションを取得しています。

今のところ、JPA に限定した実装ですが、JDBCコネクションにすることもできるでしょう。 また、begin,commit,rollback以外の実装は適当です。

これで、チャンク形式の機能も Java EE 同様に近づいたと思います。

並列処理などの検証はしていないですけど、多分動きそう。

(おまけ)ジョブ投入方法

mainメソッドなどから、ジョブを投入するには、以下のようなコードを書けばOKです。

    private static void executeJob(String jobId) {
        long execID=0;
        // ジョブの開始時はXML読み込みなど同時実行されるとおかしくなる処理があるので、ジョブID単位で同期化しておく。
        JobOperator jobOperator = BatchRuntime.getJobOperator();
        synchronized(jobId){
            Properties props = new Properties();
            execID = jobOperator.start(jobId, props);
            System.out.println("job stated id:" + execID);
        }
        JobExecution jobExec = null;
        // ジョブが終了するまでポーリング
        while (true) {
            jobExec = jobOperator.getJobExecution(execID);
           
            if (jobExec.getEndTime() != null) {
                break;
            }

            try {
                Thread.sleep(1000L);
            } catch (InterruptedException ex) {
            }
        }
        System.out.println("JOB END:Status is " + jobExec.getExitStatus() );
        
    }

jobOperator.start はジョブを開始すると、ジョブの実行終了を待たずにジョブIDを返して終了します。 そのためジョブの終了を待つためには、ポーリングするなどの対処が必要になります。 ここでは、ジョブ投入ごとにポーリングしていますが、ジョブ終了を検知するだけのタスクを別に作るという方向性でもいいと思います。 正直この辺は面倒ですね。標準でジョブ終了まで待つような仕組みが欲しいです。

まとめ

というわけで、 jBatch を拡張して Java SE でもそれなりに使うという話でした。

Java EE は色々と最初から用意されているので、楽だとあらためて思いました。

また、Java SE で使わなくても、 ジョブ単位でユニットテストをしたいという場合には、使える内容なんじゃないかと思います。 あと、CDI の裏側の仕組みも色々知れたので個人的には楽しかったです。

今回色々やってみて、スレッドプールでジョブが実行されるのがわかったのは、JVMの起動回数を抑えることができそうなので、結構有用でした。 とはいっても、 このプログラムを常駐化させてジョブ投入のリクエストを受け付けるようにするのは、 ソケット使ったりする必要があるのでわりと面倒ですけどね。

本格的にやるなら、 paraya micro で埋め込みサーバ使ったり、 あるいは Spring boot + Spring batch(+ JSR-352 Support) で組んでしまった方が楽かもしれません。