一种轻量级websocket服务器和客户端实现方案
pom依赖
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.3</version>
</dependency>
服务器端案例
定义ws服务器工具类WsktUtil
package com.example.demo.util;
import cn.hutool.core.collection.ListUtil;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
@Slf4j
public class WsktUtil extends WebSocketServer {
private static List<WebSocket> clients = new CopyOnWriteArrayList<>();
private static int port = 8843;
private WsktUtil() {
super(new InetSocketAddress(port));
}
private static final WsktUtil wskt = new WsktUtil();
@Override
public void onOpen(WebSocket conn, ClientHandshake handshake) {
log.info("一个新客户端打开连接...");
conn.send("Welcome to link wskt server!");
String ip = conn.getRemoteSocketAddress().getAddress().getHostAddress();
log.info("客户端请求的ip:{}", ip);
int port = conn.getRemoteSocketAddress().getPort();
log.info("客户端的port:{}", port);
String resourceDescriptor = handshake.getResourceDescriptor();
log.info("客户端请求的 path:{}", resourceDescriptor);
clients.add(conn);
}
@Override
public void onClose(WebSocket conn, int code, String reason, boolean remote) {
log.warn("一个客户端断开websocket连接...");
String ip = conn.getRemoteSocketAddress().getAddress().getHostAddress();
log.info("客户端请求的ip:{}", ip);
int port = conn.getRemoteSocketAddress().getPort();
log.info("客户端的port:{}", port);
String resourceDescriptor = conn.getResourceDescriptor();
log.info("客户端请求的 path:{}", resourceDescriptor);
log.info("code:{}", code);
log.info("reason:{}", reason);
log.info("remote:{}", remote);
clients.remove(conn);
}
@Override
public void onMessage(WebSocket conn, String message) {
log.info("一个客户端发送了消息....");
String ip = conn.getRemoteSocketAddress().getAddress().getHostAddress();
log.info("客户端请求的ip:{}", ip);
int port = conn.getRemoteSocketAddress().getPort();
log.info("客户端请求的port:{}", port);
String resourceDescriptor = conn.getResourceDescriptor();
log.info("客户端请求的path:{}", resourceDescriptor);
log.info("客户端发送的msg:{}", message);
handleClientReqMsg(conn, message);
}
@Override
public void onError(WebSocket conn, Exception ex) {
ex.printStackTrace();
if (conn != null) {
String ip = conn.getRemoteSocketAddress().getAddress().getHostAddress();
log.info("异常客户端的ip:{}", ip);
int port = conn.getRemoteSocketAddress().getPort();
log.info("异常客户端的port:{}", port);
String resourceDescriptor = conn.getResourceDescriptor();
log.info("异常客户端的path:{}", resourceDescriptor);
}
}
@Override
public void onStart() {
System.out.println("wskt Server started!");
setConnectionLostTimeout(100);
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
printCurrentConns();
}
}, 1000, 3000);
}
private static void printCurrentConns() {
int size = clients.size();
log.info("--当前共有{}个websocket连接---", size);
if (size > 0) {
for (int i = 0; i < clients.size(); i++) {
String ip = clients.get(i).getRemoteSocketAddress().getAddress().getHostAddress();
int port = clients.get(i).getRemoteSocketAddress().getPort();
String resourceDescriptor = clients.get(i).getResourceDescriptor();
log.info("第{}个客户端的ip:{},port:{},path:{}", i + 1, ip, port, resourceDescriptor);
}
}
log.info("--------------------------");
}
public static void startServer() throws UnknownHostException {
wskt.start();
log.info("wsktServer started on port: {}", wskt.getPort());
}
private static void handleClientReqMsg(WebSocket conn, String reqStr) {
if (conn.getResourceDescriptor().equals("/123")) {
if (reqStr.equals("aaa")) {
publishMsgToClient("bbb", conn);
}
}
if (conn.getResourceDescriptor().equals("/234")) {
if (reqStr.equals("ccc")) {
publishMsgToClient("ddd", conn);
}
}
}
public static void publishMsgToClient(String msg, WebSocket targetClient) {
if (targetClient == null) {
return;
}
String ip = targetClient.getRemoteSocketAddress().getAddress().getHostAddress();
int port = targetClient.getRemoteSocketAddress().getPort();
String resourceDescriptor = targetClient.getResourceDescriptor();
wskt.broadcast(msg, ListUtil.toList(targetClient));
log.info("server 发布消息:{}给客户端ip:{},port:{},path:{}", msg, ip, port, resourceDescriptor);
}
public static void publishMsgToSomeClients(String msg, Collection<WebSocket> clients) {
if (clients == null || clients.size() == 0) {
return;
}
wskt.broadcast(msg, clients);
log.info("server 广播消息:{}", msg);
}
public static WebSocket getOneClient(String ip, int port, String path) {
if (clients.size() > 0) {
for (WebSocket client : clients) {
if (client != null) {
int cPort = client.getRemoteSocketAddress().getPort();
String cip = client.getRemoteSocketAddress().getAddress().getHostAddress();
String cPath = client.getResourceDescriptor();
if (ip.equals(cip) && port == cPort && path.equals(cPath)) {
return client;
}
}
}
}
return null;
}
public static void serverPushClientTest(String msg) {
if (clients.size() > 0) {
for (WebSocket client : clients) {
if (client != null) {
String ip = client.getRemoteSocketAddress().getAddress().getHostAddress();
int port = client.getRemoteSocketAddress().getPort();
String resourceDescriptor = client.getResourceDescriptor();
wskt.broadcast(msg, ListUtil.toList(client));
log.info("server 推送消息:{}给客户端ip:{},port:{},path:{}", msg, ip, port, resourceDescriptor);
}
}
}
}
}
开机启动ws服务器
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) throws UnknownHostException {
SpringApplication.run(DemoApplication.class, args);
WsktUtil.startServer();
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
WsktUtil.serverPushClientTest("server push:" + System.currentTimeMillis());
}
}, 6000, 5000);
}
}
测试结果
2023-07-12 17:55:10.552 INFO 952 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8621 (http) with context path '/mybatis'
2023-07-12 17:55:10.563 INFO 952 --- [ main] com.example.demo.DemoApplication : Started DemoApplication in 3.046 seconds (JVM running for 3.791)
2023-07-12 17:55:10.569 INFO 952 --- [ main] com.example.demo.util.WsktUtil : wsktServer started on port: 8843
wskt Server started!
2023-07-12 17:55:11.572 INFO 952 --- [ Timer-1] com.example.demo.util.WsktUtil : --当前共有0个websocket连接---
2023-07-12 17:55:11.572 INFO 952 --- [ Timer-1] com.example.demo.util.WsktUtil : --------------------------
2023-07-12 17:55:14.190 INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil : 一个新客户端打开连接...
2023-07-12 17:55:14.193 INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil : 客户端请求的ip:127.0.0.1
2023-07-12 17:55:14.193 INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil : 客户端的port:58011
2023-07-12 17:55:14.193 INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil : 客户端请求的 path:/234
2023-07-12 17:55:14.195 INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil : 一个客户端发送了消息....
2023-07-12 17:55:14.195 INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil : 客户端请求的ip:127.0.0.1
2023-07-12 17:55:14.195 INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil : 客户端请求的port:58011
2023-07-12 17:55:14.195 INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil : 客户端请求的path:/234
2023-07-12 17:55:14.195 INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil : 客户端发送的msg:
2023-07-12 17:55:14.572 INFO 952 --- [ Timer-1] com.example.demo.util.WsktUtil : --当前共有1个websocket连接---
2023-07-12 17:55:14.572 INFO 952 --- [ Timer-1] com.example.demo.util.WsktUtil : 第1个客户端的ip:127.0.0.1,port:58011,path:/234
2023-07-12 17:55:14.572 INFO 952 --- [ Timer-1] com.example.demo.util.WsktUtil : --------------------------
2023-07-12 17:55:15.996 INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil : 一个客户端发送了消息....
2023-07-12 17:55:15.996 INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil : 客户端请求的ip:127.0.0.1
2023-07-12 17:55:15.996 INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil : 客户端请求的port:58011
2023-07-12 17:55:15.997 INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil : 客户端请求的path:/234
2023-07-12 17:55:15.997 INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil : 客户端发送的msg:ccc
2023-07-12 17:55:15.999 INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil : server 发布消息:ddd给客户端ip:127.0.0.1,port:58011,path:/234
2023-07-12 17:55:16.570 INFO 952 --- [ Timer-0] com.example.demo.util.WsktUtil : server 推送消息:server push:1689155716570给客户端ip:127.0.0.1,port:58011,path:/234
客户端案例
自定义一个WebSocketClient子类
package cn.demo.wskt.client;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.net.URI;
public class MyClient extends WebSocketClient {
public MyClient(URI serverUri) {
super(serverUri);
}
@Override
public void onOpen(ServerHandshake serverHandshake) {
System.out.println("ws open");
System.out.println(serverHandshake.getHttpStatus());
System.out.println(serverHandshake.getHttpStatusMessage());
}
@Override
public void onMessage(String s) {
System.out.println("ws onMessage: "+s);
}
@Override
public void onClose(int code, String reason, boolean remote) {
System.out.println("ws close");
System.out.println("code: "+ code);
System.out.println("reason: "+ reason);
System.out.println("remote: "+ remote);
}
@Override
public void onError(Exception ex) {
ex.printStackTrace();
}
}
测试连接ws服务器
package cn.demo.wskt.client;
import org.java_websocket.enums.ReadyState;
import java.net.URI;
import java.net.URISyntaxException