PHP code example of mroosz / php-cassandra

1. Go to this page and download the library: Download mroosz/php-cassandra library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

mroosz / php-cassandra example snippets








use Cassandra\Connection;
use Cassandra\Connection\StreamNodeConfig;
use Cassandra\Connection\ConnectionOptions;
use Cassandra\Consistency;

// Connect to Cassandra
$nodes = [
    new StreamNodeConfig(
        host: '127.0.0.1', 
        port: 9042, 
        username: 'cassandra', 
        password: 'cassandra'
    ),
];

$conn = new Connection($nodes, keyspace: 'my_keyspace');
$conn->connect();
$conn->setConsistency(Consistency::QUORUM);

// Simple query
$result = $conn->query('SELECT * FROM system.local')->asRowsResult();
foreach ($result as $row) {
    echo "Cluster: " . $row['cluster_name'] . "\n";
}


use Cassandra\Request\Options\ExecuteOptions;
use Cassandra\Value\Uuid;
use Cassandra\Consistency;

// Prepare a statement
$prepared = $conn->prepare('SELECT * FROM users WHERE id = ? AND status = ?');

// Execute with positional parameters
$result = $conn->execute(
    $prepared, 
    [
        Uuid::fromValue('550e8400-e29b-41d4-a716-446655440000'),
        'active'
    ],
    consistency: Consistency::LOCAL_QUORUM,
    options: new ExecuteOptions(pageSize: 100)
)->asRowsResult();

foreach ($result as $user) {
    echo "User: {$user['name']} ({$user['email']})\n";
}

// Execute with named parameters
$namedPrepared = $conn->prepare('SELECT * FROM users WHERE email = :email AND org_id = :org_id');
$result = $conn->execute(
    $namedPrepared,
    ['email' => '[email protected]', 'org_id' => 123],
    options: new ExecuteOptions(namesForValues: true)
)->asRowsResult();


use Cassandra\Request\Options\QueryOptions;

// Fire multiple queries concurrently
$statements = [];
$statements[] = $conn->queryAsync(
    'SELECT COUNT(*) FROM users', 
    options: new QueryOptions(pageSize: 1000)
);
$statements[] = $conn->queryAsync(
    'SELECT * FROM users LIMIT 10',
    options: new QueryOptions(pageSize: 10)
);

// Process results as they become available
$userCount = $statements[0]->getRowsResult()->fetch()['count'];
$recentUsers = $statements[1]->getRowsResult()->fetchAll();

echo "Total users: {$userCount}\n";
echo "Recent users: " . count($recentUsers) . "\n";


use Cassandra\Exception\ServerException;
use Cassandra\Exception\ConnectionException;
use Cassandra\Exception\CassandraException;

try {
    $result = $conn->query(
        'SELECT * FROM users WHERE email = ?',
        ['[email protected]'],
        Consistency::LOCAL_QUORUM
    )->asRowsResult();
    
    foreach ($result as $user) {
        echo "Found user: {$user['name']}\n";
    }
    
} catch (ServerException $e) {
    echo "Server error: " . $e->getMessage() . "\n";
    // Inspect $e->getContext() and $e->getErrorContext() for server-provided details
    
} catch (ConnectionException $e) {
    echo "Connection error: " . $e->getMessage() . "\n";
    
} catch (CassandraException $e) {
    echo "Client error: " . $e->getMessage() . "\n";
}


use Cassandra\Connection\StreamNodeConfig;

// Secure connection with TLS
$secureNode = new StreamNodeConfig(
    host: 'tls://cassandra.example.com',
    port: 9042,
    username: 'secure_user',
    password: 'secure_password',
    sslOptions: [
        'cafile' => '/path/to/ca.pem',
        'verify_peer' => true,
        'verify_peer_name' => true,
    ]
);

$conn = new Connection([$secureNode], keyspace: 'production_app');
$conn->connect();

echo "Secure connection established!\n";

use Cassandra\Connection\SocketNodeConfig;
use Cassandra\Connection\StreamNodeConfig;
use Cassandra\Connection;

// Stream transport
$streamNode = new StreamNodeConfig(
    host: 'tls://cassandra.example.com',
    port: 9042,
    username: 'user',
    password: 'secret',
    connectTimeoutInSeconds: 10,
    timeoutInSeconds: 30,
);

