打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
豌豆夹Redis解决方案Codis源码剖析:Dashboard

豌豆夹Redis解决方案Codis源码剖析:Dashboard

1.不只是Dashboard

虽然名字叫Dashboard,但它在Codis中的作用却不可小觑。它不仅仅是Dashboard管理页面,更重要的是,它负责监控和指挥各个Proxy的负载均衡(数据分布和迁移)。并且,所有API都以RESTFul接口的形式对外提供,供Proxy和codis-config(Codis的命令行工具)调用。下面就来看一下数据分布和迁移的代码执行流程。

Dashboard涉及到的知识点比较多,包括Martini框架、Model模型层、数据的负载均衡分配、Redis中Slot的实现、ZooKeeper的Sequential结点实现消息队列、迁移过程中的数据访问等等。与此同时,本篇还记录了在研究过程中的发散思考,例如Java中的AR模型层。虽然内容稍有些庞杂,但都是真实的记录,相信于人于己都会有很大帮助。

2.Dashboard

2.1 codis-config命令行

codis-config是cmd/cconfig下编译出的命令行工具,它以命令行的形式提供对Codis常用命令的支持,例如启动Dashboard、初始化slot以及migrate,后两者会被封装成REST请求发送到Dashboard执行。下面简单看一下main.go的源码:

