如何用C#实现多线程TCP协议的服务器端程序

如何用C#实现多线程TCP协议的服务器端程序,第1张

用C#实现多线程TCP协议的服务器端程序

// <summary>

/// Tcp客户线程类(服务端),ThreadServerProcessor 线程产生的客户连接,用该线程读写

/// </summary>

public class ThreadClientProcessor

{

//Tcp连接实例

private TcpClient tcpClient

//消息框,本来想写日志用

private System.Windows.Forms.ListBox MessageList

private string Password//该连接登陆密码

private string Cmd1Echo

private string Cmd2Echo

private bool ClientLogOn//客户是否登陆

private bool TcpClose

public ThreadClientProcessor(){}

//构造函数,参数解释:Tcp客户,消息框,该服务密码(password命令后的参数) ,命令回应串 1,2 ******************

public ThreadClientProcessor(TcpClient client , ListBox listBox,string LogonText ,string cmd1echo,string cmd2echo)

{

ClientList.Add(this) //把当前实例加入一个列表中,方便以后控制

this.tcpClient=client

this.MessageList=listBox

this.Password=LogonText

this.Cmd1Echo=cmd1echo

this.Cmd2Echo=cmd2echo

this.ClientLogOn=false

this.TcpClose=false

}

public static char[] CmdSplit={' '}//读来的串由' ' 进行分离,命名+' '+参数

//public const string[] Cmd=new string[] { "password","cmd1","cmd2","echo","bye"}

//该函数由你自己写,这个只是给一个例子,

//功能:命令处理器,给个命令串,返回该命令处理结果,把命令和处理结果放在一个文本文件里,便于系统升级

public string TcpCmd(string s)

{

string result

try

{

string cmdarg=s.Trim()

string[] args=cmdarg.Split(CmdSplit)

string cmd=args[0].ToLower()

switch (cmd )

{

case "password" :

if (args.Length>1)

{

ClientLogOn= Password.Equals(args[1].Trim())

result=ClientLogOn? "登陆成功":"密码不正确,未登陆"

}

else result= "登陆时候,没有输入密码"

break

case "cmd1":

result=ClientLogOn?this.Cmd1Echo:"该命令无权执行,请先登陆"

break

case "cmd2":

result=ClientLogOn?this.Cmd2Echo:"该命令无权执行,请先登陆"

break

case "echo":

result=string.Format("服务器回应:\n {0}",s)

break

case "bye":

this.TcpClose=true

result="DisConnected"

break

default:

result="不可识别的命令"

break

}

}

catch

{

result="解析命令发生错误,你输入的是狗屁命令,TMD *^* "

}

return result

} //end cmd

//定义一个线程,该线程对应的函数是 void start()(不是Start())********************************

//一下程序主要是操作该线程

public System.Threading.Thread tcpClientThread

//启动客户连接线程 *************************************************************

public void Start()

{

tcpClientThread=new Thread(new ThreadStart(start))

tcpClientThread.Priority=ThreadPriority.BelowNormal

tcpClientThread.Start()

}

//断开该当前实例连接,终止线程 **************************************************************

public void Abort()

{

if (this.tcpClientThread!=null)

{

//tcpClientThread.Interrupt()

tcpClientThread.Abort()

//一定要等一会儿,以为后边tcpClient.Close()时候,会影响NetWorkStream的操作

Thread.Sleep(TimeSpan.FromMilliseconds(100))

tcpClient.Close()

}

}

//静态列表,包含了每个连接实例(在构造实例时候使用了 ArrayList.Add( object))

private static System.Collections.ArrayList ClientList=new ArrayList()

//断开所有的Tcp客户连接,静态方法*************************************************************

public static void AbortAllClient()

{

for(int j=0 j<ClientList.Countj++)

{

//从实例列表中取一个对象,转化为ThreadClientProcessor对象

ThreadClientProcessor o=(ThreadClientProcessor ) ClientList[j]

//调用ThreadClientProcessor 对象的停止方法

o.Abort()

}

//清除连接列表

ClientList.Clear()

}

//读写连接的函数,用于线程//*******************************************************************

private void start()

{

byte[] buf=new byte[1024*1024]//预先定义1MB的缓冲

int Len=0//流的实际长度

NetworkStream networkStream=tcpClient.GetStream()//建立读写Tcp的流

try

{

byte[] p=Encoding.UTF8.GetBytes(" 欢迎光临,请输入密码" )

//向Tcp连接写 欢迎消息

if (!this.ClientLogOn )

networkStream.Write(p,0,p.Length)

//开始循环读写tcp流

while (!TcpClose)

{

//如果当前线程是在其它状态,(等待挂起,等待终止.....)就结束该循环

if (Thread.CurrentThread.ThreadState!=ThreadState.Running)

break

//判断Tcp流是否有可读的东西

if ( networkStream.DataAvailable)

{

//从流中读取缓冲字节数组

Len=networkStream.Read(buf,0,buf.Length)

//转化缓冲数组为串

string cmd=Encoding.UTF8.GetString(buf,0,Len)

this.MessageList.Items.Add("客户机:"+cmd)

//处理该缓冲的串(分析命令),分析结果为res串

string res=TcpCmd(cmd)

//把命令的返回结果res 转化为字节数组

byte[] result=Encoding.UTF8.GetBytes(res)

//发送结果缓冲数组给客户端

networkStream.Write(result,0,result.Length)

this.MessageList.Items.Add("服务器回应:"+res)

}

else

{

//Thread.Sleep(TimeSpan.FromMilliseconds(200d))

//this.MessageList.Items.Add("客户机无命令")

//如果当前Tcp连接空闲,客户端没有写入,则当前线程停止200毫秒

Thread.Sleep(TimeSpan.FromMilliseconds(200d))

}

}

一:进程和线程

每个进程有自己独立的地址空间。“在同一个进程”还是“不在同一个进程”是系统功能划分的重要决策点。《Erlang程序设计》[ERL]把进程比喻为人:

每个人有自己的记忆(内存),人与人通过谈话(消息传递)来交流,谈话既可以是面谈(同一台服务器),也可以在电话里谈(不同的服务器,有网络通信)。面谈和电话谈的区别在于,面谈可以立即知道对方是否死了(crash,SIGCHLD),而电话谈只能通过周期性的心跳来判断对方是否还活着。

有了这些比喻,设计分布式系统时可以采取“角色扮演”,团队里的几个人各自扮演一个进程,人的角色由进程的代码决定(管登录的、管消息分发的、管买卖的等等)。每个人有自己的记忆,但不知道别人的记忆,要想知道别人的看法,只能通过交谈(暂不考虑共享内存这种IPC)。然后就可以思考:

·容错:万一有人突然死了

·扩容:新人中途加进来

·负载均衡:把甲的活儿挪给乙做

·退休:甲要修复bug,先别派新任务,等他做完手上的事情就把他重启

等等各种场景,十分便利。

线程的特点是共享地址空间,从而可以高效地共享数据。一台机器上的多个进程能高效地共享代码段(操作系统可以映射为同样的物理内存),但不能共享数据。如果多个进程大量共享内存,等于是把多进程程序当成多线程来写,掩耳盗铃。

“多线程”的价值,我认为是为了更好地发挥多核处理器(multi-cores)的效能。在单核时代,多线程没有多大价值(个人想法:如果要完成的任务是CPU密集型的,那多线程没有优势,甚至因为线程切换的开销,多线程反而更慢;如果要完成的任务既有CPU计算,又有磁盘或网络IO,则使用多线程的好处是,当某个线程因为IO而阻塞时,OS可以调度其他线程执行,虽然效率确实要比任务的顺序执行效率要高,然而,这种类型的任务,可以通过单线程的”non-blocking IO+IO multiplexing”的模型(事件驱动)来提高效率,采用多线程的方式,带来的可能仅仅是编程上的简单而已)。Alan Cox说过:”A computer is a state machine.Threads are for people who can’t program state machines.”(计算机是一台状态机。线程是给那些不能编写状态机程序的人准备的)如果只有一块CPU、一个执行单元,那么确实如Alan Cox所说,按状态机的思路去写程序是最高效的。

二:单线程服务器的常用编程模型

据我了解,在高性能的网络程序中,使用得最为广泛的恐怕要数”non-blocking IO + IO multiplexing”这种模型,即Reactor模式。

在”non-blocking IO + IO multiplexing”这种模型中,程序的基本结构是一个事件循环(event loop),以事件驱动(event-driven)和事件回调的方式实现业务逻辑:

[cpp] view plain copy

//代码仅为示意,没有完整考虑各种情况

while(!done)

{

int timeout_ms = max(1000, getNextTimedCallback())

int retval = poll(fds, nfds, timeout_ms)

if (retval<0){

处理错误,回调用户的error handler

}else{

处理到期的timers,回调用户的timer handler

if(retval>0){

处理IO事件,回调用户的IO event handler

}

}

}

这里select(2)/poll(2)有伸缩性方面的不足(描述符过多时,效率较低),Linux下可替换为epoll(4),其他操作系统也有对应的高性能替代品。

Reactor模型的优点很明显,编程不难,效率也不错。不仅可以用于读写socket,连接的建立(connect(2)/accept(2)),甚至DNS解析都可以用非阻塞方式进行,以提高并发度和吞吐量(throughput),对于IO密集的应用是个不错的选择。lighttpd就是这样,它内部的fdevent结构十分精妙,值得学习。

基于事件驱动的编程模型也有其本质的缺点,它要求事件回调函数必须是非阻塞的。对于涉及网络IO的请求响应式协议,它容易割裂业务逻辑,使其散布于多个回调函数之中,相对不容易理解和维护。

三:多线程服务器的常用编程模型

大概有这么几种:

a:每个请求创建一个线程,使用阻塞式IO操作。在Java 1.4引人NIO之前,这是Java网络编程的推荐做法。可惜伸缩性不佳(请求太多时,操作系统创建不了这许多线程)。

b:使用线程池,同样使用阻塞式IO操作。与第1种相比,这是提高性能的措施。

c:使用non-blocking IO + IO multiplexing。即Java NIO的方式。

d:Leader/Follower等高级模式。

在默认情况下,我会使用第3种,即non-blocking IO + one loop per thread模式来编写多线程C++网络服务程序。

1:one loop per thread

此种模型下,程序里的每个IO线程有一个event loop,用于处理读写和定时事件(无论周期性的还是单次的)。代码框架跟“单线程服务器的常用编程模型”一节中的一样。

libev的作者说:

One loop per thread is usually a good model. Doing this is almost never wrong, some times a better-performance model exists, but it is always a good start.

这种方式的好处是:

a:线程数目基本固定,可以在程序启动的时候设置,不会频繁创建与销毁。

b:可以很方便地在线程间调配负载。

c:IO事件发生的线程是固定的,同一个TCP连接不必考虑事件并发。

Event loop代表了线程的主循环,需要让哪个线程干活,就把timer或IO channel(如TCP连接)注册到哪个线程的loop里即可:对实时性有要求的connection可以单独用一个线程;数据量大的connection可以独占一个线程,并把数据处理任务分摊到另几个计算线程中(用线程池);其他次要的辅助性connections可以共享一个线程。

比如,在dbproxy中,一个线程用于专门处理客户端发来的管理命令;一个线程用于处理客户端发来的MySQL命令,而与后端数据库通信执行该命令时,是将该任务分配给所有事件线程处理的。

对于non-trivial(有一定规模)的服务端程序,一般会采用non-blocking IO + IO multiplexing,每个connection/acceptor都会注册到某个event loop上,程序里有多个event loop,每个线程至多有一个event loop。

多线程程序对event loop提出了更高的要求,那就是“线程安全”。要允许一个线程往别的线程的loop里塞东西,这个loop必须得是线程安全的。

在dbproxy中,线程向其他线程分发任务,是通过管道和队列实现的。比如主线程accept到连接后,将表示该连接的结构放入队列,并向管道中写入一个字节。计算线程在自己的event loop中注册管道的读事件,一旦有数据可读,就尝试从队列中取任务。

2:线程池

不过,对于没有IO而光有计算任务的线程,使用event loop有点浪费。可以使用一种补充方案,即用blocking queue实现的任务队列:

[cpp] view plain copy

typedef boost::function<void()>Functor

BlockingQueue<Functor>taskQueue //线程安全的全局阻塞队列

//计算线程

void workerThread()

{

while (running) //running变量是个全局标志

{

Functor task = taskQueue.take() //this blocks

task()//在产品代码中需要考虑异常处理

}

}

// 创建容量(并发数)为N的线程池

int N = num_of_computing_threads

for (int i = 0i <N++i)

{

create_thread(&workerThread) //启动线程

}

//向任务队列中追加任务

Foo foo //Foo有calc()成员函数

boost::function<void()>task = boost::bind(&Foo::calc,&foo)

taskQueue.post(task)

除了任务队列,还可以用BlockingQueue<T>实现数据的生产者消费者队列,即T是数据类型而非函数对象,queue的消费者从中拿到数据进行处理。其实本质上是一样的。

3:总结

总结而言,我推荐的C++多线程服务端编程模式为:one (event) loop per thread + thread pool:

event loop用作IO multiplexing,配合non-blockingIO和定时器;

thread pool用来做计算,具体可以是任务队列或生产者消费者队列。

以这种方式写服务器程序,需要一个优质的基于Reactor模式的网络库来支撑,muduo正是这样的网络库。比如dbproxy使用的是libevent。

程序里具体用几个loop、线程池的大小等参数需要根据应用来设定,基本的原则是“阻抗匹配”(解释见下),使得CPU和IO都能高效地运作。所谓阻抗匹配原则:

如果池中线程在执行任务时,密集计算所占的时间比重为 P (0 <P <= 1),而系统一共有 C 个 CPU,为了让这 C 个 CPU 跑满而又不过载,线程池大小的经验公式 T = C/P。(T 是个 hint,考虑到 P 值的估计不是很准确,T 的最佳值可以上下浮动 50%)

以后我再讲这个经验公式是怎么来的,先验证边界条件的正确性。

假设 C = 8,P = 1.0,线程池的任务完全是密集计算,那么T = 8。只要 8 个活动线程就能让 8 个 CPU 饱和,再多也没用,因为 CPU 资源已经耗光了。

假设 C = 8,P = 0.5,线程池的任务有一半是计算,有一半等在 IO 上,那么T = 16。考虑操作系统能灵活合理地调度 sleeping/writing/running 线程,那么大概 16 个“50%繁忙的线程”能让 8 个 CPU 忙个不停。启动更多的线程并不能提高吞吐量,反而因为增加上下文切换的开销而降低性能。

如果 P <0.2,这个公式就不适用了,T 可以取一个固定值,比如 5*C。

另外,公式里的 C 不一定是 CPU 总数,可以是“分配给这项任务的 CPU 数目”,比如在 8 核机器上分出 4 个核来做一项任务,那么 C=4。

四:进程间通信只用TCP

Linux下进程间通信的方式有:匿名管道(pipe)、具名管道(FIFO)、POSIX消息队列、共享内存、信号(signals),以及Socket。同步原语有互斥器(mutex)、条件变量(condition variable)、读写锁(reader-writer lock)、文件锁(record locking)、信号量(semaphore)等等。

进程间通信我首选Sockets(主要指TCP,我没有用过UDP,也不考虑Unix domain协议)。其好处在于:

可以跨主机,具有伸缩性。反正都是多进程了,如果一台机器的处理能力不够,很自然地就能用多台机器来处理。把进程分散到同一局域网的多台机器上,程序改改host:port配置就能继续用;

TCP sockets和pipe都是操作文件描述符,用来收发字节流,都可以read/write/fcntl/select/poll等。不同的是,TCP是双向的,Linux的pipe是单向的,进程间双向通信还得开两个文件描述符,不方便;而且进程要有父子关系才能用pipe,这些都限制了pipe的使用;

TCP port由一个进程独占,且进程退出时操作系统会自动回收文件描述符。因此即使程序意外退出,也不会给系统留下垃圾,程序重启之后能比较容易地恢复,而不需要重启操作系统(用跨进程的mutex就有这个风险);而且,port是独占的,可以防止程序重复启动,后面那个进程抢不到port,自然就没法初始化了,避免造成意料之外的结果;

与其他IPC相比,TCP协议的一个天生的好处是“可记录、可重现”。tcpdump和Wireshark是解决两个进程间协议和状态争端的好帮手,也是性能(吞吐量、延迟)分析的利器。我们可以借此编写分布式程序的自动化回归测试。也可以用tcpcopy之类的工具进行压力测试。TCP还能跨语言,服务端和客户端不必使用同一种语言。

分布式系统的软件设计和功能划分一般应该以“进程”为单位。从宏观上看,一个分布式系统是由运行在多台机器上的多个进程组成的,进程之间采用TCP长连接通信。

使用TCP长连接的好处有两点:一是容易定位分布式系统中的服务之间的依赖关系。只要在机器上运行netstat -tpna|grep <port>就能立刻列出用到某服务的客户端地址(Foreign Address列),然后在客户端的机器上用netstat或lsof命令找出是哪个进程发起的连接。TCP短连接和UDP则不具备这一特性。二是通过接收和发送队列的长度也较容易定位网络或程序故障。在正常运行的时候,netstat打印的Recv-Q和Send-Q都应该接近0,或者在0附近摆动。如果Recv-Q保持不变或持续增加,则通常意味着服务进程的处理速度变慢,可能发生了死锁或阻塞。如果Send-Q保持不变或持续增加,有可能是对方服务器太忙、来不及处理,也有可能是网络中间某个路由器或交换机故障造成丢包,甚至对方服务器掉线,这些因素都可能表现为数据发送不出去。通过持续监控Recv-Q和Send-Q就能及早预警性能或可用性故障。以下是服务端线程阻塞造成Recv-Q和客户端Send-Q激增的例子:

[cpp] view plain copy

$netstat -tn

Proto Recv-Q Send-Q Local AddressForeign

tcp 78393 0 10.0.0.10:2000 10.0.0.10:39748 #服务端连接

tcp 0 132608 10.0.0.10:39748 10.0.0.10:2000 #客户端连接

tcp 0 52 10.0.0.10:22 10.0.0.4:55572

五:多线程服务器的适用场合

如果要在一台多核机器上提供一种服务或执行一个任务,可用的模式有:

a:运行一个单线程的进程;

b:运行一个多线程的进程;

c:运行多个单线程的进程;

d:运行多个多线程的进程;

考虑这样的场景:如果使用速率为50MB/s的数据压缩库,进程创建销毁的开销是800微秒,线程创建销毁的开销是50微秒。如何执行压缩任务?

如果要偶尔压缩1GB的文本文件,预计运行时间是20s,那么起一个进程去做是合理的,因为进程启动和销毁的开销远远小于实际任务的耗时。

如果要经常压缩500kB的文本数据,预计运行时间是10ms,那么每次都起进程 似乎有点浪费了,可以每次单独起一个线程去做。

如果要频繁压缩10kB的文本数据,预计运行时间是200微秒,那么每次起线程似 乎也很浪费,不如直接在当前线程搞定。也可以用一个线程池,每次把压缩任务交给线程池,避免阻塞当前线程(特别要避免阻塞IO线程)。

由此可见,多线程并不是万灵丹(silver bullet)。

1:必须使用单线程的场合

据我所知,有两种场合必须使用单线程:

a:程序可能会fork(2);

实际编程中,应该保证只有单线程程序能进行fork(2)。多线程程序不是不能调用fork(2),而是这么做会遇到很多麻烦:

fork一般不能在多线程程序中调用,因为Linux的fork只克隆当前线程的thread of control,不可隆其他线程。fork之后,除了当前线程之外,其他线程都消失了。

这就造成一种危险的局面。其他线程可能正好处于临界区之内,持有了某个锁,而它突然死亡,再也没有机会去解锁了。此时如果子进程试图再对同一个mutex加锁,就会立即死锁。因此,fork之后,子进程就相当于处于signal handler之中(因为不知道调用fork时,父进程中的线程此时正在调用什么函数,这和信号发生时的场景一样),你不能调用线程安全的函数(除非它是可重入的),而只能调用异步信号安全的函数。比如,fork之后,子进程不能调用:

malloc,因为malloc在访问全局状态时几乎肯定会加锁;

任何可能分配或释放内存的函数,比如snprintf;

任何Pthreads函数;

printf系列函数,因为其他线程可能恰好持有stdout/stderr的锁;

除了man 7 signal中明确列出的信号安全函数之外的任何函数。

因此,多线程中调用fork,唯一安全的做法是fork之后,立即调用exec执行另一个程序,彻底隔断子进程与父进程的联系。

在多线程环境中调用fork,产生子进程后。子进程内部只存在一个线程,也就是父进程中调用fork的线程的副本。

使用fork创建子进程时,子进程通过继承整个地址空间的副本,也从父进程那里继承了所有互斥量、读写锁和条件变量的状态。如果父进程中的某个线程占有锁,则子进程同样占有这些锁。问题是子进程并不包含占有锁的线程的副本,所以子进程没有办法知道它占有了哪些锁,并且需要释放哪些锁。

尽管Pthread提供了pthread_atfork函数试图绕过这样的问题,但是这回使得代码变得混乱。因此《Programming With Posix Threads》一书的作者说:”Avoid using fork in threaded code except where the child process will immediately exec a new program.”。

b:限制程序的CPU占用率;

这个很容易理解,比如在一个8核的服务器上,一个单线程程序即便发生busy-wait,占满1个core,其CPU使用率也只有12.5%,在这种最坏的情况下,系统还是有87.5%的计算资源可供其他服务进程使用。

因此对于一些辅助性的程序,如果它必须和主要服务进程运行在同一台机器的话,那么做成单线程的能避免过分抢夺系统的计算资源。

1、创建一个动物集合,插入动物园中有的几种动物(请给出10种)

2、一次性输出内容

3、使用iterator遍历集合中所有内容

4、将集合内容转存储于一个数字内,并在数组中进行排序

只列举了2种动物,自己再添加

import java.util.ArrayList

import java.util.Arrays

import java.util.Collection

import java.util.Iterator

/**

*

* @author Administrator

*/

interface Animal {

public String toString()

}

class Cat implements Animal,Comparable<Animal>{

public String name

public Cat(){

super()

name = "猫"

}

public String toString(){

return name

}

@Override

public int compareTo(Animal o) {

if(this.toString().compareTo(o.toString())==-1){

return -1

}else if(this.toString().compareTo(o.toString())==0){

return 0

}else

return 1

}

}

class Dog implements Animal,Comparable<Animal>{

public String name

public Dog(){

super()

name="狗"

}

public String toString(){

return name

}

@Override

public int compareTo(Animal o) {

if(this.toString().compareTo(o.toString())==-1){

return -1

}else if(this.toString().compareTo(o.toString())==0){

return 0

}else

return 1

}

}

public class Demo8 {

public static void main(String[] args){

// 创建一个动物集合,插入动物园中有的几种动物

Collection<Animal> col = new ArrayList<Animal>()

col.add(new Cat())

col.add(new Dog())

col.add(new Cat())

// 一次性输出内容

System.out.println(Arrays.toString(col.toArray()))

//使用iterator遍历集合中所有内容

//并将集合内容转存储于一个数组内

Iterator<Animal>it = col.iterator()

int n = 0

Animal[] an = new Animal[3]

while(it.hasNext()){

Animal temp = (Animal)it.next()

System.out.println("使用iterator遍历集合中所有内容:"+temp)

an[n++]=temp

}

//并在数组中进行排序

Arrays.sort(an)

System.out.println(Arrays.toString(an))

}


欢迎分享,转载请注明来源:夏雨云

原文地址:https://www.xiayuyun.com/zonghe/753454.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-08-16
下一篇2023-08-16

发表评论

登录后才能评论

评论列表(0条)

    保存