2 Commits

Author SHA1 Message Date
e0ba77556d feat: integrate Kafka support with producer and consumer implementation
Some checks failed
🧪✨ Tests Workflow / 🛡️ 🔒 Library Audit (push) Successful in 2m48s
🧪✨ Tests Workflow / 📝 ✨ Code Lint (push) Successful in 2m38s
🧪✨ Tests Workflow / 🛡️ 🔒 License Check (push) Successful in 3m26s
🧪✨ Tests Workflow / 🧪 ✨ Database Migrations (push) Successful in 3m42s
🧪✨ Tests Workflow / 🐙 🔍 Code Sniffer (push) Failing after 3m28s
🧪✨ Tests Workflow / 🧪 ✅ Unit Tests (push) Successful in 2m4s
2025-11-12 15:16:03 -05:00
f8b988ca0d feat: enhance consumer initialization to support custom queue names 2025-11-12 11:30:32 -05:00
9 changed files with 256 additions and 97 deletions

View File

@@ -14,12 +14,23 @@ RUN composer install --optimize-autoloader --ignore-platform-reqs --no-dev
# Use the official PHP CLI image with Alpine Linux for the second stage # Use the official PHP CLI image with Alpine Linux for the second stage
FROM php:8.4.14-alpine AS php FROM php:8.4.14-alpine AS php
ARG KAFKA_ENABLED=0
# Move the production PHP configuration file to the default location # Move the production PHP configuration file to the default location
RUN mv /usr/local/etc/php/php.ini-production /usr/local/etc/php/php.ini \ RUN mv /usr/local/etc/php/php.ini-production /usr/local/etc/php/php.ini \
&& apk add libpq-dev linux-headers --no-cache \ && apk add libpq-dev linux-headers --no-cache \
&& docker-php-ext-install pdo_pgsql sockets pcntl \ && docker-php-ext-install pdo_pgsql sockets pcntl \
&& rm -rf /var/cache/apk/* && rm -rf /var/cache/apk/*
RUN if [ "$KAFKA_ENABLED" -eq 1 ] ; then \
echo "Kafka support enabled" ; \
apk add autoconf g++ librdkafka-dev make --no-cache ; \
pecl install rdkafka && docker-php-ext-enable rdkafka ; \
else \
echo "Kafka support disabled" ; \
exit 0 ; \
fi
# Set the working directory to /app # Set the working directory to /app
WORKDIR /app WORKDIR /app

View File

@@ -28,7 +28,8 @@
"mockery/mockery": "^1.6", "mockery/mockery": "^1.6",
"squizlabs/php_codesniffer": "^3.12", "squizlabs/php_codesniffer": "^3.12",
"lendable/composer-license-checker": "^1.2", "lendable/composer-license-checker": "^1.2",
"phpstan/phpstan": "^2.1.31" "phpstan/phpstan": "^2.1.31",
"kwn/php-rdkafka-stubs": "^2.2"
}, },
"scripts": { "scripts": {
"tests:all": [ "tests:all": [

199
composer.lock generated
View File

@@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically" "This file is @generated automatically"
], ],
"content-hash": "f920b7224ee908f6a4270f200dbbca3a", "content-hash": "7c2d40400d6f4d0469324dc1645eba3c",
"packages": [ "packages": [
{ {
"name": "adhocore/cli", "name": "adhocore/cli",
@@ -300,16 +300,16 @@
}, },
{ {
"name": "google/protobuf", "name": "google/protobuf",
"version": "v4.32.1", "version": "v4.33.0",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/protocolbuffers/protobuf-php.git", "url": "https://github.com/protocolbuffers/protobuf-php.git",
"reference": "c4ed1c1f9bbc1e91766e2cd6c0af749324fe87cb" "reference": "b50269e23204e5ae859a326ec3d90f09efe3047d"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/protocolbuffers/protobuf-php/zipball/c4ed1c1f9bbc1e91766e2cd6c0af749324fe87cb", "url": "https://api.github.com/repos/protocolbuffers/protobuf-php/zipball/b50269e23204e5ae859a326ec3d90f09efe3047d",
"reference": "c4ed1c1f9bbc1e91766e2cd6c0af749324fe87cb", "reference": "b50269e23204e5ae859a326ec3d90f09efe3047d",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
@@ -338,22 +338,22 @@
"proto" "proto"
], ],
"support": { "support": {
"source": "https://github.com/protocolbuffers/protobuf-php/tree/v4.32.1" "source": "https://github.com/protocolbuffers/protobuf-php/tree/v4.33.0"
}, },
"time": "2025-09-14T05:14:52+00:00" "time": "2025-10-15T20:10:28+00:00"
}, },
{ {
"name": "illuminate/collections", "name": "illuminate/collections",
"version": "v12.34.0", "version": "v12.38.0",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/illuminate/collections.git", "url": "https://github.com/illuminate/collections.git",
"reference": "b323866d9e571f8c444f3ccca6f645c05fadf568" "reference": "deb291b109b6f7fd776a3550a120771137b3c5d1"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/illuminate/collections/zipball/b323866d9e571f8c444f3ccca6f645c05fadf568", "url": "https://api.github.com/repos/illuminate/collections/zipball/deb291b109b6f7fd776a3550a120771137b3c5d1",
"reference": "b323866d9e571f8c444f3ccca6f645c05fadf568", "reference": "deb291b109b6f7fd776a3550a120771137b3c5d1",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
@@ -399,11 +399,11 @@
"issues": "https://github.com/laravel/framework/issues", "issues": "https://github.com/laravel/framework/issues",
"source": "https://github.com/laravel/framework" "source": "https://github.com/laravel/framework"
}, },
"time": "2025-10-10T13:31:43+00:00" "time": "2025-10-30T12:22:05+00:00"
}, },
{ {
"name": "illuminate/conditionable", "name": "illuminate/conditionable",
"version": "v12.34.0", "version": "v12.38.0",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/illuminate/conditionable.git", "url": "https://github.com/illuminate/conditionable.git",
@@ -449,7 +449,7 @@
}, },
{ {
"name": "illuminate/container", "name": "illuminate/container",
"version": "v12.34.0", "version": "v12.38.0",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/illuminate/container.git", "url": "https://github.com/illuminate/container.git",
@@ -510,7 +510,7 @@
}, },
{ {
"name": "illuminate/contracts", "name": "illuminate/contracts",
"version": "v12.34.0", "version": "v12.38.0",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/illuminate/contracts.git", "url": "https://github.com/illuminate/contracts.git",
@@ -558,16 +558,16 @@
}, },
{ {
"name": "illuminate/database", "name": "illuminate/database",
"version": "v12.34.0", "version": "v12.38.0",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/illuminate/database.git", "url": "https://github.com/illuminate/database.git",
"reference": "3ad07bda64019d18fc6fda97fec0b3b7cb6ecae1" "reference": "eacbdddf31f655fba5406fdf31bd264d880dd1a8"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/illuminate/database/zipball/3ad07bda64019d18fc6fda97fec0b3b7cb6ecae1", "url": "https://api.github.com/repos/illuminate/database/zipball/eacbdddf31f655fba5406fdf31bd264d880dd1a8",
"reference": "3ad07bda64019d18fc6fda97fec0b3b7cb6ecae1", "reference": "eacbdddf31f655fba5406fdf31bd264d880dd1a8",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
@@ -625,11 +625,11 @@
"issues": "https://github.com/laravel/framework/issues", "issues": "https://github.com/laravel/framework/issues",
"source": "https://github.com/laravel/framework" "source": "https://github.com/laravel/framework"
}, },
"time": "2025-10-10T13:33:40+00:00" "time": "2025-11-11T14:13:21+00:00"
}, },
{ {
"name": "illuminate/macroable", "name": "illuminate/macroable",
"version": "v12.34.0", "version": "v12.38.0",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/illuminate/macroable.git", "url": "https://github.com/illuminate/macroable.git",
@@ -675,16 +675,16 @@
}, },
{ {
"name": "illuminate/support", "name": "illuminate/support",
"version": "v12.34.0", "version": "v12.38.0",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/illuminate/support.git", "url": "https://github.com/illuminate/support.git",
"reference": "89291f59ef6c170c00f10a41c566c49ee32ca09a" "reference": "008b6c0d45f548de0f801d60a5854a7f9e4dd32f"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/illuminate/support/zipball/89291f59ef6c170c00f10a41c566c49ee32ca09a", "url": "https://api.github.com/repos/illuminate/support/zipball/008b6c0d45f548de0f801d60a5854a7f9e4dd32f",
"reference": "89291f59ef6c170c00f10a41c566c49ee32ca09a", "reference": "008b6c0d45f548de0f801d60a5854a7f9e4dd32f",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
@@ -750,7 +750,7 @@
"issues": "https://github.com/laravel/framework/issues", "issues": "https://github.com/laravel/framework/issues",
"source": "https://github.com/laravel/framework" "source": "https://github.com/laravel/framework"
}, },
"time": "2025-10-13T21:11:33+00:00" "time": "2025-11-06T14:27:18+00:00"
}, },
{ {
"name": "laravel/serializable-closure", "name": "laravel/serializable-closure",
@@ -1857,16 +1857,16 @@
}, },
{ {
"name": "roadrunner-php/roadrunner-api-dto", "name": "roadrunner-php/roadrunner-api-dto",
"version": "v1.13.0", "version": "v1.14.0",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/roadrunner-php/roadrunner-api-dto.git", "url": "https://github.com/roadrunner-php/roadrunner-api-dto.git",
"reference": "8a683f5057005bef742916847c0befbf9a00c543" "reference": "e6efb759f0a73b8516b7f28317230ecd4010005e"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/roadrunner-php/roadrunner-api-dto/zipball/8a683f5057005bef742916847c0befbf9a00c543", "url": "https://api.github.com/repos/roadrunner-php/roadrunner-api-dto/zipball/e6efb759f0a73b8516b7f28317230ecd4010005e",
"reference": "8a683f5057005bef742916847c0befbf9a00c543", "reference": "e6efb759f0a73b8516b7f28317230ecd4010005e",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
@@ -1912,7 +1912,7 @@
"docs": "https://docs.roadrunner.dev", "docs": "https://docs.roadrunner.dev",
"forum": "https://forum.roadrunner.dev", "forum": "https://forum.roadrunner.dev",
"issues": "https://github.com/roadrunner-server/roadrunner/issues", "issues": "https://github.com/roadrunner-server/roadrunner/issues",
"source": "https://github.com/roadrunner-php/roadrunner-api-dto/tree/v1.13.0" "source": "https://github.com/roadrunner-php/roadrunner-api-dto/tree/v1.14.0"
}, },
"funding": [ "funding": [
{ {
@@ -1920,7 +1920,7 @@
"type": "github" "type": "github"
} }
], ],
"time": "2025-08-12T14:04:38+00:00" "time": "2025-11-06T13:03:11+00:00"
}, },
{ {
"name": "robinvdvleuten/ulid", "name": "robinvdvleuten/ulid",
@@ -1972,9 +1972,9 @@
"name": "siteworxpro/config", "name": "siteworxpro/config",
"version": "1.1.1", "version": "1.1.1",
"source": { "source": {
"type": "", "type": "git",
"url": "", "url": "https://gitea.siteworxpro.com/php-packages/config",
"reference": "" "reference": "1.1.1"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
@@ -2941,16 +2941,16 @@
}, },
{ {
"name": "symfony/translation-contracts", "name": "symfony/translation-contracts",
"version": "v3.6.0", "version": "v3.6.1",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/symfony/translation-contracts.git", "url": "https://github.com/symfony/translation-contracts.git",
"reference": "df210c7a2573f1913b2d17cc95f90f53a73d8f7d" "reference": "65a8bc82080447fae78373aa10f8d13b38338977"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/symfony/translation-contracts/zipball/df210c7a2573f1913b2d17cc95f90f53a73d8f7d", "url": "https://api.github.com/repos/symfony/translation-contracts/zipball/65a8bc82080447fae78373aa10f8d13b38338977",
"reference": "df210c7a2573f1913b2d17cc95f90f53a73d8f7d", "reference": "65a8bc82080447fae78373aa10f8d13b38338977",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
@@ -2999,7 +2999,7 @@
"standards" "standards"
], ],
"support": { "support": {
"source": "https://github.com/symfony/translation-contracts/tree/v3.6.0" "source": "https://github.com/symfony/translation-contracts/tree/v3.6.1"
}, },
"funding": [ "funding": [
{ {
@@ -3010,12 +3010,16 @@
"url": "https://github.com/fabpot", "url": "https://github.com/fabpot",
"type": "github" "type": "github"
}, },
{
"url": "https://github.com/nicolas-grekas",
"type": "github"
},
{ {
"url": "https://tidelift.com/funding/github/packagist/symfony/symfony", "url": "https://tidelift.com/funding/github/packagist/symfony/symfony",
"type": "tidelift" "type": "tidelift"
} }
], ],
"time": "2024-09-27T08:32:26+00:00" "time": "2025-07-15T13:41:35+00:00"
}, },
{ {
"name": "voku/portable-ascii", "name": "voku/portable-ascii",
@@ -3144,6 +3148,44 @@
}, },
"time": "2025-04-30T06:54:44+00:00" "time": "2025-04-30T06:54:44+00:00"
}, },
{
"name": "kwn/php-rdkafka-stubs",
"version": "v2.2.1",
"source": {
"type": "git",
"url": "https://github.com/kwn/php-rdkafka-stubs.git",
"reference": "23b865d6b3e8fe1f080aa7371dc1da3339361996"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/kwn/php-rdkafka-stubs/zipball/23b865d6b3e8fe1f080aa7371dc1da3339361996",
"reference": "23b865d6b3e8fe1f080aa7371dc1da3339361996",
"shasum": ""
},
"require": {
"ext-rdkafka": ">=4.0"
},
"require-dev": {
"phpunit/phpunit": "^8.2.4"
},
"type": "library",
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Karol Wnuk",
"email": "k.wnuk@ascetic.pl"
}
],
"description": "Rdkafka extension stubs for your IDE",
"support": {
"issues": "https://github.com/kwn/php-rdkafka-stubs/issues",
"source": "https://github.com/kwn/php-rdkafka-stubs/tree/v2.2.1"
},
"time": "2022-08-16T15:27:51+00:00"
},
{ {
"name": "lendable/composer-license-checker", "name": "lendable/composer-license-checker",
"version": "1.2.2", "version": "1.2.2",
@@ -3347,16 +3389,16 @@
}, },
{ {
"name": "nikic/php-parser", "name": "nikic/php-parser",
"version": "v5.6.1", "version": "v5.6.2",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/nikic/PHP-Parser.git", "url": "https://github.com/nikic/PHP-Parser.git",
"reference": "f103601b29efebd7ff4a1ca7b3eeea9e3336a2a2" "reference": "3a454ca033b9e06b63282ce19562e892747449bb"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/nikic/PHP-Parser/zipball/f103601b29efebd7ff4a1ca7b3eeea9e3336a2a2", "url": "https://api.github.com/repos/nikic/PHP-Parser/zipball/3a454ca033b9e06b63282ce19562e892747449bb",
"reference": "f103601b29efebd7ff4a1ca7b3eeea9e3336a2a2", "reference": "3a454ca033b9e06b63282ce19562e892747449bb",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
@@ -3399,9 +3441,9 @@
], ],
"support": { "support": {
"issues": "https://github.com/nikic/PHP-Parser/issues", "issues": "https://github.com/nikic/PHP-Parser/issues",
"source": "https://github.com/nikic/PHP-Parser/tree/v5.6.1" "source": "https://github.com/nikic/PHP-Parser/tree/v5.6.2"
}, },
"time": "2025-08-13T20:13:15+00:00" "time": "2025-10-21T19:32:17+00:00"
}, },
{ {
"name": "phar-io/manifest", "name": "phar-io/manifest",
@@ -3523,11 +3565,11 @@
}, },
{ {
"name": "phpstan/phpstan", "name": "phpstan/phpstan",
"version": "2.1.31", "version": "2.1.32",
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/phpstan/phpstan/zipball/ead89849d879fe203ce9292c6ef5e7e76f867b96", "url": "https://api.github.com/repos/phpstan/phpstan/zipball/e126cad1e30a99b137b8ed75a85a676450ebb227",
"reference": "ead89849d879fe203ce9292c6ef5e7e76f867b96", "reference": "e126cad1e30a99b137b8ed75a85a676450ebb227",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
@@ -3572,7 +3614,7 @@
"type": "github" "type": "github"
} }
], ],
"time": "2025-10-10T14:14:11+00:00" "time": "2025-11-11T15:18:17+00:00"
}, },
{ {
"name": "phpunit/php-code-coverage", "name": "phpunit/php-code-coverage",
@@ -3910,16 +3952,16 @@
}, },
{ {
"name": "phpunit/phpunit", "name": "phpunit/phpunit",
"version": "12.4.1", "version": "12.4.2",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/sebastianbergmann/phpunit.git", "url": "https://github.com/sebastianbergmann/phpunit.git",
"reference": "fc5413a2e6d240d2f6d9317bdf7f0a24e73de194" "reference": "a94ea4d26d865875803b23aaf78c3c2c670ea2ea"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/fc5413a2e6d240d2f6d9317bdf7f0a24e73de194", "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/a94ea4d26d865875803b23aaf78c3c2c670ea2ea",
"reference": "fc5413a2e6d240d2f6d9317bdf7f0a24e73de194", "reference": "a94ea4d26d865875803b23aaf78c3c2c670ea2ea",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
@@ -3987,7 +4029,7 @@
"support": { "support": {
"issues": "https://github.com/sebastianbergmann/phpunit/issues", "issues": "https://github.com/sebastianbergmann/phpunit/issues",
"security": "https://github.com/sebastianbergmann/phpunit/security/policy", "security": "https://github.com/sebastianbergmann/phpunit/security/policy",
"source": "https://github.com/sebastianbergmann/phpunit/tree/12.4.1" "source": "https://github.com/sebastianbergmann/phpunit/tree/12.4.2"
}, },
"funding": [ "funding": [
{ {
@@ -4011,7 +4053,7 @@
"type": "tidelift" "type": "tidelift"
} }
], ],
"time": "2025-10-09T14:08:29+00:00" "time": "2025-10-30T08:41:39+00:00"
}, },
{ {
"name": "sebastian/cli-parser", "name": "sebastian/cli-parser",
@@ -4912,16 +4954,16 @@
}, },
{ {
"name": "squizlabs/php_codesniffer", "name": "squizlabs/php_codesniffer",
"version": "3.13.4", "version": "3.13.5",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/PHPCSStandards/PHP_CodeSniffer.git", "url": "https://github.com/PHPCSStandards/PHP_CodeSniffer.git",
"reference": "ad545ea9c1b7d270ce0fc9cbfb884161cd706119" "reference": "0ca86845ce43291e8f5692c7356fccf3bcf02bf4"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/PHPCSStandards/PHP_CodeSniffer/zipball/ad545ea9c1b7d270ce0fc9cbfb884161cd706119", "url": "https://api.github.com/repos/PHPCSStandards/PHP_CodeSniffer/zipball/0ca86845ce43291e8f5692c7356fccf3bcf02bf4",
"reference": "ad545ea9c1b7d270ce0fc9cbfb884161cd706119", "reference": "0ca86845ce43291e8f5692c7356fccf3bcf02bf4",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
@@ -4938,11 +4980,6 @@
"bin/phpcs" "bin/phpcs"
], ],
"type": "library", "type": "library",
"extra": {
"branch-alias": {
"dev-master": "3.x-dev"
}
},
"notification-url": "https://packagist.org/downloads/", "notification-url": "https://packagist.org/downloads/",
"license": [ "license": [
"BSD-3-Clause" "BSD-3-Clause"
@@ -4992,7 +5029,7 @@
"type": "thanks_dev" "type": "thanks_dev"
} }
], ],
"time": "2025-09-05T05:47:09+00:00" "time": "2025-11-04T16:30:35+00:00"
}, },
{ {
"name": "staabm/side-effects-detector", "name": "staabm/side-effects-detector",
@@ -5048,16 +5085,16 @@
}, },
{ {
"name": "symfony/console", "name": "symfony/console",
"version": "v7.3.4", "version": "v7.3.6",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/symfony/console.git", "url": "https://github.com/symfony/console.git",
"reference": "2b9c5fafbac0399a20a2e82429e2bd735dcfb7db" "reference": "c28ad91448f86c5f6d9d2c70f0cf68bf135f252a"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/symfony/console/zipball/2b9c5fafbac0399a20a2e82429e2bd735dcfb7db", "url": "https://api.github.com/repos/symfony/console/zipball/c28ad91448f86c5f6d9d2c70f0cf68bf135f252a",
"reference": "2b9c5fafbac0399a20a2e82429e2bd735dcfb7db", "reference": "c28ad91448f86c5f6d9d2c70f0cf68bf135f252a",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
@@ -5122,7 +5159,7 @@
"terminal" "terminal"
], ],
"support": { "support": {
"source": "https://github.com/symfony/console/tree/v7.3.4" "source": "https://github.com/symfony/console/tree/v7.3.6"
}, },
"funding": [ "funding": [
{ {
@@ -5142,7 +5179,7 @@
"type": "tidelift" "type": "tidelift"
} }
], ],
"time": "2025-09-22T15:31:00+00:00" "time": "2025-11-04T01:21:42+00:00"
}, },
{ {
"name": "symfony/polyfill-ctype", "name": "symfony/polyfill-ctype",
@@ -5461,16 +5498,16 @@
}, },
{ {
"name": "symfony/service-contracts", "name": "symfony/service-contracts",
"version": "v3.6.0", "version": "v3.6.1",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/symfony/service-contracts.git", "url": "https://github.com/symfony/service-contracts.git",
"reference": "f021b05a130d35510bd6b25fe9053c2a8a15d5d4" "reference": "45112560a3ba2d715666a509a0bc9521d10b6c43"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/symfony/service-contracts/zipball/f021b05a130d35510bd6b25fe9053c2a8a15d5d4", "url": "https://api.github.com/repos/symfony/service-contracts/zipball/45112560a3ba2d715666a509a0bc9521d10b6c43",
"reference": "f021b05a130d35510bd6b25fe9053c2a8a15d5d4", "reference": "45112560a3ba2d715666a509a0bc9521d10b6c43",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
@@ -5524,7 +5561,7 @@
"standards" "standards"
], ],
"support": { "support": {
"source": "https://github.com/symfony/service-contracts/tree/v3.6.0" "source": "https://github.com/symfony/service-contracts/tree/v3.6.1"
}, },
"funding": [ "funding": [
{ {
@@ -5535,12 +5572,16 @@
"url": "https://github.com/fabpot", "url": "https://github.com/fabpot",
"type": "github" "type": "github"
}, },
{
"url": "https://github.com/nicolas-grekas",
"type": "github"
},
{ {
"url": "https://tidelift.com/funding/github/packagist/symfony/symfony", "url": "https://tidelift.com/funding/github/packagist/symfony/symfony",
"type": "tidelift" "type": "tidelift"
} }
], ],
"time": "2025-04-25T09:37:31+00:00" "time": "2025-07-15T11:30:57+00:00"
}, },
{ {
"name": "symfony/string", "name": "symfony/string",

View File

@@ -51,7 +51,7 @@ return [
], ],
'queue' => [ 'queue' => [
'broker' => Env::get('QUEUE_BROKER', 'redis'), 'broker' => Env::get('QUEUE_BROKER', 'kafka'),
'broker_config' => [ 'broker_config' => [
@@ -60,8 +60,7 @@ return [
], ],
'kafka' => [ 'kafka' => [
'brokers' => Env::get('QUEUE_KAFKA_BROKERS', 'localhost:9092'), 'brokers' => Env::get('QUEUE_KAFKA_BROKERS', 'kafka:9092'),
'topic' => Env::get('QUEUE_KAFKA_TOPIC', 'my_topic'),
], ],
'rabbitmq' => [ 'rabbitmq' => [

View File

@@ -68,6 +68,8 @@ services:
volumes: volumes:
- .:/app - .:/app
build: build:
args:
KAFKA_ENABLED: "1"
context: . context: .
dockerfile: Dockerfile dockerfile: Dockerfile
entrypoint: "/bin/sh -c 'while true; do sleep 30; done;'" entrypoint: "/bin/sh -c 'while true; do sleep 30; done;'"
@@ -88,6 +90,40 @@ services:
DB_HOST: postgres DB_HOST: postgres
JWT_SIGNING_KEY: a-string-secret-at-least-256-bits-long JWT_SIGNING_KEY: a-string-secret-at-least-256-bits-long
## Kafka and Zookeeper for local development
kafka-ui:
image: kafbat/kafka-ui:latest # Or kafbat/kafka-ui:latest for newer Kafka
container_name: kafka-ui
ports:
- "8080:8080" # Expose the UI port
environment:
KAFKA_CLUSTERS_0_NAME: local-kafka-cluster
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
depends_on:
kafka:
condition: service_started
zookeeper:
condition: service_started
zookeeper:
image: ubuntu/zookeeper:latest
environment:
ALLOW_ANONYMOUS_LOGIN: "yes"
ports:
- "2181:2181"
kafka:
image: ubuntu/kafka:latest
environment:
KAFKA_BROKER_ID: 1
KAFKA_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
ALLOW_PLAINTEXT_LISTENER: "yes"
ports:
- "9092:9092"
depends_on:
zookeeper:
condition: service_started
redis: redis:
image: redis:latest image: redis:latest
healthcheck: healthcheck:

View File

@@ -4,33 +4,83 @@ declare(strict_types=1);
namespace Siteworxpro\App\Async\Brokers; namespace Siteworxpro\App\Async\Brokers;
use RdKafka\Conf;
use RdKafka\Exception;
use RdKafka\KafkaConsumer;
use RdKafka\Producer;
use Siteworxpro\App\Async\Queues\Queue; use Siteworxpro\App\Async\Queues\Queue;
use Siteworxpro\App\Async\Messages\Message; use Siteworxpro\App\Async\Messages\Message;
class Kafka extends Broker class Kafka extends Broker
{ {
public function publish(Queue $queue, Message $message, ?int $delay = null): void private Producer $producer;
private KafkaConsumer $consumer;
public function __construct($config = [])
{ {
// TODO: Implement publish() method. parent::__construct($config);
$conf = new Conf();
$conf->set('bootstrap.servers', $config['brokers'] ?? 'localhost:9092');
$this->producer = new Producer($conf);
$this->producer->addBrokers($config['brokers'] ?? 'localhost:9092');
$conf->set('group.id', $config['consumerGroup'] ?? 'default');
$conf->set('auto.offset.reset', 'earliest');
$this->consumer = new KafkaConsumer($conf);
} }
public function consume(Queue $queue): Message | null public function __destruct()
{ {
$this->producer->flush(1000);
}
/**
* @throws \Exception
*/
public function publish(Queue $queue, Message $message, ?int $delay = null): void
{
$topic = $this->producer->newTopic($queue->queueName());
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $message->serialize(), $message->getId());
$this->producer->flush(1000);
}
/**
* @throws Exception
*/
public function consume(Queue $queue): Message|null
{
$this->consumer->subscribe([$queue->queueName()]);
$kafkaMessage = $this->consumer->consume(1000);
if ($kafkaMessage->err === RD_KAFKA_RESP_ERR__TIMED_OUT) {
return null;
}
$messageData = $kafkaMessage->payload;
if ($messageData !== null) {
/** @var Message $message */
$message = unserialize($messageData, ['allowed_classes' => true]);
$message->setId((string)$kafkaMessage->offset);
return $message;
}
return null; return null;
} }
public function acknowledge(Queue $queue, Message $message): void public function acknowledge(Queue $queue, Message $message): void
{ {
// TODO: Implement acknowledge() method.
} }
public function reject(Queue $queue, Message $message, bool $requeue = false): void public function reject(Queue $queue, Message $message, bool $requeue = false): void
{ {
// TODO: Implement reject() method.
} }
public function purge(Queue $queue): void public function purge(Queue $queue): void
{ {
// TODO: Implement purge() method.
} }
} }

