使用 Python 適用的 Cloud 用戶端程式庫

本教學課程包含Cloud Shell 逐步操作說明,使用 Python 適用的 Google Cloud 用戶端程式庫,透過程式呼叫 Dataproc gRPC API 來建立叢集,並將工作提交至叢集。

以下各節說明 GoogleCloudPlatform/python-dataproc 存放區中包含的操作說明程式碼。

執行 Cloud Shell 逐步操作說明

按一下「Open in Cloud Shell」,即可執行操作說明。

在 Cloud Shell 中開啟

瞭解程式碼

應用程式預設憑證

本教學課程的 Cloud Shell 操作說明會使用您的 Google Cloud 專案憑證進行驗證。在本機執行程式碼時,建議您使用服務帳戶憑證驗證程式碼。

建立 Dataproc 叢集

系統會設定下列值來建立叢集:

  • 即將建立叢集的所在專案
  • 要建立叢集的區域
  • 叢集名稱
  • 叢集設定,指定一個主機和兩個主要工作站

系統會使用預設設定來設定其餘叢集。您可以覆寫預設叢集設定。舉例來說,您可以新增次要 VM (預設值 = 0),或為叢集指定非預設的 VPC 網路。詳情請參閱 CreateCluster

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

提交工作

提交工作時會設定下列值:

  • 即將建立叢集的所在專案
  • 要建立叢集的區域
  • 工作設定,可用於指定叢集名稱和 PySpark 工作 Cloud Storage 檔案路徑 (URI)

詳情請參閱「SubmitJob」一文。

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

刪除叢集

下列值會設為刪除叢集:

  • 即將建立叢集的所在專案
  • 要建立叢集的區域
  • 叢集名稱

詳情請參閱 DeleteCluster

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")