You've already forked monolog-handlers
back to working
This commit is contained in:
411
src/Handler/CloudWatch.php
Executable file
411
src/Handler/CloudWatch.php
Executable file
@@ -0,0 +1,411 @@
|
||||
<?php
|
||||
|
||||
namespace Siteworx\MonologHandlers\Handler;
|
||||
|
||||
use Aws\CloudWatchLogs\CloudWatchLogsClient;
|
||||
use Aws\CloudWatchLogs\Exception\CloudWatchLogsException;
|
||||
use Monolog\Formatter\FormatterInterface;
|
||||
use Monolog\Formatter\LineFormatter;
|
||||
use Monolog\Handler\AbstractProcessingHandler;
|
||||
use Monolog\Level;
|
||||
use Monolog\LogRecord;
|
||||
|
||||
class CloudWatch extends AbstractProcessingHandler
|
||||
{
|
||||
/**
|
||||
* Requests per second limit (https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html)
|
||||
*/
|
||||
const RPS_LIMIT = 5;
|
||||
|
||||
/**
|
||||
* Event size limit (https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html)
|
||||
*
|
||||
* @var int
|
||||
*/
|
||||
const EVENT_SIZE_LIMIT = 262118; // 262144 - reserved 26
|
||||
|
||||
/**
|
||||
* The batch of log events in a single PutLogEvents request cannot span more than 24 hours.
|
||||
*/
|
||||
const TIMESPAN_LIMIT = 86400000;
|
||||
|
||||
private CloudWatchLogsClient $client;
|
||||
|
||||
private string $group;
|
||||
|
||||
private string $stream;
|
||||
|
||||
private int $retention;
|
||||
|
||||
private bool $initialized = false;
|
||||
|
||||
private ?string $sequenceToken = null;
|
||||
|
||||
private int $batchSize;
|
||||
|
||||
private array $buffer = [];
|
||||
|
||||
private array $tags = [];
|
||||
|
||||
private bool $createGroup;
|
||||
|
||||
/**
|
||||
* Data amount limit (http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html)
|
||||
*/
|
||||
private int $dataAmountLimit = 1048576;
|
||||
|
||||
private int $currentDataAmount = 0;
|
||||
|
||||
private int $remainingRequests = self::RPS_LIMIT;
|
||||
|
||||
private \DateTime $savedTime;
|
||||
|
||||
private ?int $earliestTimestamp = null;
|
||||
|
||||
/**
|
||||
* CloudWatchLogs constructor.
|
||||
* @param CloudWatchLogsClient $client
|
||||
*
|
||||
* Log group names must be unique within a region for an AWS account.
|
||||
* Log group names can be between 1 and 512 characters long.
|
||||
* Log group names consist of the following characters: a-z, A-Z, 0-9, '_' (underscore), '-' (hyphen),
|
||||
* '/' (forward slash), and '.' (period).
|
||||
* @param string $group
|
||||
*
|
||||
* Log stream names must be unique within the log group.
|
||||
* Log stream names can be between 1 and 512 characters long.
|
||||
* The ':' (colon) and '*' (asterisk) characters are not allowed.
|
||||
* @param string $stream
|
||||
*
|
||||
* @param int $retention
|
||||
* @param int $batchSize
|
||||
* @param array $tags
|
||||
* @param Level $level
|
||||
* @param bool $bubble
|
||||
* @param bool $createGroup
|
||||
*
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function __construct(
|
||||
CloudWatchLogsClient $client,
|
||||
$group,
|
||||
string $stream,
|
||||
int $retention = 14,
|
||||
int $batchSize = 10000,
|
||||
array $tags = [],
|
||||
Level $level = Level::Debug,
|
||||
bool $bubble = true,
|
||||
bool $createGroup = true
|
||||
) {
|
||||
if ($batchSize > 10000) {
|
||||
throw new \InvalidArgumentException('Batch size can not be greater than 10000');
|
||||
}
|
||||
|
||||
$this->client = $client;
|
||||
$this->group = $group;
|
||||
$this->stream = $stream;
|
||||
$this->retention = $retention;
|
||||
$this->batchSize = $batchSize;
|
||||
$this->tags = $tags;
|
||||
$this->createGroup = $createGroup;
|
||||
|
||||
parent::__construct($level, $bubble);
|
||||
|
||||
$this->savedTime = new \DateTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
* @throws \DateMalformedStringException
|
||||
*/
|
||||
protected function write(LogRecord $record): void
|
||||
{
|
||||
|
||||
$records = $this->formatRecords($record);
|
||||
|
||||
foreach ($records as $record) {
|
||||
if ($this->willMessageSizeExceedLimit($record) || $this->willMessageTimestampExceedLimit($record)) {
|
||||
$this->flushBuffer();
|
||||
}
|
||||
|
||||
$this->addToBuffer($record);
|
||||
|
||||
if (count($this->buffer) >= $this->batchSize) {
|
||||
$this->flushBuffer();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param LogRecord $record
|
||||
*/
|
||||
private function addToBuffer(LogRecord $record): void
|
||||
{
|
||||
$this->currentDataAmount += $this->getMessageSize($record);
|
||||
|
||||
$timestamp = $record->datetime->getTimestamp();
|
||||
|
||||
if (!$this->earliestTimestamp || $timestamp < $this->earliestTimestamp) {
|
||||
$this->earliestTimestamp = $timestamp;
|
||||
}
|
||||
|
||||
$this->buffer[] = $record;
|
||||
}
|
||||
|
||||
private function flushBuffer(): void
|
||||
{
|
||||
if (!empty($this->buffer)) {
|
||||
if (false === $this->initialized) {
|
||||
$this->initialize();
|
||||
}
|
||||
|
||||
// send items, retry once with a fresh sequence token
|
||||
try {
|
||||
$this->send($this->buffer);
|
||||
} catch (CloudWatchLogsException $e) {
|
||||
$this->refreshSequenceToken();
|
||||
$this->send($this->buffer);
|
||||
}
|
||||
|
||||
// clear buffer
|
||||
$this->buffer = [];
|
||||
|
||||
// clear the earliest timestamp
|
||||
$this->earliestTimestamp = null;
|
||||
|
||||
// clear data amount
|
||||
$this->currentDataAmount = 0;
|
||||
}
|
||||
}
|
||||
|
||||
private function checkThrottle(): void
|
||||
{
|
||||
$current = new \DateTime();
|
||||
$diff = $current->diff($this->savedTime)->s;
|
||||
$sameSecond = $diff === 0;
|
||||
|
||||
if ($sameSecond && $this->remainingRequests > 0) {
|
||||
$this->remainingRequests--;
|
||||
} elseif ($sameSecond && $this->remainingRequests === 0) {
|
||||
sleep(1);
|
||||
$this->remainingRequests = self::RPS_LIMIT;
|
||||
} elseif (!$sameSecond) {
|
||||
$this->remainingRequests = self::RPS_LIMIT;
|
||||
}
|
||||
|
||||
$this->savedTime = new \DateTime();
|
||||
}
|
||||
|
||||
/**
|
||||
* http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
|
||||
*
|
||||
* @param LogRecord $record
|
||||
* @return int
|
||||
*/
|
||||
private function getMessageSize(LogRecord $record): int
|
||||
{
|
||||
return strlen($record->message) + 26;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine whether the specified record's message size in addition to the
|
||||
* size of the current queued messages will exceed AWS CloudWatch's limit.
|
||||
*
|
||||
* @param LogRecord $record
|
||||
* @return bool
|
||||
*/
|
||||
protected function willMessageSizeExceedLimit(LogRecord $record): bool
|
||||
{
|
||||
return $this->currentDataAmount + $this->getMessageSize($record) >= $this->dataAmountLimit;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine whether the specified record's timestamp exceeds the 24 hour timespan limit
|
||||
* for all batched messages written in a single call to PutLogEvents.
|
||||
*
|
||||
* @param LogRecord $record
|
||||
* @return bool
|
||||
*/
|
||||
protected function willMessageTimestampExceedLimit(LogRecord $record): bool
|
||||
{
|
||||
return $this->earliestTimestamp && $record->datetime->getTimestamp() - $this->earliestTimestamp > self::TIMESPAN_LIMIT;
|
||||
}
|
||||
|
||||
/**
|
||||
* Event size in the batch can not be bigger than 256 KB
|
||||
* https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html
|
||||
*
|
||||
* @param LogRecord $entry
|
||||
* @return LogRecord[]
|
||||
*/
|
||||
private function formatRecords(LogRecord $entry): array
|
||||
{
|
||||
$entries = str_split($entry->formatted, self::EVENT_SIZE_LIMIT);
|
||||
$timestamp = $entry->datetime->format('U.u') * 1000;
|
||||
$format = str_contains($timestamp, '.') ? 'U.u' : 'U';
|
||||
|
||||
$dateTimeImmutable = \DateTimeImmutable::createFromFormat($format, $timestamp);
|
||||
$records = [];
|
||||
|
||||
foreach ($entries as $e) {
|
||||
$records[] = new LogRecord($dateTimeImmutable, $entry->channel, $entry->level, $e, $entry->context);
|
||||
}
|
||||
|
||||
return $records;
|
||||
}
|
||||
|
||||
/**
|
||||
* The batch of events must satisfy the following constraints:
|
||||
* - The maximum batch size is 1,048,576 bytes, and this size is calculated as the sum of all event messages in
|
||||
* UTF-8, plus 26 bytes for each log event.
|
||||
* - None of the log events in the batch can be more than 2 hours in the future.
|
||||
* - None of the log events in the batch can be older than 14 days or the retention period of the log group.
|
||||
* - The log events in the batch must be in chronological ordered by their timestamp (the time the event occurred,
|
||||
* expressed as the number of milliseconds since Jan 1, 1970 00:00:00 UTC).
|
||||
* - The maximum number of log events in a batch is 10,000.
|
||||
* - A batch of log events in a single request cannot span more than 24 hours. Otherwise, the operation fails.
|
||||
*
|
||||
* @param array $entries
|
||||
*
|
||||
* @throws \Aws\CloudWatchLogs\Exception\CloudWatchLogsException Thrown by putLogEvents for example in case of an
|
||||
* invalid sequence token
|
||||
*/
|
||||
private function send(array $entries): void
|
||||
{
|
||||
// AWS expects to receive entries in chronological order...
|
||||
usort($entries, static function (LogRecord $a, LogRecord $b) {
|
||||
if ($a->datetime->getTimestamp() < $b->datetime->getTimestamp()) {
|
||||
return -1;
|
||||
} elseif ($a->datetime->getTimestamp() > $b->datetime->getTimestamp()) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
});
|
||||
|
||||
$data = [
|
||||
'logGroupName' => $this->group,
|
||||
'logStreamName' => $this->stream,
|
||||
'logEvents' => $entries
|
||||
];
|
||||
|
||||
if (!empty($this->sequenceToken)) {
|
||||
$data['sequenceToken'] = $this->sequenceToken;
|
||||
}
|
||||
|
||||
$this->checkThrottle();
|
||||
|
||||
$response = $this->client->putLogEvents($data);
|
||||
|
||||
$this->sequenceToken = $response->get('nextSequenceToken');
|
||||
}
|
||||
|
||||
private function initializeGroup(): void
|
||||
{
|
||||
// fetch existing groups
|
||||
$existingGroups =
|
||||
$this
|
||||
->client
|
||||
->describeLogGroups(['logGroupNamePrefix' => $this->group])
|
||||
->get('logGroups');
|
||||
|
||||
// extract existing groups names
|
||||
$existingGroupsNames = array_map(
|
||||
function ($group) {
|
||||
return $group['logGroupName'];
|
||||
},
|
||||
$existingGroups
|
||||
);
|
||||
|
||||
// create group and set retention policy if not created yet
|
||||
if (!in_array($this->group, $existingGroupsNames, true)) {
|
||||
$createLogGroupArguments = ['logGroupName' => $this->group];
|
||||
|
||||
if (!empty($this->tags)) {
|
||||
$createLogGroupArguments['tags'] = $this->tags;
|
||||
}
|
||||
|
||||
$this
|
||||
->client
|
||||
->createLogGroup($createLogGroupArguments);
|
||||
|
||||
if ($this->retention !== null) {
|
||||
$this
|
||||
->client
|
||||
->putRetentionPolicy(
|
||||
[
|
||||
'logGroupName' => $this->group,
|
||||
'retentionInDays' => $this->retention,
|
||||
]
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private function initialize(): void
|
||||
{
|
||||
if ($this->createGroup) {
|
||||
$this->initializeGroup();
|
||||
}
|
||||
|
||||
$this->refreshSequenceToken();
|
||||
}
|
||||
|
||||
private function refreshSequenceToken(): void
|
||||
{
|
||||
// fetch existing streams
|
||||
$existingStreams =
|
||||
$this
|
||||
->client
|
||||
->describeLogStreams(
|
||||
[
|
||||
'logGroupName' => $this->group,
|
||||
'logStreamNamePrefix' => $this->stream,
|
||||
]
|
||||
)->get('logStreams');
|
||||
|
||||
// extract existing streams names
|
||||
$existingStreamsNames = array_map(
|
||||
function ($stream) {
|
||||
|
||||
// set sequence token
|
||||
if ($stream['logStreamName'] === $this->stream && isset($stream['uploadSequenceToken'])) {
|
||||
$this->sequenceToken = $stream['uploadSequenceToken'];
|
||||
}
|
||||
|
||||
return $stream['logStreamName'];
|
||||
},
|
||||
$existingStreams
|
||||
);
|
||||
|
||||
// create stream if not created
|
||||
if (!in_array($this->stream, $existingStreamsNames, true)) {
|
||||
$this
|
||||
->client
|
||||
->createLogStream(
|
||||
[
|
||||
'logGroupName' => $this->group,
|
||||
'logStreamName' => $this->stream
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
$this->initialized = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function getDefaultFormatter(): FormatterInterface
|
||||
{
|
||||
return new LineFormatter("%channel%: %level_name%: %message% %context% %extra%", null, false, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function close(): void
|
||||
{
|
||||
$this->flushBuffer();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user