Skip to main content

バージョン 4 のアーキテクチャ

このページでは現在開発中の OpenRefine 4 のアーキテクチャを説明します。従来バージョンとは重要な点で異なり、プロジェクトや履歴の表現方法が刷新されています。これにより RAM に収まりきらないテーブルの処理、時間のかかる操作に対するユーザー体験の改善、そしてワークフロー再現性の向上が可能になります。

プロジェクト履歴の構造

プロジェクト履歴は、プロジェクトが辿る複数の状態の連なりです。各状態は Grid として表現され、以下のデータを含みます。

  • 列名などのメタデータを持つ列ヘッダー
  • プロジェクト本体のテーブル
  • テーブルと一緒に保存されるオーバーレイモデル(Wikibase スキーマなど)

最初のグリッドはプロジェクト作成時にインポーターが生成します。以降の各グリッドは直前のグリッドに Change を適用して得ます。Change は前のグリッドを受け取り、新しいグリッドを生成する関数です。

グリッドはイミュータブル(不変)なオブジェクトです。つまり Change はグリッドをその場で書き換えず、変更を反映した新しいコピーを生成します。ランナー の節で見るように、これは効率的に行え、多量のデータコピーを伴わない設計になっています。イミュータブルなグリッドはスレッドセーフティの保証にも役立ち、たとえばファセットを評価した履歴上の時点が明確になるため、複数状態が混ざったビューを返しません。

以前のアーキテクチャでは Change がグリッドへの適用だけでなく、逆操作(元に戻す)も実装する必要がありました。前方向のみで済むようにしたことには、以下の重要な効果があります。

  • 片方向だけ実装すればよいため Change の実装が容易になります。双方向を実装して互いに逆関数であることを保証するのは難しく、#2 のようなバグの原因にもなっていました。拡張に独自の Change を定義させる設計では、誤った実装により Undo 後のプロジェクト状態が壊れるリスクが特に高くなります。
  • OpenRefine の多くの操作は破壊的(元データを削除・更新)であり、巻き戻すには Change オブジェクト内に削除/変更されたデータを保持する必要がありました。逆操作を求めないことで Change を格段に軽くできます。
  • Undo には過去のグリッドが参照可能である必要があります。ランナー の節で見るとおり、すべてのグリッドをメモリに置かなくても効率よく実現できます。

Runners

グリッドとその変換の実装は差し替え可能です。つまりグリッドをメモリ上でどう表現するかを、利用可能なリソースや実行環境に応じて選べます。これを支えるのが Runner という概念で、Grid インターフェースの具体実装を生成時に選択するファクトリクラスと考えられます。

  • ローカルランナー はデフォルト実装です。OpenRefine と同じマシンに全データがある前提で設計され、表示・集計・エクスポートなど必要なタイミングでのみディスクから読み込む遅延評価を行います。これにより巨大なデータセットでも膨大な RAM を要求せずに処理できます。遅延評価と即時評価 の違いは後述します。
  • Spark ランナーは分散データセット向けです。データを複数マシンに分割して保持し、Spark クラスター全体でワークフローを実行します。ローカルランナーと同様に Change は遅延評価されます。
  • テストランナーはすべてのデータをメモリにロードする単純な実装です。性能最適化はされておらず、Runner インターフェースの仕様を最小限満たす形になっています。操作・インポーター・ワークフロー基礎部品のテストで利用され、少量データであればシンプルさゆえに十分高速です。実行中に追加チェックも行うため、誤動作の検出が容易になります。ローカル/Spark ランナーと異なり即時評価を採用します。

Runner はコマンドラインの -r オプション(Windows では /r)でデータモデルランナーのクラス名を指定して切り替えられます。refine.ini 設定ファイルにはコマンドライン指定が無い場合に使うクラスを登録できます。新しい Runner を拡張として追加すれば、さまざまな環境や用途をサポートできます。

遅延評価と即時評価

ここではローカルランナーを例に遅延評価の仕組みを説明します。プロジェクトは通常履歴を通じてアクセスされます。プロジェクトを開くと history.json を読み込んで操作履歴を把握し、初期グリッドや履歴ごとの ChangeData をロードします。

履歴にある Change をすべて実行し、結果テーブルをメモリに保持することも理論上は可能です。しかしプロジェクトが巨大になると全行がメモリに収まらず現実的ではありません。たとえ収まったとしても、必要になった時点でデータを読み込む方が望ましい場面が多いでしょう。

これを実現するため、グリッドは問い合わせ時に内容を算出するロジックを保持するプロキシとして表現されます。

mapRows などで Change を適用すると、新しいグリッドは元のグリッドに変換処理を付与したプロキシのように振る舞います。変更後セルの内容を即座に計算するのではなく、新グリッドが参照された際に実行する「レシピ」を保持するだけです。これは、新しいグリッドの行をすぐに計算してそのデータをすべて抱える即時評価とは対照的です。即時評価のグリッドは新しいテーブルの全行・全列を保持するためメモリを多く消費しますが、アクセス時に再計算する必要がなく、任意の行や列を即座に返せます。

このようにグリッド上の操作が 遅延評価 されるため、これらの Runner を「遅延ランナー」と呼びます。実際の計算はグリッド生成時ではなく、集計でファセット統計を求める時や行範囲を取得する時など、データが利用される瞬間まで遅らされます。

