Browse Source

new: 接入socket服务

lwh 3 weeks ago
parent
commit
1603c23bb1

+ 7 - 1
pig-statistics/pig-statistics-biz/pom.xml

@@ -100,7 +100,13 @@
 			<groupId>org.springframework.boot</groupId>
 			<groupId>org.springframework.boot</groupId>
 			<artifactId>spring-boot-starter-undertow</artifactId>
 			<artifactId>spring-boot-starter-undertow</artifactId>
 		</dependency>
 		</dependency>
-	</dependencies>
+        <dependency>
+            <groupId>javax.annotation</groupId>
+            <artifactId>javax.annotation-api</artifactId>
+            <version>1.3.2</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
 
 
 	<profiles>
 	<profiles>
 		<profile>
 		<profile>

+ 57 - 0
pig-statistics/pig-statistics-biz/src/main/java/com/pig4cloud/pig/statistics/socket/ClientHandler.java

@@ -0,0 +1,57 @@
+package com.pig4cloud.pig.statistics.socket;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+
+/**
+ * 客户端处理器,负责与单个客户端的所有数据交互
+ */
+public class ClientHandler implements Runnable {
+    private final Socket clientSocket;
+    private final ProtocolHandler protocolHandler;
+    private final DataProcessor dataProcessor;
+
+    public ClientHandler(Socket clientSocket, ProtocolHandler protocolHandler, DataProcessor dataProcessor) {
+        this.clientSocket = clientSocket;
+        this.protocolHandler = protocolHandler;
+        this.dataProcessor = dataProcessor;
+    }
+
+    @Override
+    public void run() {
+        try (InputStream in = clientSocket.getInputStream();
+             OutputStream out = clientSocket.getOutputStream()) {
+
+            System.out.println("开始处理客户端数据: " + clientSocket.getInetAddress());
+            
+            // 循环处理客户端发送的数据包
+            while (!clientSocket.isClosed()) {
+                // 使用协议处理器解析数据包
+                Packet packet = protocolHandler.readPacket(in);
+                if (packet == null) {
+                    break; // 连接关闭
+                }
+                
+                // 处理数据包并获取响应
+                byte[] responseData = dataProcessor.processData(packet.getType(), packet.getContent());
+                
+                // 发送响应
+                protocolHandler.sendPacket(out, packet.getType(), responseData);
+            }
+        } catch (IOException e) {
+            if (!clientSocket.isClosed()) {
+                System.err.println("客户端处理错误: " + e.getMessage());
+            }
+        } finally {
+            try {
+                clientSocket.close();
+                System.out.println("客户端连接已关闭: " + clientSocket.getInetAddress());
+            } catch (IOException e) {
+                System.err.println("关闭客户端连接错误: " + e.getMessage());
+            }
+        }
+    }
+}
+    

+ 22 - 0
pig-statistics/pig-statistics-biz/src/main/java/com/pig4cloud/pig/statistics/socket/ClientHandlerFactory.java

@@ -0,0 +1,22 @@
+package com.pig4cloud.pig.statistics.socket;
+
+import java.net.Socket;
+
+/**
+ * 客户端处理器工厂,负责创建ClientHandler实例
+ * 解耦TcpServer与ClientHandler的依赖关系
+ */
+public class ClientHandlerFactory {
+    /**
+     * 创建客户端处理器实例
+     */
+    public ClientHandler createClientHandler(Socket clientSocket) {
+        // 创建所需的依赖组件
+        ProtocolHandler protocolHandler = new ProtocolHandler();
+        DataProcessor dataProcessor = new DataProcessor();
+        
+        // 创建并返回ClientHandler实例
+        return new ClientHandler(clientSocket, protocolHandler, dataProcessor);
+    }
+}
+    

+ 91 - 0
pig-statistics/pig-statistics-biz/src/main/java/com/pig4cloud/pig/statistics/socket/DataProcessor.java

