打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
070 DStream中的transform和foreachRDD函数

070 DStream中的transform和foreachRDD函数

1.说明
  DStream的API不够满足使用的时候,可以使用这两个函数,将dstream转换为rdd,然后进行操作

 

2.transform

  transform:将DStream的操作转换为RDD的操作,调用该api最终只需要返回一个新的RDD即可

 

3.程序

 1 package com.window.it 2 import org.apache.spark.{SparkConf, SparkContext} 3 import org.apache.spark.storage.StorageLevel 4 import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext} 5 import org.apache.spark.streaming.dstream.DStream 6 import org.apache.spark.streaming.kafka.KafkaUtils 7 object TransformDemo { 8   def main(args: Array[String]): Unit = { 9     val conf = new SparkConf()10       .setAppName("StreamingWindowOfKafka")11       .setMaster("local[*]")12     val sc = SparkContext.getOrCreate(conf)13     val ssc = new StreamingContext(sc, Seconds(5))14     // 当调用updateStateByKey函数API的时候,必须给定checkpoint dir15     // 路径对应的文件夹不能存在16     ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/4525712")17 18     val kafkaParams = Map(19       "group.id" -> "streaming-kafka-78912151",20       "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",21       "auto.offset.reset" -> "smallest"22     )23     val topics = Map("beifeng" -> 4) // topics中value是读取数据的线程数量,所以必须大于等于124     val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](25       ssc, // 给定SparkStreaming上下文26       kafkaParams, // 给定连接kafka的参数信息 ===> 通过Kafka HighLevelConsumerAPI连接27       topics, // 给定读取对应topic的名称以及读取数据的线程数量28       StorageLevel.MEMORY_AND_DISK_2 // 指定数据接收器接收到kafka的数据后保存的存储级别29     ).map(_._2)30 31     val resultWordCount = dstream32       .filter(line => line.nonEmpty)33       .flatMap(line => line.split(" ").map((_, 1)))34       .reduceByKeyAndWindow(35         (a: Int, b: Int) => a + b,36         Seconds(15), // 窗口大小37         Seconds(10) // 滑动大小38       )39     resultWordCount.print() // 这个也是打印数据40 41     /**42       * transform:将DStream的操作转换为RDD的操作,调用该api最终只需要返回一个新的RDD即可43       */44     dstream.transform(rdd => {45       // 对rdd进行预处理46       val processedRDD = rdd47         .filter(line => line.nonEmpty)48         .flatMap(line => line.split(" ").map((_, 1)))49         .reduceByKey(_ + _)50       // 数据抽样,获取两个节点51       val seeder = processedRDD.takeSample(true, 2)52       // 对rdd进行处理操作, 将抽样数据和rdd中的数据进行比较,如果rdd中的word的出现次数大于等于抽样数据中的任何一个word的次数,次数*3;否则次数*253       val brocast = rdd.sparkContext.broadcast(seeder)54       val resultRDD = processedRDD.mapPartitions(iter => {55         val seederValue = brocast.value56         iter.map {57           case (word, count) => {58             val vc = seederValue59               .filter(tuple => {60                 count >= tuple._261               }).size62             if (vc == 0) {63               (word, 2, count * 2)64             } else {65               (word, 3, count * 3)66             }67           }68         }69       })70       resultRDD71     }).print()72     73     // 启动开始处理74     ssc.start()75     ssc.awaitTermination() // 等等结束,监控一个线程的中断操作76   }77 }

 

4.foreachRDD

  作用和transform类型,将DStream的操作转换为RDD进行操作,区别:该api没有返回值

 

5.程序

 1 package com.window.it 2  3 import org.apache.spark.storage.StorageLevel 4 import org.apache.spark.streaming.kafka.KafkaUtils 5 import org.apache.spark.streaming.{Seconds, StreamingContext} 6 import org.apache.spark.{SparkConf, SparkContext} 7  8 object TransformDemo { 9   def main(args: Array[String]): Unit = {10     val conf = new SparkConf()11       .setAppName("StreamingWindowOfKafka")12       .setMaster("local[*]")13     val sc = SparkContext.getOrCreate(conf)14     val ssc = new StreamingContext(sc, Seconds(5))15     // 当调用updateStateByKey函数API的时候,必须给定checkpoint dir16     // 路径对应的文件夹不能存在17     ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/4525712")18 19     val kafkaParams = Map(20       "group.id" -> "streaming-kafka-78912151",21       "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",22       "auto.offset.reset" -> "smallest"23     )24     val topics = Map("beifeng" -> 4) // topics中value是读取数据的线程数量,所以必须大于等于125     val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](26       ssc, // 给定SparkStreaming上下文27       kafkaParams, // 给定连接kafka的参数信息 ===> 通过Kafka HighLevelConsumerAPI连接28       topics, // 给定读取对应topic的名称以及读取数据的线程数量29       StorageLevel.MEMORY_AND_DISK_2 // 指定数据接收器接收到kafka的数据后保存的存储级别30     ).map(_._2)31 32     val resultWordCount = dstream33       .filter(line => line.nonEmpty)34       .flatMap(line => line.split(" ").map((_, 1)))35       .reduceByKeyAndWindow(36         (a: Int, b: Int) => a + b,37         Seconds(15), // 窗口大小38         Seconds(10) // 滑动大小39       )40     resultWordCount.print() // 这个也是打印数据41 42     dstream.foreachRDD(rdd => {43       // TODO: 这里就可以做数据输出的代码编写44       // TODO: 这里不要为空45       rdd.foreachPartition(iter => {46         // TODO: 这里在实际环境中不要为空,为空可能会出现一些问题:内存泄露的问题47         println(iter.take(1))48       })49     })50     51     // 启动开始处理52     ssc.start()53     ssc.awaitTermination() // 等等结束,监控一个线程的中断操作54   }55 }

 

6.注意点

  一个批次,DStream内部就只对应一个RDD,transform和foreachRDD API使用的过程中,不要考虑多个RDD的问题

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Spark Streaming——Spark第一代实时计算引擎
Spark6_Spark Stream、Spark集成kafka、Spark图计算
Spark定制班第1课:通过案例对Spark Streaming透彻理解三板斧之一:解密Spark Streaming另类实验及Spark Streaming本质解析
简单之美 | Kafka+Spark Streaming+Redis实时计算整合实践
spark流数据处理:Spark Streaming的使用
Spark版本定制班第1课-Frank
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服