Pekey‘s Blog

Spark Java API

2018/04/17 Share

参考文献:http://lxw1234.com

RDD如何创建

    
首先创建JavaSparkContext对象实例sc

1
JavaSparkContext  sc = new JavaSparkContext("local","SparkTest");

接受2个参数:
第一个参数表示运行方式(local、yarn-client、yarn-standalone等)
第二个参数表示应用名字

直接从集合转化 sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
从HDFS文件转化 sc.textFile("hdfs://")
从本地文件转化 sc.textFile("file:/")

下面例子中list2就是根据data2List生成的一个RDD

1
2
3
4
List<String> list1 = new ArrayList<String>();
list1.add("11,12,13,14,15");
list1.add("aa,bb,cc,dd,ee");
JavaRDD<String> list2 = sc.parallelize(list1);

常用算子


基本算子

filter

举例,在F:\sparktest\sample.txt 文件的内容如下

aa bb cc aa aa aa dd dd ee ee ee ee
ff aa bb zks
ee kks
ee zz zks
我要将包含zks的行的内容给找出来
scala版本

1
2
3
4
5
6
val lines = sc.textFile("F:\\sparktest\\sample.txt").filter(line=>line.contains("zks"))
//打印内容
lines.collect().foreach(println(_));
-------------输出------------------
ff aa bb zks
ee zz zks

java版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
JavaRDD<String> lines = sc.textFile("F:\\sparktest\\sample.txt");
JavaRDD<String> zksRDD = lines.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String s) throws Exception {
return s.contains("zks");
}
});
//打印内容
List<String> zksCollect = zksRDD.collect();
for (String str:zksCollect) {
System.out.println(str);
}
----------------输出-------------------
ff aa bb zks
ee zz zks

map

map() 接收一个函数,把这个函数用于 RDD 中的每个元素,将函数的返回结果作为结果RDD编程 | 31
RDD 中对应元素的值 map是一对一的关系
举例,在F:\sparktest\sample.txt 文件的内容如下

aa bb cc aa aa aa dd dd ee ee ee ee
ff aa bb zks
ee kks
ee zz zks

把每一行变成一个数组
scala版本

1
2
3
4
5
6
7
8
9
10
11
//读取数据
scala> val lines = sc.textFile("F:\\sparktest\\sample.txt")
//用map,对于每一行数据,按照空格分割成一个一个数组,然后返回的是一对一的关系
scala> var mapRDD = lines.map(line => line.split("\\s+"))
---------------输出-----------
res0: Array[Array[String]] = Array(Array(aa, bb, cc, aa, aa, aa, dd, dd, ee, ee, ee, ee), Array(ff, aa, bb, zks), Array(ee, kks), Array(ee, zz, zks))

//读取第一个元素
scala> mapRDD.first
---输出----
res1: Array[String] = Array(aa, bb, cc, aa, aa, aa, dd, dd, ee, ee, ee, ee)

java版本

1
2
3
4
5
6
7
8
9
10
11
JavaRDD<Iterable<String>> mapRDD = lines.map(new Function<String, Iterable<String>>() {
@Override
public Iterable<String> call(String s) throws Exception {
String[] split = s.split("\\s+");
return Arrays.asList(split);
}
});
//读取第一个元素
System.out.println(mapRDD.first());
---------------输出-------------
[aa, bb, cc, aa, aa, aa, dd, dd, ee, ee, ee, ee]

flatMap

有时候,我们希望对某个元素生成多个元素,实现该功能的操作叫作 flatMap()
faltMap的函数应用于每一个元素,对于每一个元素返回的是多个元素组成的迭代器(想要了解更多,请参考scala的flatMap和map用法)
例如我们将数据切分为单词
scala版本

1
2
3
4
5
scala>  val lines = sc.textFile("F:\\sparktest\\sample.txt")
scala> val flatMapRDD = lines.flatMap(line=>line.split("\\s"))
scala> flatMapRDD.first()
---输出----
res0: String = aa