@@ -0,0 +1,91 @@
+package com.pig4cloud.pig.statistics.socket;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.UUID;
+
+/**
+ * 数据处理器,负责实际的业务逻辑处理
+ */
+public class DataProcessor {
+    // 图片保存路径
+    private static final String IMAGE_SAVE_PATH = "images";
+
+    public DataProcessor() {
+        // 确保图片目录存在
+        createImageDirectory();
+    }
+
+    /**
+     * 处理接收到的数据
+     * @param dataType 数据类型
+     * @param content 数据内容
+     * @return 响应数据
+     */
+    public byte[] processData(int dataType, byte[] content) {
+        try {
+            if (dataType == ProtocolHandler.TYPE_TEXT) {
+                // 文本数据,原路返回
+                String text = new String(content, "UTF-8");
+                System.out.println("收到文本: " + text);
+				System.out.println("文本长度: " + text.length());
+                return content;
+                
+            } else if (dataType == ProtocolHandler.TYPE_IMAGE) {
+                // 图片数据,保存并返回路径
+                String fileName = generateImageFileName();
+                String filePath = IMAGE_SAVE_PATH + File.separator + fileName;
+
+                // 保存图片
+                Files.write(Paths.get(filePath), content);
+                System.out.println("图片已保存: " + filePath);
+				System.out.println("图片大小: " + formatFileSize(content.length));
+                // 返回保存路径
+                return filePath.getBytes("UTF-8");
+                
+            } else {
+                // 未知数据类型
+                String errorMsg = "未知的数据类型: " + dataType;
+                System.err.println(errorMsg);
+                return errorMsg.getBytes("UTF-8");
+            }
+        } catch (IOException e) {
+            String errorMsg = "处理数据错误: " + e.getMessage();
+            System.err.println(errorMsg);
+            return errorMsg.getBytes();
+        }
+    }
+	// 辅助方法:将字节数格式化为更易读的单位(KB, MB等)
+	private String formatFileSize(long bytes) {
+		if (bytes < 1024) {
+			return bytes + " B";
+		} else if (bytes < 1024 * 1024) {
+			return String.format("%.2f KB", bytes / 1024.0);
+		} else if (bytes < 1024 * 1024 * 1024) {
+			return String.format("%.2f MB", bytes / (1024.0 * 1024));
+		} else {
+			return String.format("%.2f GB", bytes / (1024.0 * 1024 * 1024));
+		}
+	}
+
+    /**
+     * 创建图片保存目录
+     */
+    private void createImageDirectory() {
+        File directory = new File(IMAGE_SAVE_PATH);
+        if (!directory.exists()) {
+            directory.mkdirs();
+        }
+    }
+
+    /**
+     * 生成唯一的图片文件名
+     */
+    private String generateImageFileName() {
+        return "img_" + System.currentTimeMillis() + "_" + 
+               UUID.randomUUID().toString().substring(0, 8) + ".jpg";
+    }
+}
+    

+ 23 - 0
pig-statistics/pig-statistics-biz/src/main/java/com/pig4cloud/pig/statistics/socket/Packet.java

@@ -0,0 +1,23 @@
+package com.pig4cloud.pig.statistics.socket;
+
+/**
+ * 数据包模型类,用于封装解析后的数据包
+ */
+public class Packet {
+    private final int type;
+    private final byte[] content;
+
+    public Packet(int type, byte[] content) {
+        this.type = type;
+        this.content = content;
+    }
+
+    public int getType() {
+        return type;
+    }
+
+    public byte[] getContent() {
+        return content;
+    }
+}
+    

+ 127 - 0
pig-statistics/pig-statistics-biz/src/main/java/com/pig4cloud/pig/statistics/socket/ProtocolHandler.java

