All checks were successful
🚀 Publish Release Package / publish (push) Successful in 2m10s
130 lines
2.6 KiB
Go
130 lines
2.6 KiB
Go
package log
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
"github.com/aws/aws-sdk-go-v2/config"
|
|
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
|
|
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
|
|
"sort"
|
|
"time"
|
|
)
|
|
|
|
type CloudWatch struct {
|
|
Handler
|
|
client *cloudwatchlogs.Client
|
|
buffer []logFormat
|
|
opts Options
|
|
nextSequenceToken *string
|
|
}
|
|
|
|
// Options configures where a CloudWatch handler sends its entries and how
// many of them it buffers before uploading.
type Options struct {
	// GroupName is the CloudWatch Logs group and StreamName the stream inside
	// that group that receives the events.
	GroupName, StreamName string
	// BufferLimit is the number of buffered entries at which Log triggers an
	// immediate flush.
	BufferLimit int
}
|
|
|
|
func NewCloudWatch(client *cloudwatchlogs.Client, opts Options) *CloudWatch {
|
|
|
|
c := &CloudWatch{
|
|
client: client,
|
|
opts: opts,
|
|
buffer: make([]logFormat, 0),
|
|
}
|
|
|
|
go func() {
|
|
for {
|
|
time.Sleep(10 * time.Second)
|
|
c.flush()
|
|
}
|
|
}()
|
|
|
|
return c
|
|
}
|
|
|
|
func NewCloudWatchWithDefaultClient(opts Options) (*CloudWatch, error) {
|
|
cfg, err := config.LoadDefaultConfig(context.TODO())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
client := cloudwatchlogs.NewFromConfig(cfg)
|
|
|
|
stream, err := client.DescribeLogStreams(context.TODO(), &cloudwatchlogs.DescribeLogStreamsInput{
|
|
LogGroupName: &opts.GroupName,
|
|
LogStreamNamePrefix: &opts.StreamName,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(stream.LogStreams) == 0 {
|
|
_, err = client.CreateLogStream(context.TODO(), &cloudwatchlogs.CreateLogStreamInput{
|
|
LogGroupName: &opts.GroupName,
|
|
LogStreamName: &opts.StreamName,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return NewCloudWatch(client, opts), nil
|
|
}
|
|
|
|
func (c *CloudWatch) Log(msg logFormat) {
|
|
c.buffer = append(c.buffer, msg)
|
|
|
|
if len(c.buffer) >= c.opts.BufferLimit {
|
|
c.flush()
|
|
}
|
|
}
|
|
|
|
func (c *CloudWatch) flush() {
|
|
if c.buffer == nil || len(c.buffer) == 0 {
|
|
return
|
|
}
|
|
|
|
res, err := c.client.PutLogEvents(context.TODO(), &cloudwatchlogs.PutLogEventsInput{
|
|
LogEvents: c.toLogEvents(c.buffer),
|
|
LogGroupName: &c.opts.GroupName,
|
|
LogStreamName: &c.opts.StreamName,
|
|
SequenceToken: c.nextSequenceToken,
|
|
})
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
c.nextSequenceToken = res.NextSequenceToken
|
|
|
|
c.buffer = []logFormat{}
|
|
}
|
|
|
|
func (c *CloudWatch) toLogEvents(logs []logFormat) []types.InputLogEvent {
|
|
events := make([]types.InputLogEvent, len(logs))
|
|
|
|
for i, log := range logs {
|
|
d, err := json.Marshal(log)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
t, err := time.Parse(time.RFC3339Nano, log.DateTime)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
events[i] = types.InputLogEvent{
|
|
Message: aws.String(string(d)),
|
|
Timestamp: aws.Int64(t.UnixMilli()),
|
|
}
|
|
}
|
|
|
|
// sort events by timestamp
|
|
sort.Slice(events, func(i, j int) bool {
|
|
return *events[i].Timestamp < *events[j].Timestamp
|
|
})
|
|
|
|
return events
|
|
}
|