打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
C# 一个高性能异步socket封装库的实现思路

转:https://www.cnblogs.com/yuanchenhui/p/asyn_scoket.html

前言

socket是软件之间通讯最常用的一种方式。c#实现socket通讯有很多中方法,其中效率最高就是异步通讯。

异步通讯实际是利用windows完成端口(IOCP)来处理的,关于完成端口实现原理,大家可以参考网上文章。

我这里想强调的是采用完成端口机制的异步通讯是windows下效率最高的通讯方式,没有之一!

 

异步通讯比同步通讯处理要难很多,代码编写中会遇到许多“坑“。如果没有经验,很难完成。

我搜集了大量资料,完成了对异步socket的封装。此库已用稳定高效的运行几个月。

 

纵观网上的资料,我还没有遇到一个满意的封装库。许多文章把数据收发和协议处理杂糅在一块,代码非常难懂,也无法扩展。

在编写该库时,避免以上缺陷。将逻辑处理层次化,模块化!同时实现了高可用性与高性能。

 

为了使大家对通讯效率有初步了解,先看测试图。

客户端和服务端都是本机测试,最大连接数为64422,套接字已耗尽!

主机配置情况

百兆带宽基本占满,cpu占用40%,我的电脑在空闲时,cpu占用大概20%,也就是说程序占用cpu 20%左右。

这个库是可扩展的,就是说即使10万个连接,收发同样的数据,cpu占用基本相同。

 

库的结构图 

 

目标

  1. 即可作为服务端(监听)也可以作为客户端(主动连接)使用。
  2. 可以适应任何网络协议。收发的数据针对字节流或一个完整的包。对协议内容不做处理。
  3. 高可用性。将复杂的底层处理封装,对外接口非常友好。
  4. 高性能。最大限度优化处理。单机可支持数万连接,收发速度可达几百兆bit。

实现思路

网络处理逻辑可以分为以下几个部分:

  1. 网络监听   可以在多个端口实现监听。负责生成socket,生成的socket供后续处理。监听模块功能比较单一,如有必要,可对监听模块做进一步优化。
  2. 主动连接  可以异步或同步的连接对方。连接成功后,对socket的后续处理,与监听得到的socket完全一样。注:无论是监听得到的socket,还是连接得到的socket,后续处理完全一样。
  3. Socket收发处理   每个socket对应一个收发实例,socket收发只针对字节流处理。收发时,做了优化。比如发送时,对数据做了沾包,提高发送性能;接收时,一次投递1K的数据。
  4. 组包处理   一般数据包都有包长度指示;比如 报头的前俩个字节表示长度,根据这个值就可以组成一个完整的包。

 NetListener 监听

  1. using System;
  2. using System.Net;
  3. using System.Net.Sockets;
  4. using System.Threading;

  5. namespace IocpCore
  6. {
  7. class NetListener
  8. {
  9. private Socket listenSocket;
  10. public ListenParam _listenParam { get; set; }
  11. public event Action<ListenParam, AsyncSocketClient> OnAcceptSocket;

  12. bool start;

  13. NetServer _netServer;
  14. public NetListener(NetServer netServer)
  15. {
  16. _netServer = netServer;
  17. }

  18. public int _acceptAsyncCount = 0;
  19. public bool StartListen()
  20. {
  21. try
  22. {
  23. start = true;
  24. IPEndPoint listenPoint = new IPEndPoint(IPAddress.Parse("0.0.0.0"), _listenParam._port);
  25. listenSocket = new Socket(listenPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
  26. listenSocket.Bind(listenPoint);
  27. listenSocket.Listen(200);

  28. Thread thread1 = new Thread(new ThreadStart(NetProcess));
  29. thread1.Start();

  30. StartAccept();
  31. return true;
  32. }
  33. catch (Exception ex)
  34. {
  35. NetLogger.Log(string.Format("**监听异常!{0}", ex.Message));
  36. return false;
  37. }
  38. }

  39. AutoResetEvent _acceptEvent = new AutoResetEvent(false);
  40. private void NetProcess()
  41. {
  42. while (start)
  43. {
  44. DealNewAccept();
  45. _acceptEvent.WaitOne(1000 * 10);
  46. }
  47. }

  48. private void DealNewAccept()
  49. {
  50. try
  51. {
  52. if(_acceptAsyncCount <= 10)
  53. {
  54. StartAccept();
  55. }

  56. while (true)
  57. {
  58. AsyncSocketClient client = _newSocketClientList.GetObj();
  59. if (client == null)
  60. break;

  61. DealNewAccept(client);
  62. }
  63. }
  64. catch (Exception ex)
  65. {
  66. NetLogger.Log(string.Format("DealNewAccept 异常 {0}***{1}", ex.Message, ex.StackTrace));
  67. }
  68. }

  69. private void DealNewAccept(AsyncSocketClient client)
  70. {
  71. client.SendBufferByteCount = _netServer.SendBufferBytePerClient;
  72. OnAcceptSocket?.Invoke(_listenParam, client);
  73. }

  74. private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs acceptEventArgs)
  75. {
  76. try
  77. {
  78. Interlocked.Decrement(ref _acceptAsyncCount);
  79. _acceptEvent.Set();
  80. acceptEventArgs.Completed -= AcceptEventArg_Completed;
  81. ProcessAccept(acceptEventArgs);
  82. }
  83. catch (Exception ex)
  84. {
  85. NetLogger.Log(string.Format("AcceptEventArg_Completed {0}***{1}", ex.Message, ex.StackTrace));
  86. }
  87. }

  88. public bool StartAccept()
  89. {
  90. SocketAsyncEventArgs acceptEventArgs = new SocketAsyncEventArgs();
  91. acceptEventArgs.Completed += AcceptEventArg_Completed;

  92. bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArgs);
  93. Interlocked.Increment(ref _acceptAsyncCount);

  94. if (!willRaiseEvent)
  95. {
  96. Interlocked.Decrement(ref _acceptAsyncCount);
  97. _acceptEvent.Set();
  98. acceptEventArgs.Completed -= AcceptEventArg_Completed;
  99. ProcessAccept(acceptEventArgs);
  100. }
  101. return true;
  102. }

  103. ObjectPool<AsyncSocketClient> _newSocketClientList = new ObjectPool<AsyncSocketClient>();
  104. private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs)
  105. {
  106. try
  107. {
  108. using (acceptEventArgs)
  109. {
  110. if (acceptEventArgs.AcceptSocket != null)
  111. {
  112. AsyncSocketClient client = new AsyncSocketClient(acceptEventArgs.AcceptSocket);
  113. client.CreateClientInfo(this);

  114. _newSocketClientList.PutObj(client);
  115. _acceptEvent.Set();
  116. }
  117. }
  118. }
  119. catch (Exception ex)
  120. {
  121. NetLogger.Log(string.Format("ProcessAccept {0}***{1}", ex.Message, ex.StackTrace));
  122. }
  123. }
  124. }
  125. }

