1. Go to this page and download the library: Download hiblaphp/postgres library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
hiblaphp / postgres example snippets
use Hibla\Postgres\PostgresClient;
use function Hibla\await;
// The client is lazy by default, so no connections are opened until the first query.
$client = new PostgresClient('postgresql://test_user:[email protected]/mydb');
// Simple query
$users = await($client->query('SELECT * FROM users WHERE active = $1', [true]));
echo $users->rowCount;
// Named parameters
$user = await(
$client->query(
'SELECT * FROM users WHERE email = :email AND status = :status',
['email' => '[email protected]', 'status' => 'active']
)
);
// Positional ? placeholders (converted to $n automatically)
$orders = await(
$client->query(
'SELECT * FROM orders WHERE user_id = ? AND status = ?',
[$userId, 'pending']
)
);
// Prepared statement (recommended for repeated execution)
$stmt = await(
$client->prepare('SELECT * FROM users WHERE email = :email')
);
$result = await($stmt->execute(['email' => '[email protected]']));
await($stmt->close());
// Streaming large result sets
$stream = await($client->stream('SELECT * FROM logs ORDER BY id DESC'));
foreach ($stream as $row) {
processLog($row);
}
// Pub/Sub
await($client->notify('user.events', json_encode(['type' => 'login', 'id' => 42])));
use function Hibla\await;
use Hibla\Promise\Promise;
// Three queries run concurrently. Connections are borrowed from the pool
// (and created on demand) only as each query starts.
[$users, $orders, $stats] = await(
Promise::all([
$client->query('SELECT * FROM users'),
$client->query('SELECT * FROM orders'),
$client->query('SELECT COUNT(*) FROM stats'),
])
);
use Hibla\Sql\SqlClientInterface;
class UserRepository
{
public function __construct(private readonly SqlClientInterface $db) {}
}
use Hibla\Postgres\PostgresClient;
// From DSN string, lazy, no connections opened yet
$client = new PostgresClient('postgresql://user:pass@localhost:5432/mydb');
// From array
$client = new PostgresClient([
'host' => '127.0.0.1',
'port' => 5432,
'username' => 'test_user',
'password' => 'test_password',
'database' => 'test',
]);
// With explicit pool settings
$client = new PostgresClient(
config: 'postgresql://...',
minConnections: 0,
maxConnections: 20,
idleTimeout: 300,
maxLifetime: 3600,
statementCacheSize: 512,
enableStatementCache: true,
maxWaiters: 100,
acquireTimeout: 10.0,
enableServerSideCancellation: true,
resetConnection: true,
castPreparedTypes: true,
onConnect: function (ConnectionSetup $setup) {
await($setup->execute("SET SESSION TIME ZONE 'UTC'"));
},
);
$result = await($client->query('SELECT * FROM users LIMIT 10'));
$result = await(
$client->query(
'SELECT id, name FROM users WHERE created_at > $1 AND status = $2',
[$since, 'active']
)
);
$result = await(
$client->query(
'SELECT id, name FROM users WHERE created_at > ? AND status = ?',
[$since, 'active']
)
);
$result = await(
$client->query(
'SELECT id, name FROM users WHERE created_at > :since AND status = :status',
['since' => $since, 'status' => 'active']
)
);
// Named params with execute()
$result = await(
$client->query(
'INSERT INTO orders (user_id, total, status) VALUES (:userId, :total, :status)',
['status' => 'pending', 'total' => 99.99, 'userId' => 42] // any key order
)
);
// Named params via prepare(), most useful when executing the same statement repeatedly
$stmt = await(
$client->prepare(
'SELECT * FROM products WHERE category_id = :categoryId AND price > :minPrice'
)
);
$electronics = await($stmt->execute(['categoryId' => 1, 'minPrice' => 50.00]));
$clothing = await($stmt->execute(['categoryId' => 2, 'minPrice' => 25.00]));
await($stmt->close());
// Returns affected row count
$count = await(
$client->execute(
'UPDATE users SET last_login = NOW() WHERE id = :id',
['id' => $userId]
)
);
// Returns the first column of the first row as an integer (designed for use with RETURNING id)
$newId = await(
$client->executeGetId(
'INSERT INTO users (name, email) VALUES (:name, :email) RETURNING id',
['name' => 'Alice', 'email' => '[email protected]']
)
);
// Returns first row as associative array, or null
$user = await(
$client->fetchOne(
'SELECT * FROM users WHERE id = :id',
['id' => $userId]
)
);
// Returns value of first column (or named column) from first row
$name = await(
$client->fetchValue(
'SELECT name FROM users WHERE id = :id',
['id' => $userId]
)
);
// Positional placeholders (? converted to $n)
$stmt = await(
$client->prepare('SELECT * FROM products WHERE category_id = ? AND price > ?')
);
$result1 = await($stmt->execute([1, 50.00]));
$result2 = await($stmt->execute([2, 100.00]));
await($stmt->close());
// Named placeholders, order of keys in execute() does not matter
$stmt = await(
$client->prepare(
'SELECT * FROM products WHERE category_id = :categoryId AND price > :minPrice'
)
);
$result1 = await($stmt->execute(['categoryId' => 1, 'minPrice' => 50.00]));
$result2 = await($stmt->execute(['minPrice' => 100.00, 'categoryId' => 2]));
await($stmt->close());
$stream = await(
$client->stream('SELECT * FROM large_table ORDER BY id', bufferSize: 200)
);
// Inspect stream metadata before iterating
echo $stream->columnCount;
print_r($stream->columns);
foreach ($stream as $row) {
processRow($row);
}
$stmt = await(
$client->prepare('SELECT * FROM logs WHERE created_at > :since AND level = :level')
);
$stream = await(
$stmt->executeStream(['since' => $since, 'level' => 'error'])
);
foreach ($stream as $row) {
processRow($row);
}
$stream = await($client->stream('SELECT * FROM huge_table'));
foreach ($stream as $row) {
if (shouldStop($row)) {
$stream->cancel();
break;
}
processRow($row);
}
$result = await(
$client->transaction(function (TransactionInterface $tx) use ($from, $to) {
await(
$tx->execute(
'UPDATE accounts SET balance = balance - :amount WHERE id = :id',
['amount' => 100, 'id' => $from]
)
);
await(
$tx->execute(
'UPDATE accounts SET balance = balance + :amount WHERE id = :id',
['amount' => 100, 'id' => $to]
)
);
return 'Transfer completed';
})
);
use Hibla\Sql\TransactionOptions;
use Hibla\Postgres\Enums\IsolationLevel;
await(
$client->transaction(
function (TransactionInterface $tx) use ($from, $to) {
await(
$tx->execute(
'UPDATE accounts SET balance = balance - :amount WHERE id = :id',
['amount' => 100, 'id' => $from]
)
);
await(
$tx->execute(
'UPDATE accounts SET balance = balance + :amount WHERE id = :id',
['amount' => 100, 'id' => $to]
)
);
},
TransactionOptions::default()
->withAttempts(3)
->withIsolationLevel(IsolationLevel::REPEATABLE_READ)
)
);
use Hibla\Sql\TransactionOptions;
use Hibla\Postgres\Enums\IsolationLevel;
$options = TransactionOptions::default()
->withAttempts(5)
->withIsolationLevel(IsolationLevel::SERIALIZABLE)
->withRetryableExceptions([MyOptimisticLockException::class]);
class MyOptimisticLockException extends \RuntimeException
implements \Hibla\Sql\Exceptions\RetryableException {}
// Retry by class list
$options = TransactionOptions::default()
->withAttempts(3)
->withRetryableExceptions([ThirdPartyConflictException::class]);
// Retry by predicate
$options = TransactionOptions::default()
->withAttempts(3)
->withRetryableExceptions(
fn(\Throwable $e) => $e instanceof ThirdPartyConflictException && $e->getCode() === 409
);
$tx = await($client->beginTransaction());
try {
await(
$tx->execute(
'UPDATE accounts SET balance = balance - :amount WHERE id = :id',
['amount' => 100, 'id' => $from]
)
);
await(
$tx->execute(
'UPDATE accounts SET balance = balance + :amount WHERE id = :id',
['amount' => 100, 'id' => $to]
)
);
await($tx->commit());
} catch (\Throwable $e) {
await($tx->rollback());
throw $e;
}
$tx = await($client->beginTransaction());
try {
await($tx->savepoint('before_risky'));
try {
await(
$tx->execute(
'INSERT INTO external_refs (id) VALUES (:id)',
['id' => $externalId]
)
);
} catch (\Throwable $e) {
// Rolling back to the savepoint also clears the tainted state.
await($tx->rollbackTo('before_risky'));
}
await($tx->releaseSavepoint('before_risky'));
await($tx->commit());
} catch (\Throwable $e) {
await($tx->rollback());
throw $e;
}
$promise = $client->transaction(function (TransactionInterface $tx) {
await($tx->execute('UPDATE ...'));
await($tx->execute('UPDATE ...')); // still running when cancelled
});
Loop::addTimer(2.0, fn() => $promise->cancel());
// The running query is interrupted, ROLLBACK is issued, connection returned to pool.
await(
$client->transaction(function (TransactionInterface $tx) use ($externalId) {
await(
$tx->execute(
'INSERT INTO audit_log (event) VALUES (:event)',
['event' => 'attempt']
)
);
await($tx->savepoint('before_risky_op'));
try {
await(
$tx->execute(
'INSERT INTO external_refs (id) VALUES (:id)',
['id' => $externalId]
)
);
} catch (\Throwable $e) {
// Rolls back to the savepoint and clears the tainted state,
// so subsequent queries are allowed to continue.
await($tx->rollbackTo('before_risky_op'));
}
await($tx->releaseSavepoint('before_risky_op'));
})
);
await(
$client->transaction(function (TransactionInterface $tx) use ($user) {
await(
$tx->execute(
'INSERT INTO users (name, email) VALUES (:name, :email)',
['name' => $user->name, 'email' => $user->email]
)
);
// Fires only if COMMIT succeeds
$tx->onCommit(function () use ($user) {
EventDispatcher::dispatch(new UserCreated($user));
});
// Fires if the transaction rolls back
$tx->onRollback(function () use ($user) {
Logger::warning("Failed to persist user: {$user->email}");
});
})
);
// Send a notification to a channel with an optional payload
await(
$client->notify('user.events', json_encode(['type' => 'login', 'userId' => 42]))
);
// Send without a payload
await($client->notify('cache.invalidate'));
$listener = await($client->createListener());
// Subscribe to one or more channels
await(
$listener->listen('user.events', function (string $channel, string $payload, int $pid) {
$event = json_decode($payload, true);
echo "Received on {$channel} from PID {$pid}: " . print_r($event, true);
})
);
await(
$listener->listen('cache.invalidate', function (string $channel, string $payload, int $pid) {
CacheManager::flush();
})
);
// Multiple callbacks can be registered to the same channel
await(
$listener->listen('user.events', function (string $channel, string $payload, int $pid) {
AuditLogger::log($channel, $payload);
})
);
await($listener->unlisten('user.events'));
await($listener->close());
// Customize the reconnect backoff window
$listener = await(
$client->createListener(
minReconnectInterval: 0.5, // first retry after 0.5 seconds
maxReconnectInterval: 60.0, // cap at 60 seconds
)
);
// Graceful: stops new work, waits for active queries to finish, then closes
await($client->closeAsync(timeout: 30.0));
// Force: closes everything immediately, rejects pending waiters
$client->close();
$client = new PostgresClient(
config: $config,
onConnect: function (ConnectionSetup $setup) {
await($setup->execute("SET SESSION TIME ZONE 'UTC'"));
await($setup->execute("SET search_path TO myschema, public"));
}
);
$client = new PostgresClient(
config: $config,
enableStatementCache: true,
statementCacheSize: 512
);
// Invalidate all caches, for example after a schema change
$client->clearStatementCache();
$result = await(
$client->query(
'SELECT price, quantity FROM products WHERE id = ?',
[1]
)
);
$row = $result->fetchOne();
// $row['price'] => string("19.99") (NUMERIC, preserved as string)
// $row['quantity'] => int(5) (int4, cast to int)
$result = await($client->query('SELECT * FROM users'));
echo $result->rowCount; // int, rows in result set
echo $result->affectedRows; // int, rows affected by INSERT/UPDATE/DELETE
echo $result->connectionId; // int, backend PID from pg_get_pid()
echo $result->insertedOid; // int|null, OID of the inserted row if applicable
echo $result->columnCount; // int, number of columns
echo $result->columns; // list<string> of column names
foreach ($result as $row) {
echo $row['name'];
}
$row = $result->fetchOne(); // first row as associative array, or null
$all = $result->fetchAll(); // all rows as array of associative arrays
$col = $result->fetchColumn('name'); // all values from a named column
$col = $result->fetchColumn(0); // all values from column index 0
$result = await($client->query('SELECT * FROM users; SELECT * FROM orders'));
foreach ($result as $row) {
echo $row['name']; // first result set: users
}
$next = $result->nextResult();
if ($next !== null) {
foreach ($next as $row) {
echo $row['total']; // second result set: orders
}
}
Loading please wait ...
Before you can download the PHP files, the dependencies should be resolved. This can take some minutes. Please be patient.