PHP code example of kode / fibers
1. Go to this page and download the library: Download kode/fibers library . Choose the download type require .
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
kode / fibers example snippets
use Kode\Fibers\Fibers;
$result = Fibers::go(fn() => 'hello');
$ctxResult = Fibers::withContext(
['trace_id' => 'trace-001'],
fn() => \Kode\Context\Context::get('trace_id')
);
$batch = Fibers::batch([1, 2, 3], fn(int $item) => $item * 2, 2);
$features = Fibers::runtimeFeatures();
Kode\Fibers\Core\FiberPool // 主纤程池
Kode\Fibers\Channel\Channel // 通信通道
Kode\Fibers\Task\TaskRunner // 任务执行器
Kode\Fibers\Support\CpuInfo // CPU 核心探测
Kode\Fibers\Contracts\Runnable // 可运行接口
use Kode\Fibers\Facades\Fiber;
// 使用门面
$result = Fiber::run(fn() => {
// 模拟异步操作
usleep(100000);
return 'Hello from Fiber!';
});
// 或使用辅助函数
$result = fiber_run(fn() => {
usleep(100000);
return 'Hello from Fiber!';
});
echo $result; // 输出: Hello from Fiber!
use Kode\Fibers\Core\FiberPool;
use Kode\Fibers\Support\CpuInfo;
// 创建纤程池,自动检测CPU核心数
$pool = new FiberPool([
'size' => CpuInfo::getRecommendedPoolSize(4), // CPU核心数 * 4
'max_exec_time' => 30, // 单任务最长执行时间(秒)
'gc_interval' => 100, // 每执行100次触发GC
'max_retries' => 3, // 自动重试次数
'retry_delay' => 0.5 // 重试间隔(秒)
]);
// 并行执行多个任务
$results = $pool->concurrent([
fn() => file_get_contents('http://api.a.com'),
fn() => file_get_contents('http://api.b.com'),
fn() => \RedisClient::get('key')
], 10); // 总超时10秒
print_r($results);
// 使用事件回调监控任务状态
$pool = new FiberPool([
'onTaskStart' => fn($task) => logger()->info('Task started'),
'onTaskComplete' => fn($task, $result) => logger()->info('Task completed'),
'onTaskFail' => fn($task, $error) => logger()->error('Task failed', ['error' => $error->getMessage()])
]);
// 内部实现示例(用户无需关心)
if (PHP_VERSION_ID < 80400) {
// 启用延迟析构任务队列,避免直接在析构函数中suspend
Fibers::enableSafeDestructMode();
}
// config/fibers.php
return [
'strict_destruct_check' => false, // 开发调试时可关闭
];
use Kode\Fibers\Support\CpuInfo;
$cpuCount = CpuInfo::get(); // 获取系统CPU核心数
$recommendedSize = CpuInfo::getRecommendedPoolSize(4); // CPU核心数 × 4
use Kode\Fibers\Core\FiberPool;
use Kode\Fibers\Support\CpuInfo;
$pool = new FiberPool([
'size' => CpuInfo::getRecommendedPoolSize(3), // CPU × 3
'name' => 'http-worker',
'max_exec_time' => 30, // 单任务超时时间
'max_retries' => 3, // 失败重试次数
'retry_delay' => 0.5, // 重试间隔(秒)
'gc_interval' => 100, // GC触发间隔
'concurrent_limit' => 500, // 最大并发任务数
'onCreate' => fn($id) => logger()->info("Fiber #$id created"),
'onDestroy' => fn($id) => logger()->info("Fiber #$id destroyed"),
'onTaskStart' => fn($taskId) => logger()->debug("Task $taskId started"),
'onTaskComplete' => fn($taskId, $result) => logger()->debug("Task $taskId completed"),
'onTaskFail' => fn($taskId, $error) => logger()->error("Task $taskId failed: {$error->getMessage()}")
]);
// 执行单个任务
$singleResult = $pool->run(fn() => file_get_contents('https://api.example.com'));
// 并行执行多个任务
$results = $pool->concurrent([
fn() => file_get_contents('https://api.a.com'),
fn() => file_get_contents('https://api.b.com'),
fn() => file_get_contents('https://api.c.com')
], 10); // 总超时10秒
// 获取池状态信息
$stats = $pool->getStats();
print_r($stats);
/*
输出示例:
Array(
[active_fibers] => 12
[total_tasks] => 156
[completed_tasks] => 144
[failed_tasks] => 3
[average_execution_time] => 0.125
)*/
// 关闭池并释放资源
$pool->shutdown();
// 优雅关闭(等待当前任务完成)
$pool->shutdown(true);
return [
// 默认纤程池配置
'default_pool' => [
'size' => env('FIBER_POOL_SIZE', CpuInfo::getRecommendedPoolSize(4)),
'timeout' => 30,
'max_retries' => 3,
'retry_delay' => 0.5,
'context' => ['user_id' => null]
],
// 通信通道配置
'channels' => [
'orders' => ['buffer_size' => 100],
'logs' => ['buffer_size' => 50]
],
// 功能开关
'features' => [
'auto_suspend_io' => true,
'enable_monitoring' => true,
'strict_destruct_check' => env('APP_ENV') === 'production'
],
// 环境检测配置
'environment' => [
'check_disabled_functions' => true,
'
// 在 app/Providers/AppServiceProvider.php 中注册
public function register()
{
$this->app->singleton(FiberPool::class, function () {
return new FiberPool(config('fibers.default_pool'));
});
}
// 使用
public function index(FiberPool $pool)
{
$results = $pool->concurrent([...]);
return response()->json($results);
}
// 在控制器中使用
public function index(FiberPool $pool)
{
$results = $pool->concurrent([...]);
return $this->json($results);
}
// 在 config/common.php 中配置
return [
'components' => [
'fibers' => [
'class' => \Kode\Fibers\Core\FiberManager::class,
'poolConfig' => $params['fibers.default_pool']
]
]
];
// 在 config/service.php 中注册
return [
'fibers' => \Kode\Fibers\Providers\ThinkPHPService::class
];
// 创建配置文件
$config = new FiberPool($config['default_pool']);
// 使用纤程池
$results = $pool->concurrent([...]);
use Kode\Fibers\Attributes\FiberSafe;
use Kode\Fibers\Attributes\Timeout;
use Kode\Fibers\Attributes\ChannelListener;
#[FiberSafe] // 表示该方法可在纤程中安全调用
class ApiService
{
#[Timeout(10)] // 设置10秒超时
public function fetchUser(int $id): array
{
return json_decode(file_get_contents("https://api.com/users/$id"), true);
}
#[ChannelListener('order.created')] // 监听通道事件
public function onOrderCreated(array $data): void
{
// 处理订单创建事件
}
}
/**
* @method static mixed run(callable $task, float $timeout = null)
* @method static FiberPool pool(array $options = [])
* @method static Channel channel(string $name, int $buffer = 0)
* @method static array concurrent(array $tasks, float $timeout = null)
*/
class Fiber {}
#[FiberSafe]
function processUserData(User $user): array {
// 函数逻辑...
return ['id' => $user->id, 'name' => $user->name];
}
// IDE会自动提示类型错误
Fiber::run(fn() => processUserData(null)); // 提示:参数1应为User类型
use Kode\Fibers\Channel\Channel;
// 创建带缓冲区的通道(缓冲区大小为10)
$ch = Channel::make('download-results', 10);
// 或者使用辅助函数
$ch = fiber_channel('download-results', 10);
// 生产者:向通道发送数据
Fiber::run(function () use ($ch) {
foreach ([1, 2, 3] as $i) {
$ch->push("Data $i");
usleep(100000); // 模拟异步操作
}
$ch->close(); // 关闭通道,防止内存泄漏
});
// 消费者:从通道接收数据
while ($msg = $ch->pop(1)) { // 超时1秒
echo $msg . "\n";
}
// 非阻塞模式
if ($ch->canPush()) {
$ch->push('non-blocking data');
}
// 带超时的非阻塞模式
if ($ch->tryPush('data', 0.5)) { // 0.5秒超时
echo 'Data pushed successfully';
}
use Kode\Fibers\Event\EventBus;
use Kode\Fibers\Event\Event;
// 定义事件类
class PaymentSuccessEvent extends Event {
public function __construct(public array $paymentData) {
parent::__construct('payment.success', $paymentData);
}
}
// 注册事件监听器
EventBus::on('payment.success', function (Event $event) {
$paymentData = $event->getData();
// 处理支付成功事件
notifyAdmin($paymentData);
});
// 注册一次性监听器(只触发一次)
EventBus::once('user.login', fn($event) => logFirstLogin($event->getData()));
// 触发事件
EventBus::fire(new PaymentSuccessEvent([
'order_id' => 123,
'amount' => 99.99,
'user_id' => 456
]));
// 移除事件监听器
EventBus::off('payment.success');
// 带有优先级的事件监听器
EventBus::on('system.shutdown', fn() => saveCriticalData(), 100); // 高优先级
EventBus::on('system.shutdown', fn() => cleanTempFiles(), 50); // 中优先级
EventBus::on('system.shutdown', fn() => logShutdown(), 10); // 低优先级
use Kode\Fibers\Support\Environment;
// 运行全面的环境诊断
$issues = Environment::diagnose();
// 打印诊断结果
foreach ($issues as $issue) {
echo "⚠️ {$issue['type']}: {$issue['message']}" . PHP_EOL;
if (isset($issue['recommendation'])) {
echo "💡 建议: {$issue['recommendation']}" . PHP_EOL;
}
}
// 检查特定功能是否可用
if (Environment::hasDisabledFunctions(['exec', 'shell_exec'])) {
// 提供替代方案
}
// 检查必要的扩展是否安装
if (!Environment::hasRequiredExtensions(['curl', 'mbstring'])) {
// 提示用户安装扩展
}
// 检查是否可以在析构函数中安全使用Fiber
if (Environment::supportsDestructInFiber()) {
// PHP 8.4+ 行为
} else {
// PHP < 8.4 兼容行为
}
// 场景:并行获取多个 API 数据
$data = [
Fiber::run(fn() => httpGet('/users')),
Fiber::run(fn() => httpGet('/posts')),
Fiber::run(fn() => cacheGet('stats'))
];
// 场景:非阻塞读取文件
$content = Fiber::run(fn() => {
$handle = fopen('large-file.txt', 'r');
$content = fread($handle, 1024 * 1024);
fclose($handle);
return $content;
});
// 场景:异步等待外部服务响应
$response = Fiber::run(function() {
$client = new HttpClient();
return $client->get('https://api.example.com');
}, 5); // 5秒超时
// 场景1:API网关并行请求聚合
$pool = new FiberPool(['size' => 128]);
$userRequests = array_map(fn($userId) => fn() => getUserData($userId), $userIds);
$userData = $pool->concurrent($userRequests, 10);
// 场景2:批量数据处理
$pool = new FiberPool([
'size' => CpuInfo::getRecommendedPoolSize(4),
'max_exec_time' => 60,
'max_retries' => 3
]);
$results = $pool->concurrent(array_map(function($item) {
return function() use ($item) {
// 处理单个项目
return processItem($item);
};
}, $batchItems));
// 场景3:定时任务执行器
$pool = new FiberPool(['name' => 'scheduler']);
foreach ($tasks as $task) {
$pool->run(function() use ($task) {
$delay = $task->getDelay();
usleep($delay * 1000000); // 非阻塞睡眠
return $task->execute();
});
}
use Kode\Fibers\HttpClient\HttpClient;
// 自动选择最佳驱动
$client = new HttpClient();
$response = $client->get('https://api.example.com');
// 检查当前使用的驱动
if ($client->isUsingNativeDriver()) {
echo "使用原生 cURL 驱动(无需额外依赖)";
} else {
echo "使用 kode/http-client 包驱动(功能更完整)";
}
bash
# 自动检测框架类型并生成配置
php vendor/bin/fibers init
# 指定框架类型
php vendor/bin/fibers init --framework=laravel
bash
# 运行环境诊断
php vendor/bin/fibers diagnose
bash
# 初始化配置文件
php vendor/bin/fibers init
# 指定框架类型初始化
php vendor/bin/fibers init --framework=laravel
# 运行环境诊断
php vendor/bin/fibers diagnose
# 查看当前运行的 Fiber ID 和状态
php vendor/bin/fibers status
# 清理僵尸纤程
php vendor/bin/fibers cleanup
# 性能压测(测试最大吞吐)
php vendor/bin/fibers benchmark --concurrency=1000
# 查看帮助信息
php vendor/bin/fibers --help
# 查看特定命令帮助
php vendor/bin/fibers diagnose --help