PHP code example of werk365 / larakafka

1. Go to the download page for the werk365/larakafka library and choose the download type "require".

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

werk365 / larakafka example snippets




// Client identifier derived from the application name: lower-cased, with spaces
// replaced by dashes. The (string) cast keeps the result an empty string when
// APP_NAME is unset, while avoiding the PHP 8.1+ deprecation raised when null is
// passed to strtolower().
$clientName = str_replace(' ', '-', strtolower((string) env('APP_NAME')));

return [
    'client' => [
        'client_name' => $clientName,
        // Per-role client settings. The keys look like librdkafka configuration
        // properties — NOTE(review): confirm they are forwarded to the client as-is.
        'configs' => [
            'producer' => [
                'client.id' => $clientName,
                'compression.codec' => 'snappy',
                'security.protocol' => 'SASL_SSL',
                'sasl.mechanisms' => 'PLAIN',
                // Placeholders: fill in before use. Prefer reading credentials from
                // the environment over committing them to this file.
                'sasl.username' => '',
                'sasl.password' => '',
            ],
            'consumer' => [
                'client.id' => $clientName,
                'security.protocol' => 'SASL_SSL',
                'sasl.mechanisms' => 'PLAIN',
                // Placeholders: fill in before use (see producer note above).
                'sasl.username' => '',
                'sasl.password' => '',
                'auto.offset.reset' => 'earliest',
            ],
        ],
        // Placeholder: broker address(es) must be supplied.
        'broker' => '',
    ],
    // Handler lookup for consumed messages. Presumably keyed by topic name
    // ('example' here) — TODO confirm. 'namespace' names the class and 'function'
    // the static method on it.
    'functions' => [
        'consumer' => [
            'example' => [
                'function' => 'ingest',
                'namespace' => '\App\Services\KafkaService',
            ],
        ],
    ],
    // Mapping between message attributes and model attributes. Presumably used
    // when storing a consumed message — TODO confirm the direction of each
    // 'attributes' pair against the library.
    'maps' => [
        'consumer' => [
            'example' => [
                'model' => 'App\Models\Example',
                'event_id' => 'uuid',
                'model_id' => 'id',
                'attributes' => [
                    'uuid' => 'id',
                    'first_name' => 'name',
                ],
            ],
        ],
    ],
];


use Werk365\LaraKafka\LaraKafka;

// Build a message with the chained setters, then send it with produce().
// The chain relies on each setter returning an object that exposes the next call.
$kafka = new LaraKafka();
$kafka->setTopic("string") // optional; defaults to the application name
    ->setKey("string") // optional; defaults to the caller's class name
    ->setHeaders(["key" => "value"]) // optional; the default contains more information about the caller
    ->setBody("string") // the body can also be passed to the constructor: $kafka = new LaraKafka("body")
    ->produce();
 
 // Fragment of an event => listeners map; the surrounding array is not shown.
 // NOTE(review): presumably the 'listeners' section of config/octane.php — confirm.
 'listeners' => [
       // ...

        // Listeners registered for TickReceived: the entries returned by
        // Octane::prepareApplicationForNextOperation() are spread into the list,
        // followed by the application's TestConsumer class.
        TickReceived::class => [
            ...Octane::prepareApplicationForNextOperation(),
            \App\Consumers\TestConsumer::class
        ],

        // ...
    ],

namespace App\Services;

use Illuminate\Support\Facades\Log;

/**
 * Example message handler that writes each received body to the application log.
 */
class KafkaService
{
    /**
     * Log the JSON-encoded message body at info level.
     *
     * @param mixed $key     Message key; not used by this handler.
     * @param mixed $headers Message headers; not used by this handler.
     * @param mixed $body    Message body; JSON-encoded before being logged.
     */
    public static function ingest($key, $headers, $body)
    {
        $encodedBody = json_encode($body);

        Log::info($encodedBody);
    }
}

namespace App\Services;

use Werk365\LaraKafka\LaraKafka;

/**
 * Example message handler that passes the event attributes of a received body
 * to LaraKafka::storeMessage().
 */
class KafkaService
{
    /**
     * Forward $body->event_attributes to a fresh LaraKafka instance for storage.
     *
     * @param mixed  $key     Message key; not used by this handler.
     * @param mixed  $headers Message headers; not used by this handler.
     * @param object $body    Message body; its event_attributes property is read.
     */
    public static function ingest($key, $headers, $body)
    {
        $client = new LaraKafka();

        // NOTE(review): the second argument presumably names the configured
        // map(s) to apply — confirm against the library.
        $client->storeMessage($body->event_attributes, ["user"]);
    }
}
bash
$ php artisan larakafka:consume {topic}
 bash
$ php artisan vendor:publish --provider="Werk365\LaraKafka\LaraKafkaServiceProvider"
bash
$ php artisan kafka:consumer topic
bash
$ php artisan larakafka:consume {topic}