10000) {
            throw new \InvalidArgumentException('Batch size can not be greater than 10000');
        }

        $this->client = $client;
        $this->group = $group;
        $this->stream = $stream;
        $this->retention = $retention;
        $this->batchSize = $batchSize;
        $this->tags = $tags;
        $this->createGroup = $createGroup;

        parent::__construct($level, $bubble);

        $this->savedTime = new \DateTime;
    }

    /**
     * {@inheritdoc}
     *
     * Splits the record into events via formatRecords() and buffers them. The buffer is
     * flushed before adding an event that would exceed the batch data-size or timespan
     * limit, and again as soon as it holds `batchSize` events.
     *
     * @throws \DateMalformedStringException
     */
    protected function write(LogRecord $record): void
    {
        foreach ($this->formatRecords($record) as $event) {
            if ($this->willMessageSizeExceedLimit($event) || $this->willMessageTimestampExceedLimit($event)) {
                $this->flushBuffer();
            }

            $this->addToBuffer($event);

            if (count($this->buffer) >= $this->batchSize) {
                $this->flushBuffer();
            }
        }
    }

    /**
     * Append an event to the buffer and update the running batch statistics
     * (accumulated data amount and earliest timestamp seen).
     *
     * @param LogRecord $record
     */
    private function addToBuffer(LogRecord $record): void
    {
        $this->currentDataAmount += $this->getMessageSize($record);

        $timestamp = $record->datetime->getTimestamp();
        if (!$this->earliestTimestamp || $timestamp < $this->earliestTimestamp) {
            $this->earliestTimestamp = $timestamp;
        }

        $this->buffer[] = $record;
    }

    /**
     * Send all buffered events and reset the batch state. Does nothing when the buffer is empty.
     *
     * The handler is lazily initialized on the first flush. If the first send attempt throws a
     * CloudWatchLogsException, the sequence token is refreshed and the send is retried once; an
     * exception from the retry propagates and leaves the buffer untouched.
     */
    private function flushBuffer(): void
    {
        if (empty($this->buffer)) {
            return;
        }

        if (false === $this->initialized) {
            $this->initialize();
        }

        // send items, retry once with a fresh sequence token
        try {
            $this->send($this->buffer);
        } catch (CloudWatchLogsException $e) {
            $this->refreshSequenceToken();
            $this->send($this->buffer);
        }

        // reset buffer, earliest timestamp and accumulated data amount
        $this->buffer = [];
        $this->earliestTimestamp = null;
        $this->currentDataAmount = 0;
    }

    /**
     * Track the per-second request budget before a request is sent.
     *
     * While less than one second has elapsed since the previous call the remaining budget is
     * decremented; once it reaches zero the call sleeps for one second and the budget is reset
     * to self::RPS_LIMIT. When a second or more has elapsed the budget is simply reset.
     */
    private function checkThrottle(): void
    {
        $current = new \DateTime();
        $elapsed = $current->diff($this->savedTime);

        // DateInterval::$s alone is only the 0-59 seconds component, so a gap of whole minutes
        // or hours would read as zero; every component has to be zero for "same second".
        $sameSecond = $elapsed->days === 0
            && $elapsed->h === 0
            && $elapsed->i === 0
            && $elapsed->s === 0;

        if (!$sameSecond) {
            $this->remainingRequests = self::RPS_LIMIT;
        } elseif ($this->remainingRequests > 0) {
            $this->remainingRequests--;
        } elseif ($this->remainingRequests === 0) {
            sleep(1);
            $this->remainingRequests = self::RPS_LIMIT;
        }

        $this->savedTime = new \DateTime();
    }

    /**
     * Size an event contributes to a batch: message length in bytes plus 26 bytes of overhead.
     *
     * http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
     *
     * @param LogRecord $record
     * @return int
     */
    private function getMessageSize(LogRecord $record): int
    {
        return strlen($record->message) + 26;
    }

    /**
     * Determine whether the specified record's message size in addition to the
     * size of the current queued messages will exceed AWS CloudWatch's limit.
     *
     * @param LogRecord $record
     * @return bool
     */
    protected function willMessageSizeExceedLimit(LogRecord $record): bool
    {
        return $this->currentDataAmount + $this->getMessageSize($record) >= $this->dataAmountLimit;
    }

    /**
     * Determine whether the specified record's timestamp exceeds the 24 hour timespan limit
     * for all batched messages written in a single call to PutLogEvents.
     *
     * Always false while no earliest timestamp has been recorded for the current batch.
     *
     * @param LogRecord $record
     * @return bool
     */
    protected function willMessageTimestampExceedLimit(LogRecord $record): bool
    {
        return $this->earliestTimestamp
            && $record->datetime->getTimestamp() - $this->earliestTimestamp > self::TIMESPAN_LIMIT;
    }

    /**
     * Split a formatted record into events of at most self::EVENT_SIZE_LIMIT bytes each.
     *
     * Event size in the batch can not be bigger than 256 KB
     * https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html
     *
     * Every returned LogRecord carries one chunk as its message. Its datetime is a carrier
     * object whose getTimestamp() yields the original time in MILLISECONDS since the epoch,
     * which is the value send() forwards as the event timestamp.
     *
     * @param LogRecord $entry
     * @return LogRecord[]
     */
    private function formatRecords(LogRecord $entry): array
    {
        // 'U' (seconds) followed by 'v' (zero-padded milliseconds) is the millisecond count as an
        // integer string; this avoids float arithmetic and float-to-string conversion.
        $milliseconds = (int) $entry->datetime->format('Uv');
        $carrier = (new \DateTimeImmutable('@0'))->setTimestamp($milliseconds);

        $records = [];
        foreach (str_split($entry->formatted, self::EVENT_SIZE_LIMIT) as $chunk) {
            $records[] = new LogRecord($carrier, $entry->channel, $entry->level, $chunk, $entry->context);
        }

        return $records;
    }

    /**
     * Sort the given events chronologically and submit them with a single putLogEvents call,
     * then remember the returned next sequence token.
     *
     * The batch of events must satisfy the following constraints:
     *  - The maximum batch size is 1,048,576 bytes, and this size is calculated as the sum of all event messages in
     * UTF-8, plus 26 bytes for each log event.
     *  - None of the log events in the batch can be more than 2 hours in the future.
     *  - None of the log events in the batch can be older than 14 days or the retention period of the log group.
     *  - The log events in the batch must be in chronological order by their timestamp (the time the event occurred,
     * expressed as the number of milliseconds since Jan 1, 1970 00:00:00 UTC).
     *  - The maximum number of log events in a batch is 10,000.
     *  - A batch of log events in a single request cannot span more than 24 hours. Otherwise, the operation fails.
     *
     * @param array $entries
     *
     * @throws \Aws\CloudWatchLogs\Exception\CloudWatchLogsException Thrown by putLogEvents for example in case of an
     * invalid sequence token
     */
    private function send(array $entries): void
    {
        // AWS expects to receive entries in chronological order...
        usort(
            $entries,
            static fn (LogRecord $a, LogRecord $b): int =>
                $a->datetime->getTimestamp() <=> $b->datetime->getTimestamp()
        );

        $data = [
            'logGroupName' => $this->group,
            'logStreamName' => $this->stream,
            'logEvents' => $entries
        ];

        if (!empty($this->sequenceToken)) {
            $data['sequenceToken'] = $this->sequenceToken;
        }

        $this->checkThrottle();

        // reduce each record to the message/timestamp pair sent to the API
        $data['logEvents'] = array_map(
            static fn (LogRecord $logEvent): array => [
                'message' => $logEvent->message,
                'timestamp' => $logEvent->datetime->getTimestamp()
            ],
            $entries
        );

        $response = $this->client->putLogEvents($data);

        $this->sequenceToken = $response->get('nextSequenceToken');
    }

    /**
     * Create the log group (with tags, when configured) if no group with exactly this name is
     * returned by describeLogGroups, and apply the retention policy to a newly created group
     * when a retention value is set.
     */
    private function initializeGroup(): void
    {
        // fetch existing groups
        $existingGroups = $this
            ->client
            ->describeLogGroups(['logGroupNamePrefix' => $this->group])
            ->get('logGroups');

        // the prefix query may return other groups too, so look for an exact name match
        if (in_array($this->group, array_column($existingGroups, 'logGroupName'), true)) {
            return;
        }

        $createLogGroupArguments = ['logGroupName' => $this->group];

        if (!empty($this->tags)) {
            $createLogGroupArguments['tags'] = $this->tags;
        }

        $this->client->createLogGroup($createLogGroupArguments);

        if ($this->retention !== null) {
            $this->client->putRetentionPolicy(
                [
                    'logGroupName' => $this->group,
                    'retentionInDays' => $this->retention,
                ]
            );
        }
    }

    /**
     * One-time setup performed before the first send: optionally ensure the log group exists,
     * then resolve the stream and its sequence token.
     */
    private function initialize(): void
    {
        if ($this->createGroup) {
            $this->initializeGroup();
        }

        $this->refreshSequenceToken();
    }

    /**
     * Look up the configured stream, adopt its upload sequence token when one is reported,
     * create the stream if it is not listed, and mark the handler as initialized.
     */
    private function refreshSequenceToken(): void
    {
        // fetch existing streams
        $existingStreams = $this
            ->client
            ->describeLogStreams(
                [
                    'logGroupName' => $this->group,
                    'logStreamNamePrefix' => $this->stream,
                ]
            )->get('logStreams');

        $streamExists = false;
        foreach ($existingStreams as $stream) {
            if ($stream['logStreamName'] !== $this->stream) {
                continue;
            }

            $streamExists = true;

            // set sequence token
            if (isset($stream['uploadSequenceToken'])) {
                $this->sequenceToken = $stream['uploadSequenceToken'];
            }
        }

        // create stream if not created
        if (!$streamExists) {
            $this->client->createLogStream(
                [
                    'logGroupName' => $this->group,
                    'logStreamName' => $this->stream
                ]
            );
        }

        $this->initialized = true;
    }

    /**
     * {@inheritdoc}
     */
    protected function getDefaultFormatter(): FormatterInterface
    {
        return new LineFormatter("%channel%: %level_name%: %message% %context% %extra%", null, false, true);
    }

    /**
     * {@inheritdoc}
     *
     * Flushes any events still waiting in the buffer.
     */
    public function close(): void
    {
        $this->flushBuffer();
    }
}