2015年10月13日 星期二

使用 Sparck sqlContext 取得 DataFrame 後使用 DataFrame API 介紹

之前介紹 sqlContext 給予 連線位置、服務名稱等後,便可取回 DataFrame物件

以後介紹一些 我們可以透過 DataFrame 使用的API

我直接在 Spark-shell 環境作操作,基本上 Java環境操作 API的方式大同小異(前提是你得必須取得DataFrame物件)

1.取得 DataFrame on Spark-shell
請參考 Spark-shell with sqlContext to get database data
之前範例是取名叫 commits ,其實它就是 DataFrame的物件,現在改取名dataFrame比較好識別

2.dataFrame.count
計算你所連線 Table 的 row 筆數跟在DB執行 "Select count(*) from table" 一樣意思
從結果可以看出 該 Table 有 64筆資料

3. dataFrame.select("ColumnName")
and show(Integer)  (#顯示前幾筆資料)
這個範例是同時示範 select & show : 顯示 該 column name 的 前二筆資料
4.dataFrame.distinct & first
distinct : 顯示 不為  null 的資料
first: 顯示第一筆資料
合在一起: 顯示第一筆不為空的資料 
5.dataFrame.printSchema
列印出該 table 之 columnName
6.dataFrame.groupBy("columnName") & count & show
計算該欄位依值分成群組及計算筆數
可以看到我的欄位測試資料有一筆是 'H' ,60筆 'N',3筆 'Y'
所以該指令對於測試資料的內容可分出群組的欄位較有意義!

7.
重要!重要!重要!(很重要所以說三次)
使用 SQL Command 執行 Query
$ registerTempTable("定義TableName") & sqlContext.sql (Your SQL Command)
#這邊注意一下 registerTempTable 只是定義 tableName的名稱
實際上 我們在建立 dataFrame的時候已經指定了 Table,但是我們無法直接使用 SQL 語法
所以需要定義。
意即: 如果 dataFrame.registerTempTable("test_table")
然後執行 sqlContext.sql("select user_login_name from test_table where is_sys_admin='H'")
所得到的答案會是一樣的喔!^^"

今天就先簡單介紹到這邊嘍!







2015年10月12日 星期一

Spark-submit on Mesos Cluster

緊接著前一篇文章
基本上所謂的正式環境(Production mode)應該都會是Cluster的架構吧!
但是我目前沒有多台機器測試,所以我還是使用單機以不同的port來當作新的機器

首先請確認你有照著前一篇文章install mesos and run spark-shell on mesos
並在 spark-env.sh 有配置好 libmesos....so的位置 

1.啟動 mesos-master
  $ mesos-master.sh --ip=192.168.11.79 --work_dir=/var/lib/mesos

#觀察 http://192.168.11.79:5050

2.啟動 mesos-slave
 $mesos-slave.sh --master=192.168.11.79:5050 --port=5051 --work_dir=/tmp/mesos

3.啟動 start-mesos-dispatcher
 $./start-mesos-dispatcher.sh --master mesos:/192.168.11.79:5050

#註:start-mesos-dispatcher位置在 Spark安裝路徑的 /sbin下
 mesos-master 及 mesos-slave 安裝時已在自動在 PATH下了可直接下指令

4.至你要測試的jar檔目錄下執行 spark-submit
 $spark-submit --deploy-mode cluster --master mesos://192.168.11.79:7077 --class "SimpleApp" SimpleAppV2.jar

成功的結果如下圖可以使用瀏覽器觀察
submit成功的畫面(第三次submit的截圖,注意Driver ID)





恭喜又成功嘍!





2015年10月5日 星期一

離線安裝 Mesos for Spark running on Cent OS 6.3

前面我們提到 Spark 可以運行3種 cluster 模式,之前介紹過了 standalone,現在要介紹 Mesos
安裝 Mesos時,我發現到先前安裝的OS環境已不被支援,哭哭,於是重新安裝新的作業系統
由於我手邊沒有較新的版本,所以我從5.8換成6.3的版本而已!
如果您有較新的版本,建議使用6.6以上的版本。

許多網頁介紹了不少線上安裝的模式,但一往之前,今天也是離線安裝的版本。

我的順序是:
(1)安裝 jdk
(2)安裝 spark
(3)安裝 mesos

前兩個步驟請 follow 前面文章的介紹

我們直接開始步驟三
1. Download Mesos from 這裡
2. rpm -Uvh xxx
3.修改 mesos 啟動 master的設定檔

4.至你的Spark/conf 目錄下 將 spark-env.sh.template 複製一份改名為 spark-env.sh(前面Spark教學做過)
#cp spark-env.sh.template spark-env.sh
5.vi spark-env.sh 在下面加入
export MESOS_NATIVE_JAVA_LIBRARY=/usr/local/lib/libmesos-0.24.1.so
理論上 default 會安裝在 /usr/local/lib下(*請根據你的版本及路徑修改)

#初始安裝及設定至此已快速完成---------------------------------------------------------
開始執行程式嘍!
(1)啟動 mesos master
mesos-master --ip=192.168.11.79 --work_dir=/var/lib/mesos
(2)啟動 mesos slave
mesos-slave --master=192.168.11.79:5050
#用瀏覽器觀察 192.168.11.79:5050
(3)用spark-shell 執行程式於 mesos
spark-shell --master mesos://192.168.11.79:5050

這邊僅呈現 spark-shell …呈現的結果



在spark-shell 執行時 你會發現它會 submit 8個 tasks.
至於 範例程式於先前文章即可複製

恭喜 完成嘍!







2015年10月1日 星期四

在Eclipse上使用 Spark SQLContext by JDBC with Java

緊接著上一篇,畢竟開發程式不是在shell中執行(shell可用來測試)
而且小弟對於 Scala or Python不是特別熟悉(不然前面建個Eclipse + Java 做啥XD)

以下是我的範例程式碼 
內容及目的跟前一篇一樣,搜尋資料庫某個Table中的某個欄位的前8筆資料

程式說明如下:
首先建置了一個名為 SparkJDBCExample的class  (開發環境請參考使用Scale IDE with Eclipse 開發第一個Spark JAVA程式)
首先初始化一個 JavaSparkContext
再使用 sc 建立 sqlContext
DataFrame是 Spark中已經回傳的資料物件可以透過它來呼叫SparkAPI

完成程式碼:
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;


public class SparkJDBCExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("Simple Application");
   JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:oracle:thin:user/password@localhost:1521:orcl");
options.put("dbtable", "ams_user");

DataFrame jdbcDF = sqlContext.read().format("jdbc"). options(options).load();
jdbcDF.select("USER_LOGIN_NAME").show(8);
}

}