func main() {    ...    args, err := docopt.Parse(usage, nil, true, "codis config v0.1", true)    if err != nil {        log.Error(err)    }    // set config file    var configFile string    var config *cfg.Cfg    if args["-c"] != nil {        configFile = args["-c"].(string)        config, err = utils.InitConfigFromFile(configFile)        if err != nil {            Fatal(err)        }    } else {        config, err = utils.InitConfig()        if err != nil {            Fatal(err)        }    }    // load global vars    globalEnv = env.LoadCodisEnv(config)    cmd := args["<command>"].(string)    cmdArgs := args["<args>"].([]string)    go http.ListenAndServe(":10086", nil)    err = runCommand(cmd, cmdArgs)}func runCommand(cmd string, args []string) (err error) {    argv := make([]string, 1)    argv[0] = cmd    argv = append(argv, args...)    switch cmd {    case "action":        return errors.Trace(cmdAction(argv))    case "dashboard":        return errors.Trace(cmdDashboard(argv))    case "server":        return errors.Trace(cmdServer(argv))    case "proxy":        return errors.Trace(cmdProxy(argv))    case "slot":        return errors.Trace(cmdSlot(argv))    }    return errors.Errorf("%s is not a valid command. See 'codis-config -h'", cmd)}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

2.2 Dashboard启动

大家是否还记得,在安装Codis完之后,我们执行了smaple目录下的一系列脚本初始化Codis并启动服务,其中有一项./start_dashboard.sh就是启动Dashboard服务。它是调用的codis-config,传入的参数”dashboard”正好对应上面的main.go中的cmdDashboard()。

#!/bin/shnohup ../bin/codis-config -c config.ini -L ./log/dashboard.log dashboard --addr=:18087 --http-log=./log/requests.log &>/dev/null &
  • 1
  • 2

打开dashboard.go就能看到cmdDashboard()的实现,它调用的是下面的runDashboard()函数:

func runDashboard(addr string, httpLogFile string) {    log.Info("dashboard listening on addr: ", addr)    m := martini.Classic()    ...    m.Use(martini.Static(filepath.Join(binRoot, "assets/statics")))    m.Use(render.Renderer(render.Options{        Directory:  filepath.Join(binRoot, "assets/template"),        Extensions: []string{".tmpl", ".html"},        Charset:    "UTF-8",        IndentJSON: true,    }))    m.Get("/api/server_groups", apiGetServerGroupList)    m.Get("/api/overview", apiOverview)    m.Get("/api/redis/:addr/stat", apiRedisStat)    m.Get("/api/redis/:addr/:id/slotinfo", apiGetRedisSlotInfo)    m.Get("/api/redis/group/:group_id/:slot_id/slotinfo", apiGetRedisSlotInfoFromGroupId)    ...    // create temp node in ZK    if err := createDashboardNode(); err != nil {        Fatal(err)    }    defer releaseDashboardNode()    // create long live migrate manager    conn := CreateZkConn()    defer conn.Close()    globalMigrateManager = NewMigrateManager(conn, globalEnv.ProductName(), preMigrateCheck)    defer globalMigrateManager.removeNode()    m.RunOnAddr(addr)}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

值得注意的是,Dashboard使用的是Martini框架,非常容易就能暴露各种RESTFul接口。这里不深入研究Martini框架的用法了,但提一个Java中的“山寨Martini”框架-SparkJava。Spark这个名字实在太火了,这个框架跟分布式内存计算的那个Spark框架可没有一点关系。稍有些遗憾的是,使用SparkJava的前提是必须安装JDK 8,因为SparkJava大量使用了JDK 8中的特性:

import static spark.Spark.*;// Visit http://localhost:4567/hellopublic class HelloWorld {    public static void main(String[] args) {        get("/hello", (req, res) -> "Hello World");    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

2.3 Slot的角色

在开始剖析数据分布和迁移的源码之前,先说一下Codis中Slot的概念。Slot是Codis的数据分配、迁移等操作的基本单位,可以把Slot看成一致性哈希中的Bucket、VNode等概念。客户端传来的Key经过某种hash函数(Codis用的是Crc32)对应到某个Slot中,因为哈希函数是一定的所以这个对应关系是无法改变的。但我们可以改变的是Slot与Group的对应关系,这样当新增Group时就可以通过调整Slot的分布达到负载均衡的效果。

不幸的是,Redis中并没有Slot这个概念,也就是说:虽然我们给Key分配好了Slot,但是一旦存入Redis后Key属于哪个Slot这个信息就丢失了。解决的方法有很多种,比如:

  • 1)在ZooKeeper中保存Key与Slot的对应关系,需要时就查询一下。这种方式类似HDFS中的NameNode,缺点是Key很多时会占用很多空间。
  • 2)数据迁移时遍历所有Key,用哈希函数现去算一下Key所属的Slot是否要迁移,缺点是迁移时计算量比较大,而且每个Slot迁移时都要去算可能有很多重复的计算量。
  • 3)保存到Redis之前在Key或Value中加入一些隐含信息,缺点是会改变业务的数据。
  • 4)修改Redis源码加入Slot的概念,在Redis中保存Key属于的Slot,并提供基于Slot的Migrate原子操作。Codis采取的是这种做法,缺点就是要修改Redis源码,以后升级Redis比较麻烦,尤其像Codis没有将改动封装到一个动态链接库则可能更为麻烦。
  • 5)利用Redis中database的概念替代Slot,GitHub上的xcodis采取的就是这种思想。缺点是每个Slot对应的Redis连接在使用前都要select到对应的数据库,否则就会修改到其他Slot的数据。

3.数据分布

3.1 initslot.sh脚本

类似于start_dashboard.sh脚本,我们安装完Codis后执行的sample下./initslot.sh脚本也是通过codis-config完成slot的初始化工作的。打开initslot.sh脚本看看,发现它使用codis-config调用了slot的init和range-set两个方法:

#!/bin/shecho "slots initializing..."../bin/codis-config -c config.ini slot init -fecho "done"echo "set slot ranges to server groups..."../bin/codis-config -c  config.ini slot range-set 0 511 1 online../bin/codis-config -c  config.ini slot range-set 512 1023 2 onlineecho "done"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

声明:因为codis-config除了启动Dashboard外,其主要作用就是封装RESTFul请求,代码比较简单,所以后面的源码剖析就都直接跳过codis-config的请求发送过程,直接跳到Dashboard接到请求后的处理过程,所有RESTFul API的实现都在dashboard_apis.go中。

