PHP code example of mingyukim / php-kafka

1. Go to this page and download the library: Download mingyukim/php-kafka library. Choose the download type require.

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

mingyukim / php-kafka example snippets



return [
    /*
     * Kafka 브로커에 보이는 클라이언트의 ID를 설정합니다. 클라이언트 ID는 주로 로그와 메트릭을 생성할 때 사용됩니다.
     */
    'client-id' => '',
    /*
     * bootstrap.servers는 Kafka 브로커에 연결하는 데 필요한 호스트 이름과 포트 번호를 설정합니다.
     */
    'bootstrap-servers' => 'your-kafka:your-port',
    /*
     * group.id는 Kafka 컨슈머 그룹의 ID를 설정합니다. 같은 그룹 ID를 가진 컨슈머들은 같은 메시지를 동시에 처리하지 않습니다.
     */
    'group-id' => 'your-kafka-group-id',
    /*
     * enable.partition.eof는 파티션의 끝(end of file, EOF)에 도달했을 때 표시를 내보내는지 여부를 설정합니다. 이 옵션이 true로 설정되면, 파티션의 끝에 도달하면 컨슈머에게 알려줍니다.
     */
    'enable-partition-eof' => 'true',
    /*
     * auto.offset.reset는 Kafka 컨슈머가 이전에 커밋된 오프셋이 없거나, 현재 오프셋이 더 이상 유효하지 않은 경우 어떻게 처리할지를 설정합니다. 'earliest'로 설정하면 가장 처음의 오프셋에서 시작합니다.
     */
    'auto-offset-reset' => 'earliest',
    /*
     * log_level는 로깅 레벨을 설정합니다. 여기서는 LOG_DEBUG로 설정되어 있으므로, 디버그 레벨의 로그를 출력합니다.
     */
    'log-level' => (string) LOG_DEBUG,
    /*
     * debug는 어떤 종류의 정보를 디버그할지를 설정합니다. 'all'로 설정하면 모든 종류의 정보를 디버그합니다.
     */
    'debug' => 'all',




    /*
     * SASL(간단한 인증 및 보안 계층) 메커니즘을 설정합니다. 일반적으로 "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512" 등이 사용됩니다. 빈 문자열은 사용하지 않겠다는 의미입니다.
     */
    'sasl-mechanisms' => '',
    /*
     * 브로커의 호스트 이름 검증에 사용할 알고리즘을 설정합니다. https는 브로커의 호스트 이름을 검증하려면 HTTPS처럼 사용됩니다.
     */
    'ssl-endpoint-identification-algorithm' => '', // https
    /*
     * SASL 인증에 사용할 사용자 이름 및 패스워드를 설정합니다.
     */
    'sasl-username' => '',
    'sasl-password' => '',


    /*
     * 사용할 보안 프로토콜을 설정합니다. 'ssl'은 SSL/TLS를 사용하여 통신을 암호화합니다.
     */
    'security-protocol' => '', // 'ssl'
    /*
     * CA(Certificate Authority) 인증서의 위치를 설정합니다. CA 인증서는 브로커의 서버 인증서를 검증하는 데 사용됩니다.
     */
    'ssl-ca-location' => '', //  __DIR__.'/../../../keys/ca.pem'
    /*
     * 클라이언트의 공개키 인증서의 위치를 설정합니다.
     */
    'ssl-certificate-location' => '', // __DIR__.'/../../../keys/kafka.cert'
    /*
     * 클라이언트의 개인키 인증서의 위치를 설정합니다.
     */
    'ssl-key-location' => '', // __DIR__.'/../../../keys/kafka.key'
];


