Workflow using Cloud Composer

In this document, you use the following billable components of Google Cloud:

  • Dataproc
  • Compute Engine
  • Cloud Composer

To generate a cost estimate based on your projected usage, use the Pricing Calculator. New Google Cloud users might be eligible for a free trial.

Before you begin

Set up your project

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Composer APIs (a gcloud alternative is sketched after this list).

    Enable the APIs

  5. Install the Google Cloud CLI.

  6. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  7. To initialize the gcloud CLI, run the following command:

    gcloud init
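
  If you prefer the command line, the APIs in step 4 can also be enabled with the gcloud CLI. This is a minimal sketch; it assumes the gcloud CLI is initialized and pointed at your project.

    # Enable the three APIs used in this tutorial in the current project
    gcloud services enable dataproc.googleapis.com \
        compute.googleapis.com \
        composer.googleapis.com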

Create a Dataproc workflow template

Copy and run the following commands in a local terminal window or in Cloud Shell to create and define the workflow template.

  1. Create the sparkpi workflow template.
      gcloud dataproc workflow-templates create sparkpi \
          --region=us-central1
            
  2. Add a Spark job to the sparkpi workflow template. The "compute" step-id identifies the SparkPi job; the 1000 argument after the "--" separator sets the number of partitions that SparkPi uses.
      gcloud dataproc workflow-templates add-job spark \
          --workflow-template=sparkpi \
          --step-id=compute \
          --class=org.apache.spark.examples.SparkPi \
          --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
          --region=us-central1 \
          -- 1000
            
  3. Run the workflow using a managed, single-node cluster. Dataproc creates the cluster, runs the workflow on it, and then deletes the cluster when the workflow completes.
      gcloud dataproc workflow-templates set-managed-cluster sparkpi \
          --cluster-name=sparkpi \
          --single-node \
          --region=us-central1
            
  4. Confirm creation of the workflow template.

      Console

      On the Dataproc Workflows page in the Google Cloud console, click the sparkpi name to open the Workflow template details page and confirm the sparkpi template attributes.

      gcloud command

      Run the following command:

      gcloud dataproc workflow-templates describe sparkpi --region=us-central1
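
      Optionally, you can also instantiate the template directly to verify that it runs before wiring it into Cloud Composer. This is a minimal sketch; the command blocks until the workflow finishes, creating and then deleting the managed cluster.

      gcloud dataproc workflow-templates instantiate sparkpi \
          --region=us-central1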
          

Create and upload a DAG to Cloud Storage

  1. Create a Cloud Composer environment or use an existing one.
  2. Set Airflow variables.

      Airflow UI

      1. In the toolbar, click Admin > Variables.
      2. Click Create.
      3. Enter the following information:
        • Key: project_id
        • Val: PROJECT_ID - your Google Cloud project ID
      4. Click Save.

      gcloud command

      Enter the following command, where:

      • ENVIRONMENT is the name of the Cloud Composer environment
      • LOCATION is the region where the Cloud Composer environment is located
      • PROJECT_ID is the project ID of the project that contains the Cloud Composer environment
          gcloud composer environments run ENVIRONMENT --location LOCATION variables set -- project_id PROJECT_ID
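
      For example, with a hypothetical environment named example-environment in us-central1 and the project ID example-project (placeholder values; substitute your own), the command would be:

          gcloud composer environments run example-environment --location us-central1 variables set -- project_id example-project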
          
  3. Copy the following DAG code into a local file named composer-dataproc-dag.py. The DAG uses the DataprocInstantiateWorkflowTemplateOperator.

      Airflow 2

      
      """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
      Spark Pi Job.
      
      This DAG relies on an Airflow variable
      https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
      * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
      """
      
      import datetime
      
      from airflow import models
      from airflow.providers.google.cloud.operators.dataproc import (
          DataprocInstantiateWorkflowTemplateOperator,
      )
      from airflow.utils.dates import days_ago
      
      project_id = "{{var.value.project_id}}"
      
      
      default_args = {
          # Tell airflow to start one day ago, so that it runs as soon as you upload it
          "start_date": days_ago(1),
          "project_id": project_id,
      }
      
      # Define a DAG (directed acyclic graph) of tasks.
      # Any task you create within the context manager is automatically added to the
      # DAG object.
      with models.DAG(
          # The id you will see in the DAG airflow page
          "dataproc_workflow_dag",
          default_args=default_args,
          # The interval with which to schedule the DAG
          schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
      ) as dag:
          start_template_job = DataprocInstantiateWorkflowTemplateOperator(
              # The task id of your job
              task_id="dataproc_workflow_dag",
              # The template id of your workflow
              template_id="sparkpi",
              project_id=project_id,
              # The region for the template
              region="us-central1",
          )
      

      Airflow 1

      
      """Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
      Spark Pi Job.
      
      This DAG relies on an Airflow variable
      https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
      * project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
      """
      
      import datetime
      
      from airflow import models
      from airflow.contrib.operators import dataproc_operator
      from airflow.utils.dates import days_ago
      
      project_id = "{{var.value.project_id}}"
      
      
      default_args = {
          # Tell airflow to start one day ago, so that it runs as soon as you upload it
          "start_date": days_ago(1),
          "project_id": project_id,
      }
      
      # Define a DAG (directed acyclic graph) of tasks.
      # Any task you create within the context manager is automatically added to the
      # DAG object.
      with models.DAG(
          # The id you will see in the DAG airflow page
          "dataproc_workflow_dag",
          default_args=default_args,
          # The interval with which to schedule the DAG
          schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
      ) as dag:
          start_template_job = dataproc_operator.DataprocWorkflowTemplateInstantiateOperator(
              # The task id of your job
              task_id="dataproc_workflow_dag",
              # The template id of your workflow
              template_id="sparkpi",
              project_id=project_id,
              # The region for the template
              # For more info on regions where Dataproc is available, see the
              # Dataproc documentation on regional endpoints.
              region="us-central1",
          )
      
  4. Upload the DAG to your environment's folder in Cloud Storage (one gcloud option is sketched below). After the upload completes, click the DAGs folder link on the Cloud Composer Environments page.
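
    One way to upload the file is with the gcloud CLI. The following is a minimal sketch; it assumes composer-dataproc-dag.py is in your current directory and reuses the ENVIRONMENT and LOCATION placeholders from the earlier step.

      gcloud composer environments storage dags import \
          --environment ENVIRONMENT \
          --location LOCATION \
          --source composer-dataproc-dag.py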

View job status

    Airflow UI

    1. Open the Airflow web interface.
    2. On the DAGs page, click the DAG name (for example, dataproc_workflow_dag).
    3. On the DAG details page, click Graph View.
    4. Check the status:
      • Failed: the task has a red box around it. You can also hold the pointer over the task and check for State: Failed.
      • Success: the task has a green box around it. You can also hold the pointer over the task and check for State: Success.

    Console

    Click the Workflows tab to see the workflow status.

    gcloud command

    gcloud dataproc operations list \
        --region=us-central1 \
        --filter="labels.goog-dataproc-workflow-template-id=sparkpi"
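
    To inspect a specific run, you can describe one of the operations returned by the list command. OPERATION_ID is a placeholder for an operation name from that output.

    gcloud dataproc operations describe OPERATION_ID \
        --region=us-central1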
        

Cleaning up

To avoid incurring charges to your Google Cloud account, you can delete the resources used in this tutorial (a gcloud sketch follows this list):

  1. Delete the Cloud Composer environment.

  2. Delete the workflow template.
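
  A minimal sketch of both deletions with the gcloud CLI, reusing the ENVIRONMENT and LOCATION placeholders and the sparkpi template from the earlier steps:

    gcloud composer environments delete ENVIRONMENT \
        --location LOCATION

    gcloud dataproc workflow-templates delete sparkpi \
        --region=us-central1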

What's next