feat/queue-kafka #15

Merged
rrise merged 3 commits from feat/queue-kafka into master 2025-11-12 20:29:45 +00:00
2 changed files with 25 additions and 7 deletions
Showing only changes of commit f8b988ca0d - Show all commits

View File

@@ -13,7 +13,7 @@ class Consumer
private static bool $shutDown = false;
private const array QUEUES = [
Queues\DefaultQueue::class,
'default' => Queues\DefaultQueue::class,
];
private array $queues = [];
@@ -22,9 +22,24 @@ class Consumer
private const string HANDLER_NAMESPACE = 'Siteworxpro\\App\\Async\\Handlers\\';
public function __construct()
public function __construct(array $queues = [])
{
foreach (self::QUEUES as $queueClass) {
if ($queues === []) {
$queues = self::QUEUES;
} else {
$mappedQueues = [];
foreach ($queues as $queueName) {
if (isset(self::QUEUES[$queueName])) {
$mappedQueues[] = self::QUEUES[$queueName];
} else {
throw new \InvalidArgumentException("Queue '$queueName' is not defined.");
}
}
$queues = $mappedQueues;
}
foreach ($queues as $queueClass) {
$this->queues[] = new $queueClass();
}

View File

@@ -13,14 +13,17 @@ class Start extends Command implements CommandInterface
public function __construct()
{
parent::__construct('queue:start', 'Start the queue consumer to process messages.');
$this->argument('[name]', 'Your name')
->option('-g, --greet', 'Include a greeting message');
$this->argument('[queues]', 'The name of the queue to consume from. ex. "first_queue,second_queue"');
}
public function execute(): int
{
$consumer = new Consumer();
$queues = [];
if ($this->values()['queues'] !== null) {
$queues = explode(',', $this->values()['queues']);
}
$consumer = new Consumer($queues);
$consumer->start();
return 0;