请选择 进入手机版 | 继续访问电脑版
MSIPO技术圈 首页 IT技术 查看内容

一种轻量级websocket实现方案

2023-07-13

一种轻量级websocket服务器和客户端实现方案

pom依赖

<dependency>
            <groupId>org.java-websocket</groupId>
            <artifactId>Java-WebSocket</artifactId>
            <version>1.5.3</version>
        </dependency>

服务器端案例

定义ws服务器工具类WsktUtil

package com.example.demo.util;

import cn.hutool.core.collection.ListUtil;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;

import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;

@Slf4j
public class WsktUtil extends WebSocketServer {


    private static List<WebSocket> clients = new CopyOnWriteArrayList<>();
    private static int port = 8843;


    private WsktUtil() {
        super(new InetSocketAddress(port));
    }

    private static final WsktUtil wskt = new WsktUtil();


    @Override
    public void onOpen(WebSocket conn, ClientHandshake handshake) {
        log.info("一个新客户端打开连接...");
        conn.send("Welcome to link wskt server!");


        //客户端ip
        String ip = conn.getRemoteSocketAddress().getAddress().getHostAddress();
        log.info("客户端请求的ip:{}", ip);
        int port = conn.getRemoteSocketAddress().getPort();
        log.info("客户端的port:{}", port);
        //客户端请求的 websocket path
        String resourceDescriptor = handshake.getResourceDescriptor();
        log.info("客户端请求的  path:{}", resourceDescriptor);
        clients.add(conn);


    }

    @Override
    public void onClose(WebSocket conn, int code, String reason, boolean remote) {
        log.warn("一个客户端断开websocket连接...");
        String ip = conn.getRemoteSocketAddress().getAddress().getHostAddress();
        log.info("客户端请求的ip:{}", ip);
        int port = conn.getRemoteSocketAddress().getPort();
        log.info("客户端的port:{}", port);
        String resourceDescriptor = conn.getResourceDescriptor();
        log.info("客户端请求的 path:{}", resourceDescriptor);
        log.info("code:{}", code);
        log.info("reason:{}", reason);
        log.info("remote:{}", remote);
        clients.remove(conn);
    }

    @Override
    public void onMessage(WebSocket conn, String message) {
        log.info("一个客户端发送了消息....");
        String ip = conn.getRemoteSocketAddress().getAddress().getHostAddress();
        log.info("客户端请求的ip:{}", ip);
        int port = conn.getRemoteSocketAddress().getPort();
        log.info("客户端请求的port:{}", port);
        String resourceDescriptor = conn.getResourceDescriptor();
        log.info("客户端请求的path:{}", resourceDescriptor);
        log.info("客户端发送的msg:{}", message);

        handleClientReqMsg(conn, message);

    }

    @Override
    public void onError(WebSocket conn, Exception ex) {
        ex.printStackTrace();
        if (conn != null) {
            // some errors like port binding failed may not be assignable to a specific
            // websocket
            String ip = conn.getRemoteSocketAddress().getAddress().getHostAddress();
            log.info("异常客户端的ip:{}", ip);
            int port = conn.getRemoteSocketAddress().getPort();
            log.info("异常客户端的port:{}", port);
            String resourceDescriptor = conn.getResourceDescriptor();
            log.info("异常客户端的path:{}", resourceDescriptor);


        }
    }

    @Override
    public void onStart() {
        System.out.println("wskt Server started!");
        setConnectionLostTimeout(100); //连接丢失超时时间100s

        //启动后定时打印 客户端连接信息
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                printCurrentConns();
            }
        }, 1000, 3000); //延迟1s后,每3秒打印一次
    }

    private static void printCurrentConns() {
        int size = clients.size();
        log.info("--当前共有{}个websocket连接---", size);
        if (size > 0) {
            for (int i = 0; i < clients.size(); i++) {
                String ip = clients.get(i).getRemoteSocketAddress().getAddress().getHostAddress();
                int port = clients.get(i).getRemoteSocketAddress().getPort();
                String resourceDescriptor = clients.get(i).getResourceDescriptor();
                log.info("第{}个客户端的ip:{},port:{},path:{}", i + 1, ip, port, resourceDescriptor);
            }
        }
        log.info("--------------------------");
    }


    public static void startServer() throws UnknownHostException {
        wskt.start();
        log.info("wsktServer started on port: {}", wskt.getPort());
    }


    private static void handleClientReqMsg(WebSocket conn, String reqStr) {
        // 可以在这里处理req/resp形式的ws请求
        if (conn.getResourceDescriptor().equals("/123")) {
            if (reqStr.equals("aaa")) {
                publishMsgToClient("bbb", conn);
            }
        }
        if (conn.getResourceDescriptor().equals("/234")) {
            if (reqStr.equals("ccc")) {
                publishMsgToClient("ddd", conn);
            }
        }
    }


    public static void publishMsgToClient(String msg, WebSocket targetClient) {
        if (targetClient == null) {
            return;
        }
        String ip = targetClient.getRemoteSocketAddress().getAddress().getHostAddress();
        int port = targetClient.getRemoteSocketAddress().getPort();
        String resourceDescriptor = targetClient.getResourceDescriptor();
        wskt.broadcast(msg, ListUtil.toList(targetClient));
        log.info("server 发布消息:{}给客户端ip:{},port:{},path:{}", msg, ip, port, resourceDescriptor);
    }

    public static void publishMsgToSomeClients(String msg, Collection<WebSocket> clients) {
        if (clients == null || clients.size() == 0) {
            return;
        }
        wskt.broadcast(msg, clients);
        log.info("server 广播消息:{}", msg);
    }


    /**
     * ws://127.0.0.1:8843/123
     *
     * @param ip   127.0.0.1
     * @param port 8843
     * @param path /123
     * @return WebSocket
     */
    public static WebSocket getOneClient(String ip, int port, String path) {
        if (clients.size() > 0) {
            for (WebSocket client : clients) {
                if (client != null) {
                    int cPort = client.getRemoteSocketAddress().getPort();
                    String cip = client.getRemoteSocketAddress().getAddress().getHostAddress();
                    String cPath = client.getResourceDescriptor();
                    if (ip.equals(cip) && port == cPort && path.equals(cPath)) {
                        return client;
                    }
                }
            }
        }
        return null;
    }

    public static void serverPushClientTest(String msg) {
        if (clients.size() > 0) {
            for (WebSocket client : clients) {
                if (client != null) {
                    String ip = client.getRemoteSocketAddress().getAddress().getHostAddress();
                    int port = client.getRemoteSocketAddress().getPort();
                    String resourceDescriptor = client.getResourceDescriptor();

                    wskt.broadcast(msg, ListUtil.toList(client));
                    log.info("server 推送消息:{}给客户端ip:{},port:{},path:{}", msg, ip, port, resourceDescriptor);
                }
            }
        }
    }

}

