虽然名字叫Dashboard,但它在Codis中的作用却不可小觑。它不仅仅是Dashboard管理页面,更重要的是,它负责监控和指挥各个Proxy的负载均衡(数据分布和迁移)。并且,所有API都以RESTFul接口的形式对外提供,供Proxy和codis-config(Codis的命令行工具)调用。下面就来看一下数据分布和迁移的代码执行流程。
Dashboard涉及到的知识点比较多,包括Martini框架、Model模型层、数据的负载均衡分配、Redis中Slot的实现、ZooKeeper的Sequential结点实现消息队列、迁移过程中的数据访问等等。与此同时,本篇还记录了在研究过程中的发散思考,例如Java中的AR模型层。虽然内容稍有些庞杂,但都是真实的记录,相信于人于己都会有很大帮助。
codis-config是cmd/cconfig下编译出的命令行工具,它以命令行的形式提供对Codis常用命令的支持,例如启动Dashboard、初始化slot以及migrate,后两者会被封装成REST请求发送到Dashboard执行。下面简单看一下main.go的源码:
func main() { ... args, err := docopt.Parse(usage, nil, true, "codis config v0.1", true) if err != nil { log.Error(err) } // set config file var configFile string var config *cfg.Cfg if args["-c"] != nil { configFile = args["-c"].(string) config, err = utils.InitConfigFromFile(configFile) if err != nil { Fatal(err) } } else { config, err = utils.InitConfig() if err != nil { Fatal(err) } } // load global vars globalEnv = env.LoadCodisEnv(config) cmd := args["<command>"].(string) cmdArgs := args["<args>"].([]string) go http.ListenAndServe(":10086", nil) err = runCommand(cmd, cmdArgs)}func runCommand(cmd string, args []string) (err error) { argv := make([]string, 1) argv[0] = cmd argv = append(argv, args...) switch cmd { case "action": return errors.Trace(cmdAction(argv)) case "dashboard": return errors.Trace(cmdDashboard(argv)) case "server": return errors.Trace(cmdServer(argv)) case "proxy": return errors.Trace(cmdProxy(argv)) case "slot": return errors.Trace(cmdSlot(argv)) } return errors.Errorf("%s is not a valid command. See 'codis-config -h'", cmd)}
大家是否还记得,在安装Codis完之后,我们执行了smaple目录下的一系列脚本初始化Codis并启动服务,其中有一项./start_dashboard.sh就是启动Dashboard服务。它是调用的codis-config,传入的参数”dashboard”正好对应上面的main.go中的cmdDashboard()。
#!/bin/shnohup ../bin/codis-config -c config.ini -L ./log/dashboard.log dashboard --addr=:18087 --http-log=./log/requests.log &>/dev/null &
打开dashboard.go就能看到cmdDashboard()的实现,它调用的是下面的runDashboard()函数:
func runDashboard(addr string, httpLogFile string) { log.Info("dashboard listening on addr: ", addr) m := martini.Classic() ... m.Use(martini.Static(filepath.Join(binRoot, "assets/statics"))) m.Use(render.Renderer(render.Options{ Directory: filepath.Join(binRoot, "assets/template"), Extensions: []string{".tmpl", ".html"}, Charset: "UTF-8", IndentJSON: true, })) m.Get("/api/server_groups", apiGetServerGroupList) m.Get("/api/overview", apiOverview) m.Get("/api/redis/:addr/stat", apiRedisStat) m.Get("/api/redis/:addr/:id/slotinfo", apiGetRedisSlotInfo) m.Get("/api/redis/group/:group_id/:slot_id/slotinfo", apiGetRedisSlotInfoFromGroupId) ... // create temp node in ZK if err := createDashboardNode(); err != nil { Fatal(err) } defer releaseDashboardNode() // create long live migrate manager conn := CreateZkConn() defer conn.Close() globalMigrateManager = NewMigrateManager(conn, globalEnv.ProductName(), preMigrateCheck) defer globalMigrateManager.removeNode() m.RunOnAddr(addr)}
值得注意的是,Dashboard使用的是Martini框架,非常容易就能暴露各种RESTFul接口。这里不深入研究Martini框架的用法了,但提一个Java中的“山寨Martini”框架-SparkJava。Spark这个名字实在太火了,这个框架跟分布式内存计算的那个Spark框架可没有一点关系。稍有些遗憾的是,使用SparkJava的前提是必须安装JDK 8,因为SparkJava大量使用了JDK 8中的特性:
import static spark.Spark.*;// Visit http://localhost:4567/hellopublic class HelloWorld { public static void main(String[] args) { get("/hello", (req, res) -> "Hello World"); }}
在开始剖析数据分布和迁移的源码之前,先说一下Codis中Slot的概念。Slot是Codis的数据分配、迁移等操作的基本单位,可以把Slot看成一致性哈希中的Bucket、VNode等概念。客户端传来的Key经过某种hash函数(Codis用的是Crc32)对应到某个Slot中,因为哈希函数是一定的所以这个对应关系是无法改变的。但我们可以改变的是Slot与Group的对应关系,这样当新增Group时就可以通过调整Slot的分布达到负载均衡的效果。
不幸的是,Redis中并没有Slot这个概念,也就是说:虽然我们给Key分配好了Slot,但是一旦存入Redis后Key属于哪个Slot这个信息就丢失了。解决的方法有很多种,比如:
类似于start_dashboard.sh脚本,我们安装完Codis后执行的sample下./initslot.sh脚本也是通过codis-config完成slot的初始化工作的。打开initslot.sh脚本看看,发现它使用codis-config调用了slot的init和range-set两个方法:
#!/bin/shecho "slots initializing..."../bin/codis-config -c config.ini slot init -fecho "done"echo "set slot ranges to server groups..."../bin/codis-config -c config.ini slot range-set 0 511 1 online../bin/codis-config -c config.ini slot range-set 512 1023 2 onlineecho "done"
声明:因为codis-config除了启动Dashboard外,其主要作用就是封装RESTFul请求,代码比较简单,所以后面的源码剖析就都直接跳过codis-config的请求发送过程,直接跳到Dashboard接到请求后的处理过程,所有RESTFul API的实现都在dashboard_apis.go中。
dashboard_apis.go中并没有直接实现初始化功能,而是调用了models包。实际上,Codis提取了一整套的Model类作为模型层,Dashboard和Proxy都引用了这套模型层。在Codis 2.0中Proxy的内部架构发生了不小的变化,然而模型层的代码相对很稳定,这也是DDD领域驱动设计的优势吧。
func apiInitSlots(r *http.Request) (int, string) { r.ParseForm() isForce := false val := r.FormValue("is_force") if len(val) > 0 && (val == "1" || val == "true") { isForce = true } conn := CreateZkConn() defer conn.Close() if err := models.InitSlotSet(conn, globalEnv.ProductName(), models.DEFAULT_SLOT_NUM); err != nil { log.Warning(err) return 500, err.Error() } return jsonRetSucc()}
下面就看一下Slot模型层,代码位置在pkg/models/slot.go。根据totalSlotNum(默认1024)逐一创建Slot对象,初始时GroupId是无效值-1。然后调用Slot对象的Update()函数保存到ZooKeeper或etcd中。
func InitSlotSet(zkConn zkhelper.Conn, productName string, totalSlotNum int) error { for i := 0; i < totalSlotNum; i++ { slot := NewSlot(productName, i) if err := slot.Update(zkConn); err != nil { return errors.Trace(err) } } return nil}func NewSlot(productName string, id int) *Slot { return &Slot{ ProductName: productName, Id: id, GroupId: INVALID_ID, State: SlotState{ Status: SLOT_STATUS_OFFLINE, LastOpTs: "0", MigrateStatus: SlotMigrateStatus{ From: INVALID_ID, To: INVALID_ID, }, }, }}func (s *Slot) Update(zkConn zkhelper.Conn) error { data, err := json.Marshal(s) zkPath := GetSlotPath(s.ProductName, s.Id) _, err = zkhelper.CreateOrUpdate(zkConn, zkPath, string(data), 0, zkhelper.DefaultFileACLs(), true) ...}
个人觉得Codis的模型层写的不是太好,虽然提取了最重要的公共逻辑,但是模型层混入了太多的存储代码,包括JSON序列化和与ZooKeeper通信。再加上Golang本身的异常返回值判断,导致模型的很多函数代码都比较乱。
当然,也没必要严格遵照DDD将存储逻辑划分到Repository,甚至将模型划分成Value和Entity。而且Codis作者是短时间内完成开发的,所以不能“吹毛求疵”,看看后续Codis升级版本能否梳理出更加清晰的模型层。感觉ActiveRecord模式非常适合这种对ZooKeeper进行简单CRUD的场景,都是“单表”(ZooKeeper中的结点)操作而不存在关联操作,详细见最后一部分的分析。
同样地,设置Range也是在Slot模型层中实现的。在./initslot.sh中,我们将0~511 Slot分配给了Group 1,将512~1023分配给Group 2。SetSlotRange()将范围内的Slot的GroupId从-1改为新分配的GroupId,并更新到ZooKeeper中:
func SetSlotRange(zkConn zkhelper.Conn, productName string, fromSlot, toSlot, groupId int, status SlotStatus) error { ... for i := fromSlot; i <= toSlot; i++ { s, err := GetSlot(zkConn, productName, i) if err != nil { return errors.Trace(err) } s.GroupId = groupId s.State.Status = status data, err := json.Marshal(s) if err != nil { return errors.Trace(err) } zkPath := GetSlotPath(productName, i) _, err = zkhelper.CreateOrUpdate(zkConn, zkPath, string(data), 0, zkhelper.DefaultFileACLs(), true) if err != nil { return errors.Trace(err) } } ...}
Codis的数据迁移实现了两个特性:
下面就是Codis的Rebalance算法。两个辅助函数getLivingNodeInfos()和getQuotaMap()会拿到所有存活的Group以及其目标Quota。其中存活结点对象中保存的就是Group目前拥有的Slot数和内存数,而所谓目标Quota配额指的就是根据这些指标计算出Rebalance后每个Group应有的Slot数。
Codis目前使用的算法比较简单,不要被Rebalance()中的三层嵌套循环“吓住”了。具体来说,假如之前我们有Group 1负责Slot 0~511,Group 2负责Slot 512~1023。现在新增一个Group 3,没有负责任何Slot,此时通过命令行或Web管理页面触发”Auto Rebalance”。
因为Group 1和2的当前Slot肯定大于目标Quota,所以遍历到它俩时什么都不会做,主要处理就在Group 3。当遍历到Group 3时,它会从Group 1和2中不断迁移过来Slot,直到达到目标Quota再停止。
// experimental simple auto rebalance :)func Rebalance(zkConn zkhelper.Conn, delay int) error { targetQuota, err := getQuotaMap(zkConn) livingNodes, err := getLivingNodeInfos(zkConn) log.Info("start rebalance") for _, node := range livingNodes { for len(node.CurSlots) > targetQuota[node.GroupId] { for _, dest := range livingNodes { if dest.GroupId != node.GroupId && len(dest.CurSlots) < targetQuota[dest.GroupId] { slot := node.CurSlots[len(node.CurSlots)-1] // create a migration task t := NewMigrateTask(MigrateTaskInfo{ Delay: delay, FromSlot: slot, ToSlot: slot, NewGroupId: dest.GroupId, Status: MIGRATE_TASK_MIGRATING, CreateAt: strconv.FormatInt(time.Now().Unix(), 10), }) u, err := uuid.NewV4() t.Id = u.String() if ok, err := preMigrateCheck(t); ok { // do migrate err := t.run() } node.CurSlots = node.CurSlots[0 : len(node.CurSlots)-1] dest.CurSlots = append(dest.CurSlots, slot) } } } } log.Info("rebalance finish") return nil}func getQuotaMap(zkConn zkhelper.Conn) (map[int]int, error) { nodes, err := getLivingNodeInfos(zkConn) ret := make(map[int]int) var totalMem int64 totalQuota := 0 for _, node := range nodes { totalMem += node.MaxMemory } for _, node := range nodes { quota := int(models.DEFAULT_SLOT_NUM * node.MaxMemory * 1.0 / totalMem) ret[node.GroupId] = quota totalQuota += quota } // round up if totalQuota < models.DEFAULT_SLOT_NUM { for k, _ := range ret { ret[k] += models.DEFAULT_SLOT_NUM - totalQuota break } } return ret, nil}func getLivingNodeInfos(zkConn zkhelper.Conn) ([]*NodeInfo, error) { groups, err := models.ServerGroups(zkConn, globalEnv.ProductName()) slots, err := models.Slots(zkConn, globalEnv.ProductName()) slotMap := make(map[int][]int) for _, slot := range slots { if slot.State.Status == models.SLOT_STATUS_ONLINE { slotMap[slot.GroupId] = append(slotMap[slot.GroupId], slot.Id) } } var ret []*NodeInfo for _, g := range groups { master, err := g.Master(zkConn) out, err := utils.GetRedisConfig(master.Addr, "maxmemory") maxMem, err := strconv.ParseInt(out, 10, 64) node := &NodeInfo{ GroupId: g.Id, CurSlots: slotMap[g.Id], MaxMemory: maxMem, } ret = append(ret, node) } return ret, nil}
具体每个Slot是如何迁移的“秘密”就在MigrateTask中,下面就去一探究竟!
MigrateTask每次迁移一个Slot,并更新进度给Web监控页面显示。migrateSingleSlot()函数中首先修改Slot的状态,然后再调用Migrator做迁移。SetMigrateStatus()并不是简单的修改Slot状态保存到ZooKeeper中就完成了,它还肩负着与各个Proxy做Pre-migrate确认的工作,下面就会看到这个过程。注意Codis这里对Migrator做了抽象,提取出了接口。
// migrate multi slotsfunc (t *MigrateTask) run() error { // create zk conn on demand t.zkConn = CreateZkConn() defer t.zkConn.Close() to := t.NewGroupId t.Status = MIGRATE_TASK_MIGRATING for slotId := t.FromSlot; slotId <= t.ToSlot; slotId++ { err := t.migrateSingleSlot(slotId, to) t.Percent = (slotId - t.FromSlot + 1) * 100 / (t.ToSlot - t.FromSlot + 1) log.Info("total percent:", t.Percent) } t.Status = MIGRATE_TASK_FINISHED log.Info("migration finished") return nil}func (t *MigrateTask) migrateSingleSlot(slotId int, to int) error { // set slot status s, err := models.GetSlot(t.zkConn, t.productName, slotId) from := s.GroupId if s.State.Status == models.SLOT_STATUS_MIGRATE { from = s.State.MigrateStatus.From } // modify slot status if err := s.SetMigrateStatus(t.zkConn, from, to); err != nil { log.Error(err) return err } err = t.slotMigrator.Migrate(s, from, to, t, func(p SlotMigrateProgress) { // on migrate slot progress if p.Remain%500 == 0 { log.Info(p) } }) // migrate done, change slot status back s.State.Status = models.SLOT_STATUS_ONLINE s.State.MigrateStatus.From = models.INVALID_ID s.State.MigrateStatus.To = models.INVALID_ID if err := s.Update(t.zkConn); err != nil { log.Error(err) return err } return nil}
SetMigrateStatus()首先会新建一个Action与各个Proxy进行确认,确认完毕后才会将Slot的状态改为Migrate。
func (s *Slot) SetMigrateStatus(zkConn zkhelper.Conn, fromGroup, toGroup int) error { // wait until all proxy confirmed err := NewAction(zkConn, s.ProductName, ACTION_TYPE_SLOT_PREMIGRATE, s, "", true) s.State.Status = SLOT_STATUS_MIGRATE s.State.MigrateStatus.From = fromGroup s.State.MigrateStatus.To = toGroup s.GroupId = toGroup return s.Update(zkConn)}
Action也是Codis中非常重要的一个Model,它是Dashboard与Proxy之间通过ZooKeeper交换信息的数据类。NewAction()函数经过删减还是这么一大堆,其实它要做的简单说来就是:1)创建Action;2)取到存活的Proxy列表;3)保存Action到actions和ActionResponse路径下;4)等待所有Proxy的确认消息。
这里除了之前提到的Codis的模型层混入了很多存储和序列化的代码以及异常返回值判断外,还有一点就是:用ZooKeeper实现消息队列很“蹩脚”。Codis的方法是先创建一个ZooKeeper的Sequential结点,目的就是获得一个唯一的序列号,然后立刻删除这个结点。最后使用这个序列号在actions和ActionResponse路径下创建真正的结点。
func NewAction(zkConn zkhelper.Conn, productName string, actionType ActionType, target interface{}, desc string, needConfirm bool) error { ts := strconv.FormatInt(time.Now().Unix(), 10) action := &Action{ Type: actionType, Desc: desc, Target: target, Ts: ts, } // set action receivers proxies, err := ProxyList(zkConn, productName, func(p *ProxyInfo) bool { return p.State == PROXY_STATE_ONLINE }) for _, p := range proxies { buf, err := json.Marshal(p) if err != nil { return errors.Trace(err) } action.Receivers = append(action.Receivers, string(buf)) } b, _ := json.Marshal(action) prefix := GetWatchActionPath(productName) //action root path err = CreateActionRootPath(zkConn, prefix) //response path respPath := path.Join(path.Dir(prefix), "ActionResponse") err = CreateActionRootPath(zkConn, respPath) //create response node, etcd do not support create in order directory //get path first actionRespPath, err := zkConn.Create(respPath+"/", b, int32(zk.FlagSequence), zkhelper.DefaultFileACLs()) //remove file then create directory zkConn.Delete(actionRespPath, -1) actionRespPath, err = zkConn.Create(actionRespPath, b, 0, zkhelper.DefaultDirACLs()) suffix := path.Base(actionRespPath) // create action node actionPath := path.Join(prefix, suffix) _, err = zkConn.Create(actionPath, b, 0, zkhelper.DefaultFileACLs()) if needConfirm { if err := WaitForReceiver(zkConn, productName, actionRespPath, proxies); err != nil { return errors.Trace(err) } } return nil}
WaitForReceiver()等待Proxy确认的逻辑相对清晰一些,Proxy响应方式就是通过evtbus接收到消息后,将自己的Proxy模型类加入到ActionResponse路径下刚才新建的Action结点下,作为其子结点。WaitForReceiver()不断轮询子结点数,当等于所有存活Proxy数时就说明所有Proxy都确认应答了,可以返回了。如果发现某个Proxy没应答则将其置成下线,避免这个Proxy保存了过期的Slot与Group对应关系。
这里Codis没有清理掉已完成的ActionResponse下的Action结点,而是有专门的ActionGC()函数去处理,不得不说这部分的设计配合上ZooKeeper真是很复杂!
func WaitForReceiver(zkConn zkhelper.Conn, productName string, actionZkPath string, proxies []ProxyInfo) error { times := 0 var proxyIds []string var offlineProxyIds []string for _, p := range proxies { proxyIds = append(proxyIds, p.Id) } sort.Strings(proxyIds) // check every 500ms for times < 60 { if times >= 6 && (times*500)%1000 == 0 { log.Warning("abnormal waiting time for receivers", actionZkPath) } nodes, _, err := zkConn.Children(actionZkPath) var confirmIds []string for _, node := range nodes { id := path.Base(node) confirmIds = append(confirmIds, id) } if len(confirmIds) != 0 { sort.Strings(confirmIds) if utils.Strings(proxyIds).Eq(confirmIds) { return nil } offlineProxyIds = proxyIds[len(confirmIds)-1:] } times += 1 time.Sleep(500 * time.Millisecond) } // set offline proxies for _, id := range offlineProxyIds { log.Errorf("mark proxy %s to PROXY_STATE_MARK_OFFLINE", id) if err := SetProxyStatus(zkConn, productName, id, PROXY_STATE_MARK_OFFLINE); err != nil { return err } } return ErrReceiverTimeout}
现在所有Proxy都确认了,终于可以迁移了!Migrate()会不断向Slot迁移的源Redis发送迁移命令”SLOTSMGRTTAGSLOT”,这个命令是Codis修改Redis源码自己实现的,每次会从这个Slot中随机迁移一个Key到目的Redis,并保证原子性。所以要用循环反复发送这个迁移命令到Redis,直到Slot中的所有Key都迁移完成了才算是完成。
func (m *CodisSlotMigrator) Migrate(slot *models.Slot, fromGroup, toGroup int, task *MigrateTask, onProgress func(SlotMigrateProgress)) (err error) { groupFrom, err := models.GetGroup(task.zkConn, task.productName, fromGroup) groupTo, err := models.GetGroup(task.zkConn, task.productName, toGroup) fromMaster, err := groupFrom.Master(task.zkConn) toMaster, err := groupTo.Master(task.zkConn) c, err := redis.Dial("tcp", fromMaster.Addr) defer c.Close() _, remain, err := sendRedisMigrateCmd(c, slot.Id, toMaster.Addr) for remain > 0 { if task.Delay > 0 { time.Sleep(time.Duration(task.Delay) * time.Millisecond) } _, remain, err = sendRedisMigrateCmd(c, slot.Id, toMaster.Addr) if remain >= 0 { onProgress(SlotMigrateProgress{ SlotId: slot.Id, FromGroup: fromGroup, ToGroup: toGroup, Remain: remain, }) } } return nil}func sendRedisMigrateCmd(c redis.Conn, slotId int, toAddr string) (int, int, error) { addrParts := strings.Split(toAddr, ":") if len(addrParts) != 2 { return -1, -1, ErrInvalidAddr } reply, err := redis.Values(c.Do("SLOTSMGRTTAGSLOT", addrParts[0], addrParts[1], MIGRATE_TIMEOUT, slotId)) if err != nil { return -1, -1, err } var succ, remain int if _, err := redis.Scan(reply, &succ, &remain); err != nil { return -1, -1, err } return succ, remain, nil}
对于迁移中的Slot,如果恰好此时有客户端要访问该Slot中的某个Key该怎么办?Codis不是遇到这个问题的第一个中间件,像Taobao Tair中也有对此的解决方案:
发生迁移的时候data server如何对外提供服务?
当迁移发生的时候, 我们举个例子, 假设data server A 要把 桶 3,4,5 迁移给data server B. 因为迁移完成前, 客户端的路由表没有变化, 客户端对 3, 4, 5 的访问请求都会路由到A. 现在假设 3还没迁移, 4 正在迁移中, 5已经迁移完成. 那么如果是对3的访问, 则没什么特别, 跟以前一样. 如果是对5的访问, 则A会把该请求转发给B,并且将B的返回结果返回给客户, 如果是对4的访问, 在A处理, 同时如果是对4的修改操作, 会记录修改log.当桶4迁移完成的时候, 还要把log发送到B, 在B上应用这些log. 最终A B上对于桶4来说, 数据完全一致才是真正的迁移完成
但Codis采取的是不同的策略。当迁移过程中发生数据访问时,Proxy会发送”slotsmgrttagone”迁移命令给Redis,强制将客户端要访问的Key立刻迁移,然后再处理客户端的请求。
func (s *Server) dispatch(r *PipelineRequest) { s.handleMigrateState(r.slotIdx, r.keys[0]) tr, ok := s.pipeConns[s.slots[r.slotIdx].dst.Master()] if !ok { //try recreate taskrunner if err := s.createTaskRunner(s.slots[r.slotIdx]); err != nil { r.backQ <- &PipelineResponse{ctx: r, resp: nil, err: err} return } tr = s.pipeConns[s.slots[r.slotIdx].dst.Master()] } tr.in <- r}func (s *Server) handleMigrateState(slotIndex int, key []byte) error { shd := s.slots[slotIndex] if shd.slotInfo.State.Status != models.SLOT_STATUS_MIGRATE { return nil } redisConn, err := s.pools.GetConn(shd.migrateFrom.Master()) defer s.pools.ReleaseConn(redisConn) redisReader := redisConn.(*redispool.PooledConn).BufioReader() err = WriteMigrateKeyCmd(redisConn.(*redispool.PooledConn), shd.dst.Master(), 30*1000, key) ... return nil}func WriteMigrateKeyCmd(w io.Writer, addr string, timeoutMs int, key []byte) error { hostPort := strings.Split(addr, ":") if len(hostPort) != 2 { return errors.Errorf("invalid address " + addr) } respW := respcoding.NewRESPWriter(w) err := respW.WriteCommand("slotsmgrttagone", hostPort[0], hostPort[1], strconv.Itoa(int(timeoutMs)), string(key)) return errors.Trace(err)}
前面曾提到过,对于存储到ZooKeeper的模型,个人感觉使用ActiveRecord模式(AR)会非常方便。以RubyOnRails中AR为例,下面是典型的AR用法:
class User < ActiveRecord::Base self.table_name = "tb_user"enduser = User.newuser.name = "David"user.occupation = "Code Artist"user = User.find_by(name: 'David')user.name = 'Dave'user.saveusers = User.where(name: 'David', occupation: 'Code Artist').order('created_at DESC')
在Rails或者说Ruby中,User继承ActiveRecord::Base类,运行时User就会自动具有增删改查方法。然而,要想在Java中实现类似的ActiveRecord模式的模型层不是件容易事,主要问题在于Java的静态性。尽管字节码中不断新增各种新字段来保存运行时的类信息,从而增强反射功能,但限制依旧存在。
其实,保存、更新、删除都好办,因为这三种操作的前提是已经拿到模型的一个实例了。最核心的问题在查询上!要想在Java静态方法中获取正在调用的类实例信息貌似做不到啊。像jOOQ、JFinal等框架实现AR时都采取了不同的“迂回”策略,通过各种间接方式获得运行时的对象信息。以下是一种更为直接的方式,在各个具体模型类中自己根据需要定义查询方法。
public abstract class Model { private static CuratorFramework client; private static ObjectMapper mapper; public String save() { try { return client.create() .creatingParentsIfNeeded() .withMode(ephemeral() ? EPHEMERAL : PERSISTENT) .forPath(path(), marshal()); } catch (Exception e) { throw new IllegalStateException(e); } } public void update() { try { client.setData().forPath(path(), marshal()); } catch (Exception e) { throw new IllegalStateException(e); } } /** FIXME: Supposed to be static, yet cannot get class info in static context in Java */ @SuppressWarnings("unchecked") public <T> T find() { try { return (T) unmarshal(client.getData().forPath(path()), getClass()); } catch (Exception e) { throw new IllegalStateException(e); } } ...}public class Slot extends Model { private int id; private SlotState state; private int groupId; /** * Initialize all slots. * @param totalNum total number of slots */ public static void initSlotSet(int totalNum) { try { for (int i = 0; i < totalNum; i++) { new Slot(i).save(); } } catch (Exception e) { throw new IllegalStateException("Error when init slots", e); } } public static Slot find(int id) { Slot slot = new Slot(); slot.setId(id); return slot.find(); }}
这样虽然在Java里实现起来不那么优雅,但是在Model抽象类中统一管理了ZooKeeper客户端(Curator)和序列化框架(Jackson),并对子类提供了仿AR式的接口,使用起来还是很方便的,而且代码结构也清晰了很多。
联系客服