JJUG CCC 2019 Spring で登壇してきました

5/18 に行われた JJUG CCC 2019 Spring に「初めてのgRPC」 という内容で発表してきました。

www.java-users.jp

発表資料はこちらです。

speakerdeck.com

全体の資料もこちらでまとまっています。

github.com

今回は、登壇者兼ボランティアスタッフとして申し込みしました。 ボランティアスタッフは2回目だし慣れてるだろうから、部屋の進行と発表同時にやっちゃえということで、 スタッフからの注意事項の説明の後、そのまま発表するというちょっと面白い流れになりました。

会社の人からもいい感じのツッコミをいただきました。

技術イベントの登壇はいつかやってみたいことの一つでした、なので夢が一つ叶いました。

なかなか大変でしたけど、発表したテーマに興味を持って頂けた方が何名かいらっしゃったので、発表してみて良かったです。

発表テーマ gRPC について

gRPC については2,3 年ほど前から採用事例を聞くようになってきました。 また最近だと、国内・海外のマイクロサービスを主導している会社で gRPC + サービスメッシュの採用事例を聞くようになってきていたので gRPC について調べ出しました。

その過程で、4種類ある RPC のプログラムの書き方が結構違ったり、そもそも gRPC Java に関する情報があまりない( Go が多い) ということに気づきました。 ちょうどいいタイミングで CCC の CFP 応募が始まったので、良い機会だということで申し込んでみました。

幸いにも、同じタイミングで 社内でも gRPC を検討するタイミングがあったので、 チーム内勉強会を開くノリで発表資料をまとめられたのは良かったかなと思います。

スライドが70枚と多く、後半は飛ばし気味になりましたが時間内で言いたいことは言えたので良かったです。 わかりづらい点などありましたら、コメントや Twitter で質問いただけると幸いです。

gRPC は個人的に筋の良い技術だと思っていますが、ライブラリやミドルウェアを含めた運用の知見がまだ足りてないです。 さらに今までの Web アプリケーションフレームワークの手法が使えないです。 ( とは言え、Webアプリケーションフレームワークの仕事の大半は、不確実なHTTP ボディの解析とURLのルーティングで、gRPC はそのどちらも割と高いレベルで解消しているのだけど) なので、まだまだ発展の余地はあるし、色々貢献できそうだと思いました。 社内でも草の根的に gRPC やってるチームがあるということもあとで聞きましたので、やっておいて損はないと思います。

またパgRPC についてフォーマンスよりも気に入っているのは、IDL としての Protocol Bufferes の良さです。 DDD の文脈では公開されたインターフェースという、そのサービスが提供する API をサービス自らが提供するパターンがあります。 proto ファイルはまさに公開されたインターフェースのフォーマットとして最適だと思います。

今後チームで gRPC をやってみて、実際どうだったかをお知らせする機会があればやってみようと思います。

いただいた質問について

  • 分散トレースはできるの? Netty 以外も使えるの?

  • Protocol Bufferes の message を ビジネスロジックに持ち込まないなら、DTOを自分で作ってコピーしないといけないのでは?

  • Reactive-gRPC を使うと、RxJava や Spring WebFlux とかと接続できるの?

    • WebFlux との接続は確認しました。そのため gRPC-gateway を使わなくても、Spring WebFlux->Reactive gRPC で REST-gRPC変換はできなくもないです。
    • R2DBC が登場したら、 gRPC とDB をシームレスでリアクティブに扱えるので面白いなと思っています。
    • 色々な Reactive なもの(外部のWebApi とか、別の gRPC の呼び出しとか)を逐次・並列に接続したい場合に、Reactive-gRPC は良い選択肢だと思います。
  • 何か困ったことは?

    • 時刻型と十進数型は ProtocolBufferes には無いので、自作する必要があること。
    • 自作した場合、Javaの LocalDate や BigDecimal とのマッピングコードが必要なので、どうやって実現するかは検討の余地がある。
      • go だとこの場合 インターフェースで後からメソッドが追加できるから楽なんだろうなとか思った。
      • Java だとどうしようもないが、 Kotlinだと拡張メソッドで Protocol Buffersの型とJavaの型それぞれに相互変換のメソッドを追加できるので、Kotlin やるならありかなと思います。
  • Kotlin でできるの?

    • できます。gRPC-Kotlin プラグインもありますが、普通に生成した gRPC Service, Stub のJavaコードもKotlin から使えます。
    • gRPC-Kotlin のコードは コルーチンを使うためか、MDC,分散トーレシングのためにCoroutien Context の上書きが必須だったので、もう少し見守ろうと思いました。(サンプルコードを参照)

所属について

2019年のはじめに転職しました。 今回、試用期間が空けたこともあり、所属について隠すことはやめました。

懇親会 LT でもほぼ同時期に入試された方が、働いてみてどうだったかを語っていました。 私もチームは違いますが、大規模なサービスの開発に携われるというのはプレッシャーもありますが、とても面白いです。 エンジニアリングに集中できる環境があり、チームメンバーみんなが生き生きとしているので働いて楽しいなと感じています。

DevRel チームの方々はフットワークも軽く、知識もあり、今回の発表にあたり色々とアドバイスいただけました。 発表内容については、機密や誹謗中傷がないかをチェックされるだけで、 内容については発表者の意思を尊重いただけたので感謝しかありません。 内容についての拙い点はわかりづらい点は全て私のスキルによるものです。

JJUG CCC について

ボランティアスタッフとして

今回は人員が2倍くらいに増えました。 色々な人が参加していて楽しかったし、一人当たりの負荷も前回に比べて減りました。 また総会には参加できませんでしが、運営の方式も変わりますので CCC はより良いものになっていくと思います。 みんなもやりましょう。

参加者として

自分の発表以外は、割と自由にその辺をうろついていました。 その辺で久しぶりの方々と身の回りの話や、今後の技術動向などについて話をしていることが多かったかな。 セッション聞くのも面白いですけど、ここでしか会えない人たちと話すのも楽しいし、刺激になる。

特にアンカンファレンスは、今回は Java Champion や JDK ソムリエ、他に Java 界隈の強い人ばかりの豪華メンバーでした。

貴重な実体験や知識を聞くことができました。

まとめ

皆さまお疲れ様でした。

こんなに貴重な技術イベントは他にはないと思う。

次も頑張るぞ。

Microservices Patterns を読んで(2)

Chris Richardson 氏の Microservices Patterns を読んだ。前回からの続きです。

Microservices Patterns: With examples in Java

Microservices Patterns: With examples in Java

前回はこちら。

kencharos.hatenablog.com

全13章の後半、8章からです。

8章 外部API

マイクロサービス群を外部公開する場合に、API Gateway を適用することが書かれている。 API Gateway に求めらる機能は、ルーティングのほか、認証、プロトコル変換(REST->gRCPとか)、前述の API Composition など。

認証の例として、外部からの Basic 認証に対して、API Gateway が内部の OAtuh サーバーと接続して認証を肩代わりしたり、 JWT トークンを Cookie に付け替えたり、リフレッシュトークンのリフレッシュなどを行い、外部クライアントの負荷を減らす案などが出ていた。

API Gateway を実現する手段として、 AWSAPI Gateway, Spring Cloud Gateway, Apollo GraphQL (node.js), NetFlix Falcor などがある。 API Gateway のカスタマイズ性を重視するかどうかでどれを選ぶかが変わってくる。 また、アクセスが集中する部品なので性能面での注意も必要となる。例えば Spring Cloud Gateway は、Reactor と WebFlux を使って大量アクセスが可能になっている。

API Gateway に各サービスを連携する際の主管をどうするかが興味深かった。 API Gateway を管理するチームと、各サービスを担当するチームが異なっていると調整コストが発生する。 Netflix では、API Gateway を階層化し、API Gateway チームが基本的な Gateway 機能を提供し、そこに各チームが作成した API Gateway をぶら下げる。 各チームが外部公開に特化した API を作るこのパターンは、 BFF (Backend For Frontend) と呼ばれる。

9章 マイクロサービスのテスト 1

テストに関する章その1 。

筆者の経験から、マイクロサービスの開発を滞りなく行うには 自動化されたテストとビルドパイプラインが必須。 一方で、今まで見てきたプロジェクトでは自動化テストをちゃんとやっている現場が割となかったため、 まずはテストの基本について書くことにしたとのこと。