// Stream transport with SSL/TLS
$streamTlsNode = new StreamNodeConfig(
    host: 'tls://cassandra.example.com',
    port: 9042,
    username: 'user',
    password: 'secret',
    connectTimeoutInSeconds: 10,
    timeoutInSeconds: 30,
    sslOptions: [
        // See [PHP SSL context options](https://www.php.net/manual/en/context.ssl.php)
        'cafile' => '/etc/ssl/certs/ca.pem',
        'verify_peer' => true,
        'verify_peer_name' => true,
    ]
);

// Socket transport
$socketNode = new SocketNodeConfig(
    host: '10.0.0.10',
    port: 9042,
    username: 'user',
    password: 'secret',
    // See [PHP socket_get_option documentation](https://www.php.net/manual/en/function.socket-get-option.php)
    socketOptions: [SO_RCVTIMEO => ['sec' => 10, 'usec' => 0]]
);

$conn = new Connection([$streamNode, $streamTlsNode, $socketNode], keyspace: 'app');
$conn->connect();

use Cassandra\Value\Uuid;
use Cassandra\Consistency;
use Cassandra\Request\Options\QueryOptions;

$rowsResult = $conn->query(
    'SELECT id, name FROM ks.users WHERE id = ?',
    [Uuid::fromValue($id)],
    consistency: Consistency::ONE,
    options: new QueryOptions(pageSize: 100)
)->asRowsResult();

use Cassandra\Request\Options\QueryOptions;

$s1 = $conn->queryAsync('SELECT count(*) FROM ks.t1', options: new QueryOptions(pageSize: 1000));
$s2 = $conn->queryAsync('SELECT count(*) FROM ks.t2', options: new QueryOptions(pageSize: 1000));

$r2 = $s2->getResult()->asRowsResult();
$r1 = $s1->getResult()->asRowsResult();

// For simple queries
$pages = $conn->queryAll('SELECT * FROM ks.users WHERE org_id = ?', [$orgId]);
foreach ($pages as $page) {
    foreach ($page as $row) {
        // ...
    }
}

use Cassandra\Request\Options\ExecuteOptions;

$prepared = $conn->prepare('SELECT * FROM ks.users WHERE email = :email');

$rowsResult = $conn->execute(
    $prepared,
    ['email' => '[email protected]'],
    options: new ExecuteOptions(
        namesForValues: true,
        pageSize: 50
    )
)->asRowsResult();

use Cassandra\Request\Options\ExecuteOptions;

$options = new ExecuteOptions(pageSize: 100, namesForValues: true);
$result = $conn->execute($prepared, ['org_id' => 1], options: $options)->asRowsResult();

do {
    foreach ($result as $row) {
        // process row
    }

    $pagingState = $result->getRowsMetadata()->pagingState;
    if ($pagingState === null) break;

    $options = new ExecuteOptions(
        pageSize: 100,
        namesForValues: true,
        pagingState: $pagingState
    );
    $result = $conn->execute($result, [], options: $options)->asRowsResult(); // reuse previous RowsResult for metadata id
} while (true);

use Cassandra\Request\Options\ExecuteOptions;

$pages = $conn->executeAll($prepared, ['org_id' => 1], options: new ExecuteOptions(namesForValues: true));

use Cassandra\Consistency;
use Cassandra\Request\Batch;
use Cassandra\Request\BatchType;
use Cassandra\Value\Uuid;
use Cassandra\Value\Varchar;

$batch = new Batch(type: BatchType::LOGGED, consistency: Consistency::QUORUM);

// Prepared in batch (namesForValues: use associative array)
$prepared = $conn->prepare('UPDATE ks.users SET age = :age WHERE id = :id');
$batch->appendPreparedStatement($prepared, ['age' => 21, 'id' => 'c5419d81-499e-4c9c-ac0c-fa6ba3ebc2bc']);

// Simple query in batch (positional)
$batch->appendQuery(
    'INSERT INTO ks.users (id, name, age) VALUES (?, ?, ?)',
    [
        Uuid::fromValue('c5420d81-499e-4c9c-ac0c-fa6ba3ebc2bc'),
        Varchar::fromValue('Mark'),
        20,
    ]
);

$conn->batch($batch);

use Cassandra\Response\Result\FetchType;

$r = $conn->query('SELECT role FROM system_auth.roles')->asRowsResult();
foreach ($r as $i => $row) {
    echo $row['role'], "\n";
}

// Fetch single row
$result = $conn->query('SELECT id, name, email FROM users WHERE id = ?', [$userId])->asRowsResult();
$user = $result->fetch(FetchType::ASSOC);
if ($user) {
    echo "User: {$user['name']} <{$user['email']}>\n";
}

// Fetch all rows at once
$allUsers = $result->fetchAll(FetchType::ASSOC);
foreach ($allUsers as $user) {
    echo "User: {$user['name']}\n";
}