@@ -0,0 +1,127 @@
+package com.pig4cloud.pig.statistics.socket;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * 协议处理器,负责数据包的解析和构建
+ */
+public class ProtocolHandler {
+    // 协议定义
+    public static final byte PACKET_HEADER = 0x2;
+    public static final byte PACKET_TAIL = 0x3;
+    public static final byte TYPE_TEXT = 0x01;
+    public static final byte TYPE_IMAGE = 0x02;
+
+    /**
+     * 从输入流读取一个完整的数据包
+     */
+    public Packet readPacket(InputStream in) throws IOException {
+        ByteArrayOutputStream packetBuffer = new ByteArrayOutputStream();
+        boolean inPacket = false;
+        int dataType = -1;
+        byte[] lengthBytes = new byte[4];
+        int lengthIndex = 0;
+        int contentLength = -1;
+        ByteArrayOutputStream contentBuffer = new ByteArrayOutputStream();
+
+        int b;
+        while ((b = in.read()) != -1) {
+            byte currentByte = (byte) b;
+
+            // 处理包头
+            if (!inPacket && currentByte == PACKET_HEADER) {
+                inPacket = true;
+                packetBuffer.reset();
+                packetBuffer.write(currentByte);
+                lengthIndex = 0;
+                contentLength = -1;
+                contentBuffer.reset();
+                continue;
+            }
+
+            // 处理包内数据
+            if (inPacket) {
+                packetBuffer.write(currentByte);
+
+                // 读取数据类型
+                if (dataType == -1) {
+                    dataType = currentByte & 0xFF;
+                    continue;
+                }
+
+                // 读取数据长度(4字节)
+                if (contentLength == -1) {
+                    lengthBytes[lengthIndex++] = currentByte;
+                    if (lengthIndex == 4) {
+                        // 转换4字节为整数(大端模式)
+                        contentLength = ((lengthBytes[0] & 0xFF) << 24) |
+                                       ((lengthBytes[1] & 0xFF) << 16) |
+                                       ((lengthBytes[2] & 0xFF) << 8) |
+                                       (lengthBytes[3] & 0xFF);
+                    }
+                    continue;
+                }
+
+                // 读取数据内容
+                if (contentBuffer.size() < contentLength) {
+                    contentBuffer.write(currentByte);
+
+                    // 检查是否已读取所有内容
+                    if (contentBuffer.size() == contentLength) {
+                        // 接下来应该是包尾
+                        continue;
+                    }
+                } else {
+                    // 检查包尾
+                    if (currentByte == PACKET_TAIL) {
+                        // 完整包接收完成,返回数据包
+                        return new Packet(dataType, contentBuffer.toByteArray());
+                    } else {
+                        // 包尾不正确,视为无效包
+                        System.err.println("无效的包尾,丢弃数据包");
+                        inPacket = false;
+                        dataType = -1;
+                        contentLength = -1;
+                    }
+                }
+            }
+        }
+
+        // 到达流末尾,返回null
+        return null;
+    }
+
+    /**
+     * 发送一个数据包到输出流
+     */
+    public void sendPacket(OutputStream out, int dataType, byte[] data) throws IOException {
+        // 构造响应包
+        ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
+
+        // 写入包头
+        responseBuffer.write(PACKET_HEADER);
+
+        // 写入数据类型
+        responseBuffer.write(dataType);
+
+        // 写入数据长度(4字节,大端模式)
+        responseBuffer.write((data.length >> 24) & 0xFF);
+        responseBuffer.write((data.length >> 16) & 0xFF);
+        responseBuffer.write((data.length >> 8) & 0xFF);
+        responseBuffer.write(data.length & 0xFF);
+
+        // 写入数据内容
+        responseBuffer.write(data);
+
+        // 写入包尾
+        responseBuffer.write(PACKET_TAIL);
+
+        // 发送响应
+        out.write(responseBuffer.toByteArray());
+        out.flush();
+    }
+}
+    

+ 221 - 0
pig-statistics/pig-statistics-biz/src/main/java/com/pig4cloud/pig/statistics/socket/TcpClient.java

