diff --git a/config.php b/config.php index ed89760..81ef275 100644 --- a/config.php +++ b/config.php @@ -51,7 +51,7 @@ return [ ], 'queue' => [ - 'broker' => Env::get('QUEUE_BROKER', 'kafka'), + 'broker' => Env::get('QUEUE_BROKER', 'redis'), 'broker_config' => [ @@ -61,6 +61,7 @@ return [ 'kafka' => [ 'brokers' => Env::get('QUEUE_KAFKA_BROKERS', 'kafka:9092'), + 'consumerGroup' => Env::get('QUEUE_KAFKA_CONSUMER_GROUP', 'default_group'), ], 'rabbitmq' => [ diff --git a/src/Async/Brokers/Kafka.php b/src/Async/Brokers/Kafka.php index fc0c8b3..95f30fc 100644 --- a/src/Async/Brokers/Kafka.php +++ b/src/Async/Brokers/Kafka.php @@ -24,12 +24,12 @@ class Kafka extends Broker $conf = new Conf(); $conf->set('bootstrap.servers', $config['brokers'] ?? 'localhost:9092'); + $this->producer = new Producer($conf); $this->producer->addBrokers($config['brokers'] ?? 'localhost:9092'); $conf->set('group.id', $config['consumerGroup'] ?? 'default'); $conf->set('auto.offset.reset', 'earliest'); - $this->consumer = new KafkaConsumer($conf); } @@ -60,6 +60,7 @@ class Kafka extends Broker return null; } + /** @var string | null $messageData */ $messageData = $kafkaMessage->payload; if ($messageData !== null) { /** @var Message $message */