Workflows を使用したサービスの接続

このチュートリアルでは、Workflows を使用して一連のサービスをリンクする方法について説明します。2 つのパブリック HTTP サービス(Cloud Run functions を使用)、外部 REST API、プライベート Cloud Run サービスを接続すると、柔軟でサーバーレスなアプリケーションを作成できます。

最初の Cloud Run functions の関数をデプロイする

HTTP リクエストを受信すると、この HTTP 関数は 1~100 の間の乱数を生成し、その数値を JSON 形式で返します。

  1. randomgen という名前のディレクトリを作成し、そのディレクトリに移動します。

    mkdir ~/randomgen
    cd ~/randomgen
  2. 次の Python コードを含むテキスト ファイルを main.py というファイル名で作成します。

    import functions_framework
    import random
    from flask import jsonify
    
    
    @functions_framework.http
    def randomgen(request):
        randomNum = random.randint(1, 100)
        output = {"random": randomNum}
        return jsonify(output)
  3. HTTP 処理用の Flask への依存関係をサポートするには、pip パッケージ管理システムのテキスト ファイルを作成します。ファイル名を requirements.txt にして、以下を追加します。

    flask>=1.0.2
    functions-framework==3.0.0
  4. HTTP トリガーを使用して関数をデプロイし、未認証アクセスを許可します。

    gcloud functions deploy randomgen-function \
        --gen2 \
        --runtime python310 \
        --entry-point=randomgen \
        --trigger-http \
        --allow-unauthenticated

    関数のデプロイには数分かかることがあります。代替手段として、 Google Cloud コンソールで Cloud Run functions のインターフェースを使用して関数をデプロイすることもできます。

  5. randomgen 関数がデプロイされたら、httpsTrigger.url プロパティを確認できます。

    gcloud functions describe randomgen-function \
        --gen2 \
        --format="value(serviceConfig.uri)"
  6. URL を保存します。これは、後半の演習でワークフローのソースファイルに追加する必要があります。

  7. 次の curl コマンドを使用して、この関数を試すことができます。

    curl $(gcloud functions describe randomgen-function \
        --gen2 \
        --format="value(serviceConfig.uri)")

    数値がランダムに生成され、返されます。

2 つ目の Cloud Run functions の関数をデプロイする

HTTP リクエストを受信すると、この HTTP 関数は JSON 本文から input を抽出し、2 を乗算して、結果を JSON 形式で返します。

  1. ホーム ディレクトリに戻ります。

    cd ~
  2. multiply という名前のディレクトリを作成し、そのディレクトリに移動します。

    mkdir ~/multiply
    cd ~/multiply
  3. 次の Python コードを含むテキスト ファイルを main.py というファイル名で作成します。

    import functions_framework
    from flask import jsonify
    
    
    @functions_framework.http
    def multiply(request):
        request_json = request.get_json()
        output = {"multiplied": 2 * request_json['input']}
        return jsonify(output)
  4. HTTP 処理用の Flask への依存関係をサポートするには、pip パッケージ管理システムのテキスト ファイルを作成します。ファイル名を requirements.txt にして、以下を追加します。

    flask>=1.0.2
    functions-framework==3.0.0
  5. HTTP トリガーを使用して関数をデプロイし、未認証アクセスを許可します。

    gcloud functions deploy multiply-function \
        --gen2 \
        --runtime python310 \
        --entry-point=multiply \
        --trigger-http \
        --allow-unauthenticated

    関数のデプロイには数分かかることがあります。代替手段として、 Google Cloud コンソールで Cloud Run functions のインターフェースを使用して関数をデプロイすることもできます。

  6. multiply 関数がデプロイされたら、httpsTrigger.url プロパティを確認できます。

    gcloud functions describe multiply-function \
        --gen2\
        --format="value(serviceConfig.uri)"
  7. URL を保存します。これは、後半の演習でワークフローのソースファイルに追加する必要があります。

  8. 次の curl コマンドを使用して、この関数を試すことができます。

    curl -X POST MULTIPLY_FUNCTION_URL \
        -H "Authorization: Bearer $(gcloud auth print-identity-token)" \
        -H "Content-Type: application/json" \
        -d '{"input": 5}'

    数値 10 が返されます。

ワークフローで 2 つの Cloud Run functions の関数を接続する

