java中如何实现WebSocket客户端断线重连
这篇文章给大家分享的是有关java中如何实现WebSocket客户端断线重连的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
Maven依赖
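<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.5.2</version> </dependency> <dependency> <groupId>org.java-websocket</groupId> <artifactId>Java-WebSocket</artifactId> <version>1.5.1</version> </dependency>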
代码
不废话,上代码。
package ai.guiji.csdn.ws.client; import cn.hutool.core.thread.ThreadUtil;import cn.hutool.core.util.StrUtil;import lombok.extern.slf4j.Slf4j;import org.java_websocket.WebSocket;import org.java_websocket.client.WebSocketClient;import org.java_websocket.framing.Framedata;import org.java_websocket.handshake.ServerHandshake; import javax.net.ssl.*;import java.net.Socket;import java.net.URI;import java.nio.ByteBuffer;import java.security.cert.CertificateException;import java.security.cert.X509Certificate;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicBoolean;import java.util.concurrent.atomic.AtomicInteger;import java.util.function.Consumer; /** @Author huyi @Date 2021/10/15 20:03 @Description: 重连websocket客户端 */@Slf4jpublic class ReConnectWebSocketClient { /** 字符串消息回调 */ private ConsumermsgStr; /** 字节流消息回调 */ private Consumer msgByte; /** 异常回调 */ private Consumer error; /** 连接标识 */ private String key; /** ws服务端连接 */ private URI serverUri; /** 尝试重连标识 */ private AtomicBoolean tryReconnect; /** 需要ping标识 */ private AtomicBoolean needPing; /** websocket连接实体 */ private WebSocketClient webSocketClient; /** 重连次数 */ private AtomicInteger reConnectTimes; /** 连接结束标识 */ private AtomicBoolean end; /** 连接后初始发送报文,这里也可以不需要,如果服务端主动断开连接,重连后可以继续推送报文的话。 */ private String initReConnectReq; /** 结束回调 */ private Consumer endConsumer; public ReConnectWebSocketClient( URI serverUri, String key, Consumer msgStr, Consumer msgByte, Consumer error) { this.msgStr = msgStr; this.msgByte = msgByte; this.error = error; this.key = key; this.serverUri = serverUri; this.tryReconnect = new AtomicBoolean(false); this.needPing = new AtomicBoolean(true); this.reConnectTimes = new AtomicInteger(0); this.end = new AtomicBoolean(false); this.endConsumer = this::close; init(); } /** 初始化连接 */ public void init() { // 创建连接 createWebSocketClient(); // ping线程 circlePing(); } private void needReconnect() throws Exception { ThreadUtil.sleep(10, TimeUnit.SECONDS); int cul = 
reConnectTimes.incrementAndGet(); if (cul > 3) { close("real stop"); throw new Exception("服务端断连,3次重连均失败"); } log.warn("[{}]第[{}]次断开重连", key, cul); if (tryReconnect.get()) { log.error("[{}]第[{}]次断开重连结果 -> 连接正在重连,本次重连请求放弃", key, cul); needReconnect(); return; } try { tryReconnect.set(true); if (webSocketClient.isOpen()) { log.warn("[{}]第[{}]次断开重连,关闭旧连接", key, cul); webSocketClient.closeConnection(2, "reconnect stop"); } webSocketClient = null; createWebSocketClient(); connect(); if (StrUtil.hasBlank(initReConnectReq)) { send(initReConnectReq); } } catch (Exception exception) { log.error("[{}]第[{}]次断开重连结果 -> 连接正在重连,重连异常:[{}]", key, cul, exception.getMessage()); needReconnect(); } finally { tryReconnect.set(false); } } private void createWebSocketClient() { webSocketClient = new WebSocketClient(serverUri) { @Override public void onOpen(ServerHandshake serverHandshake) { log.info("[{}]ReConnectWebSocketClient [onOpen]连接成功{}", key, getRemoteSocketAddress()); tryReconnect.set(false); } @Override public void onMessage(String text) { log.info("[{}]ReConnectWebSocketClient [onMessage]接收到服务端数据:text={}", key, text); msgStr.accept(text); } @Override public void onMessage(ByteBuffer bytes) { log.info("[{}]ReConnectWebSocketClient [onMessage]接收到服务端数据:bytes={}", key, bytes); msgByte.accept(bytes); } @Override public void onWebsocketPong(WebSocket conn, Framedata f) { log.info( "[{}]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode={}", key, f.getOpcode()); } @Override public void onClose(int i, String s, boolean b) { log.info("[{}]ReConnectWebSocketClient [onClose]关闭,s={},b={}", key, s, b); if (StrUtil.hasBlank(s) || s.contains("https")) { if (end.get()) { return; } try { needReconnect(); } catch (Exception exception) { endConsumer.accept("reconnect error"); error.accept(exception); } } } @Override public void onError(Exception e) { log.info("[{}]ReConnectWebSocketClient [onError]异常,e={}", key, e); endConsumer.accept("error close"); error.accept(e); } }; if 
(serverUri.toString().contains("wss://")) { trustAllHosts(webSocketClient); } } public void circlePing() { new Thread( () -> { while (needPing.get()) { if (webSocketClient.isOpen()) { webSocketClient.sendPing(); } ThreadUtil.sleep(5, TimeUnit.SECONDS); } log.warn("[{}]Ping循环关闭", key); }) .start(); } /** * 连接 * * @throws Exception 异常 */ public void connect() throws Exception { webSocketClient.connectBlocking(10, TimeUnit.SECONDS); } /** * 发送 * * @param msg 消息 * @throws Exception 异常 */ public void send(String msg) throws Exception { this.initReConnectReq = msg; if (webSocketClient.isOpen()) { webSocketClient.send(msg); } } /** * 关闭 * * @param msg 关闭消息 */ public void close(String msg) { needPing.set(false); end.set(true); if (webSocketClient != null) { webSocketClient.closeConnection(3, msg); } } /** * 忽略证书 * * @param client */ public void trustAllHosts(WebSocketClient client) { TrustManager[] trustAllCerts = new TrustManager[] { new X509ExtendedTrustManager() { @Override public void checkClientTrusted( X509Certificate[] x509Certificates, String s, Socket socket) throws CertificateException {} @Override public void checkServerTrusted( X509Certificate[] x509Certificates, String s, Socket socket) throws CertificateException {} @Override public void checkClientTrusted( X509Certificate[] x509Certificates, String s, SSLEngine sslEngine) throws CertificateException {} @Override public void checkServerTrusted( X509Certificate[] x509Certificates, String s, SSLEngine sslEngine) throws CertificateException {} @Override public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {} @Override public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {} @Override public X509Certificate[] getAcceptedIssuers() { return null; } } }; try { SSLContext ssl = SSLContext.getInstance("SSL"); ssl.init(null, trustAllCerts, new java.security.SecureRandom()); SSLSocketFactory socketFactory = 
ssl.getSocketFactory(); client.setSocketFactory(socketFactory); } catch (Exception e) { log.error("ReConnectWebSocketClient trustAllHosts 异常,e={0}", e); } }}
代码说明:
1、重连次数上限在代码中固定为3次,可根据需要改为可配置参数。
2、增加异步pingpong线程,一旦结束连接会自动关闭。
3、对字符串、字节流、异常都有回调措施。
测试代码方法
public static void main(String[] args) throws Exception { ReConnectWebSocketClient client = new ReConnectWebSocketClient( new URI(String.format("wss://192.168.1.77:24009")), "test", // 字符串消息处理 msg -> { // todo 字符串消息处理 System.out.println("字符串消息:" + msg); }, null, // 异常回调 error -> { // todo 字符串消息处理 System.out.println("异常:" + error.getMessage()); }); client.connect(); client.send("haha"); }
验证结果
16:08:54.468 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onOpen]连接成功/192.168.1.77:24009
16:08:54.475 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onMessage]接收到服务端数据:text=connect success from tcp4:192.168.6.63:11018!
字符串消息:connect success from tcp4:192.168.6.63:11018!
16:08:56.080 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]关闭,s=,b=true
16:09:06.097 [WebSocketConnectReadThread-12] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]第[1]次断开重连
16:09:06.150 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onOpen]连接成功/192.168.1.77:24009
16:09:06.150 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onMessage]接收到服务端数据:text=connect success from tcp4:192.168.6.63:11038!
字符串消息:connect success from tcp4:192.168.6.63:11038!
16:09:09.369 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:14.370 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:19.371 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:24.379 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:29.382 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:34.398 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:39.402 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:44.404 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:49.415 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:54.429 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:59.437 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:10:04.449 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:10:06.154 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:10:09.455 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:10:14.462 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:10:19.468 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:10:19.644 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]关闭,s=,b=true
16:10:29.654 [WebSocketConnectReadThread-16] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]第[2]次断开重连
16:10:31.710 [WebSocketConnectReadThread-19] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onError]异常,e={}
java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.connect0(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:79)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:673)
at org.java_websocket.client.WebSocketClient.run(WebSocketClient.java:461)
at java.lang.Thread.run(Thread.java:748)
16:10:31.710 [WebSocketConnectReadThread-19] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]关闭,s=error close,b=false
异常:Connection refused: connect
16:10:34.473 [Thread-0] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]Ping循环关闭
这里我采用的是手动关闭服务端的方式来触发客户端被动断连。共重连两次,第二次重连时服务端尚未启动,因此触发了异常。
感谢各位的阅读!关于"java中如何实现WebSocket客户端断线重连"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!