打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
Spark计算结果继续追加在HDFS目录下,不会覆盖之前的文件

https://blog.csdn.net/ZMC921/article/details/74948786


由于工作需要,我用scala实现在已将有的目录下面继续写入文件。需要重写MultipleTextOutputFormat这个类,具体的请看下面代码,需要交流可以联系我

import java.text.SimpleDateFormat

import java.util.Date

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

import org.apache.hadoop.mapred.{InvalidJobConfException, JobConf}

import org.apache.hadoop.mapreduce.security.TokenCache

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

import org.apache.spark.rdd.RDD;

/**

  * 在HDFS目录下继续追加文件,不会覆盖以前的文件

  * Created by csw on 2017/6/23.

  */

object MultipleTextOutput {

  def main(args: Array[String]) {

    val filePath = "hdfs://master:9000/csw/tmp/data";

    val savePath = "hdfs://master:9000/hxzj/mydata/tatol";

    val conf = new SparkConf().setAppName("Spark shell")

    val sc = new SparkContext(conf)

    //读取文件后,不进行split操作,直接将整行内容看作key,

    val rdd: RDD[(String, String)] = sc.textFile(filePath).map(x => (x, ""))

    //rdd必须是(key,value)形式的

    RDD.rddToPairRDDFunctions(rdd).partitionBy(new HashPartitioner(1)).saveAsHadoopFile(savePath, classOf[String], classOf[String], classOf[RDDMultipleTextOutputFormat])

    sc.stop()

  }

  /**

    * 自定义一个输出文件类

    */

  case class RDDMultipleTextOutputFormat() extends MultipleTextOutputFormat[Any, Any] {

    val currentTime: Date = new Date()

    val formatter = new SimpleDateFormat("yyyy-MM-dd-HHmmss");

    val dateString = formatter.format(currentTime);

    //自定义保存文件名

    override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {

      //key 和 value就是rdd中的(key,value),name是part-00000默认的文件名

      //保存的文件名称,这里用字符串拼接系统生成的时间戳来区分文件名,可以自己定义

      "HTLXYFY" + dateString

    }

    override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = {

      val name: String = job.get(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR)

      var outDir: Path = if (name == null) null else new Path(name)

      //当输出任务不等于0 且输出的路径为空,则抛出异常

      if (outDir == null && job.getNumReduceTasks != 0) {

        throw new InvalidJobConfException("Output directory not set in JobConf.")

      }

      //当有输出任务和输出路径不为null时

      if (outDir != null) {

        val fs: FileSystem = outDir.getFileSystem(job)

        outDir = fs.makeQualified(outDir)

        outDir = new Path(job.getWorkingDirectory, outDir)

        job.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR, outDir.toString)

        TokenCache.obtainTokensForNamenodes(job.getCredentials, Array[Path](outDir), job)

        //下面的注释掉,就不会出现这个目录已经存在的提示了

        /* if (fs.exists(outDir)) {

             throw new FileAlreadyExistsException("Outputdirectory"

                     + outDir + "alreadyexists");

         }

      }*/

      }

    }

  }

}

--------------------- 

原文链接:https://blog.csdn.net/ZMC921/article/details/74948786

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
提交任务到spark(以wordcount为例)
大数据时代到底Hadoop和Spark谁是王者!
大数据学习路线分享Master的jps
读写parquet格式文件的几种方式
Spark:一个高效的分布式计算系统 | UC技术博客
【小白视角】大数据基础实践(七) Spark的基本操作
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服