return [
    /*
     * Kafka 브로커에 보이는 클라이언트의 ID를 설정합니다. 클라이언트 ID는 주로 로그와 메트릭을 생성할 때 사용됩니다.
     */
    'client-id' => '',
    /*
     * bootstrap.servers는 Kafka 브로커에 연결하는 데 필요한 호스트 이름과 포트 번호를 설정합니다.
     */
    'bootstrap-servers' => 'localhost:9092',
    /*
     * Kafka 브로커에 연결하는 데 필요한 호스트 이름과 포트 번호를 설정합니다.
     */
    'metadata-broker-list' => 'localhost:9092',
    /*
     * Kafka 프로듀서가 메시지를 압축하는 데 사용할 코덱을 설정합니다. 'none', 'gzip', 'lz4', 'snappy', 'zstd' 등을 설정할 수 있습니다.
     */
    'compression-codec' => 'snappy',
    /*
     * 메시지 전송에 대한 타임아웃을 설정합니다. 설정된 시간(밀리초 단위) 동안 메시지 전송이 완료되지 않으면 프로듀서는 재시도합니다.
     */
    'message-timeout-ms' => '5000', // 5초
    /*
     * 프로듀서의 아이덤포턴스를 설정합니다. 이 옵션은 "정확히 한 번의" 메시지 전송을 보장합니다. 이는 중복 메시지를 방지하고, 메시지의 순서를 유지합니다.
     */
    'enable-idempotence' => 'true',


    /*
     * SASL(간단한 인증 및 보안 계층) 메커니즘을 설정합니다. 일반적으로 "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512" 등이 사용됩니다. 빈 문자열은 사용하지 않겠다는 의미입니다.
     */
    'sasl-mechanisms' => '',
    /*
     * 브로커의 호스트 이름 검증에 사용할 알고리즘을 설정합니다. https는 브로커의 호스트 이름을 검증하려면 HTTPS처럼 사용됩니다.
     */
    'ssl-endpoint-identification-algorithm' => '', // https
    /*
     * SASL 인증에 사용할 사용자 이름 및 패스워드를 설정합니다.
     */
    'sasl-username' => '',
    'sasl-password' => '',


    /*
     * 사용할 보안 프로토콜을 설정합니다. 'ssl'은 SSL/TLS를 사용하여 통신을 암호화합니다.
     */
    'security-protocol' => '', // 'ssl'
    /*
     * CA(Certificate Authority) 인증서의 위치를 설정합니다. CA 인증서는 브로커의 서버 인증서를 검증하는 데 사용됩니다.
     */
    'ssl-ca-location' => '', //  __DIR__.'/../../../keys/ca.pem'
    /*
     * 클라이언트의 공개키 인증서의 위치를 설정합니다.
     */
    'ssl-certificate-location' => '', // __DIR__.'/../../../keys/kafka.cert'
    /*
     * 클라이언트의 개인키 인증서의 위치를 설정합니다.
     */
    'ssl-key-location' => '', // __DIR__.'/../../../keys/kafka.key'
];

use MingyuKim\PhpKafka\Classes\KafkaConsumer;

// consumer 및 필요 객체 초기화
$kafkaConsumer = KafkaConsumer::getInstance();
$dataLibrary = DataLibrary::getInstance();
// 토픽 구독
$kafkaConsumer->subscribe('test-topic');

// messages 읽어오기
while (true) {
    $message = $kafkaConsumer->consume(1); // wait for up to 1 second
    // Process the message as needed...
    $payload = $message->payload;
    $payloadArray = $dataLibrary->convert(gettype($payload),'array',$payload);
    dump($payloadArray);
}


use MingyuKim\PhpKafka\Classes\KafkaProducer;

// producer 및 필요 객체 초기화
$producer = KafkaProducer::getInstance();
$apiLibrary = ApiLibrary::getInstance();
$dataLibrary = DataLibrary::getInstance();

// 테스트 Json 데이터 생성
$apiLibrary->setApiUrl('https://jsonplaceholder.typicode.com/posts');
$apiLibrary->setMethod('GET');
$resultArray = $apiLibrary->sendRequest('array');

foreach ($resultArray as $result) {
    $jsonResult = $dataLibrary->convert(gettype($result), 'string', $result);

    //프로듀싱 하기
    $producer->produce('test-topic',$jsonResult);
}
shell
sudo apt-get update && sudo apt-get upgrade -y
sudo apt-get install librdkafka-dev
sudo apt-get install php-dev
sudo apt-get install php-pear
sudo pecl install rdkafka
# 설치된 php 버전의 ini파일에 extension 추가하기
echo "extension=rdkafka.so" | sudo tee -a /etc/php/8.1/cli/php.ini 

composer 
json
"  "mingyukim/php-kafka": "*"
}
shell
 php artisan vendor:publish --provider="MingyuKim\PhpKafka\PhpKafkaProvider" --tag="config"