// Fetch specific column values
$result = $conn->query('SELECT name FROM users WHERE org_id = ?', [123])->asRowsResult();
$names = $result->fetchAllColumns(0); // Get all values from first column
print_r($names);

// Fetch key-value pairs
$result = $conn->query('SELECT id, name FROM users WHERE active = true')->asRowsResult();
$userMap = $result->fetchAllKeyPairs(0, 1); // id => name mapping
print_r($userMap);

// Different fetch types
$result = $conn->query('SELECT id, name, email FROM users LIMIT 5')->asRowsResult();

// Associative array (default)
$row = $result->fetch(FetchType::ASSOC);
// Returns: ['id' => '...', 'name' => '...', 'email' => '...']

// Numeric array
$row = $result->fetch(FetchType::NUM);
// Returns: [0 => '...', 1 => '...', 2 => '...']

// Both associative and numeric
$row = $result->fetch(FetchType::BOTH);
// Returns: ['id' => '...', 0 => '...', 'name' => '...', 1 => '...', ...]

use Cassandra\Request\Options\QueryOptions;

$pageSize = 100;
$options = new QueryOptions(pageSize: $pageSize);
$result = $conn->query('SELECT * FROM large_table', [], options: $options)->asRowsResult();

$totalProcessed = 0;
do {
    foreach ($result as $row) {
        // Process each row
        echo "Processing: {$row['id']}\n";
        $totalProcessed++;
    }
    
    $pagingState = $result->getRowsMetadata()->pagingState;
    if ($pagingState === null) {
        break; // No more pages
    }
    
    // Fetch next page
    $options = new QueryOptions(pageSize: $pageSize, pagingState: $pagingState);
    $result = $conn->query('SELECT * FROM large_table', [], options: $options)->asRowsResult();
    
} while (true);

echo "Total processed: {$totalProcessed} rows\n";

use Cassandra\Response\Result\RowClassInterface;

final class UserRow implements RowClassInterface {
    public function __construct(private array $row, array $args = []) {}
    public function id(): string { return (string) $this->row['id']; }
    public function name(): string { return (string) $this->row['name']; }
}

$rows = $conn->query('SELECT id, name FROM ks.users')->asRowsResult();
$rows->configureFetchObject(UserRow::class);

foreach ($rows as $user) {
    echo $user->name(), "\n";
}

use Cassandra\Value\Ascii;
use Cassandra\Value\Bigint;
use Cassandra\Value\Blob;
use Cassandra\Value\Boolean;
use Cassandra\Value\Counter;
use Cassandra\Value\Custom;
use Cassandra\Value\Date;
use Cassandra\Value\Decimal;
use Cassandra\Value\Double;
use Cassandra\Value\Duration;
use Cassandra\Value\Float32;
use Cassandra\Value\Inet;
use Cassandra\Value\Int32;
use Cassandra\Value\ListCollection;
use Cassandra\Value\MapCollection;
use Cassandra\Value\SetCollection;
use Cassandra\Value\Smallint;
use Cassandra\Value\Time;
use Cassandra\Value\Timestamp;
use Cassandra\Value\Timeuuid;
use Cassandra\Value\Tinyint;
use Cassandra\Value\Tuple;
use Cassandra\Value\UDT;
use Cassandra\Value\Uuid;
use Cassandra\Value\Varchar;
use Cassandra\Value\Varint;
use Cassandra\Value\Vector;
use Cassandra\Type;

// Scalars
Ascii::fromValue('hello');
Bigint::fromValue(10_000_000_000);
Blob::fromValue("\x01\x02");
Boolean::fromValue(true);
Counter::fromValue(1000);
Custom::fromValue('custom_data', 'my.custom.Type');
Decimal::fromValue('123.456');
Double::fromValue(2.718281828459);
Float32::fromValue(2.718);
Inet::fromValue('192.168.0.1');
Int32::fromValue(-123);
Smallint::fromValue(2048);
Timeuuid::fromValue('8db96410-8dba-11f0-b0eb-325096b39f47');
Tinyint::fromValue(12);
Uuid::fromValue('78b58041-06dd-4181-a14f-ce0c1979f51c');
Varchar::fromValue('hello ✅');
Varint::fromValue(10000000000);

// Temporal
Date::fromValue('2011-02-03');
Duration::fromValue('89h4m48s');
Time::fromValue('08:12:54.123456789');
Timestamp::fromValue('2011-02-03T04:05:00.000+0000');