以下に挙げるテストの4分類に関する記述が主な内容となる。

  • End-to-End テスト : システム全体の受入テスト。全ての要素を本番同様の構成にする。
  • Component テスト : サービス単体の受入テスト。テスト対象以外のサービスはスタブを用いる。
  • Integration テスト : API呼び出しやインフラに依存する部分のテスト。スタブ、CDC を用いる。
  • Unit テスト : クラス単体のテスト。依存性はモックを使う。

上記の4つはピラミッドの構成を取り、上に行くほどテストの構成が複雑で実行時間が長く、脆くなる。 各テストの性質に合わせて使用する技術やテストの量を調整して行く。

Unit テストでは、テストダブル (モックやフィクスチャなどで依存性を置き換えること)の手法が紹介されている。

Integration テストでは、コンシューマー駆動契約テスト, CDC (Consumer Driven Contract) が紹介されている。 CDCはサービスのAPIを呼ぶ側のコンシューマーと、APIを提供する側のプロデューサーとで、 両者のサービスを実際にデプロイしてテストを行う代わりに、 コンシューマーがAPIの入出力に対する期待値を定義し、その期待値を満たしているかをお互いにチェックする。

期待値を DSL として定義すると、コンシューマー側では DSL からモックサーバーを立てるのでモックサーバーに向けてテストを行う。 プロデューサー側では DSL からAPIにアクセスを行うテストを生成するので、プロデューサーが期待した通りの入出力を行うかテストする。 こうすると API 呼び出しのテストにおいて、他のサービスを実際にデプロイすることがないのでテストの効率化が期待できる。

また、CDC はビルドパイプラインに載せることが重要。他のサービスが CDC の DSL を更新したらそれを自動的にテストできるようにしておく。 そうするとことで、早期に API の整合性が合わなくなったことを検知できる。

CDC をサポートするライブラリは、Spring Cloud Contract や Pact がある。

10章 マイクロサービスのテスト 2

主に Integration テストの記述方法の紹介。

Spring Cloud Contract や、 Evantuate Tram のテストライブラリを使うと、 RESTやメッセージングなどの連携部分を CDC でテストできる。 DB の Integration テストについては、 JPA のプロパティの変更の検証で代替するか、 Docker を使うかといった感じ。

Component テストについては、サービスに依存する DB や kafka, 他のサービスについては、インメモリ型のライブラリや Docker, サービスに見立てた HTTPサーバを立てて、テストを行う。 Component テストの記述は受入テストであることを踏まえて、シナリオ形式で書くことが望ましく、 Cucumber や Gherkin といったライブラリが紹介されている。

End-to-End テストも同様に Cucumber を使う。

(ただし、個人的な意見でいえば、 BDD はどうなんだろうというスタンス)

11 章 本番稼働に向けた開発

マイクロサービスにおける非機能要件をまとめた章。

マイクロサービスを問題なく運用して行くためには、セキュリティ、設定、監視が重要なファクターと言える。

セキュリティについては、API Gateway で認証や認可を一元的を行う、監査ログの発行、TLSによる暗号化などが要素となる。

設定については設定ファイルなどをアプリケーションに埋め込むのではなく外部から設定可能なようにすることが重要となる。

環境変数などを使って外部から設定を上書きできるようにする push-based 型や、 コンフィグレーションサーバーを立てて各サービスの設定を一元管理し、各サービスが設定を起動時に取りに行く pull-base 型 などがある。 また、サービスを再起動せずとも設定のリロードが可能なような仕組みもあると望ましい。

監視については、ヘルスチェックAPIの提供、 kibanaなどを使ったログサーバーによるログの一元管理、分散トレーシング、 メトリクスの定期的な収集、監査ログなどがある。

メトリクスは CPU使用率などのマシン状態だけでなく、業務的なメトリクスの記録(一定以上の金額の取引があった回数とか)も行うと良い。 micromater はカスタムメトリクスを記録し、 Prometheus などのメトリクス収集ツールと連携する。

監査ログの収集は収集漏れを防ぐために、AOPドメインイベントと連携すると良い。

上記のように、マイクロサービスに求められる非機能要件は膨大であるため、こういった機能を基盤として提供できるようにしたライブラリを microservice chassis と呼ぶ。 Spring Boot, Spring Boot Actuator, Spring Cloud などが該当する。

ただし、 microservice chassis は言語やバージョンがある程度固定化されてしまう欠点がある。 そこで最近注目されているのが、マイクロサービスの実行基盤側で上記の非機能要件を提供する service mesh である。 service mesh は sidecar として自分たちの作ったサービスの横にプロキシとして存在し、サービスへの in/out 両方の通信を仲介することで、 上記の非機能要件を付与する。 また、 sidecar の一元管理を行うための control plane という機能も一緒に登場する。 ( sidecar は control plane の対比として、 data plane とも呼ばれる)

service mesh を提供するライブラリとして、 istio, envoy, linkerd, consul connect, NGINX Controller などがある。

12章 マイクロサービスのデプロイ

アプリケーションのデプロイに関する章。

アプリケーションのデプロイは、 TomcatWebLogic などアプリケーションサーバーに同居させる形式から、 クラウドの登場により マシンイメージの展開や SaaSなどが生まれ、 最近は Docker によるコンテナ仮想化、k8sによるコンテナクラスタAWS Lambda などのサーバーレスといった様々な変遷があった。

コンテナを使う方がより早く柔軟である。 Docker や k8s を使ったコンテナイメージの作成や、k8s での負荷分散、ローリングデプロイなどについて一通り触れられている。 ただし、これらの内容はそれだけで一冊の本になる内容なので、入門的な内容を軽く触れる感じとなる。

最後に少しだけ、サービスの一部を AWS Lambda に切り出してデプロイする例が載っている。

13章 マイクロサービスのリファクタリング

既存のモノリシックなシステムをどうやってマイクロサービスに変えて行くかという章。

Big Bang Rewrite (既存システムを無視して0から作り直すこと) を絶対に避ける。基本的にうまくいかないし、コストがかかりすぎる。 また、デプロイメントパイプラインも自動化テストも無いのに作り直すのは生存できないというような強い言葉も書かれている。

Strangler Application パターンという徐々にモノリスを分解してマイクロサービスに変えていき、最終的にモノリスを無くすような戦略の方がうまくいく。 初めは設定ファイルを外出しするような小さな修正だけでも十分。

クラウドベンダーや SaaS ベンダーがサービスをクラウドに載せ替えてコスト削減のようなことを言ってきても、検討しないで飛び乗るような真似をしてはいけない。徐々に移行しながら適切な移行先の基盤を検討する方が良い。

ではどうやってリファクタリングをしていくかだが、3つの方法がある。

  1. 新規機能をマイクロサービスにする
  2. フロントエンドとバックエンドの分離
  3. モノリスの一部をマイクロサービスとして切り出す

いずれの方法であってもまずは前段に API Gateway を導入し、モノリスと新規開発部分を振り分け可能にしておくことが必須となる。

場合によってはモノリスとマイクロサービス間でデータの取得や同期が必要となる。 その際にモノリス側の修正が最低限で済むような工夫が必要となる。

例えば、サービスを分割する際は分割後不要となる項目であっても読み取り専用で残し、 マイクロサービス側で反映を行なったりする。 その反映もイベント連携としたり、モノリスの修正が一切できないような状況なら、DBのトランザクションログからデータ反映するようなツールを作る。

認証についても、既存の認証機能を API Gateway で上手く吸収し、マイクロサービス側へ連携させるようにするなどと言った工夫がある。

また、マイクロサービス側がモノリス側のデータ構造に引っ張られすぎないように、 腐敗防止層(ACL: Anti corruption Layer) といった防御のためのサービスを作ることも検討する。

まとめ

9章のテストちゃんとやってる現場が少ないという発言は胸に来ますね。 13章にも名言がたくさんあり、著者の経験を物語っているなと感じました。

英語で400ページを超える分量の本ですが、 DDDやマイクロサービスについては以前から調べていたのでどうにか読むことができました。

マイクロサービスに関する知見を幅広く集めて整理した本書は、サーバーサイドに関わる人であれば読んでみて損はないかなと思います。 サンプルコードの多くは Java や Spring なので、その辺の知見がないと少々難しいかもしれないですが。

マイクロサービスという言葉が出てきたのは2014年頃だったと思います。 その頃はマイクロサービスは今後はやっていくのかは怪しいなという思いでした。

