package golang_logger

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"sort"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
	"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
)

// CloudWatch is a log handler that buffers entries in memory and ships them
// to a CloudWatch Logs stream in batches via PutLogEvents.
//
// A CloudWatch must not be copied after creation because it contains a mutex;
// always use the pointer returned by one of the constructors.
type CloudWatch struct {
	Handler

	client *cloudwatchlogs.Client
	opts   Options

	// mu guards buffer and nextSequenceToken. Both are touched by callers of
	// Log and by the periodic flush goroutine started in NewCloudWatch.
	mu                sync.Mutex
	buffer            []logFormat
	nextSequenceToken *string
}

// Options configures where a CloudWatch handler writes and how much it
// buffers.
type Options struct {
	// GroupName is the CloudWatch Logs log group to write to.
	GroupName string
	// StreamName is the log stream inside GroupName to write to.
	StreamName string
	// BufferLimit is the number of buffered entries that triggers an
	// immediate flush from Log. A value <= 0 flushes on every Log call.
	BufferLimit int
}

// NewCloudWatch returns a handler that writes to the group and stream named in
// opts using client. It does not check that the stream exists.
//
// It starts a background goroutine that flushes the buffer every 10 seconds.
// NOTE(review): that goroutine runs for the lifetime of the process; there is
// currently no way to stop it.
func NewCloudWatch(client *cloudwatchlogs.Client, opts Options) *CloudWatch {
	const flushInterval = 10 * time.Second

	c := &CloudWatch{
		client: client,
		opts:   opts,
	}

	go func() {
		for {
			time.Sleep(flushInterval)
			c.flush()
		}
	}()

	return c
}

// NewCloudWatchWithDefaultClient builds a CloudWatch Logs client from the
// default AWS configuration, creates the log stream named in opts if it does
// not exist yet, and returns a handler for it.
//
// It returns an error if the configuration cannot be loaded or if describing
// or creating the log stream fails.
func NewCloudWatchWithDefaultClient(opts Options) (*CloudWatch, error) {
	ctx := context.TODO()

	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return nil, fmt.Errorf("loading default AWS config: %w", err)
	}
	client := cloudwatchlogs.NewFromConfig(cfg)

	out, err := client.DescribeLogStreams(ctx, &cloudwatchlogs.DescribeLogStreamsInput{
		LogGroupName:        &opts.GroupName,
		LogStreamNamePrefix: &opts.StreamName,
	})
	if err != nil {
		return nil, fmt.Errorf("describing log streams in group %q: %w", opts.GroupName, err)
	}

	// The request filters by prefix, so a stream such as "app-2" is returned
	// when looking for "app". Only an exact name match means the stream exists.
	exists := false
	for _, s := range out.LogStreams {
		if aws.ToString(s.LogStreamName) == opts.StreamName {
			exists = true
			break
		}
	}

	if !exists {
		_, err = client.CreateLogStream(ctx, &cloudwatchlogs.CreateLogStreamInput{
			LogGroupName:  &opts.GroupName,
			LogStreamName: &opts.StreamName,
		})
		if err != nil {
			return nil, fmt.Errorf("creating log stream %q in group %q: %w", opts.StreamName, opts.GroupName, err)
		}
	}

	return NewCloudWatch(client, opts), nil
}

// Log appends msg to the buffer and flushes synchronously once the buffer
// holds at least Options.BufferLimit entries. It is safe for concurrent use.
func (c *CloudWatch) Log(msg logFormat) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.buffer = append(c.buffer, msg)
	if len(c.buffer) >= c.opts.BufferLimit {
		c.flushLocked()
	}
}

// flush sends all buffered entries to CloudWatch Logs.
func (c *CloudWatch) flush() {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.flushLocked()
}

// flushLocked sends all buffered entries in a single PutLogEvents call and
// empties the buffer. The caller must hold c.mu.
//
// A failed delivery is reported on stderr and the batch is dropped: neither
// Log nor the background goroutine can return an error, panicking would take
// down the host process, and retrying a rejected batch forever would block
// delivery of every later entry.
func (c *CloudWatch) flushLocked() {
	if len(c.buffer) == 0 {
		return
	}

	events := c.toLogEvents(c.buffer)
	// Reuse the backing array; events holds its own copies of the data.
	c.buffer = c.buffer[:0]

	// Every entry may have been skipped during conversion; do not send a
	// request with no events.
	if len(events) == 0 {
		return
	}

	res, err := c.client.PutLogEvents(context.TODO(), &cloudwatchlogs.PutLogEventsInput{
		LogEvents:     events,
		LogGroupName:  &c.opts.GroupName,
		LogStreamName: &c.opts.StreamName,
		SequenceToken: c.nextSequenceToken,
	})
	if err != nil {
		fmt.Fprintf(os.Stderr, "cloudwatch: putting %d log events to %s/%s: %v\n",
			len(events), c.opts.GroupName, c.opts.StreamName, err)
		return
	}

	c.nextSequenceToken = res.NextSequenceToken
}

// toLogEvents converts logs into CloudWatch input events ordered by ascending
// timestamp. The message of each event is the JSON encoding of the entry and
// the timestamp is the entry's DateTime, parsed as RFC 3339 with nanoseconds,
// in Unix milliseconds.
//
// Entries that cannot be JSON-encoded or whose DateTime does not parse are
// skipped, so the result may be shorter than logs.
func (c *CloudWatch) toLogEvents(logs []logFormat) []types.InputLogEvent {
	events := make([]types.InputLogEvent, 0, len(logs))

	for _, entry := range logs {
		payload, err := json.Marshal(entry)
		if err != nil {
			continue
		}
		ts, err := time.Parse(time.RFC3339Nano, entry.DateTime)
		if err != nil {
			continue
		}
		// Append only converted entries so no event is left with a nil
		// Timestamp for the comparator below to dereference.
		events = append(events, types.InputLogEvent{
			Message:   aws.String(string(payload)),
			Timestamp: aws.Int64(ts.UnixMilli()),
		})
	}

	// A stable sort keeps entries that share a millisecond in the order they
	// were logged.
	sort.SliceStable(events, func(i, j int) bool {
		return *events[i].Timestamp < *events[j].Timestamp
	})

	return events
}