打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服务

开通VIP
大数据IMF传奇行动绝密课程第59课:使用Java和Scala在IDE中实战RDD和DataFrame转换操作

1、RDD与DataFrame转换的重大意义
2、使用Java实战RDD与DataFrame转换
3、使用Scala实战RDD与DataFrame转换
RDD接上数据库、接上文件系统,无限想象空间~,极大加速和简化了大数据开发
通过反射来推断RDD的Schema(元数据)并完成转换
case class/JavaBean(反射)方式适用于事先已知RDD元数据的场景;
如果事先不知道RDD的元数据,则需要在运行时动态获取元数据,以编程方式构建Schema
JavaBean不支持嵌套,也不可以有复杂数据结构(List等)
Person.class传进去后,会以反射的方式创建DataFrame

遇到错误

java.lang.IllegalAccessException: Class org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1 can not access a member of class com.tom.spark.SparkApps.sql.Person with modifiers "public"    at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)    at java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:296)    at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:288)    at java.lang.reflect.Method.invoke(Method.java:490)    at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1358)    at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1358)    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)    at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1.apply(SQLContext.scala:1358)    at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1.apply(SQLContext.scala:1356)    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)	at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)    at 
scala.collection.Iterator$class.foreach(Iterator.scala:727)    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)    at scala.collection.AbstractIterator.to(Iterator.scala:1157)    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)	at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)    at org.apache.spark.scheduler.Task.run(Task.scala:89)    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)    at java.lang.Thread.run(Thread.java:745)16/09/07 12:42:35 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IllegalAccessException: Class org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1 can not access a member of class com.tom.spark.SparkApps.sql.Person with modifiers "public"    at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)    at 
java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:296)    at java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:288)    at java.lang.reflect.Method.invoke(Method.java:490)    at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1358)    at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1$$anonfun$apply$1.apply(SQLContext.scala:1358)    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)    at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1.apply(SQLContext.scala:1358)    at org.apache.spark.sql.SQLContext$$anonfun$org$apache$spark$sql$SQLContext$$beansToRows$1.apply(SQLContext.scala:1356)    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)	at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)    at scala.collection.Iterator$class.foreach(Iterator.scala:727)    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)   
 at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)    at scala.collection.AbstractIterator.to(Iterator.scala:1157)    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)	at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)    at org.apache.spark.scheduler.Task.run(Task.scala:89)    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)    at java.lang.Thread.run(Thread.java:745)

原因:Person类不是public的,Spark通过反射调用其方法时没有访问权限。解决办法:需要将Person类放到一个单独的文件并标记为public

又遇到一个错误

java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String    at org.apache.spark.sql.Row$class.getString(Row.scala:250)    at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:192)    at com.tom.spark.SparkApps.sql.RDD2DataFrameByReflection$2.call(RDD2DataFrameByReflection.java:57)    at com.tom.spark.SparkApps.sql.RDD2DataFrameByReflection$2.call(RDD2DataFrameByReflection.java:1)    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)    at scala.collection.Iterator$class.foreach(Iterator.scala:727)    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)    at scala.collection.AbstractIterator.to(Iterator.scala:1157)    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)    at org.apache.spark.scheduler.Task.run(Task.scala:89)    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)    at java.lang.Thread.run(Thread.java:745)

原因是DataFrame中列的顺序与Person中字段的声明顺序不一致:通过反射JavaBean生成的列被按字段名排序了(age、id、name),所以按下标取值时,下标0是age、1是id、2是name;更稳妥的做法是按列名取值

Java实现

    public static void ByReflection(){        SparkConf conf = new SparkConf().setAppName("RDD2DataFrameByReflection").setMaster("local");        JavaSparkContext sc = new JavaSparkContext(conf);        SQLContext sqlContext = new SQLContext(sc);        JavaRDD<String> lines = sc.textFile("F:\\sparkData\\personBean.txt");        JavaRDD<Person> persons = lines.map(new Function<String, Person>() {            public Person call(String line) throws Exception {                // TODO Auto-generated method stub                String[] arr = line.split(",");                Person p = new Person();                p.setId(Integer.valueOf(arr[0].trim()));                p.setName(arr[1].trim());                p.setAge(Integer.valueOf(arr[2].trim()));                return p;            }                   });        //在底层,通过反射的方式获得Person的所有fields,结合RDD本身,就生成了DataFrame        DataFrame df = sqlContext.createDataFrame(persons, Person.class);        //注册成临时表,在临时表上就可以写SQL        df.registerTempTable("persons");        DataFrame bigDatas = sqlContext.sql("select * from persons where age >= 6");        bigDatas.show();        JavaRDD<Row> row = bigDatas.javaRDD();        JavaRDD<Person> result = row.map(new Function<Row, Person>() {            public Person call(Row row) throws Exception {                // TODO Auto-generated method stub                Person p = new Person();                p.setId(row.getInt(1));                p.setName(row.getString(2));                p.setAge(row.getInt(0));                return p;            }        });        List<Person> personlist = result.collect();        for(Person p : personlist) {            System.out.println(p);        }    }

Scala实现

  /**
   * Converts a text file of comma-separated person records into a DataFrame by
   * reflecting over the `Person` case class, queries it with SQL, and maps the
   * resulting rows back to `Person` instances, which are printed to stdout.
   *
   * Each input line is expected to hold three comma-separated values that are passed
   * to `Person` in order (Int, String, Int). Values are trimmed before parsing so that
   * input such as `1, Spark, 7` does not fail in `toInt`; this matches the Java
   * implementation of the same example.
   */
  def ByReflection(): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new SQLContext(sc)
      val lines = sc.textFile("F:\\sparkData\\personBean.txt")

      val persons = lines.map { line =>
        // Trim every field: String.toInt rejects surrounding whitespace.
        val arr = line.split(",").map(_.trim)
        Person(arr(0).toInt, arr(1), arr(2).toInt)
      }

      val df = sqlContext.createDataFrame(persons)
      // Register as a temporary table so that SQL can be run against it.
      df.registerTempTable("persons")
      df.printSchema()

      val bigDatas = sqlContext.sql("select * from persons where age >= 6")
      bigDatas.show()

      // Positional access mirrors the argument order used to build Person above.
      val result = bigDatas.rdd.map { row =>
        Person(row.getInt(0), row.getString(1), row.getInt(2))
      }
      result.collect().foreach(println)
    } finally {
      // Always release the Spark context, even if the job fails.
      sc.stop()
    }
  }
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
spark
SparkSQL结合SparkStreaming,使用SQL完成实时计算中的数据统计
SparkSession简单介绍
Spark入门:读写Parquet(DataFrame)
理解Spark SQL(二)—— SQLContext和HiveContext
sparksql 报错Container killed by YARN for exceeding memory limits. xGB of x GB physical memory used. C
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服