@@ -0,0 +1,221 @@
+package com.pig4cloud.pig.statistics.socket;
+
+import java.io.*;
+import java.net.*;
+import java.nio.file.*;
+
+/**
+ * TCP客户端测试类,用于测试服务器功能
+ * 支持发送文本消息和图片文件
+ */
+public class TcpClient {
+    // 协议定义,与服务器保持一致
+    private static final byte PACKET_HEADER = 0x2;
+    private static final byte PACKET_TAIL = 0x3;
+    private static final byte TYPE_TEXT = 0x01;
+    private static final byte TYPE_IMAGE = 0x02;
+    
+    private String serverHost;
+    private int serverPort;
+    private Socket socket;
+    private InputStream in;
+    private OutputStream out;
+    
+    public TcpClient(String serverHost, int serverPort) {
+        this.serverHost = serverHost;
+        this.serverPort = serverPort;
+    }
+    
+    /**
+     * 连接到服务器
+     */
+    public void connect() throws IOException {
+        socket = new Socket(serverHost, serverPort);
+        in = socket.getInputStream();
+        out = socket.getOutputStream();
+        System.out.println("已连接到服务器: " + serverHost + ":" + serverPort);
+    }
+    
+    /**
+     * 发送文本消息到服务器
+     */
+    public String sendText(String text) throws IOException {
+        if (socket == null || socket.isClosed()) {
+            throw new IOException("未连接到服务器,请先调用connect()方法");
+        }
+        
+        byte[] data = text.getBytes("UTF-8");
+        sendPacket(TYPE_TEXT, data);
+        return receiveResponse();
+    }
+    
+    /**
+     * 发送图片文件到服务器
+     */
+    public String sendImage(String imageFilePath) throws IOException {
+        if (socket == null || socket.isClosed()) {
+            throw new IOException("未连接到服务器,请先调用connect()方法");
+        }
+        
+        // 读取图片文件
+        byte[] imageData = Files.readAllBytes(Paths.get(imageFilePath));
+        sendPacket(TYPE_IMAGE, imageData);
+        return receiveResponse();
+    }
+    
+    /**
+     * 按照协议格式发送数据包
+     */
+    private void sendPacket(int type, byte[] data) throws IOException {
+        ByteArrayOutputStream packetBuffer = new ByteArrayOutputStream();
+        
+        // 写入包头
+        packetBuffer.write(PACKET_HEADER);
+        
+        // 写入数据类型
+        packetBuffer.write(type);
+        
+        // 写入数据长度(4字节,大端模式)
+        packetBuffer.write((data.length >> 24) & 0xFF);
+        packetBuffer.write((data.length >> 16) & 0xFF);
+        packetBuffer.write((data.length >> 8) & 0xFF);
+        packetBuffer.write(data.length & 0xFF);
+        
+        // 写入数据内容
+        packetBuffer.write(data);
+        
+        // 写入包尾
+        packetBuffer.write(PACKET_TAIL);
+        
+        // 发送数据包
+        out.write(packetBuffer.toByteArray());
+        out.flush();
+    }
+    
+    /**
+     * 接收服务器响应
+     */
+    private String receiveResponse() throws IOException {
+        // 解析服务器响应的数据包
+        ByteArrayOutputStream packetBuffer = new ByteArrayOutputStream();
+        boolean inPacket = false;
+        int dataType = -1;
+        byte[] lengthBytes = new byte[4];
+        int lengthIndex = 0;
+        int contentLength = -1;
+        ByteArrayOutputStream contentBuffer = new ByteArrayOutputStream();
+        
+        int b;
+        while ((b = in.read()) != -1) {
+            byte currentByte = (byte) b;
+            
+            // 处理包头
+            if (!inPacket && currentByte == PACKET_HEADER) {
+                inPacket = true;
+                packetBuffer.reset();
+                packetBuffer.write(currentByte);
+                lengthIndex = 0;
+                contentLength = -1;
+                contentBuffer.reset();
+                continue;
+            }
+            
+            // 处理包内数据
+            if (inPacket) {
+                packetBuffer.write(currentByte);
+                
+                // 读取数据类型
+                if (dataType == -1) {
+                    dataType = currentByte & 0xFF;
+                    continue;
+                }
+                
+                // 读取数据长度(4字节)
+                if (contentLength == -1) {
+                    lengthBytes[lengthIndex++] = currentByte;
+                    if (lengthIndex == 4) {
+                        // 转换4字节为整数(大端模式)
+                        contentLength = ((lengthBytes[0] & 0xFF) << 24) |
+                                       ((lengthBytes[1] & 0xFF) << 16) |
+                                       ((lengthBytes[2] & 0xFF) << 8) |
+                                       (lengthBytes[3] & 0xFF);
+                    }
+                    continue;
+                }
+                
+                // 读取数据内容
+                if (contentBuffer.size() < contentLength) {
+                    contentBuffer.write(currentByte);
+                    
+                    // 检查是否已读取所有内容
+                    if (contentBuffer.size() == contentLength) {
+                        // 接下来应该是包尾
+                        continue;
+                    }
+                } else {
+                    // 检查包尾
+                    if (currentByte == PACKET_TAIL) {
+                        // 完整包接收完成,返回内容
+                        return new String(contentBuffer.toByteArray(), "UTF-8");
+                    } else {
+                        // 包尾不正确,视为无效包
+                        throw new IOException("收到无效的数据包,包尾不正确");
+                    }
+                }
+            }
+        }
+        
+        throw new IOException("与服务器的连接已关闭");
+    }
+    
+    /**
+     * 关闭与服务器的连接
+     */
+    public void disconnect() {
+        try {
+            if (in != null) in.close();
+            if (out != null) out.close();
+            if (socket != null && !socket.isClosed()) {
+                socket.close();
+                System.out.println("已断开与服务器的连接");
+            }
+        } catch (IOException e) {
+            System.err.println("关闭连接时发生错误: " + e.getMessage());
+        }
+    }
+    
+    /**
+     * 测试方法
+     */
+    public static void main(String[] args) {
+        // 服务器地址和端口
+        String host = "localhost";
+        int port = 8888;
+        
+        TcpClient client = new TcpClient(host, port);
+        
+        try {
+            // 连接服务器
+            client.connect();
+            
+            // 测试发送文本
+            String text = "Hello, TCP Server!";
+            System.out.println("发送文本: " + text);
+            String textResponse = client.sendText(text);
+            System.out.println("服务器响应: " + textResponse);
+            
+            // 测试发送图片(请替换为实际的图片路径)
+            String imagePath = "C:\\Users\\L\\Desktop\\素材\\image16.png"; // 测试图片路径
+            System.out.println("发送图片: " + imagePath);
+            String imageResponse = client.sendImage(imagePath);
+            System.out.println("服务器响应: " + imageResponse);
+            
+        } catch (IOException e) {
+            System.err.println("客户端错误: " + e.getMessage());
+            e.printStackTrace();
+        } finally {
+            // 断开连接
+            client.disconnect();
+        }
+    }
+}

