打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
高性能Socket设计实现

高性能Socket设计实现

2010-08-31 14:33 by 田志良, 2250 visits, 收藏, 编辑

因为学习的需要,要求一个高性能的Socket服务器来提供多而繁杂的客户端连接请求,参考了许多资料和各位的思想,自己琢磨出了一套方案,觉的可行,于是拿出来晒晒,希望大家一起学习改进。(这个方案的1.0版本已经贴出来了,但是由于本人觉的1.0不太完美,做了下改进,本篇讲的主要是2.0)

1.0的文章参考:http://www.cnblogs.com/niuchenglei/archive/2009/07/23/1529462.html
1.0和2.0性能上基本没有变化,只是针对某些地方做了改进性的修改,本篇主要介绍原理,并贴出部分代码,上一篇是一个Overview。

设计原则:使用.net的SocketAsyncEventArgs(原因是这个比较简单,而且性能也很好,当然要是c++的话就用IOCP了)。考虑到能快速的反应用户的连接请求我采用了连接池的技术,类似于sqlserver的连接池,当然我的“池”还不够好,为了能快速的处理接受的数据我又加入了一个缓冲区池,说白了就是给每一个连接对象事先开辟好了空间。在传输方面,为了保证数据的有效性我们采用客户端和服务器端的验证(当然也不是太复杂)。

具体分析:分析的顺序是自底向上的

1.MySocketAsyncEventArgs类:这个类是一个继承自System.Net.Socket.SocketAsyncEventArgs类,是由于特定情况需要而添加了一些外加属性的类。

internal sealed class MySocketAsyncEventArgs : SocketAsyncEventArgs{internal string UID;private string Property;internal MySocketAsyncEventArgs(string property){this.Property = property;}}

UID:用户标识符,用来标识这个连接是那个用户的。
Property:标识该连接是用来发送信息还是监听接收信息的。param:Receive/Send,MySocketAsyncEventArgs类只带有一个参数的构造函数,说明类在实例化时就被说明是用来完成接收还是发送任务的。

2.SocketAsyncEventArgsWithId类:该类是一个用户的连接的最小单元,也就是说对一个用户来说有两个SocketAsyncEventArgs对象,这两个对象是一样的,但是有一个用来发送消息,一个接收消息,这样做的目的是为了实现双工通讯,提高用户体验。默认的用户标识是"-1”,状态是false表示不可用

internal sealed class SocketAsyncEventArgsWithId:IDisposable{private string uid = "-1";private bool state = false;private MySocketAsyncEventArgs receivesaea;private MySocketAsyncEventArgs sendsaea;internal string UID{get { return uid; }set{uid = value;ReceiveSAEA.UID = value;SendSAEA.UID = value;}}}
UID:用户标识,跟MySocketAsyncEventArgs的UID是一样的,在对SocketAsycnEventArgsWithId的UID属性赋值的时候也对MySocketAsyncEventArgs的UID属性赋值。
State:表示连接的可用与否,一旦连接被实例化放入连接池后State即变为True

3.SocketAsyncEventArgsPool类:这个类才是真正的连接池类,这个类真正的为server提供一个可用的用户连接,并且维持这个连接直到用户断开,并把不用的连接放回连接池中供下一用户连接。

