Getting started with Laravel Kafka [In-Depth Tutorial]


Laravel

Author: Steve Alila
Reviewer: Deepak Prasad

In today's fast-paced and data-driven world, real-time data processing and event-driven architectures are becoming increasingly important.

Laravel, one of the most popular PHP frameworks, seamlessly integrates with Apache Kafka through the powerful Laravel Kafka package. This integration empowers you to build scalable, fault-tolerant, and real-time applications easily.

This blog post explores everything you need to know to get started with this powerful combination. We walk you through the installation and setup process. We then demonstrate how to produce messages to Kafka topics and showcase how to consume messages for efficient processing.

Read on to learn more.

 

Installing and Setting up Laravel Kafka

Get the Requirements

Laravel Kafka requires PHP version 8.0+, Laravel version 8+, and rdkafka PHP extension, which provides a production-ready, Kafka client for PHP. You can install the PHP extension on a Debian-based distribution using the following command.

sudo apt install librdkafka-dev

Getting started with Laravel Kafka [In-Depth Tutorial]

 

Install Laravel Kafka

Run the following command to install Laravel Kafka:

composer require mateusjunges/laravel-kafka

Add the --ignore-platform-reqs option to the command if you get an error similar to Package mateusjunges/laravel-kafka has requirements incompatible with your PHP version, PHP extensions and Composer version after installing the requirements.

 

Set up Laravel Kafka

Publish the config files.

php artisan vendor:publish --tag=laravel-kafka-config

Laravel Kafka installed

 

Your config/kafka.php file should appear like this:

<?php

return [
    /*
     | Your kafka brokers url.
     */
    'brokers' => env('KAFKA_BROKERS', 'localhost:9092'),

    /*
     | Kafka consumers belonging to the same consumer group share a group id.
     | The consumers in a group then divides the topic partitions as fairly amongst themselves as possible by
     | establishing that each partition is only consumed by a single consumer from the group.
     | This config defines the consumer group id you want to use for your project.
     */
    'consumer_group_id' => env('KAFKA_CONSUMER_GROUP_ID', 'group'),

    'consumer_timeout_ms' => env("KAFKA_CONSUMER_DEFAULT_TIMEOUT", 2000),

    /*
     | After the consumer receives its assignment from the coordinator,
     | it must determine the initial position for each assigned partition.
     | When the group is first created, before any messages have been consumed, the position is set according to a configurable
     | offset reset policy (auto.offset.reset). Typically, consumption starts either at the earliest offset or the latest offset.
     | You can choose between "latest", "earliest" or "none".
     */
    'offset_reset' => env('KAFKA_OFFSET_RESET', 'latest'),

    /*
     | If you set enable.auto.commit (which is the default), then the consumer will automatically commit offsets periodically at the
     | interval set by auto.commit.interval.ms.
     */
    'auto_commit' => env('KAFKA_AUTO_COMMIT', true),

    'sleep_on_error' => env('KAFKA_ERROR_SLEEP', 5),

    'partition' => env('KAFKA_PARTITION', 0),

    /*
     | Kafka supports 4 compression codecs: none , gzip , lz4 and snappy
     */
    'compression' => env('KAFKA_COMPRESSION_TYPE', 'snappy'),

    /*
     | Choose if debug is enabled or not.
     */
    'debug' => env('KAFKA_DEBUG', false),

    /*
     | Repository for batching messages together
     | Implement BatchRepositoryInterface to save batches in different storage
     */
    'batch_repository' => env('KAFKA_BATCH_REPOSITORY', \Junges\Kafka\BatchRepositories\InMemoryBatchRepository::class),

    /*
     | The sleep time in milliseconds that will be used when retrying flush
     */
    'flush_retry_sleep_in_ms' => 100,

    /*
     | The cache driver that will be used
     */
    'cache_driver' => env('KAFKA_CACHE_DRIVER', env('CACHE_DRIVER', 'file')),
];

Getting started with Laravel Kafka [In-Depth Tutorial]

 

Producing Messages with Laravel Kafka

You can publish messages to Kafka using the publishOn method, which returns a ProducerBuilder instance.

use Junges\Kafka\Facades\Kafka;
Kafka::publishOn('topic')