java版本,spark2.0以下

1
2
3
4
5
6
7
8
9
10
11
12
JavaRDD<String> lines = sc.textFile("F:\\sparktest\\sample.txt");
JavaRDD<String> flatMapRDD = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String s) throws Exception {
String[] split = s.split("\\s+");
return Arrays.asList(split);
}
});
//输出第一个
System.out.println(flatMapRDD.first());
------------输出----------
aa

java版本,spark2.0以上
spark2.0以上,对flatMap的方法有所修改,就是flatMap中的Iterator和Iteratable的小区别

1
2
3
4
5
6
7
JavaRDD<String> flatMapRDD = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
String[] split = s.split("\\s+");
return Arrays.asList(split).iterator();
}
});

distinct

distinct用于去重, 我们生成的RDD可能有重复的元素,使用distinct方法可以去掉重复的元素, 不过此方法涉及到混洗,操作开销
scala版本

1
2
3
4
5
6
7
8
9
scala> var RDD1 = sc.parallelize(List("aa","aa","bb","cc","dd"))

scala> RDD1.collect
res3: Array[String] = Array(aa, aa, bb, cc, dd)

scala> var distinctRDD = RDD1.distinct

scala> distinctRDD.collect
res5: Array[String] = Array(aa, dd, bb, cc)

java版本

1
2
3
4
5
6
7
8
JavaRDD<String> RDD1 = sc.parallelize(Arrays.asList("aa", "aa", "bb", "cc", "dd"));
JavaRDD<String> distinctRDD = RDD1.distinct();
List<String> collect = distinctRDD.collect();
for (String str:collect) {
System.out.print(str+", ");
}
---------输出----------
aa, dd, bb, cc,

union

两个RDD进行合并
scala版本

1
2
3
4
5
6
7
8
9
10
11
scala> var RDD1 = sc.parallelize(List("aa","aa","bb","cc","dd"))
scala> var RDD2 = sc.parallelize(List("aa","dd","ff"))

scala> RDD1.collect
res6: Array[String] = Array(aa, aa, bb, cc, dd)

scala> RDD2.collect
res7: Array[String] = Array(aa, dd, ff)

scala> RDD1.union(RDD2).collect
res8: Array[String] = Array(aa, aa, bb, cc, dd, aa, dd, ff)

java版本

1
2
3
4
5
6
7
8
9
JavaRDD<String> RDD1 = sc.parallelize(Arrays.asList("aa", "aa", "bb", "cc", "dd"));
JavaRDD<String> RDD2 = sc.parallelize(Arrays.asList("aa","dd","ff"));
JavaRDD<String> unionRDD = RDD1.union(RDD2);
List<String> collect = unionRDD.collect();
for (String str:collect) {
System.out.print(str+", ");
}
-----------输出---------
aa, aa, bb, cc, dd, aa, dd, ff,


intersection

RDD1.intersection(RDD2) 返回两个RDD的交集,并且去重
intersection 需要混洗数据,比较浪费性能
scala版本

1
2
3
4
5
6
7
8
9
10
11
12
13
scala> var RDD1 = sc.parallelize(List("aa","aa","bb","cc","dd"))
scala> var RDD2 = sc.parallelize(List("aa","dd","ff"))

scala> RDD1.collect
res6: Array[String] = Array(aa, aa, bb, cc, dd)

scala> RDD2.collect
res7: Array[String] = Array(aa, dd, ff)

scala> var insertsectionRDD = RDD1.intersection(RDD2)
scala> insertsectionRDD.collect

res9: Array[String] = Array(aa, dd)

java版本

1
2
3
4
5
6
7
8
9
JavaRDD<String> RDD1 = sc.parallelize(Arrays.asList("aa", "aa", "bb", "cc", "dd"));
JavaRDD<String> RDD2 = sc.parallelize(Arrays.asList("aa","dd","ff"));
JavaRDD<String> intersectionRDD = RDD1.intersection(RDD2);
List<String> collect = intersectionRDD.collect();
for (String str:collect) {
System.out.print(str+" ");
}
-------------输出-----------
aa dd


