美高梅官方网站3045-mgm6608美高梅app下载
spark实例-DataSet之统计部门员工平均薪资和平均年龄

spark实例-DataSet之统计部门员工平均薪资和平均年龄

作者:美高梅官方网站3045    来源:未知    发布时间:2019-11-25 10:36    浏览量:

dataframe是在spark1.3.0中推出的新的api,那让spark具备了管理大面积结构化数据的本领,在比原本的瑞鹰DD转变格局易用的前提下,听他们说总结性能更还快了两倍。spark在离线批管理或然实时总括中都能够将rdd转成dataframe进而通过轻易的sql命令对数码进行操作,对于熟识sql的人来讲在改造和过滤进程很方便,以至能够有越来越高档期的顺序的利用,举例在实时这一块,传入kafka的topic名称和sql语句,后台读取自身布置好的内容字段反射成叁个class并行使出入的sql对实时数据举行总结,这种气象下不会spark streaming的人也都能够方便的享受到实时总结带给的好处。

须求解析

计量部门的平均薪金和年龄

  • 只计算岁数在20岁以上的职工
  • 基于单位名称和工作者性别为粒度来张开总结
  • 总计出各样单位分性别的平均薪资和年龄

斯Parker学习笔记

上面包车型地铁示范为读取本羊眼半夏件成rdd并隐式转换来dataframe对数据开展询问,最终以扩张的款型写入mysql表的长河,scala代码示比如下

驷不及舌技艺点

  • 导入隐式转变import spark.implicits._
  • 导入spark.sql.fucntionsimport org.apache.spark.sql.functions._
  • 三个表的字段的总是条件,须求选择多个等号$"depId" === $"id"
  • groupBy聚适这时,钦点表及相应字段groupBy(department("name"), employee("gender"))
  • agg聚合函数agg(avg(employee("salary")), avg(employee("age")))
  • dataframe == dataset[Row],dataframe的品种是Row,所以是untyped类型,弱类型,dataset的花色常常是大家自定义的case class,所以是typed类型,强类型
  • dataset开拓,与rdd开荒有多数的协同点。dataset接纳encoder体系化

Data

