PHP code example of reactphp-x / register-center
1. Go to this page and download the library: Download reactphp-x/register-center library . Choose the download type require .
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
reactphp-x / register-center example snippets
React\EventLoop\Loop;
use ReactphpX\RegisterCenter\Register;
use Monolog\Logger;
use Monolog\Level;
use Monolog\Handler\StreamHandler;
use Monolog\Formatter\LineFormatter;
// 创建事件循环
$loop = Loop::get();
// 创建日志器
$logger = new Logger('registration-center');
$handler = new StreamHandler('php://stdout', Level::Debug);
$handler->setFormatter(new LineFormatter(
"[%datetime%] %channel%.%level_name%: %message% %context%\n",
null,
true,
true
));
$logger->pushHandler($handler);
// 创建并启动注册中心
$center = new Register(8010, $loop, $logger);
$center->start();
// 运行事件循环
$loop->run();
React\EventLoop\Loop;
use ReactphpX\RegisterCenter\Master;
use ReactphpX\RegisterCenter\ServiceRegistry;
use Monolog\Logger;
use Monolog\Level;
use Monolog\Handler\StreamHandler;
use Monolog\Formatter\LineFormatter;
// 注册服务
ServiceRegistry::register('hello-world', new class {
public function sayHello() {
$date = date('Y-m-d H:i:s');
return "Hello, world! $date\n";
}
public function sayHello2($name) {
$date = date('Y-m-d H:i:s');
return [
'date' => $date,
'name' => $name,
'version' => '1.0.0',
'description' => 'Hello, world!',
'author' => 'ReactPHP X',
'email' => '[email protected] ',
'url' => 'https://github.com/reactphp-x',
'license' => 'MIT',
];
}
});
// 创建日志器
$logger = new Logger('master');
$handler = new StreamHandler('php://stdout', Level::Debug);
$handler->setFormatter(new LineFormatter(
"[%datetime%] %channel%.%level_name%: %message% %context%\n",
null,
true,
true
));
$logger->pushHandler($handler);
// 创建主节点
$master = new Master(
retryAttempts: PHP_INT_MAX, // 无限重试
retryDelay: 3.0, // 重试间隔3秒
reconnectOnClose: true, // 断开时自动重连
logger: $logger
);
// 设置事件处理器
$master->on('error', function (\Exception $e, $context = []) {
echo "Error: " . $e->getMessage() . "\n";
echo "Context: " . json_encode($context) . "\n";
});
$master->on('connect', function ($tunnelStream) use ($master) {
// 身份验证
$tunnelStream->write([
'cmd' => 'auth',
'token' => 'register-center-token-2024'
]);
// 监听来自注册中心的命令
$tunnelStream->on('cmd', function ($cmd, $message) use ($tunnelStream, $master) {
echo "Received command: $cmd\n";
echo "Message: " . json_encode($message) . "\n";
if ($cmd === 'register') {
// 连接到新的注册中心
$registers = $message['registers'];
foreach ($registers as $register) {
$master->connectViaConnector($register['host'], $register['port']);
}
} elseif ($cmd === 'remove') {
// 移除注册中心连接
$registers = $message['registers'];
foreach ($registers as $register) {
$master->removeConnection($register['host'], $register['port']);
}
}
});
});
$master->on('close', function ($id, $url) {
echo "Disconnected from $url (ID: $id)\n";
});
// 连接到注册中心
$master->connectViaConnector('127.0.0.1', 8010);
// 运行事件循环
Loop::get()->run();
use ReactphpX\RegisterCenter\ServiceRegistry;
// 注册简单服务
ServiceRegistry::register('hello-service', new class {
public function sayHello() {
return "Hello, world! " . date('Y-m-d H:i:s');
}
public function greet($name, $title = 'Mr.') {
return "Hello, {$title} {$name}!";
}
});
// 注册业务服务
ServiceRegistry::register('user-service', new class {
private $users = [
1 => ['id' => 1, 'name' => 'Alice', 'email' => '[email protected] '],
2 => ['id' => 2, 'name' => 'Bob', 'email' => '[email protected] '],
];
public function getUser($id) {
return $this->users[$id] ?? null;
}
public function getAllUsers() {
return array_values($this->users);
}
public function createUser($name, $email) {
$id = max(array_keys($this->users)) + 1;
$this->users[$id] = [
'id' => $id,
'name' => $name,
'email' => $email
];
return $this->users[$id];
}
public function updateUser($id, $data) {
if (!isset($this->users[$id])) {
throw new \Exception("User not found");
}
$this->users[$id] = array_merge($this->users[$id], $data);
return $this->users[$id];
}
});
// 注册异步服务
ServiceRegistry::register('async-service', new class {
public function processTask($taskId, $data) {
// 模拟异步处理
$startTime = microtime(true);
// 处理业务逻辑
$result = array_map('strtoupper', $data);
$endTime = microtime(true);
return [
'taskId' => $taskId,
'result' => $result,
'processingTime' => $endTime - $startTime,
'timestamp' => time()
];
}
});
// 检查服务是否存在
if (ServiceRegistry::has('hello-service')) {
echo "Service registered successfully\n";
}
// 无参数调用
$result = ServiceRegistry::execute('hello-service', 'sayHello');
echo $result; // Hello, world! 2024-01-01 12:00:00
// 带参数调用
$result = ServiceRegistry::execute('hello-service', 'greet', [
'name' => 'John',
'title' => 'Dr.'
]);
echo $result; // Hello, Dr. John!
// 业务服务调用
$user = ServiceRegistry::execute('user-service', 'getUser', ['id' => 1]);
print_r($user);
$allUsers = ServiceRegistry::execute('user-service', 'getAllUsers');
print_r($allUsers);
// 错误处理
try {
$result = ServiceRegistry::execute('user-service', 'updateUser', [
'id' => 999,
'data' => ['name' => 'Updated Name']
]);
} catch (\Exception $e) {
echo "Service error: " . $e->getMessage() . "\n";
}
// 在注册中心调用单个主节点上的服务
$masters = $center->getConnectedMasters();
if (!empty($masters)) {
$masterId = array_key_first($masters);
$stream = $center->runOnMaster($masterId, function ($stream) {
// 调用用户服务
$result = ServiceRegistry::execute('user-service', 'getAllUsers');
$stream->write($result);
$stream->end();
});
$stream->on('data', function ($data) use ($masterId) {
echo "Users from master {$masterId}: " . json_encode($data) . "\n";
});
}
// 在所有主节点上执行相同的服务
$loop->addPeriodicTimer(10, function () use ($center) {
$masters = $center->getConnectedMasters();
if (empty($masters)) {
echo "No masters connected\n";
return;
}
echo "Executing services on " . count($masters) . " masters\n";
// 批量执行多个服务方法
$streams = $center->runOnAllMasters(function ($stream) {
$results = [];
// 执行多个服务调用
$results['greeting'] = ServiceRegistry::execute('hello-service', 'sayHello');
$results['users'] = ServiceRegistry::execute('user-service', 'getAllUsers');
$results['task'] = ServiceRegistry::execute('async-service', 'processTask', [
'taskId' => uniqid(),
'data' => ['hello', 'world', 'reactphp']
]);
$stream->write($results);
$stream->end();
});
// 处理所有主节点的响应
foreach ($streams as $masterId => $stream) {
$stream->on('data', function ($data) use ($masterId) {
echo "\n=== Response from master {$masterId} ===\n";
if (is_array($data)) {
foreach ($data as $service => $result) {
echo "Service '{$service}': " . json_encode($result) . "\n";
}
} else {
echo "Raw response: {$data}\n";
}
});
$stream->on('error', function ($error) use ($masterId) {
echo "Error from master {$masterId}: {$error}\n";
});
$stream->on('close', function () use ($masterId) {
echo "Connection to master {$masterId} closed\n";
});
}
});
// 异步调用模式 - 任务分发
function distributeTask($center, $taskData) {
$masters = $center->getConnectedMasters();
$taskChunks = array_chunk($taskData, ceil(count($taskData) / count($masters)));
$results = [];
$completedTasks = 0;
$totalMasters = count($masters);
foreach ($masters as $masterId => $master) {
$chunk = array_shift($taskChunks);
if (!$chunk) continue;
$stream = $center->runOnMaster($masterId, function ($stream) use ($chunk) {
$result = ServiceRegistry::execute('async-service', 'processTask', [
'taskId' => uniqid(),
'data' => $chunk
]);
$stream->write($result);
$stream->end();
});
$stream->on('data', function ($data) use (&$results, &$completedTasks, $totalMasters, $masterId) {
$results[$masterId] = $data;
$completedTasks++;
echo "Task completed on master {$masterId} ({$completedTasks}/{$totalMasters})\n";
// 所有任务完成后的处理
if ($completedTasks === $totalMasters) {
echo "All tasks completed!\n";
$finalResult = array_merge(...array_column($results, 'result'));
echo "Final result: " . json_encode($finalResult) . "\n";
}
});
}
}
// 使用示例
$taskData = ['apple', 'banana', 'cherry', 'date', 'elderberry'];
distributeTask($center, $taskData);
// 连接池优化调用
class ServiceCaller {
private $center;
private $callQueue = [];
private $processingQueue = false;
public function __construct($center) {
$this->center = $center;
}
public function queueCall($service, $method, $params = [], $callback = null) {
$this->callQueue[] = [
'service' => $service,
'method' => $method,
'params' => $params,
'callback' => $callback,
'timestamp' => microtime(true)
];
if (!$this->processingQueue) {
$this->processQueue();
}
}
private function processQueue() {
if (empty($this->callQueue)) {
$this->processingQueue = false;
return;
}
$this->processingQueue = true;
$batch = array_splice($this->callQueue, 0, 10); // 批量处理10个调用
$masters = $this->center->getConnectedMasters();
if (empty($masters)) {
// 重新排队等待主节点连接
$this->callQueue = array_merge($batch, $this->callQueue);
$this->processingQueue = false;
return;
}
// 选择负载最低的主节点
$masterId = $this->selectLeastLoadedMaster($masters);
$stream = $this->center->runOnMaster($masterId, function ($stream) use ($batch) {
$results = [];
foreach ($batch as $call) {
try {
$result = ServiceRegistry::execute(
$call['service'],
$call['method'],
$call['params']
);
$results[] = [
'success' => true,
'result' => $result,
'call' => $call
];
} catch (\Exception $e) {
$results[] = [
'success' => false,
'error' => $e->getMessage(),
'call' => $call
];
}
}
$stream->write($results);
$stream->end();
});
$stream->on('data', function ($results) use ($batch) {
foreach ($results as $result) {
if (isset($result['call']['callback']) && is_callable($result['call']['callback'])) {
$result['call']['callback']($result);
}
}
// 继续处理队列
$this->processQueue();
});
}
private function selectLeastLoadedMaster($masters) {
// 简单的负载均衡算法,实际应用中可以基于更复杂的指标
return array_rand($masters);
}
}
// 使用示例
$caller = new ServiceCaller($center);
// 队列化调用
$caller->queueCall('user-service', 'getUser', ['id' => 1], function ($result) {
if ($result['success']) {
echo "User: " . json_encode($result['result']) . "\n";
} else {
echo "Error: " . $result['error'] . "\n";
}
});
$caller->queueCall('hello-service', 'greet', ['name' => 'Alice'], function ($result) {
echo "Greeting: " . $result['result'] . "\n";
});
// ✅ 好的服务设计
ServiceRegistry::register('product-service', new class {
// 返回明确的数据结构
public function getProduct($id) {
return [
'id' => $id,
'name' => 'Product Name',
'price' => 99.99,
'stock' => 10,
'status' => 'active'
];
}
// 参数验证
public function updateProduct($id, $data) {
if (!is_numeric($id) || $id <= 0) {
throw new \InvalidArgumentException('Invalid product ID');
}
$allowedFields = ['name', 'price', 'stock', 'status'];
$filteredData = array_intersect_key($data, array_flip($allowedFields));
// 更新逻辑...
return $this->getProduct($id);
}
// 幂等性操作
public function activateProduct($id) {
// 重复调用不会产生副作用
return $this->updateProduct($id, ['status' => 'active']);
}
});
// ❌ 避免的服务设计
ServiceRegistry::register('bad-service', new class {
// 不要返回不一致的数据结构
public function getData($type) {
if ($type === 'array') {
return ['data' => 'value'];
} else {
return 'string value'; // 不一致的返回类型
}
}
// 不要在服务中直接输出
public function processData($data) {
echo "Processing..."; // ❌ 避免直接输出
return $data;
}
});
// 统一的错误处理服务
ServiceRegistry::register('error-handler', new class {
public function handleServiceCall($service, $method, $params = []) {
try {
$result = ServiceRegistry::execute($service, $method, $params);
return [
'success' => true,
'data' => $result,
'timestamp' => time()
];
} catch (\InvalidArgumentException $e) {
return [
'success' => false,
'error' => 'validation_error',
'message' => $e->getMessage(),
'timestamp' => time()
];
} catch (\Exception $e) {
return [
'success' => false,
'error' => 'service_error',
'message' => $e->getMessage(),
'timestamp' => time()
];
}
}
});
// 远程调用的错误处理
function callServiceWithRetry($center, $service, $method, $params = [], $maxRetries = 3) {
$attempt = 0;
$executeCall = function() use ($center, $service, $method, $params, &$attempt, $maxRetries, &$executeCall) {
$attempt++;
$masters = $center->getConnectedMasters();
if (empty($masters)) {
if ($attempt < $maxRetries) {
// 等待后重试
Loop::get()->addTimer(1, $executeCall);
return;
}
throw new \Exception('No masters available after ' . $maxRetries . ' attempts');
}
$masterId = array_rand($masters);
$stream = $center->runOnMaster($masterId, function ($stream) use ($service, $method, $params) {
$result = ServiceRegistry::execute('error-handler', 'handleServiceCall', [
'service' => $service,
'method' => $method,
'params' => $params
]);
$stream->write($result);
$stream->end();
});
$stream->on('data', function ($data) use ($attempt, $maxRetries, $executeCall) {
if (!$data['success'] && $attempt < $maxRetries) {
echo "Call failed, retrying... (attempt {$attempt}/{$maxRetries})\n";
Loop::get()->addTimer(2, $executeCall);
} else {
echo "Final result: " . json_encode($data) . "\n";
}
});
$stream->on('error', function ($error) use ($attempt, $maxRetries, $executeCall) {
echo "Stream error: {$error}\n";
if ($attempt < $maxRetries) {
Loop::get()->addTimer(2, $executeCall);
}
});
};
$executeCall();
}
// 使用示例
callServiceWithRetry($center, 'product-service', 'getProduct', ['id' => 1]);
// 服务调用监控
class ServiceMonitor {
private $metrics = [];
public function trackCall($service, $method, $duration, $success) {
$key = "{$service}.{$method}";
if (!isset($this->metrics[$key])) {
$this->metrics[$key] = [
'total_calls' => 0,
'success_calls' => 0,
'failed_calls' => 0,
'total_duration' => 0,
'avg_duration' => 0,
'min_duration' => PHP_FLOAT_MAX,
'max_duration' => 0
];
}
$metric = &$this->metrics[$key];
$metric['total_calls']++;
if ($success) {
$metric['success_calls']++;
} else {
$metric['failed_calls']++;
}
$metric['total_duration'] += $duration;
$metric['avg_duration'] = $metric['total_duration'] / $metric['total_calls'];
$metric['min_duration'] = min($metric['min_duration'], $duration);
$metric['max_duration'] = max($metric['max_duration'], $duration);
}
public function getMetrics() {
return $this->metrics;
}
public function reset() {
$this->metrics = [];
}
}
// 监控服务包装器
ServiceRegistry::register('monitored-service', new class {
private $monitor;
public function __construct() {
$this->monitor = new ServiceMonitor();
}
public function executeWithMonitoring($service, $method, $params = []) {
$startTime = microtime(true);
$success = false;
try {
$result = ServiceRegistry::execute($service, $method, $params);
$success = true;
return $result;
} catch (\Exception $e) {
throw $e;
} finally {
$duration = microtime(true) - $startTime;
$this->monitor->trackCall($service, $method, $duration, $success);
}
}
public function getMetrics() {
return $this->monitor->getMetrics();
}
});
// 定期输出性能指标
$loop->addPeriodicTimer(30, function () {
$metrics = ServiceRegistry::execute('monitored-service', 'getMetrics');
echo "\n=== Service Performance Metrics ===\n";
foreach ($metrics as $service => $metric) {
echo sprintf(
"%s: calls=%d, success=%.1f%%, avg=%.3fs, min=%.3fs, max=%.3fs\n",
$service,
$metric['total_calls'],
($metric['success_calls'] / $metric['total_calls']) * 100,
$metric['avg_duration'],
$metric['min_duration'],
$metric['max_duration']
);
}
});
// 版本化服务注册
class VersionedServiceRegistry {
private static $services = [];
public static function register($name, $version, $instance) {
$key = "{$name}@{$version}";
self::$services[$key] = $instance;
// 同时注册为默认版本(如果是最新的)
if (!isset(self::$services[$name]) || version_compare($version, self::getVersion($name), '>')) {
self::$services[$name] = $instance;
}
}
public static function execute($name, $method, $params = [], $version = null) {
$key = $version ? "{$name}@{$version}" : $name;
if (!isset(self::$services[$key])) {
throw new \Exception("Service {$key} not found");
}
return ServiceRegistry::execute($key, $method, $params);
}
private static function getVersion($name) {
// 从服务键中提取版本号
foreach (array_keys(self::$services) as $key) {
if (strpos($key, $name . '@') === 0) {
return substr($key, strlen($name) + 1);
}
}
return '1.0.0';
}
}
// 注册不同版本的服务
VersionedServiceRegistry::register('api-service', '1.0.0', new class {
public function getData() {
return ['version' => '1.0.0', 'data' => 'old format'];
}
});
VersionedServiceRegistry::register('api-service', '2.0.0', new class {
public function getData() {
return [
'version' => '2.0.0',
'data' => ['id' => 1, 'name' => 'new format'],
'metadata' => ['timestamp' => time()]
];
}
});
// 调用指定版本
$oldResult = VersionedServiceRegistry::execute('api-service', 'getData', [], '1.0.0');
$newResult = VersionedServiceRegistry::execute('api-service', 'getData', [], '2.0.0');
$defaultResult = VersionedServiceRegistry::execute('api-service', 'getData'); // 使用最新版本
$master->on('connect', function ($tunnelStream) {
// 发送认证令牌
$tunnelStream->write([
'cmd' => 'auth',
'token' => 'register-center-token-2024'
]);
// 监听认证结果
$tunnelStream->on('cmd', function ($cmd, $message) {
if ($cmd === 'auth-success') {
echo "Authentication successful\n";
} elseif ($cmd === 'auth-failed') {
echo "Authentication failed: " . $message['reason'] . "\n";
}
});
});
// 定时器:10秒后通知所有主节点有新的注册中心可连接
Loop::addTimer(10, function () use ($center) {
$center->writeRawMessageToAllMasters([
'cmd' => 'register',
'registers' => [
[
'host' => '127.0.0.1',
'port' => 8011,
]
]
]);
});
// 定时器:20秒后移除注册中心
Loop::addTimer(20, function () use ($center) {
$center->writeRawMessageToAllMasters([
'cmd' => 'remove',
'registers' => [
[
'host' => '127.0.0.1',
'port' => 8011,
]
]
]);
});
// 主节点处理注册中心管理命令
$master->on('connect', function ($tunnelStream) use ($master) {
$tunnelStream->on('cmd', function ($cmd, $message) use ($master) {
echo "Received command: $cmd\n";
echo "Message: " . json_encode($message) . "\n";
if ($cmd === 'register') {
// 连接到新的注册中心
$registers = $message['registers'];
foreach ($registers as $register) {
$master->connectViaConnector($register['host'], $register['port']);
}
} elseif ($cmd === 'remove') {
// 移除注册中心连接
$registers = $message['registers'];
foreach ($registers as $register) {
$master->removeConnection($register['host'], $register['port']);
}
}
});
});
// 获取已连接的主节点
$masters = $center->getConnectedMasters();
echo "Connected masters: " . count($masters) . "\n";
// 定期获取服务状态
Loop::addPeriodicTimer(1, function () use ($center) {
$services = $center->getServicesMaster();
echo "Available services: ";
var_export($services);
});
use Monolog\Logger;
use Monolog\Level;
use Monolog\Handler\StreamHandler;
use Monolog\Handler\FileHandler;
use Monolog\Formatter\LineFormatter;
// 创建日志器
$logger = new Logger('registration-center');
// 控制台输出
$consoleHandler = new StreamHandler('php://stdout', Level::Debug);
$consoleHandler->setFormatter(new LineFormatter(
"[%datetime%] %channel%.%level_name%: %message% %context%\n",
null,
true,
true
));
// 文件输出
$fileHandler = new FileHandler('logs/register-center.log', Level::INFO);
$fileHandler->setFormatter(new LineFormatter(
"[%datetime%] %channel%.%level_name%: %message% %context%\n"
));
$logger->pushHandler($consoleHandler);
$logger->pushHandler($fileHandler);
// 使用日志器
$center = new Register(8010, $loop, $logger);
$master = new Master(logger: $logger);
// 用户服务
ServiceRegistry::register('user-service', new UserService());
// 订单服务
ServiceRegistry::register('order-service', new OrderService());
// 支付服务
ServiceRegistry::register('payment-service', new PaymentService());
// 注册中心分发任务到各个主节点
$center->runOnAllMasters(function ($stream) use ($taskData) {
$result = ServiceRegistry::execute('task-processor', 'processTask', $taskData);
$stream->write($result);
});
// 获取所有可用的主节点
$masters = $center->getConnectedMasters();
// 选择负载最低的主节点执行任务
$selectedMaster = selectLeastLoadedMaster($masters);
$center->runOnMaster($selectedMaster, function ($stream) {
// 执行特定任务
});
// 主节点会自动重连到可用的注册中心
$master = new Master(
retryAttempts: PHP_INT_MAX,
retryDelay: 3.0,
reconnectOnClose: true
);
new Register(int $port, LoopInterface $loop, ?LoggerInterface $logger = null)
new Master(
int $retryAttempts = 3,
float $retryDelay = 1.0,
bool $reconnectOnClose = false,
?LoggerInterface $logger = null
)
// 主节点错误处理
$master->on('error', function (\Exception $e, $context = []) {
echo "错误:" . $e->getMessage() . "\n";
echo "上下文:" . json_encode($context) . "\n";
// 记录错误日志
$logger->error('Master error', [
'exception' => $e->getMessage(),
'context' => $context
]);
});
// 连接关闭处理
$master->on('close', function ($id, $url) {
echo "与 $url 的连接已关闭 (ID: $id)\n";
// 可以在这里实现重连逻辑
if ($shouldReconnect) {
$master->connectViaConnector($host, $port);
}
});
// 启用详细日志
$logger = new Logger('debug');
$logger->pushHandler(new StreamHandler('php://stdout', Level::DEBUG));
// 监控连接状态
$loop->addPeriodicTimer(10, function () use ($center) {
$masters = $center->getConnectedMasters();
echo "当前连接的主节点数量: " . count($masters) . "\n";
foreach ($masters as $id => $master) {
echo "主节点 ID: $id\n";
}
});
// 服务执行调试
try {
$result = ServiceRegistry::execute('service-name', 'method-name', $params);
echo "服务执行成功: " . json_encode($result) . "\n";
} catch (\Exception $e) {
echo "服务执行失败: " . $e->getMessage() . "\n";
}
// 配置合理的重试参数
$master = new Master(
retryAttempts: 5, // 适中的重试次数
retryDelay: 2.0, // 合理的重试间隔
reconnectOnClose: true // 启用自动重连
);
// 生产环境使用较高的日志级别
$handler = new StreamHandler('php://stdout', Level::WARNING);
// 开发环境使用详细日志
$handler = new StreamHandler('php://stdout', Level::DEBUG);
// 定期清理不需要的服务
ServiceRegistry::remove('unused-service');
// 限制连接数量
if (count($center->getConnectedMasters()) > $maxConnections) {
// 拒绝新连接或关闭旧连接
}
bash
# 终端 1: 启动主注册中心
php examples/register.php
# 终端 2: 启动从注册中心
php examples/register1.php
# 终端 3: 启动主节点
php examples/master.php
bash
# 终端 1: 启动主注册中心
php examples/register.php
# 终端 2: 启动从注册中心
php examples/register1.php
# 终端 3: 启动主节点
php examples/master.php