PHP code example of hiblaphp / postgres

1. Go to the download page for the hiblaphp/postgres library and download it, choosing the "require" download type.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

hiblaphp / postgres example snippets


use Hibla\Postgres\PostgresClient;
use function Hibla\await;

// The client is lazy by default, so no connections are opened until the first query.
// DSN shape: postgresql://<user>:<password>@<host>[:<port>]/<database>
$client = new PostgresClient('postgresql://test_user:test_password@127.0.0.1/mydb');

// Simple query
$users = await($client->query('SELECT * FROM users WHERE active = $1', [true]));
echo $users->rowCount;

// Named parameters: each :name placeholder is bound from the matching array key.
$user = await(
    $client->query(
        'SELECT * FROM users WHERE email = :email AND status = :status',
        ['email' => 'alice@example.com', 'status' => 'active']
    )
);

// Positional ? placeholders (converted to $n automatically)
$orders = await(
    $client->query(
        'SELECT * FROM orders WHERE user_id = ? AND status = ?',
        [$userId, 'pending']
    )
);

// Prepared statement (recommended for repeated execution)
$stmt = await(
    $client->prepare('SELECT * FROM users WHERE email = :email')
);
$result = await($stmt->execute(['email' => 'alice@example.com']));
// Close the statement once it is no longer needed.
await($stmt->close());

// Streaming large result sets
$stream = await($client->stream('SELECT * FROM logs ORDER BY id DESC'));
foreach ($stream as $row) {
    processLog($row);
}

// Pub/Sub
await($client->notify('user.events', json_encode(['type' => 'login', 'id' => 42])));

use function Hibla\await;
use Hibla\Promise\Promise;

// Three queries run concurrently. Connections are borrowed from the pool
// (and created on demand) only as each query starts.
[$users, $orders, $stats] = await(
    Promise::all([
        $client->query('SELECT * FROM users'),
        $client->query('SELECT * FROM orders'),
        $client->query('SELECT COUNT(*) FROM stats'),
    ])
);

use Hibla\Sql\SqlClientInterface;

/**
 * Example repository that receives its SQL client through constructor injection.
 */
class UserRepository
{
    /**
     * @param SqlClientInterface $db Client supplied by the caller; kept as a private readonly property.
     */
    public function __construct(
        private readonly SqlClientInterface $db,
    ) {
    }
}

use Hibla\Postgres\ValueObjects\PgSqlConfig;

$config = new PgSqlConfig(
    host: '127.0.0.1',
    port: 5432,
    username: 'app_user',
    password: 'secret',
    database: 'production',
    sslmode: 'verify-full',
    sslCa: '/etc/ssl/certs/ca-bundle.crt',
    enableServerSideCancellation: true,
    resetConnection: true,
    castPreparedTypes: true,
);

$client = new PostgresClient($config, maxConnections: 20);

$config = new PgSqlConfig(
    host: '127.0.0.1',
    username: 'app_user',
    password: 'secret',
    database: 'mydb',
);

$config = PgSqlConfig::fromArray([
    'host'                            => '127.0.0.1',
    'port'                            => 5432,
    'username'                        => 'app_user',
    'password'                        => 'secret',
    'database'                        => 'mydb',
    'sslmode'                         => 'prefer',
    'ssl_ca'                          => '/path/to/ca.pem',
    'ssl_cert'                        => '/path/to/client-cert.pem',
    'ssl_key'                         => '/path/to/client-key.pem',
    'connect_timeout'                 => 10,
    'application_name'                => 'my_app',
    'reset_connection'                => true,
    'enable_server_side_cancellation' => false,
    'kill_timeout_seconds'            => 3.0,
    'cast_prepared_types'             => true,
]);

// Build the configuration from a connection URI; driver options travel as query-string parameters.
$config = PgSqlConfig::fromUri(
    'postgresql://app_user:secret@127.0.0.1:5432/mydb'
    . '?sslmode=verify-full'
    . '&ssl_ca=/path/to/ca.pem'
    . '&reset_connection=true'
    . '&enable_server_side_cancellation=true'
    . '&kill_timeout_seconds=5'
    . '&cast_prepared_types=true'
    . '&application_name=my_app'
);

