1. Go to this page and download the library: Download mroosz/php-cassandra library. Choose the download type require.
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
mroosz / php-cassandra example snippets
use Cassandra\Connection;
use Cassandra\Connection\StreamNodeConfig;
use Cassandra\Connection\ConnectionOptions;
use Cassandra\Consistency;
// Connect to Cassandra
$nodes = [
new StreamNodeConfig(
host: '127.0.0.1',
port: 9042,
username: 'cassandra',
password: 'cassandra'
),
];
$conn = new Connection($nodes, keyspace: 'my_keyspace');
$conn->connect();
$conn->setConsistency(Consistency::QUORUM);
// Simple query
$result = $conn->query('SELECT * FROM system.local')->asRowsResult();
foreach ($result as $row) {
echo "Cluster: " . $row['cluster_name'] . "\n";
}
use Cassandra\Request\Options\ExecuteOptions;
use Cassandra\Value\Uuid;
use Cassandra\Consistency;
// Prepare a statement
$prepared = $conn->prepare('SELECT * FROM users WHERE id = ? AND status = ?');
// Execute with positional parameters
$result = $conn->execute(
$prepared,
[
Uuid::fromValue('550e8400-e29b-41d4-a716-446655440000'),
'active'
],
consistency: Consistency::LOCAL_QUORUM,
options: new ExecuteOptions(pageSize: 100)
)->asRowsResult();
foreach ($result as $user) {
echo "User: {$user['name']} ({$user['email']})\n";
}
// Execute with named parameters
$namedPrepared = $conn->prepare('SELECT * FROM users WHERE email = :email AND org_id = :org_id');
$result = $conn->execute(
$namedPrepared,
['email' => '[email protected]', 'org_id' => 123],
options: new ExecuteOptions(namesForValues: true)
)->asRowsResult();
use Cassandra\Request\Options\QueryOptions;
// Fire multiple queries concurrently
$statements = [];
$statements[] = $conn->queryAsync(
'SELECT COUNT(*) FROM users',
options: new QueryOptions(pageSize: 1000)
);
$statements[] = $conn->queryAsync(
'SELECT * FROM users LIMIT 10',
options: new QueryOptions(pageSize: 10)
);
// Process results as they become available
$userCount = $statements[0]->getRowsResult()->fetch()['count'];
$recentUsers = $statements[1]->getRowsResult()->fetchAll();
echo "Total users: {$userCount}\n";
echo "Recent users: " . count($recentUsers) . "\n";
use Cassandra\Connection\SocketNodeConfig;
use Cassandra\Connection\StreamNodeConfig;
use Cassandra\Connection;
// Stream transport
$streamNode = new StreamNodeConfig(
host: 'tls://cassandra.example.com',
port: 9042,
username: 'user',
password: 'secret',
connectTimeoutInSeconds: 10,
timeoutInSeconds: 30,
);
// Stream transport with SSL/TLS
$streamTlsNode = new StreamNodeConfig(
host: 'tls://cassandra.example.com',
port: 9042,
username: 'user',
password: 'secret',
connectTimeoutInSeconds: 10,
timeoutInSeconds: 30,
sslOptions: [
// See [PHP SSL context options](https://www.php.net/manual/en/context.ssl.php)
'cafile' => '/etc/ssl/certs/ca.pem',
'verify_peer' => true,
'verify_peer_name' => true,
]
);
// Socket transport
$socketNode = new SocketNodeConfig(
host: '10.0.0.10',
port: 9042,
username: 'user',
password: 'secret',
// See [PHP socket_get_option documentation](https://www.php.net/manual/en/function.socket-get-option.php)
socketOptions: [SO_RCVTIMEO => ['sec' => 10, 'usec' => 0]]
);
$conn = new Connection([$streamNode, $streamTlsNode, $socketNode], keyspace: 'app');
$conn->connect();
use Cassandra\Value\Uuid;
use Cassandra\Consistency;
use Cassandra\Request\Options\QueryOptions;
$rowsResult = $conn->query(
'SELECT id, name FROM ks.users WHERE id = ?',
[Uuid::fromValue($id)],
consistency: Consistency::ONE,
options: new QueryOptions(pageSize: 100)
)->asRowsResult();
use Cassandra\Request\Options\QueryOptions;
$s1 = $conn->queryAsync('SELECT count(*) FROM ks.t1', options: new QueryOptions(pageSize: 1000));
$s2 = $conn->queryAsync('SELECT count(*) FROM ks.t2', options: new QueryOptions(pageSize: 1000));
$r2 = $s2->getResult()->asRowsResult();
$r1 = $s1->getResult()->asRowsResult();
// For simple queries
$pages = $conn->queryAll('SELECT * FROM ks.users WHERE org_id = ?', [$orgId]);
foreach ($pages as $page) {
foreach ($page as $row) {
// ...
}
}
use Cassandra\Request\Options\ExecuteOptions;
$prepared = $conn->prepare('SELECT * FROM ks.users WHERE email = :email');
$rowsResult = $conn->execute(
$prepared,
['email' => '[email protected]'],
options: new ExecuteOptions(
namesForValues: true,
pageSize: 50
)
)->asRowsResult();
use Cassandra\Request\Options\ExecuteOptions;
$options = new ExecuteOptions(pageSize: 100, namesForValues: true);
$result = $conn->execute($prepared, ['org_id' => 1], options: $options)->asRowsResult();
do {
foreach ($result as $row) {
// process row
}
$pagingState = $result->getRowsMetadata()->pagingState;
if ($pagingState === null) break;
$options = new ExecuteOptions(
pageSize: 100,
namesForValues: true,
pagingState: $pagingState
);
$result = $conn->execute($result, [], options: $options)->asRowsResult(); // reuse previous RowsResult for metadata id
} while (true);
use Cassandra\Request\Options\ExecuteOptions;
$pages = $conn->executeAll($prepared, ['org_id' => 1], options: new ExecuteOptions(namesForValues: true));
use Cassandra\Consistency;
use Cassandra\Request\Batch;
use Cassandra\Request\BatchType;
use Cassandra\Value\Uuid;
use Cassandra\Value\Varchar;
$batch = new Batch(type: BatchType::LOGGED, consistency: Consistency::QUORUM);
// Prepared in batch (namesForValues: use associative array)
$prepared = $conn->prepare('UPDATE ks.users SET age = :age WHERE id = :id');
$batch->appendPreparedStatement($prepared, ['age' => 21, 'id' => 'c5419d81-499e-4c9c-ac0c-fa6ba3ebc2bc']);
// Simple query in batch (positional)
$batch->appendQuery(
'INSERT INTO ks.users (id, name, age) VALUES (?, ?, ?)',
[
Uuid::fromValue('c5420d81-499e-4c9c-ac0c-fa6ba3ebc2bc'),
Varchar::fromValue('Mark'),
20,
]
);
$conn->batch($batch);
use Cassandra\Response\Result\FetchType;
$r = $conn->query('SELECT role FROM system_auth.roles')->asRowsResult();
foreach ($r as $i => $row) {
echo $row['role'], "\n";
}
// Fetch single row
$result = $conn->query('SELECT id, name, email FROM users WHERE id = ?', [$userId])->asRowsResult();
$user = $result->fetch(FetchType::ASSOC);
if ($user) {
echo "User: {$user['name']} <{$user['email']}>\n";
}
// Fetch all rows at once
$allUsers = $result->fetchAll(FetchType::ASSOC);
foreach ($allUsers as $user) {
echo "User: {$user['name']}\n";
}
// Fetch specific column values
$result = $conn->query('SELECT name FROM users WHERE org_id = ?', [123])->asRowsResult();
$names = $result->fetchAllColumns(0); // Get all values from first column
print_r($names);
// Fetch key-value pairs
$result = $conn->query('SELECT id, name FROM users WHERE active = true')->asRowsResult();
$userMap = $result->fetchAllKeyPairs(0, 1); // id => name mapping
print_r($userMap);
// Different fetch types
$result = $conn->query('SELECT id, name, email FROM users LIMIT 5')->asRowsResult();
// Associative array (default)
$row = $result->fetch(FetchType::ASSOC);
// Returns: ['id' => '...', 'name' => '...', 'email' => '...']
// Numeric array
$row = $result->fetch(FetchType::NUM);
// Returns: [0 => '...', 1 => '...', 2 => '...']
// Both associative and numeric
$row = $result->fetch(FetchType::BOTH);
// Returns: ['id' => '...', 0 => '...', 'name' => '...', 1 => '...', ...]
use Cassandra\Request\Options\QueryOptions;
$pageSize = 100;
$options = new QueryOptions(pageSize: $pageSize);
$result = $conn->query('SELECT * FROM large_table', [], options: $options)->asRowsResult();
$totalProcessed = 0;
do {
foreach ($result as $row) {
// Process each row
echo "Processing: {$row['id']}\n";
$totalProcessed++;
}
$pagingState = $result->getRowsMetadata()->pagingState;
if ($pagingState === null) {
break; // No more pages
}
// Fetch next page
$options = new QueryOptions(pageSize: $pageSize, pagingState: $pagingState);
$result = $conn->query('SELECT * FROM large_table', [], options: $options)->asRowsResult();
} while (true);
echo "Total processed: {$totalProcessed} rows\n";
use Cassandra\Response\Result\RowClassInterface;
final class UserRow implements RowClassInterface {
public function __construct(private array $row, array $args = []) {}
public function id(): string { return (string) $this->row['id']; }
public function name(): string { return (string) $this->row['name']; }
}
$rows = $conn->query('SELECT id, name FROM ks.users')->asRowsResult();
$rows->configureFetchObject(UserRow::class);
foreach ($rows as $user) {
echo $user->name(), "\n";
}
use Cassandra\Value\Ascii;
use Cassandra\Value\Bigint;
use Cassandra\Value\Blob;
use Cassandra\Value\Boolean;
use Cassandra\Value\Counter;
use Cassandra\Value\Custom;
use Cassandra\Value\Date;
use Cassandra\Value\Decimal;
use Cassandra\Value\Double;
use Cassandra\Value\Duration;
use Cassandra\Value\Float32;
use Cassandra\Value\Inet;
use Cassandra\Value\Int32;
use Cassandra\Value\ListCollection;
use Cassandra\Value\MapCollection;
use Cassandra\Value\SetCollection;
use Cassandra\Value\Smallint;
use Cassandra\Value\Time;
use Cassandra\Value\Timestamp;
use Cassandra\Value\Timeuuid;
use Cassandra\Value\Tinyint;
use Cassandra\Value\Tuple;
use Cassandra\Value\UDT;
use Cassandra\Value\Uuid;
use Cassandra\Value\Varchar;
use Cassandra\Value\Varint;
use Cassandra\Value\Vector;
use Cassandra\Type;
// Scalars
Ascii::fromValue('hello');
Bigint::fromValue(10_000_000_000);
Blob::fromValue("\x01\x02");
Boolean::fromValue(true);
Counter::fromValue(1000);
Custom::fromValue('custom_data', 'my.custom.Type');
Decimal::fromValue('123.456');
Double::fromValue(2.718281828459);
Float32::fromValue(2.718);
Inet::fromValue('192.168.0.1');
Int32::fromValue(-123);
Smallint::fromValue(2048);
Timeuuid::fromValue('8db96410-8dba-11f0-b0eb-325096b39f47');
Tinyint::fromValue(12);
Uuid::fromValue('78b58041-06dd-4181-a14f-ce0c1979f51c');
Varchar::fromValue('hello ✅');
Varint::fromValue(10000000000);
// Temporal
Date::fromValue('2011-02-03');
Duration::fromValue('89h4m48s');
Time::fromValue('08:12:54.123456789');
Timestamp::fromValue('2011-02-03T04:05:00.000+0000');
// Collections / Tuples / UDT / Vector
ListCollection::fromValue([1, 2, 3], Type::INT);
MapCollection::fromValue(['a' => 1], Type::ASCII, Type::INT);
SetCollection::fromValue([1, 2, 3], Type::INT);
Tuple::fromValue([1, 'x'], [Type::INT, Type::VARCHAR]);
UDT::fromValue(['id' => 1, 'name' => 'n'], ['id' => Type::INT, 'name' => Type::VARCHAR]);
Vector::fromValue([0.12, -0.3, 0.9], Type::FLOAT, dimensions: 3);
use Cassandra\EventListener;
use Cassandra\Response\Event;
use Cassandra\Request\Register;
use Cassandra\EventType;
$conn->registerEventListener(new class () implements EventListener {
public function onEvent(Event $event): void {
// Inspect $event->getType() and $event->getData()
}
});
$conn->syncRequest(new Register([
EventType::TOPOLOGY_CHANGE,
EventType::STATUS_CHANGE,
EventType::SCHEMA_CHANGE,
]));
// process events (simplest possible loop)
while (true) {
$conn->waitForNextEvent();
sleep(1);
}
// In your app loop, poll without blocking
if ($event = $conn->tryReadNextEvent()) {
// handle $event
}
// Or drain all currently available events
while ($event = $conn->tryReadNextEvent()) {
// handle $event
}
use Cassandra\Request\Query;
$req = new Query('SELECT now() FROM system.local');
$req->enableTracing();
$req->setPayload(['my-key' => 'my-value']);
$result = $conn->syncRequest($req);
use Cassandra\Request\Options\QueryOptions;
use Cassandra\Request\Options\ExecuteOptions;
use Cassandra\Consistency;
// Fire two queries concurrently
$s1 = $conn->queryAsync('SELECT count(*) FROM ks.t1', options: new QueryOptions(pageSize: 1000));
$s2 = $conn->queryAsync('SELECT count(*) FROM ks.t2', options: new QueryOptions(pageSize: 1000));
// Do other work here...
// Resolve in any order
$r2 = $s2->getRowsResult();
$r1 = $s1->getRowsResult();
// Issue several statements
$handles = [];
for ($i = 0; $i < 10; $i++) {
$handles[] = $conn->queryAsync('SELECT now() FROM system.local');
}
$conn->waitForStatements($handles);
foreach ($handles as $h) {
$rows = $h->getRowsResult();
// process
}
// Fire off work in various places...
// Later in your loop: non-blocking drain up to 32 available responses
$processed = $conn->drainAvailableResponses(32);
if ($processed > 0) {
// some statements just became ready; you can consume their results now
}
// Or: non-blocking check for a specific statement
if ($conn->tryResolveStatement($s1)) {
$rows = $s1->getRowsResult();
}
// Or: wait until any of several statements completes
$ready = $conn->waitForAnyStatement([$s1, $s2]);
// $ready is whichever completed first
use Cassandra\Request\Options\PrepareOptions;
use Cassandra\Request\Options\ExecuteOptions;
// Prepare asynchronously
$pStmt = $conn->prepareAsync('SELECT id, name FROM ks.users WHERE org_id = ?');
$prepared = $pStmt->getPreparedResult();
// Execute asynchronously with paging
$s = $conn->executeAsync(
$prepared,
[123],
consistency: Consistency::LOCAL_QUORUM,
options: new ExecuteOptions(pageSize: 200)
);
// Block for rows when you need them
$rows = $s->getRowsResult();
// Block until any statement completes:
$stmt = $conn->waitForAnyStatement([$s1, $s2, $s3]);
// Block until the next event arrives:
$event = $conn->waitForNextEvent();
use Cassandra\Connection;
use Cassandra\Connection\ConnectionOptions;
$conn = new Connection(
$nodes,
keyspace: 'app',
options: new ConnectionOptions(enableCompression: true)
);
use Cassandra\Exception\StatementException;
use Cassandra\Exception\ServerException;
use Cassandra\Exception\ConnectionException;
use Cassandra\Exception\CassandraException;
try {
$result = $conn->query('SELECT * FROM users WHERE id = ?', [$userId])
->asRowsResult();
foreach ($result as $row) {
// Process row
}
} catch (ServerException $e) {
// Server returned an error response
error_log("Server error: " . $e->getMessage());
} catch (ConnectionException $e) {
// Network/connection issues
error_log("Connection error: " . $e->getMessage());
} catch (StatementException $e) {
// Wrong result type access (e.g., calling asRowsResult() on non-rows result)
error_log("Statement error: " . $e->getMessage());
} catch (CassandraException $e) {
// Other client-side errors
error_log("Client error: " . $e->getMessage());
}
use Cassandra\Connection\ConnectionOptions;
use Cassandra\Connection\NodeSelectionStrategy;
$options = new ConnectionOptions(
enableCompression: true, // Enable LZ4 compression (default: false)
throwOnOverload: true, // Throw on server overload (v4+, default: false)
nodeSelectionStrategy: NodeSelectionStrategy::RoundRobin, // Node selection (default: Random)
preparedResultCacheSize: 200, // Prepared statement cache size (default: 100)
);
use Cassandra\Request\Options\QueryOptions;
use Cassandra\SerialConsistency;
$queryOptions = new QueryOptions(
autoPrepare: true, // Auto-prepare for type safety (default: true)
pageSize: 1000, // Page size (min 100, default: 5000)
pagingState: $previousPagingState, // For pagination (default: null)
serialConsistency: SerialConsistency::SERIAL, // Serial consistency (default: null)
defaultTimestamp: 1640995200000000, // Default timestamp (microseconds, default: null)
namesForValues: true, // Use named parameters (auto-detected if null)
keyspace: 'my_keyspace', // Per-request keyspace (v5 only, default: null)
nowInSeconds: time(), // Current time override (v5 only, default: null)
);
use Cassandra\Request\Options\ExecuteOptions;
$executeOptions = new ExecuteOptions(
// All QueryOptions properties plus:
skipMetadata: true, // Skip result metadata (default: false)
autoPrepare: false, // Not applicable for execute
pageSize: 500,
namesForValues: true,
// ... other QueryOptions
);
use Cassandra\Request\Options\PrepareOptions;
$prepareOptions = new PrepareOptions(
keyspace: 'my_keyspace', // Keyspace for preparation (v5 only)
);
use Cassandra\Request\Options\BatchOptions;
use Cassandra\SerialConsistency;
$batchOptions = new BatchOptions(
serialConsistency: SerialConsistency::LOCAL_SERIAL,
defaultTimestamp: 1640995200000000, // Microseconds since epoch
keyspace: 'my_keyspace', // v5 only
nowInSeconds: time(), // v5 only
);
use Cassandra\Value\ValueEncodeConfig;
use Cassandra\Value\EncodeOption\DateEncodeOption;
use Cassandra\Value\EncodeOption\DurationEncodeOption;
use Cassandra\Value\EncodeOption\TimeEncodeOption;
use Cassandra\Value\EncodeOption\TimestampEncodeOption;
use Cassandra\Value\EncodeOption\VarintEncodeOption;
$conn->configureValueEncoding(new ValueEncodeConfig(
dateEncodeOption: DateEncodeOption::AS_DATETIME_IMMUTABLE,
durationEncodeOption: DurationEncodeOption::AS_DATEINTERVAL,
timeEncodeOption: TimeEncodeOption::AS_DATETIME_IMMUTABLE,
timestampEncodeOption: TimestampEncodeOption::AS_DATETIME_IMMUTABLE,
varintEncodeOption: VarintEncodeOption::AS_STRING,
));
use Cassandra\EventListener;
use Cassandra\WarningsListener;
// Event listener
$conn->registerEventListener(new class implements EventListener {
public function onEvent(\Cassandra\Response\Event $event): void {
error_log("Cassandra event: " . $event->getType());
}
});
// Warnings listener
$conn->registerWarningsListener(new class implements WarningsListener {
public function onWarnings(array $warnings, $request, $response): void {
foreach ($warnings as $warning) {
error_log("Cassandra warning: $warning");
}
}
});
use Cassandra\Value\Timestamp;
// Current time
$now = Timestamp::now();
// From string
$timestamp = Timestamp::fromValue('2024-01-15T10:30:00Z');
// From Unix timestamp
$timestamp = Timestamp::fromValue(1705312200000); // milliseconds
// DataStax Driver (old)
$cluster = Cassandra::cluster()
->withContactPoints('127.0.0.1')
->withPort(9042)
->withCredentials('username', 'password')
->build();
$session = $cluster->connect('keyspace_name');
// php-cassandra (new)
use Cassandra\Connection;
use Cassandra\Connection\StreamNodeConfig;
$conn = new Connection([
new StreamNodeConfig('127.0.0.1', 9042, 'username', 'password')
], keyspace: 'keyspace_name');
$conn->connect();
// DataStax Driver (old)
$statement = new Cassandra\SimpleStatement('SELECT * FROM users WHERE id = ?');
$result = $session->execute($statement, ['arguments' => [$userId]]);
// php-cassandra (new)
$result = $conn->query('SELECT * FROM users WHERE id = ?', [$userId])->asRowsResult();
// DataStax Driver (old)
$statement = $session->prepare('SELECT * FROM users WHERE id = ?');
$result = $session->execute($statement, ['arguments' => [$userId]]);
// php-cassandra (new)
$prepared = $conn->prepare('SELECT * FROM users WHERE id = ?');
$result = $conn->execute($prepared, [$userId])->asRowsResult();
// DataStax Driver (old)
$uuid = new Cassandra\Uuid('550e8400-e29b-41d4-a716-446655440000');
$timestamp = new Cassandra\Timestamp(time());
// php-cassandra (new)
use Cassandra\Value\Uuid;
use Cassandra\Value\Timestamp;
$uuid = Uuid::fromValue('550e8400-e29b-41d4-a716-446655440000');
$timestamp = Timestamp::fromValue(time() * 1000);
use Cassandra\Connection;
use Cassandra\Connection\SocketNodeConfig;
use Cassandra\Connection\StreamNodeConfig;
use Cassandra\Connection\ConnectionOptions;
// Stream with TLS and persistent
$stream = new StreamNodeConfig(
host: 'tls://cassandra.example.com',
port: 9042,
username: 'user',
password: 'secret',
connectTimeoutInSeconds: 5,
timeoutInSeconds: 15,
persistent: true,
sslOptions: [
'cafile' => '/etc/ssl/certs/ca.pem',
'verify_peer' => true,
'verify_peer_name' => true,
]
);
// Socket with custom timeouts
$socket = new SocketNodeConfig(
host: '127.0.0.1',
port: 9042,
username: 'user',
password: 'secret',
socketOptions: [
SO_RCVTIMEO => ['sec' => 5, 'usec' => 0],
SO_SNDTIMEO => ['sec' => 5, 'usec' => 0],
]
);
$conn = new Connection([$socket, $stream], options: new ConnectionOptions(enableCompression: true));
use Cassandra\Connection;
use Cassandra\Value\ValueEncodeConfig;
use Cassandra\Value\EncodeOption\TimestampEncodeOption;
use Cassandra\Value\EncodeOption\DateEncodeOption;
$conn = new Connection([$socket]);
$conn->configureValueEncoding(new ValueEncodeConfig(
timestampEncodeOption: TimestampEncodeOption::AS_INT,
dateEncodeOption: DateEncodeOption::AS_INT,
));
use Cassandra\WarningsListener;
use Cassandra\Request\Request;
use Cassandra\Response\Response;
$conn->registerWarningsListener(new class () implements WarningsListener {
public function onWarnings(array $warnings, Request $request, Response $response): void {
error_log('Cassandra warnings: ' . implode('; ', $warnings));
}
});
use Cassandra\EventListener;
use Cassandra\Response\Event;
$conn->registerEventListener(new class () implements EventListener {
public function onEvent(Event $event): void {
// enqueue to worker, react to topology/status/schema changes
}
});
// Non-busy loop with backoff
while (true) {
$conn->waitForNextEvent();
usleep(200_000); // 200ms
}
bash
composer
bash
git clone https://github.com/MichaelRoosz/php-cassandra.git
cd php-cassandra
Loading please wait ...
Before you can download the PHP files, the dependencies should be resolved. This can take some minutes. Please be patient.