+ 82 - 0
pig-statistics/pig-statistics-biz/src/main/java/com/pig4cloud/pig/statistics/socket/TcpServer.java

@@ -0,0 +1,82 @@
+package com.pig4cloud.pig.statistics.socket;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+/**
+ * TCP服务器主类,负责监听端口和管理客户端连接
+ */
+@Slf4j
+@Component
+public class TcpServer {
+	@Value("${tcp.server.port}")
+	private int port;
+
+	private ServerSocket serverSocket;
+	private boolean isRunning;
+	private final ClientHandlerFactory clientHandlerFactory;
+
+	public TcpServer() {
+		this.clientHandlerFactory = new ClientHandlerFactory();
+	}
+
+	/**
+	 * 启动服务器,Spring初始化后自动调用
+	 */
+	@PostConstruct
+	public void start() throws IOException {
+		serverSocket = new ServerSocket(port);
+		isRunning = true;
+		log.info("TCP服务器已启动,监听端口: {}", port);
+
+		// 启动接受客户端连接的线程
+		new Thread(this::acceptConnections, "ConnectionAcceptor").start();
+	}
+
+	/**
+	 * 接受客户端连接的循环
+	 */
+	private void acceptConnections() {
+		while (isRunning) {
+			try {
+				// 接受客户端连接
+				Socket clientSocket = serverSocket.accept();
+				log.info("新客户端连接: {}", clientSocket.getInetAddress().getHostAddress());
+
+				// 创建并启动客户端处理器
+				ClientHandler clientHandler = clientHandlerFactory.createClientHandler(clientSocket);
+				new Thread(clientHandler, "ClientHandler-" + clientSocket.getInetAddress()).start();
+			} catch (IOException e) {
+				if (isRunning) {
+					log.error("接受客户端连接错误: {}", e.getMessage(), e);
+				} else {
+					log.info("服务器已停止,不再接受新连接");
+				}
+			}
+		}
+	}
+
+	/**
+	 * 停止服务器,Spring销毁前自动调用
+	 */
+	@PreDestroy
+	public void stop() {
+		isRunning = false;
+		try {
+			if (serverSocket != null) {
+				serverSocket.close();
+			}
+			log.info("TCP服务器已停止");
+		} catch (IOException e) {
+			log.error("停止服务器错误: {}", e.getMessage(), e);
+		}
+	}
+}
+    

+ 7 - 1
pig-statistics/pig-statistics-biz/src/main/resources/application.yml

@@ -20,4 +20,10 @@ spring:
   config:
   config:
     import:
     import:
       - nacos:application.yml
       - nacos:application.yml
-      - nacos:${spring.application.name}.yml
+      - nacos:${spring.application.name}.yml
+
+
+# TCP服务器配置
+tcp:
+  server:
+    port: 8888