subtract

RDD1.subtract(RDD2),返回在RDD1中出现,但是不在RDD2中出现的元素,不去重
scala版本

1
2
3
4
5
6
7
8
JavaRDD<String> RDD1 = sc.parallelize(Arrays.asList("aa", "aa","bb", "cc", "dd"));

JavaRDD<String> RDD2 = sc.parallelize(Arrays.asList("aa","dd","ff"));

scala> var substractRDD =RDD1.subtract(RDD2)

scala> substractRDD.collect
res10: Array[String] = Array(bb, cc)

java版本

1
2
3
4
5
6
7
8
9
JavaRDD<String> RDD1 = sc.parallelize(Arrays.asList("aa", "aa", "bb","cc", "dd"));
JavaRDD<String> RDD2 = sc.parallelize(Arrays.asList("aa","dd","ff"));
JavaRDD<String> subtractRDD = RDD1.subtract(RDD2);
List<String> collect = subtractRDD.collect();
for (String str:collect) {
System.out.print(str+" ");
}
------------输出-----------------
bb cc


cartesian

RDD1.cartesian(RDD2) 返回RDD1和RDD2的笛卡儿积,这个开销非常大

scala版本

1
2
3
4
5
6
7
8
scala>  var RDD1 = sc.parallelize(List("1","2","3"))

scala> var RDD2 = sc.parallelize(List("a","b","c"))

scala> var cartesianRDD = RDD1.cartesian(RDD2)

scala> cartesianRDD.collect
res11: Array[(String, String)] = Array((1,a), (1,b), (1,c), (2,a), (2,b), (2,c), (3,a), (3,b), (3,c))

java版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
JavaRDD<String> RDD1 = sc.parallelize(Arrays.asList("1", "2", "3"));
JavaRDD<String> RDD2 = sc.parallelize(Arrays.asList("a","b","c"));
JavaPairRDD<String, String> cartesian = RDD1.cartesian(RDD2);

List<Tuple2<String, String>> collect1 = cartesian.collect();
for (Tuple2<String, String> tp:collect1) {
System.out.println("("+tp._1+" "+tp._2+")");
}
------------输出-----------------
(1 a)
(1 b)
(1 c)
(2 a)
(2 b)
(2 c)
(3 a)
(3 b)
(3 c)


xxxxx

mapValues

def mapValuesU => U): RDD[(K, U)]

同基本转换操作中的map,只不过mapValues是针对[K,V]中的V值进行map操作。

1
2
3
4
5
scala> var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[27] at makeRDD at :21

scala> rdd1.mapValues(x => x + "_").collect
res26: Array[(Int, String)] = Array((1,A_), (2,B_), (3,C_), (4,D_))

flatMapValues

def flatMapValuesU => TraversableOnce[U]): RDD[(K, U)]

同基本转换操作中的flatMap,只不过flatMapValues是针对[K,V]中的V值进行flatMap操作。

1
2
scala> rdd1.flatMapValues(x => x + "_").collect
res36: Array[(Int, Char)] = Array((1,A), (1,_), (2,B), (2,_), (3,C), (3,_), (4,D), (4,_))

键值对操作

mapToPair

举例,在F:\sparktest\sample.txt 文件的内容如下

aa bb cc aa aa aa dd dd ee ee ee ee
ff aa bb zks
ee kks
ee zz zks

将每一行的第一个单词作为键,1 作为value创建pairRDD
scala版本
scala是没有mapToPair函数的,scala版本只需要map就可以了

1
2
3
4
5
6
scala> val lines = sc.textFile("F:\\sparktest\\sample.txt")

scala> val pairs = lines.map(x => (x.split("\\s+")(0), 1))