开机启动ws服务器

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) throws UnknownHostException {
        SpringApplication.run(DemoApplication.class, args);

        WsktUtil.startServer();
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                WsktUtil.serverPushClientTest("server push:" + System.currentTimeMillis());
            }
        }, 6000, 5000);
    }

}

测试结果

2023-07-12 17:55:10.552  INFO 952 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8621 (http) with context path '/mybatis'
2023-07-12 17:55:10.563  INFO 952 --- [           main] com.example.demo.DemoApplication         : Started DemoApplication in 3.046 seconds (JVM running for 3.791)
2023-07-12 17:55:10.569  INFO 952 --- [           main] com.example.demo.util.WsktUtil           : wsktServer started on port: 8843
wskt Server started!
2023-07-12 17:55:11.572  INFO 952 --- [        Timer-1] com.example.demo.util.WsktUtil           : --当前共有0个websocket连接---
2023-07-12 17:55:11.572  INFO 952 --- [        Timer-1] com.example.demo.util.WsktUtil           : --------------------------
2023-07-12 17:55:14.190  INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil           : 一个新客户端打开连接...
2023-07-12 17:55:14.193  INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil           : 客户端请求的ip:127.0.0.1
2023-07-12 17:55:14.193  INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil           : 客户端的port:58011
2023-07-12 17:55:14.193  INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil           : 客户端请求的  path:/234
2023-07-12 17:55:14.195  INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil           : 一个客户端发送了消息....
2023-07-12 17:55:14.195  INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil           : 客户端请求的ip:127.0.0.1
2023-07-12 17:55:14.195  INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil           : 客户端请求的port:58011
2023-07-12 17:55:14.195  INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil           : 客户端请求的path:/234
2023-07-12 17:55:14.195  INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil           : 客户端发送的msg:
2023-07-12 17:55:14.572  INFO 952 --- [        Timer-1] com.example.demo.util.WsktUtil           : --当前共有1个websocket连接---
2023-07-12 17:55:14.572  INFO 952 --- [        Timer-1] com.example.demo.util.WsktUtil           : 第1个客户端的ip:127.0.0.1,port:58011,path:/234
2023-07-12 17:55:14.572  INFO 952 --- [        Timer-1] com.example.demo.util.WsktUtil           : --------------------------
2023-07-12 17:55:15.996  INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil           : 一个客户端发送了消息....
2023-07-12 17:55:15.996  INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil           : 客户端请求的ip:127.0.0.1
2023-07-12 17:55:15.996  INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil           : 客户端请求的port:58011
2023-07-12 17:55:15.997  INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil           : 客户端请求的path:/234
2023-07-12 17:55:15.997  INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil           : 客户端发送的msg:ccc
2023-07-12 17:55:15.999  INFO 952 --- [SocketWorker-43] com.example.demo.util.WsktUtil           : server 发布消息:ddd给客户端ip:127.0.0.1,port:58011,path:/234
2023-07-12 17:55:16.570  INFO 952 --- [        Timer-0] com.example.demo.util.WsktUtil           : server 推送消息:server push:1689155716570给客户端ip:127.0.0.1,port:58011,path:/234

客户端案例

自定义一个WebSocketClient子类

package cn.demo.wskt.client;

import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

import java.net.URI;

public class MyClient extends WebSocketClient {
    public MyClient(URI serverUri) {
        super(serverUri);
    }

    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        System.out.println("ws open");
        System.out.println(serverHandshake.getHttpStatus());
        System.out.println(serverHandshake.getHttpStatusMessage());
    }

    @Override
    public void onMessage(String s) {
        System.out.println("ws onMessage: "+s);
    }

    @Override
    public void onClose(int code, String reason, boolean remote) {
        System.out.println("ws close");
        System.out.println("code: "+ code);
        System.out.println("reason: "+ reason);
        System.out.println("remote: "+ remote);
    }

    @Override
    public void onError(Exception ex) {
        ex.printStackTrace();
    }
}

测试连接ws服务器

package cn.demo.wskt.client;

import org.java_websocket.enums.ReadyState;

import java.net.URI;
import java.net.URISyntaxException

相关阅读

热门文章