2016年2月24日 星期三

How to Deploy Spark Monitor Tool - varOne

You can reference the github url : https://github.com/SparkMonitor/varOne 
Here is  the more detail I think.

First you can download from github http://sparkmonitor.github.io/varOne/varOne-0.1.0.tgz

Or on your Spark Server with "wget" command

在實際開始安裝之前,請先修改$SPARK_HOME/conf metrics.propertis
如無此檔案,請先執行



加入
*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
*.sink.csv.period=1
*.sink.csv.unit=seconds
*.sink.csv.directory=/home/hadoop/CSV_SINK
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource


#注意 csv.directory路徑請自行更改至您對應的路徑

接著修改 $SPARK_HOME/conf下的 spark-defaults.conf
如無此檔案請先執行


加入
spark.eventLog.enabled true
spark.eventLog.dir  hdfs://hadoop-master:8020/Peter/eventLog





#注意  eventLog.dir 目前版本設定存放 hdfs,請修改及對應您的hdfs 目錄位置

解壓縮檔案(建立安裝在 namenode上,這樣遠端呼叫datanode上的 varOne不需要再輸入密碼,因為hadoop cluster環境上已設定過 ssh)




修改 $VARONE_HOME/conf3個檔案 varOne-env.sh,varOne-site.xml,varonedaemond
若無前兩個檔案,請自行複製一份並修改檔名
主要修改2個檔案

1.varOne-env.sh




2.varonedaemond








把你的Cluster環境中的所有hostname加入,注意一個hostname一行

上述全部修改完畢後,請將varOne目錄複製一份至每個datanode
我的習慣是先壓縮再傳送





開始啟動 varOned !
注意啟動前,事實上Spark也必須複製一份至各個node(請重複SCP動作)
因為varOne.sh 需要去讀取$SPARK_HOME/confmetrics.properties


















啟動 varOne.sh for WebUI




開啟 WebUI default會在 8080 port
hadoop-master:8080/varOne-web/index.html



2016年2月2日 星期二

Spark save data to HDFS(Yarn) and get data from HDFS to save to Phoenix+HBase- fully demo with Java

終於來到了打通任督二脈的時刻了!!!
今天要示範的是使用 Java 程式編寫一個 Person 物件,透過 Spark 將值轉入 HDFS,
再用 Spark 成功從 HDFS 取出值後,將其 Save to Phoenix (事先建立好的物件Table)
這樣的情境案例是一個 BigData 中可行的環境。
Spark 主要做為計算,YARN作為分配工作及資源
HDFS作為 Warehouse,HBase作為Storage,而 Phoenix做為快速查詢的橋樑
讓我們開始吧!
準備BigData環境:
HDFS + YARN + SPARK+ HBASE + PHOENIX
開發環境 Eclipse 加入所需Library(Spark-assembly...jar)

這些環境的建置基本上前面的文章都有提過了
今天直接把程式碼及執行的結果告訴大家!!!

1.建立程式碼如下
--我建立了2個Person物件轉成List將其填入hdfs指定的路徑<Spark會直接建目錄>

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkSaveToHDFS {
public static class Person implements Serializable {
   private String name;
   private int age;

   public String getName() {
     return name;
   }

   public void setName(String name) {
     this.name = name;
   }

   public int getAge() {
     return age;
   }

   public void setAge(int age) {
     this.age = age;
   }
 }

public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("SparkSaveToHDFS");
   JavaSparkContext ctx = new JavaSparkContext(sparkConf);
  
   Person Peter = new Person();
   Peter.setName("Peter");
   Peter.setAge(30);
   Person Kevin = new Person();
   Kevin.setName("Kevin");
   Kevin.setAge(40);
   
   List<Person> personList = new ArrayList<Person>();
   personList.add(0, Peter);
   personList.add(1,Kevin);
   System.out.println("list contains Peter : " + personList.contains(Peter) + Peter.getAge());
   System.out.println("list contains Kevin : " + personList.contains(Kevin) + Kevin.getAge());
   
   JavaRDD<Person> personRdd = ctx.parallelize(personList);
   personRdd.saveAsObjectFile("hdfs://hadoop-master:8020/Peter/test");    
   
}
}

執行結果如下



#注意: 執行時 需要使用到 phoenix-hbase-client jar,在之前 Spark-shell 中時並不用加入是因為已經把這個 jar 檔加入環境變數 CLASSPATH
而在 Spark-submit 中 不使用 --driver-class-path 也是因為 CLASSPATH中已經有加入時,會造成 jar檔衝突,而改使用參數 --jars 是指 強制使用這個 jar檔

在沒有任何錯誤的情況下,可以檢查 hdfs 是否有長出了新目錄


2.我們可以使用下列程式將物件從HDFS取出並轉成 DataFrame 讀取
請修改上一支程式改為如下,記得 import SQLContext & DataFrame

public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("SparkSaveToHDFS");
   JavaSparkContext ctx = new JavaSparkContext(sparkConf);
   SQLContext sqlContext = new SQLContext(ctx);

   JavaRDD<Person> getPersonRdd = ctx.objectFile("hdfs://hadoop-master:8020/Peter/test");
   DataFrame schemaPeople = sqlContext.createDataFrame(getPersonRdd, Person.class);
   schemaPeople.registerTempTable("people");
   schemaPeople.printSchema();
   DataFrame people = sqlContext.sql("select * from people");
   people.show();
       
}

執行結果如下執行 command 跟之前一樣



主要是 printSchema & show 是否 work

3.上面程式我們已成功將物件轉為DataFrame,現在我們直接要 Save to Phoenix
在執行程式前,我們必須先至 Phoenix 建立 待會要 Insert 的 table

至 phoenix/bin下 執行 sqlline.py localhost(zookeeper location)

執行 DDL
create table people1(AGE INTEGER, NAME VARCHAR NOT NULL PRIMARY KEY)

Table 建立 Table後,開始修改程式,只要加入一行

程式如下

people.write().format("org.apache.phoenix.spark").mode("overwrite").option("table", "people1").option("zkUrl", "localhost").save();

一樣 Spark-submit 後,若無錯誤出現,people 該 DataFrame 已經成功 insert了

至 phoenix 查詢 結果如下



假設最後你希望透過程式把 Phoenix 的結果 select 出來,一樣可以透過 phoenix 取出,請參考前面的文章(不一定要用 JDBC若程式在 Spark 上執行的話)

恭喜成功!