PHP code example of kode / fibers

1. Go to this page and download the library: Download kode/fibers library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

kode / fibers example snippets


use Kode\Fibers\Fibers;

$result = Fibers::go(fn() => 'hello');

$ctxResult = Fibers::withContext(
    ['trace_id' => 'trace-001'],
    fn() => \Kode\Context\Context::get('trace_id')
);

$batch = Fibers::batch([1, 2, 3], fn(int $item) => $item * 2, 2);
$features = Fibers::runtimeFeatures();

Kode\Fibers\Core\FiberPool       // 主纤程池
Kode\Fibers\Channel\Channel      // 通信通道
Kode\Fibers\Task\TaskRunner      // 任务执行器
Kode\Fibers\Support\CpuInfo      // CPU 核心探测
Kode\Fibers\Contracts\Runnable   // 可运行接口

use Kode\Fibers\Facades\Fiber;

// 使用门面
$result = Fiber::run(fn() => {
    // 模拟异步操作
    usleep(100000);
    return 'Hello from Fiber!';
});

// 或使用辅助函数
$result = fiber_run(fn() => {
    usleep(100000);
    return 'Hello from Fiber!';
});

echo $result; // 输出: Hello from Fiber!

use Kode\Fibers\Core\FiberPool;
use Kode\Fibers\Support\CpuInfo;

// 创建纤程池,自动检测CPU核心数
$pool = new FiberPool([
    'size' => CpuInfo::getRecommendedPoolSize(4),  // CPU核心数 * 4
    'max_exec_time' => 30,                         // 单任务最长执行时间(秒)
    'gc_interval' => 100,                          // 每执行100次触发GC
    'max_retries' => 3,                            // 自动重试次数
    'retry_delay' => 0.5                           // 重试间隔(秒)
]);

// 并行执行多个任务
$results = $pool->concurrent([
    fn() => file_get_contents('http://api.a.com'),
    fn() => file_get_contents('http://api.b.com'),
    fn() => \RedisClient::get('key')
], 10); // 总超时10秒

print_r($results);

// 使用事件回调监控任务状态
$pool = new FiberPool([
    'onTaskStart' => fn($task) => logger()->info('Task started'),
    'onTaskComplete' => fn($task, $result) => logger()->info('Task completed'),
    'onTaskFail' => fn($task, $error) => logger()->error('Task failed', ['error' => $error->getMessage()])
]);

// 内部实现示例(用户无需关心)
if (PHP_VERSION_ID < 80400) {
    // 启用延迟析构任务队列,避免直接在析构函数中suspend
    Fibers::enableSafeDestructMode();
}

// config/fibers.php
return [
    'strict_destruct_check' => false, // 开发调试时可关闭
];

use Kode\Fibers\Support\CpuInfo;

$cpuCount = CpuInfo::get(); // 获取系统CPU核心数
$recommendedSize = CpuInfo::getRecommendedPoolSize(4); // CPU核心数 × 4

use Kode\Fibers\Core\FiberPool;
use Kode\Fibers\Support\CpuInfo;

$pool = new FiberPool([
    'size' => CpuInfo::getRecommendedPoolSize(3), // CPU × 3
    'name' => 'http-worker',
    'max_exec_time' => 30, // 单任务超时时间
    'max_retries' => 3, // 失败重试次数
    'retry_delay' => 0.5, // 重试间隔(秒)
    'gc_interval' => 100, // GC触发间隔
    'concurrent_limit' => 500, // 最大并发任务数
    'onCreate' => fn($id) => logger()->info("Fiber #$id created"),
    'onDestroy' => fn($id) => logger()->info("Fiber #$id destroyed"),
    'onTaskStart' => fn($taskId) => logger()->debug("Task $taskId started"),
    'onTaskComplete' => fn($taskId, $result) => logger()->debug("Task $taskId completed"),
    'onTaskFail' => fn($taskId, $error) => logger()->error("Task $taskId failed: {$error->getMessage()}")
]);

// 执行单个任务
$singleResult = $pool->run(fn() => file_get_contents('https://api.example.com'));

// 并行执行多个任务
$results = $pool->concurrent([
    fn() => file_get_contents('https://api.a.com'),
    fn() => file_get_contents('https://api.b.com'),
    fn() => file_get_contents('https://api.c.com')
], 10); // 总超时10秒