// Collections / Tuples / UDT / Vector
ListCollection::fromValue([1, 2, 3], Type::INT);
MapCollection::fromValue(['a' => 1], Type::ASCII, Type::INT);
SetCollection::fromValue([1, 2, 3], Type::INT);
Tuple::fromValue([1, 'x'], [Type::INT, Type::VARCHAR]);
UDT::fromValue(['id' => 1, 'name' => 'n'], ['id' => Type::INT, 'name' => Type::VARCHAR]);
Vector::fromValue([0.12, -0.3, 0.9], Type::FLOAT, dimensions: 3);

use Cassandra\Type;
use Cassandra\Value\ListCollection;
use Cassandra\Value\SetCollection;
use Cassandra\Value\MapCollection;
use Cassandra\Value\Tuple;
use Cassandra\Value\UDT;

// List<int>
ListCollection::fromValue([1,2,3], Type::INT);

// Set<text>
SetCollection::fromValue(['a','b'], Type::VARCHAR);

// Map<text,int>
MapCollection::fromValue(['a' => 1], Type::ASCII, Type::INT);

// Tuple<int,text,boolean>
Tuple::fromValue([1, 'x', true], [Type::INT, Type::VARCHAR, Type::BOOLEAN]);

// UDT<id:int, name:text>
UDT::fromValue(['id' => 1, 'name' => 'n'], ['id' => Type::INT, 'name' => Type::VARCHAR]);

// Frozen list<udt<id:int, friends<list<text>>>>
ListCollection::fromValue(
    [
        ['id' => 1, 'friends' => ['a','b']],
        ['id' => 2, 'friends' => []],
    ],
    [
        'type' => Type::LIST,
        'valueType' => [
            'type' => Type::UDT,
            'isFrozen' => true,
            'valueTypes' => [
                'id' => Type::INT,
                'friends' => [
                    'type' => Type::LIST,
                    'valueType' => Type::VARCHAR,
                ],
            ],
        ],
        'isFrozen' => true,
    ]
);

// Map<text, tuple<int, udt<code:int, tags<set<text>>>>>
MapCollection::fromValue(
    [
        'a' => [1, ['code' => 7, 'tags' => ['x','y']]],
    ],
    [
        'type' => Type::MAP,
        'keyType' => Type::VARCHAR,
        'valueType' => [
            'type' => Type::TUPLE,
            'valueTypes' => [
                Type::INT,
                [
                    'type' => Type::UDT,
                    'valueTypes' => [
                        'code' => Type::INT,
                        'tags' => [
                            'type' => Type::SET,
                            'valueType' => Type::VARCHAR,
                        ],
                    ],
                ],
            ],
        ],
    ]
);

// UDT with nested list<map<text, tuple<int,text>>>
UDT::fromValue(
    [
        'id' => 1,
        'items' => [
            ['a' => [1, 'one']],
            ['b' => [2, 'two']],
        ],
    ],
    [
        'id' => Type::INT,
        'items' => [
            'type' => Type::LIST,
            'valueType' => [
                'type' => Type::MAP,
                'keyType' => Type::VARCHAR,
                'valueType' => [
                    'type' => Type::TUPLE,
                    'valueTypes' => [Type::INT, Type::VARCHAR],
                ],
            ],
        ],
    ]
);

// Vector<float> with 3 dimensions
Vector::fromValue([0.1, -0.5, 0.8], Type::FLOAT, dimensions: 3);

// Vector<double> with 128 dimensions (common for embeddings)
Vector::fromValue(array_fill(0, 128, 0.0), Type::DOUBLE, dimensions: 128);

use Cassandra\Value\SetCollection;
use Cassandra\Type;

SetCollection::fromValue([
    [
        'id' => 1,
        'name' => 'string',
        'active' => true,
        'friends' => ['a', 'b'],
        'drinks' => [['qty' => 5, 'brand' => 'Pepsi']],
    ],
], [
    [
        'type' => Type::UDT,
        'valueTypes' => [
            'id' => Type::INT,
            'name' => Type::VARCHAR,
            'active' => Type::BOOLEAN,
            'friends' => [
                'type' => Type::LIST, 
                'valueType' => Type::VARCHAR
            ],
            'drinks' => [
                'type' => Type::LIST, 
                'valueType' => [
                    'type' => Type::UDT,
                    'valueTypes' => [
                        'qty' => Type::INT,
                        'brand' => Type::VARCHAR
                    ],
                ]
            ],
        ],
    ],
]);

use Cassandra\EventListener;
use Cassandra\Response\Event;
use Cassandra\Request\Register;
use Cassandra\EventType;

