package threadPool
import java.io.BufferedReader
import java.io.IOException
import java.io.InputStream
import java.io.InputStreamReader
import java.io.OutputStream
import java.io.PrintWriter
import java.net.ServerSocket
import java.net.Socket
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
public class ThreadPoolServer {
private int port = 8000
private ServerSocket serverSocket
private ExecutorService executorService //线程池
private final int POOL_SIZE = 4 //单个CPU时线程池中的工作线程数目
public ThreadPoolServer() throws IOException{
serverSocket = new ServerSocket(port)
//创建线程池
//Runtime 的availableProcessors()方法返回当前系统CPU的数目
//系统CPU越多,线程池中的工作线程数目越多
executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*POOL_SIZE)
System.out.println("服务器已启动!!")
}
public void service(){
while(true){
Socket socket = null
try{
socket = serverSocket.accept()
executorService.execute(new Handler(socket))
}catch(IOException e){
e.printStackTrace()
}
}
}
public static void main(String[] args) {
try {
new ThreadPoolServer().service()
} catch (IOException e) {
e.printStackTrace()
}
}
}
class Handler implements Runnable {
private Socket socket
public Handler(Socket socket) {
this.socket = socket
}
private PrintWriter getWriter(Socket socket) throws IOException {
OutputStream socketOut = socket.getOutputStream()
return new PrintWriter(socketOut, true)
}
private BufferedReader getReader(Socket socket) throws IOException {
InputStream socketIn = socket.getInputStream()
return new BufferedReader(new InputStreamReader(socketIn))
}
public String echo(String msg) {
return "echo:" + msg
}
@Override
public void run() {
try {
System.out.println("New connection accepted:" + socket.getInetAddress() + ":" + socket.getPort())
BufferedReader br = getReader(socket)
PrintWriter pw = getWriter(socket)
String msg = null
while ((msg = br.readLine()) != null) {
System.out.println(msg)
pw.println(echo(msg))
if (msg.equals("bye")) {
break
}
}
} catch (IOException e) {
e.printStackTrace()
} finally {
try {
if (socket != null)
socket.close()
} catch (IOException e) {
e.printStackTrace()
}
}
}
}
服务器在调用listen和accept后,就会阻塞在accept函数上,accpet函数返回后循环调用accept函数等待客户的TCP连接。如果这时候又大量的用户并发发起connect连接,那么在listen有队列上限(最大可接受TCP的连接数)的情况下,有多少个connect会成功了。试验证明,当连接数远远高于listen的可连接数上限时,客户端的大部分TCP请求会被抛弃,只有当listen监听队列空闲或者放弃某个连接时,才可以接收新的连接,那么我们应该如何来避免这种情况出现?分析:
(一)客户端
客户端运行初期完成所设定的一定量的socket创建和相应的处理线程的创建,然后使用条件变量来完成线程同步,直到最后一个线程创建完成,才向所有线程发出广播通知,让所有线程并发调用connect,连接成功则关闭连接,失败则返回,如下代码所示。
socket创建和线程创建:
int testCount=300//并发用户数
/*
每个进程需要自己独立的栈空间,linux下默认栈大小是10M,在32位的机子上一个进程需要4G的内存空间,去掉自己的栈空间全局程序段空间,一般只有3G内存可以用,创建线程时就需要从这3G的空间中分配10M出来,所以最多可以分配300个线程。当然这里还可以使用多个进程,每个进程300个线程的方式来进一步扩大并发量。
*/
int sockfd[testCount]
pthread_t ntid[testCount]
bzero(&servaddr,sizeof(servaddr))
servaddr.sin_family=AF_INET
servaddr.sin_port=htons(SERVER_PORT)
inet_pton(AF_INET,argv[1],&servaddr.sin_addr)
int testCaseIndex=0
for(testCaseIndex=0testCaseIndex<testCounttestCaseIndex++)
{
sockfd[testCaseIndex]=socket(AF_INET,SOCK_STREAM,0)
//为每个并发客户端创建一个socket
if(sockfd[testCaseIndex]==-1)
{
printf("socket established error: %s\n",(char*)strerror(errno))
return -1
}
if( pthread_create(&ntid[testCaseIndex],NULL,handleFun,&sockfd[testCaseIndex])!=0)
{
printf("create thread error :%s\n",strerror(errno))
return -1
}
//为每个并发客户端创建一个线程来执行connect
}
printf("%d client has initiated\n",testCaseIndex)
并发客户端的线程实现:线程阻塞在条件变量上(只有条件满足了并且发起唤醒动作,线程才开始执行)。
int sockfd=*((int*)arg)
{
pthread_cond_wait(&cond,&mut)
//在条件变量上等待条件满足!
//阻塞返回后立即解锁,防止互斥量加锁带来的阻塞
pthread_mutex_unlock(&mut)
int conRes=0
conRes=connect(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr))
//线程执行connect连接,每个线程在接到唤醒信号后,才可以执行该语句,来模拟多个线程的并发调用。
if(conRes==-1)
{
printf("connect error: %s\n",strerror(errno))
return 0
}
}
当条件满足时,唤醒阻塞在条件变量上的线程:
while(1)
{
sleep(2)
pthread_cond_broadcast(&cond)//在所有线程创建完成后才进行唤醒。
欢迎分享,转载请注明来源:夏雨云
评论列表(0条)