为自己搭建一个分布式 IM(即时通讯) 系统

为自己搭建一个分布式 IM(即时通讯) 系统,第1张

CIM(CROSS-IM) 一款面向开发者的 IM(即时通讯)系统;同时提供了一些组件帮助开发者构建一款属于自己可水平扩展的 IM 。

借助 CIM 你可以实现以下需求:

下面来看看具体的架构设计。

整体主要由以下模块组成:

cim-server

IM 服务端;用于接收 client 连接、消息透传、消息推送等功能。

支持集群部署。

cim-forward-route

消息路由服务器;用于处理消息路由、消息转发、用户登录、用户下线以及一些运营工具(获取在线用户数等)。

cim-client

IM 客户端;给用户使用的消息终端,一个命令即可启动并向其他人发起通讯(群聊、私聊);同时内置了一些常用命令方便使用。

整体的流程也比较简单,流程图如下:

所以当我们自己部署时需要以下步骤:

接下来重点看看具体的实现,比如群聊、私聊消息如何流转;IM 服务端负载均衡;服务如何注册发现等等。

IM 服务端

先来看看服务端;主要是实现客户端上下线、消息下发等功能。

首先是服务启动:

由于是在 SpringBoot 中搭建的,所以在应用启动时需要启动 Netty 服务。

从 pipline 中可以看出使用了 Protobuf 的编解码(具体报文在客户端中分析)。

注册发现

需要满足 IM 服务端的水平扩展需求,所以 cim-server 是需要将自身数据发布到注册中心的。

所以在应用启动成功后需要将自身数据注册到 Zookeeper 中。

最主要的目的就是将当前应用的 ip + cim-server-port+ http-port 注册上去。

上图是我在演示环境中注册的两个 cim-server 实例(由于在一台服务器,所以只是端口不同)。

这样在客户端(监听这个 Zookeeper 节点)就能实时的知道目前可用的服务信息。

登录

当客户端请求 cim-forward-route 中的登录接口(详见下文)做完业务验证(就相当于日常登录其他网站一样)之后,客户端会向服务端发起一个长连接,如之前的流程所示:

这时客户端会发送一个特殊报文,表明当前是登录信息。

服务端收到后就需要将该客户端的 userID 和当前 Channel 通道关系保存起来。

同时也缓存了用户的信息,也就是 userID 和 用户名。

离线

当客户端断线后也需要将刚才缓存的信息清除掉。

同时也需要调用 route 接口清除相关信息(具体接口看下文)。

IM 路由

从架构图中可以看出,路由层是非常重要的一环;它提供了一系列的 HTTP 服务承接了客户端和服务端。

目前主要是以下几个接口。

注册接口

由于每一个客户端都是需要登录才能使用的,所以第一步自然是注册。

这里就设计的比较简单,直接利用 Redis 来存储用户信息;用户信息也只有 ID 和 userName 而已。

只是为了方便查询在 Redis 中的 KV 又反过来存储了一份 VK,这样 ID 和 userName 都必须唯一。

登录接口

这里的登录和 cim-server 中的登录不一样,具有业务性质,

为了实现只能一个用户登录,使用了 Redis 中的 set 来保存登录信息;利用 userID 作为 key ,重复的登录就会写入失败。

获取一台可用的路由实例也比较简单:

当然要获取 Zookeeper 中的服务实例前自然是需要监听 cim-server 之前注册上去的那个节点。

具体代码如下:

也是在应用启动之后监听 Zookeeper 中的路由节点,一旦发生变化就会更新内部缓存。

群聊接口

这是一个真正发消息的接口,实现的效果就是其中一个客户端发消息,其余所有客户端都能收到!

流程肯定是客户端发送一条消息到服务端,服务端收到后在上文介绍的 SessionSocketHolder 中遍历所有 Channel(通道)然后下发消息即可。

服务端是单机倒也可以,但现在是集群设计。所以所有的客户端会根据之前的轮询算法分配到不同的 cim-server 实例中。

因此就需要路由层来发挥作用了。

路由接口收到消息后首先遍历出所有的客户端和服务实例的关系。

路由关系在 Redis 中的存放如下:

由于 Redis 单线程的特质,当数据量大时;一旦使用 keys 匹配所有 cim-route:* 数据,会导致 Redis 不能处理其他请求。

所以这里改为使用 scan 命令来遍历所有的 cim-route:*。

接着会挨个调用每个客户端所在的服务端的 HTTP 接口用于推送消息。

在 cim-server 中的实现如下:

cim-server 收到消息后会在内部缓存中查询该 userID 的通道,接着只需要发消息即可。

在线用户接口

这是一个辅助接口,可以查询出当前在线用户信息。

实现也很简单,也就是查询之前保存 ”用户登录状态的那个去重 set “即可。

私聊接口

之所以说获取在线用户是一个辅助接口,其实就是用于辅助私聊使用的。

