标签: sparkIMF
##代码实战
SparkSQL2Hive.scala
package com.dt.spark.sparkapps.sql
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
/**
* 第69课:Spark SQL通过Hive数据源实战
* 本DemoWin系统下运行脚本:
* spark-submit --class com.dt.spark.sparkapps.sql.SparkSQL2Hive --master local z:/out/sparkApp.jar
* Created by Limaoran on 2016/7/5.
*/
object SparkSQL2Hive {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("SparkSQL2Hive")
val sc = new SparkContext(conf)
/**
* 第一:在目前企业级大数据Spark开发的时候绝大多数情况下是采用Hive作为数据仓库的。
* Spark提供了Hive的支持功能,通过HiveContext可以直接操作Hive中的数据。
* 基于HiveContext我们可以使用sql/hql两种方式来编写SQL语句对Hive进行操作,
* 包括创建表、删除表、往表里导入数据以及用SQL语法构造各种SQL语句对表中的数据进行CRUD操作
*
* 第二:我们也可以直接通过saveAsTable的方式把DataFrame中的数据保存到Hive数据仓库中
* 第三:可以直接通过HiveContext.table方法来直接加载Hive中的表而生成DataFrame
*/
val hiveContext = new HiveContext(sc)
hiveContext.sql("use hive") //使用Hive数据仓库中的hive数据库
hiveContext.sql("DROP TABLE IF EXISTS people") //删除同名的Table
hiveContext.sql("CREATE TABLE IF NOT EXISTS people(name String,age Int) "+
"ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'") //创建自定义的表
/**
* 把本地数据加载到Hive数据仓库中(背后实际上发生了数据的拷贝)
* 当然也可以通过LOAD DATA INPATH去获得HDFS等上面的数据到Hive中(此时发生了数据的移动)
*/
hiveContext.sql("LOAD DATA LOCAL INPATH 'G:/runtime/spark-1.6.0/examples/src/main/resources/people.txt' INTO TABLE people " )
hiveContext.sql("DROP TABLE IF EXISTS peoplescores")
hiveContext.sql("CREATE TABLE IF NOT EXISTS peoplescores(name String,score Int)"+
"ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'")
hiveContext.sql("LOAD DATA LOCAL INPATH 'G:/runtime/spark-1.6.0/examples/src/main/resources/peoplescores.txt' INTO TABLE peoplescores ")
/**
* 通过HiveContext使用join直接基于Hive中的两张表进行操作,获取大于90分的人的name、age、score
*/
val resultDF = hiveContext.sql("SELECT p.name,p.age,ps.score FROM people as p JOIN peoplescores as ps " +
"ON p.name=ps.name where ps.score>90")
resultDF.show()
/**
* 通过saveAsTable创建一张Hive Managed Table,数据的元数据和数据即将放的具体的位置都是由Hive数据仓库进行管理的,
* 当删除该表的时候,数据也会一起被删除(磁盘上的数据不再存在)。
*/
hiveContext.sql("DROP TABLE IF EXISTS peopleInfomationResult")
resultDF.write.saveAsTable("peopleInfomationResult")
/**
* 使用HiveContext的table方法可以直接读取Hive数据仓库中的Table并生成DataFrame,
* 接下来就可以进行机器学习、图计算、各种复杂ETL等操作。
*/
val dataFromHive = hiveContext.table("peopleInfomationResult")
dataFromHive.show()
sc.stop()
}
}
Windows下部署此程序脚本
spark-submit --class com.dt.spark.sparkapps.sql.SparkSQL2Hive --master local /out/sparkApp.jar
##测试源数据
###people.txt
Michael,29
Andy,30
Justin,19
LiMing,18
###peoplescores.txt
Michael 99
Andy 97
Justin 68
LiMing 77