$base = PgSqlConfig::fromArray([
    'host'     => '127.0.0.1',
    'username' => 'app_user',
    'password' => 'secret',
    'database' => 'mydb',
]);

// Regular client, cancellation off (default)
$readClient = new PostgresClient($base);

// Long-running report client, cancellation on so queries can be interrupted
$reportClient = new PostgresClient(
    $base->withQueryCancellation(true),
    maxConnections: 2,
);

$base = new PgSqlConfig(
    host: 'db.internal',
    username: 'app',
    password: 'secret',
    sslmode: 'verify-full',
    sslCa: '/etc/ssl/ca.pem',
);

// Derive a per-database client by copying the base config and overriding 'database'.
// NOTE(review): (array) $base exposes the object's property names as keys; the base config
// is built with camelCase arguments (sslCa) while fromArray() is shown elsewhere with
// snake_case keys (ssl_ca) — confirm the SSL settings survive this round trip.
$userDb   = new PostgresClient(
    PgSqlConfig::fromArray([...(array) $base, 'database' => 'users']),
    maxConnections: 10
);
// Small pool using a copy of the base config with query cancellation switched on.
$reportDb = new PostgresClient(
    $base->withQueryCancellation(true),
    maxConnections: 2
);

use Hibla\Postgres\PostgresClient;

// From DSN string, lazy, no connections opened yet
$client = new PostgresClient('postgresql://user:pass@localhost:5432/mydb');

// From array
$client = new PostgresClient([
    'host'     => '127.0.0.1',
    'port'     => 5432,
    'username' => 'test_user',
    'password' => 'test_password',
    'database' => 'test',
]);

// With explicit pool settings
$client = new PostgresClient(
    config: 'postgresql://...',
    minConnections: 0,
    maxConnections: 20,
    idleTimeout: 300,
    maxLifetime: 3600,
    statementCacheSize: 512,
    enableStatementCache: true,
    maxWaiters: 100,
    acquireTimeout: 10.0,
    enableServerSideCancellation: true,
    resetConnection: true,
    castPreparedTypes: true,
    onConnect: function (ConnectionSetup $setup) {
        await($setup->execute("SET SESSION TIME ZONE 'UTC'"));
    },
);

$result = await($client->query('SELECT * FROM users LIMIT 10'));

$result = await(
    $client->query(
        'SELECT id, name FROM users WHERE created_at > $1 AND status = $2',
        [$since, 'active']
    )
);

$result = await(
    $client->query(
        'SELECT id, name FROM users WHERE created_at > ? AND status = ?',
        [$since, 'active']
    )
);

$result = await(
    $client->query(
        'SELECT id, name FROM users WHERE created_at > :since AND status = :status',
        ['since' => $since, 'status' => 'active']
    )
);

// Named params with query(): the parameter array is keyed by placeholder name
$result = await(
    $client->query(
        'INSERT INTO orders (user_id, total, status) VALUES (:userId, :total, :status)',
        ['status' => 'pending', 'total' => 99.99, 'userId' => 42] // any key order
    )
);

// Named params via prepare(), most useful when executing the same statement repeatedly
$stmt = await(
    $client->prepare(
        'SELECT * FROM products WHERE category_id = :categoryId AND price > :minPrice'
    )
);

$electronics = await($stmt->execute(['categoryId' => 1, 'minPrice' => 50.00]));
$clothing    = await($stmt->execute(['categoryId' => 2, 'minPrice' => 25.00]));

await($stmt->close());

// Returns affected row count
$count = await(
    $client->execute(
        'UPDATE users SET last_login = NOW() WHERE id = :id',
        ['id' => $userId]
    )
);

// Returns the first column of the first row as an integer (designed for use with RETURNING id)
$newId = await(
    $client->executeGetId(
        'INSERT INTO users (name, email) VALUES (:name, :email) RETURNING id',
        ['name' => 'Alice', 'email' => 'alice@example.com']
    )
);