这个类是最核心的东西了,当然它设计的好坏影响着总体性能的好坏,它的各项操作也可能成为整个服务器性能的瓶颈。Pool包含有几个成员:

  • Stack<SocketAsyncEventArgsWithId> pool : 从字面意思上就知道这是一个连接栈,用来存放空闲的连接的,使用时pop出来,使用完后push进去。
  • IDictionary<string, SocketAsyncEventArgsWithId> busypool :这个也很好理解,busypool是一个字典类型的,用来存放正在使用的连接的,key是用户标识,设计的目的是为了统计在线用户数目和查找相应用户的连接,当然这是很重要的,为什么设计成字典类型的,是因为我们查找时遍历字典的关键字就行了而不用遍历每一项的UID,这样效率会有所提高。
  • string[] keys:这是一个存放用户标识的数组,起一个辅助的功能。
  • Count属性:返回连接池中可用的连接数。
  • OnlineUID属性:返回在线用户的标识列表。
  • Pop(string uid)方法:用于获取一个可用连接给用户。
  • Push(SocketAsyncEventArgsWithId item)方法:把一个使用完的连接放回连接池。
  • FindByUID(string uid)方法:查找在线用户连接,返回这个连接。
  • BusyPoolContains(string uid)方法:判断某个用户的连接是否在线。
   
 internal sealed class SocketAsyncEventArgsPool:IDisposable{internal Stack<SocketAsyncEventArgsWithId> pool;internal IDictionary<string, SocketAsyncEventArgsWithId> busypool;private string[] keys;internal Int32 Count{get{lock (this.pool){return this.pool.Count;}}}internal string[] OnlineUID{get{lock (this.busypool){busypool.Keys.CopyTo(keys, 0);}return keys;}}internal SocketAsyncEventArgsPool(Int32 capacity){keys = new string[capacity];this.pool = new Stack<SocketAsyncEventArgsWithId>(capacity);this.busypool = new Dictionary<string, SocketAsyncEventArgsWithId>(capacity);}internal SocketAsyncEventArgsWithId Pop(string uid){if (uid == string.Empty || uid == "")return null;SocketAsyncEventArgsWithId si = null;lock (this.pool){si = this.pool.Pop();}si.UID = uid;si.State = true;    //mark the state of pool is not the initial stepbusypool.Add(uid, si);return si;}internal void Push(SocketAsyncEventArgsWithId item){if (item == null)throw new ArgumentNullException("SocketAsyncEventArgsWithId对象为空");if (item.State == true){if (busypool.Keys.Count != 0){if (busypool.Keys.Contains(item.UID))busypool.Remove(item.UID);elsethrow new ArgumentException("SocketAsyncEventWithId不在忙碌队列中");}elsethrow new ArgumentException("忙碌队列为空");}item.UID = "-1";item.State = false;lock (this.pool){this.pool.Push(item);}}internal SocketAsyncEventArgsWithId FindByUID(string uid){if (uid == string.Empty || uid == "")return null;SocketAsyncEventArgsWithId si = null;foreach (string key in this.OnlineUID){if (key == uid){si = busypool[uid];break;}}return si;}internal bool BusyPoolContains(string uid){lock (this.busypool){return busypool.Keys.Contains(uid);}}}
 Note:这个类的设计缺陷是使用了太多的lock语句,对对象做了太多的互斥操作,所以我尽量的把lock内的语句化简或挪到lock外部执行。

4.BufferManager类:该类是一个管理连接缓冲区的类,职责是为每一个连接维持一个接收数据的区域。它的设计也采用了类似与池的技术,先实例化好多内存区域,并把每一块的地址放入栈中,每执行依次pop时拿出一块区域来给SocketAsyncEventArgs对象作为Buffer.

internal sealed class BufferManager:IDisposable{private Byte[] buffer;private Int32 bufferSize;private Int32 numSize;private Int32 currentIndex;private Stack<Int32> freeIndexPool;internal Boolean SetBuffer(SocketAsyncEventArgs args){if (this.freeIndexPool.Count > 0){args.SetBuffer(this.buffer, this.freeIndexPool.Pop(), this.bufferSize);}else{if ((this.numSize - this.bufferSize) < this.currentIndex){return false;}args.SetBuffer(this.buffer, this.currentIndex, this.bufferSize);this.currentIndex += this.bufferSize;}return true;}}

 

5.RequestHandler类:这里代码就不贴了,这个类也比较简单。比如发送方要发送的内容为:hello,nice to meet you那么真正发送的内容是:[length=22]hello,nice to meet you,length后的数字是字符串的长度,接收方接收到消息后根据长度检验和获取信息。

强烈推荐这篇文章:http://www.cnblogs.com/JimmyZhang/archive/2008/09/16/1291854.html

6.SocketListener类:终于到了最重要的部分了,也是一个对外部真正有意义的类,这个类监听用户的连接请求并从连接池取出一个可用连接给用户,并且时刻监听用户发来的数据并处理。在设计这个类时为了迎合双工通信我把监听的任务放到另一个线程中去,这也是我为什么要给每个用户两个SocketAsyncEventArgs的原因。当然两个线程是不够的还要异步。比较重要的语句我都用粗体标注了。socket的方法都是成对出现的,ReceiveAsync对应OnReceiveCompleted,SendAsync对应OnSendCompleted,所以理解起来也不算太难,只是要注意一点:就是接收和发送消息是在两个线程里的。

    
public sealed class SocketListener:IDisposable{/// <summary>/// 缓冲区/// </summary>private BufferManager bufferManager;/// <summary>/// 服务器端Socket/// </summary>private Socket listenSocket;/// <summary>/// 服务同步锁/// </summary>private static Mutex mutex = new Mutex();/// <summary>/// 当前连接数/// </summary>private Int32 numConnections;/// <summary>/// 最大并发量/// </summary>private Int32 numConcurrence;/// <summary>/// 服务器状态/// </summary>private ServerState serverstate;/// <summary>/// 读取写入字节/// </summary>private const Int32 opsToPreAlloc = 1;/// <summary>/// Socket连接池/// </summary>private SocketAsyncEventArgsPool readWritePool;/// <summary>/// 并发控制信号量/// </summary>private Semaphore semaphoreAcceptedClients;/// <summary>/// 通信协议/// </summary>private RequestHandler handler;/// <summary>/// 回调委托/// </summary>/// <param name="IP"></param>/// <returns></returns>public delegate string GetIDByIPFun(string IP);/// <summary>/// 回调方法实例/// </summary>private GetIDByIPFun GetIDByIP;/// <summary>/// 接收到信息时的事件委托/// </summary>/// <param name="info"></param>public delegate void ReceiveMsgHandler(string uid, string info);/// <summary>/// 接收到信息时的事件/// </summary>public event ReceiveMsgHandler OnMsgReceived;/// <summary>/// 开始监听数据的委托/// </summary>public delegate void StartListenHandler();/// <summary>/// 开始监听数据的事件/// </summary>public event StartListenHandler StartListenThread;/// <summary>/// 发送信息完成后的委托/// </summary>/// <param name="successorfalse"></param>public delegate void SendCompletedHandler(string uid,string exception);/// <summary>/// 发送信息完成后的事件/// </summary>public event SendCompletedHandler OnSended;/// <summary>/// 获取当前的并发数/// </summary>public Int32 NumConnections{get { return this.numConnections; }}/// <summary>/// 最大并发数/// </summary>public Int32 MaxConcurrence{get { return this.numConcurrence; }}/// <summary>/// 返回服务器状态/// </summary>public ServerState State{get{return serverstate;}}/// <summary>/// 获取当前在线用户的UID/// </summary>public string[] OnlineUID{get { return readWritePool.OnlineUID; }}/// <summary>/// 初始化服务器端/// </summary>/// <param name="numConcurrence">并发的连接数量(1000以上)</param>/// <param name="receiveBufferSize">每一个收发缓冲区的大小(32768)</param>public SocketListener(Int32 numConcurrence, Int32 receiveBufferSize, GetIDByIPFun GetIDByIP){serverstate = ServerState.Initialing;this.numConnections = 0;this.numConcurrence = numConcurrence;this.bufferManager = new BufferManager(receiveBufferSize * numConcurrence * opsToPreAlloc, receiveBufferSize);this.readWritePool = new SocketAsyncEventArgsPool(numConcurrence);this.semaphoreAcceptedClients = new Semaphore(numConcurrence, numConcurrence);handler = new RequestHandler();this.GetIDByIP = GetIDByIP;}/// <summary>/// 服务端初始化/// </summary>public void Init(){this.bufferManager.InitBuffer();SocketAsyncEventArgsWithId readWriteEventArgWithId;for (Int32 i = 0; i < this.numConcurrence; i++){readWriteEventArgWithId = new SocketAsyncEventArgsWithId();readWriteEventArgWithId.ReceiveSAEA.Completed += new EventHandler<SocketAsyncEventArgs>(OnReceiveCompleted);readWriteEventArgWithId.SendSAEA.Completed += new EventHandler<SocketAsyncEventArgs>(OnSendCompleted);//只给接收的SocketAsyncEventArgs设置缓冲区this.bufferManager.SetBuffer(readWriteEventArgWithId.ReceiveSAEA);this.readWritePool.Push(readWriteEventArgWithId);}serverstate = ServerState.Inited;}/// <summary>/// 启动服务器/// </summary>/// <param name="data">端口号</param>public void Start(Object data){Int32 port = (Int32)data;IPAddress[] addresslist = Dns.GetHostEntry(Environment.MachineName).AddressList;IPEndPoint localEndPoint = new IPEndPoint(addresslist[addresslist.Length - 1], port);this.listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);if (localEndPoint.AddressFamily == AddressFamily.InterNetworkV6){this.listenSocket.SetSocketOption(SocketOptionLevel.IPv6, (SocketOptionName)27, false);this.listenSocket.Bind(new IPEndPoint(IPAddress.IPv6Any, localEndPoint.Port));}else{this.listenSocket.Bind(localEndPoint);}this.listenSocket.Listen(100);this.StartAccept(null);//开始监听已连接用户的发送数据StartListenThread();serverstate = ServerState.Running;mutex.WaitOne();}/// <summary>/// 开始监听线程的入口函数/// </summary>public void Listen(){while (true){string[] keys = readWritePool.OnlineUID;foreach (string uid in keys){if (uid != null && readWritePool.busypool[uid].ReceiveSAEA.LastOperation != SocketAsyncOperation.Receive){Boolean willRaiseEvent = (readWritePool.busypool[uid].ReceiveSAEA.UserToken as Socket).ReceiveAsync(readWritePool.busypool[uid].ReceiveSAEA);if (!willRaiseEvent)ProcessReceive(readWritePool.busypool[uid].ReceiveSAEA);}}}}/// <summary>/// 发送信息/// </summary>/// <param name="uid">要发送的用户的uid</param>/// <param name="msg">消息体</param>public void Send(string uid, string msg){if (uid == string.Empty || uid == "" || msg == string.Empty || msg == "")return;SocketAsyncEventArgsWithId socketWithId = readWritePool.FindByUID(uid);if (socketWithId == null)//说明用户已经断开//100   发送成功//200   发送失败//300   用户不在线//其它  表示异常的信息OnSended(uid, "300");else{MySocketAsyncEventArgs e = socketWithId.SendSAEA;if (e.SocketError == SocketError.Success){int i = 0;try{string message = @"[lenght=" + msg.Length + @"]" + msg;byte[] sendbuffer = Encoding.Unicode.GetBytes(message);e.SetBuffer(sendbuffer, 0, sendbuffer.Length);Boolean willRaiseEvent = (e.UserToken as Socket).SendAsync(e);if (!willRaiseEvent){this.ProcessSend(e);}}catch (Exception ex){if (i <= 5){i++;//如果发送出现异常就延迟0.01秒再发Thread.Sleep(10);Send(uid, msg);}else{OnSended(uid, ex.ToString());}}}else{OnSended(uid, "200");this.CloseClientSocket(((MySocketAsyncEventArgs)e).UID);}}}/// <summary>/// 停止服务器/// </summary>public void Stop(){if(listenSocket!=null)listenSocket.Close();listenSocket = null;Dispose();mutex.ReleaseMutex();serverstate = ServerState.Stoped;}private void StartAccept(SocketAsyncEventArgs acceptEventArg){if (acceptEventArg == null){acceptEventArg = new SocketAsyncEventArgs();acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(OnAcceptCompleted);}elseacceptEventArg.AcceptSocket = null;this.semaphoreAcceptedClients.WaitOne();Boolean willRaiseEvent = this.listenSocket.AcceptAsync(acceptEventArg);if (!willRaiseEvent){this.ProcessAccept(acceptEventArg);}}private void OnAcceptCompleted(object sender, SocketAsyncEventArgs e){this.ProcessAccept(e);}private void ProcessAccept(SocketAsyncEventArgs e){if (e.LastOperation != SocketAsyncOperation.Accept)    //检查上一次操作是否是Accept,不是就返回return;if (e.BytesTransferred <= 0)    //检查发送的长度是否大于0,不是就返回return;string UID = GetIDByIP((e.AcceptSocket.RemoteEndPoint as IPEndPoint).Address.ToString());   //根据IP获取用户的UIDif (UID == string.Empty || UID == null || UID == "")return;if (readWritePool.BusyPoolContains(UID))    //判断现在的用户是否已经连接,避免同一用户开两个连接return;SocketAsyncEventArgsWithId readEventArgsWithId = this.readWritePool.Pop(UID);readEventArgsWithId.ReceiveSAEA.UserToken = e.AcceptSocket;readEventArgsWithId.SendSAEA.UserToken = e.AcceptSocket;Interlocked.Increment(ref this.numConnections);this.StartAccept(e);}private void OnReceiveCompleted(object sender, SocketAsyncEventArgs e){ProcessReceive(e);}private void OnSendCompleted(object sender, SocketAsyncEventArgs e){ProcessSend(e);}private void ProcessReceive(SocketAsyncEventArgs e){if (e.LastOperation != SocketAsyncOperation.Receive)return;if (e.BytesTransferred > 0){if (e.SocketError == SocketError.Success){Int32 byteTransferred = e.BytesTransferred;string received = Encoding.Unicode.GetString(e.Buffer, e.Offset, byteTransferred);//检查消息的准确性string[] msg = handler.GetActualString(received);foreach (string m in msg)OnMsgReceived(((MySocketAsyncEventArgs)e).UID, m);//可以在这里设一个停顿来实现间隔时间段监听,这里的停顿是单个用户间的监听间隔//发送一个异步接受请求,并获取请求是否为成功Boolean willRaiseEvent = (e.UserToken as Socket).ReceiveAsync(e);if (!willRaiseEvent)ProcessReceive(e);}}elsethis.CloseClientSocket(((MySocketAsyncEventArgs)e).UID);}private void ProcessSend(SocketAsyncEventArgs e){if (e.LastOperation != SocketAsyncOperation.Send)return;if (e.BytesTransferred > 0){if (e.SocketError == SocketError.Success)OnSended(((MySocketAsyncEventArgs)e).UID, "100");elseOnSended(((MySocketAsyncEventArgs)e).UID, "200");}elsethis.CloseClientSocket(((MySocketAsyncEventArgs)e).UID);}private void CloseClientSocket(string uid){if (uid == string.Empty || uid == "")return;SocketAsyncEventArgsWithId saeaw = readWritePool.FindByUID(uid);if (saeaw == null)return;Socket s = saeaw.ReceiveSAEA.UserToken as Socket;try{s.Shutdown(SocketShutdown.Both);}catch (Exception){//客户端已经关闭}this.semaphoreAcceptedClients.Release();Interlocked.Decrement(ref this.numConnections);this.readWritePool.Push(saeaw);}#region IDisposable Memberspublic void Dispose(){bufferManager.Dispose();bufferManager = null;readWritePool.Dispose();readWritePool = null;}#endregion}
 
关于所有的类已经介绍完了,相信各位都已经很明白了,如果不是太清楚就看源代码,别忘了源代码是最好的文档!

当然这个2.0仍然还有很多缺陷,比如职责划分不太OO,运行不太稳定,处理异常能力较差,处理超负载的连接能力较差,主动拒绝,可测试性差等,希望大家多给点建议,改进才对啊。

源代码下载:http://files.cnblogs.com/niuchenglei/socketlib.rar

分类: Socket
4
0
(请您对文章做出评价)
博主前一篇:Windows Socket五种I/O模型
博主后一篇:解决TCP网络传输“粘包”问题
Add your comment

3 条回复

  1. #1楼 ToBin      2010-11-30 21:48
    哥们,这两个SocketAsyncEventArgs用的一个socket,
    难道这就是双工通信?
    是全双工通信,还是半双工通信
     回复 引用 查看   
  2. #2楼 qiuqingpo      2010-12-27 11:51
    70 /// <summary>
    171 /// 开始监听线程的入口函数
    172 /// </summary>
    173 public void Listen()
    174 {
    175 while (true)
    176 {
    177 string[] keys = readWritePool.OnlineUID;
    178 foreach (string uid in keys)
    179 {
    180 if (uid != null && readWritePool.busypool[uid].ReceiveSAEA.LastOperation != SocketAsyncOperation.Receive)
    181 {
    182 Boolean willRaiseEvent = (readWritePool.busypool[uid].ReceiveSAEA.UserToken as Socket).ReceiveAsync(readWritePool.busypool[uid].ReceiveSAEA);
    183 if (!willRaiseEvent)
    184 ProcessReceive(readWritePool.busypool[uid].ReceiveSAEA);
    185 }
    186 }
    187 }
    188 }

    大哥.你这样不让我CPU 100% 才怪呢.真不知道你是怎么测试的?
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
Socket连接池
C#+flash socket 聊天程序
C#中Socket通信编程的同步实现
C# + Socket断线重连
C# 读写INI文件
简单聊天程序java socket
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服