From eacc603561a6ceb667d24ef546cdbbe780cf9d5d Mon Sep 17 00:00:00 2001 From: Ron Rise Date: Wed, 12 Nov 2025 15:22:23 -0500 Subject: [PATCH] feat: integrate Kafka support with producer and consumer implementation --- config.php | 3 ++- src/Async/Brokers/Kafka.php | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/config.php b/config.php index ed89760..81ef275 100644 --- a/config.php +++ b/config.php @@ -51,7 +51,7 @@ return [ ], 'queue' => [ - 'broker' => Env::get('QUEUE_BROKER', 'kafka'), + 'broker' => Env::get('QUEUE_BROKER', 'redis'), 'broker_config' => [ @@ -61,6 +61,7 @@ return [ 'kafka' => [ 'brokers' => Env::get('QUEUE_KAFKA_BROKERS', 'kafka:9092'), + 'consumerGroup' => Env::get('QUEUE_KAFKA_CONSUMER_GROUP', 'default_group'), ], 'rabbitmq' => [ diff --git a/src/Async/Brokers/Kafka.php b/src/Async/Brokers/Kafka.php index fc0c8b3..95f30fc 100644 --- a/src/Async/Brokers/Kafka.php +++ b/src/Async/Brokers/Kafka.php @@ -24,12 +24,12 @@ class Kafka extends Broker $conf = new Conf(); $conf->set('bootstrap.servers', $config['brokers'] ?? 'localhost:9092'); + $this->producer = new Producer($conf); $this->producer->addBrokers($config['brokers'] ?? 'localhost:9092'); $conf->set('group.id', $config['consumerGroup'] ?? 'default'); $conf->set('auto.offset.reset', 'earliest'); - $this->consumer = new KafkaConsumer($conf); } @@ -60,6 +60,7 @@ class Kafka extends Broker return null; } + /** @var string | null $messageData */ $messageData = $kafkaMessage->payload; if ($messageData !== null) { /** @var Message $message */