SocketAsyncEventArgs是一个套接字操作的类,主要作用是实现socket消息的异步接收和发送,跟Socket的BeginSend和
BeginReceive方法异步处理没有多大区别,它的优势在于完成端口的实现来处理大数据的并发情况,由于本人学习不久,对千万级的
数据访问还没有多大体会,这里的简单实现作为一个学习的笔记,请酌情参考,如有错误,请及时指正。
先说说SockeAsyncEventArgs类的操作方法,以下是摘自MSDN的内容(MSDN的SockeAsyncEventArgs类描述):
1、分配一个新的 SocketAsyncEventArgs 上下文对象,或者从应用程序池中获取一个空闲的此类对象。
1 SocketAsyncEventArgs saea = new SocketAsyncEventArgs();2 //或者(这里的SocketAsyncEventArgsPool类一般是自己实现,MSDN有通过栈结构实现的程序池,也可以使用队列或链表):3 SocketAsyncEventArgs saea = new SocketAsyncEventArgsPool().Pop();
2、将该上下文对象的属性设置为要执行的操作(例如,完成回调方法、数据缓冲区、缓冲区偏移量以及要传输的最大数据量)。SocketAsyncEventArgs一般会根据操作执行相同的回调函数,所有设置内容在不同操作的回调都可以访问,我在调试时发现不能同时收发消息(可能是半双工),因此使用接收和发送试用两个对象
1 byte[] buffer = new byte[1024];2 saea.SetBuffer(buffer, 0, buffer.Length); //设置缓冲区3 saea.Completed += new EventHandler<SocketAsyncEventArgs>(MethodName); //设置回调方法4 saea.RemoteEndPoint = new IPEndPoint(IPAddress.Any,1234); //设置远端连接节点,一般用于接收消息5 saea.UserToken = new AsyncUserToken(); //设置用户信息,一般把连接的Socket对象放在这里
3、调用适当的套接字方法 (xxxAsync) 以启动异步操作。
4、如果异步套接字方法 (xxxAsync) 返回 true,则在回调中查询上下文属性来获取完成状态。
5、如果异步套接字方法 (xxxAsync) 返回 false,则说明操作是同步完成的。 可以查询上下文属性来获取操作结果。
1 //这是调用套接字的方法,即socket调用的方法: 2 Socket socket = saea.AcceptSocket; 3 socket.ConnectAsync(saea); //异步进行连接 4 socket.AcceptAsync(saea); //异步接收连接 5 socket.ReceiveAsync(saea); //异步接收消息 6 socket.SendAsync(saea); //异步发送消息 7 //这里注意的是,每个操作方法返回的是布尔值,这个布尔值的作用,是表明当前操作是否有等待I/O的情况,如果返回false则表示当前是同步操作,不需要等待,此时要要同步执行回调方法,一般写法是 8 bool willRaiseEvent = socket.ReceiveAsync(saea); //继续异步接收消息 9 if (!willRaiseEvent)10 {11 MethodName(saea);12 }
6、将该上下文重用于另一个操作,将它放回到应用程序池中,或者将它丢弃。
如果用于持续监听连接,要注意saea.AcceptSocket = null;只有把saea对象的AcceptSocket置为null,才能监听到新的连接;
如果只用于单次通讯,则在用完saea对象是可丢弃,saea.Dispose(),如果想重复利用,则设置相应的异步操作即可,
1 saea.AcceptSocket = null;//重新监听 2 socket.ReceiveAsync(saea);//重新接收3 socket.SendAsync(saea);//重新发送
关于具体的实现,类似于之前我记下的简单Socket通信,先是SocketServerManager的实现,该实现也是参考自MSDN:
1 public class SocketServerManager 2 { 3 readonly Socket _socket; //监听Socket 4 readonly EndPoint _endPoint; 5 private const int Backlog = 100; //允许连接数目 6 private int byteSize = 64; 7 8 //同时UI界处理的事件 9 public delegate void OnEventCompletedHanlder(MessageFormat msg); 10 public event OnEventCompletedHanlder OnReceiveCompletedEvent; 11 public event OnEventCompletedHanlder OnSendCompletedEvent; 12 public event OnEventCompletedHanlder OnConnectedEvent; 13 public event OnEventCompletedHanlder OnDisconnectEvent; 14 public event OnEventCompletedHanlder OnNotConnectEvent; 15 16 //private BufferManager bufferManager; //消息缓存管理 17 SocketAsyncEventArgsPool rwPool; //SAEA池 18 private Semaphore maxClient; 19 private Dictionary<string, SocketAsyncEventArgs> dicSAEA = null; 20 21 public SocketServerManager(string ip, int port) 22 { 23 _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 24 IPAddress ipAddress = IPAddress.Parse(ip); 25 _endPoint = new IPEndPoint(ipAddress, port); 26 //bufferManager = new BufferManager(totalBytes, byteSize); 27 maxClient = new Semaphore(Backlog, Backlog); 28 Init(); 29 } 30 31 public void Init() 32 { 33 //bufferManager.InitBuffer(); 34 SocketAsyncEventArgs rwEventArgs; 35 rwPool = new SocketAsyncEventArgsPool(Backlog); 36 dicSAEA = new Dictionary<string, SocketAsyncEventArgs>(); 37 for (int i = 0; i < 100; i++) 38 { 39 rwEventArgs = new SocketAsyncEventArgs(); 40 rwEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed); 41 rwEventArgs.UserToken = new AsyncUserToken(); 42 43 rwEventArgs.SetBuffer(new byte[byteSize],0,byteSize); 44 //bufferManager.SetBuffer(rwEventArgs); 45 46 rwPool.Push(rwEventArgs); 47 } 48 } 49 50 /// <summary> 51 /// 开启Socket监听 52 /// </summary> 53 public void Start() 54 { 55 _socket.Bind(_endPoint); //绑定本地地址进行监听 56 _socket.Listen(Backlog); //设置监听数量 57 58 StartAccept(null); 59 } 60 61 public void StartAccept(SocketAsyncEventArgs acceptEventArg) 62 { 63 if (acceptEventArg == null) 64 { 65 acceptEventArg = new SocketAsyncEventArgs(); 66 acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(OnConnectedCompleted); 67 } 68 else 69 { 70 acceptEventArg.AcceptSocket = null; 71 } 72 73 maxClient.WaitOne(); 74 bool willRaiseEvent = _socket.AcceptAsync(acceptEventArg); 75 if (!willRaiseEvent) 76 { 77 ProcessAccept(acceptEventArg); 78 } 79 } 80 81 private void ProcessAccept(SocketAsyncEventArgs e) 82 { 83 if (e.SocketError != SocketError.Success) return; //异步处理失败,不做处理 84 SocketAsyncEventArgs saea = rwPool.Pop(); 85 AsyncUserToken token = saea.UserToken as AsyncUserToken; 86 token.UserSocket = e.AcceptSocket; //获取远端对话Socket对象 87 string ipRemote = token.UserSocket.RemoteEndPoint.ToString(); 88 string ip = token.UserSocket.RemoteEndPoint.ToString(); 89 MessageFormat msg = new MessageFormat(string.Format("远程地址[{0}]成功连接到本地", ipRemote), ip, MsgType.Empty); 90 msg.tag = MsgType.Empty; 91 if (OnConnectedEvent != null) OnConnectedEvent(msg); //调用UI方法处理 92 93 //连接成功后,发送消息通知远程客户端 94 //OnSend("Connected Success !", _sendSocket.RemoteEndPoint); 95 SocketAsyncEventArgs sendArgs = new SocketAsyncEventArgs(); 96 sendArgs.RemoteEndPoint = token.UserSocket.RemoteEndPoint; 97 sendArgs.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed); 98 sendArgs.UserToken = saea.UserToken; 99 dicSAEA.Add(token.UserSocket.RemoteEndPoint.ToString(), sendArgs);100 bool willRaiseEvent = e.AcceptSocket.ReceiveAsync(saea);101 if (!willRaiseEvent)102 {103 OnReceiveCompleted(saea);104 }105 106 StartAccept(e);107 }108 109 /// <summary>110 /// 远端地址连接本地成功的回调111 /// </summary>112 /// <param name="sender"></param>113 /// <param name="e"></param>114 public void OnConnectedCompleted(object sender, SocketAsyncEventArgs e)115 {116 ProcessAccept(e);117 }118 119 public void IO_Completed(object sender, SocketAsyncEventArgs e)120 {121 switch (e.LastOperation)122 {123 case SocketAsyncOperation.Receive:124 OnReceiveCompleted(e);125 break;126 case SocketAsyncOperation.Send:127 OnSendCompleted(e);128 break;129 default:130 throw new ArgumentException("The last operation completed on the socket was not a receive or send");131 } 132 }133 134 /// <summary>135 /// 执行异步发送消息136 /// </summary>137 /// <param name="msg">消息内容</param>138 /// <param name="ip">发送远端地址</param>139 public void OnSend(MessageFormat mf)140 {141 if (!dicSAEA.ContainsKey(mf.ipStr))142 {143 if (OnNotConnectEvent != null)144 {145 OnNotConnectEvent(new MessageFormat("不存在此连接客户端","",MsgType.Empty));146 return;147 }148 }149 SocketAsyncEventArgs saea = dicSAEA[mf.ipStr];150 AsyncUserToken token = saea.UserToken as AsyncUserToken;151 if (saea == null) return;152 //saea.SetBuffer(sendBuffer, 0, sendBuffer.Length); //设置SAEA的buffer消息内容153 byte[] sendBuffer = Encoding.Unicode.GetBytes(string.Format("[length={0}]{1}", mf.msgStr.Length, mf.msgStr));154 saea.SetBuffer(sendBuffer, 0, sendBuffer.Length);155 bool willRaiseEvent = token.UserSocket.SendAsync(saea);156 if (!willRaiseEvent)157 {158 OnSendCompleted(saea);159 }160 }161 162 /// <summary>163 /// 发送消息回调处理164 /// </summary>165 /// <param name="sender"></param>166 /// <param name="e"></param>167 public void OnSendCompleted(SocketAsyncEventArgs e)168 {169 AsyncUserToken token = e.UserToken as AsyncUserToken;170 byte[] sendBuffer = e.Buffer;171 string msgStr = Encoding.Unicode.GetString(sendBuffer);172 string ipAddress = token.UserSocket.RemoteEndPoint.ToString();173 MessageFormat msg = new MessageFormat(msgStr, ipAddress, MsgType.Send);174 if (OnSendCompletedEvent != null) OnSendCompletedEvent(msg); //调用UI方法处理175 }176 177 /// <summary>178 /// 接收消息回调处理179 /// </summary>180 /// <param name="e"></param>181 public void OnReceiveCompleted(SocketAsyncEventArgs e)182 {183 if (e.SocketError != SocketError.Success) return; //判断消息的接收状态184 AsyncUserToken token = e.UserToken as AsyncUserToken;185 int lengthBuffer = e.BytesTransferred; //获取接收的字节长度186 string ipAddress = token.UserSocket.RemoteEndPoint.ToString();187 MessageFormat msg = new MessageFormat();188 //如果接收的字节长度为0,则判断远端服务器关闭连接189 if (lengthBuffer <= 0)190 {191 msg.msgStr = "远端服务器已经断开连接";192 msg.ipStr = ipAddress;193 msg.tag = MsgType.Handler;194 if (OnDisconnectEvent != null) OnDisconnectEvent(msg);195 CloseClientSocket(e);196 }197 else198 {199 byte[] receiveBuffer = e.Buffer;200 byte[] buffer = new byte[lengthBuffer];201 Buffer.BlockCopy(receiveBuffer, 0, buffer, 0, lengthBuffer);202 msg.msgStr = Encoding.Unicode.GetString(buffer);203 msg.ipStr = ipAddress;204 msg.tag = MsgType.Receive;205 bool willRaiseEvent = token.UserSocket.ReceiveAsync(e); //继续异步接收消息206 if (!willRaiseEvent)207 {208 OnReceiveCompleted(e);209 }210 if (OnReceiveCompletedEvent != null) OnReceiveCompletedEvent(msg); //调用UI方法处理211 }212 }213 214 private void CloseClientSocket(SocketAsyncEventArgs e)215 {216 AsyncUserToken token = e.UserToken as AsyncUserToken;217 try218 {219 token.UserSocket.Shutdown(SocketShutdown.Send);220 }221 catch (Exception) { }222 dicSAEA.Remove(token.UserSocket.RemoteEndPoint.ToString());223 token.UserSocket.Close();224 225 maxClient.Release();226 rwPool.Push(e);227 }228 }
通过一个栈结构SocketAsyncEventArgsPool保存的SocketAsyncEventArgs对象是用于接收消息,为了做到双方通讯,我每次在接收到远端客户端连接时,就创建一个新的SocketAsyncEventArgs对象保存在Dictionary结构中,这样在消息发送是就可以根据Ip来发送给远程客户端。
客户端的实现比较简单,不用考虑多方的通讯:
1 public class SocketClientManager 2 { 3 readonly Socket _socket; //用于消息交互的socket对象 4 readonly EndPoint _endPoint; //远端地址 5 readonly SocketAsyncEventArgs _saea; //处理连接和接收SAEA对象 6 //处理发送的SAEA处理,由于绑定不同的回调函数,因此需要不同的SAEA对象 7 SocketAsyncEventArgs _sendSaea; 8 9 //处理UI的事件 10 public delegate void OnEventCompletedHanlder(MessageFormat msgFormat); 11 public event OnEventCompletedHanlder OnConnectedEvent; //连接成功事件 12 public event OnEventCompletedHanlder OnReceiveCompletedEvent; //收到消息事件 13 public event OnEventCompletedHanlder OnSendCompletedEvent; //发送成功事件 14 15 public SocketClientManager(string ip, int port) 16 { 17 _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); 18 IPAddress ipAddress = IPAddress.Parse(ip); 19 _endPoint = new IPEndPoint(ipAddress, port); 20 _saea = new SocketAsyncEventArgs {RemoteEndPoint = _endPoint}; 21 } 22 23 /// <summary> 24 /// 开启进行远程连接 25 /// </summary> 26 public void Start() 27 { 28 _saea.Completed += OnConnectedCompleted; 29 _socket.ConnectAsync(_saea); //进行异步连接 30 31 } 32 33 /// <summary> 34 /// 连接成功的事件回调函数 35 /// </summary> 36 /// <param name="sender"></param> 37 /// <param name="e"></param> 38 public void OnConnectedCompleted(object sender, SocketAsyncEventArgs e) 39 { 40 if (e.SocketError != SocketError.Success) return; 41 Socket socket = sender as Socket; 42 string ipRemote = socket.RemoteEndPoint.ToString(); 43 MessageFormat messageFormat = new MessageFormat(string.Format("连接服务器[{0}]成功!", ipRemote), socket.LocalEndPoint.ToString(), MsgType.Empty); 44 if (OnConnectedEvent != null) OnConnectedEvent(messageFormat); 45 46 //开启新的接受消息异步操作事件 47 var receiveSaea = new SocketAsyncEventArgs(); 48 var receiveBuffer = new byte[1024 * 4]; 49 receiveSaea.SetBuffer(receiveBuffer, 0, receiveBuffer.Length); //设置消息的缓冲区大小 50 receiveSaea.Completed += OnReceiveCompleted; //绑定回调事件 51 receiveSaea.RemoteEndPoint = _endPoint; 52 _socket.ReceiveAsync(receiveSaea); 53 } 54 55 /// <summary> 56 /// 接受消息的回调函数 57 /// </summary> 58 /// <param name="sender"></param> 59 /// <param name="e"></param> 60 public void OnReceiveCompleted(object sender, SocketAsyncEventArgs e) 61 { 62 if (e.SocketError == SocketError.OperationAborted) return; 63 var socket = sender as Socket; 64 MessageFormat messageFormat = new MessageFormat(); 65 if (e.SocketError == SocketError.Success &&e.BytesTransferred > 0) 66 { 67 string ipAddress = socket.RemoteEndPoint.ToString(); 68 int lengthBuffer = e.BytesTransferred; 69 byte[] receiveBuffer = e.Buffer; 70 byte[] buffer = new byte[lengthBuffer]; 71 Buffer.BlockCopy(receiveBuffer, 0, buffer, 0, lengthBuffer); 72 messageFormat.msgStr = Encoding.Unicode.GetString(buffer); 73 messageFormat.ipStr = ipAddress; 74 messageFormat.tag = MsgType.Receive;; 75 socket.ReceiveAsync(e); 76 } 77 else if (e.SocketError == SocketError.ConnectionReset && e.BytesTransferred == 0) 78 { 79 messageFormat.msgStr = "服务器已经断开连接"; 80 messageFormat.ipStr = socket.RemoteEndPoint.ToString(); 81 messageFormat.tag = MsgType.Handler; 82 } 83 else 84 { 85 return; 86 } 87 if (OnReceiveCompletedEvent != null) OnReceiveCompletedEvent(messageFormat); 88 } 89 90 /// <summary> 91 /// 发送消息回调函数 92 /// </summary> 93 /// <param name="sender"></param> 94 /// <param name="e"></param> 95 public void OnSendCompleted(object sender, SocketAsyncEventArgs e) 96 { 97 if (e.SocketError != SocketError.Success) return; 98 var socket = sender as Socket; 99 byte[] sendBuffer = e.Buffer;100 MessageFormat messageFormat = new MessageFormat(Encoding.Unicode.GetString(sendBuffer),socket.RemoteEndPoint.ToString(),MsgType.Send);101 if (OnSendCompletedEvent != null) OnSendCompletedEvent(messageFormat);102 }103 104 /// <summary>105 /// 断开连接106 /// </summary>107 public void OnDisConnect()108 {109 if (_socket != null)110 {111 try112 {113 _socket.Shutdown(SocketShutdown.Both);114 }115 catch (SocketException ex)116 {117 }118 finally119 {120 _socket.Close();121 }122 }123 }124 125 /// <summary>126 /// 发送消息127 /// </summary>128 /// <param name="msg"></param>129 public void SendMsg(MessageFormat mf)130 {131 byte[] sendBuffer = Encoding.Unicode.GetBytes(string.Format("[length={0}]{1}", mf.msgStr.Length, mf.msgStr));132 if (_sendSaea == null)133 {134 _sendSaea = new SocketAsyncEventArgs {RemoteEndPoint = _endPoint};135 _sendSaea.Completed += OnSendCompleted;136 }137 _sendSaea.SetBuffer(sendBuffer, 0, sendBuffer.Length);138 if (_socket != null) _socket.SendAsync(_sendSaea);139 }140 }
同样是使用收发不同的SocketAsyncEventArgs对象。
另外,关于解决缓存容量不足以容纳一条消息的半包问题,这里使用了简单的字符处理类,这个类是复制自Jimmy Zhang的类RequestHandler,当然,方法还是不完善的,难以解决不同顺序的消息接受。
具体源码(.net4.5,vs2013):具体源码 http://files.cnblogs.com/files/supheart/ServerBySocket.zip
联系客服