scala> pairs.collect
res0: Array[(String, Int)] = Array((aa,1), (ff,1), (ee,1), (ee,1))

java版本

1
2
3
4
5
6
7
8
JavaRDD<String> lines = sc.textFile("F:\\sparktest\\sample.txt");
//输入的是一个string的字符串,输出的是一个(String, Integer) 的map
JavaPairRDD<String, Integer> pairRDD = lines.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s.split("\\s+")[0], 1);
}
});

flatMapToPair

类似于xxx连接 mapToPair是一对一,一个元素返回一个元素,而flatMapToPair可以一个元素返回多个,相当于先flatMap,在mapToPair
例子: 将每一个单词都分成键为
scala版本

1
2
3
4
5
6
val lines = sc.textFile("F:\\sparktest\\sample.txt")
val flatRDD = lines.flatMap(x => (x.split("\\s+")))
val pairs = flatRDD.map(x=>(x,1))

scala> pairs.collect
res1: Array[(String, Int)] = Array((aa,1), (bb,1), (cc,1), (aa,1), (aa,1), (aa,1), (dd,1), (dd,1), (ee,1), (ee,1), (ee,1), (ee,1), (ff,1), (aa,1), (bb,1), (zks,1), (ee,1), (kks,1), (ee,1), (zz,1), (zks,1))

java版本 spark2.0以下

1
2
3
4
5
6
7
8
9
10
11
12
JavaPairRDD<String, Integer> wordPairRDD = lines.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
@Override
public Iterable<Tuple2<String, Integer>> call(String s) throws Exception {
ArrayList<Tuple2<String, Integer>> tpLists = new ArrayList<Tuple2<String, Integer>>();
String[] split = s.split("\\s+");
for (int i = 0; i <split.length ; i++) {
Tuple2 tp = new Tuple2<String,Integer>(split[i], 1);
tpLists.add(tp);
}
return tpLists;
}
});

java版本 spark2.0以上
主要是iterator和iteratable的一些区别

1
2
3
4
5
6
7
8
9
10
11
12
JavaPairRDD<String, Integer> wordPairRDD = lines.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
@Override
public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
ArrayList<Tuple2<String, Integer>> tpLists = new ArrayList<Tuple2<String, Integer>>();
String[] split = s.split("\\s+");
for (int i = 0; i <split.length ; i++) {
Tuple2 tp = new Tuple2<String,Integer>(split[i], 1);
tpLists.add(tp);
}
return tpLists.iterator();
}
});

reduceByKey

1
2
3
4
5
def reduceByKey(func: (V, V) => V): RDD[(K, V)]

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

接收一个函数,按照相同的key进行reduce操作,类似于scala的reduce的操作
例如RDD {(1, 2), (3, 4), (3, 6)}进行reduce

scala版本

1
2
3
4
5
6
var mapRDD = sc.parallelize(List((1,2),(3,4),(3,6)))
var reduceRDD = mapRDD.reduceByKey((x,y)=>x+y)
reduceRDD.foreach(x=>println(x))
------输出---------
(1,2)
(3,10)

再举例 单词计数

F:\sparktest\sample.txt中的内容如下
aa bb cc aa aa aa dd dd ee ee ee ee
ff aa bb zks
ee kks
ee zz zks

scala版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
    val lines = sc.textFile("F:\\sparktest\\sample.txt")
val wordsRDD = lines.flatMap(x=>x.split("\\s+")).map(x=>(x,1))
val wordCountRDD = wordsRDD.reduceByKey((x,y)=>x+y)
wordCountRDD.foreach(x=>println(x))
---------输出-----------
(ee,6)
(aa,5)
(dd,2)
(zz,1)
(zks,2)
(kks,1)
(ff,1)
(bb,2)
(cc,1)

java版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
JavaRDD<String> lines = sc.textFile("F:\\sparktest\\sample.txt");

