打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
如何使用python将Spark数据写入ElasticSearch

这里以将Apache的日志写入到ElasticSearch为例,来演示一下如何使用Python将Spark数据导入到ES中。

实际工作中,由于数据与使用框架或技术的复杂性,数据的写入变得比较复杂,在这里我们简单演示一下。

如果使用Scala或Java的话,Spark提供自带了支持写入ES的支持库,但Python不支持。所以首先你需要去这里下载依赖的ES官方开发的依赖包包。

下载完成后,放在本地目录,以下面命令方式启动pyspark:

     pyspark --jars elasticsearch-hadoop-6.4.1.jar

如果你想pyspark使用Python3,请设置环境变量:

 export PYSPARK_PYTHON=/usr/bin/python3

理解如何写入ES的关键是要明白,ES是一个JSON格式的数据库,它有一个必须的要求。数据格式必须采用以下格式

 
{ "id: { the rest of your json}}

往下会展示如何转换成这种格式。

解析Apache日志文件

我们将Apache的日志文件读入,构建Spark RDD。然后我们写一个parse()函数用正则表达式处理每条日志,提取我们需要的字

rdd = sc.textFile("/home/ubuntu/walker/apache_logs")regex='^(\S ) (\S ) (\S ) \[([\w:/] \s[ \-]\d{4})\] "(\S )\s?(\S )?\s?(\S )?" (\d{3}|-) (\d |-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'
 
p=re.compile(regex)def parse(str):    s=p.match(str)    d = {}    d['ip']=s.group(1)    d['date']=s.group(4)    d['operation']=s.group(5)    d['uri']=s.group(6)    return d  

换句话说,我们刚开始从日志文件读入RDD的数据类似如下:

['83.149.9.216 - - [17/May/2015:10:05:03  0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"']

然后我们使用map函数转换每条记录:

rdd2 = rdd.map(parse)rdd2.take(1)[{'date': '17/May/2015:10:05:03  0000', 'ip': '83.149.9.216', 'operation': 'GET', 'uri': '/presentations/logstash-monitorama-2013/images/kibana-search.png'}]

现在看起来像JSON,但并不是JSON字符串,我们需要使用json.dumps将dict对象转换。

我们同时增加一个doc_id字段作为整个JSON的ID。在配置ES中我们增加如下配置“es.mapping.id”: “doc_id”告诉ES我们将这个字段作为ID。

这里我们使用SHA算法,将这个JSON字符串作为参数,得到一个唯一ID。
计算结果类似如下,可以看到ID是一个很长的SHA数值。

rdd3.take(1)[('a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c', '{"date": "17/May/2015:10:05:03  0000", "ip": "83.149.9.216", "operation": "GET", "doc_id": "a5b086b04e1cc45fb4a19e2a641bf99ea3a378599ef62ba12563b75c", "uri": "/presentations/logstash-monitorama-2013/images/kibana-search.png"}')]
 

现在我们需要制定ES配置,比较重要的两项是:

  • “es.resource” : ‘walker/apache’: "walker"是索引,apache是类型,两者一般合称索引

  • “es.mapping.id”: “doc_id”: 告诉ES那个字段作为整个文档的ID,也就是查询结果中的_id
    其他的配置自己去探索。

然后我们使用saveAsNewAPIHadoopFile()将RDD写入到ES。这部分代码对于所有的ES都是一样的,比较固定,不需要理解每一个细节

es_write_conf = {        "es.nodes" : "localhost",        "es.port" : "9200",        "es.resource" : 'walker/apache',        "es.input.json": "yes",        "es.mapping.id": "doc_id"    }       rdd3.saveAsNewAPIHadoopFile(        path='-',     outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",       keyClass="org.apache.hadoop.io.NullWritable",        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",        conf=es_write_conf)rdd3 = rdd2.map(addID)def addId(data):    j=json.dumps(data).encode('ascii', 'ignore')    data['doc_id'] = hashlib.sha224(j).hexdigest()    return (data['doc_id'], json.dumps(data))

最后我们可以使用curl进行查询

curl http://localhost:9200s/walker/apache/_search?pretty=true&?q=*{        "_index" : "walker",        "_type" : "apache",        "_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2",        "_score" : 1.0,        "_source" : {          "date" : "17/May/2015:10:05:32  0000",          "ip" : "91.177.205.119",          "operation" : "GET",          "doc_id" : "227e977849bfd5f8d1fca69b04f7a766560745c6cb3712c106d590c2",          "uri" : "/favicon.ico"        }

如下是所有代码:

import jsonimport hashlibimport redef addId(data):    j=json.dumps(data).encode('ascii', 'ignore')    data['doc_id'] = hashlib.sha224(j).hexdigest()    return (data['doc_id'], json.dumps(data))def parse(str):    s=p.match(str)    d = {}    d['ip']=s.group(1)    d['date']=s.group(4)    d['operation']=s.group(5)    d['uri']=s.group(6)    return d    regex='^(\S ) (\S ) (\S ) \[([\w:/] \s[ \-]\d{4})\] "(\S )\s?(\S )?\s?(\S )?" (\d{3}|-) (\d |-)\s?"?([^"]*)"?\s?"?([^"]*)?"?$'p=re.compile(regex)rdd = sc.textFile("/home/ubuntu/walker/apache_logs")rdd2 = rdd.map(parse)rdd3 = rdd2.map(addID)es_write_conf = {        "es.nodes" : "localhost",        "es.port" : "9200",        "es.resource" : 'walker/apache',        "es.input.json": "yes",        "es.mapping.id": "doc_id"    }     rdd3.saveAsNewAPIHadoopFile(        path='-',     outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",       keyClass="org.apache.hadoop.io.NullWritable",        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",        conf=es_write_conf)




链接:https://www.jianshu.com/p/c7365b9bda0a

来源:https://www.icode9.com/content-1-677701.html
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
ElasticSearch 亿级数据检索案例实战!
Elasticsearch对Hbase中的数据建索引实现海量数据快速查询
python中的Elasticsearch操作
用Elasticsearch构建电商搜索平台,一个极有代表性的基础技术架构和算法实践案例
使用Kafka和ksqlDB构建和部署实时流处理ETL引擎
深入浅出 spring
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服