Skip to content

Latest commit

 

History

History
98 lines (78 loc) · 2.95 KB

File metadata and controls

98 lines (78 loc) · 2.95 KB

第57课:Spark SQL on Hive配置及实战

标签: sparkIMF


##Spark SQL on Hive

注:Spark SQL on Hive 不需要启动Yarn资源管理器!

  1. 在Spark/conf目录下新建hive-site.xml文件

    <property>
    	<name>hive.metastore.uris</name>
    	<value>thrift://Master:9083</value>
    	<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
    </property>
  2. 上传mysql驱动到spark/lib目录下

  3. 此时启动spark-shell会报错

    Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@4ed90b04, see the next exception for details.
    

    要启动数据仓库服务 hive --service metastore >metastore.log 2>& 1&

  4. 如果还是启动错误:类似不能创建 metastore_db/log

    1. 进入Spark/metastore_db目录,创建log文件夹
    2. 修改权限:chmod 666 log
  5. 重新启动spark-shell bin/spark-shell --master spark://master:7077

  6. val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

  7. hiveContext.sql("use hive")

  8. hiveContext.sql("show tables").collect.foreach(println);

  9. hiveContext.sql("select count(*) from SogouQ1").collect.foreach(println)

  10. 出现错误:WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources 说明集群没有可用的Core了,一般在虚拟机中经常遇到! 解决方案:spark-shell用local模式即可! spark-shell --master local

  11. hiveContext.sql("select count(*) from sogouq2 where WEBSITE like '%baidu%' ").collect.foreach(println)

  12. hiveContext.sql("select count(*) from sogouq2 where s_seq<11 and c_seq<11 and website like '%baidu'").collect.foreach(println)

##DataFrames 案例

案例来自Spark官方文档

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Show the content of the DataFrame
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1

##注意:集群模式下,不需要每台机器都配置,只需要配置Master即可。

##理解才能应对变化