From f8b988ca0d95997c0939a8403301a4f28366f5cd Mon Sep 17 00:00:00 2001 From: Ron Rise Date: Wed, 12 Nov 2025 11:30:32 -0500 Subject: [PATCH 1/3] feat: enhance consumer initialization to support custom queue names --- src/Async/Consumer.php | 21 ++++++++++++++++++--- src/Cli/Commands/Queue/Start.php | 11 +++++++---- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/src/Async/Consumer.php b/src/Async/Consumer.php index 38572d8..26d9fda 100644 --- a/src/Async/Consumer.php +++ b/src/Async/Consumer.php @@ -13,7 +13,7 @@ class Consumer private static bool $shutDown = false; private const array QUEUES = [ - Queues\DefaultQueue::class, + 'default' => Queues\DefaultQueue::class, ]; private array $queues = []; @@ -22,9 +22,24 @@ class Consumer private const string HANDLER_NAMESPACE = 'Siteworxpro\\App\\Async\\Handlers\\'; - public function __construct() + public function __construct(array $queues = []) { - foreach (self::QUEUES as $queueClass) { + if ($queues === []) { + $queues = self::QUEUES; + } else { + $mappedQueues = []; + foreach ($queues as $queueName) { + if (isset(self::QUEUES[$queueName])) { + $mappedQueues[] = self::QUEUES[$queueName]; + } else { + throw new \InvalidArgumentException("Queue '$queueName' is not defined."); + } + } + $queues = $mappedQueues; + } + + + foreach ($queues as $queueClass) { $this->queues[] = new $queueClass(); } diff --git a/src/Cli/Commands/Queue/Start.php b/src/Cli/Commands/Queue/Start.php index cc52e4d..9962270 100644 --- a/src/Cli/Commands/Queue/Start.php +++ b/src/Cli/Commands/Queue/Start.php @@ -13,14 +13,17 @@ class Start extends Command implements CommandInterface public function __construct() { parent::__construct('queue:start', 'Start the queue consumer to process messages.'); - - $this->argument('[name]', 'Your name') - ->option('-g, --greet', 'Include a greeting message'); + $this->argument('[queues]', 'The name of the queue to consume from. ex. "first_queue,second_queue"'); } public function execute(): int { - $consumer = new Consumer(); + $queues = []; + if ($this->values()['queues'] !== null) { + $queues = explode(',', $this->values()['queues']); + } + + $consumer = new Consumer($queues); $consumer->start(); return 0; -- 2.49.1 From e0ba77556d869f79fafb92087990ac2e4d360995 Mon Sep 17 00:00:00 2001 From: Ron Rise Date: Wed, 12 Nov 2025 15:16:03 -0500 Subject: [PATCH 2/3] feat: integrate Kafka support with producer and consumer implementation --- Dockerfile | 11 + composer.json | 3 +- composer.lock | 199 +++++++++++------- config.php | 5 +- docker-compose.yml | 36 ++++ src/Async/Brokers/Kafka.php | 62 +++++- src/Cli/Commands/Queue/Start.php | 3 + .../BrokerServiceProvider.php | 2 +- 8 files changed, 231 insertions(+), 90 deletions(-) diff --git a/Dockerfile b/Dockerfile index f7ae416..9c82804 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,12 +14,23 @@ RUN composer install --optimize-autoloader --ignore-platform-reqs --no-dev # Use the official PHP CLI image with Alpine Linux for the second stage FROM php:8.4.14-alpine AS php +ARG KAFKA_ENABLED=0 + # Move the production PHP configuration file to the default location RUN mv /usr/local/etc/php/php.ini-production /usr/local/etc/php/php.ini \ && apk add libpq-dev linux-headers --no-cache \ && docker-php-ext-install pdo_pgsql sockets pcntl \ && rm -rf /var/cache/apk/* +RUN if [ "$KAFKA_ENABLED" -eq 1 ] ; then \ + echo "Kafka support enabled" ; \ + apk add autoconf g++ librdkafka-dev make --no-cache ; \ + pecl install rdkafka && docker-php-ext-enable rdkafka ; \ + else \ + echo "Kafka support disabled" ; \ + exit 0 ; \ + fi + # Set the working directory to /app WORKDIR /app diff --git a/composer.json b/composer.json index c4c4e89..c741b15 100644 --- a/composer.json +++ b/composer.json @@ -28,7 +28,8 @@ "mockery/mockery": "^1.6", "squizlabs/php_codesniffer": "^3.12", "lendable/composer-license-checker": "^1.2", - "phpstan/phpstan": "^2.1.31" + "phpstan/phpstan": "^2.1.31", + "kwn/php-rdkafka-stubs": "^2.2" }, "scripts": { "tests:all": [ diff --git a/composer.lock b/composer.lock index f5f9c5a..2667d82 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "f920b7224ee908f6a4270f200dbbca3a", + "content-hash": "7c2d40400d6f4d0469324dc1645eba3c", "packages": [ { "name": "adhocore/cli", @@ -300,16 +300,16 @@ }, { "name": "google/protobuf", - "version": "v4.32.1", + "version": "v4.33.0", "source": { "type": "git", "url": "https://github.com/protocolbuffers/protobuf-php.git", - "reference": "c4ed1c1f9bbc1e91766e2cd6c0af749324fe87cb" + "reference": "b50269e23204e5ae859a326ec3d90f09efe3047d" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/protocolbuffers/protobuf-php/zipball/c4ed1c1f9bbc1e91766e2cd6c0af749324fe87cb", - "reference": "c4ed1c1f9bbc1e91766e2cd6c0af749324fe87cb", + "url": "https://api.github.com/repos/protocolbuffers/protobuf-php/zipball/b50269e23204e5ae859a326ec3d90f09efe3047d", + "reference": "b50269e23204e5ae859a326ec3d90f09efe3047d", "shasum": "" }, "require": { @@ -338,22 +338,22 @@ "proto" ], "support": { - "source": "https://github.com/protocolbuffers/protobuf-php/tree/v4.32.1" + "source": "https://github.com/protocolbuffers/protobuf-php/tree/v4.33.0" }, - "time": "2025-09-14T05:14:52+00:00" + "time": "2025-10-15T20:10:28+00:00" }, { "name": "illuminate/collections", - "version": "v12.34.0", + "version": "v12.38.0", "source": { "type": "git", "url": "https://github.com/illuminate/collections.git", - "reference": "b323866d9e571f8c444f3ccca6f645c05fadf568" + "reference": "deb291b109b6f7fd776a3550a120771137b3c5d1" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/illuminate/collections/zipball/b323866d9e571f8c444f3ccca6f645c05fadf568", - "reference": "b323866d9e571f8c444f3ccca6f645c05fadf568", + "url": "https://api.github.com/repos/illuminate/collections/zipball/deb291b109b6f7fd776a3550a120771137b3c5d1", + "reference": "deb291b109b6f7fd776a3550a120771137b3c5d1", "shasum": "" }, "require": { @@ -399,11 +399,11 @@ "issues": "https://github.com/laravel/framework/issues", "source": "https://github.com/laravel/framework" }, - "time": "2025-10-10T13:31:43+00:00" + "time": "2025-10-30T12:22:05+00:00" }, { "name": "illuminate/conditionable", - "version": "v12.34.0", + "version": "v12.38.0", "source": { "type": "git", "url": "https://github.com/illuminate/conditionable.git", @@ -449,7 +449,7 @@ }, { "name": "illuminate/container", - "version": "v12.34.0", + "version": "v12.38.0", "source": { "type": "git", "url": "https://github.com/illuminate/container.git", @@ -510,7 +510,7 @@ }, { "name": "illuminate/contracts", - "version": "v12.34.0", + "version": "v12.38.0", "source": { "type": "git", "url": "https://github.com/illuminate/contracts.git", @@ -558,16 +558,16 @@ }, { "name": "illuminate/database", - "version": "v12.34.0", + "version": "v12.38.0", "source": { "type": "git", "url": "https://github.com/illuminate/database.git", - "reference": "3ad07bda64019d18fc6fda97fec0b3b7cb6ecae1" + "reference": "eacbdddf31f655fba5406fdf31bd264d880dd1a8" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/illuminate/database/zipball/3ad07bda64019d18fc6fda97fec0b3b7cb6ecae1", - "reference": "3ad07bda64019d18fc6fda97fec0b3b7cb6ecae1", + "url": "https://api.github.com/repos/illuminate/database/zipball/eacbdddf31f655fba5406fdf31bd264d880dd1a8", + "reference": "eacbdddf31f655fba5406fdf31bd264d880dd1a8", "shasum": "" }, "require": { @@ -625,11 +625,11 @@ "issues": "https://github.com/laravel/framework/issues", "source": "https://github.com/laravel/framework" }, - "time": "2025-10-10T13:33:40+00:00" + "time": "2025-11-11T14:13:21+00:00" }, { "name": "illuminate/macroable", - "version": "v12.34.0", + "version": "v12.38.0", "source": { "type": "git", "url": "https://github.com/illuminate/macroable.git", @@ -675,16 +675,16 @@ }, { "name": "illuminate/support", - "version": "v12.34.0", + "version": "v12.38.0", "source": { "type": "git", "url": "https://github.com/illuminate/support.git", - "reference": "89291f59ef6c170c00f10a41c566c49ee32ca09a" + "reference": "008b6c0d45f548de0f801d60a5854a7f9e4dd32f" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/illuminate/support/zipball/89291f59ef6c170c00f10a41c566c49ee32ca09a", - "reference": "89291f59ef6c170c00f10a41c566c49ee32ca09a", + "url": "https://api.github.com/repos/illuminate/support/zipball/008b6c0d45f548de0f801d60a5854a7f9e4dd32f", + "reference": "008b6c0d45f548de0f801d60a5854a7f9e4dd32f", "shasum": "" }, "require": { @@ -750,7 +750,7 @@ "issues": "https://github.com/laravel/framework/issues", "source": "https://github.com/laravel/framework" }, - "time": "2025-10-13T21:11:33+00:00" + "time": "2025-11-06T14:27:18+00:00" }, { "name": "laravel/serializable-closure", @@ -1857,16 +1857,16 @@ }, { "name": "roadrunner-php/roadrunner-api-dto", - "version": "v1.13.0", + "version": "v1.14.0", "source": { "type": "git", "url": "https://github.com/roadrunner-php/roadrunner-api-dto.git", - "reference": "8a683f5057005bef742916847c0befbf9a00c543" + "reference": "e6efb759f0a73b8516b7f28317230ecd4010005e" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/roadrunner-php/roadrunner-api-dto/zipball/8a683f5057005bef742916847c0befbf9a00c543", - "reference": "8a683f5057005bef742916847c0befbf9a00c543", + "url": "https://api.github.com/repos/roadrunner-php/roadrunner-api-dto/zipball/e6efb759f0a73b8516b7f28317230ecd4010005e", + "reference": "e6efb759f0a73b8516b7f28317230ecd4010005e", "shasum": "" }, "require": { @@ -1912,7 +1912,7 @@ "docs": "https://docs.roadrunner.dev", "forum": "https://forum.roadrunner.dev", "issues": "https://github.com/roadrunner-server/roadrunner/issues", - "source": "https://github.com/roadrunner-php/roadrunner-api-dto/tree/v1.13.0" + "source": "https://github.com/roadrunner-php/roadrunner-api-dto/tree/v1.14.0" }, "funding": [ { @@ -1920,7 +1920,7 @@ "type": "github" } ], - "time": "2025-08-12T14:04:38+00:00" + "time": "2025-11-06T13:03:11+00:00" }, { "name": "robinvdvleuten/ulid", @@ -1972,9 +1972,9 @@ "name": "siteworxpro/config", "version": "1.1.1", "source": { - "type": "", - "url": "", - "reference": "" + "type": "git", + "url": "https://gitea.siteworxpro.com/php-packages/config", + "reference": "1.1.1" }, "dist": { "type": "zip", @@ -2941,16 +2941,16 @@ }, { "name": "symfony/translation-contracts", - "version": "v3.6.0", + "version": "v3.6.1", "source": { "type": "git", "url": "https://github.com/symfony/translation-contracts.git", - "reference": "df210c7a2573f1913b2d17cc95f90f53a73d8f7d" + "reference": "65a8bc82080447fae78373aa10f8d13b38338977" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/translation-contracts/zipball/df210c7a2573f1913b2d17cc95f90f53a73d8f7d", - "reference": "df210c7a2573f1913b2d17cc95f90f53a73d8f7d", + "url": "https://api.github.com/repos/symfony/translation-contracts/zipball/65a8bc82080447fae78373aa10f8d13b38338977", + "reference": "65a8bc82080447fae78373aa10f8d13b38338977", "shasum": "" }, "require": { @@ -2999,7 +2999,7 @@ "standards" ], "support": { - "source": "https://github.com/symfony/translation-contracts/tree/v3.6.0" + "source": "https://github.com/symfony/translation-contracts/tree/v3.6.1" }, "funding": [ { @@ -3010,12 +3010,16 @@ "url": "https://github.com/fabpot", "type": "github" }, + { + "url": "https://github.com/nicolas-grekas", + "type": "github" + }, { "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", "type": "tidelift" } ], - "time": "2024-09-27T08:32:26+00:00" + "time": "2025-07-15T13:41:35+00:00" }, { "name": "voku/portable-ascii", @@ -3144,6 +3148,44 @@ }, "time": "2025-04-30T06:54:44+00:00" }, + { + "name": "kwn/php-rdkafka-stubs", + "version": "v2.2.1", + "source": { + "type": "git", + "url": "https://github.com/kwn/php-rdkafka-stubs.git", + "reference": "23b865d6b3e8fe1f080aa7371dc1da3339361996" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/kwn/php-rdkafka-stubs/zipball/23b865d6b3e8fe1f080aa7371dc1da3339361996", + "reference": "23b865d6b3e8fe1f080aa7371dc1da3339361996", + "shasum": "" + }, + "require": { + "ext-rdkafka": ">=4.0" + }, + "require-dev": { + "phpunit/phpunit": "^8.2.4" + }, + "type": "library", + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Karol Wnuk", + "email": "k.wnuk@ascetic.pl" + } + ], + "description": "Rdkafka extension stubs for your IDE", + "support": { + "issues": "https://github.com/kwn/php-rdkafka-stubs/issues", + "source": "https://github.com/kwn/php-rdkafka-stubs/tree/v2.2.1" + }, + "time": "2022-08-16T15:27:51+00:00" + }, { "name": "lendable/composer-license-checker", "version": "1.2.2", @@ -3347,16 +3389,16 @@ }, { "name": "nikic/php-parser", - "version": "v5.6.1", + "version": "v5.6.2", "source": { "type": "git", "url": "https://github.com/nikic/PHP-Parser.git", - "reference": "f103601b29efebd7ff4a1ca7b3eeea9e3336a2a2" + "reference": "3a454ca033b9e06b63282ce19562e892747449bb" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/nikic/PHP-Parser/zipball/f103601b29efebd7ff4a1ca7b3eeea9e3336a2a2", - "reference": "f103601b29efebd7ff4a1ca7b3eeea9e3336a2a2", + "url": "https://api.github.com/repos/nikic/PHP-Parser/zipball/3a454ca033b9e06b63282ce19562e892747449bb", + "reference": "3a454ca033b9e06b63282ce19562e892747449bb", "shasum": "" }, "require": { @@ -3399,9 +3441,9 @@ ], "support": { "issues": "https://github.com/nikic/PHP-Parser/issues", - "source": "https://github.com/nikic/PHP-Parser/tree/v5.6.1" + "source": "https://github.com/nikic/PHP-Parser/tree/v5.6.2" }, - "time": "2025-08-13T20:13:15+00:00" + "time": "2025-10-21T19:32:17+00:00" }, { "name": "phar-io/manifest", @@ -3523,11 +3565,11 @@ }, { "name": "phpstan/phpstan", - "version": "2.1.31", + "version": "2.1.32", "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpstan/zipball/ead89849d879fe203ce9292c6ef5e7e76f867b96", - "reference": "ead89849d879fe203ce9292c6ef5e7e76f867b96", + "url": "https://api.github.com/repos/phpstan/phpstan/zipball/e126cad1e30a99b137b8ed75a85a676450ebb227", + "reference": "e126cad1e30a99b137b8ed75a85a676450ebb227", "shasum": "" }, "require": { @@ -3572,7 +3614,7 @@ "type": "github" } ], - "time": "2025-10-10T14:14:11+00:00" + "time": "2025-11-11T15:18:17+00:00" }, { "name": "phpunit/php-code-coverage", @@ -3910,16 +3952,16 @@ }, { "name": "phpunit/phpunit", - "version": "12.4.1", + "version": "12.4.2", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/phpunit.git", - "reference": "fc5413a2e6d240d2f6d9317bdf7f0a24e73de194" + "reference": "a94ea4d26d865875803b23aaf78c3c2c670ea2ea" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/fc5413a2e6d240d2f6d9317bdf7f0a24e73de194", - "reference": "fc5413a2e6d240d2f6d9317bdf7f0a24e73de194", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/a94ea4d26d865875803b23aaf78c3c2c670ea2ea", + "reference": "a94ea4d26d865875803b23aaf78c3c2c670ea2ea", "shasum": "" }, "require": { @@ -3987,7 +4029,7 @@ "support": { "issues": "https://github.com/sebastianbergmann/phpunit/issues", "security": "https://github.com/sebastianbergmann/phpunit/security/policy", - "source": "https://github.com/sebastianbergmann/phpunit/tree/12.4.1" + "source": "https://github.com/sebastianbergmann/phpunit/tree/12.4.2" }, "funding": [ { @@ -4011,7 +4053,7 @@ "type": "tidelift" } ], - "time": "2025-10-09T14:08:29+00:00" + "time": "2025-10-30T08:41:39+00:00" }, { "name": "sebastian/cli-parser", @@ -4912,16 +4954,16 @@ }, { "name": "squizlabs/php_codesniffer", - "version": "3.13.4", + "version": "3.13.5", "source": { "type": "git", "url": "https://github.com/PHPCSStandards/PHP_CodeSniffer.git", - "reference": "ad545ea9c1b7d270ce0fc9cbfb884161cd706119" + "reference": "0ca86845ce43291e8f5692c7356fccf3bcf02bf4" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/PHPCSStandards/PHP_CodeSniffer/zipball/ad545ea9c1b7d270ce0fc9cbfb884161cd706119", - "reference": "ad545ea9c1b7d270ce0fc9cbfb884161cd706119", + "url": "https://api.github.com/repos/PHPCSStandards/PHP_CodeSniffer/zipball/0ca86845ce43291e8f5692c7356fccf3bcf02bf4", + "reference": "0ca86845ce43291e8f5692c7356fccf3bcf02bf4", "shasum": "" }, "require": { @@ -4938,11 +4980,6 @@ "bin/phpcs" ], "type": "library", - "extra": { - "branch-alias": { - "dev-master": "3.x-dev" - } - }, "notification-url": "https://packagist.org/downloads/", "license": [ "BSD-3-Clause" @@ -4992,7 +5029,7 @@ "type": "thanks_dev" } ], - "time": "2025-09-05T05:47:09+00:00" + "time": "2025-11-04T16:30:35+00:00" }, { "name": "staabm/side-effects-detector", @@ -5048,16 +5085,16 @@ }, { "name": "symfony/console", - "version": "v7.3.4", + "version": "v7.3.6", "source": { "type": "git", "url": "https://github.com/symfony/console.git", - "reference": "2b9c5fafbac0399a20a2e82429e2bd735dcfb7db" + "reference": "c28ad91448f86c5f6d9d2c70f0cf68bf135f252a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/console/zipball/2b9c5fafbac0399a20a2e82429e2bd735dcfb7db", - "reference": "2b9c5fafbac0399a20a2e82429e2bd735dcfb7db", + "url": "https://api.github.com/repos/symfony/console/zipball/c28ad91448f86c5f6d9d2c70f0cf68bf135f252a", + "reference": "c28ad91448f86c5f6d9d2c70f0cf68bf135f252a", "shasum": "" }, "require": { @@ -5122,7 +5159,7 @@ "terminal" ], "support": { - "source": "https://github.com/symfony/console/tree/v7.3.4" + "source": "https://github.com/symfony/console/tree/v7.3.6" }, "funding": [ { @@ -5142,7 +5179,7 @@ "type": "tidelift" } ], - "time": "2025-09-22T15:31:00+00:00" + "time": "2025-11-04T01:21:42+00:00" }, { "name": "symfony/polyfill-ctype", @@ -5461,16 +5498,16 @@ }, { "name": "symfony/service-contracts", - "version": "v3.6.0", + "version": "v3.6.1", "source": { "type": "git", "url": "https://github.com/symfony/service-contracts.git", - "reference": "f021b05a130d35510bd6b25fe9053c2a8a15d5d4" + "reference": "45112560a3ba2d715666a509a0bc9521d10b6c43" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/symfony/service-contracts/zipball/f021b05a130d35510bd6b25fe9053c2a8a15d5d4", - "reference": "f021b05a130d35510bd6b25fe9053c2a8a15d5d4", + "url": "https://api.github.com/repos/symfony/service-contracts/zipball/45112560a3ba2d715666a509a0bc9521d10b6c43", + "reference": "45112560a3ba2d715666a509a0bc9521d10b6c43", "shasum": "" }, "require": { @@ -5524,7 +5561,7 @@ "standards" ], "support": { - "source": "https://github.com/symfony/service-contracts/tree/v3.6.0" + "source": "https://github.com/symfony/service-contracts/tree/v3.6.1" }, "funding": [ { @@ -5535,12 +5572,16 @@ "url": "https://github.com/fabpot", "type": "github" }, + { + "url": "https://github.com/nicolas-grekas", + "type": "github" + }, { "url": "https://tidelift.com/funding/github/packagist/symfony/symfony", "type": "tidelift" } ], - "time": "2025-04-25T09:37:31+00:00" + "time": "2025-07-15T11:30:57+00:00" }, { "name": "symfony/string", diff --git a/config.php b/config.php index 5ba6ef3..ed89760 100644 --- a/config.php +++ b/config.php @@ -51,7 +51,7 @@ return [ ], 'queue' => [ - 'broker' => Env::get('QUEUE_BROKER', 'redis'), + 'broker' => Env::get('QUEUE_BROKER', 'kafka'), 'broker_config' => [ @@ -60,8 +60,7 @@ return [ ], 'kafka' => [ - 'brokers' => Env::get('QUEUE_KAFKA_BROKERS', 'localhost:9092'), - 'topic' => Env::get('QUEUE_KAFKA_TOPIC', 'my_topic'), + 'brokers' => Env::get('QUEUE_KAFKA_BROKERS', 'kafka:9092'), ], 'rabbitmq' => [ diff --git a/docker-compose.yml b/docker-compose.yml index c02a850..5bfca2e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -68,6 +68,8 @@ services: volumes: - .:/app build: + args: + KAFKA_ENABLED: "1" context: . dockerfile: Dockerfile entrypoint: "/bin/sh -c 'while true; do sleep 30; done;'" @@ -88,6 +90,40 @@ services: DB_HOST: postgres JWT_SIGNING_KEY: a-string-secret-at-least-256-bits-long + ## Kafka and Zookeeper for local development + kafka-ui: + image: kafbat/kafka-ui:latest # Or kafbat/kafka-ui:latest for newer Kafka + container_name: kafka-ui + ports: + - "8080:8080" # Expose the UI port + environment: + KAFKA_CLUSTERS_0_NAME: local-kafka-cluster + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 + depends_on: + kafka: + condition: service_started + zookeeper: + condition: service_started + zookeeper: + image: ubuntu/zookeeper:latest + environment: + ALLOW_ANONYMOUS_LOGIN: "yes" + ports: + - "2181:2181" + kafka: + image: ubuntu/kafka:latest + environment: + KAFKA_BROKER_ID: 1 + KAFKA_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + ALLOW_PLAINTEXT_LISTENER: "yes" + ports: + - "9092:9092" + depends_on: + zookeeper: + condition: service_started + redis: image: redis:latest healthcheck: diff --git a/src/Async/Brokers/Kafka.php b/src/Async/Brokers/Kafka.php index d056b76..fc0c8b3 100644 --- a/src/Async/Brokers/Kafka.php +++ b/src/Async/Brokers/Kafka.php @@ -4,33 +4,83 @@ declare(strict_types=1); namespace Siteworxpro\App\Async\Brokers; +use RdKafka\Conf; +use RdKafka\Exception; +use RdKafka\KafkaConsumer; +use RdKafka\Producer; use Siteworxpro\App\Async\Queues\Queue; use Siteworxpro\App\Async\Messages\Message; class Kafka extends Broker { - public function publish(Queue $queue, Message $message, ?int $delay = null): void + private Producer $producer; + + private KafkaConsumer $consumer; + + public function __construct($config = []) { - // TODO: Implement publish() method. + parent::__construct($config); + + + $conf = new Conf(); + $conf->set('bootstrap.servers', $config['brokers'] ?? 'localhost:9092'); + $this->producer = new Producer($conf); + $this->producer->addBrokers($config['brokers'] ?? 'localhost:9092'); + + $conf->set('group.id', $config['consumerGroup'] ?? 'default'); + $conf->set('auto.offset.reset', 'earliest'); + + $this->consumer = new KafkaConsumer($conf); } - public function consume(Queue $queue): Message | null + public function __destruct() { + $this->producer->flush(1000); + } + + /** + * @throws \Exception + */ + public function publish(Queue $queue, Message $message, ?int $delay = null): void + { + $topic = $this->producer->newTopic($queue->queueName()); + $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message->serialize(), $message->getId()); + $this->producer->flush(1000); + } + + /** + * @throws Exception + */ + public function consume(Queue $queue): Message|null + { + $this->consumer->subscribe([$queue->queueName()]); + $kafkaMessage = $this->consumer->consume(1000); + + if ($kafkaMessage->err === RD_KAFKA_RESP_ERR__TIMED_OUT) { + return null; + } + + $messageData = $kafkaMessage->payload; + if ($messageData !== null) { + /** @var Message $message */ + $message = unserialize($messageData, ['allowed_classes' => true]); + $message->setId((string)$kafkaMessage->offset); + + return $message; + } + return null; } public function acknowledge(Queue $queue, Message $message): void { - // TODO: Implement acknowledge() method. } public function reject(Queue $queue, Message $message, bool $requeue = false): void { - // TODO: Implement reject() method. } public function purge(Queue $queue): void { - // TODO: Implement purge() method. } } diff --git a/src/Cli/Commands/Queue/Start.php b/src/Cli/Commands/Queue/Start.php index 9962270..c688f48 100644 --- a/src/Cli/Commands/Queue/Start.php +++ b/src/Cli/Commands/Queue/Start.php @@ -6,6 +6,7 @@ namespace Siteworxpro\App\Cli\Commands\Queue; use Ahc\Cli\Input\Command; use Siteworxpro\App\Async\Consumer; +use Siteworxpro\App\Async\Messages\SayHelloMessage; use Siteworxpro\App\Cli\Commands\CommandInterface; class Start extends Command implements CommandInterface @@ -23,6 +24,8 @@ class Start extends Command implements CommandInterface $queues = explode(',', $this->values()['queues']); } + SayHelloMessage::dispatch("hello from queue consumer!"); + $consumer = new Consumer($queues); $consumer->start(); diff --git a/src/Services/ServiceProviders/BrokerServiceProvider.php b/src/Services/ServiceProviders/BrokerServiceProvider.php index 37c5d4d..dafe190 100644 --- a/src/Services/ServiceProviders/BrokerServiceProvider.php +++ b/src/Services/ServiceProviders/BrokerServiceProvider.php @@ -14,7 +14,7 @@ class BrokerServiceProvider extends ServiceProvider { $this->app->singleton(Broker::class, function (): Broker { $configName = Config::get('queue.broker'); - $brokerConfig = Config::get('queue.' . $configName) ?? []; + $brokerConfig = Config::get('queue.broker_config.' . $configName) ?? []; $brokerClass = Broker::BROKER_TYPES[$configName] ?? null; -- 2.49.1 From eacc603561a6ceb667d24ef546cdbbe780cf9d5d Mon Sep 17 00:00:00 2001 From: Ron Rise Date: Wed, 12 Nov 2025 15:22:23 -0500 Subject: [PATCH 3/3] feat: integrate Kafka support with producer and consumer implementation --- config.php | 3 ++- src/Async/Brokers/Kafka.php | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/config.php b/config.php index ed89760..81ef275 100644 --- a/config.php +++ b/config.php @@ -51,7 +51,7 @@ return [ ], 'queue' => [ - 'broker' => Env::get('QUEUE_BROKER', 'kafka'), + 'broker' => Env::get('QUEUE_BROKER', 'redis'), 'broker_config' => [ @@ -61,6 +61,7 @@ return [ 'kafka' => [ 'brokers' => Env::get('QUEUE_KAFKA_BROKERS', 'kafka:9092'), + 'consumerGroup' => Env::get('QUEUE_KAFKA_CONSUMER_GROUP', 'default_group'), ], 'rabbitmq' => [ diff --git a/src/Async/Brokers/Kafka.php b/src/Async/Brokers/Kafka.php index fc0c8b3..95f30fc 100644 --- a/src/Async/Brokers/Kafka.php +++ b/src/Async/Brokers/Kafka.php @@ -24,12 +24,12 @@ class Kafka extends Broker $conf = new Conf(); $conf->set('bootstrap.servers', $config['brokers'] ?? 'localhost:9092'); + $this->producer = new Producer($conf); $this->producer->addBrokers($config['brokers'] ?? 'localhost:9092'); $conf->set('group.id', $config['consumerGroup'] ?? 'default'); $conf->set('auto.offset.reset', 'earliest'); - $this->consumer = new KafkaConsumer($conf); } @@ -60,6 +60,7 @@ class Kafka extends Broker return null; } + /** @var string | null $messageData */ $messageData = $kafkaMessage->payload; if ($messageData !== null) { /** @var Message $message */ -- 2.49.1