$conn->registerEventListener(new class () implements EventListener {
    public function onEvent(Event $event): void {
        // Inspect $event->getType() and $event->getData()
    }
});

$conn->syncRequest(new Register([
    EventType::TOPOLOGY_CHANGE,
    EventType::STATUS_CHANGE,
    EventType::SCHEMA_CHANGE,
]));

// process events (simplest possible loop)
while (true) {
    $conn->waitForNextEvent();
    sleep(1);
}

// In your app loop, poll without blocking
if ($event = $conn->tryReadNextEvent()) {
    // handle $event
}

// Or drain all currently available events
while ($event = $conn->tryReadNextEvent()) {
    // handle $event
}

use Cassandra\Request\Query;

$req = new Query('SELECT now() FROM system.local');
$req->enableTracing();
$req->setPayload(['my-key' => 'my-value']);

$result = $conn->syncRequest($req);

use Cassandra\Request\Options\QueryOptions;
use Cassandra\Request\Options\ExecuteOptions;
use Cassandra\Consistency;

// Fire two queries concurrently
$s1 = $conn->queryAsync('SELECT count(*) FROM ks.t1', options: new QueryOptions(pageSize: 1000));
$s2 = $conn->queryAsync('SELECT count(*) FROM ks.t2', options: new QueryOptions(pageSize: 1000));

// Do other work here...

// Resolve in any order
$r2 = $s2->getRowsResult();
$r1 = $s1->getRowsResult();

// Issue several statements
$handles = [];
for ($i = 0; $i < 10; $i++) {
    $handles[] = $conn->queryAsync('SELECT now() FROM system.local');
}

$conn->waitForStatements($handles);

foreach ($handles as $h) {
    $rows = $h->getRowsResult();
    // process
}

// Fire off work in various places...

// Later in your loop: non-blocking drain up to 32 available responses
$processed = $conn->drainAvailableResponses(32);
if ($processed > 0) {
    // some statements just became ready; you can consume their results now
}

// Or: non-blocking check for a specific statement
if ($conn->tryResolveStatement($s1)) {
    $rows = $s1->getRowsResult();
}

// Or: wait until any of several statements completes
$ready = $conn->waitForAnyStatement([$s1, $s2]);
// $ready is whichever completed first

use Cassandra\Request\Options\PrepareOptions;
use Cassandra\Request\Options\ExecuteOptions;

// Prepare asynchronously
$pStmt = $conn->prepareAsync('SELECT id, name FROM ks.users WHERE org_id = ?');
$prepared = $pStmt->getPreparedResult();

// Execute asynchronously with paging
$s = $conn->executeAsync(
    $prepared,
    [123],
    consistency: Consistency::LOCAL_QUORUM,
    options: new ExecuteOptions(pageSize: 200)
);

// Block for rows when you need them
$rows = $s->getRowsResult();

// Block until any statement completes:
$stmt = $conn->waitForAnyStatement([$s1, $s2, $s3]);

// Block until the next event arrives:
$event = $conn->waitForNextEvent();

use Cassandra\Connection;
use Cassandra\Connection\ConnectionOptions;

$conn = new Connection(
    $nodes,
    keyspace: 'app',
    options: new ConnectionOptions(enableCompression: true)
);

use Cassandra\Exception\StatementException;
use Cassandra\Exception\ServerException;
use Cassandra\Exception\ConnectionException;
use Cassandra\Exception\CassandraException;

try {
    $result = $conn->query('SELECT * FROM users WHERE id = ?', [$userId])
        ->asRowsResult();
    
    foreach ($result as $row) {
        // Process row
    }
    
} catch (ServerException $e) {
    // Server returned an error response
    error_log("Server error: " . $e->getMessage());
    
} catch (ConnectionException $e) {
    // Network/connection issues
    error_log("Connection error: " . $e->getMessage());
    
} catch (StatementException $e) {
    // Wrong result type access (e.g., calling asRowsResult() on non-rows result)
    error_log("Statement error: " . $e->getMessage());
    
} catch (CassandraException $e) {
    // Other client-side errors
    error_log("Client error: " . $e->getMessage());
}

use Cassandra\Exception\ServerException\{
    UnavailableException,
    ReadTimeoutException,
    WriteTimeoutException,
    OverloadedException,
    SyntaxErrorException,
    InvalidException,
    UnauthorizedException,
    AlreadyExistsException
};

