在 Dataproc 上使用 Apache Spark 搭配 HBase

目標

本教學課程將示範如何:

  1. 建立 Dataproc 叢集,並在叢集中安裝 Apache HBase 和 Apache ZooKeeper
  2. 使用在 Dataproc 叢集主節點上執行的 HBase 殼層建立 HBase 資料表
  3. 使用 Cloud Shell 將 Java 或 PySpark Spark 工作提交至 Dataproc 服務,該服務會寫入資料,然後讀取 HBase 資料表中的資料

費用

在本文件中,您會使用 Google Cloud的下列計費元件:

您可以使用 Pricing Calculator 根據預測用量產生預估費用。新 Google Cloud 使用者可能符合申請免費試用的資格。

事前準備

如果您尚未建立 Google Cloud Platform 專案,請先建立專案。

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc and Compute Engine APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Dataproc and Compute Engine APIs.

    Enable the APIs

  8. 建立 Dataproc 叢集

    1. Cloud Shell 工作階段終端機中執行下列指令,以便:

      • 安裝 HBaseZooKeeper 元件
      • 佈建三個 worker 節點 (建議在本教學課程中執行程式碼時,使用三到五個 worker)
      • 啟用元件閘道
      • 使用圖片 2.0 版
      • 使用 --properties 旗標將 HBase 設定和 HBase 程式庫新增至 Spark 驅動程式和執行緒的類別路徑。
    gcloud dataproc clusters create cluster-name \
        --region=region \
        --optional-components=HBASE,ZOOKEEPER \
        --num-workers=3 \
        --enable-component-gateway \
        --image-version=2.0 \
        --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
    

    驗證連接器安裝

    1. 透過 Google Cloud 主控台或 Cloud Shell 工作階段終端機,使用 SSH 連線至 Dataproc 叢集的主要節點

    2. 確認主節點上是否已安裝 Apache HBase Spark 連接器

      ls -l /usr/lib/spark/jars | grep hbase-spark
      
      輸出內容範例:
      -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
      

    3. 請保持 SSH 工作階段終端機為開啟狀態,以便執行下列操作:

      1. 建立 HBase 資料表
      2. (Java 使用者):在叢集的主節點上執行指令,以判斷叢集中安裝的元件版本
      3. 執行程式碼後,掃描 Hbase 資料表

    建立 HBase 資料表

    在上一個步驟中開啟的主節點 SSH 工作階段終端機中,執行本節所列的指令。

    1. 開啟 HBase shell:

      hbase shell
      

    2. 建立具有「cf」資料欄系列的 HBase「my-table」:

      create 'my_table','cf'
      

      1. 如要確認表格建立作業,請在 Google Cloud 控制台中,按一下 Google Cloud 控制台「元件閘道」連結中的「HBase」HBase,開啟 Apache HBase UI。my-table 會列在「首頁」頁面的「資料表」專區。

    查看 Spark 程式碼

    Java

    package hbase;
    
    import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    public class SparkHBaseMain {
        public static class SampleData implements Serializable {
            private String key;
            private String name;
    
    
            public SampleData(String key, String name) {
                this.key = key;
                this.name = name;
            }
    
            public SampleData() {
            }
    
            public String getName() {
                return name;
            }
    
            public void setName(String name) {
                this.name = name;
            }
    
            public String getKey() {
                return key;
            }
    
            public void setKey(String key) {
                this.key = key;
            }
        }
        public static void main(String[] args) {
            // Init SparkSession
            SparkSession spark = SparkSession
                    .builder()
                    .master("yarn")
                    .appName("spark-hbase-tutorial")
                    .getOrCreate();
    
            // Data Schema
            String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                    "\"rowkey\":\"key\"," +
                    "\"columns\":{" +
                    "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                    "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                    "}" +
                    "}";
    
            Map<String, String> optionsMap = new HashMap<String, String>();
            optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);
    
            Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                    new SampleData("key1", "foo"),
                    new SampleData("key2", "bar")), SampleData.class);
    
            // Write to HBase
            ds.write()
                    .format("org.apache.hadoop.hbase.spark")
                    .options(optionsMap)
                    .option("hbase.spark.use.hbasecontext", "false")
                    .mode("overwrite")
                    .save();
    
            // Read from HBase
            Dataset dataset = spark.read()
                    .format("org.apache.hadoop.hbase.spark")
                    .options(optionsMap)
                    .option("hbase.spark.use.hbasecontext", "false")
                    .load();
            dataset.show();
        }
    }
    

    Python

    from pyspark.sql import SparkSession
    
    # Initialize Spark Session
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-hbase-tutorial') \
      .getOrCreate()
    
    data_source_format = ''
    
    # Create some test data
    df = spark.createDataFrame(
        [
            ("key1", "foo"),
            ("key2", "bar"),
        ],
        ["key", "name"]
    )
    
    # Define the schema for catalog
    catalog = ''.join("""{
        "table":{"namespace":"default", "name":"my_table"},
        "rowkey":"key",
        "columns":{
            "key":{"cf":"rowkey", "col":"key", "type":"string"},
            "name":{"cf":"cf", "col":"name", "type":"string"}
        }
    }""".split())
    
    # Write to HBase
    df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()
    
    # Read from HBase
    result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
    result.show()

    執行程式碼

    1. 開啟 Cloud Shell 工作階段終端機。

    2. GoogleCloudDataproc/cloud-dataproc 存放區複製到 Cloud Shell 工作階段終端機:

      git clone https://.com/GoogleCloudDataproc/cloud-dataproc.git
      

    3. 切換至 cloud-dataproc/spark-hbase 目錄:

      cd cloud-dataproc/spark-hbase
      
      輸出內容範例:
      user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
      

    4. 提交 Dataproc 工作。

    Java

    1. pom.xml 檔案中設定元件版本。
      1. 「Dataproc 2.0.x 版本」頁面會列出 Scala、Spark 和 HBase 元件版本,以及最新和過去四個 2.0 子次要版本。
        1. 如要查看 2.0 映像檔版本叢集的次次要版本,請在Google Cloud 主控台的「Clusters」頁面中按一下叢集名稱,開啟「Cluster details」頁面,其中列出叢集的「Image version」
      2. 或者,您也可以從叢集的主要節點,在 SSH 工作階段終端機中執行下列指令,以判斷元件版本:
        1. 檢查 Scala 版本:
          scala -version
          
        2. 檢查 Spark 版本 (按下 Control-D 即可退出):
          spark-shell
          
        3. 查看 HBase 版本:
          hbase version
          
        4. 在 Maven pom.xml 中找出 Spark、Scala 和 HBase 版本依附元件:
          <properties>
            <scala.version>scala full version (for example, 2.12.14)</scala.version>
            <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
            <spark.version>spark version (for example, 3.1.2)</spark.version>
            <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
            <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
          </properties>
          
          注意:hbase-spark.version 是目前的 Spark HBase 連接器版本,請勿變更這個版本號碼。
      3. 在 Cloud Shell 編輯器中編輯 pom.xml 檔案,插入正確的 Scala、Spark 和 HBase 版本號碼。完成編輯後,按一下「Open Terminal」即可返回 Cloud Shell 終端機指令列。
        cloudshell edit .
        
      4. 在 Cloud Shell 中切換至 Java 8。您需要這個 JDK 版本才能建構程式碼 (您可以忽略任何外掛程式警告訊息):
        sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
        
      5. 驗證 Java 8 安裝作業:
        java -version
        
        輸出內容範例:
        openjdk version "1.8..."
         
    2. 建構 jar 檔案:
      mvn clean package
      
      .jar 檔案會放在 /target 子目錄中 (例如 target/spark-hbase-1.0-SNAPSHOT.jar)。
    3. 提交工作。

      gcloud dataproc jobs submit spark \
          --class=hbase.SparkHBaseMain  \
          --jars=target/filename.jar \
          --region=cluster-region \
          --cluster=cluster-name
      
      • --jars:請在「target/」後方和「.jar」前方插入 .jar 檔案的名稱。
      • 如果您在建立叢集時未設定 Spark 驅動程式和執行緒 HBase 類別路徑,則必須在每個工作提交作業中設定這些類別路徑,方法是在工作提交指令中加入下列 ‑‑properties 標記:
        --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
               

    4. 在 Cloud Shell 工作階段終端機輸出內容中查看 HBase 資料表輸出內容:

      Waiting for job output...
      ...
      +----+----+
      | key|name|
      +----+----+
      |key1| foo|
      |key2| bar|
      +----+----+
      

    Python

    1. 提交工作。

      gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
          --region=cluster-region \
          --cluster=cluster-name
      
      • 如果您在建立叢集時未設定 Spark 驅動程式和執行緒 HBase 類別路徑,則必須在每個工作提交作業中設定這些類別路徑,方法是在工作提交指令中加入下列 ‑‑properties 標記:
        --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
               

    2. 在 Cloud Shell 工作階段終端機輸出內容中查看 HBase 資料表輸出內容:

      Waiting for job output...
      ...
      +----+----+
      | key|name|
      +----+----+
      |key1| foo|
      |key2| bar|
      +----+----+
      

    掃描 HBase 資料表

    您可以在驗證連接器安裝程序中開啟的主節點 SSH 工作階段終端機中,執行下列指令,掃描 HBase 資料表的內容:

    1. 開啟 HBase shell:
      hbase shell
      
    2. 掃描「my-table」:
      scan 'my_table'
      
      輸出內容範例:
      ROW               COLUMN+CELL
       key1             column=cf:name, timestamp=1647364013561, value=foo
       key2             column=cf:name, timestamp=1647364012817, value=bar
      2 row(s)
      Took 0.5009 seconds
      

清除所用資源

完成教學課程後,您可以清除所建立的資源,這樣資源就不會占用配額並產生費用。下列各節將說明如何刪除或關閉這些資源。

刪除專案

如要避免付費,最簡單的方法就是刪除您為了本教學課程所建立的專案。

如要刪除專案:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

刪除叢集

  • 如要刪除叢集:
    gcloud dataproc clusters delete cluster-name \
        --region=${REGION}