Przeglądaj źródła

new: redis缓存在线用户

lwh 5 dni temu
rodzic
commit
cd8c9f807f

+ 160 - 2
pig-marketing/pig-marketing-biz/src/main/java/com/pig4cloud/pig/marketing/service/impl/TcpDataServiceImpl.java

@@ -31,11 +31,14 @@ import org.springframework.data.mongodb.core.MongoTemplate;
 import org.springframework.data.mongodb.core.query.Criteria;
 import org.springframework.data.mongodb.core.query.Query;
 import org.springframework.data.mongodb.core.query.Update;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.ValueOperations;
 import org.springframework.stereotype.Service;
 
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
@@ -63,7 +66,12 @@ import java.util.stream.Collectors;
 
 		private final RemoteStatUserDataService remoteStatUserDataService;
 
-		// JSON处理器
+		// 添加 RedisTemplate 依赖
+		@Autowired
+		private RedisTemplate<String, Object> redisTemplate;
+
+
+	// JSON处理器
 		private final ObjectMapper objectMapper = new ObjectMapper();
 
 	// 定义时间格式器
@@ -447,6 +455,9 @@ import java.util.stream.Collectors;
 			Query query = new Query(Criteria.where("clientID").is(reqDto.getClientID()));
 			Device device = mongoTemplate.findOne(query, Device.class);
 			syncActiveUserData(device);
+
+			// 添加用户统计逻辑
+			updateUserStatistics(reqDto.getClientID());
 		}
 
 		log.info("保存推送记录:{}--{}", s,hand);
@@ -580,7 +591,8 @@ import java.util.stream.Collectors;
 			Long totalUsers = getTotalUsers();
 			
 			// 2. 统计活跃用户数
-			Long activeUsers = getActiveUsers();
+//			Long activeUsers = getActiveUsers();
+			Long activeUsers = getActiveUserCount();
 			
 			// 3. 构建返回结果
 			UserStatisticsVO statisticsVO = new UserStatisticsVO();
@@ -663,6 +675,17 @@ import java.util.stream.Collectors;
 	@Override
 	public List<OnlineUserVO> getOnlineUsers() {
 		log.info("开始查询在线用户列表");
+		try {
+			// 使用Redis获取十分钟内活跃的用户
+			return getActiveUsersFromRedis();
+
+		} catch (Exception e) {
+			log.error("查询在线用户列表异常", e);
+			return new ArrayList<>();
+		}
+	}
+	public List<OnlineUserVO> getOnlineUsers2() {
+		log.info("开始查询在线用户列表");
 		
 		try {
 			// 计算最近10分钟的时间范围
@@ -877,4 +900,139 @@ import java.util.stream.Collectors;
 		}
 		return "未知";
 	}
