Queues\DefaultQueue::class, ];

    /**
     * Queue instances polled by start(), in the order they were configured.
     *
     * @var list<Queue>
     */
    private array $queues = [];

    /**
     * Handler class names grouped by the message class they are registered for.
     *
     * @var array<string, list<string>>
     */
    private array $handlers = [];

    /** Namespace prefix that maps files under ./Handlers/ to class names. */
    private const string HANDLER_NAMESPACE = 'Siteworxpro\\App\\Async\\Handlers\\';

    /**
     * Builds the consumer for the requested queues and discovers message handlers.
     *
     * @param array $queues Queue names (keys of self::QUEUES) to consume; an
     *                      empty array means "every queue in self::QUEUES".
     *
     * @throws \InvalidArgumentException When a requested queue name is not defined.
     */
    public function __construct(array $queues = [])
    {
        if ($queues === []) {
            $queueClasses = self::QUEUES;
        } else {
            $queueClasses = [];

            foreach ($queues as $queueName) {
                if (!isset(self::QUEUES[$queueName])) {
                    throw new \InvalidArgumentException("Queue '$queueName' is not defined.");
                }

                $queueClasses[] = self::QUEUES[$queueName];
            }
        }

        foreach ($queueClasses as $queueClass) {
            $this->queues[] = new $queueClass();
        }

        $this->registerHandlers();
    }

    /**
     * Scans ./Handlers/ recursively and indexes every loadable class carrying
     * a HandlesMessage attribute under the message class that attribute names.
     */
    private function registerHandlers(): void
    {
        $handlersDir = __DIR__ . '/Handlers/';

        $files = new \RecursiveIteratorIterator(
            new \RecursiveDirectoryIterator($handlersDir)
        );

        foreach ($files as $file) {
            if (!$file->isFile() || $file->getExtension() !== 'php') {
                continue;
            }

            // Turn "Sub/Dir/Foo.php" into "<HANDLER_NAMESPACE>Sub\Dir\Foo".
            $relativePath = str_replace($handlersDir, '', $file->getPathname());
            $className = self::HANDLER_NAMESPACE . str_replace('/', '\\', substr($relativePath, 0, -4));

            if (!class_exists($className)) {
                continue;
            }

            $reflection = new \ReflectionClass($className);

            // A handler may carry several attributes, i.e. serve several message classes.
            foreach ($reflection->getAttributes(HandlesMessage::class) as $attribute) {
                $messageClass = $attribute->newInstance()->getMessageClass();
                $this->handlers[$messageClass][] = $className;
            }
        }
    }

    /**
     * Signal callback installed by start().
     *
     * SIGINT, SIGTERM and SIGHUP only raise the shutdown flag, so the consumer
     * loop can finish its current pass before exiting.
     *
     * @param int $signal Signal number.
     */
    public static function handleSignal($signal): void
    {
        switch ($signal) {
            // Graceful
            case SIGINT:
            case SIGTERM:
            case SIGHUP:
                self::$shutDown = true;
                break;
            // Not Graceful
            // NOTE: SIGKILL cannot be caught and start() never registers it, so
            // this branch only runs when the method is called directly.
            case SIGKILL:
                exit(9);
        }
    }

    /**
     * Reports whether a graceful shutdown has been requested.
     */
    private function shouldShutDown(): bool
    {
        return self::$shutDown;
    }

    /**
     * Runs the consumer loop until a shutdown signal is received.
     *
     * Each pass pops at most one message from every configured queue, passes it
     * to every handler registered for its class, then sleeps for one second.
     * Exceptions thrown by a handler or by handler lookup are not caught here
     * and therefore end the loop.
     *
     * @throws \RuntimeException When the pcntl extension is unavailable, or when
     *                           a popped message has no registered handler.
     */
    public function start(): void
    {
        if (!\function_exists('pcntl_signal')) {
            throw new \RuntimeException('The pcntl extension is required to handle signals.');
        }

        Logger::info('Starting queue consumer...');

        // pcntl_signal() only registers a callback; PHP does not invoke it unless
        // signals are dispatched (async signals, ticks, or pcntl_signal_dispatch()).
        // Enabling async dispatch guarantees the shutdown flag is actually set.
        \pcntl_async_signals(true);

        \pcntl_signal(SIGINT, [self::class, 'handleSignal']);
        \pcntl_signal(SIGTERM, [self::class, 'handleSignal']);
        \pcntl_signal(SIGHUP, [self::class, 'handleSignal']);

        while (!$this->shouldShutDown()) {
            /** @var Queue $queue */
            foreach ($this->queues as $queue) {
                Logger::info('Listening to queue: ' . $queue->queueName());

                $message = $queue->pop();
                if (!$message) {
                    continue;
                }

                Logger::info('Processing message of type: ' . get_class($message));

                foreach ($this->getHandlerForMessage($message) as $handler) {
                    $handler($message);
                }
            }

            sleep(1);
        }

        Logger::info('Shutting down queue consumer...');
    }

    /**
     * Instantiates every handler registered for the given message's class.
     *
     * @param object $message The message popped from a queue.
     *
     * @return array Handler instances; start() invokes each one with the message.
     *
     * @throws \RuntimeException When no handler is registered for the message class.
     */
    private function getHandlerForMessage($message): array
    {
        $messageClass = get_class($message);

        if (!isset($this->handlers[$messageClass])) {
            throw new \RuntimeException("No handler found for message class: $messageClass");
        }

        $callables = [];

        foreach ($this->handlers[$messageClass] as $handlerClass) {
            if (class_exists($handlerClass)) {
                $callables[] = new $handlerClass();
            }
        }

        return $callables;
    }
}