一般我们使用私聊的前提肯定得知道当前哪些用户在线,接着你才会知道你要和谁进行私聊。

类似于这样:

在我们这个场景中,私聊的前提就是需要获得在线用户的 userID。

所以私聊接口在收到消息后需要查询到接收者所在的 cim-server 实例信息,后续的步骤就和群聊一致了。调用接收者所在实例的 HTTP 接口下发信息。

只是群聊是遍历所有的在线用户,私聊只发送一个的区别。

下线接口

一旦客户端下线,我们就需要将之前存放在 Redis 中的一些信息删除掉(路由信息、登录状态)。

IM 客户端

客户端中的一些逻辑其实在上文已经谈到一些了。

登录

第一步也就是登录,需要在启动时调用 route 的登录接口,获得 cim-server 信息再创建连接。

登录过程中 route 接口会判断是否为重复登录,重复登录则会直接退出程序。

接下来是利用 route 接口返回的 cim-server 实例信息(ip+port)创建连接。

最后一步就是发送一个登录标志的信息到服务端,让它保持客户端和 Channel 的关系。

自定义协议

上文提到的一些登录报文、真正的消息报文这些其实都是在我们自定义协议中可以区别出来的。

由于是使用 Google Protocol Buffer 编解码,所以先看看原始格式。

其实这个协议中目前一共就三个字段:

目前主要是三种类型,分别对应不同的业务:

心跳

为了保持客户端和服务端的连接,每隔一段时间没有发送消息都需要自动的发送心跳。

目前的策略是每隔一分钟就是发送一个心跳包到服务端:

这样服务端每隔一分钟没有收到业务消息时就会收到 ping 的心跳包:

内置命令

客户端也内置了一些基本命令来方便使用。

比如输入 :q 就会退出客户端,同时会关闭一些系统资源。

当输入 :olu(onlineUser 的简写)就会去调用 route 的获取所有在线用户接口。

群聊

群聊的使用非常简单,只需要在控制台输入消息回车即可。

这时会去调用 route 的群聊接口。

私聊

私聊也是同理,但前提是需要触发关键字;使用 userId消息内容 这样的格式才会给某个用户发送消息,所以一般都需要先使用 :olu 命令获取所以在线用户才方便使用。

消息回调

为了满足一些定制需求,比如消息需要保存之类的。

所以在客户端收到消息之后会回调一个接口,在这个接口中可以自定义实现。

因此先创建了一个 caller 的 bean,这个 bean 中包含了一个 CustomMsgHandleListener 接口,需要自行处理只需要实现此接口即可。

自定义界面

由于我自己不怎么会写界面,但保不准有其他大牛会写。所以客户端中的群聊、私聊、获取在线用户、消息回调等业务(以及之后的业务)都是以接口形式提供。

也方便后面做页面集成,只需要调这些接口就行了;具体实现不用怎么关心。

cim 目前只是第一版,BUG 多,功能少(只拉了几个群友做了测试);不过后续还会接着完善,至少这一版会给那些没有相关经验的朋友带来一些思路。

欢迎工作一到五年的Java工程师朋友们加入Java程序员开发: 721575865

群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

rocket.chat是一个开源的社交软件,即可以直接在web页面使用,也可以下载APP(Android,IOS,Windows,Mac OS)

主要功能:群组聊天,直接通信,私聊群,桌面通知,媒体嵌入,链接预览,文件上传,语音/视频 聊天,截图等,还支持实时翻译,实现用户之间的自动实时消息转换。

也可以作为公司的内部聊天平台,所有数据都在自己的服务器上。

官方网址:https://rocket.chat/

官方github地址:https://github.com/RocketChat/Rocket.Chat

安装方式有好几种方式,这里采取docker-compose容器安装方式,快速几分钟即可搭建完成。前提已安装好docker和docker-compose。

参考官方文档:https://docs.rocket.chat/

以下是获取到的官方docker-compose.yml,默认端口3000,使用mongo数据库,根据自己需求更改。

我这里不需要更改什么,直接使用即可。

下载完成后,直接运行即可

注意,开放3000端口 or 关闭防火墙,如果你是公有云服务器,记得修改你的安全组!

浏览器输入IP:3000,即可访问。

第一次登录,需要创建管理员相关信息,及组织公司相关信息(不重要),只有邮件地址有效即可。之后进入自己的邮箱确认链接验证即可。

创建完成后,就可以登录账号,也可以创建新的普通用户。当然相关设置只能第一个管理员账号才能设置。

登录进去,默认进入# general公共频道,可以自己创建频道和拉人。剩下的功能自己用管理员账号研究。

rocket.chat有官方APP,在相关应用商店或者直接下载安装即可。

但是app连接服务器可能出现问题,导致连接不上。如下:

SSL配置

问题1:安卓app必须需要SSL连接才可,即 https://

所以要么自己在服务器上采用自签证,要么用域名商的ssl,如Cloudflare配置域名自动免费签证。