遅延評価には次のような重要な影響があります。

  • 変換後グリッドを表すプロキシオブジェクトは非常に軽量で、データそのものではなくレシピだけを持ちます。そのため多くのプロキシをメモリ上に保持しておけます。実際 OpenRefine はプロジェクト履歴の各ステップに対応する Grid オブジェクトを保持することが多く、Change に逆操作がなくても過去の Grid に戻るだけで変換を元に戻せます。
  • 履歴が長くなると、プロジェクトデータはプロキシの長いチェーンを経由してアクセスされ、ワークフロー全体が何度も再実行されることになります。負荷が大きくなってきたら中間グリッドのどこかでキャッシュを有効にし、そのグリッドの内容を一度計算してメモリやディスクに保存できます。この戦略は メモリ管理 で詳しく説明します。
  • 遅延評価では、Change に必要な計算がたとえばファセット更新のたびに何度も走る可能性があります。URL からのデータ取得のように高コスト、もしくは副作用のある処理には望ましくありません。これを避けるため、Change は高コスト計算の結果を ChangeData オブジェクト に保存できます。ChangeData は一度だけ評価されディスクに永続化され、通常は join によって旧グリッドと組み合わせることで新しいグリッドを導出します。

ChangeData オブジェクト

ChangeData は、再計算コストや副作用の大きいデータを 1 度だけ計算して保持するためのデータ構造です。たとえばリコンサイル結果、URL からの取得データ、純粋関数 とは限らない式の評価結果などを保存する用途で利用します。

ChangeData はグリッド内の行またはレコードに紐づく任意のデータ項目をインデックスできます。各行/レコード ID につき最大 1 つの項目が対応します。典型的なライフサイクルは以下の通りです。

  • Grid::mapRowsGrid::mapRecords でグリッドに行/レコード単位の関数を適用して ChangeData を作成します。通常これは遅延操作であり、重い計算はまだ実行されていません。
  • 直後に、プロジェクトのシリアライズ で述べるプロジェクトディレクトリ内のファイルに保存します。ここで初めて高コストの計算が実行されるため、保存に時間がかかる場合があります。
  • 保存が始まったら(処理中でも)Runner.loadChangeData を使ってシリアライズ済みデータから読み戻せます。その時点で計算・書き込み済みの項目のみが読み込まれます。
  • その ChangeData を元のグリッドと join してセルへ書き戻します(取得した URL を格納する列を新設するなど)。Grid::join などのメソッドを用います。

プロジェクトのシリアライズ

1 つのグリッドは専用ディレクトリにシリアライズされ、次の構成を持ちます。

  • metadata.json: グリッドに紐づく列モデルとオーバーレイモデルを格納
  • grid/part-*.gz: プロジェクトの各行を 1 行ずつ JSON で表した Gzip 圧縮テキスト

複数ファイルに分割して保存しているのは並列処理のためで、パーティションごとに 1 ワーカーで処理できます。行はソートされ、各パーティションは連続する一塊の行を含むため、特定の行番号にアクセスする際は該当パーティションだけを読めば十分です。パーティション数はプロジェクトサイズと Runner の設定に依存します。

プロジェクト全体も専用ディレクトリにシリアライズされ、次の要素で構成されます。

  • history.json: 操作と紐づく Change を含む履歴エントリのリスト
  • initial/: インポーターが生成した初期グリッドを格納
  • changes/: すべての ChangeData を履歴エントリ ID ごとのサブディレクトリに格納
  • cache/: 初期グリッド以外を保存できるキャッシュ用ディレクトリ。履歴が長くなった際に毎回初期状態から計算し直さずに済みます。

メモリ管理

(再設計中のキャッシュ戦略について追って記載予定)

ローカルランナー

ローカルランナーは OpenRefine が想定する「データ整備を行うマシン上でそのまま実行する」状況に最適化されたデフォルト実装です。Spark に着想を得ていますが、Spark 本体は分散計算や冗長化のオーバーヘッドが大きく、ローカル実行では応答性が落ちるため置き換えには適しません。また OpenRefine は多くの場面で行順序に依存しますが、Spark の多くのプリミティブは順序を保持しないため、標準実装としては扱いづらいという事情もあります。

オプション

この Runner では以下の設定キーが利用できます。

Configuration keyDefault valueDescription
refine.runner.defaultParallelism4データセットを何分割するかの基本値(極端に小さい/大きい場合は除く)
refine.runner.minSplitSize4096パーティションの最小サイズ(バイト)。これより小さいデータセットは分割されません。
refine.runner.maxSplitSize16777216パーティションの最大サイズ(バイト)。defaultParallelism * maxSplitSize を超えるデータセットは基本並列度より多く分割されます。

Partitioned Lazy List

ローカルランナーでグリッドや ChangeData の遅延表現を支える中心データ構造が Partitioned Lazy List(PLL)です。Spark の RDD を軽量化したようなもので、以下の性質を持ちます。

  • 連続した要素の順序付きコレクションとしてのリスト
  • デフォルトでは内容をメモリ上の実体として保持せず、アクセス時にオンデマンドで計算する遅延構造
  • 要素をパーティションと呼ばれる連続領域に分割するため、各パーティションを独立に列挙でき、リストの異なる部分を並列処理できる

RDD と異なり PLL は以下の特徴があります。

  • 分散していない: すべてのデータにローカルからアクセスでき、計算は同じ JVM 内で完結する
  • 冗長化されていない: 冗長性をサポートしない

PLL の並列処理は Java スレッドで実装されており、ローカルランナーはインスタンス化時にスレッドプールを起動して必要に応じて計算に利用します。