Spark多路径输入输出

大漠孤烟直,长河落日圆。

唐·王维·使至塞上 

spark的多目录输入与输出

完整代码

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

object MultipleInAndOut {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("MultipleInAndOut").setMaster("local")
val sc = new SparkContext(conf)
val filesRDD = sc.textFile("/input/*/part-*")
.map(x=>(x.split(" ")(0),x.split(" ")(1)))
.saveAsHadoopFile("/output",classOf[String],classOf[String],classOf[RDDMultipleFormat])
}
}
class RDDMultipleFormat extends MultipleTextOutputFormat[Any,Any]{
override def generateActualKey(key: Any, value: Any): AnyRef = NullWritable.get()
//override def generateActualValue(key: Any, value: Any): AnyRef = NullWritable.get()
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String]+"/"+name
}

多目录输入

// read all files(files in different directorys)

sc.textFile("/input/*/part-*")

目录文件情况:

input
|---big
| |---part-00002
| |20180912 gggg
| |20180912 hhhh
|---middle
| |---part-00001
| |20180911 dddd
| |20180911 eeee
| |20180911 ffff
|---small
|---part-00000
|20180910 aaaa
|20180910 bbbb
|20180910 cccc

多目录输出

输出结果

output
|
|---20180910
| |---part-00002
| |aaaa
| |bbbb
| |cccc
|---20180911
| |---part-00001
| |dddd
| |eeee
| |ffff
|---20180912
| |---part-00000
| |gggg
| |hhhh

解析

在MultipleTextOutputFormat类中generateActualKey函数和generateActualValue函数控制程序是否在文件输出key和value,想要排除谁重写哪个函数即可

generateFileNameForKeyValue函数有三个参数key(即字面的意思,数据的key)、value(即字面意思,数据的value)、name(每个reduce的编号,即正常输出时的文件名part-000),返回值就是输出文件名

在你所提供的目录下,通过<key+”/“+name>(name可加可不加)的形式实现多级目录的输出,可以添加更多层级的目录<xx+”/“+xxx+”/“+key+”/“+name>(name可加可不加),如果没有使用”/“就会在提供的目录下按照文件名生成文件,建议追加name否则容易造成多个reduce写在一个文件并且文件内容只会有一个reduce输出内容

generateActualKey函数中的 NullWritable.get(),NullWritable是Writable的一个特殊类,实现方法为空实现,不从数据流中读数据,也不写入数据,只充当占位符

0%