コラム

2022.03.23ワークフロー

代表的なワークフローエンジンをまとめて紹介!

「ワークフローエンジン」を活用することで、コンピューターの定期的な作業などをワークフローで自動化できるため、業務の効率化に役立ちます。本記事では、ワークフローエンジンの特徴や、代表的なワークフローエンジンについて解説します。

 

 

 

 

 

ワークフローエンジンとは

「ワークフローエンジン」とは、コンピューターの作業プロセスを自動で管理するソフトウェアです。これまで手動で行っていた業務プロセスの自動化・高速化を目的としており、ビジネスで活用される「ワークフローシステム」と同様に、申請・承認業務のプロセスを管理する働きをします。

 

ワークフローエンジンを使わずとも、タスクを決められた日時に行う場合にLinuxを使用する方法もありますが、これだとタスクが失敗したときにカバーできません。また、タスクの成否を確認するために、担当者がファイルをチェックしたり、失敗時の調査用ログを調べたりといった手間も発生します。

 

ワークフローエンジンを活用した場合、タスクが完了するごとに、その結果に応じて設定した次のタスクが実行できるよう、パッチの管理が可能です。

 

さらに、タスク実行後は通知を送信し、ステータスの履歴を保存したり、エラー発生時にはエラーの条件を自動で検出したりできるため、確認と再発防止に役立ちます。

 

ワークフローエンジンの導入によって業務を自動化することで、業務効率化と人的ミスの低減につながります。

 

 

 

代表的なワークフローエンジン

代表的なワークフローエンジンには「Airflow」「Digdag」「Argo」「Prefect」などがあります。以下、それぞれの特徴についてご紹介します。

 

 

Airflow

Airflowは、「DAG(有向非巡回グラフ)」と呼ばれるグラフ理論をもとに作られたワークフローエンジンです。タスクを実行する順番を定義するワークフローの作成や、実行のスケジューリング・監視を行います。

 

Airflowのアーキテクチャは大まかにいえば、実行の管理画面「Webserver」、ジョブ実行のスケジュール管理を行う「Scheduler」、ジョブ実行部の「Worker(Executor)」という3つのモジュールで構成されています。

 

それぞれ管理データベースを介して、ジョブの実行スケジュールや実行結果、依存関係などが共有されます。

 

Airflowでは、PythonプログラムやLinuxシェルで作られたファイルをタスクとして実行できます。また、Airflowの中にコーディングしてタスクを設定することも可能です。

 

これらのタスクをDAGで定義することで、依存関係のないタスクが実行されてから、次に依存関係が解消されたタスクが実行されるなど、順番にタスクが実行されていきます。

 

設定をPythonプログラムで行うと、自由度の高いスケジューリングが可能です。Airflowのチュートリアルで確認できるプログラムコードを利用したり、チュートリアルのコードをカスタマイズしたりすることで、さまざまなシステムのスケジューリングが実現します。

 

・Airflowの基本操作

Airflowの特徴がわかったところで、実際にどのように動かしていくのか、ざっくりとした基本操作を見ていきましょう。

 

Airflowをインストール

まずファーストステップとして、Airflowをインストールしていきます。

 

