Tao,在英文中的意思是“The ultimate principle of universe”,即“道”,它是宇宙的终极奥义。
“道生一,一生二,二生三,三生无穷。” ——《道德经》
Tao同时也是我用Go语言开发的一个异步的TCP服务器框架(TCP Asynchronous Go server FramewOrk),秉承Go语言“Less is more”的极简主义哲学,它能穿透一切表象,带你一窥网络编程的世界,让你从此彻底摆脱只会写“socket-bind-listen-accept”的窘境。本文将简单讨论一下这个框架的设计思路以及自己的一些思考。
你开发的产品有一套特有的业务逻辑,要通过互联网得到服务端的支持才能为你的客户提供服务。
怎样快速稳定地实现产品的功能,而不需要耗费大量的时间处理各种底层的网络通信细节。
Tao提供了一种用框架支撑业务逻辑的机制。你只需要与客户端定义好消息格式,然后将对应的业务逻辑编写成函数注册到框架中就可以了。
让我们举一个例子来看看如何使用Tao框架实现一个简单的群聊天服务器。服务器端代码可以这么写:
package mainimport ( "fmt" "runtime" "github.com/leesper/tao" "github.com/leesper/tao/examples/chat" "github.com/leesper/holmes")type ChatServer struct { tao.Server}func NewChatServer(addr string) *ChatServer { return &ChatServer { tao.NewTCPServer(addr), }}func main() { runtime.GOMAXPROCS(runtime.NumCPU()) defer holmes.Start().Stop() tao.MessageMap.Register(chat.CHAT_MESSAGE, chat.DeserializeChatMessage) tao.HandlerMap.Register(chat.CHAT_MESSAGE, chat.ProcessChatMessage) chatServer := NewChatServer(fmt.Sprintf("%s:%d", "0.0.0.0", 18341)) chatServer.SetOnConnectCallback(func(conn tao.Connection) bool { holmes.Info("%s", "On connect") return true }) chatServer.SetOnErrorCallback(func() { holmes.Info("%s", "On error") }) chatServer.SetOnCloseCallback(func(conn tao.Connection) { holmes.Info("%s", "Closing chat client") }) chatServer.Start()}
启动一个服务器只需要三步就能完成。首先注册消息和业务逻辑回调,其次填入IP地址和端口,最后Start一下就可以了。这时候客户端就能够发起连接,并开始聊天。业务逻辑的实现很简单,遍历所有的连接,然后发送数据:
func ProcessChatMessage(ctx tao.Context, conn tao.Connection) { if serverConn, ok := conn.(*tao.ServerConnection); ok { if serverConn.GetOwner() != nil { connections := serverConn.GetOwner().GetAllConnections() for v := range connections.IterValues() { c := v.(tao.Connection) c.Write(ctx.Message()) } } }}
Go语言是“云计算时代的C语言”,适用于开发基础性服务,比如服务器。它语法类似C语言且标准库丰富,上手较快,所以开发效率高;编译速度快,运行效率接近C,所以运行效率高。
Go语言面向对象编程的风格是“多用组合,少用继承”,以匿名嵌入的方式实现继承。举个例子,Tao框架中基于TCPServer定义了一个TLSTCPServer,用于支持传输层安全,像这样就可以了:
type TLSTCPServer struct{ certFile string keyFile string *TCPServer}
于是TLSTCPServer就自动继承了TCPServer所有的属性和方法。当然,这里是以指针的方式嵌入的。
Go语言的面向接口编程是“鸭子类型”的,即“如果我走起来像鸭子,叫起来像鸭子,那么我就是一只鸭子”。其他的编程语言需要显示地说明自己继承某个接口,Go语言却采取的是“隐式声明”的方式。比如Tao框架使用的多线程日志库Holmes实现“每小时创建一个新日志文件”功能的核心代码如下:
func (ls *logSegment)Write(p []byte) (n int, err error) { if ls.timeToCreate != nil && ls.logFile != os.Stdout && ls.logFile != os.Stderr { select { case current := <-ls.timeToCreate: ls.logFile.Close() ls.logFile = nil name := getLogFileName(current) ls.logFile, err = os.Create(path.Join(ls.logPath, name)) if err != nil { fmt.Fprintln(os.Stderr, err) ls.logFile = os.Stderr } else { next := current.Truncate(ls.unit).Add(ls.unit) ls.timeToCreate = time.After(next.Sub(time.Now())) } default: // do nothing } } return ls.logFile.Write(p)}
而标准库中的io.Writer定义如下,那么这里的logSegment就实现了io.Writer的接口,所有以io.Writer作为形参的函数,我都可以传一个logSegment的实参进去。
type Writer interface { Write(p []byte) (n int, err error)}
掌握Go语言,要把握“一个中心,两个基本点”。“一个中心”是Go语言并发模型,即“不要通过共享内存来通信,要通过通信来共享内存”;“两个基本点”是Go语言的并发模型的两大基石:channel和go-routine。理解了它们就能看懂大部分代码。下面让我们正式开始介绍Tao框架吧。
Tao框架提供两种服务器,一种是普通的TCPServer,另一种是传输层安全的TLSTCPServer。服务器的核心职责是“监听并接受客户端连接”。每个进程能够打开的文件描述符是有限制的,所以它还需要限制最大并发连接数,关键代码如下:
func (server *TCPServer) Start() { // omitted... listener, err := net.Listen("tcp", server.address) if err != nil { holmes.Fatal("%v", err) } defer listener.Close() var tempDelay time.Duration for server.IsRunning() { conn, err := listener.Accept() if err != nil { if ne, ok := err.(net.Error); ok && ne.Temporary() { if tempDelay == 0 { tempDelay = 5 * time.Millisecond } else { tempDelay *= 2 } if tempDelay >= 1 * time.Second { tempDelay = 1 * time.Second } holmes.Error("Accept error %v, retrying in %d", err, tempDelay) time.Sleep(tempDelay) continue } return } tempDelay = 0 connSize := server.connections.Size() if server.connections.Size() >= MAX_CONNECTIONS { holmes.Error("Num of conns %d exceeding MAX %d, refuse", connSize, MAX_CONNECTIONS) conn.Close() continue } // omitted... go func() { tcpConn.Start() }() holmes.Info("Accepting client %s, net id %d, now %d\n", tcpConn.GetName(), netid, server.connections.Size()) }}
如果服务器在接受客户端连接请求的时候发生了临时错误,那么服务器将等待最多1秒的时间再重新尝试接受请求,如果现有的连接数超过了MAX_CONNECTIONS(1000),就拒绝并关闭连接,否则启动一个新的连接开始工作。
在其他的编程语言中,采用Reactor模式编写的服务器往往需要在一个IO线程异步地通过epoll进行多路复用。而因为协程的开销廉价,Go语言可以对每一个网络连接创建三个go-routine。readLoop()负责读取数据并反序列化成消息;writeLoop()负责序列化消息并发送二进制字节流;最后handleServerLoop()负责调用消息处理函数。这三个协程在Connection启动时就会各自独立运行:
func (conn *ServerConnection)Start() { if conn.GetOnConnectCallback() != nil { conn.GetOnConnectCallback()(conn) } conn.finish.Add(3) loopers := []func(Connection, *sync.WaitGroup) {readLoop, writeLoop, handleServerLoop} for _, l := range loopers { looper := l // necessary go looper(conn, conn.finish) }}
readLoop做了三件关键的工作。首先调用消息编解码器将接收到的字节流反序列化成消息;然后更新用于心跳检测的时间戳;最后,根据消息的协议号找到对应的消息处理函数,如果注册了消息回调函数,那么就调用该函数处理消息,否则将消息和处理函数打包发送到MessageHandlerChannel中。
/* readLoop() blocking read from connection, deserialize bytes into message,then find corresponding handler, put it into channel */func readLoop(conn Connection, finish *sync.WaitGroup) { // omitted... for { select { case <-conn.GetCloseChannel(): return default: } msg, err := conn.GetMessageCodec().Decode(conn) if err != nil { holmes.Error("Error decoding message %v", err) if _, ok := err.(ErrorUndefined); ok { // update heart beat timestamp conn.SetHeartBeat(time.Now().UnixNano()) continue } return } // update heart beat timestamp conn.SetHeartBeat(time.Now().UnixNano()) handler := HandlerMap.Get(msg.MessageNumber()) if handler == nil { if conn.GetOnMessageCallback() != nil { holmes.Info("Message %d call onMessage()", msg.MessageNumber()) conn.GetOnMessageCallback()(msg, conn) } else { holmes.Warn("No handler or onMessage() found for message %d", msg.MessageNumber()) } continue } // send handler to handleLoop if !conn.IsClosed() { conn.GetMessageHandlerChannel()<- MessageHandler{msg, handler} } }}
writeLoop做了一件事情,从MessageSendChannel中读取已序列化好的字节流,然后发送到网络上。但是要注意,该协程在连接关闭退出执行之前,会坚持将MessageSendChannel中的消息全部发送完毕,避免漏发消息,这就是关键所在。
/* writeLoop() receive message from channel, serialize it into bytes,then blocking write into connection */func writeLoop(conn Connection, finish *sync.WaitGroup) { defer func() { if p := recover(); p != nil { holmes.Error("panics: %v", p) } for packet := range conn.GetMessageSendChannel() { if packet != nil { if _, err := conn.GetRawConn().Write(packet); err != nil { holmes.Error("Error writing data %v", err) } } } finish.Done() conn.Close() }() for { select { case <-conn.GetCloseChannel(): return case packet := <-conn.GetMessageSendChannel(): if packet != nil { if _, err := conn.GetRawConn().Write(packet); err != nil { holmes.Error("Error writing data %v", err) return } } } }}
readLoop将消息和处理函数打包发给了MessageHandlerChannel,于是handleServerLoop就从MessageHandlerChannel中取出消息和处理函数,然后交给工作者线程池,由后者负责调度执行,完成对消息的处理。这里很好的诠释了Go语言是如何通过channel实现线程间通信的。
func handleServerLoop(conn Connection, finish *sync.WaitGroup) { // omitted... for { select { case <-conn.GetCloseChannel(): return case msgHandler := <-conn.GetMessageHandlerChannel(): msg := msgHandler.message handler := msgHandler.handler if !isNil(handler) { WorkerPoolInstance().Put(conn.GetNetId(), func() { handler(NewContext(msg, conn.GetNetId()), conn) }) } case timeout := <-conn.GetTimeOutChannel(): if timeout != nil { extraData := timeout.ExtraData.(int64) if extraData != conn.GetNetId() { holmes.Error("time out of %d running on client %d", extraData, conn.GetNetId()) } WorkerPoolInstance().Put(conn.GetNetId(), func() { timeout.Callback(time.Now(), conn) }) } } }}
任何一个实现了Message接口的类型,都是一个消息,它需要提供方法访问自己的协议号并将自己序列化成字节数组;另外,每个消息都需要注册自己的反序列化函数和处理函数:
type Message interface { MessageNumber() int32 Serialize() ([]byte, error)}type MessageMapType map[int32]UnmarshalFunctionfunc (mm *MessageMapType) Register(msgType int32, unmarshaler func([]byte) (Message, error)) { (*mm)[msgType] = UnmarshalFunction(unmarshaler)}func (mm *MessageMapType) Get(msgType int32) UnmarshalFunction { if unmarshaler, ok := (*mm)[msgType]; ok { return unmarshaler } return nil}type HandlerMapType map[int32]HandlerFunctionfunc (hm *HandlerMapType) Register(msgType int32, handler func(Context, Connection)) { (*hm)[msgType] = HandlerFunction(handler)}func (hm *HandlerMapType) Get(msgType int32) HandlerFunction { if fn, ok := (*hm)[msgType]; ok { return fn } return nil}
对每个消息处理函数而言,要处理的消息以及发送该消息的客户端都是不同的,这些信息被称为“消息上下文”,用Context结构表示,每个不同的客户端用一个64位整数netid标识:
// User context infotype Context struct{ message Message netid int64}func NewContext(msg Message, id int64) Context { return Context{ message: msg, netid: id, }}func (ctx Context)Message() Message { return ctx.message}func (ctx Context)Id() int64 { return ctx.netid}
接收数据时,编解码器(Codec)负责按照一定的格式将网络连接上读取的字节数据反序列化成消息,并将消息交给上层处理(解码);发送数据时,编解码器将上层传递过来的消息序列化成字节数据,交给下层发送(编码):
type Codec interface { Decode(Connection) (Message, error) Encode(Message) ([]byte, error)}
Tao框架采用的是“Type-Length-Data”的格式打包数据。Type占4个字节,表示协议类型;Length占4个字节,表示消息长度,Data为变长字节序列,长度由Length表示。反序列化时,由Type字段可以确定协议类型,然后截取Length长度的字节数据Data,并调用注册到MessageMap中的反序列化函数处理。核心代码如下:
// use type-length-value format: |4 bytes|4 bytes|n bytes <= 8M|type TypeLengthValueCodec struct {}func (codec TypeLengthValueCodec)Decode(c Connection) (Message, error) { byteChan := make(chan []byte) errorChan := make(chan error) var err error go func(bc chan []byte, ec chan error) { typeData := make([]byte, NTYPE) _, err = io.ReadFull(c.GetRawConn(), typeData) if err != nil { ec<- err return } bc<- typeData }(byteChan, errorChan) var typeBytes []byte select { case <-c.GetCloseChannel(): return nil, ErrorConnClosed case err = <-errorChan: return nil, err case typeBytes = <-byteChan: typeBuf := bytes.NewReader(typeBytes) var msgType int32 if err = binary.Read(typeBuf, binary.LittleEndian, &msgType); err != nil { return nil, err } lengthBytes := make([]byte, NLEN) _, err = io.ReadFull(c.GetRawConn(), lengthBytes) if err != nil { return nil, err } lengthBuf := bytes.NewReader(lengthBytes) var msgLen uint32 if err = binary.Read(lengthBuf, binary.LittleEndian, &msgLen); err != nil { return nil, err } if msgLen > MAXLEN { holmes.Error("len %d, type %d", msgLen, msgType) return nil, ErrorIllegalData } // read real application message msgBytes := make([]byte, msgLen) _, err = io.ReadFull(c.GetRawConn(), msgBytes) if err != nil { return nil, err } // deserialize message from bytes unmarshaler := MessageMap.Get(msgType) if unmarshaler == nil { return nil, Undefined(msgType) } return unmarshaler(msgBytes) }}
这里的代码存在一些微妙的设计,需要仔细解释一下。TypeLengthValueCodec.Decode()函数会被readLoop协程用到。因为io.ReadFull()是同步调用,没有数据可读时会阻塞readLoop协程。此时如果关闭网络连接,readLoop协程将无法退出。所以这里的代码用到了一个小技巧:专门开辟了一个新协程来等待读取最开始的4字节Type数据,然后自己select阻塞在多个channel上,这样就不会忽略其他channel传递过来的消息。一旦成功读取到Type数据,就继续后面的流程:读取Length数据,根据Length读取应用数据交给先前注册好的反序列化函数。注意,如果收到超过最大长度的数据就会关闭连接,这是为了防止外部程序恶意消耗系统资源。
为了提高框架的健壮性,避免因为处理业务逻辑造成的响应延迟,消息处理函数一般都会被调度到工作者协程池执行。设计工作者协程池的一个关键是如何将任务散列给池子中的不同协程。一方面,要避免并发问题,必须保证同一个网络连接发来的消息都被散列到同一个协程按顺序执行;另一方面,散列一定要是均匀的,不能让协程“忙的忙死,闲的闲死”。关键还是在散列函数的设计上。
协程池是按照单例模式设计的。创建时会调用newWorker()创建一系列worker协程。
// <<singleton>>type WorkerPool struct { workers []*worker closeChan chan struct{}}var ( globalWorkerPool *WorkerPool)func WorkerPoolInstance() *WorkerPool { return globalWorkerPool}func newWorker(i int, c int, closeChan chan struct{}) *worker { w := &worker{ index: i, callbackChan: make(chan workerFunc, c), closeChan: closeChan, } go w.start() return w}
给工作者协程分配任务的方式很简单,通过hashCode()散列函数找到对应的worker协程,然后把回调函数发送到对应协程的channel中。对应协程在运行时就会从channel中取出然后执行,在start()函数中。
func (wp *WorkerPool) Put(k interface{}, cb func()) error { code := hashCode(k) return wp.workers[code & uint32(len(wp.workers) - 1)].put(workerFunc(cb))}func (w *worker) start() { for { select { case <-w.closeChan: break case cb := <-w.callbackChan: func() { cb() }() } } close(w.callbackChan)}func (w *worker) put(cb workerFunc) error { select { case w.callbackChan<- cb: return nil default: return ErrorWouldBlock }}
Tao框架设计了一个定时器TimingWheel,用来控制定时任务。Connection在此基础上进行了进一步封装。提供定时执行(RunAt),延时执行(RunAfter)和周期执行(RunEvery)功能。这里通过定时器的设计引出多线程编程的一点经验之谈。
每个定时任务由一个timerType表示,它带有自己的id和包含定时回调函数的结构OnTimeOut。expiration表示该任务到期要被执行的时间,interval表示时间间隔,interval > 0意味着该任务是会被周期性重复执行的任务。
/* 'expiration' is the time when timer time out, if 'interval' > 0the timer will time out periodically, 'timeout' contains the callbackto be called when times out */type timerType struct { id int64 expiration time.Time interval time.Duration timeout *OnTimeOut index int // for container/heap}type OnTimeOut struct { Callback func(time.Time, interface{}) ExtraData interface{}}func NewOnTimeOut(extra interface{}, cb func(time.Time, interface{})) *OnTimeOut { return &OnTimeOut{ Callback: cb, ExtraData: extra, }}
定时器需要按照到期时间的顺序从最近到最远排列,这是一个天然的小顶堆,于是这里采用标准库container/heap创建了一个堆数据结构来组织定时任务,存取效率达到O(nlogn)。
type timerHeapType []*timerTypefunc (heap timerHeapType) getIndexById(id int64) int { for _, t := range heap { if t.id == id { return t.index } } return -1}func (heap timerHeapType) Len() int { return len(heap)}func (heap timerHeapType) Less(i, j int) bool { return heap[i].expiration.UnixNano() < heap[j].expiration.UnixNano()}func (heap timerHeapType) Swap(i, j int) { heap[i], heap[j] = heap[j], heap[i] heap[i].index = i heap[j].index = j}func (heap *timerHeapType) Push(x interface{}) { n := len(*heap) timer := x.(*timerType) timer.index = n *heap = append(*heap, timer)}func (heap *timerHeapType) Pop() interface{} { old := *heap n := len(old) timer := old[n-1] timer.index = -1 *heap = old[0 : n-1] return timer}
TimingWheel在创建时会启动一个单独协程来运行定时器核心代码start()。它在多个channel上进行多路复用操作:如果从cancelChan收到timerId,就执行取消操作:从堆上删除对应的定时任务;将定时任务数量发送给sizeChan,别的线程就能获取当前定时任务数;如果从quitChan收到消息,定时器就会被关闭然后退出;如果从addChan收到timer,就将该定时任务添加到堆;如果从tw.ticker.C收到定时信号,就调用getExpired()函数获取到期的任务,然后将这些任务回调发送到TimeOutChannel中,其他相关线程会通过该channel获取并执行定时回调。最后tw.update()会更新周期性执行的定时任务,重新调度执行。
func (tw *TimingWheel) update(timers []*timerType) { if timers != nil { for _, t := range timers { if t.isRepeat() { t.expiration = t.expiration.Add(t.interval) heap.Push(&tw.timers, t) } } }}func (tw *TimingWheel) start() { for { select { case timerId := <-tw.cancelChan: index := tw.timers.getIndexById(timerId) if index >= 0 { heap.Remove(&tw.timers, index) } case tw.sizeChan<- tw.timers.Len(): case <-tw.quitChan: tw.ticker.Stop() return case timer := <-tw.addChan: heap.Push(&tw.timers, timer) case <-tw.ticker.C: timers := tw.getExpired() for _, t := range timers { tw.GetTimeOutChannel()<- t.timeout } tw.update(timers) } }}
用Tao框架开发的服务器一开始总是时不时地崩溃。有时候运行了几个小时服务器就突然退出了。查看打印出来的调用栈发现。每次程序都在定时器上崩溃,原因是数组访问越界。这就是并发访问导致的问题,为什么呢?因为定时器的核心函数在一个协程中操作堆数据结构,与此同时其提供的添加,删除等接口却有可能在其他协程中调用。多个协程并发访问一个没有加锁的数据结构,必然会出现问题。解决方法很简单:将多个协程的并发访问转化为单个协程的串行访问,也就是将添加,删除等操作发送给不同的channel,然后在start()协程中统一处理:
func (tw *TimingWheel)AddTimer(when time.Time, interv time.Duration, to *OnTimeOut) int64 { if to == nil { return int64(-1) } timer := newTimer(when, interv, to) tw.addChan<- timer return timer.id}func (tw *TimingWheel) Size() int { return <-tw.sizeChan}func (tw *TimingWheel)CancelTimer(timerId int64) { tw.cancelChan<- timerId}
陈硕在他的《Linux多线程服务端编程》一书中说到,维护长连接的服务器都应该在应用层自己实现心跳消息:
“在严肃的网络程序中,应用层的心跳协议是必不可少的。应该用心跳消息来判断对方进程是否能正常工作。”
要使用一个连接来同时发送心跳和其他业务消息,这样一旦应用层因为出错发不出消息,对方就能够立刻通过心跳停止感知到。值得注意的是,在Tao框架中,定时器只有一个,而客户端连接可能会有很多个。在长连接模式下,每个客户端都需要处理心跳包,或者其他类型的定时任务。将框架设计为“每个客户端连接自带一个定时器”是不合适的——有十万个连接就有十万个定时器,会有较高的CPU占用率。定时器应该只有一个,所有客户端注册进来的定时任务都由它负责处理。但是如果所有的客户端连接都等待唯一一个定时器发来的消息,就又会存在并发问题。比如client1的定时任务到期了,但它现在正忙着处理其他消息,这个定时任务就可能被其他client执行。所以这里采取了一种“先集中后分散”的处理机制:每一个定时任务都由一个TimeOut结构表示,该结构中除了回调函数还包含一个ExtraData。客户端启动定时任务的时候都会填入net id。TCPServer统一接收定时任务,然后从定时任务中取出net id,然后将该定时任务交给相应的Connection去执行:
/* Retrieve the extra data(i.e. net id), and then redispatchtimeout callbacks to corresponding client connection, thisprevents one client from running callbacks of other clients */func (server *TCPServer) timeOutLoop() { defer server.finish.Done() for { select { case <-server.closeServChan: return case timeout := <-server.GetTimingWheel().GetTimeOutChannel(): netid := timeout.ExtraData.(int64) if conn, ok := server.connections.Get(netid); ok { tcpConn := conn.(Connection) if !tcpConn.IsClosed() { tcpConn.GetTimeOutChannel()<- timeout } } else { holmes.Warn("Invalid client %d", netid) } } }}
当我们谈论并发编程的时候,我们在谈论什么?用一句话概括:当多个线程同时访问一个未受保护的共享数据时,就会产生并发问题。那么多线程编程的本质就是怎样避免上述情况的发生了。这里总结一些,有三种基本的方法。
这是教科书上最常见的方法了。用各种信号量/互斥锁对数据结构进行保护,先加锁,然后执行操作,最后解锁。举个例子,Tao框架中用于网络连接管理的ConnectionMap就是这么实现的:
type ConnectionMap struct{ sync.RWMutex m map[int64]Connection}func NewConnectionMap() *ConnectionMap { return &ConnectionMap{ m: make(map[int64]Connection), }}func (cm *ConnectionMap)Get(k int64) (Connection, bool) { cm.RLock() conn, ok := cm.m[k] cm.RUnlock() return conn, ok}func (cm *ConnectionMap)Put(k int64, v Connection) { cm.Lock() cm.m[k] = v cm.Unlock()}func (cm *ConnectionMap)Size() int { cm.RLock() size := len(cm.m) cm.RUnlock() return size}
这种方法在前面已经介绍过,它属于无锁化的一种编程方式。多个线程的操作请求都放到一个任务队列中,最终由一个单一的线程来读取队列并串行执行。这种方法在并发量很大的时候还是会有性能瓶颈。
最好的办法还是要从数据结构上入手,有很多技巧能够让数据结构适应多线程并发访问的场景。比如Java标准库中的java.util.concurrent,包含了各种并发数据结构,其中ConcurrentHashMap的基本原理就是分段锁,对每个段(Segment)加锁保护,并发写入数据时通过散列函数分发到不同的段上面,在SegmentA上加锁并不影响SegmentB的访问。
处理并发多线程问题,一定要小心再小心,思考再思考,一不注意就会踩坑
联系客服