打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
如何用Flask和Redis来动态维护代理池

我们在爬虫时可能会遇到封IP的问题,那么利用代理就可以进行IP的伪装,然后进行爬虫的请求。我们有时会需要非常多的ip,那么维护一个代理池(代理的队列,可以存入或取出),需要对整个池进行定期的检查和更新,以此来保证代理的高质量(也就是代理的检测和筛选),以免对爬虫产生影响。
Redis主要给代理池提供一个队列存储。
Flask用来实现代理池的接口。

为什么要用代理池?

许多网站有专门的反爬虫措施,可能遇到封IP等问题。
互联网上公开了大量免费代理,利用好资源。
通过定时的检测维护同样可以得到许多的可用代理。

代理池的要求

  • 多站抓取,异步检测(异步是为了提高检测效率)
  • 定时筛选,持续更新
  • 提供接口,易于提取

代理池架构

代理池实现

download源码

以github一位大神的源码作为参考:https://github.com/germey/proxypool
分析一下如何实现一个代理池。


我们可以先将源码download到本地,在pycharm中打开:

这是整个项目的结构,examples目录下是示例,proxypool下是这个项目的一些脚本,run是主程序的入口,setup用来安装代理池。

运行程序

运行这个程序时如果出现报错可能是一些库还未安装造成的,根据提示,逐一安装就好了,例如:
pip3 install aiohttp (这是一个异步请求库)
pip install fake-useragent
所需要的库都已经安装完毕后,就可以在Terminal界面使用命令:
python run.py来启动程序了。


可以看到在窗口中输出了一些内容——正在对代理进行测试。

在Redis数据库中,可以看到生成了一个proxies队列来存储上面通过测试的代理。程序通过不停地进行定时检测来添加可用的代理,直到到达了预定的上限。
在api.py这个文件中,通过Flask库建立了一个接口,使得我们可以从web获得存储到数据库中的代理。

在浏览器中输入http://localhost:5000,可以看到欢迎页面:

若在浏览器中输入http://localhost:5000/get,就可以从数据库中取得一个代理的地址了。点击刷新的话,代理的地址会改变。

若在浏览器中输入http://localhost:5000/count,就可以查询数据库中已经存储的代理的数量。

这些接口都是在程序中封装好了的,当然,我们也可根据自身的需求进行一些其他功能的封装:

代码分析

那么我们来看看代码是如何实现这个代理池的。

run.py

from proxypool.api import appfrom proxypool.schedule import Scheduledef main():    s = Schedule()#调用了自己定义的一个调度器    s.run()#运行调度器    app.run()#运行接口if __name__ == '__main__':    main()

调度器

重点是在调度器——schedule.py

进程1

   valid_process = Process(target=Schedule.valid_proxy)

在Schedule这个类中,定义了valid_proxy这个方法。

conn = RedisClient()

方法中的这个语句,调用了RedisClient()方法,这个方法是在db.py中定义的,提供了队列的一些API操作:

class RedisClient(object):#提供了队列的一些API操作    def __init__(self, host=HOST, port=PORT):        if PASSWORD:            self._db = redis.Redis(host=host, port=port, password=PASSWORD)        else:            self._db = redis.Redis(host=host, port=port)    def get(self, count=1):        """        get proxies from redis        """        proxies = self._db.lrange("proxies", 0, count - 1)#从队列左侧拿出代理(旧的)        self._db.ltrim("proxies", count, -1)#从左侧批量获取        return proxies    def put(self, proxy):        """        add proxy to right top        """        self._db.rpush("proxies", proxy)#将代理插入到队列右侧(新的)    def pop(self):        """        get proxy from right.        """        try:            return self._db.rpop("proxies").decode('utf-8')#从右侧取出新的代理        except:            raise PoolEmptyError    @property    def queue_len(self):#获取队列长度        """        get length from queue.        """        return self._db.llen("proxies")    def flush(self):#刷新数据库        """        flush db        """        self._db.flushall()

下面的一条语句

tester = ValidityTester()

ValidityTester是一个类,用来检测代理是否可用。