とはいえ、増大して行くモバイルデバイスとインターネットに対し、Web を戦場とするサービス提供者にとっては、 いかに早くサービスを改善・拡大できるかが重要だったと思います。 そこで先人が色々な試行錯誤を重ねた結果として、サービスを分散させても上手く行くアイデアがようやくまとまってきた段階に来たんだなと感じます。 また、それを支えるのはクラウドサービスの普及と発展なのは間違いのないことです。

今なら、マイクロサービスに挑戦しても無謀ではないと本書を読んで思いました。

Microservices Patterns を読んで(1)

Chris Richardson 氏の Microservices Patterns を読んだ。

Microservices Patterns: With examples in Java

Microservices Patterns: With examples in Java

マイクロサービスという言葉が出て来て数年経ちます。

私もマイクロサービス的な複数のサービス間でデータのやり取りを頻繁にするようなシステムを構築したことがあります。 その際にデータの整合性は最重要ではなかったのでトランザクション的なものは使いませんでしが、 お金を扱うようなシステムをマイクロサービスにした場合ちゃんとしたトランザクションはどうするのかは気になっていました。

本書にはその疑問に対する現実的な回答が載っています。

著者は CloudFoundry の創設者で、また以下のマイクロサービスのパターンを集めたサイトの管理者でもあります。 microservices.io

また本書のサンプルコードに度々登場するマイクロサービスのための evantuate-tram フレームワークとその開発を行なっている evantuate の代表でもあります。

eventuate.io

というわけで著者の知見を盛り込んだ本書は、マイクロサービスを導入するために必要な、設計・実装・インフラといったあらゆる知識を包括的に学べる一冊だと思いました。

なお、本書で掲載されているソースコードは以下で公開されているようです。

Eventuate example applications

本書は全部で 13 章から成っています。それぞれの章の内容を簡単にまとめてきます。 3週間くらいかけて読んだので一部曖昧だったり、私見もあるかもしれないけどそこはご容赦を。

1 章 モノリシックヘルを脱出する

Uber Eats のような、注文・レストラン・配達 を提供するシステムを題材にしている。 複数の機能を単一の Spring Boot アプリケーションでデプロイしている。 Jenkins による CI もあるし、ヘキサゴナルアーキテクチャもあるしで、決してレガシーではないが モノリシックゆえに理解しづらく、機能追加が難しく、新しいライブラリは言語が投入できず、スケールもしないという欠点を抱えているため、 これをマイクロサービスに移行したいというところから始まる。

マイクロサービスは上記の欠点を克服し、開発スピードの向上が期待できるとしつつも、一方で分散構成ゆえの困難も抱えることになる。

残りの章はその困難に対する対策となる。

2章 マイクロサービスアーキテクチャの定義

マイクロサービスの設計に関する章。

マイクロサービスの一つ一つはヘキサゴナルアーキテクチャで作り、サービス間のデータは 定義した API 経由でしかやり取りしない。 また、サービスごとに DB を持つ。

マイクロサービス全体の設計は次の3つを行うことになる。

  1. サービスの分割
  2. 各サービスの機能の定義
  3. サービスのAPIの定義とサービス間の関連を定義

サービスの分割の手法として、 business capability と DDD の2つが挙げられている。

前者は業務の価値を生む出す機能を抽出し、それをグルーピングしていくような感じだろうか。 ユースケース分析とか要求分析に近い感じだろうか。

後者は DDD の流儀に従って粗めのドメインモデルを作り、境界づけられたコンテキストを作ることでサービスを分割する。 またこの章だけではなく、随所に DDD の手法を適用していくことになる。

どちらの手法でも大事なのはビジネスの観点に従って分割をすることで、技術的な理由による分割を行なってはいけないということ。 また疎結合な分割を行うこと。適切でないサービス分割は、マイクロサービスアーキテクチャではなく、ただの分割したモノリス (Distributed Monolith) になってしまう。

3章 プロセス間通信

マイクロサービス間の通信手段に関する章。

同期通信の手段として、 REST, gRPC の2つがある。 REST は汎用的に使える一方でマイクロサービスの文脈では通信効率の悪さや、API と HTTP へのマッピングの負荷が大きいという点が述べられている。 gRPC は通信効率の良さやスキーマがあること、多言語対応などがマイクロサービスに適しているが、HTTP/2 が必須となる。

またどちらであっても、同期通信は通信相手が必ずいないといけないので、可用性を下げる要因となる。 そのために、Circuit Braker や Service Discovery を導入するとよい。

非同期的手段として、kafka などの非同期メッセージングがある。 マイクロサービスの可用性を向上にするには、サービス間のデータの送受信を非同期メッセージにするのが望ましい。 (これは主に更新系操作の話で、同期通信ももちろん使っていいはず)

例えば、サービス間で非同期でデータを送りその結果を応答して欲しいなら、送信用のチャネルと返信用のチャネルを用意して 送信用のチャネルにデータを流す際に id を格納して、その id を付与したデータを返信用チャネルに流してもらう。

この際にデータの整合性を保つために Transactional Message というものが必要となる。 簡単に言うと DB の更新とそれに対応するメッセージの送信が必ずアトミックになると言うものだ。

evantuate tram はこういったメッセージ送受信を簡単に行うためのフレームワークで、 Transactional Message は 直接メッセージを送る代わりに、 DB の OUTBOX (送信箱)テーブルに送信メッセージ内容を書き込む。 そして、非同期で OUTBOX テーブルの内容をポーリングするか、DBのトランザクションログ を tailing して OUTBOX の内容をkafka に送る。

単純だけど、これはいいアイデアだ。 アプリケーションコード内にメッセージ送信を行うコードを書くと、DBトランザクション失敗時のメッセージ送信の扱いは面倒になりがちだ。

他にメッセージングの送信は最低一回保証なので同一メッセージが再度届くことも考慮した設計にする必要があることも述べられている。

なお、トランザクションログを監視して kafka に投げるライブラリは他に debezium も紹介されている。 ぜひ試してみたい。

4章 Saga によるトランザクション

本書の一番のポイントは多分ここ。実際マイクロサービスをやるにあたって最も気になるのはトランザクションだろう。 2フェーズコミットはスケーラビリティを大きく損なうため、Saga というパターンを使う。

Saga は非同期メッセージングをベースにしたサービス間の協調の仕組みだ。 トランザクションの ACID の4つの特性のうち、 ACD だけを保証する。 Isolation についてはいくつかのテクニックである程度は保てるが、基本的には結果整合性になる。

Saga は 以下のような手続きで実現する。

  • Transactional Message を使って、まずは自分のデータを処理中(pending) にして他のサービスのメッセージを投げる。
  • 他のサービスはメッセージを受け取って自身のデータを更新し、応答メッセージを投げる。
  • 応答メッセージを受け取ったらその内容に従って、ステータスを完了や失敗に更新する。

また失敗時に更新したデータを取り消す(または打ち消す)ために、補償トランザクションを用意しておく。

Saga は f 型と Orchestration 型の2つがある。

前者は送信したメッセージの扱いと返信を投げた先のサービスに任せる感じで単純な反面、トランザクションの内容が複雑だと全体の把握が難しくなる。
後者は主となるサービスが全てのメッセージの送受信の流れを決める感じで、複雑なトランザクションの制御に向く。 基本的に Orchestration で行うのがよく、evantuate tram は Orchestration を DSL として定義可能な API を持つ。

両者のソースコードは前述のサンプルコードにあるので両方を見て比較すると良いと思う。 Choreography 型は投げたメッセージについてどういった処理を行い、どういう返信メッセージをするのかを確認するためには、そのサービスのソースを見るしかないので、実際わかりづらい。

Isolation を保つにはトランザクション中のデータのステータス管理が肝になる。ステータス管理を適切に行うためにステートマシンを定義すると良い。

他に、Isolation を確保するためのテクニックとして https://dl.acm.org/citation.cfm?id=284472.284478 という 複数データベース環境におけるACIDに関する論文からいくつかが紹介されている。

5章 マイクロサービスのビジネスロジック

DDD の手法を適用してビジネスロジックを構成するクラスを作る。 主な登場人物は、 Service, Repository, Entity, ValueObject の他、マイクロサービス固有の SagaOrchestrator (SagaのDSL), MessageConsumer, MessageHandler などがある。

適切なサービス分割を行うために、 Aggregation(集約)を定義するのが重要。 他の集約への参照は、オブジェクトではなく ID を保持することが望ましい。