3.2 初始化Slot

dashboard_apis.go中并没有直接实现初始化功能,而是调用了models包。实际上,Codis提取了一整套的Model类作为模型层,Dashboard和Proxy都引用了这套模型层。在Codis 2.0中Proxy的内部架构发生了不小的变化,然而模型层的代码相对很稳定,这也是DDD领域驱动设计的优势吧。

func apiInitSlots(r *http.Request) (int, string) {    r.ParseForm()    isForce := false    val := r.FormValue("is_force")    if len(val) > 0 && (val == "1" || val == "true") {        isForce = true    }    conn := CreateZkConn()    defer conn.Close()    if err := models.InitSlotSet(conn, globalEnv.ProductName(), models.DEFAULT_SLOT_NUM); err != nil {        log.Warning(err)        return 500, err.Error()    }    return jsonRetSucc()}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

下面就看一下Slot模型层,代码位置在pkg/models/slot.go。根据totalSlotNum(默认1024)逐一创建Slot对象,初始时GroupId是无效值-1。然后调用Slot对象的Update()函数保存到ZooKeeper或etcd中。

func InitSlotSet(zkConn zkhelper.Conn, productName string, totalSlotNum int) error {    for i := 0; i < totalSlotNum; i++ {        slot := NewSlot(productName, i)        if err := slot.Update(zkConn); err != nil {            return errors.Trace(err)        }    }    return nil}func NewSlot(productName string, id int) *Slot {    return &Slot{        ProductName: productName,        Id:          id,        GroupId:     INVALID_ID,        State: SlotState{            Status:   SLOT_STATUS_OFFLINE,            LastOpTs: "0",            MigrateStatus: SlotMigrateStatus{                From: INVALID_ID,                To:   INVALID_ID,            },        },    }}func (s *Slot) Update(zkConn zkhelper.Conn) error {    data, err := json.Marshal(s)    zkPath := GetSlotPath(s.ProductName, s.Id)    _, err = zkhelper.CreateOrUpdate(zkConn, zkPath, string(data), 0, zkhelper.DefaultFileACLs(), true)    ...}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

个人觉得Codis的模型层写的不是太好,虽然提取了最重要的公共逻辑,但是模型层混入了太多的存储代码,包括JSON序列化和与ZooKeeper通信。再加上Golang本身的异常返回值判断,导致模型的很多函数代码都比较乱。

当然,也没必要严格遵照DDD将存储逻辑划分到Repository,甚至将模型划分成Value和Entity。而且Codis作者是短时间内完成开发的,所以不能“吹毛求疵”,看看后续Codis升级版本能否梳理出更加清晰的模型层。感觉ActiveRecord模式非常适合这种对ZooKeeper进行简单CRUD的场景,都是“单表”(ZooKeeper中的结点)操作而不存在关联操作,详细见最后一部分的分析。

3.3 分配Range

同样地,设置Range也是在Slot模型层中实现的。在./initslot.sh中,我们将0~511 Slot分配给了Group 1,将512~1023分配给Group 2。SetSlotRange()将范围内的Slot的GroupId从-1改为新分配的GroupId,并更新到ZooKeeper中:

func SetSlotRange(zkConn zkhelper.Conn, productName string, fromSlot, toSlot, groupId int, status SlotStatus) error {    ...    for i := fromSlot; i <= toSlot; i++ {        s, err := GetSlot(zkConn, productName, i)        if err != nil {            return errors.Trace(err)        }        s.GroupId = groupId        s.State.Status = status        data, err := json.Marshal(s)        if err != nil {            return errors.Trace(err)        }        zkPath := GetSlotPath(productName, i)        _, err = zkhelper.CreateOrUpdate(zkConn, zkPath, string(data), 0, zkhelper.DefaultFileACLs(), true)        if err != nil {            return errors.Trace(err)        }    }    ...}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

4.数据迁移

