Lecture 102: Hands-on: Writing, Debugging, and Testing a Custom Spark Streaming Receiver


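1: Spark Streaming supports many different kinds of data sources out of the box. Sometimes, however, a data source is unusual enough that Spark Streaming does not support it natively, and then you need to write a custom Receiver. A custom Receiver is usually network-based, because the data typically arrives from another machine through a network port; which protocol is used to transmit it is a separate question.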
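Because such a Receiver connects to a port that some other process writes to, testing it locally needs something listening on that port (the Spark Streaming quick start uses `nc -lk 9999` for this). The following is a minimal sketch of a stand-in data source, assuming the source simply sends newline-terminated UTF-8 text. `LineServer` is a hypothetical test helper, not part of Spark.

```scala
import java.io.{BufferedWriter, OutputStreamWriter}
import java.net.ServerSocket
import java.nio.charset.StandardCharsets

// Hypothetical test helper, not part of Spark: a tiny TCP server standing in for
// the external data source that a socket-based custom Receiver connects to.
object LineServer {

  /** Binds to `port` (0 = any free port), then serves exactly one client on a
    * background thread: writes each line plus '\n' as UTF-8 and closes. */
  def start(lines: Seq[String], port: Int = 0): ServerSocket = {
    val server = new ServerSocket(port) // bound immediately, so clients may connect before accept()
    val worker = new Thread("line-server") {
      override def run(): Unit = {
        val client = server.accept()
        try {
          val out = new BufferedWriter(
            new OutputStreamWriter(client.getOutputStream, StandardCharsets.UTF_8))
          lines.foreach(line => out.write(line + "\n"))
          out.flush()
        } finally {
          client.close() // end of stream: the reader's readLine() then returns null
          server.close()
        }
      }
    }
    worker.setDaemon(true) // do not keep the JVM alive just for this thread
    worker.start()
    server
  }
}
```

After the single client is served the connection is closed, so a Receiver reading from this server reaches end of stream, which is a convenient way to exercise its reconnect path while debugging.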

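2: Fundamentally, every Receiver in Spark Streaming, including the built-in ones, is a custom Receiver: they all extend the same Receiver base class. So the simplest way to write your own is to look at how the existing Receivers are implemented.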

Step-by-step guide: http://spark.apache.org/docs/latest/streaming-custom-receivers.html

import java.io.{BufferedReader, InputStreamReader}
import java.net.{ConnectException, Socket}
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // onStart() must not block: start a thread that receives data over a
    // connection by calling receive()
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // Nothing much to do here: the thread calling receive() is designed to
    // stop by itself once isStopped() returns true
  }

  /** Create a socket connection and receive data until the receiver is stopped */
  private def receive(): Unit = {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port as soon as the Receiver starts
      socket = new Socket(host, port)

      // Until stopped or the connection is broken, keep reading
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
      userInput = reader.readLine()
      while (!isStopped() && userInput != null) {
        store(userInput) // store each line as soon as it is read, then loop
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()

      // Restart in an attempt to connect again when the server is active again;
      // restart() asynchronously calls onStop() and then onStart() after a delay
      restart("Trying to connect again")
    } catch {
      case e: ConnectException =>
        // Restart if we could not connect to the server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // Restart if there is any other error
        restart("Error receiving data", t)
    }
  }
}
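Since this lecture is also about testing: the read loop in receive() can be exercised without a cluster if the Receiver callbacks are passed in as plain functions. This is a sketch under that assumption; `ReceiveLoop.run` is a hypothetical helper, and its `isStopped`/`store` parameters merely stand in for the Receiver's own `isStopped()` and `store(...)`.

```scala
import java.io.BufferedReader

// Hypothetical helper, not part of Spark: the read loop from receive() with the
// Receiver callbacks injected, so it can be unit-tested without a StreamingContext.
object ReceiveLoop {

  /** Reads lines until `isStopped()` is true or the stream ends.
    * Returns the number of lines handed to `store`. */
  def run(reader: BufferedReader, isStopped: () => Boolean, store: String => Unit): Int = {
    var count = 0
    var line = reader.readLine()
    while (!isStopped() && line != null) {
      store(line)
      count += 1
      line = reader.readLine() // read ahead; checked against isStopped() on the next pass
    }
    count
  }
}
```

As in the original loop, a line that has already been read when the stop flag flips is dropped rather than stored.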
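That completes the custom Receiver. Next, create an instance of it and pass it to ssc.receiverStream. Because receiverStream returns a DStream, you process it exactly as you would any other DStream; here we start with flatMap.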
// Assuming ssc is the StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = customReceiverStream.flatMap(_.split(" "))
...
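flatMap(_.split(" ")) on a DStream applies the function to every record of every batch. Running the same expression on an ordinary Scala collection shows what one batch of received lines turns into. This is plain Scala, not the Spark API, and `FlatMapDemo` is a hypothetical name.

```scala
// Plain Scala collections, not Spark: mirrors what DStream.flatMap(_.split(" "))
// does to the lines of a single batch.
object FlatMapDemo {

  /** Splits every line on a single space and flattens the results into one sequence. */
  def words(lines: Seq[String]): Seq[String] = lines.flatMap(_.split(" "))
}
```

Splitting on a single space yields empty strings when words are separated by several spaces ("a  b" becomes "a", "", "b"), so a filter such as filter(_.nonEmpty) afterwards may be worth adding.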