要实现消息互通就必须要让这些消息服务器本身能互通,想了两个方式,一种是消息服务器之间交叉链接,另一种是增加一个特殊的消息服务器,这个消息服务器不对外开放,只负责消息转发和推送。
下列测试不考虑防火墙等。仅测试可行性和效率。
消息服务器
转发服务器
公共缓存
软件环境
client1 可向 client2 或者其他 client 发送消息,并接收其他 client 发送的消息.
Redis 中保存 client 连接的信息,给每个用户分配唯一的 key ,包括链接的哪台服务器,转发服务器定时检测消息服务器,如消息服务器挂掉,由转发服务器清理掉Redis已经挂掉的所有链接。
1. Client1 给 Client2 发送一条消息
2. Socket1 接收到消息,根据 key从Redis 取出 Client2 的连接信息,连接在本机,直接推送给 Client2 ,流程结束。
3.如果连接不在本机,把消息推送到转发服务器,由转发服务器把该消息推送给连接所在消息服务器,消息服务器接收消息,推送给 Client2 。
服务器上创建一个server.php,内容如下:
上只需把ip变更一下即可。192.168.0.201变更为192.168.0.202.
在转发服务器上建立脚本proxy.php,内容如下:
注意开启顺序
1.开启转发服务器php proxy.php
2.分别开启socket服务器php server.php
可以在转发服务器上看到两个消息服务器已经连接
3.开始测试,分别打开两个telnet,连接两个消息服务器,发送消息测试:
登陆
基于强大的 swoole 扩展,让php高效的实现这些成为可能,目前消息服务器到转发服务器是长连接,转发服务器到消息服务器是短连接,存在性能瓶颈,也浪费了连接资源。下一步改造成长连接,消息服务器的client使用异步。
服务器socket只创一个实例就可以,然后创建监听文件描述符,然后用select或者poll 进行并发监听,来了请求就进行处理一下,数据处理完成就关掉数据联接。但即使这样,在大规模商用的时候,还是可能超出一个服务器的能力,要负载平衡,多个服务器平均承担负载。
不过看你描述,你的程序应该还远没到大规模商用的地步。
socket接受线程:C语言为了高并发所以选择了epoll。当程序启动的时候(g_net_update.c文件中main函数,会启动一个thread见函数create_accept_task)这个thread就处理一件事情,只管接收客户端的连接,当有连接进来的时候 通过epoll_ctl函数,把socket fd 加入到epoll里面去,epoll设置监听事件EPOLLIN | EPOLLET主要是监听的是加入到epoll中的socket是否可读(因为我的需求是客户端连上了server就会马上向server发送一份数据的)。其它的部分在主线程中处理。
主线程:是一个无线循环,epoll_wait 函数相当于把客户端的连接从epoll中拿出来(因为我们监听的是EPOLLIN | EPOLLET)说明这个时候客户端有数据发送过来)。再通过recv_buffer_from_fd 函数把客户端发送过来的数据读出来。然后其他的一切就抛给线程池去处理。
线程池:(代码中我会在池里面创建15个线程) 双向链表。加入线程就是在链表后面加一个链表项,链表的前面会一个一个被拿出来处理。主要是malloc 函数free函数,sem_wait函数sem_post的处理(sem_wait 会阻塞当值大于0是会减一,sem_post是值加一)。typedef void* (FUNC)(void arg, int index)是我们自定义的线程的逻辑处理部分,arg是参数,index是第几个线程处理(我们隐形的给每个线程都标了号),例如代码中的respons_stb_info,更加具体可以看看代码里面是怎么实现的。聪明的你也可以改掉这块的内容改成动态线程池,当某个时刻的处理比较多的时候能够动态的增加线程,而不像我代码里面的是固定的。
数据库连接池:按照我的需求在处理客户端请求数据的时候是要访问数据库的。就是一下子创建出一堆的数据连接。要访问数据库的时候先去数据库连接池中找出空闲的连接,具体可以看下代码。使用的时候可以参考下database_process.c文件(代码中数据库连接池和线程池中的个数是一样的)。这里我想说下get_db_connect_from_pool这个函数,我用了随机数,我是为了不想每次都从0开始去判断哪个连接没有用到。为了数据库连接池中的每个链接都能等概率的使用到,具体的还是可以看下代码的实现。
欢迎分享,转载请注明来源:夏雨云
评论列表(0条)