feat/queue-kafka (#15)
All checks were successful
🧪✨ Tests Workflow / 🛡️ 🔒 Library Audit (push) Successful in 1m26s
🧪✨ Tests Workflow / 🧪 ✨ Database Migrations (push) Successful in 2m8s
🧪✨ Tests Workflow / 🛡️ 🔒 License Check (push) Successful in 59s
🧪✨ Tests Workflow / 📝 ✨ Code Lint (push) Successful in 1m14s
🧪✨ Tests Workflow / 🐙 🔍 Code Sniffer (push) Successful in 1m16s
🧪✨ Tests Workflow / 🧪 ✅ Unit Tests (push) Successful in 1m13s

Reviewed-on: #15
Co-authored-by: Ron Rise <ron@siteworxpro.com>
Co-committed-by: Ron Rise <ron@siteworxpro.com>
This commit was merged in pull request #15.
This commit is contained in:
2025-11-12 20:29:42 +00:00
committed by Siteworx Pro Gitea
parent 2879cbe203
commit e4a55af694
9 changed files with 257 additions and 96 deletions

View File

@@ -4,33 +4,84 @@ declare(strict_types=1);
namespace Siteworxpro\App\Async\Brokers;
use RdKafka\Conf;
use RdKafka\Exception;
use RdKafka\KafkaConsumer;
use RdKafka\Producer;
use Siteworxpro\App\Async\Queues\Queue;
use Siteworxpro\App\Async\Messages\Message;
class Kafka extends Broker
{
public function publish(Queue $queue, Message $message, ?int $delay = null): void
private Producer $producer;
private KafkaConsumer $consumer;
public function __construct($config = [])
{
// TODO: Implement publish() method.
parent::__construct($config);
$conf = new Conf();
$conf->set('bootstrap.servers', $config['brokers'] ?? 'localhost:9092');
$this->producer = new Producer($conf);
$this->producer->addBrokers($config['brokers'] ?? 'localhost:9092');
$conf->set('group.id', $config['consumerGroup'] ?? 'default');
$conf->set('auto.offset.reset', 'earliest');
$this->consumer = new KafkaConsumer($conf);
}
public function consume(Queue $queue): Message | null
public function __destruct()
{
$this->producer->flush(1000);
}
/**
* @throws \Exception
*/
public function publish(Queue $queue, Message $message, ?int $delay = null): void
{
$topic = $this->producer->newTopic($queue->queueName());
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $message->serialize(), $message->getId());
$this->producer->flush(1000);
}
/**
* @throws Exception
*/
public function consume(Queue $queue): Message|null
{
$this->consumer->subscribe([$queue->queueName()]);
$kafkaMessage = $this->consumer->consume(1000);
if ($kafkaMessage->err === RD_KAFKA_RESP_ERR__TIMED_OUT) {
return null;
}
/** @var string | null $messageData */
$messageData = $kafkaMessage->payload;
if ($messageData !== null) {
/** @var Message $message */
$message = unserialize($messageData, ['allowed_classes' => true]);
$message->setId((string)$kafkaMessage->offset);
return $message;
}
return null;
}
public function acknowledge(Queue $queue, Message $message): void
{
// TODO: Implement acknowledge() method.
}
public function reject(Queue $queue, Message $message, bool $requeue = false): void
{
// TODO: Implement reject() method.
}
public function purge(Queue $queue): void
{
// TODO: Implement purge() method.
}
}

View File

@@ -13,7 +13,7 @@ class Consumer
private static bool $shutDown = false;
private const array QUEUES = [
Queues\DefaultQueue::class,
'default' => Queues\DefaultQueue::class,
];
private array $queues = [];
@@ -22,9 +22,24 @@ class Consumer
private const string HANDLER_NAMESPACE = 'Siteworxpro\\App\\Async\\Handlers\\';
public function __construct()
public function __construct(array $queues = [])
{
foreach (self::QUEUES as $queueClass) {
if ($queues === []) {
$queues = self::QUEUES;
} else {
$mappedQueues = [];
foreach ($queues as $queueName) {
if (isset(self::QUEUES[$queueName])) {
$mappedQueues[] = self::QUEUES[$queueName];
} else {
throw new \InvalidArgumentException("Queue '$queueName' is not defined.");
}
}
$queues = $mappedQueues;
}
foreach ($queues as $queueClass) {
$this->queues[] = new $queueClass();
}

View File

@@ -6,6 +6,7 @@ namespace Siteworxpro\App\Cli\Commands\Queue;
use Ahc\Cli\Input\Command;
use Siteworxpro\App\Async\Consumer;
use Siteworxpro\App\Async\Messages\SayHelloMessage;
use Siteworxpro\App\Cli\Commands\CommandInterface;
class Start extends Command implements CommandInterface
@@ -13,14 +14,19 @@ class Start extends Command implements CommandInterface
public function __construct()
{
parent::__construct('queue:start', 'Start the queue consumer to process messages.');
$this->argument('[name]', 'Your name')
->option('-g, --greet', 'Include a greeting message');
$this->argument('[queues]', 'The name of the queue to consume from. ex. "first_queue,second_queue"');
}
public function execute(): int
{
$consumer = new Consumer();
$queues = [];
if ($this->values()['queues'] !== null) {
$queues = explode(',', $this->values()['queues']);
}
SayHelloMessage::dispatch("hello from queue consumer!");
$consumer = new Consumer($queues);
$consumer->start();
return 0;

View File

@@ -14,7 +14,7 @@ class BrokerServiceProvider extends ServiceProvider
{
$this->app->singleton(Broker::class, function (): Broker {
$configName = Config::get('queue.broker');
$brokerConfig = Config::get('queue.' . $configName) ?? [];
$brokerConfig = Config::get('queue.broker_config.' . $configName) ?? [];
$brokerClass = Broker::BROKER_TYPES[$configName] ?? null;