// Returns first row as associative array, or null
$user = await(
    $client->fetchOne(
        'SELECT * FROM users WHERE id = :id',
        ['id' => $userId]
    )
);

// Returns value of first column (or named column) from first row
$name = await(
    $client->fetchValue(
        'SELECT name FROM users WHERE id = :id',
        ['id' => $userId]
    )
);

// Positional placeholders (? converted to $n)
$stmt = await(
    $client->prepare('SELECT * FROM products WHERE category_id = ? AND price > ?')
);
$result1 = await($stmt->execute([1, 50.00]));
$result2 = await($stmt->execute([2, 100.00]));
await($stmt->close());

// Named placeholders, order of keys in execute() does not matter
$stmt = await(
    $client->prepare(
        'SELECT * FROM products WHERE category_id = :categoryId AND price > :minPrice'
    )
);
$result1 = await($stmt->execute(['categoryId' => 1, 'minPrice' => 50.00]));
$result2 = await($stmt->execute(['minPrice' => 100.00, 'categoryId' => 2]));
await($stmt->close());

$stream = await(
    $client->stream('SELECT * FROM large_table ORDER BY id', bufferSize: 200)
);

// Inspect stream metadata before iterating
echo $stream->columnCount;
print_r($stream->columns);

foreach ($stream as $row) {
    processRow($row);
}

$stmt = await(
    $client->prepare('SELECT * FROM logs WHERE created_at > :since AND level = :level')
);

$stream = await(
    $stmt->executeStream(['since' => $since, 'level' => 'error'])
);

foreach ($stream as $row) {
    processRow($row);
}

$stream = await($client->stream('SELECT * FROM huge_table'));

foreach ($stream as $row) {
    if (shouldStop($row)) {
        $stream->cancel();
        break;
    }
    processRow($row);
}

// Consume a stream from inside an async() task and wait for the task to finish.
// NOTE(review): async() is not imported anywhere in this listing — presumably
// `use function Hibla\async;` is required; confirm against the library.
await(
    // $sql must be captured explicitly; closures do not see outer variables otherwise.
    async(function () use ($client, $sql) {
        $stream = await($client->stream($sql));
        foreach ($stream as $row) {
            processRow($row);
        }
    })
);

$result = await(
    $client->transaction(function (TransactionInterface $tx) use ($from, $to) {
        await(
            $tx->execute(
                'UPDATE accounts SET balance = balance - :amount WHERE id = :id',
                ['amount' => 100, 'id' => $from]
            )
        );
        await(
            $tx->execute(
                'UPDATE accounts SET balance = balance + :amount WHERE id = :id',
                ['amount' => 100, 'id' => $to]
            )
        );

        return 'Transfer completed';
    })
);

use Hibla\Sql\TransactionOptions;
use Hibla\Postgres\Enums\IsolationLevel;

await(
    $client->transaction(
        function (TransactionInterface $tx) use ($from, $to) {
            await(
                $tx->execute(
                    'UPDATE accounts SET balance = balance - :amount WHERE id = :id',
                    ['amount' => 100, 'id' => $from]
                )
            );
            await(
                $tx->execute(
                    'UPDATE accounts SET balance = balance + :amount WHERE id = :id',
                    ['amount' => 100, 'id' => $to]
                )
            );
        },
        TransactionOptions::default()
            ->withAttempts(3)
            ->withIsolationLevel(IsolationLevel::REPEATABLE_READ)
    )
);

use Hibla\Sql\TransactionOptions;
use Hibla\Postgres\Enums\IsolationLevel;

$options = TransactionOptions::default()
    ->withAttempts(5)
    ->withIsolationLevel(IsolationLevel::SERIALIZABLE)
    ->withRetryableExceptions([MyOptimisticLockException::class]);

    /**
     * Runtime exception that implements RetryableException; it declares no members of its own.
     */
    class MyOptimisticLockException extends \RuntimeException implements \Hibla\Sql\Exceptions\RetryableException
    {
    }
    

    // Retry by class list
    $options = TransactionOptions::default()
        ->withAttempts(3)
        ->withRetryableExceptions([ThirdPartyConflictException::class]);

    // Retry by predicate
    $options = TransactionOptions::default()
        ->withAttempts(3)
        ->withRetryableExceptions(
            fn(\Throwable $e) => $e instanceof ThirdPartyConflictException && $e->getCode() === 409
        );
    

