初识socket

socket 是一种通信机制,它提供了不同主机上的进程(程序)之间进行双向通信的端点。
无论是客户端还是服务器端,都首先需要创建一个 socket 对象来开启通信的基础。
例如在 Java 中,使用 Socket 类(客户端创建 socket 时常用)和 ServerSocket 类(服务器端创建 socket 时常用)来创建相应的 socket 对象
Socket客户端就像是我们,而ServerSocket服务端就像是电话商(移动,联通),我们打电话会先把消息发给电话商,然后电话商转发给目标
以下实现一个服务端接收客户端消息

Socket服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class SocketService {
public static void main(String[] args) {
ServerSocket server = null;
try {
//定义socke对象server,并监听1024端口是否有人连接
server = new ServerSocket(1024);
System.out.println("服务启动");
// 阻塞服务,等待客户端连接
Socket socket =server.accept();
String ip = socket.getInetAddress().getHostAddress();
System.out.println("客服端连接"+ip+"端口"+socket.getPort());
new Thread(new Runnable() {

@Override
public void run() {
while(true){
try {
//获取客户端发到服务端的数据流
InputStream is = socket.getInputStream();
// 将数据流由字节流转换成字符流
InputStreamReader isr = new InputStreamReader(is);
// 将字符流进一步包装为带有缓冲的字符流
BufferedReader br = new BufferedReader(isr);
String msg = br.readLine();
System.out.println("收到客户端消息->"+msg);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}).start();

}catch(IOException e) {
e.printStackTrace();
}

}
}

accept() 方法:
它是 ServerSocket 类的一个方法,其返回类型是 Socket。当服务器端调用这个方法时,就如同在门口安排了一个 “接待员”,一直守在那里等待客户端这个 “客人” 上门,并且在客户端到来时,会为这个客户端专门开启一条和服务器端通信的 “通道”,这个 “通道” 就以 Socket 对象的形式体现出来。返回的这个 Socket 对象和普通客户端创建的 Socket 对象类似,都可以通过获取其输入流(InputStream )和输出流(OutputStream )来进行数据的接收和发送,只是它代表的是服务器端与当前连接的客户端之间的通信连接。
值得注意的是,accept() 方法是阻塞式的,即在没有客户端连接过来之前,服务器端执行到这一步的线程就会一直停在这里等待,不会继续执行后续代码,直到有客户端连接成功,它才会获取到对应的 Socket 对象,然后继续往下执行代码,允许服务器端与刚连接上的客户端开展通信操作。

Socket客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class SocketClient {
public static void main(String[] args) throws IOException {
Socket socket = null;
//创建socket对象,并连接本地的1024端口
socket = new Socket("127.0.0.1",1024);
//获取socket的输出流对象os,给os赋值就相当于把消息用输出流发送到服务端
OutputStream os = socket.getOutputStream();
//将os对象包装成PrintWriter类型的对象,因为PrintWriter更方便传输文本
PrintWriter pw = new PrintWriter(os);
System.out.println("请输入");
new Thread(new Runnable() {
public void run() {
while (true) {
Scanner sc = new Scanner(System.in);
String msg = sc.nextLine();
// 传输文本
pw.println(msg);
pw.flush();
}
}
}).start();
}
}

PrintWriter 是 Java 中一个方便用于文本输出的类,它提供了很多便捷的方法(比如 println() 、print() 等)来输出文本内容
OutputStream 对象 os 作为参数传递给 PrintWriter 的构造函数。这样做的目的是将基于字节的原始输出流 os 包装成一个可以更方便地进行文本输出的 PrintWriter 形式
后续就可以使用 pw 通过文本形式向网络连接的另一端发送消息了,例如可以直接使用 pw.println(“这是要发送的文本消息”);
这种方式来发送文本内容,而不需要手动去处理字节层面的转换等操作(它内部会帮我们把文本转换为对应的字节形式发送出去

最终效果如下
pAhBPGn.png
pAhBCPs.png

登录注册

验证码

  1. 导入验证码依赖
    1
    2
    3
    4
    5
    6
    <!--验证码-->
    <dependency>
    <groupId>com.github.whvcse</groupId>
    <artifactId>easy-captcha</artifactId>
    <version>${captcha.verion}</version>
    </dependency>
  2. 验证码的使用
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    /**
    * 验证码
    */
    @RequestMapping(value = "/checkCode")
    public ResponseVO checkCode() {
    // 验证码的边框高度和宽度
    ArithmeticCaptcha captcha = new ArithmeticCaptcha(100, 42);
    //验证码的答案
    String code = captcha.text();
    //设置验证码id,为了登录或注册时在redis中查找对应的验证码
    String checkCodeKey = UUID.randomUUID().toString();
    //将验证码缓存到redis中
    redisUtils.setex(Constants.REDIS_KEY_CHECK_CODE + checkCodeKey, code, 60 * 10);
    //获取验证码
    String checkCodeBase64 = captcha.toBase64();
    Map<String, String> result = new HashMap<>();
    //将验证码和验证码的id返回前端,返回验证码id是为了登录或注册时前端再将验证码id传回来
    result.put("checkCode", checkCodeBase64);
    result.put("checkCodeKey", checkCodeKey);
    return getSuccessResponseVO(result);
    }
    最终结果:
    pA4yfIO.png

    注册

    这里的参数我认为可以创建一个实体类用requestboby接收看着更舒服
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    @RequestMapping(value = "/register")
    public ResponseVO register(@NotEmpty String checkCodeKey,
    @NotEmpty @Email String email,
    @NotEmpty String password,
    @NotEmpty String nickName,
    @NotEmpty String checkCode) {
    try {
    //这里的checkCode其实就是验证码答案与缓存进行对比
    if (!checkCode.equalsIgnoreCase((String) redisUtils.get(Constants.REDIS_KEY_CHECK_CODE + checkCodeKey))) {
    throw new BusinessException("图片验证码不正确");
    }
    userInfoService.register(email, nickName, password);
    return getSuccessResponseVO(null);
    } finally {
    //校验一次就删除因为成功会登录失败会刷新
    redisUtils.delete(Constants.REDIS_KEY_CHECK_CODE + checkCodeKey);
    }
    }

    equalsIgnoreCase() :用于比较两个字符串在忽略大小写的情况下是否相等。由String提供

登录

  1. controller层的登录依旧中规中矩,没什么东西 简单看一下
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @RequestMapping(value = "/login")
    public ResponseVO login(@NotEmpty String checkCodeKey,
    @NotEmpty @Email String email,
    @NotEmpty String password,
    @NotEmpty String checkCode) {
    try {
    if (!checkCode.equalsIgnoreCase((String) redisUtils.get(Constants.REDIS_KEY_CHECK_CODE + checkCodeKey))) {
    throw new BusinessException("图片验证码不正确");
    }
    UserInfoVO userInfoVO = userInfoService.login(email, password);
    return getSuccessResponseVO(userInfoVO);
    } finally {
    redisUtils.delete(Constants.REDIS_KEY_CHECK_CODE + checkCodeKey);
    }
    }
  2. service层代码就是业务逻辑:

    首先校验账号密码,然后会把账号密码放进 tokenUserInfoDto对象中,顺便判断是不是管理员
    接着判断账号是否存在(根据redis中是是否存有缓存,)接着将登录信息保存到redis,并返回vo

    这里有些不理解,我以为会在登录成功时就会在redis中存入心跳,但是并没有。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    @Override
    public UserInfoVO login(String email, String password) {
    UserInfo userInfo = this.userInfoMapper.selectByEmail(email);
    if (null == userInfo || !userInfo.getPassword().equals(password)) {
    throw new BusinessException("账号或者密码错误");
    }
    if (UserStatusEnum.DISABLE.getStatus().equals(userInfo.getStatus())) {
    throw new BusinessException("账号已禁用");
    }

    //设置TokenUserInfoDto实体类对象(包含 userID,email token, admin 四个信息)
    TokenUserInfoDto tokenUserInfoDto = getTokenUserInfoDto(userInfo);
    //这是判断账号是否已经登陆,账号登录会在redis中存入一个 字符串+id组成的key(心跳) 如果存在就说明已经登陆
    Long lastHeartBeat = redisComponet.getUserHeartBeat(tokenUserInfoDto.getUserId());
    if (lastHeartBeat != null) {
    throw new BusinessException("此账号已经在别处登录,请退出后再登录");
    }


    // 以前所用的登录校验是JWT令牌,JWT令牌的token所包含的是由id+时间戳组成的一段加密字符串
    // 这里的校验token 是由id+随机生成的20位字符串加密成md5形式的字符串
    String token = StringTools.encodeByMD5(tokenUserInfoDto.getUserId() + StringTools.getRandomString(Constants.LENGTH_20));
    //保存登录信息到redis中
    tokenUserInfoDto.setToken(token);
    redisComponet.saveTokenUserInfoDto(tokenUserInfoDto);
    // 返回VO对象
    UserInfoVO userInfoVO = CopyTools.copy(userInfo, UserInfoVO.class);
    userInfoVO.setToken(tokenUserInfoDto.getToken());
    userInfoVO.setAdmin(tokenUserInfoDto.getAdmin());
    return userInfoVO;
    }

    private TokenUserInfoDto getTokenUserInfoDto(UserInfo userInfo) {
    TokenUserInfoDto tokenUserInfoDto = new TokenUserInfoDto();
    tokenUserInfoDto.setUserId(userInfo.getUserId());
    tokenUserInfoDto.setNickName(userInfo.getNickName());
    //查询是否是管理员
    String adminEmails = appConfig.getAdminEmails();
    if (!StringTools.isEmpty(adminEmails) && ArrayUtils.contains(adminEmails.split(","), userInfo.getEmail())) {
    tokenUserInfoDto.setAdmin(true);
    } else {
    tokenUserInfoDto.setAdmin(false);
    }
    return tokenUserInfoDto;
    }
    RedisComponet类中将登录信息保存到redis中的saveTokenUserInfoDto 方法
    这里的实现逻辑是登录信息分成两份,第一份是由token获取 tokenUserInfoDto 对象
    第二份是由Id获取token; emmm 不明白这里为什么这么搞,有点麻烦
    1
    2
    3
    4
    5

    public void saveTokenUserInfoDto(TokenUserInfoDto tokenUserInfoDto) {
    redisUtils.setex(Constants.REDIS_KEY_WS_TOKEN + tokenUserInfoDto.getToken(), tokenUserInfoDto, Constants.REDIS_KEY_EXPIRES_DAY * 2);
    redisUtils.setex(Constants.REDIS_KEY_WS_TOKEN_USERID + tokenUserInfoDto.getUserId(), tokenUserInfoDto.getToken(), Constants.REDIS_KEY_EXPIRES_DAY * 2);
    }

联系人

搜索联系人

这一部分更多的是业务逻辑的处理,并没有新的东西,简单介绍一些业务逻辑的处理的流程图
pA5biqK.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@Override
@Transactional(rollbackFor = Exception.class)
public Integer applyAdd(TokenUserInfoDto tokenUserInfoDto, String contactId, String contactType, String applyInfo) {
//判断添加的类型是否正确(0和1 好友和群)
UserContactTypeEnum typeEnum = UserContactTypeEnum.getByName(contactType);
if (null == typeEnum) {
throw new BusinessException(ResponseCodeEnum.CODE_600);
}
//申请人
String applyUserId = tokenUserInfoDto.getUserId();
//默认申请信息
applyInfo = StringTools.isEmpty(applyInfo) ? String.format(Constants.APPLY_INFO_TEMPLATE, tokenUserInfoDto.getNickName()) : applyInfo;

Long curDate = System.currentTimeMillis();
Integer joinType = null;
String receiveUserId = contactId;

//查询对方好友是否已经添加,如果已经拉黑无法添加
UserContact userContact = userContactMapper.selectByUserIdAndContactId(applyUserId, contactId);
if (userContact != null &&
ArraysUtil.contains(new Integer[]{
UserContactStatusEnum.BLACKLIST_BE.getStatus(),
UserContactStatusEnum.BLACKLIST_BE_FIRST.getStatus()
}, userContact.getStatus())) {
throw new BusinessException("对方已经你拉黑,无法添加");
}

if (UserContactTypeEnum.GROUP == typeEnum) {
GroupInfo groupInfo = groupInfoMapper.selectByGroupId(contactId);
if (groupInfo == null || GroupStatusEnum.DISSOLUTION.getStatus().equals(groupInfo.getStatus())) {
throw new BusinessException("群聊不存在或已解散");
}
receiveUserId = groupInfo.getGroupOwnerId();
joinType = groupInfo.getJoinType();
} else {
UserInfo userInfo = userInfoMapper.selectByUserId(contactId);
if (userInfo == null) {
throw new BusinessException(ResponseCodeEnum.CODE_600);
}
joinType = userInfo.getJoinType();
}

//直接加入不用记录申请记录
if (JoinTypeEnum.JOIN.getType().equals(joinType)) {
this.userContactService.addContact(applyUserId, receiveUserId, contactId, typeEnum.getType(), applyInfo);
return joinType;
}
//从申请记录查询是否申请过
UserContactApply dbApply = this.userContactApplyMapper.selectByApplyUserIdAndReceiveUserIdAndContactId(applyUserId, receiveUserId, contactId);
if (dbApply == null) {
UserContactApply contactApply = new UserContactApply();
contactApply.setApplyUserId(applyUserId);
contactApply.setContactType(typeEnum.getType());
contactApply.setReceiveUserId(receiveUserId);
contactApply.setLastApplyTime(curDate);
contactApply.setContactId(contactId);
contactApply.setStatus(UserContactApplyStatusEnum.INIT.ordinal());
contactApply.setApplyInfo(applyInfo);
this.userContactApplyMapper.insert(contactApply);
} else {
//更新状态
UserContactApply contactApply = new UserContactApply();
contactApply.setStatus(UserContactApplyStatusEnum.INIT.getStatus());
contactApply.setLastApplyTime(curDate);
contactApply.setApplyInfo(applyInfo);
this.userContactApplyMapper.updateByApplyId(contactApply, dbApply.getApplyId());
}
if (dbApply == null || !UserContactApplyStatusEnum.INIT.getStatus().equals(dbApply.getStatus())) {
//如果是待处理状态就不发消息,避免重复发送
//发送ws消息
MessageSendDto messageSend = new MessageSendDto();
messageSend.setMessageType(MessageTypeEnum.CONTACT_APPLY.getType());
messageSend.setMessageContent(applyInfo);
messageSend.setContactId(receiveUserId);
messageHandler.sendMessage(messageSend);
}
return joinType;
}

接收并处理申请

这里是有人加你为好友,你可以 三种操作 同意 拒绝(还可以继续加好友) 拉黑(不可以加好友)

这里并没有很高深的知识点,更多的是对业务的理解,


pA5OS2j.png
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Override
@Transactional(rollbackFor = Exception.class)
public void dealWithApply(String userId, Integer applyId, Integer status) {
//根据传来的status从枚举中找到对应的状态
UserContactApplyStatusEnum statusEnum = UserContactApplyStatusEnum.getByStatus(status);
//判断状态是否存在
if (null == statusEnum || UserContactApplyStatusEnum.INIT == statusEnum) {
throw new BusinessException(ResponseCodeEnum.CODE_600);
}
//从申请记录表中查询是否有过这条记录,如果没有会报错
UserContactApply applyInfo = this.userContactApplyMapper.selectByApplyId(applyId);
if (applyInfo == null || !userId.equals(applyInfo.getReceiveUserId())) {
throw new BusinessException(ResponseCodeEnum.CODE_600);
}

//更新申请信息 只能由待处理更新为其他状态
UserContactApply updateInfo = new UserContactApply();
updateInfo.setStatus(statusEnum.getStatus());
updateInfo.setLastApplyTime(System.currentTimeMillis());

UserContactApplyQuery applyQuery = new UserContactApplyQuery();
applyQuery.setApplyId(applyId);
applyQuery.setStatus(UserContactApplyStatusEnum.INIT.getStatus());
Integer count = userContactApplyMapper.updateByParam(updateInfo, applyQuery);
if (count == 0) {
throw new BusinessException(ResponseCodeEnum.CODE_600);
}
//同意的操作
if (UserContactApplyStatusEnum.PASS.getStatus().equals(status)) {
//添加联系人
userContactService.addContact(applyInfo.getApplyUserId(), applyInfo.getReceiveUserId(), applyInfo.getContactId(), applyInfo.getContactType(), applyInfo.getApplyInfo());
return;
}
//拉黑的操作
if (UserContactApplyStatusEnum.BLACKLIST == statusEnum) {
//拉黑 将接收人添加到申请人的联系人中,标记申请人被拉黑
Date curDate = new Date();
UserContact userContact = new UserContact();
userContact.setUserId(applyInfo.getApplyUserId());
userContact.setContactId(applyInfo.getContactId());
userContact.setContactType(applyInfo.getContactType());
userContact.setCreateTime(curDate);
userContact.setStatus(UserContactStatusEnum.BLACKLIST_BE_FIRST.getStatus());
userContact.setLastUpdateTime(curDate);
userContactMapper.insertOrUpdate(userContact);
return;
}
}

整合netty

在本项目中 只需要服务端即可,因为客户端相当于前端用户,消息就是用户所发来的http请求.
在这个服务端中,我们需要让服务端继承Runnable,这是因为我们需要将netty整合到sringboot项目中,所以需要单独起一个线程去跑netty的服务端
由于使http请求,所以在netty中我们添加几个专门用的http的handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@Component
public class NettyWebSocketStarter implements Runnable {

private static final Logger logger = LoggerFactory.getLogger(NettyWebSocketStarter.class);

@Resource
private AppConfig appConfig;

@Resource
private HandlerWebSocket handlerWebSocket;

/**
* boss线程组,用于处理连接
*/
private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
/**
* work线程组,用于处理消息
*/
private EventLoopGroup workerGroup = new NioEventLoopGroup();

/**
* 资源关闭——在容器销毁时关闭
*/
@PreDestroy
public void close() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

@Override
public void run() {
try {
//创建服务端启动助手
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) {
ChannelPipeline pipeline = channel.pipeline();
//设置几个重要的处理器
// 对http协议的支持,使用http的编码器,解码器
pipeline.addLast(new HttpServerCodec());
//聚合解码 httpRequest/htppContent/lastHttpContent到fullHttpRequest
//保证接收的http请求的完整性
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
//心跳 long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit
// readerIdleTime 读超时事件 即测试段一定事件内未接收到被测试段消息
// writerIdleTime 为写超时时间 即测试端一定时间内想被测试端发送消息
//allIdleTime 所有类型的超时时间
pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
pipeline.addLast(new HandlerHeartBeat());
//将http协议升级为ws协议,对websocket支持
pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 64 * 1024, true, true, 10000L));
//添加自定义拦截器
pipeline.addLast(handlerWebSocket);
}
});
//启动
ChannelFuture channelFuture = serverBootstrap.bind(appConfig.getWsPort()).sync();
logger.info("Netty服务端启动成功,端口:{}", appConfig.getWsPort());
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

自定义的Handler

Handler中常用的方法和触发时机

方法被 Netty 在什么时候自动触发?
channelActive()当连接建立成功时(服务端accept或客户端connect)
channelInactive()当连接断开时
channelRead() / channelRead0()当收到数据时
exceptionCaught()管道中发生异常时
userEventTriggered()管道中有 Handler 调用了 ctx.fireUserEventTriggered()
handlerAdded()handler 加入 pipeline 时
handlerRemoved()handler 被移除时

🎯 在 WebSocket 握手完成时,提取用户身份 Token,完成鉴权 + 用户绑定操作。

  • 流程解析(逐句讲解)
    1. 这是 Netty 提供的用户事件监听方法,当发生特殊事件时(如握手成功、心跳超时等)会触发。所以我们可以重新该方法,实现自己的认证逻辑
      1
      2
      @Override
      public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
    2. 判断是不是 WebSocket 协议升级成功的事件(101 Switching Protocols 完成后),如果是,就做后续处理。
      1
      if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) 
    3. 从握手请求的 URL 中提取参数 token
      1
      2
      3
      4
       WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;    String url = complete.requestUri();
      String token = getToken(url);
      //例如 ws://localhost:8080/ws?token=abcdefg

    4. 获取token判断是否在redis里
      • 有,说明登录了,继续下一步
      • 没有,则直接关闭本次连接
    5. 🚀 如果认证成功,就把这个用户 ID 和当前 WebSocket 连接的 Channel 建立映射,加入连接上下文中
      1
      channelContextUtils.addContext(tokenUserInfoDto.getUserId(), ctx.channel());
    6. 整体流程图(客户端连接 → 鉴权 → 通信)
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      客户端连接 ws://localhost:8080/ws?token=xxx

      Netty 握手成功(HTTP 101)

      触发 HandshakeComplete 事件

      userEventTriggered() 方法执行:
      - 获取 token
      - 查询 Redis 验证身份
      - 成功则绑定连接
      - 失败则关闭连接

      之后开始 WebSocket 双向通信

完整代码展示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//用于处理用户自定义的事件  当有用户事件触发时会调用此方法,例如连接超时,异常等。
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
// 判断 evt是不是被 WebSocketServerProtocolHandler.HandshakeComplete或WebSocketServerProtocolHandler.HandshakeComplete的子类所创造的对象
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
//向上转型
WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
//获取前端发来的路径
String url = complete.requestUri();
// 由于需要进行验证,所以前端发来的请求中携带了token,getToken是自定义的方法
String token = getToken(url);
//token为空说明验证失败,直接关闭通道
if (token == null) {
ctx.channel().close();
return;
}
// 根据token查找缓存,如果找不到说明没登录,关闭通道
TokenUserInfoDto tokenUserInfoDto = redisComponet.getTokenUserInfoDto(token);
if (null == tokenUserInfoDto) {
ctx.channel().close();
return;
}
/**
* 用户加入
*/
// 这里是自定义的类,用于给这个通道绑定参数,用于辩解是谁的通道
channelContextUtils.addContext(tokenUserInfoDto.getUserId(), ctx.channel());

}
}

  • 先来看自定义的getToken方法,用来解析url中的token
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    private String getToken(String url) {
    if (StringTools.isEmpty(url) || url.indexOf("?") == -1) {
    return null;
    }
    String[] queryParams = url.split("\\?");
    if (queryParams.length < 2) {
    return url;
    }
    String[] params = queryParams[1].split("=");
    if (params.length != 2) {
    return url;
    }
    return params[1];
    }

    自定义的channelContextUtils.addContext 方法

    OK,来看最关键的一步 channelContextUtils.addContext(tokenUserInfoDto.getUserId(), ctx.channel())
    这行代码将id, channel作为参数传过去了,来看 addContext方法,这里将addContext方法拆分成三部分来讲解比较方便
  1. 通过AttributeKey,给channel绑定参数,参数名为这个channel的id,值为userId,
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public void addContext(String userId, Channel channel) {
    try {
    String channelId = channel.id().toString();
    AttributeKey attributeKey = null;
    if (!AttributeKey.exists(channelId)) {
    attributeKey = AttributeKey.newInstance(channel.id().toString());
    } else {
    attributeKey = AttributeKey.valueOf(channel.id().toString());
    }
    // 查找名为 channel的id 的attributeKey, 并设置值
    channel.attr(attributeKey).set(userId);
  2. 通过Id查询用户的所有联系人和群聊,并更新最后登录时间,如果用户长时间未登录,在未登录期间所受到的消息不会全部显示,只会显示最近三天的消息
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    //获取联系人和群聊的集合
    List<String> contactList = redisComponet.getUserContactList(userId);
    // 遍历集合 找出所有群聊
    for (String groupId : contactList) {
    // 如果集合中数据的开头和 所定义的枚举中的群组相同 说明这是个群聊
    if (groupId.startsWith(UserContactTypeEnum.GROUP.getPrefix())) {
    //调用 利用自定义方法 为了群发消息方便
    add2Group(groupId, channel);
    }
    }
    // 自定义的常量Map,这里将不同用户的channel都存起来,Key为用户的Id,因为会有多个用户
    USER_CONTEXT_MAP.put(userId, channel);
    // 更新用户心跳
    redisComponet.saveUserHeartBeat(userId);

    //更新用户最后连接时间
    UserInfo updateInfo = new UserInfo();
    updateInfo.setLastLoginTime(new Date());
    userInfoMapper.updateByUserId(updateInfo, userId);

    //给用户发送一些消息
    //获取用户最后离线时间
    UserInfo userInfo = userInfoMapper.selectByUserId(userId);
    Long sourceLastOffTime = userInfo.getLastOffTime();
    //这里避免毫秒时间差,所以减去1秒的时间
    //如果时间太久,只取最近三天的消息数
    Long lastOffTime = sourceLastOffTime;
    if (sourceLastOffTime != null && System.currentTimeMillis() - Constants.MILLISECOND_3DAYS_AGO > sourceLastOffTime) {
    lastOffTime = System.currentTimeMillis() - Constants.MILLISECOND_3DAYS_AGO;
    }
  3. 上面提到了给用户发送未读的消息,长时间未登录,发送近三天的未读消息,这里会具体实现,会话就是下图如是
    pA7ffoQ.png
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    /**
    * 1、查询会话信息 查询用户所有会话,避免换设备会话不同步
    */
    ChatSessionUserQuery sessionUserQuery = new ChatSessionUserQuery();
    sessionUserQuery.setUserId(userId);
    sessionUserQuery.setOrderBy("last_receive_time desc");
    List<ChatSessionUser> chatSessionList = chatSessionUserMapper.selectList(sessionUserQuery);
    WsInitData wsInitData = new WsInitData();
    wsInitData.setChatSessionList(chatSessionList);

    /**
    * 2、查询聊天消息,首先是查找所有群组的聊天信息,这里是根据数据库一张聊天信息表中接收人ID查询的,为什么单独查询群组,而不是直接根据用户ID查,因为这里的逻辑是 数据库聊天信息表中如果是群聊的话,接收人ID存的群聊的ID,所以需要先查询用户添加的群聊集合最后加上自己,查找信息
    *
    */
    //查询用户的联系人
    UserContactQuery contactQuery = new UserContactQuery();
    contactQuery.setContactType(UserContactTypeEnum.GROUP.getType());
    contactQuery.setUserId(userId);
    List<UserContact> groupContactList = userContactMapper.selectList(contactQuery);
    List<String> groupIdList = groupContactList.stream().map(item -> item.getContactId()).collect(Collectors.toList());
    //将自己也加进去
    groupIdList.add(userId);
    //这里就是查询聊天信息了,最后存到 wsInitData里
    ChatMessageQuery messageQuery = new ChatMessageQuery();
    messageQuery.setContactIdList(groupIdList);
    messageQuery.setLastReceiveTime(lastOffTime);
    List<ChatMessage> chatMessageList = chatMessageMapper.selectList(messageQuery);
    wsInitData.setChatMessageList(chatMessageList);

    /**
    * 3、查询好友申请
    */
    UserContactApplyQuery applyQuery = new UserContactApplyQuery();
    applyQuery.setReceiveUserId(userId);
    applyQuery.setLastApplyTimestamp(sourceLastOffTime);
    applyQuery.setStatus(UserContactApplyStatusEnum.INIT.getStatus());
    Integer applyCount = userContactApplyMapper.selectCount(applyQuery);
    wsInitData.setApplyCount(applyCount);

    //发送消息
    MessageSendDto messageSendDto = new MessageSendDto();
    messageSendDto.setMessageType(MessageTypeEnum.INIT.getType());
    messageSendDto.setContactId(userId);
    messageSendDto.setExtendData(wsInitData);
    //这里的sendMsg是自定义的方法,发送消息
    sendMsg(messageSendDto, userId);
  4. 自定义的发消息方法
    需要一个接收方的ID,很好理解,这个方法只能用在同一个服务器的用户发送接收消息,如果在不同服务器上,需要用到后面讲的redisson的 RTopic发布订阅模式
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    private static void sendMsg(MessageSendDto messageSendDto, String reciveId) {
    if (reciveId == null) {
    return;
    }
    //根据id,获取接收方的通道
    Channel sendChannel = USER_CONTEXT_MAP.get(reciveId);
    if (sendChannel == null) {
    return;
    }
    //相当于客户而言,联系人就是发送人,所以这里转换一下再发送,好友打招呼信息发送给自己需要特殊处理
    // 设置消息dto
    if (MessageTypeEnum.ADD_FRIEND_SELF.getType().equals(messageSendDto.getMessageType())) {
    UserInfo userInfo = (UserInfo) messageSendDto.getExtendData();
    messageSendDto.setMessageType(MessageTypeEnum.ADD_FRIEND.getType());
    messageSendDto.setContactId(userInfo.getUserId());
    messageSendDto.setContactName(userInfo.getNickName());
    messageSendDto.setExtendData(null);
    } else {
    messageSendDto.setContactId(messageSendDto.getSendUserId());
    messageSendDto.setContactName(messageSendDto.getSendUserNickName());
    }
    // 将消息转换成josn格式发送
    sendChannel.writeAndFlush(new TextWebSocketFrame(JsonUtils.convertObj2Json(messageSendDto)));
    }

    到这里一个完整 channelContextUtils.addContext 方法就结束了,最后展示一下完整的代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    public void addContext(String userId, Channel channel) {
    try {
    String channelId = channel.id().toString();
    AttributeKey attributeKey = null;
    if (!AttributeKey.exists(channelId)) {
    attributeKey = AttributeKey.newInstance(channel.id().toString());
    } else {
    attributeKey = AttributeKey.valueOf(channel.id().toString());
    }
    channel.attr(attributeKey).set(userId);

    List<String> contactList = redisComponet.getUserContactList(userId);
    for (String groupId : contactList) {
    if (groupId.startsWith(UserContactTypeEnum.GROUP.getPrefix())) {
    add2Group(groupId, channel);
    }
    }
    USER_CONTEXT_MAP.put(userId, channel);
    redisComponet.saveUserHeartBeat(userId);

    //更新用户最后连接时间
    UserInfo updateInfo = new UserInfo();
    updateInfo.setLastLoginTime(new Date());
    userInfoMapper.updateByUserId(updateInfo, userId);

    //给用户发送一些消息
    //获取用户最后离线时间
    UserInfo userInfo = userInfoMapper.selectByUserId(userId);
    Long sourceLastOffTime = userInfo.getLastOffTime();
    //这里避免毫秒时间差,所以减去1秒的时间
    //如果时间太久,只取最近三天的消息数
    Long lastOffTime = sourceLastOffTime;
    if (sourceLastOffTime != null && System.currentTimeMillis() - Constants.MILLISECOND_3DAYS_AGO > sourceLastOffTime) {
    lastOffTime = System.currentTimeMillis() - Constants.MILLISECOND_3DAYS_AGO;
    }

    /**
    * 1、查询会话信息 查询用户所有会话,避免换设备会话不同步
    */
    ChatSessionUserQuery sessionUserQuery = new ChatSessionUserQuery();
    sessionUserQuery.setUserId(userId);
    sessionUserQuery.setOrderBy("last_receive_time desc");
    List<ChatSessionUser> chatSessionList = chatSessionUserMapper.selectList(sessionUserQuery);
    WsInitData wsInitData = new WsInitData();
    wsInitData.setChatSessionList(chatSessionList);

    /**
    * 2、查询聊天消息
    */
    //查询用户的联系人
    UserContactQuery contactQuery = new UserContactQuery();
    contactQuery.setContactType(UserContactTypeEnum.GROUP.getType());
    contactQuery.setUserId(userId);
    List<UserContact> groupContactList = userContactMapper.selectList(contactQuery);
    List<String> groupIdList = groupContactList.stream().map(item -> item.getContactId()).collect(Collectors.toList());
    //将自己也加进去
    groupIdList.add(userId);

    ChatMessageQuery messageQuery = new ChatMessageQuery();
    messageQuery.setContactIdList(groupIdList);
    messageQuery.setLastReceiveTime(lastOffTime);
    List<ChatMessage> chatMessageList = chatMessageMapper.selectList(messageQuery);
    wsInitData.setChatMessageList(chatMessageList);

    /**
    * 3、查询好友申请
    */
    UserContactApplyQuery applyQuery = new UserContactApplyQuery();
    applyQuery.setReceiveUserId(userId);
    applyQuery.setLastApplyTimestamp(sourceLastOffTime);
    applyQuery.setStatus(UserContactApplyStatusEnum.INIT.getStatus());
    Integer applyCount = userContactApplyMapper.selectCount(applyQuery);
    wsInitData.setApplyCount(applyCount);

    //发送消息
    MessageSendDto messageSendDto = new MessageSendDto();
    messageSendDto.setMessageType(MessageTypeEnum.INIT.getType());
    messageSendDto.setContactId(userId);
    messageSendDto.setExtendData(wsInitData);

    sendMsg(messageSendDto, userId);
    } catch (Exception e) {
    logger.error("初始化链接失败", e);
    }
    }

集群发送消息

为什么要使用集群发消息,因为实际中,会有多个服务器,如果发送者和接收者不在同一台服务器,则会发送消息失败。所以需要采用集群的方式,利用redisson的订阅模式,可以跨服务器发送消息。

RTpoic

RTpoic 是 Redisson 提供的用于实现发布-订阅(Pub/Sub)模式的类,它封装了 Redis 的发布和订阅功能,让开发者能够轻松地在分布式环境中进行消息的发布和订阅。

实际上类似于Kafka、RocketMQ等一系列MessageQueue的生产-消费关系。

自产自销:简单的说,可以实现一个服务中,自己发布消息,自己订阅消息进行消息的分发处理

  • 简单实用
  1. 配置redisson
    使用之前需要在配置类中进行配置
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    @Value("${spring.redis.host:}")
    private String redisHost;

    @Value("${spring.redis.port:}")
    private Integer redisPort;

    @Bean(name = "redissonClient", destroyMethod = "shutdown")
    public RedissonClient redissonClient() {
    try {
    // 创建配置 指定redis地址及节点信息
    Config config = new Config();
    config.useSingleServer().setAddress("redis://" + redisHost + ":" + redisPort);
    // 根据config创建出RedissonClient实例
    RedissonClient redissonClient = Redisson.create(config);
    return redissonClient;
    } catch (RedisConnectionException e) {
    logger.error("redis配置错误,请检查redis配置");
    }
    return null;
    }
  • 发送接收消息
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // 该注解标识在springboot初始化后立即执行该方法
    @PostConstruct
    public void lisMessage() {
    // 获取 名为MESSAGE_TOPIC的RTopic类型的对象
    RTopic rTopic = redissonClient.getTopic(MESSAGE_TOPIC);
    // 添加监听器,接收类型为MessageSendDto.class
    rTopic.addListener(MessageSendDto.class, (MessageSendDto, sendDto) -> {
    logger.info("收到广播消息:{}", JSON.toJSONString(sendDto));
    // 通过redis发送消息在所有服务器上
    channelContextUtils.sendMessage(sendDto);
    });
    }
    // 将消息发送到redisson
    public void sendMessage(MessageSendDto sendDto) {
    RTopic rTopic = redissonClient.getTopic(MESSAGE_TOPIC);
    rTopic.publish(sendDto);
    }

    群发消息

    在这个项目发现一个很有意思的点,在groupserviceimpl的类中savegroup方法中有这样一段代码,这是一段创建群聊,然后将群聊添加成自己的联系人,最后将自己的ws通道添加到ws的群通道中

    这里一开始我是疑惑的,在这个savegroup的方法应该是通过前端http请求,springmvc将请求映射到这个方法上的,而netty会将http请求升级为ws,那么这里是怎么拿到channel的,这就要我们往下看 channelContextUtils.addUser2Group方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    //创建群主会话
    ChatSessionUser chatSessionUser = new ChatSessionUser();
    chatSessionUser.setUserId(groupInfo.getGroupOwnerId());
    chatSessionUser.setContactId(groupInfo.getGroupId());
    chatSessionUser.setContactName(groupInfo.getGroupName());
    chatSessionUser.setSessionId(sessionId);
    this.chatSessionUserService.add(chatSessionUser);

    //添加为联系人
    redisComponet.addUserContact(groupInfo.getGroupOwnerId(), groupInfo.getGroupId());

    // 将自己的ws通道添加到一个ws的群通道作为群聊
    channelContextUtils.addUser2Group(groupInfo.getGroupOwnerId(), groupInfo.getGroupId());
  1. channelContextUtils.addUser2Group
    看到这里其实已经明白了,通过addUser2Group方法,会获取到在channelContextUtils的静态MAP下的channel
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public void addUser2Group(String userId, String groupId) {
    // 查找userId的channel,USER_CONTEXT_MAP是自定义的静态map
    Channel channel = USER_CONTEXT_MAP.get(userId);
    //调用自定义方法,将自己的channel加入channelgroup中
    add2Group(groupId, channel);
    }


    private void add2Group(String groupId, Channel context) {
    ChannelGroup group = GROUP_CONTEXT_MAP.get(groupId);
    if (group == null) {
    group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    GROUP_CONTEXT_MAP.put(groupId, group);
    }
    if (context == null) {
    return;
    }
    group.add(context);
    }

聊天执行流程

  1. 登录后会连接netty

  2. 通过http接口获取到发送的消息的内容

    • 这一步会将消息存数据库一份
    • 接着会将消息 发给消息队列
      1
      2
      3
      4
      5
      6
      7
      8
      9
          @RequestMapping("/sendMessage")
      @GlobalInterceptor
      public ResponseVO sendMessage(HttpServletRequest request,
      @NotEmpty String contactId,
      @NotEmpty @Max(500) String messageContent,
      @NotNull Integer messageType,
      Long fileSize,
      String fileName,
      Integer fileType) {......}
  3. redis的rTopic 接收消息
    • rTopic 收到消息后 会触发 channelContextUtils.sendMessage(sendDto)发送消息;
    • Redis 的 Pub/Sub(发布订阅)机制,它是“即时广播”机制,不会持久保存消息,也不会堆积在 Redis 里。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
         @PostConstruct
      public void lisMessage() {
      RTopic rTopic = redissonClient.getTopic(MESSAGE_TOPIC);
      rTopic.addListener(MessageSendDto.class, (MessageSendDto, sendDto) -> {
      logger.info("收到广播消息:{}", JSON.toJSONString(sendDto));
      channelContextUtils.sendMessage(sendDto);
      });
      }

      public void sendMessage(MessageSendDto sendDto) {
      RTopic rTopic = redissonClient.getTopic(MESSAGE_TOPIC);
      rTopic.publish(sendDto);
      }
  4. 发送消息
    1
    2
    3
    4
    5
     Channel sendChannel = USER_CONTEXT_MAP.get(reciveId);
    if (sendChannel == null) {
    return;
    }
    sendChannel.writeAndFlush(new TextWebSocketFrame(JsonUtils.convertObj2Json(messageSendDto)));
  5. 前端渲染消息
    • 前端 WebSocket 实例监听到第4步的消息,会展示到聊天界面上