打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
Spark构建推荐引擎之二:基于Spark Streaming 实时推荐计算

1.1 数据输入模型

1)用户数据输入数据格式:

用户ID,物品ID,点击次数。

2)相似矩阵输入数据格式:

物品ID,物品ID,相似度

1.2 物品相似矩阵

 采用SparkContext读取物品的相似矩阵:

    //2 sc 读取相似矩阵

    valsimi_path1 ="hdfs://192.168.180.100:9000/data/simi/simi.txt"

    valsimi_rdd1 =sc.textFile(simi_path1,10)

    valsimi_rdd2 =simi_rdd1.map(line => {

      valfileds = line.split(",")

      (fileds(0),fileds(1),fileds(2).toDouble)

    })

    simi_rdd2.cache()

1.3 用户实时评分计算

 采用Spark Streaming实时计算用户的评分数据:

//3 构建Streaming对象

    valssc =new StreamingContext(sc,batchDuration)

    ssc.checkpoint("hdfs://192.168.180.100:9000/data/check")

 

    //4 用户实时数据收集

    //4.1 设置监听目录

    valSpath1 =directory

    valStream_User_Act1 =ssc.fileStream[LongWritable, Text, TextInputFormat](Spath1).map(_._2.toString)

    Stream_User_Act1.checkpoint(slideDuration)

    //4.2 用户操作数据——流式窗口处理

    //用户ID,物品ID,点击次数

    valStream_User_Act2 =Stream_User_Act1.map(line => {

      valfileds = line.split(",")

      ((fileds(0),fileds(1)),fileds(2).toInt)

    })

    valStream_User_Act3 =Stream_User_Act2.reduceByKeyAndWindow(_ + _, _ - _,windowDuration,slideDuration)

    valStream_User_Act4 =Stream_User_Act3.reduceByKey(_ + _).map(f => (f._1._1, f._1._2, f._2))

1.4 实时推荐计算

//5 实时推荐计算

    valRecommend_app1 =Stream_User_Act4.transform(rdd => Recommend1(simi_rdd2, rdd,r_number))

def Recommend1(

    items_similar: RDD[(String, String, Double)],

    user_prefer: RDD[(String, String, Int)],

    r_number: Int): (RDD[(String, String, Double)]) = {

    //   1 矩阵计算——i行与jjoin

    valrdd_app1_R2 =items_similar.map(f => (f._1, (f._2, f._3))).

      join(user_prefer.map(f => (f._2, (f._1, f._3))))

    //   2 矩阵计算——i行与j列元素相乘

    valrdd_app1_R3 =rdd_app1_R2.map(f => ((f._2._2._1, f._2._1._1), f._2._2._2 * f._2._1._2))

    //   3 矩阵计算——用户:元素累加求和

    valrdd_app1_R4 =rdd_app1_R3.reduceByKey((x, y) => x + y).map(f => (f._1._1, (f._1._2, f._2)))

    //   4 矩阵计算——用户:用户对结果排序,过滤

    valrdd_app1_R5 =rdd_app1_R4.groupByKey()

    valrdd_app1_R6 =rdd_app1_R5.map(f => {

      vali2 = f._2.toBuffer

      vali2_2 =i2.sortBy(_._2)

      if (i2_2.length > r_number)i2_2.remove(0, (i2_2.length - r_number))

      (f._1, i2_2.toIterable)

    })

    valrdd_app1_R7 =rdd_app1_R6.flatMap(f => {

      valid2 = f._2

      for (w <-id2)yield (f._1,w._1,w._2)

    })

    rdd_app1_R7.map(f => f._1).distinct.count

    rdd_app1_R7

  }

1.5 结果输出

    //6 结果输出   

    valoutpath ="hdfs://192.168.180.79:9000/output/recommend"

    Recommend_app1.saveAsTextFiles(outpath,"rdd")

    //7 监听启动

    ssc.start()

    ssc.awaitTermination()

 

转载请注明出处:

http://blog.csdn.net/sunbow0/article/details/43456805

 

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
spark流数据处理:Spark Streaming的使用
Spark定制班第1课:通过案例对Spark Streaming透彻理解三板斧之一:解密Spark Streaming另类实验及Spark Streaming本质解析
070 DStream中的transform和foreachRDD函数
Spark Streaming——Spark第一代实时计算引擎
Apache Spark源码走读之5
Spark Streaming为啥要设置两条线程?
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服