client = \Siteworxpro\App\Services\Facades\Redis::getFacadeRoot();
        $this->consumerId = php_uname('n') . ':' . getmypid();
        $this->consumerGroup = $config['consumerGroup'] ?? 'default';
    }

    /**
     * Makes sure the consumer group and this process's consumer exist on the
     * stream backing the given queue.
     *
     * The work is done once per queue name per instance: names that were already
     * set up are remembered in $this->queueNames and skipped on later calls.
     *
     * @param string $queueName Queue name without the stream key prefix.
     *
     * @throws \Exception When XGROUP CREATE fails for a reason other than the
     *                    group already existing, or when XGROUP CREATECONSUMER fails.
     */
    private function ensureQueue(string $queueName): void
    {
        if (in_array($queueName, $this->queueNames, true)) {
            return;
        }

        $streamKey = self::QUEUE_PREFIX . $queueName;
        $groupName = self::CONSUMER_ID_PREFIX . $this->consumerGroup;

        try {
            // '$' starts the group at the current end of the stream; MKSTREAM
            // creates the stream when it does not exist yet.
            $this->client->executeCommand(
                new RawCommand('XGROUP', ['CREATE', $streamKey, $groupName, '$', 'MKSTREAM'])
            );
        } catch (\Exception $e) {
            // Redis answers with a BUSYGROUP error when the group already exists;
            // that means the group is set up and is safe to ignore. Anything
            // else is a real failure and must not be hidden.
            if (!str_contains($e->getMessage(), 'BUSYGROUP')) {
                throw $e;
            }
        }

        $this->client->executeCommand(
            new RawCommand('XGROUP', ['CREATECONSUMER', $streamKey, $groupName, $this->consumerId])
        );

        $this->queueNames[] = $queueName;
    }

    /**
     * Removes this process's consumer from the consumer group of every queue it
     * registered on through ensureQueue().
     *
     * Cleanup is best effort: a failure for one queue is ignored so the
     * remaining queues are still processed and the destructor never throws.
     *
     * NOTE(review): the Redis documentation states that XGROUP DELCONSUMER also
     * drops the consumer's pending (unacknowledged) entries - confirm that this
     * is acceptable for in-flight messages.
     */
    public function __destruct()
    {
        foreach ($this->queueNames as $queueName) {
            try {
                $this->client->executeCommand(
                    new RawCommand(
                        'XGROUP',
                        [
                            'DELCONSUMER',
                            self::QUEUE_PREFIX . $queueName,
                            self::CONSUMER_ID_PREFIX . $this->consumerGroup,
                            $this->consumerId,
                        ]
                    )
                );
            } catch (\Exception) {
                // Ignore exceptions during cleanup
            }
        }
    }

    /**
     * Appends a message to the queue's stream with XADD.
     *
     * The serialized message is base64-encoded and stored under the single
     * `data` field with an auto-generated entry ID (`*`). The reply of XADD is
     * written back onto the message through setId().
     *
     * @param Queue    $queue   Queue whose stream receives the entry.
     * @param Message  $message Message to publish; its ID is set from the XADD reply.
     * @param int|null $delay   Not used by this implementation; the entry is added immediately.
     *
     * @throws \Exception
     */
    public function publish(Queue $queue, Message $message, ?int $delay = null): void
    {
        // Arguments are passed as an array instead of being split out of a
        // formatted string, so a queue name containing a space stays one argument.
        /** @var string $result */
        $result = $this->client->executeCommand(
            new RawCommand(
                'XADD',
                [
                    self::QUEUE_PREFIX . $queue->queueName(),
                    '*',
                    'data',
                    base64_encode($message->serialize()),
                ]
            )
        );

        $message->setId($result);
    }

    /**
     * Reads at most one new entry for this consumer from the queue's stream.
     *
     * The consumer group and consumer are created on first use. The entry is
     * requested with XREADGROUP and the `>` ID, its payload is base64-decoded
     * and unserialized, and the stream entry ID is set on the resulting message.
     *
     * @param Queue $queue Queue to read from.
     *
     * @return Message|null The decoded message, or null when the reply holds no
     *                      entry, the payload is the literal `NOOP`, or the
     *                      payload does not unserialize to a Message.
     */
    public function consume(Queue $queue): Message|null
    {
        $this->ensureQueue($queue->queueName());

        // Arguments are passed as an array instead of being split out of a
        // formatted string, so group, consumer, or queue names containing a
        // space are not broken into several arguments.
        /** @var array | null $response */
        $response = $this->client->executeCommand(
            new RawCommand(
                'XREADGROUP',
                [
                    'GROUP',
                    self::CONSUMER_ID_PREFIX . $this->consumerGroup,
                    $this->consumerId,
                    'COUNT',
                    '1',
                    'STREAMS',
                    self::QUEUE_PREFIX . $queue->queueName(),
                    '>',
                ]
            )
        );

        // Indexing used below: [0] first stream -> [1] its entries -> [0] first
        // entry -> [0] entry ID / [1] field list -> [1] the value that follows
        // the `data` field name written by publish().
        if ($response === null || !isset($response[0][1][0][1][1])) {
            return null;
        }

        $messageData = base64_decode($response[0][1][0][1][1]);
        $messageId = $response[0][1][0][0];

        if ($messageData === 'NOOP') {
            // If the message is a NOOP, we return null to indicate no actual message
            return null;
        }

        // NOTE(review): unserialize() with allowed_classes => true can instantiate
        // any loadable class. This is only safe while everything able to write to
        // the stream is trusted - consider restricting allowed_classes.
        $value = unserialize($messageData, ['allowed_classes' => true]);

        if (!$value instanceof Message) {
            return null;
        }

        $value->setId($messageId);

        return $value;
    }

    /**
     * Acknowledges a message for this driver's consumer group with XACK.
     *
     * @param Queue   $queue   Queue the message was read from.
     * @param Message $message Message whose ID is acknowledged.
     */
    public function acknowledge(Queue $queue, Message $message): void
    {
        $this->client->executeCommand(
            new RawCommand(
                'XACK',
                [
                    self::QUEUE_PREFIX . $queue->queueName(),
                    self::CONSUMER_ID_PREFIX . $this->consumerGroup,
                    $message->getId(),
                ]
            )
        );
    }

    /**
     * Not implemented yet: calling this method currently does nothing.
     *
     * @param Queue   $queue   Queue the message was read from.
     * @param Message $message Message to reject.
     * @param bool    $requeue Unused until the method is implemented.
     */
    public function reject(Queue $queue, Message $message, bool $requeue = false): void
    {
        // TODO: Implement reject() method.
    }

    /**
     * Not implemented yet: calling this method currently does nothing.
     *
     * @param Queue $queue Queue to purge.
     */
    public function purge(Queue $queue): void
    {
        // TODO: Implement purge() method.
    }
}