View File

@@ -13,7 +13,7 @@ class Consumer
private static bool $shutDown = false; private static bool $shutDown = false;
private const array QUEUES = [ private const array QUEUES = [
Queues\DefaultQueue::class, 'default' => Queues\DefaultQueue::class,
]; ];
private array $queues = []; private array $queues = [];
@@ -22,9 +22,24 @@ class Consumer
private const string HANDLER_NAMESPACE = 'Siteworxpro\\App\\Async\\Handlers\\'; private const string HANDLER_NAMESPACE = 'Siteworxpro\\App\\Async\\Handlers\\';
public function __construct() public function __construct(array $queues = [])
{ {
foreach (self::QUEUES as $queueClass) { if ($queues === []) {
$queues = self::QUEUES;
} else {
$mappedQueues = [];
foreach ($queues as $queueName) {
if (isset(self::QUEUES[$queueName])) {
$mappedQueues[] = self::QUEUES[$queueName];
} else {
throw new \InvalidArgumentException("Queue '$queueName' is not defined.");
}
}
$queues = $mappedQueues;
}
foreach ($queues as $queueClass) {
$this->queues[] = new $queueClass(); $this->queues[] = new $queueClass();
} }

View File

@@ -6,6 +6,7 @@ namespace Siteworxpro\App\Cli\Commands\Queue;
use Ahc\Cli\Input\Command; use Ahc\Cli\Input\Command;
use Siteworxpro\App\Async\Consumer; use Siteworxpro\App\Async\Consumer;
use Siteworxpro\App\Async\Messages\SayHelloMessage;
use Siteworxpro\App\Cli\Commands\CommandInterface; use Siteworxpro\App\Cli\Commands\CommandInterface;
class Start extends Command implements CommandInterface class Start extends Command implements CommandInterface
@@ -13,14 +14,19 @@ class Start extends Command implements CommandInterface
public function __construct() public function __construct()
{ {
parent::__construct('queue:start', 'Start the queue consumer to process messages.'); parent::__construct('queue:start', 'Start the queue consumer to process messages.');
$this->argument('[queues]', 'The name of the queue to consume from. ex. "first_queue,second_queue"');
$this->argument('[name]', 'Your name')
->option('-g, --greet', 'Include a greeting message');
} }
public function execute(): int public function execute(): int
{ {
$consumer = new Consumer(); $queues = [];
if ($this->values()['queues'] !== null) {
$queues = explode(',', $this->values()['queues']);
}
SayHelloMessage::dispatch("hello from queue consumer!");
$consumer = new Consumer($queues);
$consumer->start(); $consumer->start();
return 0; return 0;

View File

@@ -14,7 +14,7 @@ class BrokerServiceProvider extends ServiceProvider
{ {
$this->app->singleton(Broker::class, function (): Broker { $this->app->singleton(Broker::class, function (): Broker {
$configName = Config::get('queue.broker'); $configName = Config::get('queue.broker');
$brokerConfig = Config::get('queue.' . $configName) ?? []; $brokerConfig = Config::get('queue.broker_config.' . $configName) ?? [];
$brokerClass = Broker::BROKER_TYPES[$configName] ?? null; $brokerClass = Broker::BROKER_TYPES[$configName] ?? null;