Skip to content

Latest commit

 

History

History
101 lines (83 loc) · 3.69 KB

File metadata and controls

101 lines (83 loc) · 3.69 KB

第69课:Spark SQL通过Hive数据源实战

标签: sparkIMF


##代码实战

SparkSQL2Hive.scala

package com.dt.spark.sparkapps.sql

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}


/**
 * 第69课:Spark SQL通过Hive数据源实战
 * 本DemoWin系统下运行脚本:
 *  spark-submit --class com.dt.spark.sparkapps.sql.SparkSQL2Hive --master local z:/out/sparkApp.jar
 * Created by Limaoran on 2016/7/5.
 */
object SparkSQL2Hive {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkSQL2Hive")
    val sc = new SparkContext(conf)
    /**
     * 第一:在目前企业级大数据Spark开发的时候绝大多数情况下是采用Hive作为数据仓库的。
     * Spark提供了Hive的支持功能,通过HiveContext可以直接操作Hive中的数据。
     * 基于HiveContext我们可以使用sql/hql两种方式来编写SQL语句对Hive进行操作,
     * 包括创建表、删除表、往表里导入数据以及用SQL语法构造各种SQL语句对表中的数据进行CRUD操作
     *
     * 第二:我们也可以直接通过saveAsTable的方式把DataFrame中的数据保存到Hive数据仓库中
     * 第三:可以直接通过HiveContext.table方法来直接加载Hive中的表而生成DataFrame
     */
    val hiveContext = new HiveContext(sc)
    hiveContext.sql("use hive")   //使用Hive数据仓库中的hive数据库
    hiveContext.sql("DROP TABLE IF EXISTS people") //删除同名的Table
    hiveContext.sql("CREATE TABLE IF NOT EXISTS  people(name String,age Int) "+
      "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'")  //创建自定义的表
      /**
       * 把本地数据加载到Hive数据仓库中(背后实际上发生了数据的拷贝)
       * 当然也可以通过LOAD DATA INPATH去获得HDFS等上面的数据到Hive中(此时发生了数据的移动)
       */
    hiveContext.sql("LOAD DATA LOCAL INPATH 'G:/runtime/spark-1.6.0/examples/src/main/resources/people.txt' INTO TABLE people " )

    hiveContext.sql("DROP TABLE IF EXISTS peoplescores")
    hiveContext.sql("CREATE TABLE IF NOT EXISTS  peoplescores(name String,score Int)"+
      "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'")
    hiveContext.sql("LOAD DATA LOCAL INPATH 'G:/runtime/spark-1.6.0/examples/src/main/resources/peoplescores.txt' INTO TABLE peoplescores ")

    /**
     * 通过HiveContext使用join直接基于Hive中的两张表进行操作,获取大于90分的人的name、age、score
     */
    val resultDF = hiveContext.sql("SELECT p.name,p.age,ps.score FROM people as p JOIN peoplescores as ps " +
      "ON p.name=ps.name where ps.score>90")
    resultDF.show()

    /**
     * 通过saveAsTable创建一张Hive Managed Table,数据的元数据和数据即将放的具体的位置都是由Hive数据仓库进行管理的,
     * 当删除该表的时候,数据也会一起被删除(磁盘上的数据不再存在)。
     */
    hiveContext.sql("DROP TABLE IF EXISTS peopleInfomationResult")
    resultDF.write.saveAsTable("peopleInfomationResult")

    /**
     * 使用HiveContext的table方法可以直接读取Hive数据仓库中的Table并生成DataFrame,
     * 接下来就可以进行机器学习、图计算、各种复杂ETL等操作。
     */
    val dataFromHive = hiveContext.table("peopleInfomationResult")
    dataFromHive.show()

    sc.stop()
  }
}

Windows下部署此程序脚本

spark-submit --class com.dt.spark.sparkapps.sql.SparkSQL2Hive --master local /out/sparkApp.jar

##测试源数据

###people.txt

Michael,29
Andy,30
Justin,19
LiMing,18

###peoplescores.txt

Michael	99
Andy	97
Justin	68
LiMing	77