为了简单,直接给自己IP配置域名,开启SSL即可。简单可自行设置即可。

Cloudflare配置完域名,记得开启‘始终使用 HTTPS’功能。

nginx反向代理

问题2:采用nginx反向代理后,app提示websocket已于此伺服器上禁止

采用nginx反向代理情况:

1:其他安装方式不能改3000端口的情况下(rocket.chat默认端口)。

2:docker服务被其他nginx的80端口占用的情况下,不能改80端口,用其他nginx反向代理给docker的3000端口。

3:或者为了服务器安全,采用其他服务器nginx反向代理给真实服务器。

如果直接配置如下:

app连接显示会提示:websocket已于此伺服器上禁止

原因是nginx需要开启websocket,加入这重要的两行配置即可。

更换后配置如下:

再次连接app成功登录。

HTTP协议的生命周期是通过Request和Response来界定的,而Response是被动的(服务端不能主动与客户端通信),收到 一次请求才会返回一次响应。而当服务端需要主动和客户端进行通信,或者需要建立全双工通信(保持在一个连接中)时,HTTP就力不从心了。

在Websocket出现之前,实现全双工通信的方式主要是ajax轮询和long poll,这样是非常消耗性能的。

WebSocket是HTML5 新增加的特性之一,目前主流浏览器大都提供了对其的支持。其特点是可以在客户端和服务端之间建立全双工通信,一些特殊场景,例如实时通信、在线游戏、多人协作等,WebSocket都可以作为解决方案。

Spring自4.0版本后增加了WebSocket支持,本例就使用Spring WebSocket构建一个简单实时聊天的应用。

Spring WebSocket提供了一个WebSocketHandler接口,这个接口提供了WebSocket连接建立后生命周期的处理方法。

WebSocketSession不同于HttpSession,每次断开连接(正常断开或发生异常断开)都会重新起一个WebSocketSession。

这个抽象类提供了一系列对WebSocketSession及传输消息的处理方法:

spring WebSocket提供了四种WebSocketMessage的实现:TextMessage(文本类消息)、BinaryMessage(二进制消息)、PingMessage、PongMessage(后两者用于心跳检测,在一端收到了Ping消息的时候,该端点必须发送Pong消息给对方,以检测该连接是否存在和有效)。

HandshakeInterceptor接口是WebSocket连接握手过程的拦截器,通过实现该接口可以对握手过程进行管理。值得注意的是,beforeHandshake中的attributes与WebSocketSession中通过getAttributes()返回的Map是同一个Map,我们可以在其中放入一些用户的特定信息。

通过实现WebSocketConfigurer接口,可以注册相应的WebSocket处理器、路径、允许域、SockJs支持。

url为指定的WebSocket注册路径,当协议为http时,使用ws://,当协议为https,使用wss://。

onmessage的event对象:

可以看出,应使用event.data获取服务端发送的消息。

有的浏览器不支持WebSocket,使用SockJs可以模拟WebSocket。

以下使用WebSocket构建一个实时聊天应用。

1.客户端与服务端通信只使用TextMessage(文本类消息),客户端只能发送聊天文本,服务端可以单播和广播消息,包括聊天文本、上线、下线、掉线、用户列表信息、认证信息和服务器时间。

2.以HttpSession来唯一区别用户,而不是WebSocketSession。

3.核心思路是当新的WebSocketSession建立时,将其加入一个集合,当该session失效时(close、error)将其从集合中删除,当服务端需要单播或广播消息时,以这个集合为根据。

新建Spring Boot项目,添加必要依赖。

(其实在WebSocket中已经没有了请求、响应之分,但习惯上将客户端发送的消息称为请求,服务端发送的消息称为响应)

从chrome的WS控制台,我们可以看到发送的信息

通过setAllowedOrigins(String... origins)方法可以限制访问,查看WebSocket Request Headers的Origin属性:

这种限制与限制跨域是类似的,不同的是端口号不在其限制范围内。可以通过setAllowedOrigins("*")的方式设置允许所有域。

使用WebSocket构建实时聊天

苦逼的IE同志说不出话来,只算到IE11可能不支持WebSocket,没想到他其实是不支持contenteditable="plaintext-only"(后来又发现火狐也不支持)。

WebSocket是一个长连接,需要心跳检测机制来判断服务端与客户端之间建立的WebSocket连接是否存在和有效。当服务端断开连接时,客户端会立马断开连接,并调用websocket.close,而当客户端出现中断网络连接的情况,服务端不会立马作出反应(Spring WebSocket不会),而是过一段时间(推测是几分钟)后才将这个断掉的WebSocketSession踢出。

使用WebSocket构建实时聊天


欢迎分享,转载请注明来源:夏雨云

原文地址:https://www.xiayuyun.com/zonghe/416487.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-05-24
下一篇2023-05-24

发表评论

登录后才能评论

评论列表(0条)

    保存