ワークフローは、Workflows の構文で記述された一連のステップで構成され、YAML 形式または JSON 形式のいずれでも記述できます。これがワークフローの定義です。詳細な説明については、構文リファレンスのページをご覧ください。

  1. ホーム ディレクトリに戻ります。

    cd ~
  2. 次の内容を含むテキスト ファイルを workflow.yaml という名前で作成します。

    - randomgen_function:
        call: http.get
        args:
            url: RANDOMGEN_FUNCTION_URL
        result: randomgen_result
    - multiply_function:
        call: http.post
        args:
            url: MULTIPLY_FUNCTION_URL
            body:
                input: ${randomgen_result.body.random}
        result: multiply_result
    - return_result:
        return: ${multiply_result}
    
    • RANDOMGEN_FUNCTION_URL は、randomgen 関数の URL に置き換えます。
    • MULTIPLY_FUNCTION_URL は、multiply 関数の URL に置き換えます。

    このソースファイルにより、2 つの HTTP 関数がリンクされ、最終結果が返されます。

  3. ワークフローを作成したら、デプロイして実行の準備ができます。

    gcloud workflows deploy WORKFLOW_NAME \
        --source=workflow.yaml \
        --service-account=${SERVICE_ACCOUNT}@PROJECT_ID.iam.gserviceaccount.com

    WORKFLOW_NAME は、ワークフローの名前に置き換えます。

  4. ワークフローを実行します。

    gcloud workflows run WORKFLOW_NAME

    実行とは、ワークフローの定義に含まれるロジックを 1 回だけ実行することです。すべてのワークフローの実行は独立しており、Workflows の迅速なスケーリングにより、多数の同時実行が可能になります。

    ワークフローの実行後、出力は次のようになります。

    result: '{"body":{"multiplied":120},"code":200,"headers":{"Alt-Svc":"h3-29=\":443\";
    ...
    startTime: '2021-05-05T14:17:39.135251700Z'
    state: SUCCEEDED
    ...
    

ワークフローでパブリック REST サービスを接続する

既存のワークフローを更新し、数式を評価できるパブリック REST API(math.js)を接続します。例: curl https://api.mathjs.org/v4/?'expr=log(56)'

ワークフローはデプロイしてあるため、 Google Cloud コンソールの [ワークフロー] ページから編集することもできます。

  1. ワークフローのソースファイルを編集し、次の内容で置き換えます。

    - randomgen_function:
        call: http.get
        args:
            url: RANDOMGEN_FUNCTION_URL
        result: randomgen_result
    - multiply_function:
        call: http.post
        args:
            url: MULTIPLY_FUNCTION_URL
            body:
                input: ${randomgen_result.body.random}
        result: multiply_result
    - log_function:
        call: http.get
        args:
            url: https://api.mathjs.org/v4/
            query:
                expr: ${"log(" + string(multiply_result.body.multiplied) + ")"}
        result: log_result
    - return_result:
        return: ${log_result}
    
    • RANDOMGEN_FUNCTION_URL は、randomgen 関数の URL に置き換えます。
    • MULTIPLY_FUNCTION_URL は、multiply 関数の URL に置き換えます。

    これにより、外部 REST サービスが Cloud Run functions にリンクされ、最終結果が返されます。

  2. 変更されたワークフローをデプロイします。

    gcloud workflows deploy WORKFLOW_NAME \
        --source=workflow.yaml \
        --service-account=${SERVICE_ACCOUNT}@PROJECT_ID.iam.gserviceaccount.com

Cloud Run サービスをデプロイする