$tx = await($client->beginTransaction());

try {
    await(
        $tx->execute(
            'UPDATE accounts SET balance = balance - :amount WHERE id = :id',
            ['amount' => 100, 'id' => $from]
        )
    );
    await(
        $tx->execute(
            'UPDATE accounts SET balance = balance + :amount WHERE id = :id',
            ['amount' => 100, 'id' => $to]
        )
    );
    await($tx->commit());
} catch (\Throwable $e) {
    await($tx->rollback());
    throw $e;
}

$tx = await($client->beginTransaction());

try {
    await($tx->savepoint('before_risky'));

    try {
        await(
            $tx->execute(
                'INSERT INTO external_refs (id) VALUES (:id)',
                ['id' => $externalId]
            )
        );
    } catch (\Throwable $e) {
        // Rolling back to the savepoint also clears the tainted state.
        await($tx->rollbackTo('before_risky'));
    }

    await($tx->releaseSavepoint('before_risky'));
    await($tx->commit());
} catch (\Throwable $e) {
    await($tx->rollback());
    throw $e;
}

$promise = $client->transaction(function (TransactionInterface $tx) {
    await($tx->execute('UPDATE ...'));
    await($tx->execute('UPDATE ...')); // still running when cancelled
});

Loop::addTimer(2.0, fn() => $promise->cancel());
// The running query is interrupted, ROLLBACK is issued, connection returned to pool.

await(
    $client->transaction(function (TransactionInterface $tx) use ($externalId) {
        await(
            $tx->execute(
                'INSERT INTO audit_log (event) VALUES (:event)',
                ['event' => 'attempt']
            )
        );

        await($tx->savepoint('before_risky_op'));

        try {
            await(
                $tx->execute(
                    'INSERT INTO external_refs (id) VALUES (:id)',
                    ['id' => $externalId]
                )
            );
        } catch (\Throwable $e) {
            // Rolls back to the savepoint and clears the tainted state,
            // so subsequent queries are allowed to continue.
            await($tx->rollbackTo('before_risky_op'));
        }

        await($tx->releaseSavepoint('before_risky_op'));
    })
);

await(
    $client->transaction(function (TransactionInterface $tx) use ($user) {
        await(
            $tx->execute(
                'INSERT INTO users (name, email) VALUES (:name, :email)',
                ['name' => $user->name, 'email' => $user->email]
            )
        );

        // Fires only if COMMIT succeeds
        $tx->onCommit(function () use ($user) {
            EventDispatcher::dispatch(new UserCreated($user));
        });

        // Fires if the transaction rolls back
        $tx->onRollback(function () use ($user) {
            Logger::warning("Failed to persist user: {$user->email}");
        });
    })
);

// Send a notification to a channel with an optional payload
await(
    $client->notify('user.events', json_encode(['type' => 'login', 'userId' => 42]))
);

// Send without a payload
await($client->notify('cache.invalidate'));

$listener = await($client->createListener());

// Subscribe to one or more channels
await(
    $listener->listen('user.events', function (string $channel, string $payload, int $pid) {
        $event = json_decode($payload, true);
        echo "Received on {$channel} from PID {$pid}: " . print_r($event, true);
    })
);

await(
    $listener->listen('cache.invalidate', function (string $channel, string $payload, int $pid) {
        CacheManager::flush();
    })
);

// Multiple callbacks can be registered to the same channel
await(
    $listener->listen('user.events', function (string $channel, string $payload, int $pid) {
        AuditLogger::log($channel, $payload);
    })
);

await($listener->unlisten('user.events'));

await($listener->close());

// Customize the reconnect backoff window
$listener = await(
    $client->createListener(
        minReconnectInterval: 0.5,  // first retry after 0.5 seconds
        maxReconnectInterval: 60.0, // cap at 60 seconds
    )
);

