easychat
初识socket
socket 是一种通信机制,它提供了不同主机上的进程(程序)之间进行双向通信的端点。
无论是客户端还是服务器端,都首先需要创建一个 socket 对象来开启通信的基础。
例如在 Java 中,使用 Socket 类(客户端创建 socket 时常用)和 ServerSocket 类(服务器端创建 socket 时常用)来创建相应的 socket 对象
Socket客户端就像是我们,而ServerSocket服务端就像是电话商(移动,联通),我们打电话会先把消息发给电话商,然后电话商转发给目标
以下实现一个服务端接收客户端消息
Socket服务端:
1 | |
accept() 方法:
它是 ServerSocket 类的一个方法,其返回类型是 Socket。当服务器端调用这个方法时,就如同在门口安排了一个 “接待员”,一直守在那里等待客户端这个 “客人” 上门,并且在客户端到来时,会为这个客户端专门开启一条和服务器端通信的 “通道”,这个 “通道” 就以 Socket 对象的形式体现出来。返回的这个 Socket 对象和普通客户端创建的 Socket 对象类似,都可以通过获取其输入流(InputStream )和输出流(OutputStream )来进行数据的接收和发送,只是它代表的是服务器端与当前连接的客户端之间的通信连接。
值得注意的是,accept() 方法是阻塞式的,即在没有客户端连接过来之前,服务器端执行到这一步的线程就会一直停在这里等待,不会继续执行后续代码,直到有客户端连接成功,它才会获取到对应的 Socket 对象,然后继续往下执行代码,允许服务器端与刚连接上的客户端开展通信操作。
Socket客户端
1 | |
PrintWriter 是 Java 中一个方便用于文本输出的类,它提供了很多便捷的方法(比如 println() 、print() 等)来输出文本内容
OutputStream 对象 os 作为参数传递给 PrintWriter 的构造函数。这样做的目的是将基于字节的原始输出流 os 包装成一个可以更方便地进行文本输出的 PrintWriter 形式
后续就可以使用 pw 通过文本形式向网络连接的另一端发送消息了,例如可以直接使用 pw.println(“这是要发送的文本消息”);
这种方式来发送文本内容,而不需要手动去处理字节层面的转换等操作(它内部会帮我们把文本转换为对应的字节形式发送出去
登录注册
验证码
- 导入验证码依赖
1
2
3
4
5
6<!--验证码-->
<dependency>
<groupId>com.github.whvcse</groupId>
<artifactId>easy-captcha</artifactId>
<version>${captcha.verion}</version>
</dependency> - 验证码的使用最终结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21/**
* 验证码
*/
@RequestMapping(value = "/checkCode")
public ResponseVO checkCode() {
// 验证码的边框高度和宽度
ArithmeticCaptcha captcha = new ArithmeticCaptcha(100, 42);
//验证码的答案
String code = captcha.text();
//设置验证码id,为了登录或注册时在redis中查找对应的验证码
String checkCodeKey = UUID.randomUUID().toString();
//将验证码缓存到redis中
redisUtils.setex(Constants.REDIS_KEY_CHECK_CODE + checkCodeKey, code, 60 * 10);
//获取验证码
String checkCodeBase64 = captcha.toBase64();
Map<String, String> result = new HashMap<>();
//将验证码和验证码的id返回前端,返回验证码id是为了登录或注册时前端再将验证码id传回来
result.put("checkCode", checkCodeBase64);
result.put("checkCodeKey", checkCodeKey);
return getSuccessResponseVO(result);
}
注册
这里的参数我认为可以创建一个实体类用requestboby接收看着更舒服1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18@RequestMapping(value = "/register")
public ResponseVO register(@NotEmpty String checkCodeKey,
@NotEmpty @Email String email,
@NotEmpty String password,
@NotEmpty String nickName,
@NotEmpty String checkCode) {
try {
//这里的checkCode其实就是验证码答案与缓存进行对比
if (!checkCode.equalsIgnoreCase((String) redisUtils.get(Constants.REDIS_KEY_CHECK_CODE + checkCodeKey))) {
throw new BusinessException("图片验证码不正确");
}
userInfoService.register(email, nickName, password);
return getSuccessResponseVO(null);
} finally {
//校验一次就删除因为成功会登录失败会刷新
redisUtils.delete(Constants.REDIS_KEY_CHECK_CODE + checkCodeKey);
}
}equalsIgnoreCase() :用于比较两个字符串在忽略大小写的情况下是否相等。由String提供
登录
- controller层的登录依旧中规中矩,没什么东西 简单看一下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15@RequestMapping(value = "/login")
public ResponseVO login(@NotEmpty String checkCodeKey,
@NotEmpty @Email String email,
@NotEmpty String password,
@NotEmpty String checkCode) {
try {
if (!checkCode.equalsIgnoreCase((String) redisUtils.get(Constants.REDIS_KEY_CHECK_CODE + checkCodeKey))) {
throw new BusinessException("图片验证码不正确");
}
UserInfoVO userInfoVO = userInfoService.login(email, password);
return getSuccessResponseVO(userInfoVO);
} finally {
redisUtils.delete(Constants.REDIS_KEY_CHECK_CODE + checkCodeKey);
}
} - service层代码就是业务逻辑:
首先校验账号密码,然后会把账号密码放进 tokenUserInfoDto对象中,顺便判断是不是管理员
接着判断账号是否存在(根据redis中是是否存有缓存,)接着将登录信息保存到redis,并返回vo这里有些不理解,我以为会在登录成功时就会在redis中存入心跳,但是并没有。
RedisComponet类中将登录信息保存到redis中的saveTokenUserInfoDto 方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45@Override
public UserInfoVO login(String email, String password) {
UserInfo userInfo = this.userInfoMapper.selectByEmail(email);
if (null == userInfo || !userInfo.getPassword().equals(password)) {
throw new BusinessException("账号或者密码错误");
}
if (UserStatusEnum.DISABLE.getStatus().equals(userInfo.getStatus())) {
throw new BusinessException("账号已禁用");
}
//设置TokenUserInfoDto实体类对象(包含 userID,email token, admin 四个信息)
TokenUserInfoDto tokenUserInfoDto = getTokenUserInfoDto(userInfo);
//这是判断账号是否已经登陆,账号登录会在redis中存入一个 字符串+id组成的key(心跳) 如果存在就说明已经登陆
Long lastHeartBeat = redisComponet.getUserHeartBeat(tokenUserInfoDto.getUserId());
if (lastHeartBeat != null) {
throw new BusinessException("此账号已经在别处登录,请退出后再登录");
}
// 以前所用的登录校验是JWT令牌,JWT令牌的token所包含的是由id+时间戳组成的一段加密字符串
// 这里的校验token 是由id+随机生成的20位字符串加密成md5形式的字符串
String token = StringTools.encodeByMD5(tokenUserInfoDto.getUserId() + StringTools.getRandomString(Constants.LENGTH_20));
//保存登录信息到redis中
tokenUserInfoDto.setToken(token);
redisComponet.saveTokenUserInfoDto(tokenUserInfoDto);
// 返回VO对象
UserInfoVO userInfoVO = CopyTools.copy(userInfo, UserInfoVO.class);
userInfoVO.setToken(tokenUserInfoDto.getToken());
userInfoVO.setAdmin(tokenUserInfoDto.getAdmin());
return userInfoVO;
}
private TokenUserInfoDto getTokenUserInfoDto(UserInfo userInfo) {
TokenUserInfoDto tokenUserInfoDto = new TokenUserInfoDto();
tokenUserInfoDto.setUserId(userInfo.getUserId());
tokenUserInfoDto.setNickName(userInfo.getNickName());
//查询是否是管理员
String adminEmails = appConfig.getAdminEmails();
if (!StringTools.isEmpty(adminEmails) && ArrayUtils.contains(adminEmails.split(","), userInfo.getEmail())) {
tokenUserInfoDto.setAdmin(true);
} else {
tokenUserInfoDto.setAdmin(false);
}
return tokenUserInfoDto;
}
这里的实现逻辑是登录信息分成两份,第一份是由token获取 tokenUserInfoDto 对象
第二份是由Id获取token; emmm 不明白这里为什么这么搞,有点麻烦1
2
3
4
5
public void saveTokenUserInfoDto(TokenUserInfoDto tokenUserInfoDto) {
redisUtils.setex(Constants.REDIS_KEY_WS_TOKEN + tokenUserInfoDto.getToken(), tokenUserInfoDto, Constants.REDIS_KEY_EXPIRES_DAY * 2);
redisUtils.setex(Constants.REDIS_KEY_WS_TOKEN_USERID + tokenUserInfoDto.getUserId(), tokenUserInfoDto.getToken(), Constants.REDIS_KEY_EXPIRES_DAY * 2);
}
联系人
搜索联系人
这一部分更多的是业务逻辑的处理,并没有新的东西,简单介绍一些业务逻辑的处理的流程图
1 | |
接收并处理申请
这里是有人加你为好友,你可以 三种操作 同意 拒绝(还可以继续加好友) 拉黑(不可以加好友)
这里并没有很高深的知识点,更多的是对业务的理解,

1 | |
整合netty
在本项目中 只需要服务端即可,因为客户端相当于前端用户,消息就是用户所发来的http请求.
在这个服务端中,我们需要让服务端继承Runnable,这是因为我们需要将netty整合到sringboot项目中,所以需要单独起一个线程去跑netty的服务端
由于使http请求,所以在netty中我们添加几个专门用的http的handler
1 | |
自定义的Handler
Handler中常用的方法和触发时机
| 方法 | 被 Netty 在什么时候自动触发? |
|---|---|
channelActive() | 当连接建立成功时(服务端accept或客户端connect) |
channelInactive() | 当连接断开时 |
channelRead() / channelRead0() | 当收到数据时 |
exceptionCaught() | 管道中发生异常时 |
userEventTriggered() | 管道中有 Handler 调用了 ctx.fireUserEventTriggered() 时 |
handlerAdded() | handler 加入 pipeline 时 |
handlerRemoved() | handler 被移除时 |
🎯 在 WebSocket 握手完成时,提取用户身份 Token,完成鉴权 + 用户绑定操作。
- 流程解析(逐句讲解)
- 这是 Netty 提供的用户事件监听方法,当发生特殊事件时(如握手成功、心跳超时等)会触发。所以我们可以重新该方法,实现自己的认证逻辑
1
2@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) - 判断是不是 WebSocket 协议升级成功的事件(101 Switching Protocols 完成后),如果是,就做后续处理。
1
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) - 从握手请求的 URL 中提取参数 token
1
2
3
4WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt; String url = complete.requestUri();
String token = getToken(url);
//例如 ws://localhost:8080/ws?token=abcdefg - 获取token判断是否在redis里
- 有,说明登录了,继续下一步
- 没有,则直接关闭本次连接
- 🚀 如果认证成功,就把这个用户 ID 和当前 WebSocket 连接的 Channel 建立映射,加入连接上下文中
1
channelContextUtils.addContext(tokenUserInfoDto.getUserId(), ctx.channel()); - 整体流程图(客户端连接 → 鉴权 → 通信)
1
2
3
4
5
6
7
8
9
10
11
12
13客户端连接 ws://localhost:8080/ws?token=xxx
↓
Netty 握手成功(HTTP 101)
↓
触发 HandshakeComplete 事件
↓
userEventTriggered() 方法执行:
- 获取 token
- 查询 Redis 验证身份
- 成功则绑定连接
- 失败则关闭连接
↓
之后开始 WebSocket 双向通信
- 这是 Netty 提供的用户事件监听方法,当发生特殊事件时(如握手成功、心跳超时等)会触发。所以我们可以重新该方法,实现自己的认证逻辑
完整代码展示:
1 | |
- 先来看自定义的getToken方法,用来解析url中的token
1
2
3
4
5
6
7
8
9
10
11
12
13
14private String getToken(String url) {
if (StringTools.isEmpty(url) || url.indexOf("?") == -1) {
return null;
}
String[] queryParams = url.split("\\?");
if (queryParams.length < 2) {
return url;
}
String[] params = queryParams[1].split("=");
if (params.length != 2) {
return url;
}
return params[1];
}自定义的channelContextUtils.addContext 方法
OK,来看最关键的一步 channelContextUtils.addContext(tokenUserInfoDto.getUserId(), ctx.channel())
这行代码将id, channel作为参数传过去了,来看 addContext方法,这里将addContext方法拆分成三部分来讲解比较方便
- 通过AttributeKey,给channel绑定参数,参数名为这个channel的id,值为userId,
1
2
3
4
5
6
7
8
9
10
11public void addContext(String userId, Channel channel) {
try {
String channelId = channel.id().toString();
AttributeKey attributeKey = null;
if (!AttributeKey.exists(channelId)) {
attributeKey = AttributeKey.newInstance(channel.id().toString());
} else {
attributeKey = AttributeKey.valueOf(channel.id().toString());
}
// 查找名为 channel的id 的attributeKey, 并设置值
channel.attr(attributeKey).set(userId); - 通过Id查询用户的所有联系人和群聊,并更新最后登录时间,如果用户长时间未登录,在未登录期间所受到的消息不会全部显示,只会显示最近三天的消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30//获取联系人和群聊的集合
List<String> contactList = redisComponet.getUserContactList(userId);
// 遍历集合 找出所有群聊
for (String groupId : contactList) {
// 如果集合中数据的开头和 所定义的枚举中的群组相同 说明这是个群聊
if (groupId.startsWith(UserContactTypeEnum.GROUP.getPrefix())) {
//调用 利用自定义方法 为了群发消息方便
add2Group(groupId, channel);
}
}
// 自定义的常量Map,这里将不同用户的channel都存起来,Key为用户的Id,因为会有多个用户
USER_CONTEXT_MAP.put(userId, channel);
// 更新用户心跳
redisComponet.saveUserHeartBeat(userId);
//更新用户最后连接时间
UserInfo updateInfo = new UserInfo();
updateInfo.setLastLoginTime(new Date());
userInfoMapper.updateByUserId(updateInfo, userId);
//给用户发送一些消息
//获取用户最后离线时间
UserInfo userInfo = userInfoMapper.selectByUserId(userId);
Long sourceLastOffTime = userInfo.getLastOffTime();
//这里避免毫秒时间差,所以减去1秒的时间
//如果时间太久,只取最近三天的消息数
Long lastOffTime = sourceLastOffTime;
if (sourceLastOffTime != null && System.currentTimeMillis() - Constants.MILLISECOND_3DAYS_AGO > sourceLastOffTime) {
lastOffTime = System.currentTimeMillis() - Constants.MILLISECOND_3DAYS_AGO;
} - 上面提到了给用户发送未读的消息,长时间未登录,发送近三天的未读消息,这里会具体实现,会话就是下图如是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46/**
* 1、查询会话信息 查询用户所有会话,避免换设备会话不同步
*/
ChatSessionUserQuery sessionUserQuery = new ChatSessionUserQuery();
sessionUserQuery.setUserId(userId);
sessionUserQuery.setOrderBy("last_receive_time desc");
List<ChatSessionUser> chatSessionList = chatSessionUserMapper.selectList(sessionUserQuery);
WsInitData wsInitData = new WsInitData();
wsInitData.setChatSessionList(chatSessionList);
/**
* 2、查询聊天消息,首先是查找所有群组的聊天信息,这里是根据数据库一张聊天信息表中接收人ID查询的,为什么单独查询群组,而不是直接根据用户ID查,因为这里的逻辑是 数据库聊天信息表中如果是群聊的话,接收人ID存的群聊的ID,所以需要先查询用户添加的群聊集合最后加上自己,查找信息
*
*/
//查询用户的联系人
UserContactQuery contactQuery = new UserContactQuery();
contactQuery.setContactType(UserContactTypeEnum.GROUP.getType());
contactQuery.setUserId(userId);
List<UserContact> groupContactList = userContactMapper.selectList(contactQuery);
List<String> groupIdList = groupContactList.stream().map(item -> item.getContactId()).collect(Collectors.toList());
//将自己也加进去
groupIdList.add(userId);
//这里就是查询聊天信息了,最后存到 wsInitData里
ChatMessageQuery messageQuery = new ChatMessageQuery();
messageQuery.setContactIdList(groupIdList);
messageQuery.setLastReceiveTime(lastOffTime);
List<ChatMessage> chatMessageList = chatMessageMapper.selectList(messageQuery);
wsInitData.setChatMessageList(chatMessageList);
/**
* 3、查询好友申请
*/
UserContactApplyQuery applyQuery = new UserContactApplyQuery();
applyQuery.setReceiveUserId(userId);
applyQuery.setLastApplyTimestamp(sourceLastOffTime);
applyQuery.setStatus(UserContactApplyStatusEnum.INIT.getStatus());
Integer applyCount = userContactApplyMapper.selectCount(applyQuery);
wsInitData.setApplyCount(applyCount);
//发送消息
MessageSendDto messageSendDto = new MessageSendDto();
messageSendDto.setMessageType(MessageTypeEnum.INIT.getType());
messageSendDto.setContactId(userId);
messageSendDto.setExtendData(wsInitData);
//这里的sendMsg是自定义的方法,发送消息
sendMsg(messageSendDto, userId); - 自定义的发消息方法
需要一个接收方的ID,很好理解,这个方法只能用在同一个服务器的用户发送接收消息,如果在不同服务器上,需要用到后面讲的redisson的 RTopic发布订阅模式1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24private static void sendMsg(MessageSendDto messageSendDto, String reciveId) {
if (reciveId == null) {
return;
}
//根据id,获取接收方的通道
Channel sendChannel = USER_CONTEXT_MAP.get(reciveId);
if (sendChannel == null) {
return;
}
//相当于客户而言,联系人就是发送人,所以这里转换一下再发送,好友打招呼信息发送给自己需要特殊处理
// 设置消息dto
if (MessageTypeEnum.ADD_FRIEND_SELF.getType().equals(messageSendDto.getMessageType())) {
UserInfo userInfo = (UserInfo) messageSendDto.getExtendData();
messageSendDto.setMessageType(MessageTypeEnum.ADD_FRIEND.getType());
messageSendDto.setContactId(userInfo.getUserId());
messageSendDto.setContactName(userInfo.getNickName());
messageSendDto.setExtendData(null);
} else {
messageSendDto.setContactId(messageSendDto.getSendUserId());
messageSendDto.setContactName(messageSendDto.getSendUserNickName());
}
// 将消息转换成josn格式发送
sendChannel.writeAndFlush(new TextWebSocketFrame(JsonUtils.convertObj2Json(messageSendDto)));
}到这里一个完整 channelContextUtils.addContext 方法就结束了,最后展示一下完整的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85public void addContext(String userId, Channel channel) {
try {
String channelId = channel.id().toString();
AttributeKey attributeKey = null;
if (!AttributeKey.exists(channelId)) {
attributeKey = AttributeKey.newInstance(channel.id().toString());
} else {
attributeKey = AttributeKey.valueOf(channel.id().toString());
}
channel.attr(attributeKey).set(userId);
List<String> contactList = redisComponet.getUserContactList(userId);
for (String groupId : contactList) {
if (groupId.startsWith(UserContactTypeEnum.GROUP.getPrefix())) {
add2Group(groupId, channel);
}
}
USER_CONTEXT_MAP.put(userId, channel);
redisComponet.saveUserHeartBeat(userId);
//更新用户最后连接时间
UserInfo updateInfo = new UserInfo();
updateInfo.setLastLoginTime(new Date());
userInfoMapper.updateByUserId(updateInfo, userId);
//给用户发送一些消息
//获取用户最后离线时间
UserInfo userInfo = userInfoMapper.selectByUserId(userId);
Long sourceLastOffTime = userInfo.getLastOffTime();
//这里避免毫秒时间差,所以减去1秒的时间
//如果时间太久,只取最近三天的消息数
Long lastOffTime = sourceLastOffTime;
if (sourceLastOffTime != null && System.currentTimeMillis() - Constants.MILLISECOND_3DAYS_AGO > sourceLastOffTime) {
lastOffTime = System.currentTimeMillis() - Constants.MILLISECOND_3DAYS_AGO;
}
/**
* 1、查询会话信息 查询用户所有会话,避免换设备会话不同步
*/
ChatSessionUserQuery sessionUserQuery = new ChatSessionUserQuery();
sessionUserQuery.setUserId(userId);
sessionUserQuery.setOrderBy("last_receive_time desc");
List<ChatSessionUser> chatSessionList = chatSessionUserMapper.selectList(sessionUserQuery);
WsInitData wsInitData = new WsInitData();
wsInitData.setChatSessionList(chatSessionList);
/**
* 2、查询聊天消息
*/
//查询用户的联系人
UserContactQuery contactQuery = new UserContactQuery();
contactQuery.setContactType(UserContactTypeEnum.GROUP.getType());
contactQuery.setUserId(userId);
List<UserContact> groupContactList = userContactMapper.selectList(contactQuery);
List<String> groupIdList = groupContactList.stream().map(item -> item.getContactId()).collect(Collectors.toList());
//将自己也加进去
groupIdList.add(userId);
ChatMessageQuery messageQuery = new ChatMessageQuery();
messageQuery.setContactIdList(groupIdList);
messageQuery.setLastReceiveTime(lastOffTime);
List<ChatMessage> chatMessageList = chatMessageMapper.selectList(messageQuery);
wsInitData.setChatMessageList(chatMessageList);
/**
* 3、查询好友申请
*/
UserContactApplyQuery applyQuery = new UserContactApplyQuery();
applyQuery.setReceiveUserId(userId);
applyQuery.setLastApplyTimestamp(sourceLastOffTime);
applyQuery.setStatus(UserContactApplyStatusEnum.INIT.getStatus());
Integer applyCount = userContactApplyMapper.selectCount(applyQuery);
wsInitData.setApplyCount(applyCount);
//发送消息
MessageSendDto messageSendDto = new MessageSendDto();
messageSendDto.setMessageType(MessageTypeEnum.INIT.getType());
messageSendDto.setContactId(userId);
messageSendDto.setExtendData(wsInitData);
sendMsg(messageSendDto, userId);
} catch (Exception e) {
logger.error("初始化链接失败", e);
}
}
集群发送消息
为什么要使用集群发消息,因为实际中,会有多个服务器,如果发送者和接收者不在同一台服务器,则会发送消息失败。所以需要采用集群的方式,利用redisson的订阅模式,可以跨服务器发送消息。
RTpoic
RTpoic 是 Redisson 提供的用于实现发布-订阅(Pub/Sub)模式的类,它封装了 Redis 的发布和订阅功能,让开发者能够轻松地在分布式环境中进行消息的发布和订阅。
实际上类似于Kafka、RocketMQ等一系列MessageQueue的生产-消费关系。
自产自销:简单的说,可以实现一个服务中,自己发布消息,自己订阅消息进行消息的分发处理
- 简单实用
- 配置redisson
使用之前需要在配置类中进行配置1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20@Value("${spring.redis.host:}")
private String redisHost;
@Value("${spring.redis.port:}")
private Integer redisPort;
@Bean(name = "redissonClient", destroyMethod = "shutdown")
public RedissonClient redissonClient() {
try {
// 创建配置 指定redis地址及节点信息
Config config = new Config();
config.useSingleServer().setAddress("redis://" + redisHost + ":" + redisPort);
// 根据config创建出RedissonClient实例
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
} catch (RedisConnectionException e) {
logger.error("redis配置错误,请检查redis配置");
}
return null;
}
- 发送接收消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17// 该注解标识在springboot初始化后立即执行该方法
@PostConstruct
public void lisMessage() {
// 获取 名为MESSAGE_TOPIC的RTopic类型的对象
RTopic rTopic = redissonClient.getTopic(MESSAGE_TOPIC);
// 添加监听器,接收类型为MessageSendDto.class
rTopic.addListener(MessageSendDto.class, (MessageSendDto, sendDto) -> {
logger.info("收到广播消息:{}", JSON.toJSONString(sendDto));
// 通过redis发送消息在所有服务器上
channelContextUtils.sendMessage(sendDto);
});
}
// 将消息发送到redisson
public void sendMessage(MessageSendDto sendDto) {
RTopic rTopic = redissonClient.getTopic(MESSAGE_TOPIC);
rTopic.publish(sendDto);
}群发消息
在这个项目发现一个很有意思的点,在groupserviceimpl的类中savegroup方法中有这样一段代码,这是一段创建群聊,然后将群聊添加成自己的联系人,最后将自己的ws通道添加到ws的群通道中这里一开始我是疑惑的,在这个savegroup的方法应该是通过前端http请求,springmvc将请求映射到这个方法上的,而netty会将http请求升级为ws,那么这里是怎么拿到channel的,这就要我们往下看 channelContextUtils.addUser2Group方法
1
2
3
4
5
6
7
8
9
10
11
12
13//创建群主会话
ChatSessionUser chatSessionUser = new ChatSessionUser();
chatSessionUser.setUserId(groupInfo.getGroupOwnerId());
chatSessionUser.setContactId(groupInfo.getGroupId());
chatSessionUser.setContactName(groupInfo.getGroupName());
chatSessionUser.setSessionId(sessionId);
this.chatSessionUserService.add(chatSessionUser);
//添加为联系人
redisComponet.addUserContact(groupInfo.getGroupOwnerId(), groupInfo.getGroupId());
// 将自己的ws通道添加到一个ws的群通道作为群聊
channelContextUtils.addUser2Group(groupInfo.getGroupOwnerId(), groupInfo.getGroupId());
- channelContextUtils.addUser2Group
看到这里其实已经明白了,通过addUser2Group方法,会获取到在channelContextUtils的静态MAP下的channel1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public void addUser2Group(String userId, String groupId) {
// 查找userId的channel,USER_CONTEXT_MAP是自定义的静态map
Channel channel = USER_CONTEXT_MAP.get(userId);
//调用自定义方法,将自己的channel加入channelgroup中
add2Group(groupId, channel);
}
private void add2Group(String groupId, Channel context) {
ChannelGroup group = GROUP_CONTEXT_MAP.get(groupId);
if (group == null) {
group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
GROUP_CONTEXT_MAP.put(groupId, group);
}
if (context == null) {
return;
}
group.add(context);
}
聊天执行流程
登录后会连接netty
通过http接口获取到发送的消息的内容
- 这一步会将消息存数据库一份
- 接着会将消息 发给消息队列
1
2
3
4
5
6
7
8
9@RequestMapping("/sendMessage")
@GlobalInterceptor
public ResponseVO sendMessage(HttpServletRequest request,
@NotEmpty String contactId,
@NotEmpty @Max(500) String messageContent,
@NotNull Integer messageType,
Long fileSize,
String fileName,
Integer fileType) {......}
- redis的rTopic 接收消息
- rTopic 收到消息后 会触发 channelContextUtils.sendMessage(sendDto)发送消息;
- Redis 的 Pub/Sub(发布订阅)机制,它是“即时广播”机制,不会持久保存消息,也不会堆积在 Redis 里。
1
2
3
4
5
6
7
8
9
10
11
12
13@PostConstruct
public void lisMessage() {
RTopic rTopic = redissonClient.getTopic(MESSAGE_TOPIC);
rTopic.addListener(MessageSendDto.class, (MessageSendDto, sendDto) -> {
logger.info("收到广播消息:{}", JSON.toJSONString(sendDto));
channelContextUtils.sendMessage(sendDto);
});
}
public void sendMessage(MessageSendDto sendDto) {
RTopic rTopic = redissonClient.getTopic(MESSAGE_TOPIC);
rTopic.publish(sendDto);
}
- 发送消息
1
2
3
4
5Channel sendChannel = USER_CONTEXT_MAP.get(reciveId);
if (sendChannel == null) {
return;
}
sendChannel.writeAndFlush(new TextWebSocketFrame(JsonUtils.convertObj2Json(messageSendDto))); - 前端渲染消息
- 前端 WebSocket 实例监听到第4步的消息,会展示到聊天界面上