HTTP リクエストを受信後に JSON 本文から input を抽出してその math.floor を計算し、結果を返す Cloud Run サービスをデプロイします。

  1. floor という名前のディレクトリを作成し、そのディレクトリに移動します。

    mkdir ~/floor
    cd ~/floor
  2. 次の Python コードを含むテキスト ファイルを app.py というファイル名で作成します。

    import json
    import logging
    import os
    import math
    
    from flask import Flask, request
    
    app = Flask(__name__)
    
    
    @app.route('/', methods=['POST'])
    def handle_post():
        content = json.loads(request.data)
        input = float(content['input'])
        return f"{math.floor(input)}", 200
    
    
    if __name__ != '__main__':
        # Redirect Flask logs to Gunicorn logs
        gunicorn_logger = logging.getLogger('gunicorn.error')
        app.logger.handlers = gunicorn_logger.handlers
        app.logger.setLevel(gunicorn_logger.level)
        app.logger.info('Service started...')
    else:
        app.run(debug=True, host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))

  3. 同じディレクトリに、次の内容の Dockerfile を作成します。

    # Use an official lightweight Python image.
    # https://hub.docker.com/_/python
    FROM python:3.7-slim
    
    # Install production dependencies.
    RUN pip install Flask gunicorn
    
    # Copy local code to the container image.
    WORKDIR /app
    COPY . .
    
    # Run the web service on container startup. Here we use the gunicorn
    # webserver, with one worker process and 8 threads.
    # For environments with multiple CPU cores, increase the number of workers
    # to be equal to the cores available.
    CMD exec gunicorn --bind 0.0.0.0:8080 --workers 1 --threads 8 app:app

  4. Docker コンテナ イメージを保存できる Artifact Registry 標準リポジトリを作成します。

    gcloud artifacts repositories create REPOSITORY \
        --repository-format=docker \
        --location=${REGION}

    REPOSITORY は、リポジトリの一意の名前に置き換えます。

  5. コンテナ イメージをビルドします。

    export SERVICE_NAME=floor
    gcloud builds submit --tag ${REGION}-docker.pkg.dev/PROJECT_ID/REPOSITORY/${SERVICE_NAME}
  6. コンテナ イメージを Cloud Run にデプロイし、認証された呼び出しのみが受け入れられるようにします。

    gcloud run deploy ${SERVICE_NAME} \
        --image ${REGION}-docker.pkg.dev/PROJECT_ID/REPOSITORY/${SERVICE_NAME}:latest \
        --no-allow-unauthenticated

サービス URL が表示されたら、デプロイは完了しています。ワークフロー定義を更新するときに、この URL を指定する必要があります。

ワークフローで Cloud Run サービスを接続する

既存のワークフローを更新して、Cloud Run サービスの URL を指定します。

  1. ホーム ディレクトリに戻ります。

    cd ~
  2. ワークフローのソースファイルを編集し、次の内容で置き換えます。

    - randomgen_function:
        call: http.get
        args:
            url: RANDOMGEN_FUNCTION_URL
        result: randomgen_result
    - multiply_function:
        call: http.post
        args:
            url: MULTIPLY_FUNCTION_URL
            body:
                input: ${randomgen_result.body.random}
        result: multiply_result
    - log_function:
        call: http.get
        args:
            url: https://api.mathjs.org/v4/
            query:
                expr: ${"log(" + string(multiply_result.body.multiplied) + ")"}
        result: log_result
    - floor_function:
        call: http.post
        args:
            url: CLOUD_RUN_SERVICE_URL
            auth:
                type: OIDC
            body:
                input: ${log_result.body}
        result: floor_result
    - create_output_map:
        assign:
          - outputMap:
              randomResult: ${randomgen_result}
              multiplyResult: ${multiply_result}
              logResult: ${log_result}
              floorResult: ${floor_result}
    - return_output:
        return: ${outputMap}
    
    • RANDOMGEN_FUNCTION_URL は、randomgen 関数の URL に置き換えます。
    • MULTIPLY_FUNCTION_URL は、multiply 関数の URL に置き換えます。
    • CLOUD_RUN_SERVICE_URL は、Cloud Run サービス URL に置き換えます。

    これにより、ワークフローで Cloud Run サービスが接続されます。auth キーを使用すると、Cloud Run サービスの呼び出しで認証トークンが渡されます。詳細については、ワークフローから認証済みリクエストを行うをご覧ください。

  3. 変更されたワークフローをデプロイします。

    gcloud workflows deploy WORKFLOW_NAME \
        --source=workflow.yaml \
        --service-account=${SERVICE_ACCOUNT}@PROJECT_ID.iam.gserviceaccount.com
  4. 最終ワークフローを実行します。

    gcloud workflows run WORKFLOW_NAME

    出力は次のようになります。

    result: '{"floorResult":{"body":"4","code":200
      ...
      "logResult":{"body":"4.02535169073515","code":200
      ...
      "multiplyResult":{"body":{"multiplied":56},"code":200
      ...
      "randomResult":{"body":{"random":28},"code":200
      ...
    startTime: '2023-11-13T21:22:56.782669001Z'
    state: SUCCEEDED
    

これで完了です。一連のサービスを接続するワークフローのデプロイと実行が完了しました。

式、条件付きジャンプ、Base64 エンコードまたはデコード、サブワークフローなどを使用して複雑なワークフローを作成するには、Workflows 構文リファレンス標準ライブラリの概要をご覧ください。