こうすることで、集約一つが適切なトランザクションの単位となる。 また、NoSQL を使う場合でも集約を決めることは重要となる。 それは、集約一つを NoSQL のドキュメントとすることで、トランザクションが提供されない NoSQL でもデータの整合性が保てるためだ。

また集約の処理結果を、DomainEvent というサービスで起きた変化を伝えるデータにして、さらにその DomainEvent をメッセージ送信することで、 他のサービスへの通知を行うことができる。

DomainEvent は様々な用途で使用できる。 Choreography 型 Saga の起因となるのも DomainEvent となるし、他に後述の EventSourcing や CQRS, AuditLog などでも使う。

また適切な DomainEvent の定義の仕方として、 Event storming について少しだけ触れらている。

6章 Event Sourcing

集約の更新と同時にイベントを投げるというのは、2つのデータを管理することになるし、もしイベントを投げることを失念してしまったら 見つかりにくいバグの原因になりかねない。 それならいっそのこと、イベントの蓄積だけを行おうというのが、 Event Sourcing の基本的な考え方。

銀行の入出金の例で言えば、従来の方法は入出金があるたびに現在の残高を更新していくが、 Event Sourcing では入出金を一件ずつ記録しておき 残高の算出が必要になった時点で全ての入出金のイベントを取得して残高の算出を行う。

Event Sourcing を行えば管理対象は イベントデータだけだから前述の懸念点は無くなる。 一方で従来のやり方とは大きく異なるやり方なので、学習コストがかかることに注意が必要。

他に、サマリ処理を軽減するスナップショットや重複メッセージの扱い、仕様変更後のマイグレーションといった実装上の懸念点について述べられている。

7章 クエリ

サービスを跨ったデータの取得方法に関する章。

一つ目は API Composition で、一つのサービスが他のサービスの API をコールしてデータをまとめるというもの。 単純だし、最も一般的に使われる。 一方で複雑な場合だと、 インメモリ join や多数の通信によって計算量や時間が増える。 後者は並列化などによって緩和可能だが、前者の解決には CQRS パターンによるデータ複製及びクエリに適したストレージを使うことが挙げられる。

CQRS (Command Query Responsibility Segregation) は クエリ専用のデータモデルやデータストレージ (ElasticSearch など) を用いる方法。 また Command 側はこれまでに出てきた DDD の手法で手厚く作り、 ドメインイベント経由で Query 側にデータを提供する。 Query 側はイベント経由でクエリ用のデータを作り公開する。

DB に限らず適した製品を選べたり、複数サービスに跨るデータを集約できることがポイントになる。 DB以外の製品として出てきたのは、地理情報の検索が可能な Elastic Search など。

また、 Event Sourcing をCommand 側で使った場合はイベントを集約した内容を Query として生成することで、 集約にかかる負荷を抑えたり、Queryの要求に応じて柔軟にQueryモデルの再生成が可能といったメリットがある。

CQRS を適用する上での欠点は、複雑性が増すこと。

一旦まとめ

長くなったので主に設計・実装に関わる前半が終わったこの辺で一回区切ります。

個人的に気になっていた、 Saga について実行可能なサンプルコードもある状態で解説されているのがとても良かったです。 OUTBOX を使用したトランザクションメッセージは個人的には目から鱗のアイデアだと思いつつも、 Saga パターンと補償トランザクションは真面目に書くと結構辛いなというのが正直な感想ではあります。

とはいえ、これより優れたアイデアというものも中々出てこないですし、 Saga を簡単に書き下す言語やライブラリの登場が今後あるかもしれないです。 個人的には、Saga を書くのに Scala + Akka は結構良いのではと思っていたりします。

あとはドメインイベントは必須だとして、 Event Sourcing をどうするか。 Event Sourcing を使うと CQRS も多分必須になるし。 なんとなく、 Event Sourcing がハマりそうな分野(会計とか)はあるものの、これを適用するのは勇気がいるなと思います。

とはいえ本書は全てのパターンを使うことを求めてはいないし、各パターンの利点・欠点もちゃんと述べている。 そのため、自分たちが作るものに合わせて最適なパターンを決めていけばいいし、 特定のサービスだけ Event Sourcing + CQRS にするというのももちろんありでしょう。

続きを書きました。 kencharos.hatenablog.com

2度目の転職をします

ブログの方は久しぶりです。 Qiita にはそこそこ書いてましたが、QIita に書いてもあまり個人に評判とかが反映されないなという思いが強まったので、 今後はブログに色々書いていこうかなと思います。 更新も一年ぶり、、頻度あげます、、はい、、

ちょっと前の記事をさかのぼれば 5年前の転職の記事が出てきてしまうのであれですが、 2度目の転職をすることになりました。

どこに行くのはここでは書かないですが、しばらくしたら Twitter で何かしら所属については発言する気がします。 ここでは、転職活動の振りかえりを書きます。

何をしていたか

ITコンサル的な会社にいました。 技術提案、PMO、一次受け開発などをするところで、主に開発やってました。 ここ一年は クラウドネイティブなシステム作るみたいなお仕事をしていて、 IaaS をいかに使わずに 開発と運用を回すかみたいなことに取り組んでました。 ほぼすべての技術構成を自分で決めて検証して、チームメンバで一緒に作る毎日は中々に楽しかったです。

他には社内で勉強会やら研修資料やらを作ったり。 Git や Java8 関数型プログラミング の研修資料を作り、それを新人研修のカリキュラムに組み込んだのは、 開発環境をモダン化していく上でよい仕事をしたなと自分でも思います。

どうして転職しようと思ったか

色々と考えましたが、まとめると転職で望むことや条件は次の3つになりました。

  1. サービス開発をやりたい
  2. 収入を増やす
  3. グローバルな環境(英語)

サービス開発をやりたい

今の職でも前職の SIer でも開発はやりますが、自社の事業としてのサービスを作っているわけではなく、基本的には受託です。 そうするとサービス開発の一部にしか携われないなと。 幸いにもアーキテクチャを作るような重要部分を任せていただくことは多かったです。 一方で企画段階だったり、あるいはサービスロンチ後にログや利用動向などをみてサービスの改善を図っていったりだとか、 改善をしていくことを前提とした設計だとかに携わることはあまりありませんでした。

サービス開発は作って終わりではなく、作った後も継続して改善が必要だと思っています。 どちらもやってこそ見えてくる境地があるだろうし、単純に自社サービスをやって得られるデータを扱う事には純粋な興味がある。 あとは大規模サービスはどうやって大量のトラフィックやログをさばいているの?とかも気になるしやってみたい。 事例公開はあるにはあるけど、本当のところは外からじゃわからない。

そういった思いがあり、事業会社でサービス開発に携わりたいと思うようになりました。

収入を増やす

幸いにも業界の平均よりも多めの給料はもらっていたと思います。 実際、転職活動中、今より収入を増やしてくれる会社はほとんどありませんでした。

とはいえ、収入が十分かどうかは家族構成や環境なんかで大きく変わりますし、 何かあった時の備えや貯蓄・投資もしたいし、ちょっと贅沢もしたいものです。 定年まで右肩上がりで給料が上がって、年金もばっちりもらえるなんてのは幻想だと思っているし、 今のうちに稼げるだけ稼いでおきたい。

そんなわけでもう少し収入を増やせないか、あるいは副業などの自由な働き方ができる会社がないかを考えるようになりました。

グローバルな環境(英語)

開発者としての地位と幅を上げていくためには、積極的に海外の情報を仕入れたり海外の人との交流が必要だなとひしひしと感じるようになりました。 仕事でも仕事外でも英語ができないと単純に辛かったり、歯がゆい思いをすることが増えました。 例えば、目の前に Java チャンピオンがいるし聞きたいことがあるのに、上手く話せなかったみたいなことがありました。

今の30,40代は、ギリギリで日本語だけで仕事ができる世代だと思っていて、 今後は英語ができないといい仕事にありつけないんじゃないかと思っています。

そんなわけで、英語を学べる・使える環境に身を置きたいなと考えるようになりました。 自分で英会話やればいいじゃんというのもその通りなんですけど、仕事で使わざるを得ないならやらざるを得ないだろうと。

上の3つの条件のうち最低でも2つを満たせるところに行こうと考えました。

転職活動でしたこと

大体一年くらい活動をしていました。

やったことは次の通りです。

職務経歴書(レジュメ)を書く

2度目の転職なので職務経歴書は元々ありましたが、 今までの経歴を振り返って最新化しました。 ただただやったことの羅列にならないように、 自分が主体的に行ったことのアピールを盛り込んだり、 重要でないことは削ったりとメリハリをつけるようにしました。