The ProducerBuilder instance contains methods to configure the Kafka producer. Some of the methods are withConfigOption, withConfigOptions, withDebugEnabled, and withDebugDisabled.

use Junges\Kafka\Facades\Kafka;

$producer = Kafka::publishOn('topic')
    ->withConfigOption('property-name', 'property-value')
    ->withConfigOptions([
        'property-name' => 'property-value'
    ])
    ->withDebugEnabled() // To enable debug mode
    ->withDebugDisabled() // To disable debug mode
    ->withDebugEnabled(false) // Also to disable debug mode

The withConfigOption method sets a \RdKafka\Conf::class option. The withConfigOption sets a config per call, whereas the withConfigOptions takes an array of options of name - value format.

The withHeaders method lets you configure message headers.

use Junges\Kafka\Facades\Kafka;

$producer = Kafka::publishOn('topic')
    ->withHeaders([
        'header-key' => 'header-value'
    ])

You can configure message body with withMessage or withBodyKey. The withMessage accepts Message class as an argument and sets the entire message with it.

use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;

$new_message = new Message(
    headers: ['header-key' => 'header-value'],
    body: ['key' => 'value'],
    key: 'kafka key here'  
)

$producer = Kafka::publishOn('topic')->withMessage($new_message);

On the other hand, withBodyKey sets only a message key.

use Junges\Kafka\Facades\Kafka;
$producer = Kafka::publishOn('topic')->withBodyKey('key', 'value');

You can use the key with the withKafkaKey method.

use Junges\Kafka\Facades\Kafka;
$producer = Kafka::publishOn('topic')->withKafkaKey('key');

Lastly, you can send the message to Kafka using the send method.

$producer->send();

 

Consuming Messages with Laravel Kafka

Your application can read messages from a Kafka topic by creating a consumer object, subscribing to the topic and receiving messages.

The Kafka class' createConsumer method creates a consumer and returns ConsumerBuilder class, which you can use to configure the consumer.

use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::createConsumer();

You can specify the topics, consumer group id, and the broker.

use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::createConsumer(['topic_1', 'topic_2'], 'group_id', 'broker');

Some of the methods of the ConsumerBuilder class are subscribe, withConsumerGroupId, withHandler, build, and consume. The subscribe lets you subscribe to a topic or array of topics.

use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::createConsumer()->subscribe('topic');

The withConsumerGroupId lets you attach a consumer to a group. The consumers in a group share group id and divide the topic partitions fairly amongst themselves with each consumer being attached to one partition.

use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::createConsumer()->withConsumerGroupId('consumer_group');

The withHandler method lets you specify a handler for your consumer.

$consumer->withHandler(function(\Junges\Kafka\Contracts\KafkaConsumerMessage $message) {
    // Handle your message
});

The KafkaConsumerMessage contract returns methods to get the message properties. Here are some of the methods and what they return:

  • getKey(): Kafka Message Key.
  • getTopicName(): Topic where the message was published.
  • getPartition(): Kafka partition where the message was published.
  • getHeaders(): Kafka message headers.
  • getBody(): Message body.
  • getOffset(): The offset where the message was published.

Lastly, you can build and consume the message with respective methods.

use Junges\Kafka\Facades\Kafka;
$consumer = Kafka::createConsumer()->build();
$consumer->consume();

 

Conclusion

Laravel Kafka provides seamless integration between the Laravel framework and Apache Kafka, enabling you to build scalable, fault-tolerant, and real-time applications.

This blog post covered installation, setup, production, and consumption of messages using Laravel Kafka. Now you should be able to set up Laravel Kafka, configure Kafka connections, and create producers and consumers.

 

Steve Alila

Steve Alila

He specializes in web design, WordPress development, and data analysis, with proficiency in Python, JavaScript, and data extraction tools. Additionally, he excels in web API development, AI integration, and data presentation using Matplotlib and Plotly. You can connect with him on his LinkedIn profile.

Can't find what you're searching for? Let us assist you.

Enter your query below, and we'll provide instant results tailored to your needs.

If my articles on GoLinuxCloud has helped you, kindly consider buying me a coffee as a token of appreciation.

Buy GoLinuxCloud a Coffee

For any other feedbacks or questions you can send mail to admin@golinuxcloud.com

Thank You for your support!!

Leave a Comment