try {
    $conn->query('CREATE TABLE users (id UUID PRIMARY KEY, name TEXT)');
    
} catch (AlreadyExistsException $e) {
    // Table already exists - this might be OK
    echo "Table already exists, continuing...\n";
    
} catch (UnauthorizedException $e) {
    // Permission denied
    throw new \RuntimeException("Insufficient permissions: " . $e->getMessage());
    
} catch (SyntaxErrorException $e) {
    // CQL syntax error
    throw new \RuntimeException("Invalid CQL syntax: " . $e->getMessage());
    
} catch (InvalidException $e) {
    // Invalid query (e.g., wrong types)
    throw new \RuntimeException("Invalid query: " . $e->getMessage());
}

function executeWithRetry(callable $operation, int $maxRetries = 3): mixed
{
    $attempt = 0;
    $delay = 100; // Start with 100ms
    
    while ($attempt < $maxRetries) {
        try {
            return $operation();
            
        } catch (UnavailableException | ReadTimeoutException | WriteTimeoutException | OverloadedException $e) {
            $attempt++;
            
            if ($attempt >= $maxRetries) {
                throw $e; // Re-throw on final attempt
            }
            
            // Exponential backoff with jitter
            $jitter = rand(0, $delay / 2);
            usleep(($delay + $jitter) * 1000);
            $delay *= 2;
            
            error_log("Retrying operation (attempt {$attempt}/{$maxRetries}) after error: " . $e->getMessage());
            
        } catch (ServerException $e) {
            // Don't retry non-transient errors
            throw $e;
        }
    }
}

// Usage
$result = executeWithRetry(function() use ($conn, $userId) {
    return $conn->query('SELECT * FROM users WHERE id = ?', [$userId])
        ->asRowsResult();
});

use Cassandra\Exception\ServerException\{ReadTimeoutException, WriteTimeoutException};

