feat: implement queue system with consumer and message handling #14

Merged
rrise merged 2 commits from feat/queue into master 2025-11-12 12:00:34 +00:00
7 changed files with 9 additions and 12 deletions
Showing only changes of commit c73234117b - Show all commits

View File

@@ -16,7 +16,7 @@ class Kafka extends Broker
public function consume(Queue $queue): Message | null public function consume(Queue $queue): Message | null
{ {
// TODO: Implement consume() method. return null;
} }
public function acknowledge(Queue $queue, Message $message): void public function acknowledge(Queue $queue, Message $message): void

View File

@@ -16,7 +16,7 @@ class RabbitMQ extends Broker
public function consume(Queue $queue): Message | null public function consume(Queue $queue): Message | null
{ {
// TODO: Implement consume() method. return null;
} }
public function acknowledge(Queue $queue, Message $message): void public function acknowledge(Queue $queue, Message $message): void

View File

@@ -151,11 +151,14 @@ class Redis extends Broker
return null; return null;
} }
/** @var Message $value */
$value = unserialize($messageData, ['allowed_classes' => true]); $value = unserialize($messageData, ['allowed_classes' => true]);
if (!$value instanceof Message) {
return null;
}
$value->setId($messageId); $value->setId($messageId);
return $value instanceof Message ? $value : null; return $value;
} }
public function acknowledge(Queue $queue, Message $message): void public function acknowledge(Queue $queue, Message $message): void

View File

@@ -16,7 +16,7 @@ class Sqs extends Broker
public function consume(Queue $queue): Message | null public function consume(Queue $queue): Message | null
{ {
// TODO: Implement consume() method. return null;
} }
public function acknowledge(Queue $queue, Message $message): void public function acknowledge(Queue $queue, Message $message): void

View File

@@ -24,7 +24,6 @@ class Consumer
public function __construct() public function __construct()
{ {
/** @var Queue $queueClass */
foreach (self::QUEUES as $queueClass) { foreach (self::QUEUES as $queueClass) {
$this->queues[] = new $queueClass(); $this->queues[] = new $queueClass();
} }

View File

@@ -13,7 +13,6 @@ use Spiral\Goridge\RPC\RPC;
class Logger implements LoggerInterface class Logger implements LoggerInterface
{ {
private ?RRLogger $rpcLogger = null; private ?RRLogger $rpcLogger = null;
private \Monolog\Logger $monologLogger; private \Monolog\Logger $monologLogger;

View File

@@ -19,11 +19,7 @@ class BrokerServiceProvider extends ServiceProvider
$brokerClass = Broker::BROKER_TYPES[$configName] ?? null; $brokerClass = Broker::BROKER_TYPES[$configName] ?? null;
if ($brokerClass && class_exists($brokerClass)) { if ($brokerClass && class_exists($brokerClass)) {
$class = new $brokerClass($brokerConfig); return new $brokerClass($brokerConfig);
if ($class instanceof Broker) {
return $class;
}
} }
throw new \RuntimeException("Broker class $brokerClass does not exist."); throw new \RuntimeException("Broker class $brokerClass does not exist.");