You've already forked Php-Template
chore: added documentation (#16)
All checks were successful
🧪✨ Tests Workflow / 🛡️ 🔒 Library Audit (push) Successful in 1m42s
🧪✨ Tests Workflow / 📝 ✨ Code Lint (push) Successful in 1m38s
🧪✨ Tests Workflow / 🛡️ 🔒 License Check (push) Successful in 2m5s
🧪✨ Tests Workflow / 🧪 ✨ Database Migrations (push) Successful in 2m44s
🧪✨ Tests Workflow / 🐙 🔍 Code Sniffer (push) Successful in 2m28s
🧪✨ Tests Workflow / 🧪 ✅ Unit Tests (push) Successful in 1m27s
All checks were successful
🧪✨ Tests Workflow / 🛡️ 🔒 Library Audit (push) Successful in 1m42s
🧪✨ Tests Workflow / 📝 ✨ Code Lint (push) Successful in 1m38s
🧪✨ Tests Workflow / 🛡️ 🔒 License Check (push) Successful in 2m5s
🧪✨ Tests Workflow / 🧪 ✨ Database Migrations (push) Successful in 2m44s
🧪✨ Tests Workflow / 🐙 🔍 Code Sniffer (push) Successful in 2m28s
🧪✨ Tests Workflow / 🧪 ✅ Unit Tests (push) Successful in 1m27s
Reviewed-on: #16 Co-authored-by: Ron Rise <ron@siteworxpro.com> Co-committed-by: Ron Rise <ron@siteworxpro.com>
This commit was merged in pull request #16.
This commit is contained in:
@@ -60,6 +60,13 @@ class Kafka extends Broker
|
||||
return null;
|
||||
}
|
||||
|
||||
if ($kafkaMessage->err === RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART) {
|
||||
throw new \RuntimeException(
|
||||
"Topic '{$queue->queueName()}' or partition does not exist. Kafka does not auto-create topics" .
|
||||
" unless configured to do so."
|
||||
);
|
||||
}
|
||||
|
||||
/** @var string | null $messageData */
|
||||
$messageData = $kafkaMessage->payload;
|
||||
if ($messageData !== null) {
|
||||
|
||||
@@ -5,87 +5,96 @@ declare(ticks=1);
|
||||
namespace Siteworxpro\App\Async;
|
||||
|
||||
use Siteworxpro\App\Annotations\Async\HandlesMessage;
|
||||
use Siteworxpro\App\Async\Messages\Message;
|
||||
use Siteworxpro\App\Async\Queues\Queue;
|
||||
use Siteworxpro\App\Services\Facades\Broker;
|
||||
use Siteworxpro\App\Services\Facades\Logger;
|
||||
|
||||
/**
|
||||
* Long-running process that listens to queues, pops messages, and dispatches them to handlers.
|
||||
*/
|
||||
class Consumer
|
||||
{
|
||||
private static bool $shutDown = false;
|
||||
|
||||
/** @var array<string,string> */
|
||||
private const array QUEUES = [
|
||||
'default' => Queues\DefaultQueue::class,
|
||||
];
|
||||
|
||||
/** @var Queue[] */
|
||||
private array $queues = [];
|
||||
|
||||
/** @var array<string, string[]> message FQCN => handler FQCNs */
|
||||
private array $handlers = [];
|
||||
|
||||
private const string HANDLER_NAMESPACE = 'Siteworxpro\\App\\Async\\Handlers\\';
|
||||
|
||||
/**
|
||||
* @param string[] $queues Optional list of queue names (keys from self::QUEUES)
|
||||
*/
|
||||
public function __construct(array $queues = [])
|
||||
{
|
||||
if ($queues === []) {
|
||||
$queues = self::QUEUES;
|
||||
} else {
|
||||
$mappedQueues = [];
|
||||
foreach ($queues as $queueName) {
|
||||
if (isset(self::QUEUES[$queueName])) {
|
||||
$mappedQueues[] = self::QUEUES[$queueName];
|
||||
} else {
|
||||
throw new \InvalidArgumentException("Queue '$queueName' is not defined.");
|
||||
}
|
||||
}
|
||||
$queues = $mappedQueues;
|
||||
}
|
||||
$queueClasses = $queues === []
|
||||
? array_values(self::QUEUES)
|
||||
: array_map(
|
||||
static function (string $name): string {
|
||||
if (!isset(self::QUEUES[$name])) {
|
||||
throw new \InvalidArgumentException("Queue '$name' is not defined.");
|
||||
}
|
||||
return self::QUEUES[$name];
|
||||
},
|
||||
$queues
|
||||
);
|
||||
|
||||
|
||||
foreach ($queues as $queueClass) {
|
||||
$this->queues[] = new $queueClass();
|
||||
foreach ($queueClasses as $class) {
|
||||
$this->queues[] = new $class();
|
||||
}
|
||||
|
||||
$this->registerHandlers();
|
||||
}
|
||||
|
||||
/**
|
||||
* Discover handler classes under `Handlers` and register them via HandlesMessage attributes.
|
||||
*/
|
||||
private function registerHandlers(): void
|
||||
{
|
||||
$recursiveIterator = new \RecursiveIteratorIterator(
|
||||
$it = new \RecursiveIteratorIterator(
|
||||
new \RecursiveDirectoryIterator(__DIR__ . '/Handlers/')
|
||||
);
|
||||
|
||||
foreach ($recursiveIterator as $file) {
|
||||
if ($file->isFile() && $file->getExtension() === 'php') {
|
||||
$relativePath = str_replace(__DIR__ . '/Handlers/', '', $file->getPathname());
|
||||
$className = self::HANDLER_NAMESPACE . str_replace('/', '\\', substr($relativePath, 0, -4));
|
||||
/** @var \SplFileInfo $file */
|
||||
foreach ($it as $file) {
|
||||
if (!$file->isFile() || $file->getExtension() !== 'php') {
|
||||
continue;
|
||||
}
|
||||
|
||||
$relative = str_replace(__DIR__ . '/Handlers/', '', $file->getPathname());
|
||||
$class = self::HANDLER_NAMESPACE . str_replace('/', '\\', substr($relative, 0, -4));
|
||||
|
||||
if (class_exists($className)) {
|
||||
$reflection = new \ReflectionClass($className);
|
||||
$attributes = $reflection->getAttributes(HandlesMessage::class);
|
||||
foreach ($attributes as $attribute) {
|
||||
$instance = $attribute->newInstance();
|
||||
$messageClass = $instance->getMessageClass();
|
||||
$this->handlers[$messageClass][] = $className;
|
||||
}
|
||||
}
|
||||
if (!class_exists($class)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$ref = new \ReflectionClass($class);
|
||||
foreach ($ref->getAttributes(HandlesMessage::class) as $attr) {
|
||||
$messageClass = $attr->newInstance()->getMessageClass();
|
||||
$this->handlers[$messageClass][] = $class;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param $signal
|
||||
* Signal handler used to initiate graceful or immediate shutdown.
|
||||
*/
|
||||
public static function handleSignal($signal): void
|
||||
public static function handleSignal(int $signal): void
|
||||
{
|
||||
switch ($signal) {
|
||||
// Graceful
|
||||
case SIGINT:
|
||||
case SIGTERM:
|
||||
case SIGHUP:
|
||||
self::$shutDown = true;
|
||||
|
||||
break;
|
||||
|
||||
// Not Graceful
|
||||
return;
|
||||
case SIGKILL:
|
||||
exit(9);
|
||||
}
|
||||
@@ -96,6 +105,9 @@ class Consumer
|
||||
return self::$shutDown;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the consumer main loop.
|
||||
*/
|
||||
public function start(): void
|
||||
{
|
||||
if (!\function_exists('pcntl_signal')) {
|
||||
@@ -103,10 +115,11 @@ class Consumer
|
||||
}
|
||||
|
||||
Logger::info('Starting queue consumer...');
|
||||
Logger::info('Using Broker: ' . Broker::getFacadeRoot()::class);
|
||||
|
||||
\pcntl_signal(SIGINT, [self::class, 'handleSignal']);
|
||||
\pcntl_signal(SIGTERM, [self::class, 'handleSignal']);
|
||||
\pcntl_signal(SIGHUP, [self::class, 'handleSignal']);
|
||||
foreach ([SIGINT, SIGTERM, SIGHUP] as $sig) {
|
||||
\pcntl_signal($sig, [self::class, 'handleSignal']);
|
||||
}
|
||||
|
||||
while (true) {
|
||||
if ($this->shouldShutDown()) {
|
||||
@@ -118,40 +131,43 @@ class Consumer
|
||||
foreach ($this->queues as $queue) {
|
||||
Logger::info('Listening to queue: ' . $queue->queueName());
|
||||
$message = $queue->pop();
|
||||
if ($message) {
|
||||
Logger::info('Processing message of type: ' . get_class($message));
|
||||
|
||||
$handlers = $this->getHandlerForMessage($message);
|
||||
|
||||
foreach ($handlers as $handler) {
|
||||
$handler($message);
|
||||
}
|
||||
if (!$message) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Logger::info('Processing message of type: ' . get_class($message));
|
||||
|
||||
foreach ($this->getHandlersForMessage($message) as $handler) {
|
||||
$handler($message);
|
||||
}
|
||||
|
||||
// Continue polling from the top of the loop after processing a message.
|
||||
continue 2;
|
||||
}
|
||||
|
||||
// Avoid busy-looping when no messages are available.
|
||||
sleep(1);
|
||||
}
|
||||
}
|
||||
|
||||
private function getHandlerForMessage($message): array
|
||||
/**
|
||||
* @return callable[] Handler instances invokable with the message
|
||||
*/
|
||||
private function getHandlersForMessage(Message $message): array
|
||||
{
|
||||
$callables = [];
|
||||
|
||||
$messageClass = get_class($message);
|
||||
if (isset($this->handlers[$messageClass])) {
|
||||
$handlerClasses = $this->handlers[$messageClass];
|
||||
|
||||
foreach ($handlerClasses as $handlerClass) {
|
||||
if (class_exists($handlerClass)) {
|
||||
$handlerInstance = new $handlerClass();
|
||||
|
||||
$callables[] = $handlerInstance;
|
||||
}
|
||||
}
|
||||
|
||||
return $callables;
|
||||
if (!isset($this->handlers[$messageClass])) {
|
||||
throw new \RuntimeException("No handler found for message class: $messageClass");
|
||||
}
|
||||
|
||||
throw new \RuntimeException("No handler found for message class: $messageClass");
|
||||
$callables = [];
|
||||
foreach ($this->handlers[$messageClass] as $handlerClass) {
|
||||
if (class_exists($handlerClass)) {
|
||||
$callables[] = new $handlerClass();
|
||||
}
|
||||
}
|
||||
|
||||
return $callables;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user