Codis的数据迁移实现了两个特性:

  • 不停机:迁移过程中,客户端可以继续通过Proxy访问Redis,不需要停机等待迁移完成。
  • 平滑:整个迁移过程不追求快速完成,而是非常平滑地完成,不会对正常业务产生影响。

4.1 Rebalance算法

下面就是Codis的Rebalance算法。两个辅助函数getLivingNodeInfos()和getQuotaMap()会拿到所有存活的Group以及其目标Quota。其中存活结点对象中保存的就是Group目前拥有的Slot数和内存数,而所谓目标Quota配额指的就是根据这些指标计算出Rebalance后每个Group应有的Slot数。

Codis目前使用的算法比较简单,不要被Rebalance()中的三层嵌套循环“吓住”了。具体来说,假如之前我们有Group 1负责Slot 0~511,Group 2负责Slot 512~1023。现在新增一个Group 3,没有负责任何Slot,此时通过命令行或Web管理页面触发”Auto Rebalance”。

因为Group 1和2的当前Slot肯定大于目标Quota,所以遍历到它俩时什么都不会做,主要处理就在Group 3。当遍历到Group 3时,它会从Group 1和2中不断迁移过来Slot,直到达到目标Quota再停止。

// experimental simple auto rebalance :)func Rebalance(zkConn zkhelper.Conn, delay int) error {    targetQuota, err := getQuotaMap(zkConn)    livingNodes, err := getLivingNodeInfos(zkConn)    log.Info("start rebalance")    for _, node := range livingNodes {        for len(node.CurSlots) > targetQuota[node.GroupId] {            for _, dest := range livingNodes {                if dest.GroupId != node.GroupId && len(dest.CurSlots) < targetQuota[dest.GroupId] {                    slot := node.CurSlots[len(node.CurSlots)-1]                    // create a migration task                    t := NewMigrateTask(MigrateTaskInfo{                        Delay:      delay,                        FromSlot:   slot,                        ToSlot:     slot,                        NewGroupId: dest.GroupId,                        Status:     MIGRATE_TASK_MIGRATING,                        CreateAt:   strconv.FormatInt(time.Now().Unix(), 10),                    })                    u, err := uuid.NewV4()                    t.Id = u.String()                    if ok, err := preMigrateCheck(t); ok {                        // do migrate                        err := t.run()                    }                    node.CurSlots = node.CurSlots[0 : len(node.CurSlots)-1]                    dest.CurSlots = append(dest.CurSlots, slot)                }            }        }    }    log.Info("rebalance finish")    return nil}func getQuotaMap(zkConn zkhelper.Conn) (map[int]int, error) {    nodes, err := getLivingNodeInfos(zkConn)    ret := make(map[int]int)    var totalMem int64    totalQuota := 0    for _, node := range nodes {        totalMem += node.MaxMemory    }    for _, node := range nodes {        quota := int(models.DEFAULT_SLOT_NUM * node.MaxMemory * 1.0 / totalMem)        ret[node.GroupId] = quota        totalQuota += quota    }    // round up    if totalQuota < models.DEFAULT_SLOT_NUM {        for k, _ := range ret {            ret[k] += models.DEFAULT_SLOT_NUM - totalQuota            break        }    }    return ret, nil}func getLivingNodeInfos(zkConn zkhelper.Conn) ([]*NodeInfo, error) {    groups, err := models.ServerGroups(zkConn, globalEnv.ProductName())    slots, err := models.Slots(zkConn, globalEnv.ProductName())    slotMap := make(map[int][]int)    for _, slot := range slots {        if slot.State.Status == models.SLOT_STATUS_ONLINE {            slotMap[slot.GroupId] = append(slotMap[slot.GroupId], slot.Id)        }    }    var ret []*NodeInfo    for _, g := range groups {        master, err := g.Master(zkConn)        out, err := utils.GetRedisConfig(master.Addr, "maxmemory")        maxMem, err := strconv.ParseInt(out, 10, 64)        node := &NodeInfo{            GroupId:   g.Id,            CurSlots:  slotMap[g.Id],            MaxMemory: maxMem,        }        ret = append(ret, node)    }    return ret, nil}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84

