初识 Beanstalk
这几天一直在想一个问题 “如何利用 Ruby 维持一个量大的 Queue”, 一直在考虑这样的问题, 其实本质上也就是因为 background job 引来的, 当然在使用 Play! 的使用对 Job 的理解与在 Rails 中看到的 Delayed_job 对 Job 的理解, 也会突然多了另外一个思路.
在 Play! 中, 我将 Job 定义为一个个在后台运行的任务进程, 每一个 Job 会自己去数据库寻找需要处理的数据, 然后进行定时或者周期性处理. 而在 Rails 的 Delayed_job 中, 是将一个个需要执行的小任务存储到数据库中, 然后定时拿出这一个个任务进行执行, 反过来想想和我解决的另外一个功能比较像, 按照需求申请一个个需要执行的 JobRequest 然后另外一个线程去周期性处理这些 Request. 而这次需要考虑的是因为是比较量大的任务, 所以直接使用数据库可能无法满足. 绞尽脑汁的想来想去, 在互联网上找来找去, 不记得从什么地方找到了这篇文章 然后就找到了 Beanstalk ,突然感觉, 哈~ 这不就是我一直寻找的吗? 一个类似中间件的 Queue 一个可进行分布式部署的 Queue, 再一看这文章中的数据分析, 满足, 满足, 太满足了, 大多数情况就这样一个中间件的 Queue 完全满足了, 而且这家伙就是针对 background job 这样的需求的(当然也看做是 messaging queue). 接下来肯定是拿下来折腾, 弄清楚怎么用.
安装非常简单, 按照 Beanstalkd 的 Downloads 页面 来就好了, 因为我是 Mac OS 所以就直接使用 brew install beanstalkd
安装了.
接下来:
- 如何启动 beanstalkd (包括常用参数)
- 如何与 beanstalkd 交互
- 如何监控 beanstalk 的状态
如何启动 beanstalkd
在 *nix 下面, 启动 beanstalkd 真的很容易, 就一条命令足以 beanstalkd -b ./ &
, beanstalkd
为命令, -b
为开启 binlog 用来持久化 Queue 中的任务, 避免 Power off 队列任务丢失. ./
指定输出 binlog 的目录为当前目录, &
这个就是让他后台运行.
-b DIR
输出 binlog 保持任务不丢失, DIR 指定路径-l ADDR
绑定的地址, 默认为 (0.0.0.0)-p PORT
绑定的端口, 默认为 (11300)-z BYTES
每一个 job 消息的最大大小, 默认 65535 bytes (64kb, 足够了吧)-s BYTES
设置单个 binlog 文件的最大大小, 超过会自动切分文件, 默认 10485760 (10mb)-V
查看输出的日志. (我简单 debug 用)
其中有一个我一直没有弄明白, -f MS
的 “fsync at most once every MS milliseconds (use -f0 for “always fsync”)” 我不知道这里的 fsync 的是什么内容? 作用是什么? 这个功能的 issue.
如何与 beanstalkd 交互
因为在他首页的例子代码是利用的 ruby 中的 beanstalk-client gem, 刚好这段时间在学习 Rails, 所以就直接拿 Ruby 测试交互了. 也不知道是不是因为这个太简单了的缘故, 无论是 beanstalkd 或者是他的 beanstalk-client 都没一个 document, 结果上手的时候很尴尬,只好去找 beanstalk-client 的源代码. . .
在使用前, 还是阅读了一下 beanstalkd 的一些细节问题, 由于官方的针对 beanstalk 协议的文档实在太难阅读了, 所以就阅读了一篇淘宝对 beanstalkd 的介绍 和 另外一篇博客, 了解了 Job 的生命周期也就了解了几个需要执行的几个关键方法.
简单的例子代码
1 require "beanstalk-client"
2 require "pp"
3
4 # 初始化一个 tube 的 Queue
5 # 第二个参数为 tube 的名字, beanstalkd 可以拥有多个不同的 tube
6 BK = Beanstalk::Pool.new(['localhost:11300'], "test")
7
8 def put(job)
9 BK.put(job.to_s)
10 # 第二个参数为优先级, 数值越小优先级越高
11 BK.yput(job, 0)
12 end
13
14 def get()
15 begin
16 job = BK.reserve(0.1)
17 # 或者可以直接那解析 ymal 程 hash. job.ybody
18 puts job.body
19
20 # 任务处理完成了, 告诉 beanstalkd 将这个 job 删除.
21 job.delete
22 rescue Exception => e
23 puts e
24 end
25 end
26
27 def status()
28 # 查看整个 beanstalkd 的状态
29 #BK.stats()
30 BK.stats_tube('test')
31 end
32
33 job = {id: 1, act: :email, type: :notice, to: "foo@bar.com"}
34
35 put(job)
36 get()
37
38 pp status()["total-jobs"]
知道那几个函数的入口, 其他的详细代码自己怎么想就可以怎么写了, 这个接口设计得真的很简单.
如何监控 beanstalk 的状态
在上面的代码例子中, 可以通过程序代码调用 stats()
与 stats_tube(tube_name)
来查看整个 beanstalkd 与其中某一个 tube 的状态, 因为任务执行的状态也是在使用中大家肯定会非常关心的点, 所以就有一些非常棒的 beanstalk tools 来完成这些功能, beanstalkd_view 就是利用 Sinatra 实现的非常棒的监控程序, 可以单独运行也可以集成到 Rails 中使用.
好了, beanstalk 的认识已经差不多了, 接下来会将这个引入到系统中正常的使用了.