ワークフローフレームワーク digdagの紹介と運用Tips

2017年8月24日

GHSではクライアントのKPIモニタリングの管理,更新(モニタリング内容については弊社が作成しているTHRUSTERをご参照ください。)や定期的なタスクの実行にトレジャーデータがOSSで公開しているdigdagを使用しています。
cronで実行してもいいのですが再実行性の問題があったり複雑なタスクを処理するには力足らずなので、digdagを導入しました。
digdagは公式のドキュメントがしっかりしており、実現したいことは大体ドキュメントを見ればどのように行えばいいのかがわかると思います。
タスク自体はShell/Python/Rubyを書いて実行するので大体のことはできます。

はじめかた

インストール方法

ページに載っている通りですが、下記コマンドでインストールできます。

$ curl -o ~/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"
$ chmod +x ~/bin/digdag
$ echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc

プロジェクト作成

digdag initコマンドによりワークフローを作成できます。*.digファイルにワークフローの処理順が書いてあります。digファイルには変数や実行タイミングの記述もできます。

$ curl -o ~/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"
$ chmod +x ~/bin/digdag
$ echo 'export PATH="$HOME/bin:$PATH"' >> ~/.bashrc
``````bash
$ digdag init first_project # プロジェクト作成
$ cd first_project
$ ls
first_project.dig
$ cat irst_project.dig
timezone: UTC

+setup:
  echo>: start ${session_time}

+disp_current_date:
  echo>: ${moment(session_time).utc().format('YYYY-MM-DD HH:mm:ss Z')}

+repeat:
  for_each>:
    order: [first, second, third]
    animal: [dog, cat]
  _do:
    echo>: ${order} ${animal}
  _parallel: true

+teardown:
  echo>: finish ${session_time}
$ digdag run first_project.dig  # 実行

GHSでの利用例

ざっくり下記のように運用しています。

  • digdagサーバ
  • H2DB使用
  • supervisorでdigdagサーバをデーモン化

環境等

digdagサーバ

digdagはサーバとクライアントの機能があり、GHSではサーバを1台立てて使っています。
ライトには次のコマンドで実行できます。

$ digdag server --memory

サーバを立てるとクライアトサイドのコマンドを利用してリモートでプロジェクトを追加, 実行できます。

H2DB使用

digdagサーバのデータ永続化にはオンメモリ、PostgreSQL、H2DBが使えます。
サーバを複数立て実行管理する必要があるならPostgreSQLが良いかと思いますが、今回はライトな利用でサーバも1台構成(しかも、あいのり)のためH2DBを使用しています。
サーバ起動時に指定するプロパティファイルは次のような感じです。

$ cat h2db.properties
database.type = h2
database.path = ./digdagh2db

Supervisorでdigdagサーバをデーモン化

digdagサーバはそのままではデーモン化されないので、Supervisorを使い、デーモン化しています。
digdag-server.sh

#!/bin/bash
/usr/local/bin/digdag server --config /path/to/h2db.properties

digdag.conf

[program:digdag]
command=sh /path/to/digdag-server.sh
autostart=true
startsecs=5
user=root
redirect_stderr=true
stdout_logfile=/var/log/supervisord/digdag-daemon.log

運用上Tips

ディレクトリ構成

digdagプロジェクトのディレクトリ構成は大体次のようにしています。

$ pwd
/path/to/first_project
$ tree -d 
.
├── config # 設定ファイル(*.dig)
├── sql    # 集計に使用するsqlファイル
└── tasks  # python

設定ファイル

digファイルの記述で!includeを使用すると他の*.digファイルを含めることができます。
前述のconfigディレクトリにDBやSlackの情報を記載した設定ファイルを作成し、読み込ませています。
config/mysql.dig

host: 'localhost'
password: 'password'
user: 'user'
port: 3306
database: 'db-name'

first_project.dig

timezone: UTC

# mysql設定ファイルを読み込み
_export:
    mysql:
        !include : 'config/mysql.dig'

+setup:
  echo>: start ${session_time}

+disp_current_date:
  echo>: ${moment(session_time).utc().format('YYYY-MM-DD HH:mm:ss Z')}

+repeat:
  for_each>:
    order: [first, second, third]
    animal: [dog, cat]
  _do:
    echo>: ${order} ${animal}
  _parallel: true

+teardown:
  echo>: finish ${session_time}

追加タスク並列実行

単純なタスクの並列実行はdigファイル内で_parallelを設定してあげれば実現できます。(参考)

+prepare:
  # +data1, +data2, and +data3 run in parallel.
  _parallel: true

  +data1:
    sh>: tasks/prepare_data1.sh

  +data2:
    sh>: tasks/prepare_data2.sh

  +data3:
    sh>: tasks/prepare_data3.sh

+analyze:
    sh>: tasks/analyze_prepared_data_sets.sh

digdagではAPIが用意され、タスク内でタスクを追加することもできます。Pythonで次のようにかけます。(参考)

class MyWorkflow(object):
    def __init__(self):
        pass

    def step1(self):
        for i in range(10):
            # taskを追加
            digdag.env.add_subtask(MyWorkflow.step2, arg1=i) 

    def step2(self, arg1):
        print("task2 - ", arg1)

この例だと0~9が連番で実行結果として出ます。
この処理を同時に走らせたいときは次のようにsubtask_configの設定を少し変更してあげれば解決できます。

class MyWorkflow(object):
    def __init__(self):
        pass

    def step1(self):
        for i in range(10):
            # taskを追加
            digdag.env.add_subtask(MyWorkflow.step2, arg1=i) 
        # パラレル実行
        digdag.env.subtask_config["_parallel"] = True

    def step2(self, arg1):
        print("task2 - ", arg1)

これで0-9が並列実行され連番表示ではなく、実行順になります。

まとめ

cronで消耗した人たちはdigdag使うと幸せになれるよ!

積極採用中!

GHSでは一緒に働いてくれる仲間を募集しています!