PHP code example of seanmorris / ksqlc

1. Go to this page and download the library: Download seanmorris/ksqlc library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

seanmorris / ksqlc example snippets



use \SeanMorris\Ksqlc\Ksqlc;

$ksqlc = new Ksqlc('http://your-ksql-server:8088/');



$stream = $ksqlc->stream('SELECT * FROM EVENT_STREAM EMIT CHANGES');

foreach($stream as $row)
{
	// $row == {"ROWKEY": "XXX", "ROWTIME": "YYY", ...}
	if($row->property === 'something')
	{
		break;
	}
}

unset($stream);



$queryOne = 'SELECT * FROM EVENTS WHERE BODY = "AAA" EMIT CHANGES LIMIT 20';
$queryTwo = 'SELECT * FROM STREAM WHERE BODY = "BBB" EMIT CHANGES LIMIT 20';

$stream = $ksqlc->multiplex(
	[$queryOne, 'earliest'],
	[$queryTwo, 'earliest']
);

foreach($stream as $row)
{
	/* Stream processing... */
}


$stream = $ksqlc->stream('SELECT * FROM EVENT_STREAM EMIT CHANGES LIMIT 20');

foreach($stream as $row)
{
	/* Stream processing... */
}


$stream = $ksqlc->stream($queryString, 'earliest'); ## process everything
$stream = $ksqlc->stream($queryString, 'latest');   ## process new records


$query  = 'SELECT * FROM EVENT_STREAM EMIT CHANGES LIMIT 20';
$stream = $ksqlc->stream($queryString, 'latest', TRUE);

foreach($stream as $row)
{
	var_dump($row);
}


$results = $ksqlc->run('SHOW TABLES');
var_dump( $results );
// object SeanMorris\Ksqlc\Result {
//  $type          => "tables"
//  $warnings      => {}
//  $statementText => "SHOW TABLES"
// }

foreach($results as $table)
{
	var_dump( $table );
	// object stdClass {
	// 	$type       => "TABLE"
	// 	$name       => "event_table"
	// 	$topic      => "event_table"
	// 	$format     => "JSON"
	// 	$isWindowed => false
	// }
}


[$streams, $tables] = $ksqlc->run('SHOW STREAMS', 'SHOW TABLES');

foreach($streams as $stream)
{
	// ...
}

foreach($tables as $table)
{
	// ...
}