简介
目标: 使用PHP和Redis提供使用简单而功能强大的消息队列系统.
安装
目前内嵌到Chip框架, 也可以独立使用, 经过简单的修改可与其他核心模块完全解耦.
概述
使用说明
配置文件
在Chip里的conf目录下创建queue.ini, 根据自己的机器增加以下配置项:
[product][dev : product]queue.redis.0.host = 127.0.0.1 ; 机器IPqueue.redis.0.port = 6379 ; 机器端口queue.redis.0.db = 0 ; redis 数据库queue.redis.0.name = queue ; 队列名queue.redis.0.distributed = 0 ; 是否开启并发模式queue.redis.1.host = 127.0.0.1queue.redis.1.port = 6379queue.redis.1.db = 0queue.redis.1.name = queuequeue.redis.1.distributed = 0[test : dev]
创建队列
$queue = \Chip\Queue::getInstance();
自定义配置创建
如果不想使用默认配置文件里的配置,可以自定义配置文件和配置索引
$queue = \Chip\Queue::getInstance('myself.ini', 'mytest');
创建任务
创建任务是将一个消息写入一个有名字的管道, 下面管道名为”mytube”
$data = array( 'name' => 'kevin', 'content' => array( 'test' => 'some string', 'num' => null, ),);$queue = Queue::getInstance();$queue->putInTube('mytube', $data);
创建延时任务
延时任务是在入队列后, 不会马上被处理进程获取, 在延迟指定时间后才会被处理.
// 3600秒后才会被处理$option = array( 'delay' => 3600, // 单位为秒);$queue->putInTube('mytube', $data, $option);
创建定时任务
定时任务入队列后, 在特定时间点才会被处理进程获取并处理.
// 在2015-10-29 23:34任务才会被处理$option = array( 'timing' => '2015-10-29 23:34',);$queue->putInTube('mytube', $data, $option);
定时参数的格式遵循PHP的日期和时间格式 PHP date and time formats, 可以使用:
Next Monday
+1 days
last day of next month
2013-09-13 00:00:00
- and so on..
注意的是, 如果创建了一个过去时间的定时任务, 任务不会被丢弃, 而是会马上被触发.
创建周期任务
周期任务可以代替非具体时间的周期性的crontab.
// 每3600秒被触发一次,不会销毁.$option = array( 'periodic' => 3600,);$queue->putInTube('mytube', $data, $option);
创建失败重试任务
处理的时候如果处理失败后, 可以设定一个重试次数, 来重复尝试处理这个任务:
这个参数可以与上面的时间控制的参数一起使用.
$option = array( 'attempts' => 5,);$queue->putInTube('mytube', $data, $option);
设定处理时长TTR
每个消息任务都有规定的最大运行时间, 默认是5分钟, 超时后会被视为失败的任务.
可以在创建任务的时候指定处理最大时长:
这个参数可以与上面的时间控制的参数一起使用.
$option = array( 'ttr' => 600, // 最大处理时长为10分钟);$queue->putInTube('mytube', $data, $option);
处理任务的时候, 如果时间过长, 也可以主动声明处理时间:
set_time_limit(600);
或者使用touch来重新计算处理时间:(参考处理任务)
$job->touch();
处理任务
处理消息任务需要创建一个守护进程的脚本, 必须在CLI模式下执行, 这里可以使用Chip里的CLI-TASK:
class DaemonTask extends TaskBase{ /** * 创建守护进程, 需要在CLI模式下执行 */ public function indexAction() { $queue = Queue::getInstance(); // 创建进程, 监听'mytube'这个管道, 处理队列任务 $queue->doWork('mytube', function(Caster $job) { $data = $job->getBody(); // process // 特别注意的一点, 匿名函数里的程序是以子进程形式存在的 // 如果正常处理完一个JOB后, 需要发送结束信号: exit(0) exit(0); }); }}
任务状态调度控制
消息的状态有四种: 准备,订阅,延迟,失败, 这些状态的生命周期如下所示:
put with delay release with delay ----------------> [DELAYED] <------------. | | | (time passes) | | | put v reserve | delete -----------------> [READY] ---------> [RESERVED] --------> *poof* ^ ^ | | | \ release | | | `-------------' | | | | kick | | | | bury | [FAILED] <---------------' | | delete `--------> *poof*
处理进程中的任务处在订阅状态(RESERVED), 一般处理完后会自动delete, 有异常的话会bury到失败队列.
开发者也可以主动delete,release,或者bury这个任务:
$queue->doWork('mytube', function(Caster $job) { $data = $job->getBody(); if ($data['name'] != 'kevin') { $job->bury(); } else { $job->release(120); // 将任务回置为延迟任务 } exit(0);});
处理失败状态的任务:
$queue = Queue::getInstance();$ids = $queue->getIdsByState(Caster::STATE_FAILED, 'mytube');foreach ($ids as $id) { $job = Caster::reload($id); $job->kick(); // 将失败的任务重新放置到准备队列里}
并发模式
多数情况,消息队列保持”FIFO”(先进先出)的原则, 然而也有业务会用到分布式场景.
分布式处理消息会提高处理效率,为了避免因并发读取而出现的问题,需要将配置文件中distributed 设置为1
...queue.redis.1.distributed = 1
设计理念
利用Redis多种数据类型的特点,将每条队列构建成如下的数据结构:
array( 'id' => null, 'tube' => null, 'data' => array(), 'state' => '', // 任务状态 'created_at' => '', 'updated_at' => '', 'error' => '', 'failed_at' => '', 'timing' => 0, // 定时, 单位毫秒 'attempts' => 0, // 尝试次数计数器 'max_attempts' => 3, // 最大尝试次数限制 'retry_times' => 0, // 重新处理过的次数 'periodic' => 0, // 周期任务生命周期时长, 单位秒 'ttr' => 30, // 进程运行生命周期时长, 单位秒);
每条任务会生成对应的Redis中Set、SortSet、Hash等数据类型用以流程控制:
queue:job:[id] -> key, value 哈希存储一个任务的业务数据queue:tubes -> tube 集合存储所有tube类型queue:jobs -> id 有序集合存储整个任务列表的次序queue:jobs:[state] -> id 有序集合存储特定状态任务列表的次序queue:jobs:[tube]:[state] -> id 有序集合存储一个tube下的特定状态的任务列表的次序