try {
    $result = $conn->query(
        'SELECT * FROM large_table WHERE complex_condition = ?',
        [$condition],
        Consistency::QUORUM
    )->asRowsResult();
    
} catch (ReadTimeoutException $e) {
    $ctx = $e->getErrorContext();
    $consistency = $ctx->consistency;
    $received = $ctx->nodesAnswered;
    $x = $e->getErrorContext();
    $consistency = $ctx->consistency;
    $received = $ctx->nodesAcknowledged;
    $

use Cassandra\Exception\ServerException\{UnavailableException, ReadTimeoutException, WriteTimeoutException};

try {
    $conn->query('SELECT * FROM users');
    
} catch (UnavailableException $e) {
    $ctx = $e->getErrorContext();
    echo "Consistency: " . $ctx->consistency->name . "\n";
    echo "Required: " . $ctx->nodesRequired . "\n";
    echo "Alive: " . $ctx->nodesAlive . "\n";
    
} catch (ReadTimeoutException $e) {
    $ctx = $e->getErrorContext();
    echo "Consistency: " . $ctx->consistency->name . "\n";
    echo "Received: " . $ctx->nodesAnswered . "\n";
    echo "Required: " . $ctx->nodesRequired . "\n";
    echo "Data present: " . ($ctx->dataPresent ? 'yes' : 'no') . "\n";
    
} catch (WriteTimeoutException $e) {
    $ctx = $e->getErrorContext();
    echo "Write type: " . $ctx->writeType->value . "\n";
    echo "Consistency: " . $ctx->consistency->name . "\n";
    echo "Received: " . $ctx->nodesAcknowledged . "\n";
    echo "Required: " . $ctx->nodesRequired . "\n";
}

use Cassandra\Connection\StreamNodeConfig;

$node = new StreamNodeConfig(
    host: 'cassandra.example.com',        // Can     password: 'secret',                   // Password (optional)
    connectTimeoutInSeconds: 10,          // Connection timeout (default: 5)
    timeoutInSeconds: 30,                 // I/O timeout (default: 30)
    persistent: true,                     // Use persistent connections
    sslOptions: [                         // SSL/TLS options (see PHP SSL context)
        'verify_peer' => true,            // Verify peer certificate
        'verify_peer_name' => true,       // Verify peer name
        'cafile' => '/path/to/ca.pem',    // CA certificate file
        'local_cert' => '/path/to/cert.pem', // Client certificate
        'local_pk' => '/path/to/key.pem', // Client private key
        'passphrase' => 'cert_password',  // Private key passphrase
        'ciphers' => 'HIGH:!aNULL',       // Cipher list
        'crypto_method' => STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT,
    ]
);

use Cassandra\Connection\SocketNodeConfig;

$node = new SocketNodeConfig(
    host: '127.0.0.1',                    // Cassandra host
    port: 9042,                           // Cassandra port (default: 9042)
    username: 'cassandra',                // Username (optional)
    password: 'cassandra',                // Password (optional)
    socketOptions: [                      // Socket-specific options
        SO_RCVTIMEO => ['sec' => 10, 'usec' => 0],  // Receive timeout
        SO_SNDTIMEO => ['sec' => 10, 'usec' => 0],  // Send timeout
        SO_KEEPALIVE => 1,                // Keep-alive
    ]
);

use Cassandra\Connection\ConnectionOptions;
use Cassandra\Connection\NodeSelectionStrategy;

$options = new ConnectionOptions(
    enableCompression: true,              // Enable LZ4 compression (default: false)
    throwOnOverload: true,                // Throw on server overload (v4+, default: false)
    nodeSelectionStrategy: NodeSelectionStrategy::RoundRobin, // Node selection (default: Random)
    preparedResultCacheSize: 200,         // Prepared statement cache size (default: 100)
);

use Cassandra\Request\Options\QueryOptions;
use Cassandra\SerialConsistency;

$queryOptions = new QueryOptions(
    autoPrepare: true,                    // Auto-prepare for type safety (default: true)
    pageSize: 1000,                       // Page size (min 100, default: 5000)
    pagingState: $previousPagingState,    // For pagination (default: null)
    serialConsistency: SerialConsistency::SERIAL, // Serial consistency (default: null)
    defaultTimestamp: 1640995200000000,   // Default timestamp (microseconds, default: null)
    namesForValues: true,                 // Use named parameters (auto-detected if null)
    keyspace: 'my_keyspace',              // Per-request keyspace (v5 only, default: null)
    nowInSeconds: time(),                 // Current time override (v5 only, default: null)
);

use Cassandra\Request\Options\ExecuteOptions;

$executeOptions = new ExecuteOptions(
    // All QueryOptions properties plus:
    skipMetadata: true,                   // Skip result metadata (default: false)
    autoPrepare: false,                   // Not applicable for execute
    pageSize: 500,
    namesForValues: true,
    // ... other QueryOptions
);

use Cassandra\Request\Options\PrepareOptions;

$prepareOptions = new PrepareOptions(
    keyspace: 'my_keyspace',              // Keyspace for preparation (v5 only)
);

use Cassandra\Request\Options\BatchOptions;
use Cassandra\SerialConsistency;

$batchOptions = new BatchOptions(
    serialConsistency: SerialConsistency::LOCAL_SERIAL,
    defaultTimestamp: 1640995200000000,   // Microseconds since epoch
    keyspace: 'my_keyspace',              // v5 only
    nowInSeconds: time(),                 // v5 only
);

use Cassandra\Value\ValueEncodeConfig;
use Cassandra\Value\EncodeOption\DateEncodeOption;
use Cassandra\Value\EncodeOption\DurationEncodeOption;
use Cassandra\Value\EncodeOption\TimeEncodeOption;
use Cassandra\Value\EncodeOption\TimestampEncodeOption;
use Cassandra\Value\EncodeOption\VarintEncodeOption;

$conn->configureValueEncoding(new ValueEncodeConfig(
    dateEncodeOption: DateEncodeOption::AS_DATETIME_IMMUTABLE,
    durationEncodeOption: DurationEncodeOption::AS_DATEINTERVAL,
    timeEncodeOption: TimeEncodeOption::AS_DATETIME_IMMUTABLE,
    timestampEncodeOption: TimestampEncodeOption::AS_DATETIME_IMMUTABLE,
    varintEncodeOption: VarintEncodeOption::AS_STRING,
));

use Cassandra\EventListener;
use Cassandra\WarningsListener;

// Event listener
$conn->registerEventListener(new class implements EventListener {
    public function onEvent(\Cassandra\Response\Event $event): void {
        error_log("Cassandra event: " . $event->getType());
    }
});

// Warnings listener
$conn->registerWarningsListener(new class implements WarningsListener {
    public function onWarnings(array $warnings, $request, $response): void {
        foreach ($warnings as $warning) {
            error_log("Cassandra warning: $warning");
        }
    }
});

// Map
$profile = MapCollection::fromValue(['role' => 'admin', 'level' => 'senior'], Type::VARCHAR, Type::VARCHAR);

// List
$tags = ListCollection::fromValue(['php', 'cassandra', 'database'], Type::VARCHAR);

// UDT
$address = UDT::fromValue(
    ['street' => '123 Main St', 'city' => 'New York', 'zip' => '10001'],
    ['street' => Type::VARCHAR, 'city' => Type::VARCHAR, 'zip' => Type::VARCHAR]
);

use Cassandra\Value\Timestamp;

// Current time
$now = Timestamp::now();

// From string
$timestamp = Timestamp::fromValue('2024-01-15T10:30:00Z');

// From Unix timestamp
$timestamp = Timestamp::fromValue(1705312200000); // milliseconds

// DataStax Driver (old)
$cluster = Cassandra::cluster()
    ->withContactPoints('127.0.0.1')
    ->withPort(9042)
    ->withCredentials('username', 'password')
    ->build();
$session = $cluster->connect('keyspace_name');

// php-cassandra (new)
use Cassandra\Connection;
use Cassandra\Connection\StreamNodeConfig;

$conn = new Connection([
    new StreamNodeConfig('127.0.0.1', 9042, 'username', 'password')
], keyspace: 'keyspace_name');
$conn->connect();

// DataStax Driver (old)
$statement = new Cassandra\SimpleStatement('SELECT * FROM users WHERE id = ?');
$result = $session->execute($statement, ['arguments' => [$userId]]);

// php-cassandra (new)
$result = $conn->query('SELECT * FROM users WHERE id = ?', [$userId])->asRowsResult();

// DataStax Driver (old)
$statement = $session->prepare('SELECT * FROM users WHERE id = ?');
$result = $session->execute($statement, ['arguments' => [$userId]]);

// php-cassandra (new)
$prepared = $conn->prepare('SELECT * FROM users WHERE id = ?');
$result = $conn->execute($prepared, [$userId])->asRowsResult();

// DataStax Driver (old)
$uuid = new Cassandra\Uuid('550e8400-e29b-41d4-a716-446655440000');
$timestamp = new Cassandra\Timestamp(time());

// php-cassandra (new)
use Cassandra\Value\Uuid;
use Cassandra\Value\Timestamp;

$uuid = Uuid::fromValue('550e8400-e29b-41d4-a716-446655440000');
$timestamp = Timestamp::fromValue(time() * 1000);

// DataStax Driver (old)
$future = $session->executeAsync($statement);
$result = $future->get();

// php-cassandra (new)
$statement = $conn->queryAsync('SELECT * FROM users');
$result = $statement->getRowsResult();

use Cassandra\Connection;
use Cassandra\Connection\SocketNodeConfig;
use Cassandra\Connection\StreamNodeConfig;
use Cassandra\Connection\ConnectionOptions;

// Stream with TLS and persistent
$stream = new StreamNodeConfig(
    host: 'tls://cassandra.example.com',
    port: 9042,
    username: 'user',
    password: 'secret',
    connectTimeoutInSeconds: 5,
    timeoutInSeconds: 15,
    persistent: true,
    sslOptions: [
        'cafile' => '/etc/ssl/certs/ca.pem',
        'verify_peer' => true,
        'verify_peer_name' => true,
    ]
);

// Socket with custom timeouts
$socket = new SocketNodeConfig(
    host: '127.0.0.1',
    port: 9042,
    username: 'user',
    password: 'secret',
    socketOptions: [
        SO_RCVTIMEO => ['sec' => 5, 'usec' => 0],
        SO_SNDTIMEO => ['sec' => 5, 'usec' => 0],
    ]
);

$conn = new Connection([$socket, $stream], options: new ConnectionOptions(enableCompression: true));

use Cassandra\Connection;
use Cassandra\Value\ValueEncodeConfig;
use Cassandra\Value\EncodeOption\TimestampEncodeOption;
use Cassandra\Value\EncodeOption\DateEncodeOption;

$conn = new Connection([$socket]);
$conn->configureValueEncoding(new ValueEncodeConfig(
    timestampEncodeOption: TimestampEncodeOption::AS_INT,
    dateEncodeOption: DateEncodeOption::AS_INT,
));

use Cassandra\WarningsListener;
use Cassandra\Request\Request;
use Cassandra\Response\Response;

$conn->registerWarningsListener(new class () implements WarningsListener {
    public function onWarnings(array $warnings, Request $request, Response $response): void {
        error_log('Cassandra warnings: ' . implode('; ', $warnings));
    }
});

use Cassandra\EventListener;
use Cassandra\Response\Event;

$conn->registerEventListener(new class () implements EventListener {
    public function onEvent(Event $event): void {
        // enqueue to worker, react to topology/status/schema changes
    }
});

// Non-busy loop with backoff
while (true) {
    $conn->waitForNextEvent();
    usleep(200_000); // 200ms
}
bash
composer 
bash
   git clone https://github.com/MichaelRoosz/php-cassandra.git
   cd php-cassandra