JavaPairRDD<String, Integer> wordPairRDD = lines.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
@Override
public Iterable<Tuple2<String, Integer>> call(String s) throws Exception {
ArrayList<Tuple2<String, Integer>> tpLists = new ArrayList<Tuple2<String, Integer>>();
String[] split = s.split("\\s+");
for (int i = 0; i <split.length ; i++) {
Tuple2 tp = new Tuple2<String,Integer>(split[i], 1);
tpLists.add(tp);
}
return tpLists;
}
});

JavaPairRDD<String, Integer> wordCountRDD = wordPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});
Map<String, Integer> collectAsMap = wordCountRDD.collectAsMap();
for (String key:collectAsMap.keySet()) {
System.out.println("("+key+","+collectAsMap.get(key)+")");
}
----------输出-------------------------------
(kks,1)
(ee,6)
(bb,2)
(zz,1)
(ff,1)
(cc,1)
(zks,2)
(dd,2)
(aa,5)

foldByKey

1
2
3
4
5
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

该函数用于RDD[K,V]根据K将V做折叠、合并处理,其中的参数zeroValue表示先根据映射函数将zeroValue应用于V,进行初始化V,再将映射函数应用于初始化后的V.
foldByKey可以参考我之前的scala的fold的介绍
与reduce不同的是 foldByKey开始折叠的第一个元素不是集合中的第一个元素,而是传入的一个元素
参考LXW的博客 scala的例子
直接看例子

1
2
3
4
5
6
scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
scala> rdd1.foldByKey(0)(_+_).collect
res75: Array[(String, Int)] = Array((A,2), (B,3), (C,1))
//将rdd1中每个key对应的V进行累加,注意zeroValue=0,需要先初始化V,映射函数为+操
//作,比如("A",0), ("A",2),先将zeroValue应用于每个V,得到:("A",0+0), ("A",2+0),即:
//("A",0), ("A",2),再将映射函数应用于初始化后的V,最后得到(A,0+2),即(A,2)

再看:

1
2
3
4
scala> rdd1.foldByKey(2)(_+_).collect
res76: Array[(String, Int)] = Array((A,6), (B,7), (C,3))
//先将zeroValue=2应用于每个V,得到:("A",0+2), ("A",2+2),即:("A",2), ("A",4),再将映射函
//数应用于初始化后的V,最后得到:(A,2+4),即:(A,6)

再看乘法操作:

1
2
3
4
5
6
7
8
9
scala> rdd1.foldByKey(0)(_*_).collect
res77: Array[(String, Int)] = Array((A,0), (B,0), (C,0))
//先将zeroValue=0应用于每个V,注意,这次映射函数为乘法,得到:("A",0*0), ("A",2*0),
//即:("A",0), ("A",0),再将映射函//数应用于初始化后的V,最后得到:(A,0*0),即:(A,0)
//其他K也一样,最终都得到了V=0

scala> rdd1.foldByKey(1)(_*_).collect
res78: Array[(String, Int)] = Array((A,0), (B,2), (C,1))
//映射函数为乘法时,需要将zeroValue设为1,才能得到我们想要的结果。


SortByKey

1
def sortByKey(ascending : scala.Boolean = { /* compiled code */ }, numPartitions : scala.Int = { /* compiled code */ }) : org.apache.spark.rdd.RDD[scala.Tuple2[K, V]] = { /* compiled code */ }

SortByKey用于对pairRDD按照key进行排序,第一个参数可以设置true或者false,默认是true
scala例子

1
2
3
4
5
6
7
8
9
10
11
12
13
scala> val rdd = sc.parallelize(Array((3, 4),(1, 2),(4,4),(2,5), (6,5), (5, 6)))  

// sortByKey不是Action操作,只能算是转换操作
scala> rdd.sortByKey()
res9: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[28] at sortByKey at <console>:24

//看看sortByKey后是什么类型
scala> rdd.sortByKey().collect()
res10: Array[(Int, Int)] = Array((1,2), (2,5), (3,4), (4,4), (5,6), (6,5))