どうやって書いていいかわからないというのであれば、 転職ドラフトとか Moffers とかに申し込んでレジュメを書いてみたらいいと思います。 第三者にレビューしてもらえるので結構有用でした。

あと、前述した3つの条件にかすらない企業に当たってもしょうがないので、 自分の転職する条件は最初に書きました。 はっきり書いてある分、企業側の人も重視して見てくれたと思います。

企業との接点を増やす

転職系のサービス(転職ドラフト、moffers、Paiza、findy, folkwell、Bizleach、Linkedin)などはあらかた登録しました。

特に理由はないのですが、エージェントは登録しませんでした。 エージェントとのやり取りが面倒だから、レジュメを登録して企業側からのアクションを待ちたいみたいな心理が働いたのかもしれないです。 (事実結構忙しかった)

色々なサービスに登録したせいか、スタートアップ、ベンチャー、大企業、外資など色々なところから声をかけていただけたなと思います。 何社かの人にも実際に会いに行きました。

転職目的ではなく自身の習慣としてですが、Twitter ではいろんなエンジニアの方をフォローしているので、自然と採用に関する情報も入ってきました。 採用説明会を開くなどの情報が入ってきて気になる会社があれば、申し込んだりもしていました。

他には勉強会の参加などもしていました。 勉強会の参加履歴から企業側が興味を持ち、声をかけてくれたこともありました。 ただし、採用目的で勉強会に参加してもつらいだけなので、そこはあわよくば程度に考えましょう。

Qiita を書いたりもしていたので、企業側からは技術習得に熱心な人みたいな印象を持たれることが多かったように思います。 仕事での仕事外でもアウトプットをしていることは採用の評価に大きく影響すると感じます。

レーニン

外資系やベンチャーではコーディングテストが出ることが多いです。 CS 知識を問うものも多く、最低限以下の内容は押さえておかないとスタートラインにも立てないです。

  • データ構造(連結リストとかスタックとか木とか)
  • 探索、ソートのアルゴリズムと計算量
  • 幅優先検索とか深さ優先検索とかDPとか
  • 再帰

他にも Linux やネットワーク、HTTP の基礎的な知識を問われることもあるし、非常に高度なアルゴリズムを適用しないと解けない問題などもあります。 私自身は活動を始める前から Paiza をやったり、アルゴリズムの本を読んだりしていました。

活動を終えてからしりましたが、外資向けのコーディング試験対策としては、

leetcode.com

などが有名なようでした。 いずれにせよ、普段書いているプログラムとはちょっと趣向が違う課題が多いので、 対策はしておいた方がよいと思います。 どこまで対策するかは、その会社がどこまでの技術レベルを求めるかによるので一概には言えないですが、 凄腕の技術者が多い会社であればそれだけ採用試験は難しいとみていいと思います。

面談・面接

大体の場合、仕事で何をしてきたかが問われますので、ちゃんと自分がやってきたことをアピールできるように、 仕事で自分が主体的に取り組み工夫してきたことを自分の言葉で話せるようにしておきます。 10年以上この業界で働いていますので、自分のスキルをちゃんと自分で語れないようでは、採用したいとは思ってはくれないでしょう。

転職をしたらどうなりたいか・何をしたいかも大体聞かれます。 1年後、5年後のなりたい自分とキャリアプランを考えておくといいと思います。

また、前述の3つの条件については早めに話すようにしました。 なので給与レンジなどはカジュアル面談の時点で聞くようにしていました。 オファーの段階になって希望年収を下回るというのも時間の無駄です。

それに、企業側の人もお金の話をしても嫌な顔はしませんでした。 むしろ外資系の場合は先に、幾らぐらい欲しいの? と聞かれました。 特にエージェントを介さない場合、年収交渉は自分でやるしかないので、 お金の話はきちんとしたほうがいいです。

結果

何社かの人はカジュアル面談に伺い、お話を聞かせていただきました。 3つの条件以外にスキル的なアンマッチもあったりして、採用プロセスに進もうと思ったところは正直そんなになかったです。 これは正直なところ、忙しかったから受ける会社は厳選したいという思いもありました。

数カ月にわたって7,8度の面接を行って落ちたところもあり、その時は正直だいぶへこみました。

その後、たまたま Twitter を見ていたら某企業が採用説明やる情報を見つけて、 採用説明会に行き、興味があったので、申込、コーディングテスト、面接2回で内定と半月くらいであっさり決まりました。

面接の帰りに近くの店で餃子食べてたら、電話かかってきて内定の連絡だった時は正直笑いました。 自分の上げた3つの条件をすべて満たせるところだったので結果的には良かったと思います。

今後について

すごい人ばかりいるところなので、「俺より強い奴に会いに行く」という気持ちです。 入社は年初となりますので実際に入ってみてどうだったかは、試用期間が過ぎる頃にまたレポートしようと思っています。

転職はゴールではなくスタートなのでこれからも頑張っていかないといけないですが、今後も設計とプログラミングの間で生きていく所存です。 今後ともよろしくお願いします。

「AkkaにPull Requestをあげようハッカソン」に参加してきたよ

先日、「AkkaにPull Requestをあげようハッカソン」 に参加してきました。

jsa.connpass.com

