PHP code example of jenner / kafka-php
1. Go to this page and download the library: Download jenner/kafka-php library . Choose the download type require .
2. Extract the ZIP file and open the index.php.
3. Add this code to the index.php.
<?php
require_once('vendor/autoload.php');
/* Start to develop here. Best regards https://php-download.com/ */
jenner / kafka-php example snippets php
$produce = \Kafka\Produce::getInstance('localhost:2181', 3000);
$produce->setRequireAck(-1);
$produce->setMessages('test', 0, array('test1111111'));
$produce->setMessages('test6', 0, array('test1111111'));
$produce->setMessages('test6', 2, array('test1111111'));
$produce->setMessages('test6', 1, array('test111111111133'));
$result = $produce->send();
var_dump($result);
php
$consumer = \Kafka\Consumer::getInstance('localhost:2181');
$consumer->setGroup('testgroup');
$consumer->setPartition('test', 0);
$consumer->setPartition('test6', 2, 10);
$result = $consumer->fetch();
foreach ($result as $topicName => $topic) {
foreach ($topic as $partId => $partition) {
var_dump($partition->getHighOffset());
foreach ($partition as $message) {
var_dump((string)$message);
}
}
}
php
$data = array(
' 'data' => array(
array(
'topic_name' => 'test',
'partitions' => array(
array(
'partition_id' => 0,
'messages' => array(
'message1',
'message2',
),
),
),
),
),
);
$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->produceRequest($data);
$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->produceResponse();
var_dump($result);
php
$data = array(
'data' => array(
array(
'topic_name' => 'test',
'partitions' => array(
array(
'partition_id' => 0,
'offset' => 0,
),
),
),
),
);
$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->fetchRequest($data);
$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->fetchResponse();
var_dump($result);
php
$data = array(
'data' => array(
array(
'topic_name' => 'test',
'partitions' => array(
array(
'partition_id' => 0,
'max_offset' => 10,
'time' => -1,
),
),
),
),
);
$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->offsetRequest($data);
$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->offsetResponse();
var_dump($result);
php
array(
'topic_name1', // topic name
);
php
$data = array(
'test'
);
$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->metadataRequest($data);
$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->metadataResponse();
var_dump($result);
php
$data = array(
'group_id' => 'testgroup',
'data' => array(
array(
'topic_name' => 'test',
'partitions' => array(
array(
'partition_id' => 0,
'offset' => 2,
),
),
),
),
);
$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->commitOffsetRequest($data);
$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->commitOffsetResponse();
var_dump($result);
php
$data = array(
'group_id' => 'testgroup',
'data' => array(
array(
'topic_name' => 'test',
'partitions' => array(
array(
'partition_id' => 0,
),
),
),
),
);
$conn = new \Kafka\Socket('localhost', '9092');
$conn->connect();
$encoder = new \Kafka\Protocol\Encoder($conn);
$encoder->fetchOffsetRequest($data);
$decoder = new \Kafka\Protocol\Decoder($conn);
$result = $decoder->fetchOffsetResponse();
var_dump($result);