//降序排序
scala> rdd.sortByKey(false).collect()
res12: Array[(Int, Int)] = Array((6,5), (5,6), (4,4), (3,4), (2,5), (1,2))

存储相关算子

saveAsTextFile

1
2
3
def saveAsTextFile(path: String): Unit

def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中。

codec参数可以指定压缩的类名。

1
2
3
4
5
6
7
8
var rdd1 = sc.makeRDD(1 to 10,2)
scala> rdd1.saveAsTextFile("hdfs://cdh5/tmp/test/") //保存到HDFS
hadoop fs -ls /tmp/test
Found 2 items
-rw-r--r-- 2 bob supergroup 0 2015-07-10 09:15 /tmp/test/_SUCCESS
-rw-r--r-- 2 bob supergroup 21 2015-07-10 09:15 /tmp/test/part-00000

hadoop fs -cat /tmp/test/part-00000

注意:如果使用rdd1.saveAsTextFile(“file:///tmp/test”)将文件保存到本地文件系统,那么只会保存在Executor所在机器的本地目录。
指定压缩格式保存

1
2
3
4
5
6
7
rdd1.saveAsTextFile("hdfs://cdh5/tmp/test/",classOf[com.hadoop.compression.lzo.LzopCodec])

hadoop fs -ls /tmp/test
-rw-r--r-- 2 bob supergroup 0 2015-07-10 09:20 /tmp/test/_SUCCESS
-rw-r--r-- 2 bob supergroup 71 2015-07-10 09:20 /tmp/test/part-00000.lzo

hadoop fs -text /tmp/test/part-00000.lzo


saveAsSequenceFile

saveAsSequenceFile用于将RDD以SequenceFile的文件格式保存到HDFS上。

用法同saveAsTextFile。


saveAsObjectFile

1
2
3
4
5
6
7
8
9
10
11
def saveAsObjectFile(path: String): Unit

saveAsObjectFile用于将RDD中的元素序列化成对象,存储到文件中。

对于HDFS,默认采用SequenceFile保存。

var rdd1 = sc.makeRDD(1 to 10,2)
rdd1.saveAsObjectFile("hdfs://cdh5/tmp/test/")

hadoop fs -cat /tmp/test/part-00000
SEQ !org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableT

saveAsHadoopFile

1
2
3
4
5
def saveAsHadoopFile(path: String, keyClass: Class[], valueClass: Class[], outputFormatClass: Class[_ <: OutputFormat[, ]], codec: Class[_ <: CompressionCodec]): Unit

def saveAsHadoopFile(path: String, keyClass: Class[], valueClass: Class[], outputFormatClass: Class[_ <: OutputFormat[, ]], conf: JobConf = …, codec: Option[Class[_ <: CompressionCodec]] = None): Unit

saveAsHadoopFile是将RDD存储在HDFS上的文件中,支持老版本Hadoop API。

可以指定outputKeyClass、outputValueClass以及压缩格式。

每个分区输出一个文件。

1
2
3
4
5
6
7
8
9
10
var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))

import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable

rdd1.saveAsHadoopFile("/tmp/test/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])

rdd1.saveAsHadoopFile("/tmp/test/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]],
classOf[com.hadoop.compression.lzo.LzopCodec])


saveAsHadoopDataset

def saveAsHadoopDataset(conf: JobConf): Unit

saveAsHadoopDataset用于将RDD保存到除了HDFS的其他存储中,比如HBase。

在JobConf中,通常需要关注或者设置五个参数:

文件的保存路径、key值的class类型、value值的class类型、RDD的输出格式(OutputFormat)、以及压缩相关的参数。
使用saveAsHadoopDataset将RDD保存到HDFS中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConf



var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
var jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[Text,IntWritable]])
jobConf.setOutputKeyClass(classOf[Text])
jobConf.setOutputValueClass(classOf[IntWritable])
jobConf.set("mapred.output.dir","/tmp/bob/")
rdd1.saveAsHadoopDataset(jobConf)