手動で起動する場合は、公式サイトのQuick Start(URL:https://airflow.apache.org/docs/apache-airflow/stable/start/index.html)に記載されているコマンドで実行すると楽に行えます。

 

DAGを追加

Airflowでは、DAG(Directed Acyclic Graph:有向非巡回グラフ)中に複数のタスクを定義し、一つのワークフローが作成されます。少し待ち時間は必要ですが、下記画像のコマンドを/dags以下に入力するだけで、簡単にDAGを作成することができます。

管理画面に戻って、”Tree View”をクリックし作成したDAGが正常に登録されているか確認しましょう。

 

DAGの実行

DAGを実行する方法は、Actionsの▶︎をクリック、そのまま実行する場合は「Trigger DAG」、もしくは「Add DAG Run」(Browse → DAG Runs → +アイコンクリック)からです。

 

パラメータを指定する場合は、「Trigger DAG w/config」から行なってください。

 

 

 

Digdag

Digdagは、アメリカのTreasure Data社が開発したOSSワークフローエンジンで、同社が提供するプラットフォーム「Treasure Data」と連携させて使うことが可能です。

 

Digdagでは、Airflowのように複雑なオペレートはできず、主に定期的に動かすデータパイプラインの記述で使われます。データパイプラインではタスクが直列か並列に実行されるよう、タスク間の依存関係をDigdagが管理します。

 

手動で行っているさまざまなタスクを自動化できるため、すべてのタスクをオペレータプラグインによりワークフローに定義してから、Digdagを使ってタスクを実行することが可能です。Digdagはプラグイン実行フレームワークとしてワークロード自動化の周辺を管理するため、タスクが失敗した場合などには、Digdagによって通知が送られてきます。

 

複雑なワークフローを自動化する際は、その定義も複雑になりやすいですが、Digdagを使えばタスクを小さなグループに分けてまとめ、グループごとに管理できます。

 

実行の際は、依存する相手がないタスクから実行され、次に依存するタスクをすべて完了したタスクが続々と実行され始めます。タスクグループは、親タスクから子タスクまですべてが成功した場合には「成功」、どこかで失敗した場合は「失敗」で完了します。

 

なお、Digdagをセットアップする際は、最初に「jenv」「Supervisor」「Postgres >=9.5」のソフトウェアのインストールが推奨されます。実行に必要なJavaを入れてから、始めるのがおすすめです。

 

・Digdagの基本操作

 

Digdagの特徴がわかったところで、実際にどのように動かしていくのか、ざっくりした基本操作を見ていきましょう。

 

Digdagのセットアップ

Digdagのセットアップですが、まずはJava 8(8u72) 以上をインストールします。

 

次に公式サイト(URL:http://docs.digdag.io/getting_started.html#downloading-the-latest-version)に記載されている下記のソースを取得して、ローカルで実行できるようにしましょう。

 

バージョンが表示されれば、完了になります。

 

$ curl -o ~/bin/digdag –create-dirs -L “https://dl.digdag.io/digdag-latest”

$ chmod +x ~/bin/digdag

$ echo ‘export PATH=”$HOME/bin:$PATH”‘ >> ~/.bashrc

 

ワークフローの作成

Digdagのワークフローは、「*.dig」拡張子が付いたファイルとして定義されます。ここでは、「hello_worldワークフロー」の作成例を記載していますので、下記のファイル内容をご確認ください。

 

 

画像のタイムゾーンは「Asia/Tokyo」となっていますが、デフォルトでは「UTC」、他には「America/Los_Angeles」などがあります。「+」記号から始まっているものがタスクで、上から順番に実行されます。

 

また、「sh>;」「py>;」などで表現されているものは、オペレーターと呼ばれています。

 

・sh>;:シェルの実行

・py>;:pythonスクリプトの実行

・echo>;:メッセージの表示

 

 

 

Argo

Argoは、2017年にApplatix社で開発された、Kubernetes上で動作するワークフローエンジンです。Kubernetes上にある並列タスクを管理でき、機械学習などに適用される複雑なバッチ処理の管理に使用されています。

 

Argoワークフローは、複数タスクの重なりや有向非巡回グラフ(DAG)によって、タスクの依存関係を洗い出してから作成します。コンテナを1つのパッケージとして取り扱うこのコンテナネイティブなワークフローエンジンでは、環境依存性があるものをコンテナとして隔離できるメリットがあります。

 

各タスクがPodによって実行されるので、Argoワークフローでは断片的な時間で機械学習やデータ処理の計算を実行することも可能です。コンテナのおかげでソフトウェアの構成が複雑になることなく、Kubernetes上にCICDパイプラインの作成も行えます。

 

Argoワークフローでは、ステップやコンテナ対応の実行パラメータなどによってワークフロー定義されたYAMLファイルにより、UI画面が指示され、Argoコマンドが実行されてワークフローが開始します。

 

また、Kubernetesのイベント駆動型ワークフロー自動化フレームワーク上で、さまざまなソースから発生するArgoイベントによってタスクが開始した場合も、自動化されたワークフローの流れが始まり、問題が生じなければ完了まで進むことが可能です。

 

ArgoイベントはWebhookやスケジュール、メッセージ、SNSなど、さまざまなものがきっかけで発生します。

 

Webhookなどのイベント元から通知が届いたときに、Argoイベントの監視が通知に気づいてイベントが実行され、ワークフローが開始する仕組みです。

 

・Argoの基本操作

Argoの概要や特徴がわかったところで、次は簡単な基本操作を見ていきましょう。

 

Argo WorkflowはQuick Startも存在しますが、日本語の記事は多くなく、Kubernetesの環境を構築してArgo Workflowを動かすのがほとんどです。

 

ここでは、minikubeを使ってkubenetesクラスタを構築し、Argo Workflowをインストールする方法をご紹介します。

 

minikubeの導入

はじめにHomebrewでminikubeをインストールして、「minikube start」を入力してminikubeを起動しましょう。

 

正常に起動すると、画像のような画面が表示されます。

(URL:https://cdn-ak.f.st-hatena.com/images/fotolife/l/lapis_zero09/20190228/20190228163904.png

 

kubectlでコンテキストを確認して、minikubeに向いていることを確認できたら完了になります。

 

Argo Workflowのインストール

kubenetesを構築できたら、次にArgo CLIをインストールします。

 

Namespaceの作成→defaultサービスアカウントに権限を付与して、導入は完了です。上手くいっているかどうか、Argo UIにアクセスしてみましょう。

 

Workflowの実行

Workflowを実行するためには、dockerのwhalesayイメージを指定→cowsayコマンドを実行でOKです。

 

Workflowが実行されているかは、Argo list、もしくはArgo UIのWorkflowをクリックでも詳細を確認することができます。

 

必要なコマンドは今回参考にさせていただいた下記のサイトに記載されていますので、ぜひ参考にしてみてください。

 

 

 

Prefect

Prefectは、Prefectチームから提供されるPython製のOSSワークフローエンジンです。最新のインフラストラクチャ用に設計されていて、スケジュール実行よりかは、手動で実行する単発のワークフローのほうが比較的実装しやすい特徴があります。

 

Prefectは問題発生のリスクを減らし、さらに問題発生時もサポートができることを目標として作られています。Prefectでは、Pythonの関数を定義していく方法でワークフローの構築が可能です。

 

Prefectには、ほかにも「Prefect Cloud」「Prefect Orion」という2つの製品があり、Prefect Cloudは本番運用に適した有償のクラウドサービス、Prefect OrionはPrefectの第2世代オーケストレーションエンジンです。それぞれ特徴が異なるので、目的に応じて使用するワークフローエンジンを選ぶ必要があります。

 

Prefectを活用した場合、タスク間のデータのやり取りや、ローカルでのデバックがしやすくなるなどのメリットもあります。また、実行時に対象を固定しない動的なワークフローの構成も可能です。そのほか、タスクを定期実行のスケジュールに入れるだけでなく、不定期に実行するスケジュールでも活用できます。

 

・Prefectの基本操作

 

Prefectの概要や特徴がわかったところで、他のワークフローシステムと同じように、ざっくりした基本操作を見ていきましょう。

 

 

Prefectをインストール

$ pip install prefectを入力して、prefectをインストール。正しくできているとコマンドが使えるようになります。

 

Hello worldを実行

Prefectでは、「@taskデコレータ」をPythonの関数に指定するだけで、簡単にワークフローのタスクを作ることができます。

 

次にFlowの部分で必要な関数(Task)を指定してワークフローを構築→最後にFlowのrun()メソッドを呼ぶことで、ワークフローが開始されます。

 

パラメータの実行

Flowを実行する際のパラメータとして、タスクで使用する値を指定することができます。

 

Parameterクラスの(”○○”)内に”param”や”name”などと指定して実行しましょう。

 

動的なFlowを構成

動的なFlowを構成する場合は、タスクの「mapメソッド」を使用します。

 

mapメソッドには、上流タスクの結果を引数として指定することで、各要素のタスクを実行できるようになります。

 

きちんと機能しているか確認したい場合は、参考サイト(URL:https://qiita.com/koji_mats/items/0533fbdeb9012a7e1494)のdynamic_flow.pyを入力して、実際の動きをチェックしてみるとよいでしょう。

 

 

 

まとめ

ワークフローエンジンとは、コンピューター上の一定の作業を自動化するシステムをいいます。ワークフローエンジンを活用することで、ビジネス上の業務を自動化して効率化を図るワークフローシステムのように、業務効率化や作業ミスの防止を実現できます。

 

代表的なワークフローエンジンには「Airflow」「Digdag」「Argo」「Prefect」などがあり、スケジューリングに適したものや定期的なタスク実行に適したものなど、それぞれ特徴が異なります。

 

コンピューター作業はもちろん、ビジネス上の幅広い業務効率化を目指すのであれば、株式会社無限が提供する「二次元ワークフロー・ソリューション」の導入がおすすめです。設定されたタスクの業務の流れから、申請・承認業務の流れまでフローを一本化し、楽に管理が行えます。

 

 


 

 

株式会社無限やソリューションなどへの資料請求・お問い合わせは、お気軽にご連絡ください。

PAGETOP