読者です 読者をやめる 読者になる 読者になる

Galapagos Engineering Blog

株式会社ガラパゴス エンジニアチームによるブログです。

AWS Step FunctionsのWorkerをRubyで実装してみた

こんにちは。細羽(@hosopy)です。

先日開催されたAWS re:Inventですが、新しいサービスがドカドカと発表されてお腹いっぱいになり、良い年末を迎えられそうです。

そんな中、個人的に気になったサービスの一つに、AWS Step Functionsがあります。

今回は、AWS Step FunctionsのWorkerをRubyで実装してみたので、その内容を共有したいと思います。

なお、AWS Step Functionsについては、既に大変分かりやすい記事(ありがたい!)があるため、基本的な説明はここでは割愛させて頂きます。

qiita.com

TaskとActivity

多くのチュートリアルがそうなっていますが、Taskに割り当てられる代表的なリソースは、Lambda Functionです。

おそらく、最もAWSらしく、かつ、Serverlessのコンセプトに沿った使い方だと思います。

一方で、TaskにはLambda Function以外のリソースも割り当てることが出来ます。 EC2やECS、時にオンプレで動くバッチ処理等が、それに相当します。

でもどうやって?

AWS Step Functionsには、Lambda Function以外のリソースとTaskとを繋ぐために、Activityという概念が定義されています。

Lambda Functionが割り当てられたTaskの定義では、StateのResourceにはLambda FunctionのARNが指定されています。

{
  "Comment": "A Hello World example of the Amazon States Language using an AWS Lambda Function",
  "StartAt": "HelloWorld",
  "States": {
    "HelloWorld": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function: HelloWorldFunction",
      "End": true
    }
  }
}

一方で、Activityが割り当てられたTaskの定義では、StateのResourceにはActivityのARNが指定されます。

{
  "Comment": "A Hello World example of the Amazon States Language using an Activity",
  "StartAt": "HelloWorld",
  "States": {
    "HelloWorld": {
      "Type": "Task",
      "Resource": "arn:aws:states:REGION:ACCOUNT_ID:activity:HelloWorldActivity",
      "End": true
    }
  }
}

定義としてはシンプルですが、AWS Step Functions上では、Activityはただの識別子でしかありません。

実際のリソースを繋ぐためには、ActivityをキーにしてAWS Step Functionsと対話するWorkerを実装して動かす必要があります。

Workerを実装して動かすまでの流れ

JavaによるWorker実装のチュートリアルの内容を参考に、RubyによるWorker実装にトライしてみます。

1. Activityの定義

AWS Step Functionsのコンソールで、Tasksメニューを開き、Create new activityを選択します。

f:id:glpgsinc:20161209135654p:plain

Activity登録フォームが表示されるので、適当な名前(ここではHelloWorldActivity )を入力してActivityを作成します。

f:id:glpgsinc:20161209135941p:plain

作成後、 Activitiesの中に作成されたActivityのARNが表示されていると思います。 このARNは、State Machieの定義で使うのでメモしておきます。

arn:aws:states:ap-northeast-1:ACCOUNT_ID:activity:HelloWorldActivity

2. State Machineの定義

State Machineを定義します。 今回は、Hello Worldというblueprintのサンプルをベースにして定義してみます。

{
  "Comment": "A Hello World example of the Amazon States Language using an Activity",
  "StartAt": "HelloWorld",
  "States": {
    "HelloWorld": {
      "Type": "Task",
      "Resource": "arn:aws:states:ap-northeast-1:xxxxxxxxxxxx:activity:HelloWorldActivity",
      "End": true
    }
  }
}

f:id:glpgsinc:20161209142010p:plain

IAM Roleを選択後、State Machineの作成が完了します。

3. State Machineの実行1

まだWorkerを動かしていないので失敗するだけですが、試しにState Machineを実行してみます。

State Machine作成完了後の画面に表示されたNew executionを押してみます。

f:id:glpgsinc:20161209142137p:plain

すると、InputのJSONを編集する画面が表示されるので、適当に編集してStart Executionを押して処理を開始します。

f:id:glpgsinc:20161209151723p:plain