具体每个Slot是如何迁移的“秘密”就在MigrateTask中,下面就去一探究竟!

4.2 Migrate任务

MigrateTask每次迁移一个Slot,并更新进度给Web监控页面显示。migrateSingleSlot()函数中首先修改Slot的状态,然后再调用Migrator做迁移。SetMigrateStatus()并不是简单的修改Slot状态保存到ZooKeeper中就完成了,它还肩负着与各个Proxy做Pre-migrate确认的工作,下面就会看到这个过程。注意Codis这里对Migrator做了抽象,提取出了接口。

// migrate multi slotsfunc (t *MigrateTask) run() error {    // create zk conn on demand    t.zkConn = CreateZkConn()    defer t.zkConn.Close()    to := t.NewGroupId    t.Status = MIGRATE_TASK_MIGRATING    for slotId := t.FromSlot; slotId <= t.ToSlot; slotId++ {        err := t.migrateSingleSlot(slotId, to)        t.Percent = (slotId - t.FromSlot + 1) * 100 / (t.ToSlot - t.FromSlot + 1)        log.Info("total percent:", t.Percent)    }    t.Status = MIGRATE_TASK_FINISHED    log.Info("migration finished")    return nil}func (t *MigrateTask) migrateSingleSlot(slotId int, to int) error {    // set slot status    s, err := models.GetSlot(t.zkConn, t.productName, slotId)    from := s.GroupId    if s.State.Status == models.SLOT_STATUS_MIGRATE {        from = s.State.MigrateStatus.From    }    // modify slot status    if err := s.SetMigrateStatus(t.zkConn, from, to); err != nil {        log.Error(err)        return err    }    err = t.slotMigrator.Migrate(s, from, to, t, func(p SlotMigrateProgress) {        // on migrate slot progress        if p.Remain%500 == 0 {            log.Info(p)        }    })    // migrate done, change slot status back    s.State.Status = models.SLOT_STATUS_ONLINE    s.State.MigrateStatus.From = models.INVALID_ID    s.State.MigrateStatus.To = models.INVALID_ID    if err := s.Update(t.zkConn); err != nil {        log.Error(err)        return err    }    return nil}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

4.3 Pre-migrate确认

SetMigrateStatus()首先会新建一个Action与各个Proxy进行确认,确认完毕后才会将Slot的状态改为Migrate。

func (s *Slot) SetMigrateStatus(zkConn zkhelper.Conn, fromGroup, toGroup int) error {    // wait until all proxy confirmed    err := NewAction(zkConn, s.ProductName, ACTION_TYPE_SLOT_PREMIGRATE, s, "", true)    s.State.Status = SLOT_STATUS_MIGRATE    s.State.MigrateStatus.From = fromGroup    s.State.MigrateStatus.To = toGroup    s.GroupId = toGroup    return s.Update(zkConn)}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

Action也是Codis中非常重要的一个Model,它是Dashboard与Proxy之间通过ZooKeeper交换信息的数据类。NewAction()函数经过删减还是这么一大堆,其实它要做的简单说来就是:1)创建Action;2)取到存活的Proxy列表;3)保存Action到actions和ActionResponse路径下;4)等待所有Proxy的确认消息。

这里除了之前提到的Codis的模型层混入了很多存储和序列化的代码以及异常返回值判断外,还有一点就是:用ZooKeeper实现消息队列很“蹩脚”。Codis的方法是先创建一个ZooKeeper的Sequential结点,目的就是获得一个唯一的序列号,然后立刻删除这个结点。最后使用这个序列号在actions和ActionResponse路径下创建真正的结点。

