You've already forked Php-Template
feat: implement queue system with consumer and message handling (#14)
All checks were successful
🧪✨ Tests Workflow / 🛡️ 🔒 License Check (push) Successful in 3m1s
🧪✨ Tests Workflow / 🧪 ✨ Database Migrations (push) Successful in 3m16s
🧪✨ Tests Workflow / 🛡️ 🔒 Library Audit (push) Successful in 3m13s
🧪✨ Tests Workflow / 📝 ✨ Code Lint (push) Successful in 3m5s
🧪✨ Tests Workflow / 🐙 🔍 Code Sniffer (push) Successful in 3m11s
🧪✨ Tests Workflow / 🧪 ✅ Unit Tests (push) Successful in 1m51s
All checks were successful
🧪✨ Tests Workflow / 🛡️ 🔒 License Check (push) Successful in 3m1s
🧪✨ Tests Workflow / 🧪 ✨ Database Migrations (push) Successful in 3m16s
🧪✨ Tests Workflow / 🛡️ 🔒 Library Audit (push) Successful in 3m13s
🧪✨ Tests Workflow / 📝 ✨ Code Lint (push) Successful in 3m5s
🧪✨ Tests Workflow / 🐙 🔍 Code Sniffer (push) Successful in 3m11s
🧪✨ Tests Workflow / 🧪 ✅ Unit Tests (push) Successful in 1m51s
Reviewed-on: #14 Co-authored-by: Ron Rise <ron@siteworxpro.com> Co-committed-by: Ron Rise <ron@siteworxpro.com>
This commit was merged in pull request #14.
This commit is contained in:
190
src/Async/Brokers/Redis.php
Normal file
190
src/Async/Brokers/Redis.php
Normal file
@@ -0,0 +1,190 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace Siteworxpro\App\Async\Brokers;
|
||||
|
||||
use Predis\Client;
|
||||
use Predis\Command\RawCommand;
|
||||
use Siteworxpro\App\Async\Messages\SayHelloMessage;
|
||||
use Siteworxpro\App\Async\Queues\Queue;
|
||||
use Siteworxpro\App\Async\Messages\Message;
|
||||
use Siteworxpro\App\Helpers\Ulid;
|
||||
|
||||
class Redis extends Broker
|
||||
{
|
||||
private Client $client;
|
||||
|
||||
private string $consumerId;
|
||||
|
||||
private string $consumerGroup;
|
||||
|
||||
private const string CONSUMER_ID_PREFIX = 'consumer-group:';
|
||||
private const string QUEUE_PREFIX = 'queue:';
|
||||
|
||||
private array $queueNames = [];
|
||||
|
||||
public function __construct($config = [])
|
||||
{
|
||||
parent::__construct($config);
|
||||
|
||||
$this->client = \Siteworxpro\App\Services\Facades\Redis::getFacadeRoot();
|
||||
$this->consumerId = php_uname('n') . ':' . getmypid();
|
||||
$this->consumerGroup = $config['consumerGroup'] ?? 'default';
|
||||
}
|
||||
|
||||
private function ensureQueue(string $queueName): void
|
||||
{
|
||||
if (in_array($queueName, $this->queueNames, true)) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
$this->client->executeCommand(
|
||||
new RawCommand(
|
||||
'XGROUP',
|
||||
[
|
||||
'CREATE',
|
||||
self::QUEUE_PREFIX . $queueName,
|
||||
self::CONSUMER_ID_PREFIX . $this->consumerGroup,
|
||||
'$',
|
||||
'MKSTREAM'
|
||||
]
|
||||
)
|
||||
);
|
||||
} catch (\Exception) {
|
||||
// If the group already exists, we catch the exception and ignore it
|
||||
// This is because Redis will throw an error if the group already exists
|
||||
// We can safely ignore this error as it means the group is already set up
|
||||
}
|
||||
|
||||
$this->client->executeCommand(
|
||||
new RawCommand(
|
||||
'XGROUP',
|
||||
[
|
||||
'CREATECONSUMER',
|
||||
self::QUEUE_PREFIX . $queueName,
|
||||
self::CONSUMER_ID_PREFIX . $this->consumerGroup,
|
||||
$this->consumerId
|
||||
]
|
||||
)
|
||||
);
|
||||
|
||||
$this->queueNames[] = $queueName;
|
||||
}
|
||||
|
||||
public function __destruct()
|
||||
{
|
||||
foreach ($this->queueNames as $queueName) {
|
||||
try {
|
||||
$this->client->executeCommand(
|
||||
new RawCommand(
|
||||
'XGROUP',
|
||||
[
|
||||
'DELCONSUMER',
|
||||
self::QUEUE_PREFIX . $queueName,
|
||||
self::CONSUMER_ID_PREFIX . $this->consumerGroup,
|
||||
$this->consumerId
|
||||
]
|
||||
)
|
||||
);
|
||||
} catch (\Exception) {
|
||||
// Ignore exceptions during cleanup
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function publish(Queue $queue, Message $message, ?int $delay = null): void
|
||||
{
|
||||
$command = '%s * data %s';
|
||||
$command = sprintf(
|
||||
$command,
|
||||
self::QUEUE_PREFIX .
|
||||
$queue->queueName(),
|
||||
base64_encode($message->serialize())
|
||||
);
|
||||
|
||||
/** @var string $result */
|
||||
$result = $this
|
||||
->client
|
||||
->executeCommand(
|
||||
new RawCommand('XADD', explode(' ', $command)),
|
||||
);
|
||||
|
||||
$message->setId($result);
|
||||
}
|
||||
|
||||
public function consume(Queue $queue): Message|null
|
||||
{
|
||||
$this->ensureQueue($queue->queueName());
|
||||
|
||||
$command = 'GROUP %s %s COUNT 1 STREAMS %s >';
|
||||
$command = sprintf(
|
||||
$command,
|
||||
self::CONSUMER_ID_PREFIX . $this->consumerGroup,
|
||||
$this->consumerId,
|
||||
self::QUEUE_PREFIX . $queue->queueName(),
|
||||
);
|
||||
|
||||
/** @var array | null $response */
|
||||
$response = $this
|
||||
->client
|
||||
->executeCommand(
|
||||
new RawCommand(
|
||||
'XREADGROUP',
|
||||
explode(' ', $command)
|
||||
)
|
||||
);
|
||||
|
||||
if ($response === null || !isset($response[0][1][0][1][1])) {
|
||||
return null;
|
||||
}
|
||||
|
||||
$messageData = base64_decode($response[0][1][0][1][1]);
|
||||
$messageId = $response[0][1][0][0];
|
||||
|
||||
if ($messageData === 'NOOP') {
|
||||
// If the message is a NOOP, we return null to indicate no actual message
|
||||
return null;
|
||||
}
|
||||
|
||||
$value = unserialize($messageData, ['allowed_classes' => true]);
|
||||
if (!$value instanceof Message) {
|
||||
return null;
|
||||
}
|
||||
|
||||
$value->setId($messageId);
|
||||
|
||||
return $value;
|
||||
}
|
||||
|
||||
public function acknowledge(Queue $queue, Message $message): void
|
||||
{
|
||||
$response = $this
|
||||
->client
|
||||
->executeCommand(
|
||||
new RawCommand(
|
||||
'XACK',
|
||||
[
|
||||
self::QUEUE_PREFIX . $queue->queueName(),
|
||||
self::CONSUMER_ID_PREFIX . $this->consumerGroup,
|
||||
$message->getId()
|
||||
]
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
public function reject(Queue $queue, Message $message, bool $requeue = false): void
|
||||
{
|
||||
// TODO: Implement reject() method.
|
||||
}
|
||||
|
||||
public function purge(Queue $queue): void
|
||||
{
|
||||
|
||||
// TODO: Implement purge() method.
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user