结果:
hadoop fs -cat /tmp/bob/part-00000
A 2
A 1
hadoop fs -cat /tmp/bob/part-00001
B 6
B 3
B 7

保存数据到HBASE
HBase建表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
create ‘bob′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

var conf = HBaseConfiguration.create()
var jobConf = new JobConf(conf)
jobConf.set("hbase.zookeeper.quorum","zkNode1,zkNode2,zkNode3")
jobConf.set("zookeeper.znode.parent","/hbase")
jobConf.set(TableOutputFormat.OUTPUT_TABLE,"bob")
jobConf.setOutputFormat(classOf[TableOutputFormat])

var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))
rdd1.map(x =>
{
var put = new Put(Bytes.toBytes(x._1))
put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
(new ImmutableBytesWritable,put)
}
).saveAsHadoopDataset(jobConf)

结果:
hbase(main):005:0> scan 'bob'
ROW COLUMN+CELL
A column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x02
B column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x06
C column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x07
3 row(s) in 0.0550 seconds

注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。


saveAsNewAPIHadoopFile

1
2
3
def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit

def saveAsNewAPIHadoopFile(path: String, keyClass: Class[], valueClass: Class[], outputFormatClass: Class[_ <: OutputFormat[, ]], conf: Configuration = self.context.hadoopConfiguration): Unit

saveAsNewAPIHadoopFile用于将RDD数据保存到HDFS上,使用新版本Hadoop API。

用法基本同saveAsHadoopFile。

1
2
3
4
5
6
7
8
9
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable

var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
rdd1.saveAsNewAPIHadoopFile("/tmp/bob/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])


saveAsNewAPIHadoopDataset

def saveAsNewAPIHadoopDataset(conf: Configuration): Unit

作用同saveAsHadoopDataset,只不过采用新版本Hadoop API。

以写入HBase为例:

HBase建表:

create ‘bob′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}

完整的Spark应用程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.bob.test

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put

object Test {
def main(args : Array[String]) {
val sparkConf = new SparkConf().setMaster("spark://test:7077").setAppName("test")
val sc = new SparkContext(sparkConf);
var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))

sc.hadoopConfiguration.set("hbase.zookeeper.quorum ","zkNode1,zkNode2,zkNode3")
sc.hadoopConfiguration.set("zookeeper.znode.parent","/hbase")
sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"bob")
var job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

rdd1.map(
x => {
var put = new Put(Bytes.toBytes(x._1))
put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
(new ImmutableBytesWritable,put)
}
).saveAsNewAPIHadoopDataset(job.getConfiguration)

sc.stop()
}
}

注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。

CATALOG
  1. 1. 参考文献:http://lxw1234.com
  2. 2. RDD如何创建
  3. 3. 常用算子
    1. 3.1. 基本算子
      1. 3.1.1. filter
      2. 3.1.2. map
      3. 3.1.3. flatMap
      4. 3.1.4. distinct
      5. 3.1.5. union
      6. 3.1.6. intersection
      7. 3.1.7. subtract
      8. 3.1.8. cartesian
    2. 3.2. xxxxx
      1. 3.2.1. mapValues
      2. 3.2.2. flatMapValues
    3. 3.3. 键值对操作
      1. 3.3.1. mapToPair
      2. 3.3.2. flatMapToPair
      3. 3.3.3. reduceByKey
      4. 3.3.4. foldByKey
      5. 3.3.5. SortByKey
    4. 3.4. 存储相关算子
      1. 3.4.1. saveAsTextFile
      2. 3.4.2. saveAsSequenceFile
      3. 3.4.3. saveAsObjectFile
      4. 3.4.4. saveAsHadoopFile
      5. 3.4.5. saveAsHadoopDataset
      6. 3.4.6. saveAsNewAPIHadoopFile
      7. 3.4.7. saveAsNewAPIHadoopDataset