1. 首先用 Sqoop 将 MySQL 数据定时导入到 HDFS 中，然后用 Spark Streaming 实时读取 HDFS 的数据，并把数据写入 Elasticsearch 中。代码如下：
package bigdata.project.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql._

/**
 * Streams CSV files landed on HDFS (by a scheduled Sqoop import from MySQL)
 * into Elasticsearch, echoing each micro-batch to stdout for inspection.
 *
 * Per 10-second batch: new HDFS text files -> split on "," -> [[RecordEs]]
 * rows -> Elasticsearch index/type "zufang/docs".
 */
object sparkstreamingcopynew {

  def main(args: Array[String]): Unit = {
    val sparkconf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("sparkstreamingcopynew")
    sparkconf.set("es.nodes", "localhost")
    sparkconf.set("es.port", "9200")
    // Let ES create the target index on first write.
    sparkconf.set("es.index.auto.create", "true")
    sparkconf.set("spark.driver.allowMultipleContexts", "true")
    // NOTE(review): "empty" is not a known Spark or ES-Hadoop config key —
    // looks like leftover debris; confirm and remove.
    sparkconf.set("empty", "true")

    val sc = new SparkContext(sparkconf)
    val ssc = new StreamingContext(sc, Seconds(10))

    // textFileStream only picks up files *newly created* under this directory.
    val lines = ssc.textFileStream("hdfs://hadoop:9000/ershoufang")

    lines.foreachRDD { (rdd: RDD[String], time: Time) =>
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // Split each line ONCE (the original re-ran x.split(",") twelve times
      // per record) and drop lines too short to index — one malformed line
      // would otherwise kill the whole streaming job with an
      // ArrayIndexOutOfBoundsException.
      // Column selection matches the original: field 7 is intentionally
      // skipped — TODO confirm column 7 really is unused.
      val wordsDataFrame = rdd
        .map(_.split(","))
        .filter(_.length >= 13)
        .map { f =>
          RecordEs(f(0).toInt, f(1), f(2), f(3), f(4), f(5), f(6),
            f(8), f(9), f(10), f(11), f(12))
        }
        .toDF()

      // Write the batch to Elasticsearch (index/type "zufang/docs").
      val dataDS = wordsDataFrame.as[RecordEs]
      EsSparkSQL.saveToEs(dataDS, "zufang/docs")

      // Debug output: register the batch as a temp table and show it.
      wordsDataFrame.registerTempTable("RecordEs")
      val wordCountsDataFrame = sqlContext.sql(
        "select id,title,city,huose_type,area,location,direction,price, url,origin,publish_date,true_date from RecordEs")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
package bigdata.project.spark

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

/**
 * Process-wide, lazily created [[SQLContext]] holder.
 *
 * Spark Streaming code calls [[getInstance]] from inside `foreachRDD` so that
 * every micro-batch reuses one SQLContext instead of building a new one.
 * The field is `@transient` so the driver-side singleton is never dragged
 * into task closures during serialization.
 */
object SQLContextSingleton {

  @transient private var instance: SQLContext = _

  /** Returns the cached SQLContext, constructing it on first use. */
  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance eq null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
package bigdata.project.spark

/**
 * One rental/second-hand housing listing, as parsed from a CSV line on HDFS
 * and indexed into Elasticsearch.
 *
 * NOTE(review): `huose_type` is a typo for `house_type`, but it is kept as-is
 * because the SQL query and the Elasticsearch mapping reference this exact
 * field name — renaming it must be done in all three places at once.
 *
 * Fix applied: the original line carried a duplicated, run-together package
 * clause ("package bigdata.project.sparkpackage bigdata.project.spark"),
 * which does not compile; reduced to a single declaration.
 */
final case class RecordEs(
    id: Int,
    title: String,
    city: String,
    huose_type: String,
    area: String,
    location: String,
    direction: String,
    price: String,
    url: String,
    origin: String,
    publish_date: String,
    true_date: String)
来源:http://www.icode9.com/content-4-241101.html
联系客服