// 获取池状态信息
$stats = $pool->getStats();
print_r($stats);
/*
输出示例:
Array(
    [active_fibers] => 12
    [total_tasks] => 156
    [completed_tasks] => 144
    [failed_tasks] => 3
    [average_execution_time] => 0.125
)*/

// 关闭池并释放资源
$pool->shutdown();

// 优雅关闭(等待当前任务完成)
$pool->shutdown(true);

return [
    // 默认纤程池配置
    'default_pool' => [
        'size' => env('FIBER_POOL_SIZE', CpuInfo::getRecommendedPoolSize(4)),
        'timeout' => 30,
        'max_retries' => 3,
        'retry_delay' => 0.5,
        'context' => ['user_id' => null]
    ],
    // 通信通道配置
    'channels' => [
        'orders' => ['buffer_size' => 100],
        'logs' => ['buffer_size' => 50]
    ],
    // 功能开关
    'features' => [
        'auto_suspend_io' => true,
        'enable_monitoring' => true,
        'strict_destruct_check' => env('APP_ENV') === 'production'
    ],
    // 环境检测配置
    'environment' => [
        'check_disabled_functions' => true,
        '

// 在 app/Providers/AppServiceProvider.php 中注册
public function register()
{
    $this->app->singleton(FiberPool::class, function () {
        return new FiberPool(config('fibers.default_pool'));
    });
}

// 使用
public function index(FiberPool $pool)
{
    $results = $pool->concurrent([...]);
    return response()->json($results);
}

// 在控制器中使用
public function index(FiberPool $pool)
{
    $results = $pool->concurrent([...]);
    return $this->json($results);
}

// 在 config/common.php 中配置
return [
    'components' => [
        'fibers' => [
            'class' => \Kode\Fibers\Core\FiberManager::class,
            'poolConfig' => $params['fibers.default_pool']
        ]
    ]
];

// 在 config/service.php 中注册
return [
    'fibers' => \Kode\Fibers\Providers\ThinkPHPService::class
];

// 创建配置文件
$config = new FiberPool($config['default_pool']);

// 使用纤程池
$results = $pool->concurrent([...]);

use Kode\Fibers\Attributes\FiberSafe;
use Kode\Fibers\Attributes\Timeout;
use Kode\Fibers\Attributes\ChannelListener;

#[FiberSafe] // 表示该方法可在纤程中安全调用
class ApiService 
{
    #[Timeout(10)] // 设置10秒超时
    public function fetchUser(int $id): array
    {
        return json_decode(file_get_contents("https://api.com/users/$id"), true);
    }

    #[ChannelListener('order.created')] // 监听通道事件
    public function onOrderCreated(array $data): void
    {
        // 处理订单创建事件
    }
}

/**
 * @method static mixed run(callable $task, float $timeout = null)
 * @method static FiberPool pool(array $options = [])
 * @method static Channel channel(string $name, int $buffer = 0)
 * @method static array concurrent(array $tasks, float $timeout = null)
 */
class Fiber {}

#[FiberSafe]
function processUserData(User $user): array {
    // 函数逻辑...
    return ['id' => $user->id, 'name' => $user->name];
}

// IDE会自动提示类型错误
Fiber::run(fn() => processUserData(null)); // 提示:参数1应为User类型

use Kode\Fibers\Channel\Channel;

// 创建带缓冲区的通道(缓冲区大小为10)
$ch = Channel::make('download-results', 10);

// 或者使用辅助函数
$ch = fiber_channel('download-results', 10);

// 生产者:向通道发送数据
Fiber::run(function () use ($ch) {
    foreach ([1, 2, 3] as $i) {
        $ch->push("Data $i");
        usleep(100000); // 模拟异步操作
    }
    $ch->close(); // 关闭通道,防止内存泄漏
});

// 消费者:从通道接收数据
while ($msg = $ch->pop(1)) { // 超时1秒
    echo $msg . "\n";
}

// 非阻塞模式
if ($ch->canPush()) {
    $ch->push('non-blocking data');
}

// 带超时的非阻塞模式
if ($ch->tryPush('data', 0.5)) { // 0.5秒超时
    echo 'Data pushed successfully';
}

use Kode\Fibers\Event\EventBus;
use Kode\Fibers\Event\Event;

// 定义事件类
class PaymentSuccessEvent extends Event {
    public function __construct(public array $paymentData) {
        parent::__construct('payment.success', $paymentData);
    }
}

// 注册事件监听器
EventBus::on('payment.success', function (Event $event) {
    $paymentData = $event->getData();
    // 处理支付成功事件
    notifyAdmin($paymentData);
});

// 注册一次性监听器(只触发一次)
EventBus::once('user.login', fn($event) => logFirstLogin($event->getData()));

// 触发事件
EventBus::fire(new PaymentSuccessEvent([
    'order_id' => 123,
    'amount' => 99.99,
    'user_id' => 456
]));

// 移除事件监听器
EventBus::off('payment.success');

// 带有优先级的事件监听器
EventBus::on('system.shutdown', fn() => saveCriticalData(), 100); // 高优先级
EventBus::on('system.shutdown', fn() => cleanTempFiles(), 50); // 中优先级
EventBus::on('system.shutdown', fn() => logShutdown(), 10); // 低优先级

use Kode\Fibers\Support\Environment;

// 运行全面的环境诊断
$issues = Environment::diagnose();

// 打印诊断结果
foreach ($issues as $issue) {
    echo "⚠️ {$issue['type']}: {$issue['message']}" . PHP_EOL;
    if (isset($issue['recommendation'])) {
        echo "💡 建议: {$issue['recommendation']}" . PHP_EOL;
    }
}

// 检查特定功能是否可用
if (Environment::hasDisabledFunctions(['exec', 'shell_exec'])) {
    // 提供替代方案
}

// 检查必要的扩展是否安装
if (!Environment::hasRequiredExtensions(['curl', 'mbstring'])) {
    // 提示用户安装扩展
}

// 检查是否可以在析构函数中安全使用Fiber
if (Environment::supportsDestructInFiber()) {
    // PHP 8.4+ 行为
} else {
    // PHP < 8.4 兼容行为
}

// 场景:并行获取多个 API 数据
$data = [
    Fiber::run(fn() => httpGet('/users')),
    Fiber::run(fn() => httpGet('/posts')),
    Fiber::run(fn() => cacheGet('stats'))
];

// 场景:非阻塞读取文件
$content = Fiber::run(fn() => {
    $handle = fopen('large-file.txt', 'r');
    $content = fread($handle, 1024 * 1024);
    fclose($handle);
    return $content;
});

// 场景:异步等待外部服务响应
$response = Fiber::run(function() {
    $client = new HttpClient();
    return $client->get('https://api.example.com');
}, 5); // 5秒超时

// 场景1:API网关并行请求聚合
$pool = new FiberPool(['size' => 128]);

$userRequests = array_map(fn($userId) => fn() => getUserData($userId), $userIds);
$userData = $pool->concurrent($userRequests, 10);

// 场景2:批量数据处理
$pool = new FiberPool([
    'size' => CpuInfo::getRecommendedPoolSize(4),
    'max_exec_time' => 60,
    'max_retries' => 3
]);

$results = $pool->concurrent(array_map(function($item) {
    return function() use ($item) {
        // 处理单个项目
        return processItem($item);
    };
}, $batchItems));

// 场景3:定时任务执行器
$pool = new FiberPool(['name' => 'scheduler']);

foreach ($tasks as $task) {
    $pool->run(function() use ($task) {
        $delay = $task->getDelay();
        usleep($delay * 1000000); // 非阻塞睡眠
        return $task->execute();
    });
}

use Kode\Fibers\HttpClient\HttpClient;

// 自动选择最佳驱动
$client = new HttpClient();
$response = $client->get('https://api.example.com');

// 检查当前使用的驱动
if ($client->isUsingNativeDriver()) {
    echo "使用原生 cURL 驱动(无需额外依赖)";
} else {
    echo "使用 kode/http-client 包驱动(功能更完整)";
}
bash
# 自动检测框架类型并生成配置
php vendor/bin/fibers init

# 指定框架类型
php vendor/bin/fibers init --framework=laravel
bash
# 运行环境诊断
php vendor/bin/fibers diagnose
bash
# 初始化配置文件
php vendor/bin/fibers init

# 指定框架类型初始化
php vendor/bin/fibers init --framework=laravel

# 运行环境诊断
php vendor/bin/fibers diagnose

# 查看当前运行的 Fiber ID 和状态
php vendor/bin/fibers status

# 清理僵尸纤程
php vendor/bin/fibers cleanup

# 性能压测(测试最大吞吐)
php vendor/bin/fibers benchmark --concurrency=1000

# 查看帮助信息
php vendor/bin/fibers --help

# 查看特定命令帮助
php vendor/bin/fibers diagnose --help