【实战篇】解析 Python之父写的网络爬虫 异步爬虫
下面的程序码程序是来自于Python 之父 Guido van Rossum和A. Jesse Jiryu Davis所写的虫子一起写的网页爬虫主要是展示如何使用使用 aiohttp 来网页写异步爬取。
作者:A. Jesse Jiryu Davis 和 Guido van Rossum
项目:网络爬虫钍is 是一个网络爬虫。您给它一个 URL,它会通过 HTML 页面中的 href 链接来抓取该网站。
该示例的重点是展示如何使用 asyncio 模块编写相当复杂的 HTTP 客户端应用程序。这个模块最初被称为 Tulip,是 Python 3.4 标准库中的新模块,基于 PEP 3156。该示例使用了一个名为“aiohttp”的异步 HTTP 客户端实现,由 Andrew Svetlov、Nikolay Kim 和其他人编写。
https://github.com/aosabook/500lines/tree/master/crawler
文章目录
ㄧ. 将部分程序更新
原始版本 GitHub ·刚刚发布的位置:500lines/crawling.py at master · aosabook/500lines GitHub
因为 GitHub 更新时间是四年前,下载下来,使用一些方法更新:
▍Python 3.8+ 移除 urllib.parse.splitport()
urllib.parse.splitport() 将在 Python 3.8 被移除,所以改使用 urlparse 和 hostname 的方法取得 hostname
parts = urllib.parse.urlparse(root)
host = parts.hostname
参考文件:问题 27485: urllib.splitport — 是不是官方的?- Python追踪器
▍Python3.10+ 即将移除 @asyncio.coroutine
Generator-based coroutine 的方式将在 Python 3.10 中被移除,所以这样的语法将改用 Native coroutine 的方式,使用 Python 3.5+ library 中的 async / await 来选择@asyncio.coroutine
参考文件:Coroutines and Tasks — Python 3.8.2 文档
▍asyncio.get_event_loop
Python 3.7 推出了更多的方法,将使用 event_loop 封装,asyncio.run() 一行程序就结束,不用在创建 event_loop 结束时也不需要 loop.close。
参考文件:cpython/runners.py at 3.8 · python/cpython · GitHub
二. 解析开始
▍Python环境配置:
- Python 3.7+
▍pip install 安装套件:
pip install aiohttp
▍开始解析
可以看到 Crawler 里面,我用注解来拆成三个部分
#解析爬取到的url是否符合需求规范
#爬取到的url将列队
#主要运行的联络函式
▍完整程序如下:
我将程序阅读步骤注解写在旁边,建议由下往上开始阅读
import asyncio
import aiohttp # Install with "pip install aiohttp".
from asyncio import Queue
import cgi
from collections import namedtuple
import logging
import re
import time
import urllib.parse
from urllib.parse import urljoin
FetchStatistic = namedtuple('FetchStatistic', [
'url', 'next_url', 'status', 'exception', 'size', 'content_type',
'encoding', 'num_urls', 'num_new_urls'
])
def lenient_host(host):
parts = host.split('.')[-2:]
return ''.join(parts)
def is_redirect(response):
return response.status in (300, 301, 302, 303, 307)
class Crawler:
def __init__(
self,
roots,
exclude=None,
strict=True, # What to crawl.
max_redirect=10,
max_tries=4, # Per-url limits.
max_tasks=10):
self.roots = roots # 使用者指定抓取的網站地址,是一個 list
self.exclude = exclude
self.strict = strict
self.max_redirect = max_redirect
self.max_tries = max_tries
self.max_tasks = max_tasks
self.seen_urls = set() # 會保證不重複 url 與和已經抓取過的 url
self.done = []
self.root_domains = set()
# 解析爬取到的 url 是否符合需求規範
def host_okay(self, host):
"""Check if a host should be crawled.
A literal match (after lowercasing) is always good. For hosts
that don't look like IP addresses, some approximate matches
are okay depending on the strict flag.
"""
host = host.lower()
if host in self.root_domains:
return True
if re.match(r'\A[\d\.]*\Z', host):
return False
if self.strict:
return self._host_okay_strictish(host)
else:
return self._host_okay_lenient(host)
def _host_okay_strictish(self, host):
"""Check if a host should be crawled, strict-ish version.
This checks for equality modulo an initial 'www.' component.
"""
host = host[4:] if host.startswith('www.') else 'www.' + host
return host in self.root_domains
def _host_okay_lenient(self, host):
"""Check if a host should be crawled, lenient version.
This compares the last two components of the host.
"""
return lenient_host(host) in self.root_domains
def url_allowed(self, url):
if self.exclude and re.search(self.exclude, url):
print('--------------------exclude', url)
return False
parts = urllib.parse.urlparse(url)
if parts.scheme not in ('http', 'https'):
return False
host = parts.hostname
if not self.host_okay(host):
return False
return True
# 將爬取到的 url 放入列隊
def add_url(self, url, max_redirect=None):
# print(url)
if max_redirect is None:
max_redirect = self.max_redirect
self.seen_urls.add(url)
self.q.put_nowait((url, max_redirect))
def record_statistic(self, fetch_statistic):
"""Record the FetchStatistic for completed / failed URL."""
self.done.append(fetch_statistic)
# 以下為主要運行的異步函式
# Step 5
async def parse_links(self, response):
links = set()
content_type = None
encoding = None
body = await response.read()
if response.status == 200:
content_type = response.headers.get('content-type')
pdict = {}
if content_type:
content_type, pdict = cgi.parse_header(content_type)
encoding = pdict.get('charset', 'utf-8')
if content_type in ('text/html', 'application/xml'):
text = await response.text()
urls = set(re.findall(r'''(?i)href=["']([^\s"'<>]+)''', text))
for url in urls:
url_join = urllib.parse.urljoin(str(response.url), url)
defragmented, frag = urllib.parse.urldefrag(url_join)
if self.url_allowed(defragmented):
links.add(defragmented)
print(defragmented)
stat = FetchStatistic(url=response.url,
next_url=None,
status=response.status,
exception=None,
size=len(body),
content_type=content_type,
encoding=encoding,
num_urls=len(links),
num_new_urls=len(links - self.seen_urls))
return stat, links
# Step 4
async def fetch(self, url, max_redirect):
tries = 0
exception = None
while tries < self.max_tries:
# 取得 url 的 response,失敗則在 max_tries 內持續嘗試
try:
response = await self.session.get(url, allow_redirects=False)
break
except Exception as e:
exception = e
tries += 1
else:
self.record_statistic(
FetchStatistic(url=url,
next_url=None,
status=None,
exception=exception,
size=0,
content_type=None,
encoding=None,
num_urls=0,
num_new_urls=0))
return
try:
# 判斷是否跳轉頁面
if is_redirect(response):
location = response.headers['location']
next_url = urllib.parse.urljoin(url, location)
self.record_statistic(
FetchStatistic(url=url,
next_url=next_url,
status=response.status,
exception=None,
size=0,
content_type=None,
encoding=None,
num_urls=0,
num_new_urls=0))
if next_url in self.seen_urls:
return
if max_redirect > 0:
self.add_url(next_url, max_redirect - 1)
else:
print('redirect limit reached for %r from %r', next_url,
url)
else:
stat, links = await self.parse_links(response)
self.record_statistic(stat)
for link in links.difference(self.seen_urls):
self.q.put_nowait((link, self.max_redirect))
self.seen_urls.update(links)
finally:
await response.release()
# Step 3
async def work(self):
try:
while True:
url, max_redirect = await self.q.get()
await self.fetch(url, max_redirect)
self.q.task_done()
except asyncio.CancelledError:
pass
# Step 2
async def crawl(self):
self.q = asyncio.Queue() # 存放所有等待抓取的 url
self.t0 = time.time()
self.session = aiohttp.ClientSession()
for root in self.roots:
parts = urllib.parse.urlparse(root)
host = parts.hostname
# 判斷解析 url 後有無 host
if not host:
continue
# 判斷 host 是否為數字
if re.match(r'\A[\d\.]*\Z', host):
self.root_domains.add(host)
else:
host = host.lower()
if self.strict:
self.root_domains.add(host)
else:
self.root_domains.add(lenient_host(host))
for root in self.roots:
self.add_url(root)
workers = [
asyncio.create_task(self.work()) for _ in range(self.max_tasks)
]
await self.q.join() # 等待列隊 url 清空,將結束任務
for w in workers:
w.cancel()
await self.session.close()
self.t1 = time.time()
# Step 1
time_start = time.time()
crawler = Crawler(['https://xkcd.com'], max_tasks=30, exclude='.css')
asyncio.run(crawler.crawl())
print(len(crawler.done))
print(time.time() - time_start)
三. 讨论命名元组
过去写爬虫用字典dict来处理爬下来的数据,都可以看到Python之父是使用namedtuple,所以我们来看看tuple vs namedtuple vs dict之间有什么用途?
1. tuple、namedtuple 与 dict 之间的区别?
▍命名元组与元组
使用 tuple 的时候,在使用 tuple 的其中一个值的时候,索引的索引,而索引通常可以访问的值具有可维护性和维护性
的特性。其中还解决了索引的问题,可以使用名字来访问的值,待下面会解释会再讲解。
▍namedtuple vs dict
namedtuple 是一个不可变的对象,因此他需要的空间比字典字典来的少,但对于键值的搜索速度比命名元组快,理想上 python 的字典字典在搜索键值的时间复杂度是 O(1 ),而tuple基本上还是tuple的结构,所以它的O(n)时间复杂度不同,但根据空间大小不同,但速度效率的会偏好使用tuple命名空间是非常差的。
如果是像这次爬虫类中的,单纯只需要写入储存资料的话,使用namedtuple确实会比dict和tuple都还适合。
2. namedtuple 使用方法
from collections import namedtuple
▍宣布命名元组
Product_detail = namedtuple('Product', ['name', 'price', 'sales', 'ship_fees'])
p0 = Product_detail('Max0', 6666.6, 10, True)
p1 = Product_detail('Max1', 6666.5, 12, False)
p2 = Product_detail('Max2', 6666.4, 11, True)
print(p0, p1, p2)
# 輸出內容:
>>> Product(name='Max0', price=6666.6, sales=10, ship_fees=True)
>>> Product(name='Max1', price=6666.5, sales=12, ship_fees=False)
>>> Product(name='Max2', price=6666.4, sales=11, ship_fees=True)
▍将列表转换成namedtuple
product_list = ['Max3', 6666.3, 10, True]
p3 = Product_detail._make(product_list)
print(p3)
# 輸出內容:
>>> Product(name='Max3', price=6666.3, sales=10, ship_fees=True)
▍将dict转换成namedtuple
produt_dict = {'name': 'Max4', 'price': 6666.2, 'sales': 9, 'ship_fees': True}
p4 = Product_detail(**produt_dict)
print(p4)
# 輸出內容:
>>> Product(name='Max4', price=6666.2, sales=9, ship_fees=True)
▍将namedtuple转换成dict
produt_nametuple_to_dict = p4._asdict()
print(produt_nametuple_to_dict)
# 輸出內容:
>>> OrderedDict([('name', 'Max4'), ('price', 6666.2), ('sales', 9), ('ship_fees', True)])
四. 讨论 urllib.parse
可以看到Python之父使用很多urllib的套件来解析爬取到的url,所以我们来了解urllib解析url的方式:
1. urllib.parse 使用方法
import urllib.parse
▍ urllib.parse.urlparse()
解析url的主机名、端口、方案、查询等各式参数,本次urlparse()主要使用主机名判断是否为内部地址。
url = 'http://www.maxlist.xyz:80/author?user=Max&pass=123#123'
parsed = urllib.parse.urlparse(url)
print(parsed)
print(parsed.hostname)
print(parsed.port)
# 輸出內容:
>>> ParseResult(scheme='http', netloc='www.maxlist.xyz:80', path='/author', params='', query='user=Max&pass=123', fragment='123')
>>> www.maxlist.xyz
>>> 80
▍urllib.parse.urljoin()
有时爬取到的网址后不会有 netloc 的部分,使用 urljoin 来合并 url
crawler_url = '/newsletter/'
response_url = 'https://xkcd.com'
url_join = urllib.parse.urljoin(response_url, crawler_url)
print(url_join)
# 輸出內容:
>>> https://xkcd.com/newsletter/
▍urllib.parse.urldefrag()
不同的截图还是同页,所以在提取时进行截图处理
url = 'http://www.maxlist.xyz:80/author?user=Max&pass=123#remove'
print(urllib.parse.urldefrag(url))
# 輸出內容:
>>> DefragResult(url='http://www.maxlist.xyz:80/author?user=Max&pass=123', fragment='remove')