类中的这个方法通过调用了aiohttp这个库来实现异步检测,具体逻辑如下注释:

    async def test_single_proxy(self, proxy):        """        text one proxy, if valid, put them to usable_proxies.        """        try:            async with aiohttp.ClientSession() as session:                try:                    if isinstance(proxy, bytes):#判断                        proxy = proxy.decode('utf-8')#转码                    real_proxy = 'http://' + proxy#设置代理                    print('Testing', proxy)                    async with session.get(self.test_api, proxy=real_proxy, timeout=get_proxy_timeout) as response:                        if response.status == 200:#说明代理可以正常使用                            self._conn.put(proxy)#将此代理加到队列的右侧                            print('Valid proxy', proxy)                except (ProxyConnectionError, TimeoutError, ValueError):#否则不正常                    print('Invalid proxy', proxy)        except (ServerDisconnectedError, ClientResponseError,ClientConnectorError) as s:            print(s)            pass

上面分析的两条语句:
conn = RedisClient()
tester = ValidityTester()
都是在valid_proxy这个方法里的,再看看这个方法接下来做了什么事情:

        while True:#不断循环执行            print('Refreshing ip')            count = int(0.5 * conn.queue_len)#从队列中取出一半长度的代理            if count == 0:                print('Waiting for adding')#如果队列为空,等待添加新的代理                time.sleep(cycle)                continue            raw_proxies = conn.get(count)#从队列左侧取出conut数量的代理            tester.set_raw_proxies(raw_proxies)#设置参数            tester.test()#调用检测方法            time.sleep(cycle)

以上就是第一个进程

  valid_process = Process(target=Schedule.valid_proxy)

中所做的事情。

进程2

再看第二个进程:

      check_process = Process(target=Schedule.check_pool)

这是从各大网站获取代理,检测,再将代理存入数据库:

    @staticmethod    def check_pool(lower_threshold=POOL_LOWER_THRESHOLD,                   upper_threshold=POOL_UPPER_THRESHOLD,                   cycle=POOL_LEN_CHECK_CYCLE):#限制代理池中代理的数量,以及检查的周期        """        If the number of proxies less than lower_threshold, add proxy        """        conn = RedisClient()        adder = PoolAdder(upper_threshold)        while True:            if conn.queue_len < lower_threshold:                adder.add_to_queue()            time.sleep(cycle)

把代理添加到队列中用到add_to_queue()方法:

        def add_to_queue(self):        print('PoolAdder is working')        proxy_count = 0        while not self.is_over_threshold():            for callback_label in range(self._crawler.__CrawlFuncCount__):                callback = self._crawler.__CrawlFunc__[callback_label]                raw_proxies = self._crawler.get_raw_proxies(callback)                # test crawled proxies                self._tester.set_raw_proxies(raw_proxies)                self._tester.test()                proxy_count += len(raw_proxies)                if self.is_over_threshold():                    print('IP is enough, waiting to be used')                    break            if proxy_count == 0:                raise ResourceDepletionError
    for callback_label in range(self._crawler.__CrawlFuncCount__):            callback = self._crawler.__CrawlFunc__[callback_label]

上面的两个参数__CrawlFuncCount__、__CrawlFunc__是在getter.py的元类中定义的:

class ProxyMetaclass(type):    """        元类,在FreeProxyGetter类中加入        __CrawlFunc__和__CrawlFuncCount__        两个参数,分别表示爬虫函数,和爬虫函数的数量。    """    def __new__(cls, name, bases, attrs):        count = 0        attrs['__CrawlFunc__'] = []#声明属性        for k, v in attrs.items():            if 'crawl_' in k:#如果方法是以'crawl_'开头,那么就添加到attrs中                attrs['__CrawlFunc__'].append(k)                count += 1        attrs['__CrawlFuncCount__'] = count        return type.__new__(cls, name, bases, attrs)

下面的class FreeProxyGetter(object, metaclass=ProxyMetaclass):
将元类指定为ProxyMetaclass,并且定义了许多crawl_开头的方法,这些是根据获取代理的目标网站不同而制定的不同的方法,如果有别的目标网站,也可以通过类似的格式定义好方法添加进去,这样就保证了良好的扩展性。
关于元类的用法,可以查询python的相关文档。

总结

上面简单对代码进行了分析,作为对实现代理池的一些参考。
如果觉得这个程序写得还不错,详细的使用方法可以参考项目的README.md:


如果觉得程序还有许多可以改进的地方,那么也可以自己修改。
如果觉得还有别的更好的实现思路,欢迎交流:)

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Python3网络爬虫开发实战之使用代理爬取微信公众号文章
PyPy 和 CPython 的性能比较测试
scrapy框架通用爬虫、深度爬虫、分布式爬虫、分布式深度爬虫,源码解析及应用
线程池的研究及实现
网页爬虫
手把手教你在Windows下设置分布式队列Celery的心跳轮询
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服