+
+
+	/**
+	 * 更新用户统计信息
+	 * 使用Redis原子操作统计十分钟内的用户数
+	 * @param clientId 客户端ID
+	 */
+	private void updateUserStatistics(String clientId) {
+		try {
+			String userActiveKey = "tcp:user:active:" + clientId;
+			String activeUserCountKey = "tcp:user:count:10min";
+
+			long currentTime = System.currentTimeMillis();
+			long tenMinutesAgo = currentTime - (10 * 60 * 1000);
+
+			ValueOperations<String, Object> valueOps = redisTemplate.opsForValue();
+			Object lastActiveTimeObj = valueOps.get(userActiveKey);
+
+			boolean isNewUserInWindow = false;
+
+			// 添加详细日志
+			log.info("用户统计调试 - 客户端ID: {}, 当前时间: {}, 10分钟前: {}",
+					clientId, currentTime, tenMinutesAgo);
+
+			if (lastActiveTimeObj == null) {
+				log.info("用户首次活跃 - 客户端ID: {}", clientId);
+				isNewUserInWindow = true;
+			} else {
+				long lastActiveTime = Long.parseLong(lastActiveTimeObj.toString());
+				log.info("用户上次活跃时间: {}, 是否超过10分钟: {}",
+						lastActiveTime, lastActiveTime < tenMinutesAgo);
+				if (lastActiveTime < tenMinutesAgo) {
+					isNewUserInWindow = true;
+				}
+			}
+
+			// 更新用户活跃时间
+			valueOps.set(userActiveKey, currentTime, 10, TimeUnit.MINUTES);
+			log.info("更新用户活跃时间完成 - 客户端ID: {}, isNewUserInWindow: {}",
+					clientId, isNewUserInWindow);
+
+			// 如果是十分钟内的新用户,增加计数
+			if (isNewUserInWindow) {
+				log.info("开始增加用户计数 - 客户端ID: {}", clientId);
+				Long currentCount = valueOps.increment(activeUserCountKey);
+				log.info("increment 操作完成,当前计数: {}", currentCount);
+
+				if (currentCount == 1) {
+					redisTemplate.expire(activeUserCountKey, 10, TimeUnit.MINUTES);
+					log.info("设置计数key过期时间完成");
+				}
+				log.info("用户统计更新 - 客户端ID: {}, 十分钟内活跃用户数: {}", clientId, currentCount);
+			} else {
+				log.info("用户不在10分钟窗口内,跳过计数 - 客户端ID: {}", clientId);
+			}
+
+		} catch (Exception e) {
+			log.error("更新用户统计信息失败,客户端ID: {}", clientId, e);
+		}
+	}
+
+	/**
+	 * 获取十分钟内活跃用户数
+	 * @return 活跃用户数
+	 */
+	public Long getActiveUserCount() {
+		try {
+			String pattern = "tcp:user:active:*";
+			Set<String> userActiveKeys = redisTemplate.keys(pattern);
+			return userActiveKeys != null ? (long) userActiveKeys.size() : 0L;
+		} catch (Exception e) {
+			log.error("获取活跃用户数失败", e);
+			return 0L;
+		}
+	}
+
+	private List<OnlineUserVO> getActiveUsersFromRedis() {
+		try {
+			// 获取所有用户活跃时间key
+			String pattern = "tcp:user:active:*";
+			Set<String> userActiveKeys = redisTemplate.keys(pattern);
+
+			if (userActiveKeys == null || userActiveKeys.isEmpty()) {
+				log.info("Redis中没有找到活跃用户数据");
+				return new ArrayList<>();
+			}
+
+			long currentTime = System.currentTimeMillis();
+			long tenMinutesAgo = currentTime - (10 * 60 * 1000); // 10分钟前的时间戳
+
+			ValueOperations<String, Object> valueOps = redisTemplate.opsForValue();
+			List<OnlineUserVO> activeUsers = new ArrayList<>();
+
+			for (String userActiveKey : userActiveKeys) {
+				try {
+					// 从key中提取clientId
+					String clientId = userActiveKey.substring("tcp:user:active:".length());
+
+					// 获取用户最后活跃时间
+					Object lastActiveTimeObj = valueOps.get(userActiveKey);
+					if (lastActiveTimeObj != null) {
+						long lastActiveTime = Long.parseLong(lastActiveTimeObj.toString());
+
+						// 检查是否在十分钟内活跃
+						if (lastActiveTime >= tenMinutesAgo) {
+							OnlineUserVO onlineUserVO = new OnlineUserVO();
+							onlineUserVO.setClientID(clientId);
+
+							// 将时间戳转换为LocalDateTime
+							LocalDateTime lastActiveDateTime = LocalDateTime.ofInstant(
+									java.time.Instant.ofEpochMilli(lastActiveTime),
+									java.time.ZoneId.systemDefault()
+							);
+							onlineUserVO.setLastActiveTime(lastActiveDateTime);
+
+							activeUsers.add(onlineUserVO);
+						}
+					}
+				} catch (Exception e) {
+					log.warn("处理用户活跃数据失败,key: {}", userActiveKey, e);
+				}
+			}
+
+			// 按最后活跃时间倒序排列
+			activeUsers.sort(Comparator.comparing(OnlineUserVO::getLastActiveTime).reversed());
+
+			log.info("从Redis获取到 {} 个十分钟内活跃用户", activeUsers.size());
+			return activeUsers;
+
+		} catch (Exception e) {
+			log.error("从Redis获取活跃用户列表失败", e);
+			return new ArrayList<>();
+		}
+	}
+
 }