NetConnectManage连接处理

  1. using System;
  2. using System.Net;
  3. using System.Net.Sockets;

  4. namespace IocpCore
  5. {
  6. class NetConnectManage
  7. {
  8. public event Action<SocketEventParam, AsyncSocketClient> OnSocketConnectEvent;

  9. public bool ConnectAsyn(string peerIp, int peerPort, object tag)
  10. {
  11. try
  12. {
  13. Socket socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
  14. SocketAsyncEventArgs socketEventArgs = new SocketAsyncEventArgs();
  15. socketEventArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(peerIp), peerPort);
  16. socketEventArgs.Completed += SocketConnect_Completed;

  17. SocketClientInfo clientInfo = new SocketClientInfo();
  18. socketEventArgs.UserToken = clientInfo;
  19. clientInfo.PeerIp = peerIp;
  20. clientInfo.PeerPort = peerPort;
  21. clientInfo.Tag = tag;

  22. bool willRaiseEvent = socket.ConnectAsync(socketEventArgs);
  23. if (!willRaiseEvent)
  24. {
  25. ProcessConnect(socketEventArgs);
  26. socketEventArgs.Completed -= SocketConnect_Completed;
  27. socketEventArgs.Dispose();
  28. }
  29. return true;
  30. }
  31. catch (Exception ex)
  32. {
  33. NetLogger.Log("ConnectAsyn",ex);
  34. return false;
  35. }
  36. }

  37. private void SocketConnect_Completed(object sender, SocketAsyncEventArgs socketEventArgs)
  38. {
  39. ProcessConnect(socketEventArgs);
  40. socketEventArgs.Completed -= SocketConnect_Completed;
  41. socketEventArgs.Dispose();
  42. }

  43. private void ProcessConnect(SocketAsyncEventArgs socketEventArgs)
  44. {
  45. SocketClientInfo clientInfo = socketEventArgs.UserToken as SocketClientInfo;
  46. if (socketEventArgs.SocketError == SocketError.Success)
  47. {
  48. DealConnectSocket(socketEventArgs.ConnectSocket, clientInfo);
  49. }
  50. else
  51. {
  52. SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, null);
  53. socketParam.ClientInfo = clientInfo;
  54. OnSocketConnectEvent?.Invoke(socketParam, null);
  55. }
  56. }


  57. void DealConnectSocket(Socket socket, SocketClientInfo clientInfo)
  58. {
  59. clientInfo.SetClientInfo(socket);

  60. AsyncSocketClient client = new AsyncSocketClient(socket);
  61. client.SetClientInfo(clientInfo);

  62. //触发事件
  63. SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, socket);
  64. socketParam.ClientInfo = clientInfo;
  65. OnSocketConnectEvent?.Invoke(socketParam, client);
  66. }

  67. public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
  68. {
  69. socket = null;
  70. try
  71. {
  72. Socket socketTmp = new Socket(SocketType.Stream, ProtocolType.Tcp);

  73. SocketClientInfo clientInfo = new SocketClientInfo();
  74. clientInfo.PeerIp = peerIp;
  75. clientInfo.PeerPort = peerPort;
  76. clientInfo.Tag = tag;

  77. EndPoint remoteEP = new IPEndPoint(IPAddress.Parse(peerIp), peerPort);
  78. socketTmp.Connect(remoteEP);
  79. if (!socketTmp.Connected)
  80. return false;

  81. DealConnectSocket(socketTmp, clientInfo);
  82. socket = socketTmp;
  83. return true;
  84. }
  85. catch (Exception ex)
  86. {
  87. NetLogger.Log(string.Format("连接对方:({0}:{1})出错!", peerIp, peerPort), ex);
  88. return false;
  89. }
  90. }
  91. }
  92. }

