Files
logger/cloudwatch.go
Ron Rise d09d40877c
All checks were successful
🚀 Publish Release Package / publish (push) Successful in 2m10s
refactor package name from golang_logger to log
2025-06-18 18:15:03 -04:00

130 lines
2.6 KiB
Go

package log
import (
"context"
"encoding/json"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
"sort"
"time"
)
// CloudWatch is a log handler that accumulates entries in an in-memory buffer
// and ships them to a CloudWatch Logs stream in batches (see Log and flush).
type CloudWatch struct {
// Handler is embedded. NOTE(review): its definition is outside this file view.
Handler
// client is the CloudWatch Logs API client used to call PutLogEvents.
client *cloudwatchlogs.Client
// buffer holds entries received by Log that have not been sent yet.
buffer []logFormat
// opts carries the target group/stream names and the flush threshold.
opts Options
// nextSequenceToken is the token returned by the last PutLogEvents call; it is
// passed back on the following call. It is nil until the first successful flush.
nextSequenceToken *string
}
// Options configures where a CloudWatch handler writes and how eagerly it
// flushes its buffer.
type Options struct {
	// GroupName and StreamName are sent as the LogGroupName and LogStreamName
	// of every PutLogEvents request.
	GroupName, StreamName string
	// BufferLimit is the buffered-entry count at which Log triggers a flush
	// (the check is len(buffer) >= BufferLimit).
	BufferLimit int
}
// NewCloudWatch builds a CloudWatch handler around an existing client and
// starts a background goroutine that flushes the buffer every 10 seconds.
//
// NOTE(review): the goroutine loops forever; there is no way to stop it from
// here, so it lives for the rest of the process. It also calls flush
// concurrently with Log, and no synchronisation is visible in this file.
func NewCloudWatch(client *cloudwatchlogs.Client, opts Options) *CloudWatch {
	const flushInterval = 10 * time.Second

	cw := new(CloudWatch)
	cw.client = client
	cw.opts = opts
	cw.buffer = []logFormat{}

	// Periodic flush so entries below BufferLimit are still delivered.
	periodicFlush := func() {
		for {
			time.Sleep(flushInterval)
			cw.flush()
		}
	}
	go periodicFlush()

	return cw
}
// NewCloudWatchWithDefaultClient loads the default AWS configuration, makes
// sure the log stream opts.StreamName exists in opts.GroupName (creating it
// when it does not), and returns a handler built with NewCloudWatch.
//
// It returns a nil handler and the underlying error if the configuration
// cannot be loaded or if describing/creating the stream fails.
func NewCloudWatchWithDefaultClient(opts Options) (*CloudWatch, error) {
	cfg, err := config.LoadDefaultConfig(context.TODO())
	if err != nil {
		return nil, err
	}
	client := cloudwatchlogs.NewFromConfig(cfg)

	found, err := client.DescribeLogStreams(context.TODO(), &cloudwatchlogs.DescribeLogStreamsInput{
		LogGroupName:        &opts.GroupName,
		LogStreamNamePrefix: &opts.StreamName,
	})
	if err != nil {
		return nil, err
	}

	// The lookup is a prefix match, so a non-empty result does not prove the
	// requested stream exists (e.g. "app-2" matches the prefix "app"). Require
	// an exact name match before skipping creation.
	// NOTE(review): only the first result page is inspected; with the API's
	// default ordering by stream name the exact match is expected to sort
	// ahead of longer names sharing the prefix — confirm against the API docs.
	exists := false
	for _, s := range found.LogStreams {
		if s.LogStreamName != nil && *s.LogStreamName == opts.StreamName {
			exists = true
			break
		}
	}

	if !exists {
		_, err = client.CreateLogStream(context.TODO(), &cloudwatchlogs.CreateLogStreamInput{
			LogGroupName:  &opts.GroupName,
			LogStreamName: &opts.StreamName,
		})
		if err != nil {
			return nil, err
		}
	}
	return NewCloudWatch(client, opts), nil
}
// Log queues msg for delivery and flushes immediately once the number of
// buffered entries reaches opts.BufferLimit.
//
// NOTE(review): c.buffer is also read and reset by the periodic flush
// goroutine started in NewCloudWatch; no locking is visible in this file.
func (c *CloudWatch) Log(msg logFormat) {
	c.buffer = append(c.buffer, msg)

	// Below the threshold: leave delivery to a later call or the timer.
	if len(c.buffer) < c.opts.BufferLimit {
		return
	}
	c.flush()
}
// flush sends every buffered entry to CloudWatch Logs in a single
// PutLogEvents call and then empties the buffer. It is a no-op when nothing
// is buffered.
//
// NOTE(review): a failed PutLogEvents call panics. Because flush also runs on
// the background goroutine started in NewCloudWatch, a delivery failure
// there terminates the whole process.
func (c *CloudWatch) flush() {
	// len of a nil slice is 0, so this covers both nil and empty buffers.
	if len(c.buffer) == 0 {
		return
	}

	batch := c.toLogEvents(c.buffer)
	request := &cloudwatchlogs.PutLogEventsInput{
		LogEvents:     batch,
		LogGroupName:  &c.opts.GroupName,
		LogStreamName: &c.opts.StreamName,
		SequenceToken: c.nextSequenceToken,
	}

	response, err := c.client.PutLogEvents(context.TODO(), request)
	if err != nil {
		panic(err)
	}

	// Remember the token for the next call and start a fresh buffer.
	c.nextSequenceToken = response.NextSequenceToken
	c.buffer = []logFormat{}
}
// toLogEvents converts buffered entries into CloudWatch input events, ordered
// by ascending timestamp.
//
// Each entry is JSON-encoded to form the event message, and its DateTime field
// is parsed as RFC 3339 (nanosecond precision) to produce the millisecond
// timestamp. Entries that cannot be encoded or whose DateTime does not parse
// are dropped, so the result may be shorter than logs; it is never nil.
func (c *CloudWatch) toLogEvents(logs []logFormat) []types.InputLogEvent {
	// Append only the entries that convert. Pre-sizing the length and
	// assigning by index would leave zero-valued events (nil Timestamp) for
	// skipped entries, which the comparator below would dereference.
	events := make([]types.InputLogEvent, 0, len(logs))
	for _, entry := range logs {
		payload, err := json.Marshal(entry)
		if err != nil {
			continue
		}
		ts, err := time.Parse(time.RFC3339Nano, entry.DateTime)
		if err != nil {
			continue
		}
		events = append(events, types.InputLogEvent{
			Message:   aws.String(string(payload)),
			Timestamp: aws.Int64(ts.UnixMilli()),
		})
	}

	// Stable sort keeps arrival order for entries sharing the same millisecond.
	sort.SliceStable(events, func(i, j int) bool {
		return *events[i].Timestamp < *events[j].Timestamp
	})
	return events
}