Gateway.php 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<[email protected]>
  10. * @copyright walkor<[email protected]>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. namespace GatewayWorker;
  15. use GatewayWorker\Lib\Context;
  16. use Workerman\Connection\TcpConnection;
  17. use Workerman\Worker;
  18. use Workerman\Lib\Timer;
  19. use Workerman\Autoloader;
  20. use Workerman\Connection\AsyncTcpConnection;
  21. use GatewayWorker\Protocols\GatewayProtocol;
  22. /**
  23. *
  24. * Gateway,基于Worker 开发
  25. * 用于转发客户端的数据给Worker处理,以及转发Worker的数据给客户端
  26. *
  27. * @author walkor<[email protected]>
  28. *
  29. */
  30. class Gateway extends Worker
  31. {
  /**
   * Library version.
   *
   * @var string
   */
  const VERSION = '3.0.18';

  /**
   * IP of this machine.
   * The default 127.0.0.1 is for single-machine deployments; in a distributed
   * deployment it must be set to this machine's own IP.
   *
   * @var string
   */
  public $lanIp = '127.0.0.1';

  /**
   * Port of this machine used for internal communication.
   * Overwritten in onWorkerStart() with $startPort + $this->id.
   *
   * @var int
   */
  public $lanPort = 0;

  /**
   * Starting port for gateway internal communication. Every gateway instance
   * should use a different value, in steps of 1000.
   *
   * @var int
   */
  public $startPort = 2000;

  /**
   * Address(es) of the register service, used to register Gateway and
   * BusinessWorker so they can reach each other. run() wraps a single
   * string into an array.
   *
   * @var string|array
   */
  public $registerAddress = '127.0.0.1:1236';

  /**
   * Whether the process may be reloaded gracefully. A gateway must not be,
   * otherwise its connections would be dropped.
   *
   * @var bool
   */
  public $reloadable = false;

  /**
   * Heartbeat interval. A value greater than 0 makes onWorkerStart()
   * schedule ping() through Timer::add().
   * NOTE(review): presumably expressed in seconds - confirm against Timer::add().
   *
   * @var int
   */
  public $pingInterval = 0;

  /**
   * If a client sends no data within $pingNotResponseLimit * $pingInterval,
   * its connection is closed.
   *
   * @var int
   */
  public $pingNotResponseLimit = 0;

  /**
   * Heartbeat data sent by the server to clients.
   *
   * @var string
   */
  public $pingData = '';

  /**
   * Secret key. Compared against the key sent by connecting workers and
   * included in the registration message sent to the register service.
   *
   * @var string
   */
  public $secretKey = '';

  /**
   * Routing function that picks the worker connection a client packet is
   * forwarded to. The constructor sets it to Gateway::routerBind.
   *
   * @var callable
   */
  public $router = null;

  /**
   * Send-buffer size for data forwarded from the gateway process to a
   * BusinessWorker process.
   *
   * @var int
   */
  public $sendToWorkerBufferSize = 10240000;

  /**
   * Per-client send-buffer size for data sent from the gateway process to
   * clients.
   *
   * @var int
   */
  public $sendToClientBufferSize = 1024000;

  /**
   * Protocol acceleration. When enabled and a protocol is set, broadcast and
   * group messages are passed through preEncodeForClient() once and then
   * sent raw to every recipient.
   *
   * @var bool
   */
  public $protocolAccelerate = false;

  /**
   * All client connection objects, keyed by connection id.
   *
   * @var array
   */
  protected $_clientConnections = array();

  /**
   * uid => (connection id => connection) map; one uid may have many connections.
   *
   * @var array
   */
  protected $_uidConnections = array();

  /**
   * group => (connection id => connection) map; one group may have many connections.
   *
   * @var array
   */
  protected $_groupConnections = array();

  /**
   * Internal connections of all workers, keyed by "<remote ip>:<worker_key>".
   *
   * @var array
   */
  protected $_workerConnections = array();

  /**
   * Worker that listens on the gateway's internal port for worker connections.
   *
   * @var Worker
   */
  protected $_innerTcpWorker = null;

  /**
   * User callback invoked when the worker starts.
   *
   * @var callable
   */
  protected $_onWorkerStart = null;

  /**
   * User callback invoked when a client connects.
   *
   * @var callable
   */
  protected $_onConnect = null;

  /**
   * User callback for client messages.
   * run() does not populate this: users are not allowed to set onMessage.
   *
   * @var callable
   */
  protected $_onMessage = null;

  /**
   * User callback invoked when a client connection closes.
   *
   * @var callable
   */
  protected $_onClose = null;

  /**
   * User callback invoked when the worker stops.
   *
   * @var callable
   */
  protected $_onWorkerStop = null;

  /**
   * Process start time, taken from time() in run().
   *
   * @var int
   */
  protected $_startTime = 0;

  /**
   * Port the gateway listens on, parsed from the socket name in the constructor.
   *
   * @var int|string
   */
  protected $_gatewayPort = 0;

  /**
   * Last connection id handed out by generateConnectionId().
   *
   * @var int
   */
  protected static $_connectionIdRecorder = 0;

  /**
   * Heartbeat interval used to keep long-lived internal connections alive.
   *
   * @var int
   */
  const PERSISTENCE_CONNECTION_PING_INTERVAL = 25;
  /**
   * Creates the gateway and derives its defaults from the listen address.
   *
   * Stores everything after the last ':' of $socket_name as the gateway port,
   * installs routerBind as the default router and remembers the directory of
   * the first backtrace frame's file as the autoload root.
   *
   * @param string $socket_name    Listen address handed to the parent Worker.
   * @param array  $context_option Stream context options handed to the parent Worker.
   */
  public function __construct($socket_name, $context_option = array())
  {
      parent::__construct($socket_name, $context_option);

      // Everything after the last ':' is the port clients connect to.
      $port_suffix = strrchr($socket_name, ':');
      $this->_gatewayPort = substr($port_suffix, 1);

      // Default routing keeps a client bound to one worker connection.
      $this->router = array("\\GatewayWorker\\Gateway", 'routerBind');

      $call_stack = debug_backtrace();
      $first_frame = $call_stack[0];
      $this->_autoloadRootPath = dirname($first_frame['file']);
  }
  /**
   * {@inheritdoc}
   *
   * Before delegating to the parent, stashes the user-supplied callbacks and
   * replaces them with the gateway's own handlers, normalises
   * $registerAddress to an array and records the start time.
   */
  public function run()
  {
      // Keep the user's callbacks so the gateway handlers can invoke them later.
      $this->_onWorkerStart = $this->onWorkerStart;
      $this->_onConnect     = $this->onConnect;
      $this->_onClose       = $this->onClose;
      $this->_onWorkerStop  = $this->onWorkerStop;

      // Install the gateway's own handlers. onMessage is always overridden:
      // users are not allowed to set a callback for it.
      $this->onWorkerStart = array($this, 'onWorkerStart');
      $this->onConnect     = array($this, 'onClientConnect');
      $this->onMessage     = array($this, 'onClientMessage');
      $this->onClose       = array($this, 'onClientClose');
      $this->onWorkerStop  = array($this, 'onWorkerStop');

      // A single register address is wrapped so later code can always iterate.
      if (!is_array($this->registerAddress)) {
          $this->registerAddress = array($this->registerAddress);
      }

      // Remember when the process started.
      $this->_startTime = time();

      parent::run();
  }
  /**
   * Forwards data received from a client to a worker.
   *
   * Also resets the connection's missed-heartbeat counter to -1, since the
   * client has just proven it is alive.
   *
   * @param TcpConnection $connection Client connection the data arrived on.
   * @param mixed         $data       Payload received from the client.
   */
  public function onClientMessage($connection, $data)
  {
      // Any inbound data counts as a heartbeat response.
      $connection->pingNotResponseCount = -1;

      $this->sendToWorker(
          GatewayProtocol::CMD_ON_MESSAGE,
          $connection,
          $data
      );
  }
  /**
   * Initialises per-client state when a client connects.
   *
   * Assigns a connection id, caches the internal packet header, sets an empty
   * session, the heartbeat counter and the send-buffer size, registers the
   * connection, runs the user's onConnect callback (if any) and finally
   * notifies a worker with CMD_ON_CONNECT.
   *
   * @param TcpConnection $connection Newly accepted client connection.
   */
  public function onClientConnect($connection)
  {
      $connection_id = self::generateConnectionId();
      $connection->id = $connection_id;

      // Cache the internal packet header once so it is not rebuilt per message.
      $header = array(
          'local_ip'      => ip2long($this->lanIp),
          'local_port'    => $this->lanPort,
          'client_ip'     => ip2long($connection->getRemoteIp()),
          'client_port'   => $connection->getRemotePort(),
          'gateway_port'  => $this->_gatewayPort,
          'connection_id' => $connection_id,
          'flag'          => 0,
      );
      $connection->gatewayHeader = $header;

      // Session, heartbeat counter and send-buffer size of this connection.
      $connection->session = '';
      $connection->pingNotResponseCount = -1;
      $connection->maxSendBufferSize = $this->sendToClientBufferSize;

      // Register the client connection object.
      $this->_clientConnections[$connection_id] = $connection;

      // Run the user's onConnect callback; if it installed its own websocket
      // handshake callback, keep that one aside so it can still be invoked.
      if ($this->_onConnect) {
          call_user_func($this->_onConnect, $connection);
          if (isset($connection->onWebSocketConnect)) {
              $connection->_onWebSocketConnect = $connection->onWebSocketConnect;
          }
      }

      // For websocket clients the gateway takes over the handshake callback.
      $websocket_protocols = array(
          '\Workerman\Protocols\Websocket',
          'Workerman\Protocols\Websocket',
      );
      if (in_array($connection->protocol, $websocket_protocols, true)) {
          $connection->onWebSocketConnect = array($this, 'onWebsocketConnect');
      }

      $this->sendToWorker(GatewayProtocol::CMD_ON_CONNECT, $connection);
  }
  /**
   * Triggered during the websocket handshake.
   *
   * Invokes the user's handshake callback once (if one was stashed on the
   * connection) and then forwards $_GET, $_SERVER and $_COOKIE to a worker
   * with CMD_ON_WEBSOCKET_CONNECT.
   *
   * @param TcpConnection $connection  Client connection performing the handshake.
   * @param string        $http_buffer Handshake data handed to the user callback.
   */
  public function onWebsocketConnect($connection, $http_buffer)
  {
      // The stashed user callback is single-use: call it, then drop it.
      if (isset($connection->_onWebSocketConnect)) {
          call_user_func($connection->_onWebSocketConnect, $connection, $http_buffer);
          unset($connection->_onWebSocketConnect);
      }

      $handshake_info = array('get' => $_GET, 'server' => $_SERVER, 'cookie' => $_COOKIE);
      $this->sendToWorker(GatewayProtocol::CMD_ON_WEBSOCKET_CONNECT, $connection, $handshake_info);
  }
  /**
   * Generates the next connection id.
   *
   * Advances the shared counter, wrapping back to 0 once it has reached
   * 4294967295, and skips ids that are still present in $_clientConnections.
   *
   * @return int
   */
  protected function generateConnectionId()
  {
      $id_ceiling = 4294967295;

      // Wrap around once the counter has reached the ceiling.
      if (self::$_connectionIdRecorder >= $id_ceiling) {
          self::$_connectionIdRecorder = 0;
      }

      // Advance at least once, then keep going while the id is still in use.
      do {
          ++self::$_connectionIdRecorder;
      } while (
          self::$_connectionIdRecorder <= $id_ceiling
          && isset($this->_clientConnections[self::$_connectionIdRecorder])
      );

      return self::$_connectionIdRecorder;
  }
  /**
   * Sends a packet about a client connection to a worker process.
   *
   * The packet is the connection's cached header plus the command, the body
   * and the connection's session as ext_data. The worker connection is chosen
   * by the router callback. When no worker is connected, the client
   * connection is destroyed.
   *
   * @param int           $cmd        GatewayProtocol command.
   * @param TcpConnection $connection Client connection the packet is about.
   * @param mixed         $body       Packet body.
   * @return bool True when the packet was handed to a worker connection,
   *              false when no worker is connected or send() returned false.
   */
  protected function sendToWorker($cmd, $connection, $body = '')
  {
      $packet = $connection->gatewayHeader;
      $packet['cmd'] = $cmd;
      $packet['body'] = $body;
      $packet['ext_data'] = $connection->session;

      // No worker available.
      if (!$this->_workerConnections) {
          // In the first 1-2 seconds after startup this is normal, because the
          // worker connections are not established yet; do not log then, just
          // close the client connection.
          $startup_grace = 2;
          if (time() - $this->_startTime >= $startup_grace) {
              static::log('SendBufferToWorker fail. The connections between Gateway and BusinessWorker are not ready. See http://doc2.workerman.net/send-buffer-to-worker-fail.html');
          }
          $connection->destroy();
          return false;
      }

      // Let the router pick the worker connection that receives the packet.
      /** @var TcpConnection $target_worker */
      $target_worker = call_user_func($this->router, $this->_workerConnections, $connection, $cmd, $body);
      if (false === $target_worker->send($packet)) {
          static::log("SendBufferToWorker fail. May be the send buffer are overflow. See http://doc2.workerman.net/send-buffer-overflow.html");
          return false;
      }

      return true;
  }
  /**
   * Random routing: returns a randomly chosen worker connection.
   *
   * @param array         $worker_connections Available worker connections.
   * @param TcpConnection $client_connection  Client connection (unused).
   * @param int           $cmd                Command (unused).
   * @param mixed         $buffer             Payload (unused).
   * @return TcpConnection
   */
  public static function routerRand($worker_connections, $client_connection, $cmd, $buffer)
  {
      $picked_key = array_rand($worker_connections);

      return $worker_connections[$picked_key];
  }
  /**
   * Sticky routing: binds a client to one worker connection.
   *
   * The chosen key is stored on the client connection as
   * businessworker_address and reused while that key still exists in
   * $worker_connections; otherwise a new key is picked at random.
   *
   * @param array         $worker_connections Available worker connections.
   * @param TcpConnection $client_connection  Client connection being routed.
   * @param int           $cmd                Command (unused).
   * @param mixed         $buffer             Payload (unused).
   * @return TcpConnection
   */
  public static function routerBind($worker_connections, $client_connection, $cmd, $buffer)
  {
      $has_live_binding = isset($client_connection->businessworker_address)
          && isset($worker_connections[$client_connection->businessworker_address]);

      // First packet of this client, or its bound worker is gone: rebind.
      if (!$has_live_binding) {
          $client_connection->businessworker_address = array_rand($worker_connections);
      }

      return $worker_connections[$client_connection->businessworker_address];
  }
  /**
   * Cleans up after a client connection has closed.
   *
   * Notifies a worker with CMD_ON_CLOSE, removes the connection from the
   * client, uid and group maps, and finally runs the user's onClose callback.
   *
   * @param TcpConnection $connection Client connection that closed.
   */
  public function onClientClose($connection)
  {
      // Try to notify a worker so that Event::onClose is triggered.
      $this->sendToWorker(GatewayProtocol::CMD_ON_CLOSE, $connection);

      $connection_id = $connection->id;
      unset($this->_clientConnections[$connection_id]);

      // Drop the uid binding, removing the uid entry once it has no connections.
      if (!empty($connection->uid)) {
          $bound_uid = $connection->uid;
          unset($this->_uidConnections[$bound_uid][$connection_id]);
          if (empty($this->_uidConnections[$bound_uid])) {
              unset($this->_uidConnections[$bound_uid]);
          }
      }

      // Leave every group, removing groups that become empty.
      if (!empty($connection->groups)) {
          foreach ($connection->groups as $joined_group) {
              unset($this->_groupConnections[$joined_group][$connection_id]);
              if (empty($this->_groupConnections[$joined_group])) {
                  unset($this->_groupConnections[$joined_group]);
              }
          }
      }

      // Trigger the user's onClose callback.
      if ($this->_onClose) {
          call_user_func($this->_onClose, $connection);
      }
  }
  /**
   * Callback run when the Gateway process starts.
   *
   * Allocates the internal port, schedules the heartbeat timers, opens the
   * internal listener that workers connect to, registers the internal address
   * with the register service and then runs the user's onWorkerStart callback.
   *
   * @return void
   */
  public function onWorkerStart()
  {
      // Allocate the internal communication port for this process.
      $this->lanPort = $this->startPort + $this->id;

      // Client heartbeat: when a not-response limit is configured the timer
      // runs at half the ping interval, otherwise at the ping interval.
      if ($this->pingInterval > 0) {
          if ($this->pingNotResponseLimit > 0) {
              $heartbeat_period = $this->pingInterval / 2;
          } else {
              $heartbeat_period = $this->pingInterval;
          }
          Timer::add($heartbeat_period, array($this, 'ping'));
      }

      // When the internal IP is not 127.0.0.1, also keep the gateway to
      // BusinessWorker connections alive with a heartbeat.
      if ('127.0.0.1' !== $this->lanIp) {
          Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, array($this, 'pingBusinessWorker'));
      }

      // Make the protocol class reachable under the Protocols\ namespace.
      if (!class_exists('\Protocols\GatewayProtocol')) {
          class_alias('GatewayWorker\Protocols\GatewayProtocol', 'Protocols\GatewayProtocol');
      }

      // Internal listener that accepts worker connections and their data.
      $inner_worker = new Worker("GatewayProtocol://{$this->lanIp}:{$this->lanPort}");
      $this->_innerTcpWorker = $inner_worker;
      $inner_worker->reusePort = false;
      $inner_worker->listen();
      $inner_worker->name = 'GatewayInnerWorker';

      // Restore the autoload root directory captured in the constructor.
      Autoloader::setRootPath($this->_autoloadRootPath);

      // Callbacks of the internal listener.
      $inner_worker->onMessage = array($this, 'onWorkerMessage');
      $inner_worker->onConnect = array($this, 'onWorkerConnect');
      $inner_worker->onClose = array($this, 'onWorkerClose');

      // Publish the internal address so workers can connect to it and keep a
      // long-lived TCP connection with this gateway.
      $this->registerAddress();

      if ($this->_onWorkerStart) {
          call_user_func($this->_onWorkerStart, $this);
      }
  }
  /**
   * Called when a worker connects to the gateway's internal port.
   *
   * Applies the worker send-buffer size and marks the connection as
   * authorized only when no secret key is configured.
   *
   * @param TcpConnection $connection Internal connection opened by a worker.
   */
  public function onWorkerConnect($connection)
  {
      $connection->maxSendBufferSize = $this->sendToWorkerBufferSize;

      // With a secret key configured the peer must authenticate first.
      $connection->authorized = !$this->secretKey;
  }
  /**
   * Handles a packet received on the internal port from a worker or GatewayClient.
   *
   * Unauthorized peers may only send the two handshake commands; anything
   * else closes the connection. After that the packet's cmd selects one of:
   * handshake, send/kick/destroy a client, broadcast, client queries,
   * session updates, uid binding and group management. Query commands reply
   * on $connection with a 4-byte big-endian length followed by the payload.
   *
   * Changes from the previous revision:
   * - key-mismatch logs show the key the peer actually sent instead of the
   *   gateway's key twice, and no longer echo the secret to stdout
   *   (var_export() was called without its $return argument);
   * - a handshake body without a secret_key entry is treated as a mismatch
   *   instead of being indexed blindly;
   * - the CMD_SELECT diagnostic ends with a real CRLF instead of the literal
   *   characters \r\n;
   * - unknown CMD_SELECT "where" keys are skipped;
   * - loops over client connections no longer overwrite $connection;
   * - the CMD_UNGROUP diagnostic no longer claims to come from leave().
   *
   * @param TcpConnection $connection Internal connection the packet arrived on.
   * @param mixed         $data       Decoded packet; read as an array with keys such as
   *                                  cmd, body, ext_data, connection_id and flag.
   * @throws \Exception NOTE(review): nothing here throws directly; presumably propagated from callees.
   *
   * @return void
   */
  public function onWorkerMessage($connection, $data)
  {
      $cmd = $data['cmd'];
      // Until the peer is authorized, only the two handshake commands are accepted.
      if (empty($connection->authorized) && $cmd !== GatewayProtocol::CMD_WORKER_CONNECT && $cmd !== GatewayProtocol::CMD_GATEWAY_CLIENT_CONNECT) {
          self::log("Unauthorized request from " . $connection->getRemoteIp() . ":" . $connection->getRemotePort());
          $connection->close();
          return;
      }
      switch ($cmd) {
          // A BusinessWorker connects to the Gateway.
          case GatewayProtocol::CMD_WORKER_CONNECT:
              $worker_info = json_decode($data['body'], true);
              // A malformed body simply yields null, which never matches a string key.
              $received_key = isset($worker_info['secret_key']) ? $worker_info['secret_key'] : null;
              if ($received_key !== $this->secretKey) {
                  self::log("Gateway: Worker key does not match ".var_export($this->secretKey, true)." !== ". var_export($received_key, true));
                  $connection->close();
                  return;
              }
              $key = $connection->getRemoteIp() . ':' . $worker_info['worker_key'];
              // BusinessWorker names must be unique on one server.
              if (isset($this->_workerConnections[$key])) {
                  self::log("Gateway: Worker->name conflict. Key:{$key}");
                  $connection->close();
                  return;
              }
              $connection->key = $key;
              $this->_workerConnections[$key] = $connection;
              $connection->authorized = true;
              return;
          // A GatewayClient connects to the Gateway.
          case GatewayProtocol::CMD_GATEWAY_CLIENT_CONNECT:
              $worker_info = json_decode($data['body'], true);
              $received_key = isset($worker_info['secret_key']) ? $worker_info['secret_key'] : null;
              if ($received_key !== $this->secretKey) {
                  self::log("Gateway: GatewayClient key does not match ".var_export($this->secretKey, true)." !== ".var_export($received_key, true));
                  $connection->close();
                  return;
              }
              $connection->authorized = true;
              return;
          // Send data to one client, Gateway::sendToClient($client_id, $message);
          case GatewayProtocol::CMD_SEND_TO_ONE:
              if (isset($this->_clientConnections[$data['connection_id']])) {
                  $this->_clientConnections[$data['connection_id']]->send($data['body']);
              }
              return;
          // Kick a client, Gateway::closeClient($client_id, $message);
          case GatewayProtocol::CMD_KICK:
              if (isset($this->_clientConnections[$data['connection_id']])) {
                  $this->_clientConnections[$data['connection_id']]->close($data['body']);
              }
              return;
          // Destroy a client connection immediately, Gateway::destroyClient($client_id);
          case GatewayProtocol::CMD_DESTROY:
              if (isset($this->_clientConnections[$data['connection_id']])) {
                  $this->_clientConnections[$data['connection_id']]->destroy();
              }
              return;
          // Broadcast, Gateway::sendToAll($message, $client_id_array)
          case GatewayProtocol::CMD_SEND_TO_ALL:
              $raw = (bool)($data['flag'] & GatewayProtocol::FLAG_NOT_CALL_ENCODE);
              $body = $data['body'];
              // Encode once up front instead of once per recipient.
              if (!$raw && $this->protocolAccelerate && $this->protocol) {
                  $body = $this->preEncodeForClient($body);
                  $raw = true;
              }
              $ext_data = $data['ext_data'] ? json_decode($data['ext_data'], true) : '';
              // When $client_id_array is not empty, only those clients receive the message.
              if (isset($ext_data['connections'])) {
                  foreach ($ext_data['connections'] as $connection_id) {
                      if (isset($this->_clientConnections[$connection_id])) {
                          $this->_clientConnections[$connection_id]->send($body, $raw);
                      }
                  }
              } // When $client_id_array is empty, broadcast to every online client.
              else {
                  $exclude_connection_id = !empty($ext_data['exclude']) ? $ext_data['exclude'] : null;
                  foreach ($this->_clientConnections as $client_connection) {
                      if (!isset($exclude_connection_id[$client_connection->id])) {
                          $client_connection->send($body, $raw);
                      }
                  }
              }
              return;
          // Query selected fields of clients, optionally filtered by groups, uid or connection_id.
          case GatewayProtocol::CMD_SELECT:
              $client_info_array = array();
              $ext_data = json_decode($data['ext_data'], true);
              if (!$ext_data) {
                  // Double quotes so the diagnostic really ends with CRLF.
                  echo 'CMD_SELECT ext_data=' . var_export($data['ext_data'], true) . "\r\n";
                  $buffer = serialize($client_info_array);
                  $connection->send(pack('N', strlen($buffer)) . $buffer, true);
                  return;
              }
              $fields = $ext_data['fields'];
              $where = $ext_data['where'];
              if ($where) {
                  $connection_box_map = array(
                      'groups' => $this->_groupConnections,
                      'uid' => $this->_uidConnections
                  );
                  // $where = ['groups'=>[x,x..], 'uid'=>[x,x..], 'connection_id'=>[x,x..]]
                  foreach ($where as $key => $items) {
                      if ($key !== 'connection_id') {
                          // Ignore filter keys that have no backing map.
                          if (!isset($connection_box_map[$key])) {
                              continue;
                          }
                          $connections_box = $connection_box_map[$key];
                          foreach ($items as $item) {
                              if (isset($connections_box[$item])) {
                                  foreach ($connections_box[$item] as $connection_id => $client_connection) {
                                      if (!isset($client_info_array[$connection_id])) {
                                          $client_info_array[$connection_id] = array();
                                          // $fields = ['groups', 'uid', 'session']
                                          foreach ($fields as $field) {
                                              $client_info_array[$connection_id][$field] = isset($client_connection->$field) ? $client_connection->$field : null;
                                          }
                                      }
                                  }
                              }
                          }
                      } else {
                          foreach ($items as $connection_id) {
                              if (isset($this->_clientConnections[$connection_id])) {
                                  $client_connection = $this->_clientConnections[$connection_id];
                                  $client_info_array[$connection_id] = array();
                                  // $fields = ['groups', 'uid', 'session']
                                  foreach ($fields as $field) {
                                      $client_info_array[$connection_id][$field] = isset($client_connection->$field) ? $client_connection->$field : null;
                                  }
                              }
                          }
                      }
                  }
              } else {
                  // No filter: report every connected client.
                  foreach ($this->_clientConnections as $connection_id => $client_connection) {
                      foreach ($fields as $field) {
                          $client_info_array[$connection_id][$field] = isset($client_connection->$field) ? $client_connection->$field : null;
                      }
                  }
              }
              $buffer = serialize($client_info_array);
              $connection->send(pack('N', strlen($buffer)) . $buffer, true);
              return;
          // Get the list of online groups.
          case GatewayProtocol::CMD_GET_GROUP_ID_LIST:
              $buffer = serialize(array_keys($this->_groupConnections));
              $connection->send(pack('N', strlen($buffer)) . $buffer, true);
              return;
          // Replace the session.
          case GatewayProtocol::CMD_SET_SESSION:
              if (isset($this->_clientConnections[$data['connection_id']])) {
                  $this->_clientConnections[$data['connection_id']]->session = $data['ext_data'];
              }
              return;
          // Merge into the session.
          case GatewayProtocol::CMD_UPDATE_SESSION:
              if (!isset($this->_clientConnections[$data['connection_id']])) {
                  return;
              } else {
                  // Nothing to merge with yet: store the incoming session as is.
                  if (!$this->_clientConnections[$data['connection_id']]->session) {
                      $this->_clientConnections[$data['connection_id']]->session = $data['ext_data'];
                      return;
                  }
                  $session = Context::sessionDecode($this->_clientConnections[$data['connection_id']]->session);
                  $session_for_merge = Context::sessionDecode($data['ext_data']);
                  $session = array_replace_recursive($session, $session_for_merge);
                  $this->_clientConnections[$data['connection_id']]->session = Context::sessionEncode($session);
              }
              return;
          // Get the session of one client.
          case GatewayProtocol::CMD_GET_SESSION_BY_CLIENT_ID:
              if (!isset($this->_clientConnections[$data['connection_id']])) {
                  $session = serialize(null);
              } else {
                  if (!$this->_clientConnections[$data['connection_id']]->session) {
                      $session = serialize(array());
                  } else {
                      $session = $this->_clientConnections[$data['connection_id']]->session;
                  }
              }
              $connection->send(pack('N', strlen($session)) . $session, true);
              return;
          // Get the sessions of all clients.
          case GatewayProtocol::CMD_GET_ALL_CLIENT_SESSIONS:
              $client_info_array = array();
              foreach ($this->_clientConnections as $connection_id => $client_connection) {
                  $client_info_array[$connection_id] = $client_connection->session;
              }
              $buffer = serialize($client_info_array);
              $connection->send(pack('N', strlen($buffer)) . $buffer, true);
              return;
          // Check whether a client_id is online, Gateway::isOnline($client_id)
          case GatewayProtocol::CMD_IS_ONLINE:
              $buffer = serialize((int)isset($this->_clientConnections[$data['connection_id']]));
              $connection->send(pack('N', strlen($buffer)) . $buffer, true);
              return;
          // Bind a client_id to a uid.
          case GatewayProtocol::CMD_BIND_UID:
              $uid = $data['ext_data'];
              if (empty($uid)) {
                  echo "bindUid(client_id, uid) uid empty, uid=" . var_export($uid, true);
                  return;
              }
              $connection_id = $data['connection_id'];
              if (!isset($this->_clientConnections[$connection_id])) {
                  return;
              }
              $client_connection = $this->_clientConnections[$connection_id];
              // Remove any previous binding before creating the new one.
              if (isset($client_connection->uid)) {
                  $current_uid = $client_connection->uid;
                  unset($this->_uidConnections[$current_uid][$connection_id]);
                  if (empty($this->_uidConnections[$current_uid])) {
                      unset($this->_uidConnections[$current_uid]);
                  }
              }
              $client_connection->uid = $uid;
              $this->_uidConnections[$uid][$connection_id] = $client_connection;
              return;
          // Unbind a client_id from its uid, Gateway::unbindUid($client_id, $uid);
          case GatewayProtocol::CMD_UNBIND_UID:
              $connection_id = $data['connection_id'];
              if (!isset($this->_clientConnections[$connection_id])) {
                  return;
              }
              $client_connection = $this->_clientConnections[$connection_id];
              if (isset($client_connection->uid)) {
                  $current_uid = $client_connection->uid;
                  unset($this->_uidConnections[$current_uid][$connection_id]);
                  if (empty($this->_uidConnections[$current_uid])) {
                      unset($this->_uidConnections[$current_uid]);
                  }
                  $client_connection->uid_info = '';
                  $client_connection->uid = null;
              }
              return;
          // Send data to uids, Gateway::sendToUid($uid, $msg);
          case GatewayProtocol::CMD_SEND_TO_UID:
              $uid_array = json_decode($data['ext_data'], true);
              foreach ($uid_array as $uid) {
                  if (!empty($this->_uidConnections[$uid])) {
                      // Distinct loop variable: $connection is the worker connection.
                      foreach ($this->_uidConnections[$uid] as $uid_connection) {
                          /** @var TcpConnection $uid_connection */
                          $uid_connection->send($data['body']);
                      }
                  }
              }
              return;
          // Add $client_id to a group, Gateway::joinGroup($client_id, $group);
          case GatewayProtocol::CMD_JOIN_GROUP:
              $group = $data['ext_data'];
              if (empty($group)) {
                  echo "join(group) group empty, group=" . var_export($group, true);
                  return;
              }
              $connection_id = $data['connection_id'];
              if (!isset($this->_clientConnections[$connection_id])) {
                  return;
              }
              $client_connection = $this->_clientConnections[$connection_id];
              if (!isset($client_connection->groups)) {
                  $client_connection->groups = array();
              }
              $client_connection->groups[$group] = $group;
              $this->_groupConnections[$group][$connection_id] = $client_connection;
              return;
          // Remove $client_id from a group, Gateway::leaveGroup($client_id, $group);
          case GatewayProtocol::CMD_LEAVE_GROUP:
              $group = $data['ext_data'];
              if (empty($group)) {
                  echo "leave(group) group empty, group=" . var_export($group, true);
                  return;
              }
              $connection_id = $data['connection_id'];
              if (!isset($this->_clientConnections[$connection_id])) {
                  return;
              }
              $client_connection = $this->_clientConnections[$connection_id];
              if (!isset($client_connection->groups[$group])) {
                  return;
              }
              unset($client_connection->groups[$group], $this->_groupConnections[$group][$connection_id]);
              if (empty($this->_groupConnections[$group])) {
                  unset($this->_groupConnections[$group]);
              }
              return;
          // Dissolve a group.
          case GatewayProtocol::CMD_UNGROUP:
              $group = $data['ext_data'];
              if (empty($group)) {
                  echo "ungroup(group) group empty, group=" . var_export($group, true);
                  return;
              }
              if (empty($this->_groupConnections[$group])) {
                  return;
              }
              foreach ($this->_groupConnections[$group] as $client_connection) {
                  unset($client_connection->groups[$group]);
              }
              unset($this->_groupConnections[$group]);
              return;
          // Send a message to groups, Gateway::sendToGroup($group, $msg);
          case GatewayProtocol::CMD_SEND_TO_GROUP:
              $raw = (bool)($data['flag'] & GatewayProtocol::FLAG_NOT_CALL_ENCODE);
              $body = $data['body'];
              // Encode once up front instead of once per recipient.
              if (!$raw && $this->protocolAccelerate && $this->protocol) {
                  $body = $this->preEncodeForClient($body);
                  $raw = true;
              }
              $ext_data = json_decode($data['ext_data'], true);
              $group_array = $ext_data['group'];
              $exclude_connection_id = $ext_data['exclude'];
              foreach ($group_array as $group) {
                  if (!empty($this->_groupConnections[$group])) {
                      // Distinct loop variable: $connection is the worker connection.
                      foreach ($this->_groupConnections[$group] as $group_connection) {
                          if (!isset($exclude_connection_id[$group_connection->id])) {
                              /** @var TcpConnection $group_connection */
                              $group_connection->send($body, $raw);
                          }
                      }
                  }
              }
              return;
          // Get the sessions of a group's members, Gateway::getClientSessionsByGroup($group);
          case GatewayProtocol::CMD_GET_CLIENT_SESSIONS_BY_GROUP:
              $group = $data['ext_data'];
              if (!isset($this->_groupConnections[$group])) {
                  $buffer = serialize(array());
                  $connection->send(pack('N', strlen($buffer)) . $buffer, true);
                  return;
              }
              $client_info_array = array();
              foreach ($this->_groupConnections[$group] as $connection_id => $client_connection) {
                  $client_info_array[$connection_id] = $client_connection->session;
              }
              $buffer = serialize($client_info_array);
              $connection->send(pack('N', strlen($buffer)) . $buffer, true);
              return;
          // Get the member count of a group, Gateway::getClientCountByGroup($group);
          case GatewayProtocol::CMD_GET_CLIENT_COUNT_BY_GROUP:
              $group = $data['ext_data'];
              $count = 0;
              if ($group !== '') {
                  if (isset($this->_groupConnections[$group])) {
                      $count = count($this->_groupConnections[$group]);
                  }
              } else {
                  // An empty group name asks for the total number of clients.
                  $count = count($this->_clientConnections);
              }
              $buffer = serialize($count);
              $connection->send(pack('N', strlen($buffer)) . $buffer, true);
              return;
          // Get all client_ids bound to a uid, Gateway::getClientIdByUid($uid);
          case GatewayProtocol::CMD_GET_CLIENT_ID_BY_UID:
              $uid = $data['ext_data'];
              if (empty($this->_uidConnections[$uid])) {
                  $buffer = serialize(array());
              } else {
                  $buffer = serialize(array_keys($this->_uidConnections[$uid]));
              }
              $connection->send(pack('N', strlen($buffer)) . $buffer, true);
              return;
          default :
              $err_msg = "gateway inner pack err cmd=$cmd";
              echo $err_msg;
      }
  }
  /**
   * Called when a worker connection is closed.
   *
   * If the connection carries a `key` property, the matching entry is removed
   * from $this->_workerConnections; otherwise nothing happens.
   *
   * @param TcpConnection $connection The worker connection that was closed.
   */
  public function onWorkerClose($connection)
  {
      // A connection without a key has nothing to remove from the map.
      if (!isset($connection->key)) {
          return;
      }

      unset($this->_workerConnections[$connection->key]);
  }
  /**
   * Announce this Gateway's internal communication address to every Register server.
   *
   * For each entry in $this->registerAddress an AsyncTcpConnection using the
   * "text://" scheme is opened. Once connected, a "gateway_connect" event carrying
   * the "lanIp:lanPort" address and the secret key is sent. If the Register address
   * does not start with "127.0.0.1", a timer that sends {"event":"ping"} every
   * PERSISTENCE_CONNECTION_PING_INTERVAL is installed to keep the link alive.
   * When the connection closes, that timer is removed and reconnect(1) is called
   * (the argument is presumably a delay in seconds; confirm against AsyncTcpConnection).
   *
   * @return void
   */
  public function registerAddress()
  {
      $address    = $this->lanIp . ':' . $this->lanPort;
      $secret_key = $this->secretKey;

      // Build the handshake with json_encode() so that quotes, backslashes or line
      // breaks inside the secret key cannot corrupt the JSON document or the
      // line-based framing. The key is cast to string so it is encoded as a JSON
      // string, exactly like the concatenated form did.
      $connect_message = json_encode(array(
          'event'      => 'gateway_connect',
          'address'    => $address,
          'secret_key' => (string)$secret_key,
      ));
      if ($connect_message === false) {
          // Encoding failed (e.g. the key is not valid UTF-8): fall back to the
          // historical hand-built payload instead of sending nothing.
          $connect_message = '{"event":"gateway_connect", "address":"' . $address . '", "secret_key":"' . $secret_key . '"}';
      }

      foreach ($this->registerAddress as $register_address) {
          $register_connection = new AsyncTcpConnection("text://{$register_address}");

          $register_connection->onConnect = function ($register_connection) use ($connect_message, $register_address) {
              $register_connection->send($connect_message);
              // A Register server that is not on the local host needs a heartbeat
              // to keep the connection open.
              if (strpos($register_address, '127.0.0.1') !== 0) {
                  $register_connection->ping_timer = Timer::add(self::PERSISTENCE_CONNECTION_PING_INTERVAL, function () use ($register_connection) {
                      $register_connection->send('{"event":"ping"}');
                  });
              }
          };

          $register_connection->onClose = function ($register_connection) {
              if (!empty($register_connection->ping_timer)) {
                  Timer::del($register_connection->ping_timer);
                  // Forget the deleted timer id so a later close without an
                  // intervening successful connect does not delete it again.
                  $register_connection->ping_timer = null;
              }
              $register_connection->reconnect(1);
          };

          $register_connection->connect();
      }
  }
  /**
   * Heartbeat pass over all client connections.
   *
   * For every client connection:
   *  - if a limit is configured ($this->pingNotResponseLimit > 0) and the
   *    connection's pingNotResponseCount has reached twice that limit, the
   *    connection is destroyed;
   *  - otherwise pingNotResponseCount is incremented and, when ping data is
   *    configured, the data is sent unless the counter is now 0 or (with a
   *    limit configured) the counter is odd.
   *
   * When protocolAccelerate, ping data and protocol are all set, the ping data
   * is encoded once via preEncodeForClient() and then sent in raw mode.
   *
   * @return void
   */
  public function ping()
  {
      if ($this->pingData) {
          $heartbeat = (string)$this->pingData;
      } else {
          $heartbeat = null;
      }

      $send_raw = false;
      if ($this->protocolAccelerate && $heartbeat && $this->protocol) {
          // Encode once up front instead of once per connection.
          $heartbeat = $this->preEncodeForClient($heartbeat);
          $send_raw  = true;
      }

      // Walk every client connection.
      foreach ($this->_clientConnections as $client) {
          // Too many heartbeats without a reply: drop the connection.
          if ($this->pingNotResponseLimit > 0
              && $client->pingNotResponseCount >= $this->pingNotResponseLimit * 2
          ) {
              $client->destroy();
              continue;
          }

          $client->pingNotResponseCount++;

          // Nothing to send when no ping data is configured.
          if (!$heartbeat) {
              continue;
          }

          $pending = $client->pingNotResponseCount;

          // A counter of 0 after the increment means it was -1 before; per the
          // original note, -1 marks a client that recently sent a message, so no
          // heartbeat is sent to it.
          if ($pending === 0) {
              continue;
          }

          // With a limit configured, odd counter values are skipped.
          // NOTE(review): presumably the timer runs at twice the heartbeat rate
          // in that mode - confirm where ping() is scheduled.
          if ($this->pingNotResponseLimit > 0 && $pending % 2 === 1) {
              continue;
          }

          $client->send($heartbeat, $send_raw);
      }
  }
  /**
   * Send a heartbeat packet to every BusinessWorker connection to keep the
   * long-lived connections open.
   *
   * The packet is a copy of GatewayProtocol::$empty with its 'cmd' field set to
   * GatewayProtocol::CMD_PING.
   *
   * @return void
   */
  public function pingBusinessWorker()
  {
      $heartbeat_packet        = GatewayProtocol::$empty;
      $heartbeat_packet['cmd'] = GatewayProtocol::CMD_PING;

      foreach ($this->_workerConnections as $worker_connection) {
          $worker_connection->send($heartbeat_packet);
      }
  }
  /**
   * Encode $data once, using the protocol of the first client connection.
   *
   * The first entry of $this->_clientConnections supplies both the protocol
   * whose `encode` callable is invoked and the connection passed to it.
   * When there are no client connections nothing is encoded and null is returned.
   *
   * @param mixed $data Payload to encode.
   *
   * @return string|null Result of the protocol's encode call (presumably a string;
   *                     confirm against the protocol class), or null when there is
   *                     no client connection.
   */
  protected function preEncodeForClient($data)
  {
      $encoded = null;

      foreach ($this->_clientConnections as $first_client) {
          $encoder = array($first_client->protocol, 'encode');
          $encoded = call_user_func($encoder, $data, $first_client);
          // Only the first connection is ever consulted.
          break;
      }

      return $encoded;
  }
  /**
   * Triggered when the gateway stops.
   *
   * Invokes the user-supplied callback stored in $this->_onWorkerStop, passing
   * this instance, if such a callback is set.
   *
   * @return void
   */
  public function onWorkerStop()
  {
      // No user callback registered: nothing to do.
      if (!$this->_onWorkerStop) {
          return;
      }

      call_user_func($this->_onWorkerStop, $this);
  }
  /**
   * Write a message to the worker log, deferred through a timer.
   *
   * The message is not logged immediately: a timer with interval 1 is registered
   * whose callback forwards $msg to Worker::log(). The final `false` argument to
   * Timer::add() presumably makes the timer one-shot (non-persistent); confirm
   * against the Timer API.
   *
   * @param string $msg Message to log.
   */
  public static function log($msg){
      $write_log = function () use ($msg) {
          Worker::log($msg);
      };

      Timer::add(1, $write_log, null, false);
  }
  955. }