完成後請照著先前文章的步驟
(1)Export JAR
(2)Copy to your spark environment
(3)Download ojdbc6.jar
(4)執行 Command(至所有jar檔的目錄下)
#spark-submit --driver-class-path ojdbc6 --class "SparkJDBCExample" SparkJDBCExample.jar
我的結果如下



這樣的結果是否跟之前在 spark-shell中執行的scala程式一樣呢!


在Spark-shell 上使用 sqlContext 取回資料庫資料

今天要練習的項目是使用 sqlContext 取回資料庫的資料
今天的範例程式是使用 spark-shell cmd,想當然爾是使用 Scala 語法

步驟(一)
由於我連線的資料庫是使用 Oracle,所以請先至官網 download ojdbc6.jar
如果您使用的是Oracle 12c,那麼請使用 ojdbc7.jar
如果資料庫是其它(mysql,mssql...)等請使用合適的Driver,程式碼上需做修改
我將 ojdbc6.jar放在我即將執行 spark-shell的目錄下
執行命令
#spark-shell -–driver-class-path ojdbc6.jar
#注意 --driver-class-path 很重要!指定Driver所在位置

執行以下範例程式

請依照您的環境做必要的修改(帳密以及連線位置,您的TableName,及搜尋欄位)
    import sqlContext.implicits._
    val commits = sqlContext.load("jdbc", Map(
      "url" -> "jdbc:oracle:thin:user/password@localhost:1521:orcl",
      "dbtable" -> "ams_user",
      "driver" -> "oracle.jdbc.driver.OracleDriver"))
    commits.select("USER_LOGIN_NAME").show(8)

我的執行結果如下
這個範例程式僅顯示前8筆資料