終於來到了打通任督二脈的時刻了!!!
今天要示範的是使用 Java 程式編寫一個 Person 物件,透過 Spark 將值轉入 HDFS,
再用 Spark 成功從 HDFS 取出值後,將其 Save to Phoenix (事先建立好的物件Table)
這樣的情境案例是一個 BigData 中可行的環境。
Spark 主要做為計算,YARN作為分配工作及資源
HDFS作為 Warehouse,HBase作為Storage,而 Phoenix做為快速查詢的橋樑
讓我們開始吧!
準備BigData環境:
HDFS + YARN + SPARK+ HBASE + PHOENIX
開發環境 Eclipse 加入所需Library(Spark-assembly...jar)
這些環境的建置基本上前面的文章都有提過了
今天直接把程式碼及執行的結果告訴大家!!!
1.建立程式碼如下
--我建立了2個Person物件轉成List將其填入hdfs指定的路徑<Spark會直接建目錄>
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class SparkSaveToHDFS {
public static class Person implements Serializable {
private String name;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf().setAppName("SparkSaveToHDFS");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
Person Peter = new Person();
Peter.setName("Peter");
Peter.setAge(30);
Person Kevin = new Person();
Kevin.setName("Kevin");
Kevin.setAge(40);
List<Person> personList = new ArrayList<Person>();
personList.add(0, Peter);
personList.add(1,Kevin);
System.out.println("list contains Peter : " + personList.contains(Peter) + Peter.getAge());
System.out.println("list contains Kevin : " + personList.contains(Kevin) + Kevin.getAge());
JavaRDD<Person> personRdd = ctx.parallelize(personList);
personRdd.saveAsObjectFile("hdfs://hadoop-master:8020/Peter/test");
}
}
執行結果如下
#注意: 執行時 需要使用到 phoenix-hbase-client jar,在之前 Spark-shell 中時並不用加入是因為已經把這個 jar 檔加入環境變數 CLASSPATH
而在 Spark-submit 中 不使用 --driver-class-path 也是因為 CLASSPATH中已經有加入時,會造成 jar檔衝突,而改使用參數 --jars 是指 強制使用這個 jar檔
在沒有任何錯誤的情況下,可以檢查 hdfs 是否有長出了新目錄
2.我們可以使用下列程式將物件從HDFS取出並轉成 DataFrame 讀取
請修改上一支程式改為如下,記得 import SQLContext & DataFrame
public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf().setAppName("SparkSaveToHDFS");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(ctx);
JavaRDD<Person> getPersonRdd = ctx.objectFile("hdfs://hadoop-master:8020/Peter/test");
DataFrame schemaPeople = sqlContext.createDataFrame(getPersonRdd, Person.class);
schemaPeople.registerTempTable("people");
schemaPeople.printSchema();
DataFrame people = sqlContext.sql("select * from people");
people.show();
}
執行結果如下執行 command 跟之前一樣
主要是 printSchema & show 是否 work
3.上面程式我們已成功將物件轉為DataFrame,現在我們直接要 Save to Phoenix
在執行程式前,我們必須先至 Phoenix 建立 待會要 Insert 的 table
至 phoenix/bin下 執行 sqlline.py localhost(zookeeper location)
執行 DDL
create table people1(AGE INTEGER, NAME VARCHAR NOT NULL PRIMARY KEY)
Table 建立 Table後,開始修改程式,只要加入一行
程式如下
people.write().format("org.apache.phoenix.spark").mode("overwrite").option("table", "people1").option("zkUrl", "localhost").save();
一樣 Spark-submit 後,若無錯誤出現,people 該 DataFrame 已經成功 insert了
至 phoenix 查詢 結果如下
假設最後你希望透過程式把 Phoenix 的結果 select 出來,一樣可以透過 phoenix 取出,請參考前面的文章(不一定要用 JDBC若程式在 Spark 上執行的話)
恭喜成功!
沒有留言:
張貼留言