千家信息网

WebSocket如何实现服务器消息推送客户端

发表于:2025-01-20 作者:千家信息网编辑
千家信息网最后更新 2025年01月20日,这篇文章主要为大家展示了"WebSocket如何实现服务器消息推送客户端",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"WebSocket如何实现服务器消息
千家信息网最后更新 2025年01月20日WebSocket如何实现服务器消息推送客户端

这篇文章主要为大家展示了"WebSocket如何实现服务器消息推送客户端",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"WebSocket如何实现服务器消息推送客户端"这篇文章吧。

  一、背景

  项目需要做一个消息能够实时获取的功能,系统日活跃量达到10000,产生的消息是活跃量的数倍,如果采用 Http 的方式轮询后端服务,会使得后端服务压力过大而奔溃,因此需要一种新的技术方式来改变 "拉" 的方式。

  二、解决方案

  经过各种 Google、百度 后发现可以使用 html5 的新技术 WebSocket ,将现有 "拉"消息的方式改变成 "推" 的模式,大大的减少服务器压力。

  三、具体实现

  实例采用 Spring Boot 框架,

  引入 pom 依赖

  org.springframework.boot

  spring-boot-starter-websocket

  org.springframework.boot

  spring-boot-starter-undertow

  org.springframework.boot

  spring-boot-starter-web

  org.springframework.boot

  spring-boot-starter-tomcat

  WebSocket 服务可采用 websocket-api 或 spring-websocket 开发,我们采用 websocket-api 的注解开发方式:

  package com.gridsum.techpub.systemhistory.api.server;

  import org.slf4j.Logger;

  import org.slf4j.LoggerFactory;

  import org.springframework.stereotype.Service;

  import javax.websocket.*;

  import javax.websocket.server.PathParam;

  import javax.websocket.server.ServerEndpoint;

  import java.io.IOException;

  import java.util.Objects;

  import java.util.Set;

  import java.util.concurrent.CopyOnWriteArraySet;

  /**

  * @author ouyangrongtao

  * @version 1.0

  * @description WebSocketServer

  * @date 2019/12/23 10:16

  **/

  @ServerEndpoint("/websocket/{sid}")

  @Service

  public class WebSocketServer {

  private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class);

  private ClientInfo clientInfo;

  /**

  * 存放每个客户端对应的 ClientInfo 对象。

  */

  private static final Set WEB_SOCKET_SET = new CopyOnWriteArraySet<>();

  /**

  * 连接建立成功调用的方法

  *

  * @param session 会话

  * @param sid 客户端

  */

  @OnOpen

  public void onOpen(Session session, @PathParam("sid") String sid) {

  //加入set中

  this.clientInfo = new ClientInfo(sid, session);

  WEB_SOCKET_SET.add(clientInfo);

  logger.info("有新窗口开始监听:[{}],当前在线人数为[{}]", sid, WEB_SOCKET_SET.size());

  try {

  this.sendMessage(session, "连接成功");

  } catch (IOException e) {

  logger.error("websocket IO异常");

  }

  }

  /**

  * 连接关闭调用的方法

  */

  @OnClose

  public void onClose() {

  //从set中删除

  WEB_SOCKET_SET.remove(this.clientInfo);

  logger.info("有一连接关闭!当前在线人数为:[{}]", WEB_SOCKET_SET.size());

  }

  /**

  *

  * @param message 客户端发送过来的消息

  */

  @OnMessage

  public void onMessage(String message) {

  logger.info("收到来自窗口[{}]的信息:[{}]", this.clientInfo.getSid(), message);

  //发消息

  for (ClientInfo item : WEB_SOCKET_SET) {

  try {

  this.sendMessage(item.getSession(), message);

  } catch (IOException ignored) {

  }

  }

  }

  /**

  * 错误时调用

  * @param session 会话

  * @param error 错误信息

  */

  @OnError

  public void onError(Session session, Throwable error) {

  logger.error("发生错误", error);

  }

  /**

  * 给 sid 发送消息

  * @param message 消息

  * @param sid sid

  */

  public void sendMessage(String message, String sid) {

  logger.info("推送消息到窗口[{}],推送内容:[{}]", sid, message);

  ClientInfo client = WEB_SOCKET_SET.parallelStream()

  .filter(item -> item.getSid().equals(sid)).findFirst().orElse(null);

  if (client != null) {

  try {

  this.sendMessage(client.getSession(), message);

  } catch (IOException ignored) {

  }

  }

  }

  /**

  * 实现服务器主动推送

  * @param session session

  * @param message message

  * @throws IOException IOException

  */郑州哪个妇科医院好 http://www.sptdfk.com/

  private void sendMessage(Session session, String message) throws IOException {

  session.getBasicRemote().sendText(message);

  }

  class ClientInfo {

  /**

  * 接收sid

  */

  private String sid = "";

  /**

  * 客户端

  */

  private Session session;

  public ClientInfo() { }

  private ClientInfo(String sid, Session session) {

  this.sid = sid;

  this.session = session;

  }

  private String getSid() {

  return sid;

  }

  private Session getSession() {

  return session;

  }

  @Override

  public boolean equals(Object o) {

  if (this == o) {

  return true;

  }

  if (o == null || getClass() != o.getClass()) {

  return false;

  }

  ClientInfo that = (ClientInfo) o;

  return Objects.equals(sid, that.sid);

  }

  @Override

  public int hashCode() {

  return Objects.hash(sid);

  }

  }

  }

  前端代码

  运行 WebSocketClient1000001

  来一个发消息的接口

  /**

  * 发送消息给客户端

  * @author ouyangrongtao

  */

  @RestController

  public class WebSocketController {

  private WebSocketServer webSocketServer;

  @Autowired

  public WebSocketController(WebSocketServer webSocketServer) {

  this.webSocketServer = webSocketServer;

  }

  @PostMapping("/socket/push")

  public boolean pushToWeb(@RequestBody Map content) {

  webSocketServer.sendMessage(content.get("message"), content.get("cid"));

  return true;

  }

  }

  到此已经基本写完。使用 Postman 调用发消息的接口,发现客户端可以收到发送的消息。

  四、问题记录

  在做的时候,因为项目用的 Tomcat 容器,导致 Tomcat 相关包与 WebSocket 依赖有冲突,最终项目不能启动,解决方式只需要将 Tomcat 容器改为 Undertow 。

  org.springframework.boot

  spring-boot-starter-undertow

  org.springframework.boot

  spring-boot-starter-web

  org.springframework.boot

  spring-boot-starter-tomcat

  异常信息:

  Caused by: java.lang.IllegalStateException: javax.websocket.server.ServerContainer not available

  at org.springframework.util.Assert.state(Assert.java:73)

  at org.springframework.web.socket.server.standard.ServerEndpointExporter.afterPropertiesSet(ServerEndpointExporter.java:106)

  at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1753)

  at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1690)

  ... 16 common frames omitted

以上是"WebSocket如何实现服务器消息推送客户端"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!

0