akkaコミッターの Konrad さん (https://github.com/ktoso) に加え、 Scala Matsuri 運営の @ さんと @ さん、 さらに会場提供していただいたセプテーニオリジナルの @ さん と @ さん といった Scala 界の豪華メンバーに囲まれて、 Akka の PR を出すために直接相談しならが作業を進めるという、何とも素敵な体験をしましたのでその報告をします。

ちなみに私は時間内に PR を出すことができませんでしが、宿題という形でちゃんと出しました。 敷居は高くないんだよということをお知らせしたいと思います。

私について

  • アーキテクト的な立ち位置で、普段は Javaを書いていてここ最近は C#, Azure
  • Scala はほぼ趣味で Play 書いたり、コップ本、 FP in Scala, Akka in Action 読んだりしました
  • 4月に社内のプロトタイプで好きにやっていいと言われたので、 akka-http の webSocket でチャットボット作ったりしました
  • 英語力はほぼなしで、読みはどうにかなるくらい。
  • OSS の貢献はしたいと思っていたが、結局やってなかった

参加したきっかけ

このイベントの趣旨を見たときは、正直畏れ多いと思いましてためらいましたが、 http://blog.scalamatsuri.org/entry/2017/11/15/133342 などをみて、気軽に行ってみようと思いました。 OSS やりたいけど英語や腕に不安のある私のような人は多いはずですし、良い試金石になるかもという思いもありました。

なお、Scala 大好きな後輩に参加を薦めておいて、私だけ当選しました。ごめんね。

当日について

まず、どの Issue を対応するかについての説明がありました。 https://github.com/akka/akka-http/issues?q=is%3Aissue+is%3Aopen+label%3A%22help+wanted%22 のように、 "help wanted" がついた issue は有志のパッチを受け付けてくれるそうで(知らなかった)、そこから興味があるものを選択して作業し、わからないことは konrad さんや運営スタッフと相談という感じでした。

実装方法の相談、テストの仕方、 PR の相談などを何でも聞いてくれて、 英語のコミュニケーションは @ さんと @ がサポートしてくれて本当に万全の体制でした。ありがとうございます。

他の参加者の @ さん, @ さんがそれぞれ、 alpakka, akka の issue に着手したので、私は経験もあるし、 akka-http の issue をやることにしました。

簡単そうだなーと思ってやってみると、意外に難しいとか、実はもう対応されてたとか色々 issue をあさり、 URLエンコーディングの解析不具合によるエラーがでるという issue を対応することにしました。

github.com

多分簡単だろうと思っていたら、呼び出し場所によって挙動が変わったり、URL解析のための割と複雑なコードがあったりと原因を見つけ、対策を考えるのに割と時間を食い、あえなく時間切れとなりました。 後日絶対 PR 出してやるぞと思いました。

なお、URL解析は結構面倒だし色々な場所で呼ばれるので、ちゃんとパフォーマンスを考えないといけない。 if文一つの追加でもパフォーマンスには影響がでるかもしれないから、 ベンチマークをちゃんとやってから、PRマージするか決めるねという、ありがたいアドバイスをいただきました。

そういう意図なのかわかりませんが、私が直そうとしていたコードは、普通に var や while がありました。 なので、FP 詳しくなくても大丈夫ですよ。

あと、私は Windows マシンで参戦しましたが、開発上は特に問題ありませんでした。 ただし、 PR 出す際に全てのテストを流してわかりましたが、 Windows ではいくつかのテストが失敗またはスキップされます。 なので万全を期すなら Bash on Windows や Docker などでビルドできる環境も用意したほうがいいです。

あと、パエリアおいしかったです。ごちそうさまでした。

PR 出したよ

Adding percent encoding length check for Path.Uri #1553 by kencharos · Pull Request #1590 · akka/akka-http · GitHub

PR 出した後、速攻で Konrad から応答いただけてうれしかったです。

ただ、verup の関係か、修正箇所とは違う場所のCIテストがこけているので、もう少し調べてマージされるまで付き合います。 コア機能に手を入れると色々な箇所に影響を与えそうですね。

そのあと、半日程度で検証・マージされました。 多分ふつうのルートだとこんなに速攻でマージされることはないはずなので、本当にありがたいことです。

思ったこと・感想

前述のブログ記事にもある通り、海外では今回のようなみんなで集まって OSS のパッチを出すイベントが結構あるらしく、それを日本でもという趣旨が素晴らしいと思いました。

OSS への貢献は、特に日本では言語の壁もあるし、コミュニティに参加する人も少ないこともあって敷居が高いと感じていました。初めの一歩が難しいというやつですね。 その初めの一歩を手厚くサポートしてくれるというのは、とてもありがたいことでした。 今日だけでなく、今後もこの活動を続けていきたいと思いました。 何らかの形で、この活動に関わっていければと思います。

あとは英語について。
個人的な話ですが、年、別件で海外のすごいエンジニアの方と会食をするという機会をいただいたことがあったのですが、自分は全く何もできなくて勿体無い思いをしました。
今回も Konrad さんがいらっしゃっていて、Akka だけでなく Scala の動向だとかコミュニティに関する話など色々と直接聞ける機会だったのに、自分は何をやっていたんだろうと。 この業界にいるなら、英語はできないよりできたほうが圧倒的に楽しいし仕事の幅も増えるはずなので、もっと勉強しなければと思いました。努力が必要される場所に身を置きたい。

また、自身の反省として、コントリビューションガイドはちゃんと読んでおくべきだったことが挙げられます。 例えば、"help wanted" という誰でもパッチを上げていいという issue の存在自体をまず知りませんでしたし、 よく読めばどうやって作業したらいいかということが書いてありました。

反省ついでに、akka-http の コントリビューションガイドの抜粋を書いておきます。 PR作るにあたって重要そうな箇所だけを抜き出したので、より詳しく知りたい方は原書を参照してください。

akka-http/CONTRIBUTING.md at master · akka/akka-http · GitHub

Contribution Guideの 抜粋

タグについて

t: で始まるタグはトピックを表し、issue がどの要素に対するものなのかを示します。 t:http:core などです。

すべての issue が開かれていますが、初めての場合は次の2つのタグが付いているものから着手するといいでしょう。

  • help wanted タグはコアメンバーが作業する時間がないか入門にちょうどいいissue です。不明点があれば気軽に訪ねてほしい。

  • nice-to-have (low-priority) は重要度が低いものです。興味があるならやってみてください。コントリビューションを歓迎します。

数字のついたタグは、開発状況を示しています。

  • 0 new は要件が不明瞭なものです。議論をするために、discuss も一緒についていることがあります。内容が変わったりクローズするかもしれません。
  • 1 triage は、有意義なチケットです。パッチを送るなら、このタグがついているものがいいでしょう。
  • 2 pick next は作業対象にしたいチケットで、次回のリリースに含めたいPRに主にコアメンバーが付与します。
  • 3 in progress は誰かが作業中のものです。

他に次のタグが使用されます

bug - バグを示し、feature よりも優先して対応することを示します。 failed - Jeknins の夜間ビルドが失敗したものを示します。

また、 PR の進捗状況のタグは次のように遷移します。

validating => [tested | needs-attention]

PRのワークフロー

| ※ PR を出す前に、6の CLA の登録はしておいたほうがいいです。

  1. 対応するissue を探し、決定する
  2. リポジトリを自分のgithubアカウントにフォークする
  3. git checkout -b wip-custom-headers-akka-http など追加したい featureのブランチを作成して作業する
    • 変更内容の検証をするためには、 sbt の validatePullRequest タスクが役に立ちます。このタスクはmaster ブランチからの変更箇所から影響のあるプロジェクトを特定し、影響のあるプロジェクトすべてのテストを実行します。
      • 例えば akka-http-core のソースを修正すると、akka-http-coreに依存するすべてのプロジェクトでテストが行います。
  4. コミットを以下のルールで作ります
    • Adding compression support for Manifests #22222 のように作業内容の概要とチケット番号を必ず先頭に含めます
      • 簡単なPRならば概要は minor fix などでもOKです
    • 詳細が必要なら1行開けて、詳細な内容を追加します
    • 複数のコミットを行った場合は squashでコミットを1つにまとめてください。
  5. akkaリポジトリにたいして、PR を発行してください。
  6. ここ で CLA(Contributor License Agreement) に登録してください。
    • githubアカウントが、CLA に登録されていない場合、 PR の検証が OK になりませんので、事前に登録しておいた方がよいです。
  7. 初めてのPRの場合など、アカウントがホワイトリストに登録されていない場合、 BotCan one of the repo owners verify this patch? とたずねてきます。コアメンバーが確認をして、OKを回答します。これは悪意のあるPRによって、ビルドが壊れることを避けるためです。
  8. アメンバーや興味のある人によってレビューや場合に応じて質問や修正依頼が来ます。質問などは気軽にしてください。通常 2つの LGTM が得られたらマージする対象として認められたことになります。
  9. 次回のリリースで、PRが本体にマージされます!

最後に

とても貴重な体験をしました。ありがとうございました!

jBatch RI を Java SE でまともに(JPA,CDI,Transactionalつきで)動かす #javaee

この記事は、Java EE Advent Calendar 4日目の記事です。

昨日は @lbtc_xxx さんの Java EE ブログエンジン Apache Roller のご紹介でした。

明日は、making@github さんです。

概要

背景を説明すると、既存の Java 製のオレオレバッチフレームワークがあるんだけど、 設計が古いので、改善したいと。

標準の jBatch(JSR-352)ってのがあるけど、これアプリケーションサーバーいるんでしょ? ちょっと面倒だなーとかと思ってたら、 Java SE 環境でも動くらしい。

yoshio3.com

が、色々と足りないというか、Java EE 向けに書いたソースが、以下のようにそのままでは使えないこともわかりました。

  • CDI が使えない
    • ジョブXML の ref 属性に、 @Named の名称で指定できない
  • Batchlet で @Transactional が使えない
    • JTA 入ってないしね
  • チャンク方式における分割コミットが動作しない
  • JPA も使いたいよね

というわけで、この辺を改善して、SE環境で使う際の制限を取っ払うことができないかというお話です。

内容的に、Java EE でいんだろうかと思わないわけでもないですが、タイミングがよかったのと、色々と知見を得たのでまとめてみたいと思いました。

最終的なソース一式は Github にあります。

前提として、CDI, JPA, jBatch を一通り触ったことがあるくらい人向けの内容です。 知らない人でもそれぞれの入門記事を見てもらえればわかるんじゃないかと思います。

環境準備

上で引用した記事ですと、jBatch の参照実装と derby なんかのjarファイルを取得してクラスパスに設定するようになっていましたが、 調べると Maven リポジトリがあったので、ちゃんと依存性を管理するようします。

やりことを全部含めた 依存性定義はこんな感じになりました。

      <!-- Java EE API もろもろ -->
      <dependency>
         <groupId>javax</groupId>
         <artifactId>javaee-api</artifactId>
         <version>7.0</version>
      </dependency>
      <!-- glassfish に載ってる jBatch の参照実装 -->
      <dependency>
         <groupId>com.ibm.jbatch</groupId>
         <artifactId>com.ibm.jbatch-runtime</artifactId>
         <version>1.0</version>
      </dependency>
      <!-- jBatch RI がジョブ実行状況の記録に使うDB -->
      <dependency>
         <groupId>org.apache.derby</groupId>
         <artifactId>derby</artifactId>
         <version>10.12.1.1</version>
      </dependency>
      <!-- CDI 実装 -->
      <dependency>
         <groupId>org.jboss.weld.se</groupId>
         <artifactId>weld-se</artifactId>
         <version>2.3.1.Final</version>
      </dependency>
      <!-- JPA 実装 -->
      <dependency>
         <groupId>org.eclipse.persistence</groupId>
         <artifactId>org.eclipse.persistence.jpa</artifactId>
         <version>2.5.2</version>
      </dependency>
      <!-- お好きなJDBCドライバ -->
      <dependency>
         <groupId>org.postgresql</groupId>
         <artifactId>postgresql</artifactId>
         <version>9.3-1102-jdbc41</version>
      </dependency>

jBatch RI の概要

  • ジョブのステータス管理として Derby を使っている
  • ジョブの実行はConcurrency Utility のスレッドプールで動いている
    • このスレッドプールは、Java SE 環境でも有効なため、ジョブの再投入や並列実行もOK
    • 既存バッチは、1ジョブごとにJVMを起動するので起動時間がネックとなっていたため、これは大きな利点
  • 設定やカスタマイズは、 META-INF/servicesの下に batch-configやbatch-services というプロパティファイルで行う。
    • 前者はジョブ管理用の DerbyのURLとかを設定
    • 後者は、カスタマイズ用の定義ファイルで、jBatch RI はサービスという形式で拡張ポイントを用意している
      • 今回は、CDIを使ってジョブインスタンスを取ってきたり、チャンクトランザクション制御のカスタマイズとかに使っている
      • プロパティのキーにサービス名、値に所定のクラスを継承して作ったクラスの FQDNを書いておく

Batchlet でJPAと @Transactional を使う

さて、では実際にどのような拡張を行ったかを紹介します。

まずは今回動かす Batchlet と job xml です。

Batchlet は JPA でテーブルのデータを消すものです。

package batchlet;

import javax.batch.api.AbstractBatchlet;
import javax.batch.api.BatchProperty;
import javax.batch.runtime.context.JobContext;\
import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import javax.inject.Named;
import javax.persistence.EntityManager;
import javax.transaction.Transactional;

@Dependent
@Named // これで、"dataDeleteProcess" だけで、JOB XML に記述できる。 
@Transactional
public class DataDeleteProcess extends AbstractBatchlet{

    @Inject
    private JobContext ctx;

    @Inject // プロデューサーメソッド経由で取得
    private EntityManager em;
    
    @Override
    public String process() throws Exception {
        
        int deleted = em.createQuery("delete from Employee e").executeUpdate();
        
        System.out.println(deleted + " rows deleted.");
       
        
        // 終了ステータス
        return "COMPLETE";
    }
}

次は job xml。 Batchlet に @Named アノテーションが付いているので、ref 属性は CDI の bean 名称で指定可能です。

<job id="cleanup-job"
     xmlns="http://xmlns.jcp.org/xml/ns/javaee" 
     version="1.0">
      
  <step id="clean">
    <batchlet ref="dataDeleteProcess">
        
      <properties>
        <property name="text" value="TEXT"/>
      </properties>
    </batchlet>
  </step>
</job>

ついでに、 EntityManger を @Inject できるようにするプロデューサーフィールド

@Dependent
public class EMProducer {
    @PersistenceContext(unitName = "default")
    @Produces
    private EntityManager em;
    
}

これは、 Java EE 環境では普通に動きますが、SE環境では、以下の理由で動きません。

  • @PersitenceContext は Java EE環境のみ(だと思う) なので、対応できない
  • JTA が無いので、@Transactional が動かない
  • CDI も有効でないので、ref属性にはクラスの FQDN を書かなくてはいけない

というわけで1つ1つ解消していきます。

JPAJava SE環境で使う

JPAJava SE 環境で動かすには、JTA ではなくローカルトランザクションで動かします。 (JTA 詳しくないです。 SE で動かすことできるの?)

というわけで、 persistence.xml を書き直し、データソースではなく JDBC経由とする。

<?xml version="1.0" encoding="UTF-8"?>
<persistence version="2.1" xmlns="http://xmlns.jcp.org/xml/ns/persistence"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd">
  <persistence-unit name="default" transaction-type="RESOURCE_LOCAL">
    <provider>org.eclipse.persistence.jpa.PersistenceProvider</provider>
    <exclude-unlisted-classes>false</exclude-unlisted-classes>
    <properties>
        <property name="javax.persistence.jdbc.driver" value="org.postgresql.Driver"/>
        <property name="javax.persistence.jdbc.url"
             value="jdbc:postgresql://localhost:5432/postgres"/> <!--好きなDBを設定 -->
        <property name="javax.persistence.jdbc.user" value="postgres"/>
        <property name="javax.persistence.jdbc.password" value="postgres"/>
      <property name="eclipselink.logging.level" value="FINE"/>
      <property name="javax.persistence.schema-generation.database.action" value="drop-and-create"/>
    </properties>
  </persistence-unit>
</persistence>

で、 EntityManger をインジェクションするためのプロデューサーメソッドも書き直す。

package db;

import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

@Dependent
public class EMProducer {
    
    EntityManagerFactory emf = Persistence.createEntityManagerFactory("default");
    
    @Inject
    private EMHolder holder;
    
    @Produces
    public EntityManager getEm() {
        // ホルダーにEMが無い場合やあるけどトランザクションが終了している場合(※1)は新規取得
        // ※1 ジョブの複数起動でスレッドが再利用された場合
        if(holder.getEM()== null || !holder.getEM().isOpen()) {
            EntityManager em = emf.createEntityManager();
            holder.setEM(em);
        }
        
        EntityManager em = holder.getEM();
        
        return em;
    }
}

ここに出てくる EMHolder とはこんなの

/**
 * スレッドローカルでEntityMangerを保持するBean
 */
@ApplicationScoped
public class EMHolder {

    private ThreadLocal<EntityManager> holder = new ThreadLocal<>();
    
    public void setEM(EntityManager em) {
        holder.set(em);
    }
    
    public EntityManager getEM() {
        return holder.get();
    }
    
    public void removeEM() {
        holder.remove();
    }
}

これは何をやっているかというと、スレッド単位で EntityManger(=コネクション) を1つだけ生成したいために、 スレッドローカルをアプリケーションスコープで作っている。

jBatch で CDI を使うと、 Bean は全て @Dependent なのだが、Java SE環境では、@Dependent は都度インスタンスを生成してしまう。 こうすると、EntityManger のインジェクションごとに、EntityManger を生成していまうので、 複数 bean 間で EntityManger の共有ができないため、スレッドローカルを用いている。

@Transactional に反応するインターセプタを作る

JTA を使わないので、@Transactional によるトランザクション制御は自前のインターセプタを作って行う。

package se;

import db.EMHolder;
import javax.inject.Inject;
import javax.interceptor.AroundInvoke;
import javax.interceptor.Interceptor;
import javax.interceptor.InvocationContext;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import javax.transaction.Transactional;

@Transactional
@Interceptor
public class SeTransactionInterceptor {

    @Inject
    private EntityManager em;
    
    @Inject
    private EMHolder holder;

    @AroundInvoke
    public Object doTransaction(InvocationContext ic) throws Exception {
        EntityTransaction tx = em.getTransaction();
        Object result = null;
        try {
            if (!tx.isActive()) {
                tx.begin();
            }
            
            System.out.println("tx interceptor start");
            result = ic.proceed();
            if (tx.isActive()) {
                tx.commit();
            } 
        } catch (Exception e) {
            try {
                if (tx.isActive()) {
                    tx.rollback();
                }
            } catch (Exception e2) {
            }
            throw e;
        } finally{
            holder.removeEM();
            if(em.isOpen()) {
                em.close();
            }
        }
        return result;
    }
}

やっていることは、メソッド実行前にトランザクションを開始して、成功すればコミット、例外が起きればロールバックというもの。 本当だと、例外の種類によってはコミットしたり、トランザクション属性なんかもあるけど、とりあえず割愛。 ここで、インターセプタとBatchlet で EntityManger を共有するためにも、前述の EMHolder が生きてくる。

DB周りはこれでOKなので、次は CDI

CDI を有効にする

上で述べたとおり、サービスによってjBatch RI は拡張可能になっている。 JOB XML から、 ジョブインスタンスを取得する箇所は、 batch-services.properties に、 CONTAINER_ARTIFACT_FACTORY_SERVICE を設定すると拡張できる。

また、jBatch RI は、 Weld SE(Weld は glassFishでも使っている CDI 実装。SEはそのJava SE版) を使った 上記のサービス実装が存在しているため、

CONTAINER_ARTIFACT_FACTORY_SERVICE=com.ibm.jbatch.container.services.impl.WeldSEBatchArtifactFactoryImpl

のように書けば、CDI が使用できるし、ref属性に Bean の名前が書けるようになる。

よかったこれで解決ですね。と思ったのだけど、ジョブを2回投入すると2回目のジョブが起動できない。。。

WeldSEBatchArtifactFactoryImpl をみていると、ジョブインスタンスを取得するたびに、 Weldコンテナ(DIコンテナ)を生成しているため、 この辺に原因が有りそうだと踏んだ。

実際、このソースを参考に、Weldコンテナを1回だけ生成するようなサービスを作ったら、この問題は解消した。

package se;

import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
import javax.inject.Named;

import org.jboss.weld.environment.se.WeldContainer;

import com.ibm.jbatch.container.exception.BatchContainerServiceException;
import com.ibm.jbatch.spi.services.IBatchArtifactFactory;
import com.ibm.jbatch.spi.services.IBatchConfig;
import org.jboss.weld.environment.se.Weld;

@Named("MyWeldBean")
public class WeldContainerFactory implements IBatchArtifactFactory {

    // 1回だけ生成
    WeldContainer weld = new Weld().initialize();
    @Override
    public Object load(String batchId) {
        Object loadedArtifact = getArtifactById(batchId);

        if (loadedArtifact == null) {
            return loadedArtifact;
        }

        return loadedArtifact;
    }

    private Object getArtifactById(String id) {
        Object artifactInstance = null;

        try {
            // 元の実装はここで毎回 Weld().initialize();
            BeanManager bm = weld.getBeanManager();

            Bean bean = bm.getBeans(id).iterator().next();

            Class clazz = bean.getBeanClass();

            artifactInstance = bm.getReference(bean, clazz, bm.createCreationalContext(bean));
        } catch (Exception e) {
        }
        return artifactInstance;
        
    }
//(割愛)
}

このサービスを有効にするには、batch-sevices.properties にこう書く。

CONTAINER_ARTIFACT_FACTORY_SERVICE=se.WeldContainerFactory

これで、Java SE 環境でも Java EE の要領で記述した batchlet が使えるようになった。

チャンク形式のトランザクション制御を実現する

jBatch のチャンク形式は、大量データの処理に向いていて、job xml のパラメータ設定だけで、 データのコミットを行う間隔を変更できたりします。

例えば、以下は読込んだデータについて2件ごとにコミットする設定になります。

<?xml version="1.0" encoding="UTF-8"?>
<job id="chunk-sample" xmlns="http://xmlns.jcp.org/xml/ns/javaee"  version="1.0">
    <step id="insert">
        <chunk item-count="2"><!-- item-count がコミットする件数 -->
            <reader    ref="employeeFileReader">
                <properties>
                  <property name="input" value="c:/temp/input.txt"/>
                </properties>
            </reader>
            <processor ref="employeeProcessor"/>
            <writer    ref="employeeWriter"/>
        </chunk>
    </step>
</job>

ですが、これも Java EE の場合のみです。 jBatch RI は Java SE環境だと、トランザクション制御を行ってくれません。 といわけでこれも対応させてみます。

トランザクションサービスの作成

トランザクション制御に関する処理も、TRANSACTION_SERVICE というサービスで拡張可能になっています。

ここでは、既存のトランザクション制御用のサービスを上書きし、EntityManger のトランザクションを利用してトランザクション制御を行うように自作サービスを作成します。

package se;

import db.EMProducer;
import com.ibm.jbatch.container.exception.TransactionManagementException;
import com.ibm.jbatch.container.services.impl.BatchTransactionServiceImpl;
import com.ibm.jbatch.spi.services.TransactionManagerAdapter;
import javax.batch.runtime.context.StepContext;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
import javax.enterprise.inject.spi.CDI;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;

public class SeTransactionManagerImpl extends BatchTransactionServiceImpl {

    @Override
    public TransactionManagerAdapter getTransactionManager(StepContext stepContext) throws TransactionManagementException {

        return new TransactionManagerAdapter() {
            EntityTransaction tx;

            @Override
            public void begin() {

                BeanManager beanManager = CDI.current().getBeanManager();
                Bean<?> empBean = beanManager.getBeans(EMProducer.class).iterator().next();

                EMProducer emp = (EMProducer) beanManager.getReference(empBean, 
                        EMProducer.class, beanManager.createCreationalContext(empBean));
                EntityManager em = emp.getEm();
                tx = em.getTransaction();
                tx.begin();
            }
            @Override
            public void rollback() {
                tx.rollback();
            }
            @Override
            public void setRollbackOnly() {
                tx.setRollbackOnly();
            }
            @Override
            public void commit() {
                System.out.println("tx commit");
                tx.commit();
            }
            @Override
            public int getStatus() {
                return 0;
            }
            @Override
            public void setTransactionTimeout(int arg0) {
            }
        };
    }
}

TransactionManagerAdapter というのが、トランザクション制御用のインターフェースです。 ここではかなり無理やりですが、 CDI の BeanManger から Batchlet の項で作成した EMProducer を取得して、EntityManger を取得し、 トランザクションを取得しています。

今のところ、JPA に限定した実装ですが、JDBCコネクションにすることもできるでしょう。 また、begin,commit,rollback以外の実装は適当です。

これで、チャンク形式の機能も Java EE 同様に近づいたと思います。

並列処理などの検証はしていないですけど、多分動きそう。

(おまけ)ジョブ投入方法

mainメソッドなどから、ジョブを投入するには、以下のようなコードを書けばOKです。

    private static void executeJob(String jobId) {
        long execID=0;
        // ジョブの開始時はXML読み込みなど同時実行されるとおかしくなる処理があるので、ジョブID単位で同期化しておく。
        JobOperator jobOperator = BatchRuntime.getJobOperator();
        synchronized(jobId){
            Properties props = new Properties();
            execID = jobOperator.start(jobId, props);
            System.out.println("job stated id:" + execID);
        }
        JobExecution jobExec = null;
        // ジョブが終了するまでポーリング
        while (true) {
            jobExec = jobOperator.getJobExecution(execID);
           
            if (jobExec.getEndTime() != null) {
                break;
            }

            try {
                Thread.sleep(1000L);
            } catch (InterruptedException ex) {
            }
        }
        System.out.println("JOB END:Status is " + jobExec.getExitStatus() );
        
    }

jobOperator.start はジョブを開始すると、ジョブの実行終了を待たずにジョブIDを返して終了します。 そのためジョブの終了を待つためには、ポーリングするなどの対処が必要になります。 ここでは、ジョブ投入ごとにポーリングしていますが、ジョブ終了を検知するだけのタスクを別に作るという方向性でもいいと思います。 正直この辺は面倒ですね。標準でジョブ終了まで待つような仕組みが欲しいです。

まとめ

というわけで、 jBatch を拡張して Java SE でもそれなりに使うという話でした。

Java EE は色々と最初から用意されているので、楽だとあらためて思いました。

また、Java SE で使わなくても、 ジョブ単位でユニットテストをしたいという場合には、使える内容なんじゃないかと思います。 あと、CDI の裏側の仕組みも色々知れたので個人的には楽しかったです。

今回色々やってみて、スレッドプールでジョブが実行されるのがわかったのは、JVMの起動回数を抑えることができそうなので、結構有用でした。 とはいっても、 このプログラムを常駐化させてジョブ投入のリクエストを受け付けるようにするのは、 ソケット使ったりする必要があるのでわりと面倒ですけどね。

本格的にやるなら、 paraya micro で埋め込みサーバ使ったり、 あるいは Spring boot + Spring batch(+ JSR-352 Support) で組んでしまった方が楽かもしれません。

Javaで静的リソースを返すだけのHTTPサーバーをやっつけで

信頼性は無くてもよいので、複数のディレクトリにあるファイルをhttpで提供する必要が出てきたのでやっつけで作った。

javaによる複数の特定ディレクトリ以下の静的リソースを返すHttpサーバー(やっつけ実装)

色々と制約があり、jettyのような外部ライブラリを使わないでおこうと思ったので、 com.sun.net.httpserver.HttpServer というJDKに付属している非公開のAPIを使用している。 com.sun.net.httpserver.HttpServer はシンプルで良いのだが、特定URL配下に特定のディレクトリを割り当てるようなAPIが無かったので、 NIO2を使用してディレクトリ配下を走査しつつURLを割り当てている。