func NewAction(zkConn zkhelper.Conn, productName string, actionType ActionType, target interface{}, desc string, needConfirm bool) error {    ts := strconv.FormatInt(time.Now().Unix(), 10)    action := &Action{        Type:   actionType,        Desc:   desc,        Target: target,        Ts:     ts,    }    // set action receivers    proxies, err := ProxyList(zkConn, productName, func(p *ProxyInfo) bool {        return p.State == PROXY_STATE_ONLINE    })    for _, p := range proxies {        buf, err := json.Marshal(p)        if err != nil {            return errors.Trace(err)        }        action.Receivers = append(action.Receivers, string(buf))    }    b, _ := json.Marshal(action)    prefix := GetWatchActionPath(productName)    //action root path    err = CreateActionRootPath(zkConn, prefix)    //response path    respPath := path.Join(path.Dir(prefix), "ActionResponse")    err = CreateActionRootPath(zkConn, respPath)    //create response node, etcd do not support create in order directory    //get path first    actionRespPath, err := zkConn.Create(respPath+"/", b, int32(zk.FlagSequence), zkhelper.DefaultFileACLs())    //remove file then create directory    zkConn.Delete(actionRespPath, -1)    actionRespPath, err = zkConn.Create(actionRespPath, b, 0, zkhelper.DefaultDirACLs())    suffix := path.Base(actionRespPath)    // create action node    actionPath := path.Join(prefix, suffix)    _, err = zkConn.Create(actionPath, b, 0, zkhelper.DefaultFileACLs())    if needConfirm {        if err := WaitForReceiver(zkConn, productName, actionRespPath, proxies); err != nil {            return errors.Trace(err)        }    }    return nil}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

WaitForReceiver()等待Proxy确认的逻辑相对清晰一些,Proxy响应方式就是通过evtbus接收到消息后,将自己的Proxy模型类加入到ActionResponse路径下刚才新建的Action结点下,作为其子结点。WaitForReceiver()不断轮询子结点数,当等于所有存活Proxy数时就说明所有Proxy都确认应答了,可以返回了。如果发现某个Proxy没应答则将其置成下线,避免这个Proxy保存了过期的Slot与Group对应关系。

这里Codis没有清理掉已完成的ActionResponse下的Action结点,而是有专门的ActionGC()函数去处理,不得不说这部分的设计配合上ZooKeeper真是很复杂!

func WaitForReceiver(zkConn zkhelper.Conn, productName string, actionZkPath string, proxies []ProxyInfo) error {    times := 0    var proxyIds []string    var offlineProxyIds []string    for _, p := range proxies {        proxyIds = append(proxyIds, p.Id)    }    sort.Strings(proxyIds)    // check every 500ms    for times < 60 {        if times >= 6 && (times*500)%1000 == 0 {            log.Warning("abnormal waiting time for receivers", actionZkPath)        }        nodes, _, err := zkConn.Children(actionZkPath)        var confirmIds []string        for _, node := range nodes {            id := path.Base(node)            confirmIds = append(confirmIds, id)        }        if len(confirmIds) != 0 {            sort.Strings(confirmIds)            if utils.Strings(proxyIds).Eq(confirmIds) {                return nil            }            offlineProxyIds = proxyIds[len(confirmIds)-1:]        }        times += 1        time.Sleep(500 * time.Millisecond)    }    // set offline proxies    for _, id := range offlineProxyIds {        log.Errorf("mark proxy %s to PROXY_STATE_MARK_OFFLINE", id)        if err := SetProxyStatus(zkConn, productName, id, PROXY_STATE_MARK_OFFLINE); err != nil {            return err        }    }    return ErrReceiverTimeout}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

4.4 Migrate过程

现在所有Proxy都确认了,终于可以迁移了!Migrate()会不断向Slot迁移的源Redis发送迁移命令”SLOTSMGRTTAGSLOT”,这个命令是Codis修改Redis源码自己实现的,每次会从这个Slot中随机迁移一个Key到目的Redis,并保证原子性。所以要用循环反复发送这个迁移命令到Redis,直到Slot中的所有Key都迁移完成了才算是完成。

func (m *CodisSlotMigrator) Migrate(slot *models.Slot, fromGroup, toGroup int, task *MigrateTask, onProgress func(SlotMigrateProgress)) (err error) {    groupFrom, err := models.GetGroup(task.zkConn, task.productName, fromGroup)    groupTo, err := models.GetGroup(task.zkConn, task.productName, toGroup)    fromMaster, err := groupFrom.Master(task.zkConn)    toMaster, err := groupTo.Master(task.zkConn)    c, err := redis.Dial("tcp", fromMaster.Addr)    defer c.Close()    _, remain, err := sendRedisMigrateCmd(c, slot.Id, toMaster.Addr)    for remain > 0 {        if task.Delay > 0 {            time.Sleep(time.Duration(task.Delay) * time.Millisecond)        }        _, remain, err = sendRedisMigrateCmd(c, slot.Id, toMaster.Addr)        if remain >= 0 {            onProgress(SlotMigrateProgress{                SlotId:    slot.Id,                FromGroup: fromGroup,                ToGroup:   toGroup,                Remain:    remain,            })        }    }    return nil}func sendRedisMigrateCmd(c redis.Conn, slotId int, toAddr string) (int, int, error) {    addrParts := strings.Split(toAddr, ":")    if len(addrParts) != 2 {        return -1, -1, ErrInvalidAddr    }    reply, err := redis.Values(c.Do("SLOTSMGRTTAGSLOT", addrParts[0], addrParts[1], MIGRATE_TIMEOUT, slotId))    if err != nil {        return -1, -1, err    }    var succ, remain int    if _, err := redis.Scan(reply, &succ, &remain); err != nil {        return -1, -1, err    }    return succ, remain, nil}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

4.5 迁移中的数据访问

对于迁移中的Slot,如果恰好此时有客户端要访问该Slot中的某个Key该怎么办?Codis不是遇到这个问题的第一个中间件,像Taobao Tair中也有对此的解决方案:

发生迁移的时候data server如何对外提供服务?
当迁移发生的时候, 我们举个例子, 假设data server A 要把 桶 3,4,5 迁移给data server B. 因为迁移完成前, 客户端的路由表没有变化, 客户端对 3, 4, 5 的访问请求都会路由到A. 现在假设 3还没迁移, 4 正在迁移中, 5已经迁移完成. 那么如果是对3的访问, 则没什么特别, 跟以前一样. 如果是对5的访问, 则A会把该请求转发给B,并且将B的返回结果返回给客户, 如果是对4的访问, 在A处理, 同时如果是对4的修改操作, 会记录修改log.当桶4迁移完成的时候, 还要把log发送到B, 在B上应用这些log. 最终A B上对于桶4来说, 数据完全一致才是真正的迁移完成

但Codis采取的是不同的策略。当迁移过程中发生数据访问时,Proxy会发送”slotsmgrttagone”迁移命令给Redis,强制将客户端要访问的Key立刻迁移,然后再处理客户端的请求。

func (s *Server) dispatch(r *PipelineRequest) {    s.handleMigrateState(r.slotIdx, r.keys[0])    tr, ok := s.pipeConns[s.slots[r.slotIdx].dst.Master()]    if !ok {        //try recreate taskrunner        if err := s.createTaskRunner(s.slots[r.slotIdx]); err != nil {            r.backQ <- &PipelineResponse{ctx: r, resp: nil, err: err}            return        }        tr = s.pipeConns[s.slots[r.slotIdx].dst.Master()]    }    tr.in <- r}func (s *Server) handleMigrateState(slotIndex int, key []byte) error {    shd := s.slots[slotIndex]    if shd.slotInfo.State.Status != models.SLOT_STATUS_MIGRATE {        return nil    }    redisConn, err := s.pools.GetConn(shd.migrateFrom.Master())    defer s.pools.ReleaseConn(redisConn)    redisReader := redisConn.(*redispool.PooledConn).BufioReader()    err = WriteMigrateKeyCmd(redisConn.(*redispool.PooledConn), shd.dst.Master(), 30*1000, key)    ...    return nil}func WriteMigrateKeyCmd(w io.Writer, addr string, timeoutMs int, key []byte) error {    hostPort := strings.Split(addr, ":")    if len(hostPort) != 2 {        return errors.Errorf("invalid address " + addr)    }    respW := respcoding.NewRESPWriter(w)    err := respW.WriteCommand("slotsmgrttagone", hostPort[0], hostPort[1],        strconv.Itoa(int(timeoutMs)), string(key))    return errors.Trace(err)}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

5.后记:AR模型层

前面曾提到过,对于存储到ZooKeeper的模型,个人感觉使用ActiveRecord模式(AR)会非常方便。以RubyOnRails中AR为例,下面是典型的AR用法:

class User < ActiveRecord::Base  self.table_name = "tb_user"enduser = User.newuser.name = "David"user.occupation = "Code Artist"user = User.find_by(name: 'David')user.name = 'Dave'user.saveusers = User.where(name: 'David', occupation: 'Code Artist').order('created_at DESC')
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

在Rails或者说Ruby中,User继承ActiveRecord::Base类,运行时User就会自动具有增删改查方法。然而,要想在Java中实现类似的ActiveRecord模式的模型层不是件容易事,主要问题在于Java的静态性。尽管字节码中不断新增各种新字段来保存运行时的类信息,从而增强反射功能,但限制依旧存在。

其实,保存、更新、删除都好办,因为这三种操作的前提是已经拿到模型的一个实例了。最核心的问题在查询上!要想在Java静态方法中获取正在调用的类实例信息貌似做不到啊。像jOOQ、JFinal等框架实现AR时都采取了不同的“迂回”策略,通过各种间接方式获得运行时的对象信息。以下是一种更为直接的方式,在各个具体模型类中自己根据需要定义查询方法。

public abstract class Model {    private static CuratorFramework client;    private static ObjectMapper mapper;    public String save() {        try {            return client.create()                        .creatingParentsIfNeeded()                        .withMode(ephemeral() ? EPHEMERAL : PERSISTENT)                        .forPath(path(), marshal());        } catch (Exception e) {            throw new IllegalStateException(e);        }    }    public void update() {        try {            client.setData().forPath(path(), marshal());        } catch (Exception e) {            throw new IllegalStateException(e);        }    }    /** FIXME: Supposed to be static, yet cannot get class info in static context in Java */    @SuppressWarnings("unchecked")    public <T> T find() {        try {            return (T) unmarshal(client.getData().forPath(path()), getClass());        } catch (Exception e) {            throw new IllegalStateException(e);        }    }    ...}public class Slot extends Model {    private int id;    private SlotState state;    private int groupId;    /**     * Initialize all slots.     * @param totalNum      total number of slots     */    public static void initSlotSet(int totalNum) {        try {            for (int i = 0; i < totalNum; i++) {                new Slot(i).save();            }        } catch (Exception e) {            throw new IllegalStateException("Error when init slots", e);        }    }    public static Slot find(int id) {        Slot slot = new Slot();        slot.setId(id);        return slot.find();    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65

这样虽然在Java里实现起来不那么优雅,但是在Model抽象类中统一管理了ZooKeeper客户端(Curator)和序列化框架(Jackson),并对子类提供了仿AR式的接口,使用起来还是很方便的,而且代码结构也清晰了很多。

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
高质量服务端之路(二)
Redis集群方案总结
细说分布式 Redis 架构设计和那些踩过的坑
Codis的安装与使用
使用Codis搭建redis集群服务
Redis 集群方案调研
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服