AsyncSocketClient socket收发处理

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.Net;
  5. using System.Net.Sockets;

  6. namespace IocpCore
  7. {
  8. public class AsyncSocketClient
  9. {
  10. public static int IocpReadLen = 1024;

  11. public readonly Socket ConnectSocket;

  12. protected SocketAsyncEventArgs m_receiveEventArgs;
  13. public SocketAsyncEventArgs ReceiveEventArgs { get { return m_receiveEventArgs; } set { m_receiveEventArgs = value; } }
  14. protected byte[] m_asyncReceiveBuffer;

  15. protected SocketAsyncEventArgs m_sendEventArgs;
  16. public SocketAsyncEventArgs SendEventArgs { get { return m_sendEventArgs; } set { m_sendEventArgs = value; } }
  17. protected byte[] m_asyncSendBuffer;

  18. public event Action<AsyncSocketClient, byte[]> OnReadData;
  19. public event Action<AsyncSocketClient, int> OnSendData;
  20. public event Action<AsyncSocketClient> OnSocketClose;

  21. static object releaseLock = new object();
  22. public static int createCount = 0;
  23. public static int releaseCount = 0;

  24. ~AsyncSocketClient()
  25. {
  26. lock (releaseLock)
  27. {
  28. releaseCount++;
  29. }
  30. }

  31. public AsyncSocketClient(Socket socket)
  32. {
  33. lock (releaseLock)
  34. {
  35. createCount++;
  36. }

  37. ConnectSocket = socket;

  38. m_receiveEventArgs = new SocketAsyncEventArgs();
  39. m_asyncReceiveBuffer = new byte[IocpReadLen];
  40. m_receiveEventArgs.AcceptSocket = ConnectSocket;
  41. m_receiveEventArgs.Completed += ReceiveEventArgs_Completed;

  42. m_sendEventArgs = new SocketAsyncEventArgs();
  43. m_asyncSendBuffer = new byte[IocpReadLen * 2];
  44. m_sendEventArgs.AcceptSocket = ConnectSocket;
  45. m_sendEventArgs.Completed += SendEventArgs_Completed;
  46. }

  47. SocketClientInfo _clientInfo;

  48. public SocketClientInfo ClientInfo
  49. {
  50. get
  51. {
  52. return _clientInfo;
  53. }
  54. }

  55. internal void CreateClientInfo(NetListener netListener)
  56. {
  57. _clientInfo = new SocketClientInfo();
  58. try
  59. {
  60. _clientInfo.Tag = netListener._listenParam._tag;
  61. IPEndPoint ip = ConnectSocket.LocalEndPoint as IPEndPoint;
  62. Debug.Assert(netListener._listenParam._port == ip.Port);

  63. _clientInfo.LocalIp = ip.Address.ToString();
  64. _clientInfo.LocalPort = netListener._listenParam._port;

  65. ip = ConnectSocket.RemoteEndPoint as IPEndPoint;
  66. _clientInfo.PeerIp = ip.Address.ToString();
  67. _clientInfo.PeerPort = ip.Port;
  68. }
  69. catch (Exception ex)
  70. {
  71. NetLogger.Log("CreateClientInfo", ex);
  72. }
  73. }
  74. internal void SetClientInfo(SocketClientInfo clientInfo)
  75. {
  76. _clientInfo = clientInfo;
  77. }

  78. #region read process
  79. bool _inReadPending = false;
  80. public EN_SocketReadResult ReadNextData()
  81. {
  82. lock (this)
  83. {
  84. if (_socketError)
  85. return EN_SocketReadResult.ReadError;
  86. if (_inReadPending)
  87. return EN_SocketReadResult.InAsyn;
  88. if(!ConnectSocket.Connected)
  89. {
  90. OnReadError();
  91. return EN_SocketReadResult.ReadError;
  92. }

  93. try
  94. {
  95. m_receiveEventArgs.SetBuffer(m_asyncReceiveBuffer, 0, m_asyncReceiveBuffer.Length);
  96. _inReadPending = true;
  97. bool willRaiseEvent = ConnectSocket.ReceiveAsync(ReceiveEventArgs); //投递接收请求
  98. if (!willRaiseEvent)
  99. {
  100. _inReadPending = false;
  101. ProcessReceive();
  102. if (_socketError)
  103. {
  104. OnReadError();
  105. return EN_SocketReadResult.ReadError;
  106. }
  107. return EN_SocketReadResult.HaveRead;
  108. }
  109. else
  110. {
  111. return EN_SocketReadResult.InAsyn;
  112. }
  113. }
  114. catch (Exception ex)
  115. {
  116. NetLogger.Log("ReadNextData", ex);
  117. _inReadPending = false;
  118. OnReadError();
  119. return EN_SocketReadResult.ReadError;
  120. }
  121. }
  122. }

  123. private void ProcessReceive()
  124. {
  125. if (ReceiveEventArgs.BytesTransferred > 0
  126. && ReceiveEventArgs.SocketError == SocketError.Success)
  127. {
  128. int offset = ReceiveEventArgs.Offset;
  129. int count = ReceiveEventArgs.BytesTransferred;

  130. byte[] readData = new byte[count];
  131. Array.Copy(m_asyncReceiveBuffer, offset, readData, 0, count);

  132. _inReadPending = false;
  133. if (!_socketError)
  134. OnReadData?.Invoke(this, readData);
  135. }
  136. else
  137. {
  138. _inReadPending = false;
  139. OnReadError();
  140. }
  141. }

  142. private void ReceiveEventArgs_Completed(object sender, SocketAsyncEventArgs e)
  143. {
  144. lock (this)
  145. {
  146. _inReadPending = false;
  147. ProcessReceive();
  148. if (_socketError)
  149. {
  150. OnReadError();
  151. }
  152. }
  153. }

  154. bool _socketError = false;
  155. private void OnReadError()
  156. {
  157. lock (this)
  158. {
  159. if (_socketError == false)
  160. {
  161. _socketError = true;
  162. OnSocketClose?.Invoke(this);
  163. }
  164. CloseClient();
  165. }
  166. }
  167. #endregion

  168. #region send process
  169. int _sendBufferByteCount = 102400;
  170. public int SendBufferByteCount
  171. {
  172. get
  173. {
  174. return _sendBufferByteCount;
  175. }
  176. set
  177. {
  178. if (value < 1024)
  179. {
  180. _sendBufferByteCount = 1024;
  181. }
  182. else
  183. {
  184. _sendBufferByteCount = value;
  185. }
  186. }
  187. }

  188. SendBufferPool _sendDataPool = new SendBufferPool();
  189. internal EN_SendDataResult PutSendData(byte[] data)
  190. {
  191. //此处省略302 }

  192. private void SendEventArgs_Completed(object sender, SocketAsyncEventArgs sendEventArgs)
  193. {
  194. lock (this)
  195. {
  196. try
  197. {
  198. _inSendPending = false;
  199. ProcessSend(m_sendEventArgs);

  200. int sendCount = 0;
  201. if (sendEventArgs.SocketError == SocketError.Success)
  202. {
  203. sendCount = sendEventArgs.BytesTransferred;
  204. }
  205. OnSendData?.Invoke(this, sendCount);

  206. if (_socketError)
  207. {
  208. OnSendError();
  209. }
  210. }
  211. catch (Exception ex)
  212. {
  213. NetLogger.Log("SendEventArgs_Completed", ex);
  214. }
  215. }
  216. }

  217. private bool ProcessSend(SocketAsyncEventArgs sendEventArgs)
  218. {
  219. if (sendEventArgs.SocketError == SocketError.Success)
  220. {
  221. return true;
  222. }
  223. else
  224. {
  225. OnSendError();
  226. return false;
  227. }
  228. }

  229. private int GetSendData()
  230. {
  231. int dataLen = 0;
  232. while (true)
  233. {
  234. byte[] data = _sendDataPool.GetObj();
  235. if (data == null)
  236. return dataLen;
  237. Array.Copy(data, 0, m_asyncSendBuffer, dataLen, data.Length);
  238. dataLen += data.Length;
  239. if (dataLen > IocpReadLen)
  240. break;
  241. }
  242. return dataLen;
  243. }
  244. private void OnSendError()
  245. {
  246. lock (this)
  247. {
  248. if (_socketError == false)
  249. {
  250. _socketError = true;
  251. OnSocketClose?.Invoke(this);
  252. }
  253. CloseClient();
  254. }
  255. }
  256. #endregion

  257. internal void CloseSocket()
  258. {
  259. try
  260. {
  261. ConnectSocket.Close();
  262. }
  263. catch (Exception ex)
  264. {
  265. NetLogger.Log("CloseSocket", ex);
  266. }
  267. }

  268. static object socketCloseLock = new object();
  269. public static int closeSendCount = 0;
  270. public static int closeReadCount = 0;

  271. bool _disposeSend = false;
  272. void CloseSend()
  273. {
  274. if (!_disposeSend && !_inSendPending)
  275. {
  276. lock (socketCloseLock)
  277. closeSendCount++;

  278. _disposeSend = true;
  279. m_sendEventArgs.SetBuffer(null, 0, 0);
  280. m_sendEventArgs.Completed -= SendEventArgs_Completed;
  281. m_sendEventArgs.Dispose();
  282. }
  283. }

  284. bool _disposeRead = false;
  285. void CloseRead()
  286. {
  287. if (!_disposeRead && !_inReadPending)
  288. {
  289. lock (socketCloseLock)
  290. closeReadCount++;

  291. _disposeRead = true;
  292. m_receiveEventArgs.SetBuffer(null, 0, 0);
  293. m_receiveEventArgs.Completed -= ReceiveEventArgs_Completed;
  294. m_receiveEventArgs.Dispose();
  295. }
  296. }
  297. private void CloseClient()
  298. {
  299. try
  300. {
  301. CloseSend();
  302. CloseRead();
  303. ConnectSocket.Close();
  304. }
  305. catch (Exception ex)
  306. {
  307. NetLogger.Log("CloseClient", ex);
  308. }
  309. }

  310. //发送缓冲大小
  311. private List<byte[]> SplitData(byte[] data, int maxLen)
  312. {
  313. List<byte[]> items = new List<byte[]>();

  314. int start = 0;
  315. while (true)
  316. {
  317. int itemLen = Math.Min(maxLen, data.Length - start);
  318. if (itemLen == 0)
  319. break;
  320. byte[] item = new byte[itemLen];
  321. Array.Copy(data, start, item, 0, itemLen);
  322. items.Add(item);

  323. start += itemLen;
  324. }
  325. return items;
  326. }
  327. }

  328. public enum EN_SocketReadResult
  329. {
  330. InAsyn,
  331. HaveRead,
  332. ReadError
  333. }

  334. public enum EN_SocketSendResult
  335. {
  336. InAsyn,
  337. HaveSend,
  338. NoSendData,
  339. SendError
  340. }

  341. class SendBufferPool
  342. {
  343. ObjectPool<byte[]> _bufferPool = new ObjectPool<byte[]>();

  344. public Int64 _bufferByteCount = 0;
  345. public bool PutObj(byte[] obj)
  346. {
  347. if (_bufferPool.PutObj(obj))
  348. {
  349. lock (this)
  350. {
  351. _bufferByteCount += obj.Length;
  352. }
  353. return true;
  354. }
  355. else
  356. {
  357. return false;
  358. }
  359. }

  360. public byte[] GetObj()
  361. {
  362. byte[] result = _bufferPool.GetObj();
  363. if (result != null)
  364. {
  365. lock (this)
  366. {
  367. _bufferByteCount -= result.Length;
  368. }
  369. }
  370. return result;
  371. }
  372. }
  373. }

NetServer  聚合其他类

  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.Linq;
  5. using System.Net.Sockets;
  6. using System.Threading;

  7. namespace IocpCore
  8. {
  9. public class NetServer
  10. {
  11. public Action<SocketEventParam> OnSocketPacketEvent;

  12. //每个连接发送缓冲大小
  13. public int SendBufferBytePerClient { get; set; } = 1024 * 100;

  14. bool _serverStart = false;
  15. List<NetListener> _listListener = new List<NetListener>();

  16. //负责对收到的字节流 组成完成的包
  17. ClientPacketManage _clientPacketManage;

  18. public Int64 SendByteCount { get; set; }
  19. public Int64 ReadByteCount { get; set; }

  20. List<ListenParam> _listListenPort = new List<ListenParam>();
  21. public void AddListenPort(int port, object tag)
  22. {
  23. _listListenPort.Add(new ListenParam(port, tag));
  24. }
  25. /// <summary>
  26. ///
  27. /// </summary>
  28. /// <param name="listenFault">监听失败的端口</param>
  29. /// <returns></returns>
  30. public bool StartListen(out List<int> listenFault)
  31. {
  32. _serverStart = true;

  33. _clientPacketManage = new ClientPacketManage(this);
  34. _clientPacketManage.OnSocketPacketEvent += PutClientPacket;

  35. _netConnectManage.OnSocketConnectEvent += SocketConnectEvent;

  36. _listListener.Clear();
  37. Thread thread1 = new Thread(new ThreadStart(NetPacketProcess));
  38. thread1.Start();

  39. Thread thread2 = new Thread(new ThreadStart(NetSendProcess));
  40. thread2.Start();

  41. Thread thread3 = new Thread(new ThreadStart(NetReadProcess));
  42. thread3.Start();

  43. listenFault = new List<int>();
  44. foreach (ListenParam param in _listListenPort)
  45. {
  46. NetListener listener = new NetListener(this);
  47. listener._listenParam = param;
  48. listener.OnAcceptSocket += Listener_OnAcceptSocket;
  49. if (!listener.StartListen())
  50. {
  51. listenFault.Add(param._port);
  52. }
  53. else
  54. {
  55. _listListener.Add(listener);
  56. NetLogger.Log(string.Format("监听成功!端口:{0}", param._port));
  57. }
  58. }

  59. return listenFault.Count == 0;
  60. }

  61. public void PutClientPacket(SocketEventParam param)
  62. {
  63. OnSocketPacketEvent?.Invoke(param);
  64. }

  65. //获取包的最小长度
  66. int _packetMinLen;
  67. int _packetMaxLen;
  68. public int PacketMinLen
  69. {
  70. get { return _packetMinLen; }
  71. }
  72. public int PacketMaxLen
  73. {
  74. get { return _packetMaxLen; }
  75. }

  76. /// <summary>
  77. /// 设置包的最小和最大长度
  78. /// 当minLen=0时,认为是接收字节流
  79. /// </summary>
  80. /// <param name="minLen"></param>
  81. /// <param name="maxLen"></param>
  82. public void SetPacketParam(int minLen, int maxLen)
  83. {
  84. Debug.Assert(minLen >= 0);
  85. Debug.Assert(maxLen > minLen);
  86. _packetMinLen = minLen;
  87. _packetMaxLen = maxLen;
  88. }

  89. //获取包的总长度
  90. public delegate int delegate_GetPacketTotalLen(byte[] data, int offset);
  91. public delegate_GetPacketTotalLen GetPacketTotalLen_Callback;

  92. ObjectPoolWithEvent<SocketEventParam> _socketEventPool = new ObjectPoolWithEvent<SocketEventParam>();
  93. private void NetPacketProcess()
  94. {
  95. while (_serverStart)
  96. {
  97. try
  98. {
  99. DealEventPool();
  100. }
  101. catch (Exception ex)
  102. {
  103. NetLogger.Log(string.Format("DealEventPool 异常 {0}***{1}", ex.Message, ex.StackTrace));
  104. }
  105. _socketEventPool.WaitOne(1000);
  106. }
  107. }

  108. Dictionary<Socket, AsyncSocketClient> _clientGroup = new Dictionary<Socket, AsyncSocketClient>();
  109. public int ClientCount
  110. {
  111. get
  112. {
  113. lock (_clientGroup)
  114. {
  115. return _clientGroup.Count;
  116. }
  117. }
  118. }
  119. public List<Socket> ClientList
  120. {
  121. get
  122. {
  123. lock (_clientGroup)
  124. {
  125. return _clientGroup.Keys.ToList();
  126. }
  127. }
  128. }

  129. private void DealEventPool()
  130. {
  131. while (true)
  132. {
  133. SocketEventParam param = _socketEventPool.GetObj();
  134. if (param == null)
  135. return;

  136. if (param.SocketEvent == EN_SocketEvent.close)
  137. {
  138. lock (_clientGroup)
  139. {
  140. _clientGroup.Remove(param.Socket);
  141. }
  142. }

  143. if (_packetMinLen == 0)//字节流处理
  144. {
  145. OnSocketPacketEvent?.Invoke(param);
  146. }
  147. else
  148. {
  149. //组成一个完整的包 逻辑
  150. _clientPacketManage.PutSocketParam(param);
  151. }
  152. }
  153. }

  154. private void SocketConnectEvent(SocketEventParam param, AsyncSocketClient client)
  155. {
  156. try
  157. {
  158. if (param.Socket == null || client == null) //连接失败
  159. {

  160. }
  161. else
  162. {
  163. lock (_clientGroup)
  164. {
  165. bool remove = _clientGroup.Remove(client.ConnectSocket);
  166. Debug.Assert(!remove);
  167. _clientGroup.Add(client.ConnectSocket, client);
  168. }

  169. client.OnSocketClose += Client_OnSocketClose;
  170. client.OnReadData += Client_OnReadData;
  171. client.OnSendData += Client_OnSendData;

  172. _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));
  173. }
  174. _socketEventPool.PutObj(param);
  175. }
  176. catch (Exception ex)
  177. {
  178. NetLogger.Log(string.Format("SocketConnectEvent 异常 {0}***{1}", ex.Message, ex.StackTrace));
  179. }
  180. }

  181. internal void OnRcvPacketLenError(Socket socket, byte[] buffer, int offset, int packetLen)
  182. {
  183. try
  184. {
  185. lock (_clientGroup)
  186. {
  187. if (!_clientGroup.ContainsKey(socket))
  188. {
  189. Debug.Assert(false);
  190. return;
  191. }

  192. NetLogger.Log(string.Format("报长度异常!包长:{0}", packetLen));
  193. AsyncSocketClient client = _clientGroup[socket];
  194. client.CloseSocket();
  195. }
  196. }
  197. catch (Exception ex)
  198. {
  199. NetLogger.Log(string.Format("OnRcvPacketLenError 异常 {0}***{1}", ex.Message, ex.StackTrace));
  200. }
  201. }

  202. #region listen port
  203. private void Listener_OnAcceptSocket(ListenParam listenPatam, AsyncSocketClient client)
  204. {
  205. try
  206. {
  207. lock (_clientGroup)
  208. {
  209. bool remove = _clientGroup.Remove(client.ConnectSocket);
  210. Debug.Assert(!remove);
  211. _clientGroup.Add(client.ConnectSocket, client);
  212. }

  213. client.OnSocketClose += Client_OnSocketClose;
  214. client.OnReadData += Client_OnReadData;
  215. client.OnSendData += Client_OnSendData;

  216. _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));

  217. SocketEventParam param = new SocketEventParam(EN_SocketEvent.accept, client.ConnectSocket);
  218. param.ClientInfo = client.ClientInfo;

  219. _socketEventPool.PutObj(param);
  220. }
  221. catch (Exception ex)
  222. {
  223. NetLogger.Log(string.Format("Listener_OnAcceptSocket 异常 {0}***{1}", ex.Message, ex.StackTrace));
  224. }
  225. }


  226. ObjectPoolWithEvent<SocketEventDeal> _listSendEvent = new ObjectPoolWithEvent<SocketEventDeal>();
  227. private void NetSendProcess()
  228. {
  229. while (true)
  230. {
  231. DealSendEvent();
  232. _listSendEvent.WaitOne(1000);
  233. }
  234. }

  235. ObjectPoolWithEvent<SocketEventDeal> _listReadEvent = new ObjectPoolWithEvent<SocketEventDeal>();
  236. private void NetReadProcess()
  237. {
  238. while (true)
  239. {
  240. DealReadEvent();
  241. _listReadEvent.WaitOne(1000);
  242. }
  243. }


  244. private void DealSendEvent()
  245. {
  246. while (true)
  247. {
  248. SocketEventDeal item = _listSendEvent.GetObj();
  249. if (item == null)
  250. break;
  251. switch (item.SocketEvent)
  252. {
  253. case EN_SocketDealEvent.send:
  254. {
  255. while (true)
  256. {
  257. EN_SocketSendResult result = item.Client.SendNextData();
  258. if (result == EN_SocketSendResult.HaveSend)
  259. continue;
  260. else
  261. break;
  262. }
  263. }
  264. break;
  265. case EN_SocketDealEvent.read:
  266. {
  267. Debug.Assert(false);
  268. }
  269. break;
  270. }
  271. }
  272. }

  273. private void DealReadEvent()
  274. {
  275. while (true)
  276. {
  277. SocketEventDeal item = _listReadEvent.GetObj();
  278. if (item == null)
  279. break;
  280. switch (item.SocketEvent)
  281. {
  282. case EN_SocketDealEvent.read:
  283. {
  284. while (true)
  285. {
  286. EN_SocketReadResult result = item.Client.ReadNextData();
  287. if (result == EN_SocketReadResult.HaveRead)
  288. continue;
  289. else
  290. break;
  291. }
  292. }
  293. break;
  294. case EN_SocketDealEvent.send:
  295. {
  296. Debug.Assert(false);
  297. }
  298. break;
  299. }
  300. }
  301. }

  302. private void Client_OnReadData(AsyncSocketClient client, byte[] readData)
  303. {
  304. //读下一条
  305. _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));

  306. try
  307. {
  308. SocketEventParam param = new SocketEventParam(EN_SocketEvent.read, client.ConnectSocket);
  309. param.ClientInfo = client.ClientInfo;
  310. param.Data = readData;
  311. _socketEventPool.PutObj(param);

  312. lock (this)
  313. {
  314. ReadByteCount += readData.Length;
  315. }
  316. }
  317. catch (Exception ex)
  318. {
  319. NetLogger.Log(string.Format("Client_OnReadData 异常 {0}***{1}", ex.Message, ex.StackTrace));
  320. }
  321. }
  322. #endregion

  323. private void Client_OnSendData(AsyncSocketClient client, int sendCount)
  324. {
  325. //发送下一条
  326. _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));
  327. lock (this)
  328. {
  329. SendByteCount += sendCount;
  330. }
  331. }

  332. private void Client_OnSocketClose(AsyncSocketClient client)
  333. {
  334. try
  335. {
  336. SocketEventParam param = new SocketEventParam(EN_SocketEvent.close, client.ConnectSocket);
  337. param.ClientInfo = client.ClientInfo;
  338. _socketEventPool.PutObj(param);
  339. }
  340. catch (Exception ex)
  341. {
  342. NetLogger.Log(string.Format("Client_OnSocketClose 异常 {0}***{1}", ex.Message, ex.StackTrace));
  343. }
  344. }

  345. /// <summary>
  346. /// 放到发送缓冲
  347. /// </summary>
  348. /// <param name="socket"></param>
  349. /// <param name="data"></param>
  350. /// <returns></returns>
  351. public EN_SendDataResult SendData(Socket socket, byte[] data)
  352. {
  353. if (socket == null)
  354. return EN_SendDataResult.no_client;
  355. lock (_clientGroup)
  356. {
  357. if (!_clientGroup.ContainsKey(socket))
  358. return EN_SendDataResult.no_client;
  359. AsyncSocketClient client = _clientGroup[socket];
  360. EN_SendDataResult result = client.PutSendData(data);
  361. if (result == EN_SendDataResult.ok)
  362. {
  363. //发送下一条
  364. _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));
  365. }
  366. return result;
  367. }
  368. }

  369. /// <summary>
  370. /// 设置某个连接的发送缓冲大小
  371. /// </summary>
  372. /// <param name="socket"></param>
  373. /// <param name="byteCount"></param>
  374. /// <returns></returns>
  375. public bool SetClientSendBuffer(Socket socket, int byteCount)
  376. {
  377. lock (_clientGroup)
  378. {
  379. if (!_clientGroup.ContainsKey(socket))
  380. return false;
  381. AsyncSocketClient client = _clientGroup[socket];
  382. client.SendBufferByteCount = byteCount;
  383. return true;
  384. }
  385. }


  386. #region connect process
  387. NetConnectManage _netConnectManage = new NetConnectManage();
  388. /// <summary>
  389. /// 异步连接一个客户端
  390. /// </summary>
  391. /// <param name="peerIp"></param>
  392. /// <param name="peerPort"></param>
  393. /// <param name="tag"></param>
  394. /// <returns></returns>
  395. public bool ConnectAsyn(string peerIp, int peerPort, object tag)
  396. {
  397. return _netConnectManage.ConnectAsyn(peerIp, peerPort, tag);
  398. }

  399. /// <summary>
  400. /// 同步连接一个客户端
  401. /// </summary>
  402. /// <param name="peerIp"></param>
  403. /// <param name="peerPort"></param>
  404. /// <param name="tag"></param>
  405. /// <param name="socket"></param>
  406. /// <returns></returns>
  407. public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
  408. {
  409. return _netConnectManage.Connect(peerIp, peerPort, tag, out socket);
  410. }
  411. #endregion
  412. }

  413. enum EN_SocketDealEvent
  414. {
  415. read,
  416. send,
  417. }
  418. class SocketEventDeal
  419. {
  420. public AsyncSocketClient Client { get; set; }
  421. public EN_SocketDealEvent SocketEvent { get; set; }
  422. public SocketEventDeal(AsyncSocketClient client, EN_SocketDealEvent socketEvent)
  423. {
  424. Client = client;
  425. SocketEvent = socketEvent;
  426. }
  427. }
  428. }