$client = new PostgresClient(
    config: $config,
    minConnections: 0,
    maxConnections: 50,
    idleTimeout: 600,
    maxLifetime: 3600,
    acquireTimeout: 10.0,
    resetConnection: true,
);

// Graceful: stops new work, waits for active queries to finish, then closes
await($client->closeAsync(timeout: 30.0));

// Force: closes everything immediately, rejects pending waiters
$client->close();

$result = await($client->healthCheck());
// e.g. ['total_checked' => 5, 'healthy' => 4, 'unhealthy' => 1]

$stats = $client->stats;
// Returns an associative array with keys like:
// 'active_connections', 'total_connections', 'pooled_connections',
// 'waiting_requests', 'draining_connections', 'max_size', ...

// Prefer SSL but allow plaintext fallback (the default)
$client = new PostgresClient([
    'host'    => 'db.example.com',
    'sslmode' => 'prefer',
    // ...
]);

// Require SSL with full server certificate verification
$client = new PostgresClient([
    'host'    => 'db.example.com',
    'sslmode' => 'verify-full',
    'ssl_ca'  => '/etc/ssl/certs/ca-bundle.crt',
    // ...
]);

// Mutual TLS (client certificate and key)
$client = new PostgresClient([
    'host'     => 'db.example.com',
    'sslmode'  => 'verify-full',
    'ssl_ca'   => '/path/to/ca.pem',
    'ssl_cert' => '/path/to/client-cert.pem',
    'ssl_key'  => '/path/to/client-key.pem',
    // ...
]);

$client = new PostgresClient(
    config: $config,
    enableServerSideCancellation: true,
);

$promise = $client->query('SELECT * FROM huge_table');
Loop::addTimer(5.0, fn() => $promise->cancel()); // pg_cancel_backend dispatched

$client = new PostgresClient(
    config: $config,
    onConnect: function (ConnectionSetup $setup) {
        await($setup->execute("SET SESSION TIME ZONE 'UTC'"));
        await($setup->execute("SET search_path TO myschema, public"));
    }
);

$client = new PostgresClient(
    config: $config,
    enableStatementCache: true,
    statementCacheSize: 512
);

// Invalidate all caches, for example after a schema change
$client->clearStatementCache();

$result = await(
    $client->query(
        'SELECT price, quantity FROM products WHERE id = ?',
        [1]
    )
);

$row = $result->fetchOne();
// $row['price']    => string("19.99")   (NUMERIC, preserved as string)
// $row['quantity'] => int(5)            (int4, cast to int)

// BCMath truncates to the requested scale rather than rounding:
// 19.99 * 0.20 = 3.998, which becomes "3.99" at scale 2.
$tax   = bcmul($row['price'], '0.20', 2); // "3.99"
$total = bcadd($row['price'], $tax, 2);   // "23.98"

$result = await($client->query('SELECT * FROM users'));

// Scalar metadata can be echoed directly.
echo $result->rowCount;      // int, rows in result set
echo $result->affectedRows;  // int, rows affected by INSERT/UPDATE/DELETE
echo $result->connectionId;  // int, backend PID from pg_get_pid()
echo $result->insertedOid;   // int|null, OID of the inserted row if applicable
echo $result->columnCount;   // int, number of columns

// The column list is an array: echo would print "Array" and raise an
// "Array to string conversion" warning, so dump it with print_r instead.
print_r($result->columns);   // list<string> of column names

foreach ($result as $row) {
    echo $row['name'];
}

$row = $result->fetchOne();           // first row as associative array, or null
$all = $result->fetchAll();           // all rows as array of associative arrays
$col = $result->fetchColumn('name');  // all values from a named column
$col = $result->fetchColumn(0);       // all values from column index 0

$result = await($client->query('SELECT * FROM users; SELECT * FROM orders'));

foreach ($result as $row) {
    echo $row['name']; // first result set: users
}

$next = $result->nextResult();
if ($next !== null) {
    foreach ($next as $row) {
        echo $row['total']; // second result set: orders
    }
}