import java.sql.Timestampimport org.apache.spark.sql.{SaveMode, SQLContext}import org.apache.spark.{SparkContext, SparkConf}object DataFrameSql { case class memberbase(data_date:Long,memberid:String,createtime:Timestamp,sp:Int)extends Serializable{ override def toString: String="%dt%st%st%d".format(data_date,memberid,createtime,sp) } def main: Unit ={ val conf = new SparkConf() conf.setMaster// ---------------------- //参数 spark.sql.autoBroadcastJoinThreshold 设置某个表是否应该做broadcast,默认10M,设置为-1表示禁用 //spark.sql.codegen 是否预编译sql成java字节码,长时间或频繁的sql有优化效果 // spark.sql.inMemoryColumnarStorage.batchSize 一次处理的row数量,小心oom //spark.sql.inMemoryColumnarStorage.compressed 设置内存中的列存储是否需要压缩// ---------------------- conf.set("spark.sql.shuffle.partitions","20") //默认partition是200个 conf.setAppName val sc = new SparkContext val sqc = new SQLContext val ac = sc.accumulator val file = sc.textFile("src\main\resources\000000_0") val log = file.map(lines => lines.split.filter(line => if  { //做一个简单的过滤 ac.add .map(line => memberbase.toLong, line,Timestamp.valueOf.toInt)) // 方法一、利用隐式转换 import sqc.implicits._ val dftemp = log.toDF() // 转换 /* 方法二、利用createDataFrame方法,内部利用反射获取字段及其类型 val dftemp = sqc.createDataFrame */ val df = dftemp.registerTempTable /*val sqlcommand ="select date_formatas mm,count as nums " + "from memberbaseinfo group by date_format " + "order by nums desc,mm asc "*/ val sqlcommand="select * from memberbaseinfo" val sel = sqc.sql val prop = new java.util.Properties prop.setProperty prop.setProperty // 调用DataFrameWriter将数据写入mysql val dataResult = sqc.sql.write.mode.jdbc("jdbc:mysql://localhost:3306/test","t_spark_dataframe_test",prop) // 表可以不存在 println(ac.name.get+" "+ac.value) sc.stop() }}

代码示例

package com.spark.dataset

import org.apache.spark.sql.SparkSession

/**
 * 计算部门的平均薪资和年龄
 *
 * 需求:
 *      1、只统计年龄在20岁以上的员工
 *      2、根据部门名称和员工性别为粒度来进行统计
 *      3、统计出每个部门分性别的平均薪资和年龄
 *
 */
object DepartmentAvgSalaryAndAgeStat extends App{
  val spark=SparkSession
  .builder()
  .appName("DepartmentAvgSalaryAndAgeStat")
  .master("local")
  .config("spark.sql.warehouse.dir","E:\worksplace\spark\spark-warehouse")
  .getOrCreate()
  //导入隐式转换
  import spark.implicits._
  //spark sql functions
  import org.apache.spark.sql.functions._
  /**
+---+--------------------+
| id|                name|
+---+--------------------+
|  1|Technical Department|
|  2|Financial Department|
|  3|       HR Department|
+---+--------------------+
   */
  val department=spark.read.json("E:\worksplace\spark\src\main\resources\department.json")
/**
+---+-----+------+------+------+
|age|depId|gender|  name|salary|
+---+-----+------+------+------+
| 25|    1|  male|   Leo| 20000|
| 30|    2|female| Marry| 25000|
| 35|    1|  male|  Jack| 15000|
| 42|    3|  male|   Tom| 18000|
| 21|    3|female|Kattie| 21000|
| 30|    2|female|   Jen| 28000|
| 19|    2|female|   Jen|  8000|
+---+-----+------+------+------+  
 */
  val employee=spark.read.json("E:\worksplace\spark\src\main\resources\employee.json")
  //department.show()
  //employee.show()
  //1.过滤20岁以上的员工
  val filtedEmployee=employee.filter("age>20")
  //filtedEmployee.show()
/**
 +---+-----+------+------+------+---+--------------------+
|age|depId|gender|  name|salary| id|                name|
+---+-----+------+------+------+---+--------------------+
| 25|    1|  male|   Leo| 20000|  1|Technical Department|
| 30|    2|female| Marry| 25000|  2|Financial Department|
| 35|    1|  male|  Jack| 15000|  1|Technical Department|
| 42|    3|  male|   Tom| 18000|  3|       HR Department|
| 21|    3|female|Kattie| 21000|  3|       HR Department|
| 30|    2|female|   Jen| 28000|  2|Financial Department|
+---+-----+------+------+------+---+--------------------+
 */
  // 注意:untyped join,两个表的字段的连接条件,需要使用三个等号
  val joined=filtedEmployee.join(department, $"depId" === $"id")
  val result=employee
  // 先对employee进行过滤,只统计20岁以上的员工
  .filter("age>20")
    // 需要跟department数据进行join,然后才能根据部门名称和员工性别进行聚合
    // 注意:untyped join,两个表的字段的连接条件,需要使用三个等号
  .join(department, $"depId" === $"id")
   // 根据部门名称和员工性别进行分组
  .groupBy(department("name"), employee("gender"))
   // 最后执行聚合函数
  .agg(avg(employee("salary")), avg(employee("age")))
  // 执行action操作,将结果显示出来
/**
+--------------------+------+-----------+--------+
|                name|gender|avg(salary)|avg(age)|
+--------------------+------+-----------+--------+
|       HR Department|female|    21000.0|    21.0|
|Technical Department|  male|    17500.0|    30.0|
|Financial Department|female|    26500.0|    30.0|
|       HR Department|  male|    18000.0|    42.0|
+--------------------+------+-----------+--------+
 */
  result.show()
}

Source->Kafka->Spark Streaming->Parquet->Spark SQL(SparkSQL能够整合ML、GraphX等)->Parquet->其余种种Data Mining等

地点代码textFile中的示例数据如下,数据来源hive,字段音讯分级为 分区号、客商id、注册时间、第三方号

1.1 斯Parker集群的装置

20160309 45386477 2012-06-12 20:13:15 90143820160309 45390977 2012-06-12 22:38:06 90103620160309 45446677 2012-06-14 21:57:39 90143820160309 45464977 2012-06-15 13:42:55 90143820160309 45572377 2012-06-18 14:55:03 90260620160309 45620577 2012-06-20 00:21:09 90260620160309 45628377 2012-06-20 10:48:05 90118120160309 45628877 2012-06-20 11:10:15 90260620160309 45667777 2012-06-21 18:58:34 90252420160309 45680177 2012-06-22 01:49:55 20160309 45687077 2012-06-22 11:23:22 902607

斯Parker的周转是营造在hadoop集群之上(暗许hadoop集群已经安装好了卡塔尔,在spark集集结群上必必要设置相应版本的scala

那边注意字段类型映射,即case class类到dataframe映射,从官方网站的截图如下

1.1.1 scala安装

更多缜密能够查阅官方文档 Spark SQL and DataFrame Guide

Ø下载scala版本,解压scala

如上那篇spark rdd转dataframe 写入mysql的实例讲授便是作者分享给大家的全体内容了,希望能给我们叁个参照,也冀望大家多多照拂脚本之家。

Ø配置意况变量/etc/profile,加多SCALA_HOME、修改PATH,添加上scala的path路径

Ø进入$SCALA_HOME/bin目录,试行./scala验证scala是不是安装成功

Ø集群机器都急需设置scala

1.1.2 spark安装

在集群的有着机器上都不得不要安装spark,首先安装master的spark程序

Ø先解压spark程序

Ø改正际遇变量/etc/profile增多SPALANDK_HOME和修改spark PATH路径

美高梅官方网站3045,Ø配置spark,进入conf目录下

nmv spark-env.sh.template spark-env.sh

其中:spark_master_ip:用于钦定master

nvi slaves改良文件,把work节点都增多进去;

Ø至此,spark集群安装完结

1.1.3开发银行集群校验

Ø先运转hadoop集群,jps查看进程

Ø再开发银行spark集群,在sbin目录下施行./start-all.shjps查看进度

ØUi访谈,检查集群意况http://master:8080

Ø进入spark/bin目录下,启动spark-shell脚本

1.2 spark-shell的使用

在master机器上的$SPA牧马人K_HOME/bin目录下,运转./spark-shell程序运行shark-shell脚本;通过http://master:4040翻看spark-shell运维情况

1.2.1 spark-shell操作hdfs文件实战

Ø将spark目录下的README.md文件上盛传hdfs上的/test目录下,通过hdfs ui来进行查看slave:50070/explorer.html#/查看文件是或不是上传成功

Ø在spark-shell脚本程序下,实施sc(斯ParkerContext实例卡塔尔,运维spark-sehll时,系统自动生成

scala> sc

res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@65859b44

# 斯ParkerContext是把代码提交到集群可能地点的大路,编写斯Parker代码,无论是要运转本地照旧集群都应当要有斯ParkerContext实例

ØSpark-shell读取hdfs文件的README.md文件

val file = sc.textFile(“hdfs://mapeng:8020/test/README.md”)

#那边把读取到的文本内容赋值给了变量file,(正是三个Mapped昂CoraDD,在spark的代码中,一切都以基于哈弗DD进行操作的卡塔 尔(阿拉伯语:قطر‎

Ø读取文本中饱含有“spark”的行

scala> val sparks = file.filter(line

=> line.contains("spark"))

sparks: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at

filter at :26

#那时候变动了二个FilterCR-VDD

Ø总括spark生机勃勃共现身略微次

Sparks.count

1.2.2 spark-shell操作及详细表明

1.2.2.1并行化集结(parallelize卡塔尔国

Ø加载会集数据

val data = sc.parallelize(1 to 10)#加载会集数据

或者; val data = sc.parallelize(List(1,2,3,4…))

Ø对集合数据举行*2操作

val data1 = data.map(_*2)

Ø对数据开展过滤:过滤出是2的翻番的集中

val data2 = data.filter(_%2==0)

Ø内部存储器缓存多少

data.cache

Ø触发action,以数据的款型重返结果集

data.collect

Ø重临结果集的率先个成分

data.first

Ø再次来到结果集的前3个要素

data.take(3)

Ø总计成分的个数

data.count

Ø查看哈弗DD的转移进度

data.toDebugString

1.2.2.2 map数据集合

Ø加载List(Map)数据

val

data=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))

Ø排序sortByKey()

scala> data.sortByKey().collect

res55: Array[(String, Int)] = Array((A,1), (A,4), (B,2), (B,5),

(C,3))

Ø分组groupByKey()

scala>

data.groupByKey().collect

res57: Array[(String, Iterable[Int])] = Array((B,CompactBuffer(2,  5)), (A,CompactBuffer(1, 4)), (C,CompactBuffer(3)))

Ø求和reduceByKey(_+_)

scala>

data.reduceByKey(_+_).collect

res59: Array[(String, Int)] = Array((B,7), (A,5), (C,3))

Ø去重distinct

scala> data.distinct.collect

res60: Array[(String, Int)] = Array((A,1), (A,4), (B,5), (C,3),

(B,2))

Ø联合union

scala> val

data1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))

data1:

org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at

parallelize at :24

scala> val data2=sc.parallelize(List(("A",4),("A",4),("C",3),("A",4),("B",5)))

data2:

org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at

parallelize at :24

scala>

data1.union(data2).collect

res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (A,4),

(B,5), (A,4), (A,4), (C,3), (A,4), (B,5))

Ø关联join

一定于笛Carl积

1.2.2.3保留转变结果saveAsTextFile

data.saveAsTextFile(“path”);//将改动结果存款和储蓄在hdfs钦点的路线

1.2.3 spark cache缓存

对此spark程序,第二次推行要比前边的进行的功能要高

1.3 HavalDD(弹性布满式数据集)

1.3.1 RDD介绍

ØRubiconDD是四个容错的、并行的数据结构,能够让顾客体现的将数据存款和储蓄在磁盘和内部存储器中,并能调控数据的分区。

Ø讴歌ZDXDD提供了风流倜傥套丰盛的函数来操作数据

Ø奥迪Q5DD作为数据机构,本质上是叁个只读的分区记录集结;多少个智跑DD能够包含多个分区,每一种分区正是二个dataSet片段;卡宴DD能够相互正视

n窄信任:RAV4DD的每一种分区最八只好被叁个child

奥迪Q5DD的分区使用(举例:map操作卡塔尔

n宽信任:LANDDD的分区能够被四个child

LANDDD的分区使用(比方:join操作卡塔 尔(阿拉伯语:قطر‎

区别:

(1卡塔尔国窄信赖能够在集群中的二个节点上如流水般的实践,相反,宽注重必要持有的父分区的数量都可用

(2卡塔 尔(阿拉伯语:قطر‎从现身倒闭复苏的角度来杜撰:窄重视只须要重新计算退步的父瑞鹰DD的分区,而宽依赖战败会导致其父福睿斯DD的几个分区重新总结

1.3.2 RDD分区

1.3.3成立操作

1.3.3.1汇合创设操作

Spark提供了两类函数完结从集结生成普拉多DD;

Øparallelize

val rdd = sc.parallelize(1 to 100)

Ømake昂科拉DD:还提供了点名分区参数

val rdd = sc.makeRDD(1 to 100,3)#点名了分区数为3

1.3.3.2积存成立操作

操作hdfs

val rdd = sc.textFile(“hdfs://master:9000/test/xxx.txt”)

1.3.4 CR-VDD的中坚转移操作

1.3.4.1 TiguanDD的再度分区

repartition和coalesce是对CR-VDD的分区实行双重划分

Ørepartition(numPartitions:Int):RDD[T]

Øcoalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]

repartition只是coalesce接口中shuffle为true的简短达成。

重新划分分区重要有二种意况:(原RDD有N个分区,须求重新划分为M个分区卡塔尔

ØN

ØN>M(相差一点都不大):面前碰到着要把原分区实行合併的操作,最终合成M个分区,那时候将shuffle设置为false

注:在shuffle为false时,设置M>N,coalesce是不起作用的

ØN>>M(差别悬殊卡塔尔:假使将shuffle设置为false,由于老爹和儿子瑞鹰DD是窄信任,会使得它们同处于二个stage中,恐怕会形成spark程序运转的并行度非常不够,进而影响效能。

因此,最棒设置为true,使得coalesce以前的操作有越来越好的并行度

1.3.4.2 LX570DD转换为数组(randomSplit、glom卡塔尔

ØrandomSplit(weight:Array[Double],seed:Long=System.nanoTime):Array[RDD[T]]

randomSplit函数是将三个EscortDD切分为多少个传祺DD,重临结果是二个CRUISERDD数组;函数的率先个法子传入的参数权重是一个Double类型的数组;权根本的,分到的数额的票房价值大

val rdd = sc.makeRDD(1 to 10)

val splitRDD = rdd.randomSplit(Array(1.0,3.0,6.0))

#再次回到的三个奇骏DD数组,查看数组成分

splitRDD(0).collect

splitRDD(1).collect

splitRDD(2).collect

Øglom():RDD[Array[T]]

glom函数是将MuranoDD中每三个分区中类型为T的因素调换为Array[T]

val rdd = sc.makeRDD(1 to 10,3)

val glomRDD= rdd.glom

#回来的结果是三个数组,

glomRDD.collect

scala> glomRDD.collect

res44: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7,

8, 9, 10))

1.3.4.3 LacrosseDD的聚合操作

Øunion(other:RDD[T]) :RDD[T]

将三个PAJERODD的多少进行联合,再次回到八个瑞虎DD的并集,不去重

Øintersection(other:RDD[T]) :RDD[T]

再次来到四个本田UR-VDD的混合(会去重卡塔尔国

Øsubtract(other:RDD[T]) :RDD[T]

取差集

Øzip(other:RDD[T]):RDD[T]

zip函数用于将五个HighlanderDD组合成key/value情势的君越DD,三个奥迪Q5DD的partition个数以至成分的多寡都必定要黄金年代致,不然会抛出非凡

val rdd1 = sc.makeRDD(List(1,2,3,3))

val rdd2 = sc.makeRDD(List(2,3,4))

union操作

rdd1.union(rdd2).collect

#结果

Array[Int] = Array(1, 2, 3, 3, 2, 3, 4)

intersection操作

rdd1.intersection(rdd2).collect

#结果(去重)

Array[Int] = Array(3, 2)

subtract操作

rdd1.subtract(rdd2).collect

#结果(不去重)

Array[Int] = Array(1, 1)

zip操作

scala> val rdd1 = sc.makeRDD(1

to 3)

scala> val rdd2 =

sc.makeRDD(List(1.0,2.0,3.0))

scala> rdd1.zip(rdd2).collect

res8: Array[(Int, Double)] = Array((1,1.0), (2,2.0), (3,3.0))

1.3.4.4键值KugaDD转换操作

map和flatmap的区别:

(1卡塔尔国map是对各样成分都开展点名的操作,再次回到各种成分管理后的对象

(2卡塔 尔(英语:State of Qatar)flatmap对具备的要素都做内定的操作,将具有的目的归并为二个目的回来

val rdd = sc.makeRDD(1 to 3)

rdd.map(x=>Seq(x,x)).collect

#结果

Array[Seq[Int]] = Array(List(1, 1),

List(2, 2), List(3, 3), List(4, 4))

rdd.flatMap(x=>Seq(x,x)).collect

#结果,合併为三个对象回来

下一篇:没有了
友情链接: 网站地图
Copyright © 2015-2019 http://www.zen-40.com. mgm美高梅有限公司 版权所有