feat: integrate Kafka support with producer and consumer implementation
All checks were successful
🧪✨ Tests Workflow / 🛡️ 🔒 Library Audit (push) Successful in 1m29s
🧪✨ Tests Workflow / 🧪 ✨ Database Migrations (push) Successful in 1m49s
🧪✨ Tests Workflow / 📝 ✨ Code Lint (push) Successful in 1m35s
🧪✨ Tests Workflow / 🛡️ 🔒 License Check (push) Successful in 1m42s
🧪✨ Tests Workflow / 🐙 🔍 Code Sniffer (push) Successful in 1m48s
🧪✨ Tests Workflow / 🧪 ✅ Unit Tests (push) Successful in 1m24s

This commit is contained in:
2025-11-12 15:22:23 -05:00
parent e0ba77556d
commit eacc603561
2 changed files with 4 additions and 2 deletions

View File

@@ -51,7 +51,7 @@ return [
], ],
'queue' => [ 'queue' => [
'broker' => Env::get('QUEUE_BROKER', 'kafka'), 'broker' => Env::get('QUEUE_BROKER', 'redis'),
'broker_config' => [ 'broker_config' => [
@@ -61,6 +61,7 @@ return [
'kafka' => [ 'kafka' => [
'brokers' => Env::get('QUEUE_KAFKA_BROKERS', 'kafka:9092'), 'brokers' => Env::get('QUEUE_KAFKA_BROKERS', 'kafka:9092'),
'consumerGroup' => Env::get('QUEUE_KAFKA_CONSUMER_GROUP', 'default_group'),
], ],
'rabbitmq' => [ 'rabbitmq' => [

View File

@@ -24,12 +24,12 @@ class Kafka extends Broker
$conf = new Conf(); $conf = new Conf();
$conf->set('bootstrap.servers', $config['brokers'] ?? 'localhost:9092'); $conf->set('bootstrap.servers', $config['brokers'] ?? 'localhost:9092');
$this->producer = new Producer($conf); $this->producer = new Producer($conf);
$this->producer->addBrokers($config['brokers'] ?? 'localhost:9092'); $this->producer->addBrokers($config['brokers'] ?? 'localhost:9092');
$conf->set('group.id', $config['consumerGroup'] ?? 'default'); $conf->set('group.id', $config['consumerGroup'] ?? 'default');
$conf->set('auto.offset.reset', 'earliest'); $conf->set('auto.offset.reset', 'earliest');
$this->consumer = new KafkaConsumer($conf); $this->consumer = new KafkaConsumer($conf);
} }
@@ -60,6 +60,7 @@ class Kafka extends Broker
return null; return null;
} }
/** @var string | null $messageData */
$messageData = $kafkaMessage->payload; $messageData = $kafkaMessage->payload;
if ($messageData !== null) { if ($messageData !== null) {
/** @var Message $message */ /** @var Message $message */