参考文献:http://lxw1234.com
RDD如何创建
首先创建JavaSparkContext对象实例sc
1 | JavaSparkContext sc = new JavaSparkContext("local","SparkTest"); |
接受2个参数:
第一个参数表示运行方式(local、yarn-client、yarn-standalone等)
第二个参数表示应用名字
直接从集合转化
sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
从HDFS文件转化sc.textFile("hdfs://")
从本地文件转化sc.textFile("file:/")
下面例子中list2就是根据data2List生成的一个RDD
1 | List<String> list1 = new ArrayList<String>(); |
常用算子
基本算子
filter
举例,在F:\sparktest\sample.txt 文件的内容如下
aa bb cc aa aa aa dd dd ee ee ee ee
ff aa bb zks
ee kks
ee zz zks
我要将包含zks的行的内容给找出来
scala版本
1 | val lines = sc.textFile("F:\\sparktest\\sample.txt").filter(line=>line.contains("zks")) |
java版本
1 | JavaRDD<String> lines = sc.textFile("F:\\sparktest\\sample.txt"); |
map
map() 接收一个函数,把这个函数用于 RDD 中的每个元素,将函数的返回结果作为结果RDD编程 | 31
RDD 中对应元素的值 map是一对一的关系
举例,在F:\sparktest\sample.txt 文件的内容如下
aa bb cc aa aa aa dd dd ee ee ee ee
ff aa bb zks
ee kks
ee zz zks
把每一行变成一个数组
scala版本
1 | //读取数据 |
java版本
1 | JavaRDD<Iterable<String>> mapRDD = lines.map(new Function<String, Iterable<String>>() { |
flatMap
有时候,我们希望对某个元素生成多个元素,实现该功能的操作叫作 flatMap()
faltMap的函数应用于每一个元素,对于每一个元素返回的是多个元素组成的迭代器(想要了解更多,请参考scala的flatMap和map用法)
例如我们将数据切分为单词
scala版本
1 | scala> val lines = sc.textFile("F:\\sparktest\\sample.txt") |
java版本,spark2.0以下
1 | JavaRDD<String> lines = sc.textFile("F:\\sparktest\\sample.txt"); |
java版本,spark2.0以上
spark2.0以上,对flatMap的方法有所修改,就是flatMap中的Iterator和Iteratable的小区别
1 | JavaRDD<String> flatMapRDD = lines.flatMap(new FlatMapFunction<String, String>() { |
distinct
distinct用于去重, 我们生成的RDD可能有重复的元素,使用distinct方法可以去掉重复的元素, 不过此方法涉及到混洗,操作开销
scala版本1
2
3
4
5
6
7
8
9scala> var RDD1 = sc.parallelize(List("aa","aa","bb","cc","dd"))
scala> RDD1.collect
res3: Array[String] = Array(aa, aa, bb, cc, dd)
scala> var distinctRDD = RDD1.distinct
scala> distinctRDD.collect
res5: Array[String] = Array(aa, dd, bb, cc)
java版本
1 | JavaRDD<String> RDD1 = sc.parallelize(Arrays.asList("aa", "aa", "bb", "cc", "dd")); |
union
两个RDD进行合并
scala版本1
2
3
4
5
6
7
8
9
10
11scala> var RDD1 = sc.parallelize(List("aa","aa","bb","cc","dd"))
scala> var RDD2 = sc.parallelize(List("aa","dd","ff"))
scala> RDD1.collect
res6: Array[String] = Array(aa, aa, bb, cc, dd)
scala> RDD2.collect
res7: Array[String] = Array(aa, dd, ff)
scala> RDD1.union(RDD2).collect
res8: Array[String] = Array(aa, aa, bb, cc, dd, aa, dd, ff)
java版本1
2
3
4
5
6
7
8
9JavaRDD<String> RDD1 = sc.parallelize(Arrays.asList("aa", "aa", "bb", "cc", "dd"));
JavaRDD<String> RDD2 = sc.parallelize(Arrays.asList("aa","dd","ff"));
JavaRDD<String> unionRDD = RDD1.union(RDD2);
List<String> collect = unionRDD.collect();
for (String str:collect) {
System.out.print(str+", ");
}
-----------输出---------
aa, aa, bb, cc, dd, aa, dd, ff,
intersection
RDD1.intersection(RDD2) 返回两个RDD的交集,并且去重
intersection 需要混洗数据,比较浪费性能
scala版本1
2
3
4
5
6
7
8
9
10
11
12
13scala> var RDD1 = sc.parallelize(List("aa","aa","bb","cc","dd"))
scala> var RDD2 = sc.parallelize(List("aa","dd","ff"))
scala> RDD1.collect
res6: Array[String] = Array(aa, aa, bb, cc, dd)
scala> RDD2.collect
res7: Array[String] = Array(aa, dd, ff)
scala> var insertsectionRDD = RDD1.intersection(RDD2)
scala> insertsectionRDD.collect
res9: Array[String] = Array(aa, dd)
java版本1
2
3
4
5
6
7
8
9JavaRDD<String> RDD1 = sc.parallelize(Arrays.asList("aa", "aa", "bb", "cc", "dd"));
JavaRDD<String> RDD2 = sc.parallelize(Arrays.asList("aa","dd","ff"));
JavaRDD<String> intersectionRDD = RDD1.intersection(RDD2);
List<String> collect = intersectionRDD.collect();
for (String str:collect) {
System.out.print(str+" ");
}
-------------输出-----------
aa dd
subtract
RDD1.subtract(RDD2),返回在RDD1中出现,但是不在RDD2中出现的元素,不去重
scala版本1
2
3
4
5
6
7
8JavaRDD<String> RDD1 = sc.parallelize(Arrays.asList("aa", "aa","bb", "cc", "dd"));
JavaRDD<String> RDD2 = sc.parallelize(Arrays.asList("aa","dd","ff"));
scala> var substractRDD =RDD1.subtract(RDD2)
scala> substractRDD.collect
res10: Array[String] = Array(bb, cc)
java版本1
2
3
4
5
6
7
8
9JavaRDD<String> RDD1 = sc.parallelize(Arrays.asList("aa", "aa", "bb","cc", "dd"));
JavaRDD<String> RDD2 = sc.parallelize(Arrays.asList("aa","dd","ff"));
JavaRDD<String> subtractRDD = RDD1.subtract(RDD2);
List<String> collect = subtractRDD.collect();
for (String str:collect) {
System.out.print(str+" ");
}
------------输出-----------------
bb cc
cartesian
RDD1.cartesian(RDD2) 返回RDD1和RDD2的笛卡儿积,这个开销非常大
scala版本1
2
3
4
5
6
7
8scala> var RDD1 = sc.parallelize(List("1","2","3"))
scala> var RDD2 = sc.parallelize(List("a","b","c"))
scala> var cartesianRDD = RDD1.cartesian(RDD2)
scala> cartesianRDD.collect
res11: Array[(String, String)] = Array((1,a), (1,b), (1,c), (2,a), (2,b), (2,c), (3,a), (3,b), (3,c))
java版本1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18JavaRDD<String> RDD1 = sc.parallelize(Arrays.asList("1", "2", "3"));
JavaRDD<String> RDD2 = sc.parallelize(Arrays.asList("a","b","c"));
JavaPairRDD<String, String> cartesian = RDD1.cartesian(RDD2);
List<Tuple2<String, String>> collect1 = cartesian.collect();
for (Tuple2<String, String> tp:collect1) {
System.out.println("("+tp._1+" "+tp._2+")");
}
------------输出-----------------
(1 a)
(1 b)
(1 c)
(2 a)
(2 b)
(2 c)
(3 a)
(3 b)
(3 c)
xxxxx
mapValues
def mapValuesU => U): RDD[(K, U)]
同基本转换操作中的map,只不过mapValues是针对[K,V]中的V值进行map操作。
1 | scala> var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2) |
flatMapValues
def flatMapValuesU => TraversableOnce[U]): RDD[(K, U)]
同基本转换操作中的flatMap,只不过flatMapValues是针对[K,V]中的V值进行flatMap操作。
1 | scala> rdd1.flatMapValues(x => x + "_").collect |
键值对操作
mapToPair
举例,在F:\sparktest\sample.txt 文件的内容如下
aa bb cc aa aa aa dd dd ee ee ee ee
ff aa bb zks
ee kks
ee zz zks
将每一行的第一个单词作为键,1 作为value创建pairRDD
scala版本
scala是没有mapToPair函数的,scala版本只需要map就可以了
1 | scala> val lines = sc.textFile("F:\\sparktest\\sample.txt") |
java版本
1 | JavaRDD<String> lines = sc.textFile("F:\\sparktest\\sample.txt"); |
flatMapToPair
类似于xxx连接 mapToPair是一对一,一个元素返回一个元素,而flatMapToPair可以一个元素返回多个,相当于先flatMap,在mapToPair
例子: 将每一个单词都分成键为
scala版本1
2
3
4
5
6val lines = sc.textFile("F:\\sparktest\\sample.txt")
val flatRDD = lines.flatMap(x => (x.split("\\s+")))
val pairs = flatRDD.map(x=>(x,1))
scala> pairs.collect
res1: Array[(String, Int)] = Array((aa,1), (bb,1), (cc,1), (aa,1), (aa,1), (aa,1), (dd,1), (dd,1), (ee,1), (ee,1), (ee,1), (ee,1), (ff,1), (aa,1), (bb,1), (zks,1), (ee,1), (kks,1), (ee,1), (zz,1), (zks,1))
java版本 spark2.0以下
1 | JavaPairRDD<String, Integer> wordPairRDD = lines.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() { |
java版本 spark2.0以上
主要是iterator和iteratable的一些区别
1 | JavaPairRDD<String, Integer> wordPairRDD = lines.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() { |
reduceByKey
1 | def reduceByKey(func: (V, V) => V): RDD[(K, V)] |
接收一个函数,按照相同的key进行reduce操作,类似于scala的reduce的操作
例如RDD {(1, 2), (3, 4), (3, 6)}进行reduce
scala版本1
2
3
4
5
6var mapRDD = sc.parallelize(List((1,2),(3,4),(3,6)))
var reduceRDD = mapRDD.reduceByKey((x,y)=>x+y)
reduceRDD.foreach(x=>println(x))
------输出---------
(1,2)
(3,10)
再举例 单词计数
F:\sparktest\sample.txt中的内容如下
aa bb cc aa aa aa dd dd ee ee ee ee
ff aa bb zks
ee kks
ee zz zks
scala版本
1 | val lines = sc.textFile("F:\\sparktest\\sample.txt") |
java版本
1 | JavaRDD<String> lines = sc.textFile("F:\\sparktest\\sample.txt"); |
foldByKey
1 | def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] |
该函数用于RDD[K,V]根据K将V做折叠、合并处理,其中的参数zeroValue表示先根据映射函数将zeroValue应用于V,进行初始化V,再将映射函数应用于初始化后的V.
foldByKey可以参考我之前的scala的fold的介绍
与reduce不同的是 foldByKey开始折叠的第一个元素不是集合中的第一个元素,而是传入的一个元素
参考LXW的博客 scala的例子
直接看例子1
2
3
4
5
6scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
scala> rdd1.foldByKey(0)(_+_).collect
res75: Array[(String, Int)] = Array((A,2), (B,3), (C,1))
//将rdd1中每个key对应的V进行累加,注意zeroValue=0,需要先初始化V,映射函数为+操
//作,比如("A",0), ("A",2),先将zeroValue应用于每个V,得到:("A",0+0), ("A",2+0),即:
//("A",0), ("A",2),再将映射函数应用于初始化后的V,最后得到(A,0+2),即(A,2)
再看:1
2
3
4scala> rdd1.foldByKey(2)(_+_).collect
res76: Array[(String, Int)] = Array((A,6), (B,7), (C,3))
//先将zeroValue=2应用于每个V,得到:("A",0+2), ("A",2+2),即:("A",2), ("A",4),再将映射函
//数应用于初始化后的V,最后得到:(A,2+4),即:(A,6)
再看乘法操作:1
2
3
4
5
6
7
8
9scala> rdd1.foldByKey(0)(_*_).collect
res77: Array[(String, Int)] = Array((A,0), (B,0), (C,0))
//先将zeroValue=0应用于每个V,注意,这次映射函数为乘法,得到:("A",0*0), ("A",2*0),
//即:("A",0), ("A",0),再将映射函//数应用于初始化后的V,最后得到:(A,0*0),即:(A,0)
//其他K也一样,最终都得到了V=0
scala> rdd1.foldByKey(1)(_*_).collect
res78: Array[(String, Int)] = Array((A,0), (B,2), (C,1))
//映射函数为乘法时,需要将zeroValue设为1,才能得到我们想要的结果。
SortByKey
1 | def sortByKey(ascending : scala.Boolean = { /* compiled code */ }, numPartitions : scala.Int = { /* compiled code */ }) : org.apache.spark.rdd.RDD[scala.Tuple2[K, V]] = { /* compiled code */ } |
SortByKey用于对pairRDD按照key进行排序,第一个参数可以设置true或者false,默认是true
scala例子
1 | scala> val rdd = sc.parallelize(Array((3, 4),(1, 2),(4,4),(2,5), (6,5), (5, 6))) |
存储相关算子
saveAsTextFile
1 | def saveAsTextFile(path: String): Unit |
saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中。
codec参数可以指定压缩的类名。1
2
3
4
5
6
7
8var rdd1 = sc.makeRDD(1 to 10,2)
scala> rdd1.saveAsTextFile("hdfs://cdh5/tmp/test/") //保存到HDFS
hadoop fs -ls /tmp/test
Found 2 items
-rw-r--r-- 2 bob supergroup 0 2015-07-10 09:15 /tmp/test/_SUCCESS
-rw-r--r-- 2 bob supergroup 21 2015-07-10 09:15 /tmp/test/part-00000
hadoop fs -cat /tmp/test/part-00000
注意:如果使用rdd1.saveAsTextFile(“file:///tmp/test”)将文件保存到本地文件系统,那么只会保存在Executor所在机器的本地目录。
指定压缩格式保存1
2
3
4
5
6
7rdd1.saveAsTextFile("hdfs://cdh5/tmp/test/",classOf[com.hadoop.compression.lzo.LzopCodec])
hadoop fs -ls /tmp/test
-rw-r--r-- 2 bob supergroup 0 2015-07-10 09:20 /tmp/test/_SUCCESS
-rw-r--r-- 2 bob supergroup 71 2015-07-10 09:20 /tmp/test/part-00000.lzo
hadoop fs -text /tmp/test/part-00000.lzo
saveAsSequenceFile
saveAsSequenceFile用于将RDD以SequenceFile的文件格式保存到HDFS上。
用法同saveAsTextFile。
saveAsObjectFile
1 | def saveAsObjectFile(path: String): Unit |
saveAsHadoopFile
1 | def saveAsHadoopFile(path: String, keyClass: Class[], valueClass: Class[], outputFormatClass: Class[_ <: OutputFormat[, ]], codec: Class[_ <: CompressionCodec]): Unit |
可以指定outputKeyClass、outputValueClass以及压缩格式。
每个分区输出一个文件。1
2
3
4
5
6
7
8
9
10var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
rdd1.saveAsHadoopFile("/tmp/test/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
rdd1.saveAsHadoopFile("/tmp/test/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]],
classOf[com.hadoop.compression.lzo.LzopCodec])
saveAsHadoopDataset
def saveAsHadoopDataset(conf: JobConf): Unit
saveAsHadoopDataset用于将RDD保存到除了HDFS的其他存储中,比如HBase。
在JobConf中,通常需要关注或者设置五个参数:
文件的保存路径、key值的class类型、value值的class类型、RDD的输出格式(OutputFormat)、以及压缩相关的参数。
使用saveAsHadoopDataset将RDD保存到HDFS中1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConf
var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
var jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[Text,IntWritable]])
jobConf.setOutputKeyClass(classOf[Text])
jobConf.setOutputValueClass(classOf[IntWritable])
jobConf.set("mapred.output.dir","/tmp/bob/")
rdd1.saveAsHadoopDataset(jobConf)
结果:
hadoop fs -cat /tmp/bob/part-00000
A 2
A 1
hadoop fs -cat /tmp/bob/part-00001
B 6
B 3
B 7
保存数据到HBASE
HBase建表:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38create ‘bob′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
var conf = HBaseConfiguration.create()
var jobConf = new JobConf(conf)
jobConf.set("hbase.zookeeper.quorum","zkNode1,zkNode2,zkNode3")
jobConf.set("zookeeper.znode.parent","/hbase")
jobConf.set(TableOutputFormat.OUTPUT_TABLE,"bob")
jobConf.setOutputFormat(classOf[TableOutputFormat])
var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))
rdd1.map(x =>
{
var put = new Put(Bytes.toBytes(x._1))
put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
(new ImmutableBytesWritable,put)
}
).saveAsHadoopDataset(jobConf)
结果:
hbase(main):005:0> scan 'bob'
ROW COLUMN+CELL
A column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x02
B column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x06
C column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x07
3 row(s) in 0.0550 seconds
注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。
saveAsNewAPIHadoopFile
1 | def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit |
saveAsNewAPIHadoopFile用于将RDD数据保存到HDFS上,使用新版本Hadoop API。
用法基本同saveAsHadoopFile。1
2
3
4
5
6
7
8
9import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
rdd1.saveAsNewAPIHadoopFile("/tmp/bob/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
saveAsNewAPIHadoopDataset
def saveAsNewAPIHadoopDataset(conf: Configuration): Unit
作用同saveAsHadoopDataset,只不过采用新版本Hadoop API。
以写入HBase为例:
HBase建表:
create ‘bob′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}
完整的Spark应用程序:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38package com.bob.test
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
object Test {
def main(args : Array[String]) {
val sparkConf = new SparkConf().setMaster("spark://test:7077").setAppName("test")
val sc = new SparkContext(sparkConf);
var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))
sc.hadoopConfiguration.set("hbase.zookeeper.quorum ","zkNode1,zkNode2,zkNode3")
sc.hadoopConfiguration.set("zookeeper.znode.parent","/hbase")
sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"bob")
var job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
rdd1.map(
x => {
var put = new Put(Bytes.toBytes(x._1))
put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
(new ImmutableBytesWritable,put)
}
).saveAsNewAPIHadoopDataset(job.getConfiguration)
sc.stop()
}
}
注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。