HelloWorldがIn progressに、Execution StatusがRunningな状態になり、処理が開始して実行中になっているのが分かります。

f:id:glpgsinc:20161209170306p:plain

ですが、、、Activityを実装しておらず、また、Workerも動かしていない状態であるため、何分待っても処理が終わらないと思います。

それでは、いったんStop Executionで処理を止めて終了し、Activityの実装に移ります。

4. Activityを実装

GitHubにソースを置きました。

github.com

main.rbが中心ですが、要素だけ抜粋します。

client = Aws::States::Client.new

loop do
  # Taskを受信するまでブロックされるが、
  # 内部でAWS Step FunctionsにPollingしているものと思われる
  resp = client.get_activity_task({ activity_arn: activity_arn })

  # Task識別子とInputを取得
  task_token = resp.task_token
  input      = resp.input

  begin
    input = JSON.parse(resp.input)

    # Inputが不正であれば、AWS Step FunctionsにFailureを通知する
    unless input.has_key?('who')
      client.send_task_failure({ task_token: task_token, cause: 'Input error' })
    end

    # AWS Step FunctionsにSuccessを通知する
    client.send_task_success({
      task_token: resp.task_token,
      output:     JSON.generate({ message: "Hello #{input['who'].to_s}!" })
    })
  rescue => err
    # AWS Step FunctionsにFailureを通知する
    client.send_task_failure({ task_token: task_token, cause: 'Unexpected error' })
  end
end

5. ActivityのWorkerを起動

実装したActivityをWorkerとして起動してみます。${ACTIVITY_ARN}は、自身で定義したActivityのARNを指定します。

$ git clone https://github.com/hosopy/aws-step-functions-hello-world
$ cd aws-step-functions-hello-world
$ bundle install
$ bundle exec ruby main.rb ${ACTIVITY_ARN}

WorkerはTask受信待ちになります。

I, [2016-12-09T16:02:05.691154 #51083]  INFO -- : Waiting for a task to start. Press Ctrl-C to interrupt.

6. State Machineの実行2

ActivityのWorkerが起動したところで、State Machineを実行してみます。

f:id:glpgsinc:20161209164842p:plain

今回実装したActivityは、Input JSONwhoというフィールドがあることを前提とするので、Input JSONを変更して実行してみます。

f:id:glpgsinc:20161209165110p:plain

すると、Worker側で受信ログが確認出来ます。

I, [2016-12-09T16:02:53.924983 #51083]  INFO -- : Get activity task : #<struct Aws::States::Types::GetActivityTaskOutput task_token="xxxxxx", input="{\n    \"who\": \"hoge\"\n}">
I, [2016-12-09T16:02:54.196109 #51083]  INFO -- : Success
I, [2016-12-09T16:02:54.196184 #51083]  INFO -- : Waiting for a task to start. Press Ctrl-C to interrupt.

そして、AWS Step Functions側でもHelloWorldが Successに、Execution Statusが Succeededな状態になり、見事にState Machineが正常終了しました!

f:id:glpgsinc:20161209165809p:plain

Outputも意図したJSONになっているようです。

f:id:glpgsinc:20161209165623p:plain

考察

Lambda vs Activity

Activityを定義してWorkerを実装することで、Lambda Function以外のリソースを、AWS Step Functionsに統合できることが分かりました。

たとえば、以下のようなLambdaに載せられない処理を組込みたい場合は、Activityとして定義すると良さ気です。

  • バッチ処理
  • GPUを必要とする処理(深層学習など)
  • どうしてもオンプレ環境でないと実行できない処理

一方で、

  • Workerの実装自体が面倒
  • Workerの可用性やスケーラビリティを考える必要がある

という点を考えると、全てのTaskがAWS Managedなコンポーネントで構成されている方が、 Serverlessとしての究極的な理想形なのかなと思います。

そのあたりは、先日のre:Inventで同時に発表された、AWS Batchあたりに期待したいところです。

最後に

株式会社ガラパゴスでは、AWSを使ったより良いアーキテクチャを追求したいエンジニアを絶賛大募集中です!

RECRUIT | 株式会社ガラパゴス iPhone/iPad/Androidのスマートフォンアプリ開発