diff --git a/src/Async/Consumer.php b/src/Async/Consumer.php index 38572d8..26d9fda 100644 --- a/src/Async/Consumer.php +++ b/src/Async/Consumer.php @@ -13,7 +13,7 @@ class Consumer private static bool $shutDown = false; private const array QUEUES = [ - Queues\DefaultQueue::class, + 'default' => Queues\DefaultQueue::class, ]; private array $queues = []; @@ -22,9 +22,24 @@ class Consumer private const string HANDLER_NAMESPACE = 'Siteworxpro\\App\\Async\\Handlers\\'; - public function __construct() + public function __construct(array $queues = []) { - foreach (self::QUEUES as $queueClass) { + if ($queues === []) { + $queues = self::QUEUES; + } else { + $mappedQueues = []; + foreach ($queues as $queueName) { + if (isset(self::QUEUES[$queueName])) { + $mappedQueues[] = self::QUEUES[$queueName]; + } else { + throw new \InvalidArgumentException("Queue '$queueName' is not defined."); + } + } + $queues = $mappedQueues; + } + + + foreach ($queues as $queueClass) { $this->queues[] = new $queueClass(); } diff --git a/src/Cli/Commands/Queue/Start.php b/src/Cli/Commands/Queue/Start.php index cc52e4d..9962270 100644 --- a/src/Cli/Commands/Queue/Start.php +++ b/src/Cli/Commands/Queue/Start.php @@ -13,14 +13,17 @@ class Start extends Command implements CommandInterface public function __construct() { parent::__construct('queue:start', 'Start the queue consumer to process messages.'); - - $this->argument('[name]', 'Your name') - ->option('-g, --greet', 'Include a greeting message'); + $this->argument('[queues]', 'The name of the queue to consume from. ex. "first_queue,second_queue"'); } public function execute(): int { - $consumer = new Consumer(); + $queues = []; + if ($this->values()['queues'] !== null) { + $queues = explode(',', $this->values()['queues']); + } + + $consumer = new Consumer($queues); $consumer->start(); return 0;