下面对相关常用算子进行演示。
将 RDD
中的数据 一对一 的转为另一种形式:
例如:
val num = sc.parallelize(Seq(1, 2, 3, 4, 5))
println(
num.map(_+1).collect().toList
)
JavaRDD<Integer> num = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
System.out.println(
num.map(i -> i + 1).collect()
);
num = sc.parallelize((1, 2, 3, 4, 5))
print(
num.map(lambda i:i+1).collect()
)
和 Map
算子类似,但是 FlatMap
是一对多,并都转化为一维数据:
例如:
val text = sc.parallelize(Seq('abc def', 'hello word', 'dfg,okh', 'he,word'))
println(
text.flatMap(_.split(' ')).flatMap(_.split(',')).collect().toList
)
JavaRDD<String> text = sc.parallelize(Arrays.asList('abc def', 'hello word', 'dfg,okh', 'he,word'));
System.out.println(
text.flatMap(s ->Arrays.asList(s.split(' ')).iterator())
.flatMap(s ->Arrays.asList(s.split(',')).iterator())
.collect()
);
text = sc.parallelize(('abc def', 'hello word', 'dfg,okh', 'he,word'))
print(
text.flatMap(lambda s: s.split(' ')).flatMap(lambda s: s.split(',')).collect()
)
过滤掉不需要的内容:
例如:
val text = sc.parallelize(Seq('hello', 'hello', 'word', 'word'))
println(
text.filter(_.equals('hello')).collect().toList
)
JavaRDD<String> text = sc.parallelize(Arrays.asList('hello', 'hello', 'word', 'word'));
System.out.println(
text.filter(s -> Objects.equals(s,'hello'))
.collect()
);
text = sc.parallelize(('hello', 'hello', 'word', 'word'))
print(
text.filter(lambda s: s == 'hello').collect()
)
和 map
类似,针对整个分区的数据转换,拿到的是每个分区的集合:
例如:
val text = sc.parallelize(Seq('hello', 'hello', 'word', 'word'), 2)
println(
text.mapPartitions(iter => {<!-- -->
iter.map(_ + '333')
}).collect().toList
)
JavaRDD<String> text = sc.parallelize(Arrays.asList('hello', 'hello', 'word', 'word'), 2);
System.out.println(
text.mapPartitions(iter -> {<!-- -->
List<String> list = new ArrayList<>();
iter.forEachRemaining(s -> list.add(s+'333'));
return list.iterator();
}).collect()
);
text = sc.parallelize(('hello', 'hello', 'word', 'word'), 2)
def partition(par):
tmpArr = []
for s in par:
tmpArr.append(s + '333')
return tmpArr
print(
text.mapPartitions(partition).collect()
)
和 mapPartitions
类似, 只是在函数中增加了分区的 Index
:
例如:
val text = sc.parallelize(Seq('hello', 'hello', 'word', 'word'), 2)
println(
text.mapPartitionsWithIndex((index, iter) => {<!-- -->
println('当前分区' + index)
iter.map(_ + '333')
}, true).collect().toList
)
JavaRDD<String> text = sc.parallelize(Arrays.asList('hello', 'hello', 'word', 'word'), 2);
System.out.println(
text.mapPartitionsWithIndex((index, iter) -> {<!-- -->
System.out.println('当前分区' + index);
List<String> list = new ArrayList<>();
iter.forEachRemaining(s -> list.add(s + '333'));
return list.iterator();
}, true).collect()
);
text = sc.parallelize(('hello', 'hello', 'word', 'word'), 2)
def partition(index, par):
print('当前分区' + str(index))
tmpArr = []
for s in par:
tmpArr.append(s + '333')
return tmpArr
print(
text.mapPartitionsWithIndex(partition).collect()
)
只能作用于 Key-Value
型数据, 和 Map
类似, 也是使用函数按照转换数据, 不同点是 MapValues
只转换 Key-Value
中的 Value
:
例如:
val text = sc.parallelize(Seq('abc', 'bbb', 'ccc', 'dd'))
println(
text.map((_, 'v' + _))
.mapValues(_ + '66')
.collect().toList
)
JavaRDD<String> text = sc.parallelize(Arrays.asList('abc', 'bbb', 'ccc', 'dd'));
System.out.println(
text.mapToPair(s -> new Tuple2<>(s, 'v' + s))
.mapValues(v -> v + '66').collect()
);
text = sc.parallelize(('abc', 'bbb', 'ccc', 'dd'))
print(
text.map(lambda s: (s, 'v' + s)).mapValues(lambda v: v + '66').collect()
)
可以从一个数据集中抽样出来一部分, 常用作于减小数据集以保证运行速度, 并且尽可能少规律的损失:
第一个参数为withReplacement
, 意为是否取样以后是否还放回原数据集供下次使用, 简单的说,如果这个参数的值为 true, 则抽样出来的数据集中可能会有重复。
第二个参数为fraction
, 意为抽样的比例。
第三个参数为seed
, 随机数种子, 用于 Sample
内部随机生成下标,一般不指定,使用默认值。
例如:
val num = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
println(
num.sample(true,0.6,2)
.collect().toList
)
JavaRDD<Integer> num = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
System.out.println(
num.sample(true, 0.6, 2).collect()
);
num = sc.parallelize((1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
print(
num.sample(True, 0.6, 2).collect()
)
两个数据并集,类似于数据库的 union
:
例如:
val text1 = sc.parallelize(Seq('aa', 'bb'))
val text2 = sc.parallelize(Seq('cc', 'dd'))
println(
text1.union(text2).collect().toList
)
JavaRDD<String> text1 = sc.parallelize(Arrays.asList('aa', 'bb'));
JavaRDD<String> text2 = sc.parallelize(Arrays.asList('cc', 'dd'));
System.out.println(
text1.union(text2).collect()
);
text1 = sc.parallelize(('aa', 'bb'))
text2 = sc.parallelize(('cc', 'dd'))
print(
text1.union(text2).collect()
)
两个(key,value)
数据集,根据 key
取连接、左连接、右连接,类似数据库中的连接:
例如:
val s1 = sc.parallelize(Seq('1,3', '2,6', '3,8', '4,2'))
val s2 = sc.parallelize(Seq('1,小明', '2,小张', '3,小李', '4,小红', '5,张三'))
val s3 = s1.map(s => (s.split(',')(0), s.split(',')(0)))
val s4 = s2.map(s => (s.split(',')(0), s.split(',')(1)))
println(s3.join(s4).collectAsMap)
println(s3.leftOuterJoin(s4).collectAsMap)
println(s3.rightOuterJoin(s4).collectAsMap)
JavaRDD<String> s1 = sc.parallelize(Arrays.asList('1,3', '2,6', '3,8', '4,2'));
JavaRDD<String> s2 = sc.parallelize(Arrays.asList('1,小明', '2,小张', '3,小李', '4,小红', '5,张三'));
JavaPairRDD<String, String> s3 = s1.mapToPair(s -> new Tuple2<>(s.split(',')[0], s.split(',')[1]));
JavaPairRDD<String, String> s4 = s2.mapToPair(s -> new Tuple2<>(s.split(',')[0], s.split(',')[1]));
System.out.println(s3.join(s4).collectAsMap());
System.out.println(s3.leftOuterJoin(s4).collectAsMap());
System.out.println(s3.rightOuterJoin(s4).collectAsMap());
s1 = sc.parallelize(('1,3', '2,6', '3,8', '4,2'))
s2 = sc.parallelize(('1,小明', '2,小张', '3,小李', '4,小红', '5,张三'))
s3 = s1.map(lambda s:(s.split(',')[0], s.split(',')[0]))
s4 = s2.map(lambda s:(s.split(',')[0], s.split(',')[1]))
print(s3.join(s4).collectAsMap())
print(s3.leftOuterJoin(s4).collectAsMap())
print(s3.rightOuterJoin(s4).collectAsMap())
获取两个集合的交集 :
例如:
val s1 = sc.parallelize(Seq('abc', 'dfe', 'hello'))
val s2 = sc.parallelize(Seq('fgh', 'nbv', 'hello', 'word', 'jkl', 'abc'))
println(
s1.intersection(s2).collect().toList
)
JavaRDD<String> s1 = sc.parallelize(Arrays.asList('abc', 'dfe', 'hello'));
JavaRDD<String> s2 = sc.parallelize(Arrays.asList('fgh', 'nbv', 'hello', 'word', 'jkl', 'abc'));
System.out.println(
s1.intersection(s2).collect()
);
s1 = sc.parallelize(('abc', 'dfe', 'hello'))
s2 = sc.parallelize(('fgh', 'nbv', 'hello', 'word', 'jkl', 'abc'))
print(
s1.intersection(s2).collect()
)
获取差集,a - b
,取 a
集合中 b
集合没有的元素:
例如:
val s1 = sc.parallelize(Seq('abc', 'dfe', 'hello'))
val s2 = sc.parallelize(Seq('fgh', 'nbv', 'hello', 'word', 'jkl', 'abc'))
println(
s1.subtract(s2).collect().toList
)
JavaRDD<String> s1 = sc.parallelize(Arrays.asList('abc', 'dfe', 'hello'));
JavaRDD<String> s2 = sc.parallelize(Arrays.asList('fgh', 'nbv', 'hello', 'word', 'jkl', 'abc'));
System.out.println(
s1.subtract(s2).collect()
);
s1 = sc.parallelize(('abc', 'dfe', 'hello'))
s2 = sc.parallelize(('fgh', 'nbv', 'hello', 'word', 'jkl', 'abc'))
print(
s1.subtract(s2).collect()
)
元素去重,是一个需要 Shuffled
的操作:
例如:
val s1 = sc.parallelize(Seq('abc', 'abc', 'hello', 'hello', 'word', 'word'))
println(
s1.distinct().collect().toList
)
JavaRDD<String> s1 = sc.parallelize(Arrays.asList('abc', 'abc', 'hello', 'hello', 'word', 'word'));
System.out.println(
s1.distinct().collect()
);
s1 = sc.parallelize(('abc', 'abc', 'hello', 'hello', 'word', 'word'))
print(
s1.distinct().collect()
)
只能作用于 Key-Value
型数据,根据 Key
分组生成一个 Tuple
,然后针对每个组执行 reduce
算子,传入两个参数,一个是当前值,一个是局部汇总,这个函数需要有一个输出, 输出就是这个 Key
的汇总结果,是一个需要 Shuffled
的操作:
例如:
val s1 = sc.parallelize(Seq('abc', 'abc', 'hello', 'hello', 'word', 'word'))
println(
s1.map((_, 1))
.reduceByKey(Integer.sum)
.collectAsMap
)
JavaRDD<String> s1 = sc.parallelize(Arrays.asList('abc', 'abc', 'hello', 'hello', 'word', 'word'));
System.out.println(
s1.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey(Integer::sum)
.collectAsMap()
);
s1 = sc.parallelize(('abc', 'abc', 'hello', 'hello', 'word', 'word'))
print(
s1.map(lambda s: (s, 1))
.reduceByKey(lambda v1, v2: v1 + v2)
.collectAsMap()
)
只能作用于 Key-Value
型数据,根据 Key
分组, 和 ReduceByKey
有点类似, 但是 GroupByKey
并不求聚合, 只是列举 Key
对应的所有 Value
,是一个需要 Shuffled
的操作。
GroupByKey
和 ReduceByKey
不同,因为需要列举 Key
对应的所有数据, 所以无法在 Map
端做 Combine
, 所以 GroupByKey
的性能并没有 ReduceByKey
好:
例如:
val s1 = sc.parallelize(Seq('abc', 'abc', 'hello', 'hello', 'word', 'word'))
println(
s1.map((_, 1))
.groupByKey()
.collectAsMap
)
JavaRDD<String> s1 = sc.parallelize(Arrays.asList('abc', 'abc', 'hello', 'hello', 'word', 'word'));
System.out.println(
s1.mapToPair(s -> new Tuple2<>(s, 1))
.groupByKey()
.collectAsMap()
);
s1 = sc.parallelize(('abc', 'abc', 'hello', 'hello', 'word', 'word'))
print(
s1.map(lambda s: (s, 1))
.reduceByKey()
.collectAsMap()
)
联系客服