From 6dd32884cf449629a112152acc714e9b36337026 Mon Sep 17 00:00:00 2001 From: Ron Rise Date: Wed, 12 Nov 2025 00:16:17 -0500 Subject: [PATCH] feat: implement queue system with consumer and message handling --- Dockerfile | 2 +- composer.json | 4 +- composer.lock | 151 +++++++++++++- config.php | 32 +++ docker-compose.yml | 2 + src/Annotations/Async/HandlesMessage.php | 24 +++ src/Async/Brokers/Broker.php | 19 ++ src/Async/Brokers/BrokerInterface.php | 21 ++ src/Async/Brokers/Kafka.php | 36 ++++ src/Async/Brokers/RabbitMQ.php | 36 ++++ src/Async/Brokers/Redis.php | 187 ++++++++++++++++++ src/Async/Brokers/Sqs.php | 36 ++++ src/Async/Consumer.php | 143 ++++++++++++++ src/Async/Handlers/HandlerInterface.php | 12 ++ src/Async/Handlers/SayHelloHandler.php | 21 ++ src/Async/Messages/Message.php | 102 ++++++++++ src/Async/Messages/SayHelloMessage.php | 41 ++++ src/Async/Queues/DefaultQueue.php | 13 ++ src/Async/Queues/Queue.php | 28 +++ src/Cli/App.php | 2 + src/Cli/Commands/Queue/Start.php | 28 +++ src/Helpers/Ulid.php | 13 ++ src/Kernel.php | 2 + src/Log/Logger.php | 118 +++++++++++ src/Services/Facades/Broker.php | 29 +++ src/Services/Facades/Logger.php | 14 +- .../BrokerServiceProvider.php | 32 +++ .../LoggerServiceProvider.php | 8 +- 28 files changed, 1142 insertions(+), 14 deletions(-) create mode 100644 src/Annotations/Async/HandlesMessage.php create mode 100644 src/Async/Brokers/Broker.php create mode 100644 src/Async/Brokers/BrokerInterface.php create mode 100644 src/Async/Brokers/Kafka.php create mode 100644 src/Async/Brokers/RabbitMQ.php create mode 100644 src/Async/Brokers/Redis.php create mode 100644 src/Async/Brokers/Sqs.php create mode 100644 src/Async/Consumer.php create mode 100644 src/Async/Handlers/HandlerInterface.php create mode 100644 src/Async/Handlers/SayHelloHandler.php create mode 100644 src/Async/Messages/Message.php create mode 100644 src/Async/Messages/SayHelloMessage.php create mode 100644 src/Async/Queues/DefaultQueue.php create mode 100644 src/Async/Queues/Queue.php create mode 100644 src/Cli/Commands/Queue/Start.php create mode 100644 src/Helpers/Ulid.php create mode 100644 src/Log/Logger.php create mode 100644 src/Services/Facades/Broker.php create mode 100644 src/Services/ServiceProviders/BrokerServiceProvider.php diff --git a/Dockerfile b/Dockerfile index ae5d98a..f7ae416 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,7 +17,7 @@ FROM php:8.4.14-alpine AS php # Move the production PHP configuration file to the default location RUN mv /usr/local/etc/php/php.ini-production /usr/local/etc/php/php.ini \ && apk add libpq-dev linux-headers --no-cache \ - && docker-php-ext-install pdo_pgsql sockets \ + && docker-php-ext-install pdo_pgsql sockets pcntl \ && rm -rf /var/cache/apk/* # Set the working directory to /app diff --git a/composer.json b/composer.json index d768687..c4c4e89 100644 --- a/composer.json +++ b/composer.json @@ -19,7 +19,9 @@ "predis/predis": "^v3.2.0", "siteworxpro/http-status": "0.0.2", "lcobucci/jwt": "^5.6", - "adhocore/cli": "^1.9" + "adhocore/cli": "^1.9", + "robinvdvleuten/ulid": "^5.0", + "monolog/monolog": "^3.9" }, "require-dev": { "phpunit/phpunit": "^12.4", diff --git a/composer.lock b/composer.lock index 450b663..f5f9c5a 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "f7dc2e6131715ed6eec2d9f851949b80", + "content-hash": "f920b7224ee908f6a4270f200dbbca3a", "packages": [ { "name": "adhocore/cli", @@ -976,6 +976,109 @@ ], "time": "2024-11-25T08:10:15+00:00" }, + { + "name": "monolog/monolog", + "version": "3.9.0", + "source": { + "type": "git", + "url": "https://github.com/Seldaek/monolog.git", + "reference": "10d85740180ecba7896c87e06a166e0c95a0e3b6" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/Seldaek/monolog/zipball/10d85740180ecba7896c87e06a166e0c95a0e3b6", + "reference": "10d85740180ecba7896c87e06a166e0c95a0e3b6", + "shasum": "" + }, + "require": { + "php": ">=8.1", + "psr/log": "^2.0 || ^3.0" + }, + "provide": { + "psr/log-implementation": "3.0.0" + }, + "require-dev": { + "aws/aws-sdk-php": "^3.0", + "doctrine/couchdb": "~1.0@dev", + "elasticsearch/elasticsearch": "^7 || ^8", + "ext-json": "*", + "graylog2/gelf-php": "^1.4.2 || ^2.0", + "guzzlehttp/guzzle": "^7.4.5", + "guzzlehttp/psr7": "^2.2", + "mongodb/mongodb": "^1.8", + "php-amqplib/php-amqplib": "~2.4 || ^3", + "php-console/php-console": "^3.1.8", + "phpstan/phpstan": "^2", + "phpstan/phpstan-deprecation-rules": "^2", + "phpstan/phpstan-strict-rules": "^2", + "phpunit/phpunit": "^10.5.17 || ^11.0.7", + "predis/predis": "^1.1 || ^2", + "rollbar/rollbar": "^4.0", + "ruflin/elastica": "^7 || ^8", + "symfony/mailer": "^5.4 || ^6", + "symfony/mime": "^5.4 || ^6" + }, + "suggest": { + "aws/aws-sdk-php": "Allow sending log messages to AWS services like DynamoDB", + "doctrine/couchdb": "Allow sending log messages to a CouchDB server", + "elasticsearch/elasticsearch": "Allow sending log messages to an Elasticsearch server via official client", + "ext-amqp": "Allow sending log messages to an AMQP server (1.0+ required)", + "ext-curl": "Required to send log messages using the IFTTTHandler, the LogglyHandler, the SendGridHandler, the SlackWebhookHandler or the TelegramBotHandler", + "ext-mbstring": "Allow to work properly with unicode symbols", + "ext-mongodb": "Allow sending log messages to a MongoDB server (via driver)", + "ext-openssl": "Required to send log messages using SSL", + "ext-sockets": "Allow sending log messages to a Syslog server (via UDP driver)", + "graylog2/gelf-php": "Allow sending log messages to a GrayLog2 server", + "mongodb/mongodb": "Allow sending log messages to a MongoDB server (via library)", + "php-amqplib/php-amqplib": "Allow sending log messages to an AMQP server using php-amqplib", + "rollbar/rollbar": "Allow sending log messages to Rollbar", + "ruflin/elastica": "Allow sending log messages to an Elastic Search server" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-main": "3.x-dev" + } + }, + "autoload": { + "psr-4": { + "Monolog\\": "src/Monolog" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Jordi Boggiano", + "email": "j.boggiano@seld.be", + "homepage": "https://seld.be" + } + ], + "description": "Sends your logs to files, sockets, inboxes, databases and various web services", + "homepage": "https://github.com/Seldaek/monolog", + "keywords": [ + "log", + "logging", + "psr-3" + ], + "support": { + "issues": "https://github.com/Seldaek/monolog/issues", + "source": "https://github.com/Seldaek/monolog/tree/3.9.0" + }, + "funding": [ + { + "url": "https://github.com/Seldaek", + "type": "github" + }, + { + "url": "https://tidelift.com/funding/github/packagist/monolog/monolog", + "type": "tidelift" + } + ], + "time": "2025-03-24T10:02:05+00:00" + }, { "name": "nesbot/carbon", "version": "3.10.3", @@ -1819,6 +1922,52 @@ ], "time": "2025-08-12T14:04:38+00:00" }, + { + "name": "robinvdvleuten/ulid", + "version": "v5.0.0", + "source": { + "type": "git", + "url": "https://github.com/robinvdvleuten/php-ulid.git", + "reference": "5389c9a2ff020815cc1f2b840334fdcb84ae3f35" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/robinvdvleuten/php-ulid/zipball/5389c9a2ff020815cc1f2b840334fdcb84ae3f35", + "reference": "5389c9a2ff020815cc1f2b840334fdcb84ae3f35", + "shasum": "" + }, + "require": { + "php": "^7.2|^8.0" + }, + "require-dev": { + "phpbench/phpbench": "^1.0.0-alpha3", + "phpunit/phpunit": "^8.5", + "symfony/phpunit-bridge": "^5.1" + }, + "type": "library", + "autoload": { + "psr-4": { + "Ulid\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Robin van der Vleuten", + "email": "robin@webstronauts.co" + } + ], + "description": "Universally Unique Lexicographically Sortable Identifier (ULID) implementation for PHP.", + "homepage": "https://github.com/robinvdvleuten/php-ulid", + "support": { + "issues": "https://github.com/robinvdvleuten/php-ulid/issues", + "source": "https://github.com/robinvdvleuten/php-ulid/tree/v5.0.0" + }, + "time": "2020-12-06T19:13:21+00:00" + }, { "name": "siteworxpro/config", "version": "1.1.1", diff --git a/config.php b/config.php index 5717aea..5ba6ef3 100644 --- a/config.php +++ b/config.php @@ -48,5 +48,37 @@ return [ 'audience' => Env::get('JWT_AUDIENCE', 'my_audience'), 'issuer' => Env::get('JWT_ISSUER', 'my_issuer'), 'strict_validation' => Env::get('JWT_STRICT_VALIDATION', true, 'bool'), + ], + + 'queue' => [ + 'broker' => Env::get('QUEUE_BROKER', 'redis'), + + 'broker_config' => [ + + 'redis' => [ + 'consumerGroup' => Env::get('QUEUE_REDIS_CONSUMER_GROUP', ''), + ], + + 'kafka' => [ + 'brokers' => Env::get('QUEUE_KAFKA_BROKERS', 'localhost:9092'), + 'topic' => Env::get('QUEUE_KAFKA_TOPIC', 'my_topic'), + ], + + 'rabbitmq' => [ + 'host' => Env::get('QUEUE_RABBITMQ_HOST', 'localhost'), + 'port' => Env::get('QUEUE_RABBITMQ_PORT', 5672, 'int'), + 'username' => Env::get('QUEUE_RABBITMQ_USERNAME', 'guest'), + 'password' => Env::get('QUEUE_RABBITMQ_PASSWORD', 'guest'), + 'vhost' => Env::get('QUEUE_RABBITMQ_VHOST', '/'), + ], + + 'sqs' => [ + 'key' => Env::get('QUEUE_SQS_KEY', ''), + 'secret' => Env::get('QUEUE_SQS_SECRET', ''), + 'region' => Env::get('QUEUE_SQS_REGION', 'us-east-1'), + 'version' => Env::get('QUEUE_SQS_VERSION', 'latest'), + 'queue_url' => Env::get('QUEUE_SQS_QUEUE_URL', ''), + ] + ] ] ]; diff --git a/docker-compose.yml b/docker-compose.yml index 544b7c7..c02a850 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -72,6 +72,8 @@ services: dockerfile: Dockerfile entrypoint: "/bin/sh -c 'while true; do sleep 30; done;'" depends_on: + migration-container: + condition: service_completed_successfully traefik: condition: service_healthy redis: diff --git a/src/Annotations/Async/HandlesMessage.php b/src/Annotations/Async/HandlesMessage.php new file mode 100644 index 0000000..9717932 --- /dev/null +++ b/src/Annotations/Async/HandlesMessage.php @@ -0,0 +1,24 @@ +messageClass; + } +} diff --git a/src/Async/Brokers/Broker.php b/src/Async/Brokers/Broker.php new file mode 100644 index 0000000..ff8586e --- /dev/null +++ b/src/Async/Brokers/Broker.php @@ -0,0 +1,19 @@ + Redis::class, + 'rabbitmq' => RabbitMQ::class, + 'kafka' => Kafka::class, + 'sqs' => Sqs::class, + ]; + + public function __construct(protected $config = []) + { + } +} diff --git a/src/Async/Brokers/BrokerInterface.php b/src/Async/Brokers/BrokerInterface.php new file mode 100644 index 0000000..6e27300 --- /dev/null +++ b/src/Async/Brokers/BrokerInterface.php @@ -0,0 +1,21 @@ +client = \Siteworxpro\App\Services\Facades\Redis::getFacadeRoot(); + $this->consumerId = php_uname('n') . ':' . getmypid(); + $this->consumerGroup = $config['consumerGroup'] ?? 'default'; + } + + private function ensureQueue(string $queueName): void + { + if (in_array($queueName, $this->queueNames, true)) { + return; + } + + try { + $this->client->executeCommand( + new RawCommand( + 'XGROUP', + [ + 'CREATE', + self::QUEUE_PREFIX . $queueName, + self::CONSUMER_ID_PREFIX . $this->consumerGroup, + '$', + 'MKSTREAM' + ] + ) + ); + } catch (\Exception) { + // If the group already exists, we catch the exception and ignore it + // This is because Redis will throw an error if the group already exists + // We can safely ignore this error as it means the group is already set up + } + + $this->client->executeCommand( + new RawCommand( + 'XGROUP', + [ + 'CREATECONSUMER', + self::QUEUE_PREFIX . $queueName, + self::CONSUMER_ID_PREFIX . $this->consumerGroup, + $this->consumerId + ] + ) + ); + + $this->queueNames[] = $queueName; + } + + public function __destruct() + { + foreach ($this->queueNames as $queueName) { + try { + $this->client->executeCommand( + new RawCommand( + 'XGROUP', + [ + 'DELCONSUMER', + self::QUEUE_PREFIX . $queueName, + self::CONSUMER_ID_PREFIX . $this->consumerGroup, + $this->consumerId + ] + ) + ); + } catch (\Exception) { + // Ignore exceptions during cleanup + } + } + } + + /** + * @throws \Exception + */ + public function publish(Queue $queue, Message $message, ?int $delay = null): void + { + $command = '%s * data %s'; + $command = sprintf( + $command, + self::QUEUE_PREFIX . + $queue->queueName(), + base64_encode($message->serialize()) + ); + + /** @var string $result */ + $result = $this + ->client + ->executeCommand( + new RawCommand('XADD', explode(' ', $command)), + ); + + $message->setId($result); + } + + public function consume(Queue $queue): Message|null + { + $this->ensureQueue($queue->queueName()); + + $command = 'GROUP %s %s COUNT 1 STREAMS %s >'; + $command = sprintf( + $command, + self::CONSUMER_ID_PREFIX . $this->consumerGroup, + $this->consumerId, + self::QUEUE_PREFIX . $queue->queueName(), + ); + + /** @var array | null $response */ + $response = $this + ->client + ->executeCommand( + new RawCommand( + 'XREADGROUP', + explode(' ', $command) + ) + ); + + if ($response === null || !isset($response[0][1][0][1][1])) { + return null; + } + + $messageData = base64_decode($response[0][1][0][1][1]); + $messageId = $response[0][1][0][0]; + + if ($messageData === 'NOOP') { + // If the message is a NOOP, we return null to indicate no actual message + return null; + } + + /** @var Message $value */ + $value = unserialize($messageData, ['allowed_classes' => true]); + $value->setId($messageId); + + return $value instanceof Message ? $value : null; + } + + public function acknowledge(Queue $queue, Message $message): void + { + $response = $this + ->client + ->executeCommand( + new RawCommand( + 'XACK', + [ + self::QUEUE_PREFIX . $queue->queueName(), + self::CONSUMER_ID_PREFIX . $this->consumerGroup, + $message->getId() + ] + ) + ); + } + + public function reject(Queue $queue, Message $message, bool $requeue = false): void + { + // TODO: Implement reject() method. + } + + public function purge(Queue $queue): void + { + + // TODO: Implement purge() method. + } +} diff --git a/src/Async/Brokers/Sqs.php b/src/Async/Brokers/Sqs.php new file mode 100644 index 0000000..67b437c --- /dev/null +++ b/src/Async/Brokers/Sqs.php @@ -0,0 +1,36 @@ +queues[] = new $queueClass(); + } + + $this->registerHandlers(); + } + + private function registerHandlers(): void + { + $recursiveIterator = new \RecursiveIteratorIterator( + new \RecursiveDirectoryIterator(__DIR__ . '/Handlers/') + ); + + foreach ($recursiveIterator as $file) { + if ($file->isFile() && $file->getExtension() === 'php') { + $relativePath = str_replace(__DIR__ . '/Handlers/', '', $file->getPathname()); + $className = self::HANDLER_NAMESPACE . str_replace('/', '\\', substr($relativePath, 0, -4)); + + + if (class_exists($className)) { + $reflection = new \ReflectionClass($className); + $attributes = $reflection->getAttributes(HandlesMessage::class); + foreach ($attributes as $attribute) { + $instance = $attribute->newInstance(); + $messageClass = $instance->getMessageClass(); + $this->handlers[$messageClass][] = $className; + } + } + } + } + } + + /** + * @param $signal + */ + public static function handleSignal($signal): void + { + switch ($signal) { + // Graceful + case SIGINT: + case SIGTERM: + case SIGHUP: + self::$shutDown = true; + + break; + + // Not Graceful + case SIGKILL: + exit(9); + } + } + + private function shouldShutDown(): bool + { + return self::$shutDown; + } + + public function start(): void + { + if (!\function_exists('pcntl_signal')) { + throw new \RuntimeException('The pcntl extension is required to handle signals.'); + } + + Logger::info('Starting queue consumer...'); + + \pcntl_signal(SIGINT, [self::class, 'handleSignal']); + \pcntl_signal(SIGTERM, [self::class, 'handleSignal']); + \pcntl_signal(SIGHUP, [self::class, 'handleSignal']); + + while (true) { + if ($this->shouldShutDown()) { + Logger::info('Shutting down queue consumer...'); + break; + } + + /** @var Queue $queue */ + foreach ($this->queues as $queue) { + Logger::info('Listening to queue: ' . $queue->queueName()); + $message = $queue->pop(); + if ($message) { + Logger::info('Processing message of type: ' . get_class($message)); + + $handlers = $this->getHandlerForMessage($message); + + foreach ($handlers as $handler) { + $handler($message); + } + } + } + + sleep(1); + } + } + + private function getHandlerForMessage($message): array + { + $callables = []; + + $messageClass = get_class($message); + if (isset($this->handlers[$messageClass])) { + $handlerClasses = $this->handlers[$messageClass]; + + foreach ($handlerClasses as $handlerClass) { + if (class_exists($handlerClass)) { + $handlerInstance = new $handlerClass(); + + $callables[] = $handlerInstance; + } + } + + return $callables; + } + + throw new \RuntimeException("No handler found for message class: $messageClass"); + } +} diff --git a/src/Async/Handlers/HandlerInterface.php b/src/Async/Handlers/HandlerInterface.php new file mode 100644 index 0000000..a649af4 --- /dev/null +++ b/src/Async/Handlers/HandlerInterface.php @@ -0,0 +1,12 @@ +getPayload()['name'] ?? 'Guest'; + + Logger::info(sprintf("Hello, %s!", $name)); + } +} diff --git a/src/Async/Messages/Message.php b/src/Async/Messages/Message.php new file mode 100644 index 0000000..c099a5c --- /dev/null +++ b/src/Async/Messages/Message.php @@ -0,0 +1,102 @@ +uniqueId = Ulid::generate(); + $this->timestamp = time(); + } + + protected function getQueue(): Queue + { + if ($this->queue === '') { + $this->queue = static::DEFAULT_QUEUE; + } + + return new $this->queue(); + } + + public function getId(): string + { + return $this->id; + } + + /** + * @param string $id + */ + public function setId(string $id): void + { + $this->id = $id; + } + + public function getPayload(): array + { + return $this->payload; + } + + public function getTimestamp(): int + { + return $this->timestamp; + } + + public function __serialize(): array + { + return [ + 'id' => $this->id, + 'payload' => $this->payload, + 'timestamp' => $this->timestamp, + 'queue' => $this->queue, + ]; + } + + public function __unserialize(array $data): void + { + $this->id = $data['id']; + $this->payload = $data['payload']; + $this->timestamp = $data['timestamp']; + $this->queue = $data['queue']; + } + + public function serialize(): string + { + return serialize($this); + } + + public function unserialize(string $data): Message + { + $unserializedData = unserialize($data, ['allowed_classes' => [Message::class]]); + + $this->id = $unserializedData['id']; + $this->uniqueId = $unserializedData['uniqueId']; + $this->payload = $unserializedData['payload']; + $this->timestamp = $unserializedData['timestamp']; + $this->queue = $unserializedData['queue']; + + return $this; + } +} diff --git a/src/Async/Messages/SayHelloMessage.php b/src/Async/Messages/SayHelloMessage.php new file mode 100644 index 0000000..3bed371 --- /dev/null +++ b/src/Async/Messages/SayHelloMessage.php @@ -0,0 +1,41 @@ +getQueue(), + $message + ); + } + + public static function dispatchLater(int $delay, ...$args): void + { + $name = $args[0] ?? 'World'; + $message = new self($name); + Broker::publishLater( + $message->getQueue(), + $message, + $delay + ); + } + + private function __construct( + private readonly string $name + ) { + parent::__construct(); + + $this->payload = [ + 'name' => $this->name, + ]; + } +} diff --git a/src/Async/Queues/DefaultQueue.php b/src/Async/Queues/DefaultQueue.php new file mode 100644 index 0000000..8c0b4db --- /dev/null +++ b/src/Async/Queues/DefaultQueue.php @@ -0,0 +1,13 @@ +app = new Application('Php-Template', Config::get('app.version') ?? 'dev-master'); $this->app->add(new DemoCommand()); + $this->app->add(new Start()); } public function run(): int diff --git a/src/Cli/Commands/Queue/Start.php b/src/Cli/Commands/Queue/Start.php new file mode 100644 index 0000000..cc52e4d --- /dev/null +++ b/src/Cli/Commands/Queue/Start.php @@ -0,0 +1,28 @@ +argument('[name]', 'Your name') + ->option('-g, --greet', 'Include a greeting message'); + } + + public function execute(): int + { + $consumer = new Consumer(); + $consumer->start(); + + return 0; + } +} diff --git a/src/Helpers/Ulid.php b/src/Helpers/Ulid.php new file mode 100644 index 0000000..6f74ff4 --- /dev/null +++ b/src/Helpers/Ulid.php @@ -0,0 +1,13 @@ +getRandomness(); + } +} diff --git a/src/Kernel.php b/src/Kernel.php index e7cb9e5..8237102 100644 --- a/src/Kernel.php +++ b/src/Kernel.php @@ -9,6 +9,7 @@ use Siteworx\Config\Config as SWConfig; use Siteworxpro\App\Services\Facade; use Siteworxpro\App\Services\Facades\Config; use Siteworxpro\App\Services\Facades\Dispatcher; +use Siteworxpro\App\Services\ServiceProviders\BrokerServiceProvider; use Siteworxpro\App\Services\ServiceProviders\DispatcherServiceProvider; use Siteworxpro\App\Services\ServiceProviders\LoggerServiceProvider; use Siteworxpro\App\Services\ServiceProviders\RedisServiceProvider; @@ -19,6 +20,7 @@ class Kernel LoggerServiceProvider::class, RedisServiceProvider::class, DispatcherServiceProvider::class, + BrokerServiceProvider::class ]; /** diff --git a/src/Log/Logger.php b/src/Log/Logger.php new file mode 100644 index 0000000..916bdc2 --- /dev/null +++ b/src/Log/Logger.php @@ -0,0 +1,118 @@ + 0, + LogLevel::ALERT => 1, + LogLevel::CRITICAL => 2, + LogLevel::ERROR => 3, + LogLevel::WARNING => 4, + LogLevel::NOTICE => 5, + LogLevel::INFO => 6, + LogLevel::DEBUG => 7, + ]; + + public function __construct(private readonly string $level = LogLevel::DEBUG) + { + if (isset($_SERVER['RR_RPC'])) { + $rpc = RPC::create('tcp://127.0.0.1:6001'); + $this->rpcLogger = new RRLogger($rpc); + } + + $this->monologLogger = new \Monolog\Logger('app_logger'); + $formatter = new JsonFormatter(); + $this->monologLogger->pushHandler(new StreamHandler('php://stdout')->setFormatter($formatter)); + } + + public function emergency(\Stringable|string $message, array $context = []): void + { + $this->log(LogLevel::EMERGENCY, $message, $context); + } + + public function alert(\Stringable|string $message, array $context = []): void + { + $this->log(LogLevel::ALERT, $message, $context); + } + + public function critical(\Stringable|string $message, array $context = []): void + { + $this->log(LogLevel::CRITICAL, $message, $context); + } + + public function error(\Stringable|string $message, array $context = []): void + { + $this->log(LogLevel::ERROR, $message, $context); + } + + public function warning(\Stringable|string $message, array $context = []): void + { + $this->log(LogLevel::WARNING, $message, $context); + } + + public function notice(\Stringable|string $message, array $context = []): void + { + $this->log(LogLevel::NOTICE, $message, $context); + } + + public function info(\Stringable|string $message, array $context = []): void + { + $this->log(LogLevel::INFO, $message, $context); + } + + public function debug(\Stringable|string $message, array $context = []): void + { + $this->log(LogLevel::DEBUG, $message, $context); + } + + public function log($level, \Stringable|string $message, array $context = []): void + { + if ($this->levels[$level] > $this->levels[$this->level]) { + return; + } + + if ($this->rpcLogger) { + switch ($level) { + case LogLevel::DEBUG: + $this->rpcLogger->debug((string)$message, $context); + break; + case LogLevel::NOTICE: + case LogLevel::INFO: + $this->rpcLogger->info((string)$message, $context); + break; + case LogLevel::WARNING: + $this->rpcLogger->warning((string)$message, $context); + break; + case LogLevel::CRITICAL: + case LogLevel::ERROR: + case LogLevel::ALERT: + case LogLevel::EMERGENCY: + $this->rpcLogger->error((string)$message, $context); + break; + default: + $this->rpcLogger->log((string)$message, $context); + break; + } + + return; + } + + $this->monologLogger->log($this->levels[$level], (string)$message, $context); + } +} diff --git a/src/Services/Facades/Broker.php b/src/Services/Facades/Broker.php new file mode 100644 index 0000000..822dfc7 --- /dev/null +++ b/src/Services/Facades/Broker.php @@ -0,0 +1,29 @@ +app->singleton(Broker::class, function (): Broker { + $configName = Config::get('queue.broker'); + $brokerConfig = Config::get('queue.' . $configName) ?? []; + + $brokerClass = Broker::BROKER_TYPES[$configName] ?? null; + + if ($brokerClass && class_exists($brokerClass)) { + $class = new $brokerClass($brokerConfig); + + if ($class instanceof Broker) { + return $class; + } + } + + throw new \RuntimeException("Broker class $brokerClass does not exist."); + }); + } +} diff --git a/src/Services/ServiceProviders/LoggerServiceProvider.php b/src/Services/ServiceProviders/LoggerServiceProvider.php index d179f38..768d9c3 100644 --- a/src/Services/ServiceProviders/LoggerServiceProvider.php +++ b/src/Services/ServiceProviders/LoggerServiceProvider.php @@ -5,16 +5,14 @@ declare(strict_types=1); namespace Siteworxpro\App\Services\ServiceProviders; use Illuminate\Support\ServiceProvider; -use RoadRunner\Logger\Logger as RRLogger; -use Spiral\Goridge\RPC\RPC; +use Siteworxpro\App\Log\Logger; class LoggerServiceProvider extends ServiceProvider { public function register(): void { - $this->app->singleton(RRLogger::class, function () { - $rpc = RPC::create('tcp://127.0.0.1:6001'); - return new RRLogger($rpc); + $this->app->singleton(Logger::class, function () { + return new Logger(); }); } }