创建RDD
1.
由外部存储系统的数据集创建
,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等
val rdd1 = sc.textFile("hdfs://node01:8020/wordcount/input/words.txt")
2.通过已有的RDD经过算子转换生成新的RDD
val rdd2=rdd1.flatMap(_.split(" "))
3.由一个已经存在的Scala集合创建
val rdd3 = sc.
parallelize
(Array(1,2,3,4,5,6,7,8))
或者
val rdd4 = sc.
makeRDD
(List(1,2,3,4,5,6,7,8))
makeRDD方法底层调用了parallelize方法
RDD的方法/算子分类
●分类
RDD的算子分为两类:
1.
Transformation转换操作:返回一个新的RDD
2.
Action动作操作:返回值不是RDD
(无返回值或返回其他的)
●注意:
RDD不实际存储真正要计算的数据,而是记录了数据的位置在哪里,数据的转换关系
(调用了什么方法,传入什么函数)
RDD中的所有转换都是惰性求值/延迟执行的,也就是说并不会直接计算
。只有当发生一个要求返回结果给Driver的
Action动作时,这些转换才会真正运行
。
之所以使用惰性求值/延迟执行,是因为这样可以在Action时对RDD操作形成DAG有向无环图进行Stage的划分和并行优化,这种设计让Spark更加有效率地运行。
Transformation转换算子
转换
|
含义
|
map(func)
|
返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
|
filter(func)
|
返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
|
flatMap(func)
|
类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
|
mapPartitions(func)
|
类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
|
mapPartitionsWithIndex(func)
|
类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是
(Int, Interator[T]) => Iterator[U]
|
sample(withReplacement, fraction, seed)
|
根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
|
union(otherDataset)
|
对源RDD和参数RDD求并集后返回一个新的RDD
|
intersection(otherDataset)
|
对源RDD和参数RDD求交集后返回一个新的RDD
|
distinct([numTasks]))
|
对源RDD进行去重后返回一个新的RDD
|
groupByKey([numTasks])
|
在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
|
reduceByKey(func, [numTasks])
|
在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
|
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
|
|
sortByKey([ascending], [numTasks])
|
在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
|
sortBy(func,[ascending], [numTasks])
|
与sortByKey类似,但是更灵活
|
join(otherDataset, [numTasks])
|
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
|
cogroup(otherDataset, [numTasks])
|
在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD
|
cartesian(otherDataset)
|
笛卡尔积
|
pipe(command, [envVars])
|
对rdd进行管道操作
|
coalesce(numPartitions)
|
减少 RDD 的分区数到指定值。在过滤大量数据之后,可以执行此操作
|
repartition(numPartitions)
|
重新给 RDD 分区
|
Action动作算子
动作
|
含义
|
reduce(func)
|
通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的
|
collect()
|
在驱动程序中,以数组的形式返回数据集的所有元素
|
count()
|
返回RDD的元素个数
|
first()
|
返回RDD的第一个元素(类似于take(1))
|
take(n)
|
返回一个由数据集的前n个元素组成的数组
|
takeSample(withReplacement,num, [seed])
|
返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
|
takeOrdered(n, [ordering])
|
返回自然顺序或者自定义顺序的前 n 个元素
|
saveAsTextFile(path)
|
将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
|
saveAsSequenceFile(path)
|
将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
|
saveAsObjectFile(path)
|
将数据集的元素,以 Java 序列化的方式保存到指定的目录下
|
countByKey()
|
针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
|
foreach(func)
|
在数据集的每一个元素上,运行函数func进行更新。
|
foreachPartition(func)
|
在数据集的每一个分区上,运行函数func
|
统计操作
算子
|
含义
|
count
|
个数
|
mean
|
均值
|
sum
|
求和
|
max
|
最大值
|
min
|
最小值
|
variance
|
方差
|
sampleVariance
|
从采样中计算方差
|
stdev
|
标准差:衡量数据的离散程度
|
sampleStdev
|
采样的标准差
|
stats
|
查看统计结果
|