库的使用

使用起来非常简单,示例如下 

 

  1. using IocpCore;
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Net.Sockets;
  6. using System.Text;
  7. using System.Threading.Tasks;
  8. using System.Windows;

  9. namespace WarningClient
  10. {
  11. public class SocketServer
  12. {
  13. public Action<SocketEventParam> OnSocketEvent;

  14. public Int64 SendByteCount
  15. {
  16. get
  17. {
  18. if (_netServer == null)
  19. return 0;
  20. return _netServer.SendByteCount;
  21. }
  22. }
  23. public Int64 ReadByteCount
  24. {
  25. get
  26. {
  27. if (_netServer == null)
  28. return 0;
  29. return _netServer.ReadByteCount;
  30. }
  31. }

  32. NetServer _netServer;
  33. EN_PacketType _packetType = EN_PacketType.byteStream;
  34. public void SetPacktType(EN_PacketType packetType)
  35. {
  36. _packetType = packetType;
  37. if (_netServer == null)
  38. return;
  39. if (packetType == EN_PacketType.byteStream)
  40. {
  41. _netServer.SetPacketParam(0, 1024);
  42. }
  43. else
  44. {
  45. _netServer.SetPacketParam(9, 1024);
  46. }
  47. }

  48. public bool Init(List<int> listenPort)
  49. {
  50. NetLogger.OnLogEvent += NetLogger_OnLogEvent;
  51. _netServer = new NetServer();
  52. SetPacktType(_packetType);
  53. _netServer.GetPacketTotalLen_Callback += GetPacketTotalLen;
  54. _netServer.OnSocketPacketEvent += SocketPacketDeal;

  55. foreach (int n in listenPort)
  56. {
  57. _netServer.AddListenPort(n, n);
  58. }

  59. List<int> listenFault;
  60. bool start = _netServer.StartListen(out listenFault);
  61. return start;
  62. }

  63. int GetPacketTotalLen(byte[] data, int offset)
  64. {
  65. if (MainWindow._packetType == EN_PacketType.znss)
  66. return GetPacketZnss(data, offset);
  67. else
  68. return GetPacketAnzhiyuan(data, offset);
  69. }

  70. int GetPacketAnzhiyuan(byte[] data, int offset)
  71. {
  72. int n = data[offset + 5] + 6;
  73. return n;
  74. }

  75. int GetPacketZnss(byte[] data, int offset)
  76. {
  77. int packetLen = (int)(data[4]) + 5;
  78. return packetLen;
  79. }


  80. public bool ConnectAsyn(string peerIp, int peerPort, object tag)
  81. {
  82. return _netServer.ConnectAsyn(peerIp, peerPort, tag);
  83. }

  84. public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
  85. {
  86. return _netServer.Connect(peerIp, peerPort, tag, out socket);
  87. }

  88. private void NetLogger_OnLogEvent(string message)
  89. {
  90. AppLog.Log(message);
  91. }

  92. Dictionary<Socket, SocketEventParam> _clientGroup = new Dictionary<Socket, SocketEventParam>();

  93. public int ClientCount
  94. {
  95. get
  96. {
  97. lock (_clientGroup)
  98. {
  99. return _clientGroup.Count;
  100. }
  101. }
  102. }
  103. public List<Socket> ClientList
  104. {
  105. get
  106. {
  107. if (_netServer != null)
  108. return _netServer.ClientList;
  109. return new List<Socket>();
  110. }
  111. }
  112. void AddClient(SocketEventParam socketParam)
  113. {
  114. lock (_clientGroup)
  115. {
  116. _clientGroup.Remove(socketParam.Socket);
  117. _clientGroup.Add(socketParam.Socket, socketParam);
  118. }
  119. }

  120. void RemoveClient(SocketEventParam socketParam)
  121. {
  122. lock (_clientGroup)
  123. {
  124. _clientGroup.Remove(socketParam.Socket);
  125. }
  126. }

  127. ObjectPool<SocketEventParam> _readDataPool = new ObjectPool<SocketEventParam>();

  128. public ObjectPool<SocketEventParam> ReadDataPool
  129. {
  130. get
  131. {
  132. return _readDataPool;
  133. }
  134. }

  135. private void SocketPacketDeal(SocketEventParam socketParam)
  136. {
  137. OnSocketEvent?.Invoke(socketParam);
  138. if (socketParam.SocketEvent == EN_SocketEvent.read)
  139. {
  140. if (MainWindow._isShowReadPacket)
  141. _readDataPool.PutObj(socketParam);
  142. }
  143. else if (socketParam.SocketEvent == EN_SocketEvent.accept)
  144. {
  145. AddClient(socketParam);
  146. string peerIp = socketParam.ClientInfo.PeerIpPort;
  147. AppLog.Log(string.Format("客户端链接!本地端口:{0},对端:{1}",
  148. socketParam.ClientInfo.LocalPort, peerIp));
  149. }
  150. else if (socketParam.SocketEvent == EN_SocketEvent.connect)
  151. {
  152. string peerIp = socketParam.ClientInfo.PeerIpPort;
  153. if (socketParam.Socket != null)
  154. {
  155. AddClient(socketParam);

  156. AppLog.Log(string.Format("连接对端成功!本地端口:{0},对端:{1}",
  157. socketParam.ClientInfo.LocalPort, peerIp));
  158. }
  159. else
  160. {
  161. AppLog.Log(string.Format("连接对端失败!本地端口:{0},对端:{1}",
  162. socketParam.ClientInfo.LocalPort, peerIp));
  163. }
  164. }
  165. else if (socketParam.SocketEvent == EN_SocketEvent.close)
  166. {
  167. MainWindow.MainWnd.OnSocketDisconnect(socketParam.Socket);
  168. RemoveClient(socketParam);
  169. string peerIp = socketParam.ClientInfo.PeerIpPort;
  170. AppLog.Log(string.Format("客户端断开!本地端口:{0},对端:{1},",
  171. socketParam.ClientInfo.LocalPort, peerIp));
  172. }
  173. }

  174. public EN_SendDataResult SendData(Socket socket, byte[] data)
  175. {
  176. if(socket == null)
  177. {
  178. MessageBox.Show("还没连接!");
  179. return EN_SendDataResult.no_client;
  180. }
  181. return _netServer.SendData(socket, data);
  182. }

  183. internal void SendToAll(byte[] data)
  184. {
  185. lock (_clientGroup)
  186. {
  187. foreach (Socket socket in _clientGroup.Keys)
  188. {
  189. SendData(socket, data);
  190. }
  191. }
  192. }
  193. }
  194. }

博客原文:https://www.cnblogs.com/yuanchenhui/p/asyn_scoket.html

 

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
C#使用Protocol Buffer(ProtoBuf)进行Unity中的Socket通信
调用线程必须为 STA,因为许多 UI 组件都需要
简单聊天程序java socket
使用 java Socket 从客户端向服务器端传文件 - cwyang的专栏 - CSD...
基于Apache Mina实现的TCP长连接和短连接实例
java实现电脑远程控制详解,附完整源代码
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服