Socket封装,支持多客户端,支持大文件传输,支持多线程并发,对较大的Socket包进行分块传输。
封装所要达到的效果,是可以像下面这样使用Socket和服务端通信,调用服务端的方法,让你在使用Socket的时候,感觉不到Socket的存在,就像是调用本地方法一样,并且支持ref参数和out参数:
DemoService demoService = new DemoService();DemoService2 demoService2 = new DemoService2();string result = demoService.Test("测试DemoService", 1);demoService.Test2("测试DemoService", 1);string result2 = demoService2.RunTest("测试DemoService2", 2);
一、数据结构:
CmdType:
using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;namespace DataStruct{ ////// cmd类型 /// public enum CmdType { ////// 执行方法 /// RunFunction = 1, ////// 心跳 /// Heartbeat = 2 }}
SocketData:
using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;namespace DataStruct{ ////// Socket数据 /// [Serializable] public class SocketData { ////// 命令类型 /// public CmdType cmdType { get; set; } ////// 类名 /// public string className { get; set; } ////// 方法名 /// public string functionName { get; set; } ////// 方法参数 /// public object[] funParam { get; set; } }}
SocketResult:
using System;using System.Collections.Generic;using System.Linq;using System.Text;namespace DataStruct{ ////// Socket返回 /// [Serializable] public class SocketResult { ////// 方法返回值 /// public object returnValue { get; set; } ////// 方法参数 /// public object[] param { get; set; } }}
FunctionUtil(根据SocketData执行服务端的方法):
using System;using System.Collections.Generic;using System.IO;using System.Linq;using System.Reflection;using System.Text;using System.Threading.Tasks;namespace DataStruct.Utils{ ////// 执行方法 /// public class FunctionUtil { ////// 执行方法 /// public static object RunFunction(string applicationPath, SocketData socketData) { Assembly assembly = Assembly.LoadFile(Path.Combine(applicationPath, "DataService.dll")); object classObj = assembly.CreateInstance("DataService." + socketData.className); Type type = classObj.GetType(); MethodInfo methodInfo = type.GetMethod(socketData.functionName); ParameterInfo[] parameterInfoArr = methodInfo.GetParameters(); object result = methodInfo.Invoke(classObj, socketData.funParam); SocketResult socketResult = new SocketResult(); socketResult.returnValue = result; socketResult.param = new object[socketData.funParam.Length]; object paramObj; for (int i = 0; i < parameterInfoArr.Length; i++) { paramObj = socketData.funParam[i]; if (parameterInfoArr[i].ParameterType.IsByRef || parameterInfoArr[i].IsOut) { socketResult.param[i] = paramObj; } else { socketResult.param[i] = null; } } return socketResult; } }}
二、Socket通信封装:
using System;using System.Collections.Generic;using System.Configuration;using System.Linq;using System.Net;using System.Net.Sockets;using System.Text;using System.Threading;using DataStruct.Utils;namespace DataStruct{ ////// Socket封装 /// public static class SocketHelper { #region 变量 private static object _lockSend = new object(); private static Socket serverSocket; private static Socket clientSocket; private static ListclientList = new List (); private static System.Timers.Timer heartbeatTimer; #endregion #region 启动服务 /// /// 启动服务 /// public static void StartServer() { try { int port = Convert.ToInt32(ConfigurationManager.AppSettings["ServerPort"]); IPEndPoint ipEndPoint = new IPEndPoint(IPAddress.Any, port); serverSocket = new Socket(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); serverSocket.Bind(ipEndPoint); serverSocket.Listen(10); new Thread(new ThreadStart(delegate() { while (true) { Socket m_Client; try { m_Client = serverSocket.Accept(); m_Client.SendTimeout = 20000; m_Client.ReceiveTimeout = 20000; m_Client.SendBufferSize = 10240; m_Client.ReceiveBufferSize = 10240; clientList.Add(m_Client); LogUtil.Log("监听到新的客户端,当前客户端数:" + clientList.Count); } catch { break; } DateTime lastHeartbeat = DateTime.Now; new Thread(new ThreadStart(delegate() { try { while (true) { byte[] receiveByteArr = null; try { receiveByteArr = Receive(m_Client); } catch { break; } if (receiveByteArr != null) { SocketData data = (SocketData)SerializeUtil.Deserialize(receiveByteArr); if (data.cmdType != CmdType.Heartbeat) { object obj = null; try { obj = FunctionUtil.RunFunction(System.Windows.Forms.Application.StartupPath, data); } catch (Exception ex) { LogUtil.LogError("执行方法出错:" + ex.Message + "\r\n" + ex.StackTrace); Send(m_Client, SerializeUtil.Serialize("error:执行服务端方法出错")); } Send(m_Client, SerializeUtil.Serialize(obj)); LogUtil.Log("接收客户端数据,并向客户端返回数据"); } else { lastHeartbeat = DateTime.Now; LogUtil.Log("收到心跳包,客户端连接正常"); } } else { clientList.Remove(m_Client); LogUtil.Log("客户端正常关闭,当前客户端数:" + clientList.Count); if (m_Client.Connected) m_Client.Disconnect(false); m_Client.Close(); m_Client.Dispose(); break; } } } catch (Exception ex) { LogUtil.LogError(ex.Message + "\r\n" + ex.StackTrace); try { Send(m_Client, SerializeUtil.Serialize("error:" + ex.Message)); } catch { } } })).Start(); //检测客户端 new Thread(new ThreadStart(delegate() { try { while (true) { DateTime now = DateTime.Now; if (now.Subtract(lastHeartbeat).TotalSeconds > 300) { clientList.Remove(m_Client); LogUtil.Log("客户端已失去连接,当前客户端数:" + clientList.Count); if (m_Client.Connected) m_Client.Disconnect(false); m_Client.Close(); m_Client.Dispose(); break; } Thread.Sleep(500); } } catch (Exception ex) { LogUtil.LogError("检测客户端出错:" + ex.Message + "\r\n" + ex.StackTrace); } })).Start(); } })).Start(); LogUtil.Log("服务已启动"); } catch (Exception ex) { LogUtil.LogError("启动服务出错:" + ex.Message + "\r\n" + ex.StackTrace); } } #endregion #region 停止服务 ////// 停止服务 /// public static void StopServer() { try { foreach (Socket socket in clientList) { if (socket.Connected) socket.Disconnect(false); socket.Close(); socket.Dispose(); } clientList.Clear(); if (serverSocket != null) { if (serverSocket.Connected) serverSocket.Disconnect(false); serverSocket.Close(); serverSocket.Dispose(); } LogUtil.Log("服务已停止"); } catch (Exception ex) { LogUtil.LogError("停止服务出错:" + ex.Message + "\r\n" + ex.StackTrace); } } #endregion #region 连接服务器 ////// 连接服务器 /// public static void ConnectServer() { try { if (clientSocket == null || !clientSocket.Connected) { if (clientSocket != null) { clientSocket.Close(); clientSocket.Dispose(); } string ip = ConfigurationManager.AppSettings["ServerIP"]; int port = Convert.ToInt32(ConfigurationManager.AppSettings["ServerPort"]); IPEndPoint ipep = new IPEndPoint(IPAddress.Parse(ip), port); clientSocket = new Socket(ipep.AddressFamily, SocketType.Stream, ProtocolType.Tcp); clientSocket.SendTimeout = 20000; clientSocket.ReceiveTimeout = 20000; clientSocket.SendBufferSize = 10240; clientSocket.ReceiveBufferSize = 10240; clientSocket.Connect(ipep); LogUtil.Log("已连接服务器"); } } catch (Exception ex) { LogUtil.LogError("连接服务器失败:" + ex.Message); } } #endregion #region 断开服务器 ////// 断开服务器 /// public static void DisconnectServer() { try { if (clientSocket != null) { if (clientSocket.Connected) clientSocket.Disconnect(false); clientSocket.Close(); clientSocket.Dispose(); } LogUtil.Log("已断开服务器"); } catch (Exception ex) { LogUtil.LogError("断开服务器失败:" + ex.Message); } } #endregion #region 心跳 public static void StartHeartbeat() { heartbeatTimer = new System.Timers.Timer(); heartbeatTimer.Interval = 5000; heartbeatTimer.Elapsed += new System.Timers.ElapsedEventHandler((obj, eea) => { try { SocketData data = new SocketData(); data.cmdType = CmdType.Heartbeat; Send(clientSocket, SerializeUtil.Serialize(data)); } catch (Exception ex) { LogUtil.LogError("向服务器发送心跳包出错:" + ex.Message); } }); heartbeatTimer.Start(); } #endregion #region 停止心跳 public static void StopHeartbeat() { heartbeatTimer.Stop(); } #endregion #region Send ////// Send /// public static void Send(Socket socket, byte[] data) { lock (_lockSend) { byte[] lenArr = BitConverter.GetBytes(data.Length); int sendTotal = 0; while (sendTotal < lenArr.Length) { int sendOnce = socket.Send(lenArr, sendTotal, lenArr.Length - sendTotal, SocketFlags.None); sendTotal += sendOnce; Thread.Sleep(1); } Thread.Sleep(1); int block = 10240; int count = (data.Length - 1) / block + 1; for (int i = 0; i < count; i++) { int currentBlock = block; if (i == count - 1) { currentBlock = data.Length - block * i; } sendTotal = 0; while (sendTotal < currentBlock) { int sendOnce = socket.Send(data, i * block + sendTotal, currentBlock - sendTotal, SocketFlags.None); sendTotal += sendOnce; Thread.Sleep(1); } Thread.Sleep(1); } } } #endregion #region Receive ////// Receive /// private static byte[] Receive(Socket socket) { lock (socket) { try { int block = 4; byte[] buffer = new byte[block]; int receiveCount = socket.Receive(buffer, 0, block, SocketFlags.None); if (receiveCount == 0) { return null; } else { while (receiveCount < block) { int revCount = socket.Receive(buffer, receiveCount, buffer.Length - receiveCount, SocketFlags.None); receiveCount += revCount; Thread.Sleep(1); } int dataLength = BitConverter.ToInt32(buffer, 0); block = 10240; receiveCount = 0; byte[] result = new byte[dataLength]; while (receiveCount < dataLength) { int revCount = socket.Receive(result, receiveCount, result.Length - receiveCount, SocketFlags.None); receiveCount += revCount; Thread.Sleep(1); } try { SerializeUtil.Deserialize(result); } catch (Exception ex) { LogUtil.LogError("数据检验失败!"); string aa = ex.Message; } return result; } } catch (Exception ex) { LogUtil.LogError("接收数据出错:" + ex.Message + "\r\n" + ex.StackTrace); return null; } } } #endregion #region IsZero ////// IsZero /// private static bool IsZero(byte[] data) { bool bl = true; foreach (byte b in data) { if (b != 0) { return false; } } LogUtil.LogError("接收的字节数组内容全是0"); return bl; } #endregion #region 请求 ////// 请求 /// public static object Request(SocketData data) { try { ConnectServer(); Send(clientSocket, SerializeUtil.Serialize(data)); byte[] receiveByteArr = null; receiveByteArr = Receive(clientSocket); if (receiveByteArr != null) { object result = SerializeUtil.Deserialize(receiveByteArr); if (result.GetType() == typeof(string) && result.ToString().IndexOf("error:") == 0) { string errMsg = result.ToString().Split(':')[1]; LogUtil.LogError(errMsg); throw new Exception(errMsg); } return result; } else { if (clientSocket.Connected) clientSocket.Disconnect(false); clientSocket.Close(); clientSocket.Dispose(); return Request(data); } } catch (Exception ex) { if (clientSocket.Connected) clientSocket.Disconnect(false); LogUtil.LogError(ex.Message); throw ex; } } #endregion #region Request 请求 ////// 请求 /// public static object Request(string className, string methodName, object[] param) { SocketData data = new SocketData(); data.className = className; data.functionName = methodName; data.funParam = param; return Request(data); } #endregion }}
三、服务端的服务接口类:
DemoService:
using System;using System.Collections.Generic;using System.IO;using System.Linq;using System.Text;using System.Threading;using System.Threading.Tasks;using DAL;using Models;namespace DataService{ ////// socket服务 /// public class DemoService { public ListGetList(ref PagerModel pager, out string str) { NoticeTypeDal noticeTypeDal = new NoticeTypeDal(); str = "测试123"; return noticeTypeDal.GetList(ref pager); } public string Test(string str, int n) { return str + ":" + n; } public void Test2(string str, int n) { string s = str + n; } public void UploadFile(string fileName, byte[] fileData, int index) { string path = @"C:\Documents and Settings\Administrator\桌面\XXPLServer\files\"; //string path = @"D:\_临时文件\文件\"; //string path = @"C:\Users\Administrator\Desktop\suxtest\file\"; //string path = @"C:\Documents and Settings\Administrator\桌面\Server\上传文件\"; if (index == 1) { using (FileStream fs = new FileStream(path + fileName, FileMode.Create, FileAccess.Write)) { fs.Write(fileData, 0, fileData.Length); fs.Close(); } } else { using (FileStream fs = new FileStream(path + fileName, FileMode.Append, FileAccess.Write)) { fs.Write(fileData, 0, fileData.Length); fs.Close(); } } } }}
DemoService2:
using System;using System.Collections.Generic;using System.Linq;using System.Text;namespace DataService{ public class DemoService2 { public string RunTest(string str, int n) { return str + ":" + n; } }}
四、客户端接口类代码:
DemoService:
using System;using System.Collections.Generic;using System.Linq;using System.Text;using DataStruct;using Common.Utils;using System.Reflection;using Models;namespace ClientService{ public class DemoService { public ListGetList(ref PagerModel pager, out string str) { SocketResult result = (SocketResult)ServiceUtil.Request(this.GetType().Name, MethodBase.GetCurrentMethod().Name, new object[] { pager, null }); pager = (PagerModel)result.param[0]; str = (string)result.param[1]; return (List )result.returnValue; } public string Test(string str, int n) { SocketResult result = (SocketResult)ServiceUtil.Request(this.GetType().Name, MethodBase.GetCurrentMethod().Name, new object[] { str, n }); return result.returnValue.ToString(); } public void Test2(string str, int n) { SocketResult result = (SocketResult)ServiceUtil.Request(this.GetType().Name, MethodBase.GetCurrentMethod().Name, new object[] { str, n }); } public bool UploadFile(string fileName, byte[] fileData, int index) { try { ServiceUtil.Request(this.GetType().Name, MethodBase.GetCurrentMethod().Name, new object[] { fileName, fileData, index }); return true; } catch { return false; } } }}
DemoService2:
using System;using System.Collections.Generic;using System.Linq;using System.Reflection;using System.Text;using Common.Utils;using DataStruct;namespace ClientService{ public class DemoService2 { public string RunTest(string str, int n) { SocketResult result = (SocketResult)ServiceUtil.Request(this.GetType().Name, MethodBase.GetCurrentMethod().Name, new object[] { str, n }); return result.returnValue.ToString(); } }}
五:服务端启动服务:
using System;using System.Collections.Generic;using System.ComponentModel;using System.Configuration;using System.Data;using System.Drawing;using System.IO;using System.Linq;using System.Net;using System.Net.Sockets;using System.Runtime.Serialization.Formatters.Binary;using System.Text;using System.Threading;using System.Threading.Tasks;using System.Windows.Forms;using DataStruct;using DataStruct.Utils;using Newtonsoft.Json;namespace XXPLServer{ public partial class Form1 : Form { #region 变量 #endregion #region Form1构造函数 public Form1() { InitializeComponent(); } #endregion #region Form1_Load private void Form1_Load(object sender, EventArgs e) { } #endregion #region 启动服务 private void btnStartServer_Click(object sender, EventArgs e) { btnStopServer.Enabled = true; btnStartServer.Enabled = false; SocketHelper.StartServer(); } #endregion #region 停止服务 private void btnStopServer_Click(object sender, EventArgs e) { btnStopServer.Enabled = false; btnStartServer.Enabled = true; SocketHelper.StopServer(); } #endregion #region Form1_FormClosing private void Form1_FormClosing(object sender, FormClosingEventArgs e) { SocketHelper.StopServer(); System.Environment.Exit(0); } #endregion }}
六:客户端测试代码:
using System;using System.Collections.Generic;using System.ComponentModel;using System.Configuration;using System.Data;using System.Drawing;using System.Linq;using System.Net;using System.Net.Sockets;using System.Text;using System.Threading;using System.Threading.Tasks;using System.Windows.Forms;using CommonDll;using DataStruct;using ClientService;using System.IO;using System.Diagnostics;using Models;namespace XXPLClient{ public partial class Form1 : Form { #region 变量 #endregion #region Form1构造函数 public Form1() { InitializeComponent(); } #endregion #region Form1_Load private void Form1_Load(object sender, EventArgs e) { SocketHelper.ConnectServer(); //连接服务器 SocketHelper.StartHeartbeat(); //心跳 } #endregion #region Form1_FormClosing private void Form1_FormClosing(object sender, FormClosingEventArgs e) { SocketHelper.DisconnectServer(); SocketHelper.StopHeartbeat(); System.Environment.Exit(0); } #endregion private void btnTest_Click(object sender, EventArgs e) { for (int i = 0; i < 1; i++) { new Thread(new ParameterizedThreadStart((obj) => { try { for (int j = 0; j < 1; j++) { DemoService demoService = new DemoService(); DemoService2 demoService2 = new DemoService2(); string str = demoService.Test("测试DemoService", 1) + "\r\n" + demoService2.RunTest("测试DemoService2", 2); MessageBox.Show(str); } } catch (Exception ex) { LogUtil.LogError(ex.Message); MessageBox.Show(ex.Message); } })).Start(); } } private void btnUpload_Click(object sender, EventArgs e) { if (openFileDialog1.ShowDialog() == DialogResult.OK) { try { new Thread(new ParameterizedThreadStart((obj) => { int block = 1048576; byte[] bArr = new byte[block]; string fileName; using (FileStream fs = new FileStream(openFileDialog1.FileName, FileMode.Open, FileAccess.Read)) { fileName = Path.GetFileName(fs.Name); long count = (fs.Length - 1) / block + 1; DemoService demoService = new DemoService(); for (int i = 0; i < count; i++) { if (i != count - 1) { fs.Read(bArr, 0, bArr.Length); } else { int len = (int)(fs.Length - block * i); bArr = new byte[len]; fs.Read(bArr, 0, bArr.Length); } bool bl = demoService.UploadFile(fileName, bArr, i + 1); while (!bl) { bl = demoService.UploadFile(fileName, bArr, i + 1); LogUtil.LogError("发生错误,重发"); Thread.Sleep(20); } Thread.Sleep(20); } fs.Close(); } MessageBox.Show("成功"); })).Start(); } catch (Exception ex) { MessageBox.Show(ex.Message); } } } private void button1_Click(object sender, EventArgs e) { try { DemoService demoService = new DemoService(); PagerModel pager = new PagerModel(); pager.page = 1; pager.rows = 10; string str; Listlist = demoService.GetList(ref pager, out str); MessageBox.Show(string.Format("数据总条数:{0}\r\n页数:{1}\r\nout参数值:{2}\r\n第一条数据:{3}", pager.totalRows, pager.pageCount, str, list[0].name)); } catch (Exception ex) { string ss = ex.Message; } } }}
七:大文件分块上传:
服务端DemoService添加如下方法:
public void UploadFile(string fileName, byte[] fileData, int index){ string path = @"C:\Documents and Settings\Administrator\桌面\XXPLServer\files\"; //string path = @"D:\_临时文件\文件\"; //string path = @"C:\Users\Administrator\Desktop\suxtest\file\"; //string path = @"C:\Documents and Settings\Administrator\桌面\Server\上传文件\"; if (index == 1) { using (FileStream fs = new FileStream(path + fileName, FileMode.Create, FileAccess.Write)) { fs.Write(fileData, 0, fileData.Length); fs.Close(); } } else { using (FileStream fs = new FileStream(path + fileName, FileMode.Append, FileAccess.Write)) { fs.Write(fileData, 0, fileData.Length); fs.Close(); } }}
客户端DemoService添加如下方法:
public bool UploadFile(string fileName, byte[] fileData, int index){ try { ServiceUtil.Request(this.GetType().Name, MethodBase.GetCurrentMethod().Name, new object[] { fileName, fileData, index }); return true; } catch { return false; }}
客户端选择文件上传:
private void btnUpload_Click(object sender, EventArgs e){ if (openFileDialog1.ShowDialog() == DialogResult.OK) { try { new Thread(new ParameterizedThreadStart((obj) => { int block = 1048576; byte[] bArr = new byte[block]; string fileName; using (FileStream fs = new FileStream(openFileDialog1.FileName, FileMode.Open, FileAccess.Read)) { fileName = Path.GetFileName(fs.Name); long count = (fs.Length - 1) / block + 1; for (int i = 0; i < count; i++) { if (i != count - 1) { fs.Read(bArr, 0, bArr.Length); } else { int len = (int)(fs.Length - block * i); bArr = new byte[len]; fs.Read(bArr, 0, bArr.Length); } DemoService demoService = new DemoService(); bool bl = demoService.UploadFile(fileName, bArr, i + 1); while (!bl) { bl = demoService.UploadFile(fileName, bArr, i + 1); LogUtil.LogError("发生错误,重发"); Thread.Sleep(20); } Thread.Sleep(20); } fs.Close(); } MessageBox.Show("成功"); })).Start(); } catch (Exception ex) { MessageBox.Show(ex.Message); } }}