Spark Learning Examples (Python): Loading Data Sources (Load Data Source)

We use Spark mainly to process large batches of data quickly, so which data sources actually come up in development and production? I group them as follows:

  • text
  • csv
  • json
  • parquet
  • jdbc
  • hive
  • kafka
  • elasticsearch

All of the tests below run in Spark local mode, because local mode is convenient for testing and does not depend on a Spark cluster. One thing to note: when you run the code on a Spark cluster, remove the .master("local[*]") line, and adjust the paths accordingly if you need to access files on the local machine. Take the file /tmp/people.txt as an example:

Local mode: /tmp/people.txt

Cluster mode: file:///tmp/people.txt (equivalent to /tmp/people.txt in local mode)
              hdfs://master:8020/tmp/people.txt (a file on HDFS, the distributed file system)
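
In code, only the path string changes; a minimal sketch (the SparkSession construction is shown in full in the examples below, and master:8020 is a placeholder for your own HDFS namenode):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pathDemo").master("local[*]").getOrCreate()

# local mode: a bare path refers to the local file system
df = spark.read.text("/tmp/people.txt")

# cluster mode: spell out which file system the path lives on
# df = spark.read.text("file:///tmp/people.txt")             # local disk of each node
# df = spark.read.text("hdfs://master:8020/tmp/people.txt")  # file on HDFS

spark.stop()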

Before looking at the various data sources, let's start with the most basic one: a dataset we construct ourselves according to our development needs, which is often used when developing and testing simple features.

Let's write code that builds such a dataset, turns it into a DataFrame, and displays it:

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadDatas") \
        .master("local[*]") \
        .enableHiveSupport() \
        .getOrCreate()
    datas = [('Jack', 27), ('Rose', 24), ('Andy', 32)]
    df = spark.createDataFrame(datas, ['name', 'age'])
    df.show()
    # +----+---+
    # |name|age|
    # +----+---+
    # |Jack| 27|
    # |Rose| 24|
    # |Andy| 32|
    # +----+---+
    spark.stop()

text

The data source people.txt contains:

Jack 27
Rose 24
Andy 32

Write code that loads people.txt and displays it through SQL:

from pyspark.sql import SparkSession
from pyspark.sql import Row

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadTextData") \
        .master("local[*]") \
        .getOrCreate()
    lines = spark.sparkContext.textFile("/home/llh/data/people.txt")
    # split each line on the space and wrap it in a Row; note that age stays a string here
    parts = lines.map(lambda line: line.split(" "))
    people = parts.map(lambda p: Row(name=p[0], age=p[1]))
    peopledf = spark.createDataFrame(people)
    peopledf.show()
    # +---+----+
    # |age|name|
    # +---+----+
    # | 27|Jack|
    # | 24|Rose|
    # | 32|Andy|
    # +---+----+
    peopledf.createOrReplaceTempView("people")
    namedf = spark.sql("select name from people where age < 30")
    namedf.show()
    # +----+
    # |name|
    # +----+
    # |Jack|
    # |Rose|
    # +----+
    spark.stop()
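
As an aside, the same file can also be loaded through the DataFrameReader rather than the RDD API; a minimal sketch, assuming the same /home/llh/data/people.txt file:

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

spark = SparkSession.builder.appName("loadTextDataReader").master("local[*]").getOrCreate()

# spark.read.text yields a single string column named "value"
raw = spark.read.text("/home/llh/data/people.txt")

# split each line on the space and cast age to an integer
peopledf = raw.select(
    split(col("value"), " ").getItem(0).alias("name"),
    split(col("value"), " ").getItem(1).cast("int").alias("age")
)
peopledf.show()

spark.stop()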

csv

The data source people.csv contains:

Jack,27
Rose,24
Andy,32

Write code that loads the CSV data and displays it:

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadCsvData") \
        .master("local[*]") \
        .getOrCreate()
    # Option 1: another way (compared with the text example) to give the DataFrame its column headers
    schema = StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True)
    ])
    peopledf = spark.read.csv("/home/llh/data/people.csv", schema=schema)
    peopledf.show()
    # +----+---+
    # |name|age|
    # +----+---+
    # |Jack| 27|
    # |Rose| 24|
    # |Andy| 32|
    # +----+---+
    # Option 2: this approach does not use Spark at all, it reads with pandas
    data = pd.read_csv("/home/llh/data/people.csv", names=['name', 'age'])
    print(data.head())
    #    name  age
    # 0  Jack   27
    # 1  Rose   24
    # 2  Andy   32
    spark.stop()
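
If you would rather not spell out the schema by hand, Spark can also infer the column types; a minimal sketch against the same header-less people.csv, where toDF supplies the column names:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("loadCsvInfer").master("local[*]").getOrCreate()

# inferSchema makes Spark scan the file and guess the types (age becomes int)
peopledf = spark.read.csv("/home/llh/data/people.csv", inferSchema=True).toDF("name", "age")
peopledf.printSchema()
peopledf.show()

spark.stop()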

json

The data source people.json contains:

{"name":"Jack", "age":27}
{"name":"Rose", "age":24}
{"name":"Andy"}

Write code that loads the JSON data and displays it through the DataFrame API:

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadJsonData") \
        .master("local[*]") \
        .getOrCreate()
    peopledf = spark.read.json("/home/llh/data/people.json")
    peopledf.show()
    # +----+----+
    # | age|name|
    # +----+----+
    # |  27|Jack|
    # |  24|Rose|
    # |null|Andy|
    # +----+----+
    peopledf.printSchema()
    # root
    #  |-- age: long (nullable = true)
    #  |-- name: string (nullable = true)
    peopledf.select('name').show()
    # +----+
    # |name|
    # +----+
    # |Jack|
    # |Rose|
    # |Andy|
    # +----+
    peopledf.select(peopledf['name'], peopledf['age'] + 1).show()
    # +----+---------+
    # |name|(age + 1)|
    # +----+---------+
    # |Jack|       28|
    # |Rose|       25|
    # |Andy|     null|
    # +----+---------+
    peopledf.filter(peopledf['age'] > 25).show()
    # +---+----+
    # |age|name|
    # +---+----+
    # | 27|Jack|
    # +---+----+
    peopledf.groupBy("age").count().show()
    # +----+-----+
    # | age|count|
    # +----+-----+
    # |null|    1|
    # |  27|    1|
    # |  24|    1|
    # +----+-----+
    spark.stop()
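
Because Andy has no age in the JSON file, his age comes back as null. If that gets in the way, the missing values can be filled in or dropped; a minimal sketch reusing the same people.json:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jsonNullDemo").master("local[*]").getOrCreate()
peopledf = spark.read.json("/home/llh/data/people.json")

# replace missing ages with 0
peopledf.na.fill({'age': 0}).show()

# or simply drop rows whose age is null
peopledf.na.drop(subset=['age']).show()

spark.stop()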

parquet

Data in this format is usually stored on HDFS; opening it in an ordinary text editor just shows gibberish.

Write code that loads the Parquet data and displays it:

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadParquetData") \
        .master("local[*]") \
        .getOrCreate()
    peopledf = spark.read.parquet("/home/llh/data/people.parquet")
    peopledf.createOrReplaceTempView("people")
    namedf = spark.sql("select name from people where age < 30")
    namedf.show()
    # +----+
    # |name|
    # +----+
    # |Jack|
    # |Rose|
    # +----+
    spark.stop()
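
Since Parquet is binary, the test file cannot be hand-written the way people.txt or people.csv can; a minimal sketch that produces /home/llh/data/people.parquet from the small dataset used earlier:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("writeParquetData").master("local[*]").getOrCreate()

datas = [('Jack', 27), ('Rose', 24), ('Andy', 32)]
df = spark.createDataFrame(datas, ['name', 'age'])

# write the DataFrame out as Parquet; the target path becomes a directory of part files
df.write.mode("overwrite").parquet("/home/llh/data/people.parquet")

spark.stop()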

jdbc

JDBC covers MySQL, Oracle, TiDB, and so on. Here we take MySQL as an example; the database is test and the table is people.

Write code that loads data from the MySQL database and displays it:

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadJdbcData") \
        .master("local[*]") \
        .getOrCreate()
    peopledf = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/test") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("dbtable", "(select * from people) tmp") \
        .option("user", "root") \
        .option("password", "1") \
        .load()
    peopledf.show()
    # +----+---+
    # |name|age|
    # +----+---+
    # |Jack| 27|
    # |Rose| 24|
    # |Andy| 32|
    # +----+---+
    spark.stop()

At runtime you may get an error saying the MySQL driver cannot be found: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver. The fix is to download a MySQL driver jar and drop it into the jars directory of your pyspark installation, which by default is /usr/local/lib/python3.7/site-packages/pyspark/jars/.
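
Alternatively, instead of copying the jar into pyspark's jars directory, you can point Spark at the connector jar when building the session; a minimal sketch, where the jar path is a placeholder for wherever you downloaded mysql-connector-java:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("loadJdbcData") \
    .master("local[*]") \
    .config("spark.jars", "/path/to/mysql-connector-java.jar") \
    .getOrCreate()

The same jar can also be supplied on the command line, e.g. spark-submit --jars /path/to/mysql-connector-java.jar your_script.py.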

hive

Hive stores its data files with a special field separator, "^A" (the control character \x01). Spark is usually configured with the Hive metastore information, so it can read Hive tables directly.
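
To build a matching test file, each field in people.hive has to be separated by that \x01 character; a minimal sketch in plain Python that writes such a file:

# write a Hive-style data file whose fields are separated by ^A (\x01)
rows = [('Jack', 27), ('Rose', 24), ('Andy', 32)]
with open('/home/llh/data/people.hive', 'w') as f:
    for name, age in rows:
        f.write(name + '\x01' + str(age) + '\n')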

Now write code that loads people.hive into the people table and displays it:

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadHiveData") \
        .master("local[*]") \
        .enableHiveSupport() \
        .getOrCreate()
    spark.sql("create table if not exists people (name string, age int) using hive")
    spark.sql("load data local inpath '/home/llh/data/people.hive' into table people")
    spark.sql("select * from people").show()
    # +----+---+
    # |name|age|
    # +----+---+
    # |Jack| 27|
    # |Rose| 24|
    # |Andy| 32|
    # +----+---+
    spark.stop()

kafka

Kafka combined with Spark is commonly used for real-time projects, that is, Spark Streaming; I will cover it separately in a later post.
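
As a small taste before that post, here is a minimal sketch of reading a Kafka topic with Structured Streaming; the broker address and the topic name people are placeholders, and the spark-sql-kafka connector package has to be on the classpath:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("loadKafkaData") \
    .master("local[*]") \
    .getOrCreate()

# subscribe to the "people" topic; records arrive with binary key/value columns
kafkadf = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "people") \
    .load()

# cast the value to a string and print each micro-batch to the console
query = kafkadf.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .start()
query.awaitTermination()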

elasticsearch

Elasticsearch is handled much like MySQL and the other databases above.

Write code that loads the data and displays it:

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("loadEsData") \
        .master("local[*]") \
        .enableHiveSupport() \
        .getOrCreate()
    peopledf = spark.read \
        .format("org.elasticsearch.spark.sql") \
        .option("es.nodes", "localhost") \
        .option("es.port", 9200) \
        .option("es.resource", "people/data") \
        .load()
    peopledf.registerTempTable("people")
    spark.sql("select * from people").show()
    # +----+---+
    # |name|age|
    # +----+---+
    # |Jack| 27|
    # |Rose| 24|
    # |Andy| 32|
    # +----+---+
    spark.stop()
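
As with the MySQL driver, this fails with a ClassNotFoundException unless the elasticsearch-hadoop (elasticsearch-spark) connector jar is visible to Spark; a minimal sketch, with the jar path as a placeholder for wherever you downloaded it:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("loadEsData") \
    .master("local[*]") \
    .config("spark.jars", "/path/to/elasticsearch-spark.jar") \
    .getOrCreate()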

These are the most commonly used data sources. There are of course others, such as HBase, Phoenix, and so on... Once you have mastered the ones above, extending the same ideas to the rest should not be a problem.

Source: https://www.icode9.com/content-1-384101.html