You've already forked logger
add initial implementation of golang_logger with CloudWatch and StdErr handlers
All checks were successful
🚀 Publish Release Package / publish (push) Successful in 26s
All checks were successful
🚀 Publish Release Package / publish (push) Successful in 26s
This commit is contained in:
129
cloudwatch.go
Normal file
129
cloudwatch.go
Normal file
@@ -0,0 +1,129 @@
|
||||
package golang_logger
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
|
||||
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
|
||||
"sort"
|
||||
"time"
|
||||
)
|
||||
|
||||
type CloudWatch struct {
|
||||
Handler
|
||||
client *cloudwatchlogs.Client
|
||||
buffer []logFormat
|
||||
opts Options
|
||||
nextSequenceToken *string
|
||||
}
|
||||
|
||||
type Options struct {
|
||||
GroupName string
|
||||
StreamName string
|
||||
BufferLimit int
|
||||
}
|
||||
|
||||
func NewCloudWatch(client *cloudwatchlogs.Client, opts Options) *CloudWatch {
|
||||
|
||||
c := &CloudWatch{
|
||||
client: client,
|
||||
opts: opts,
|
||||
buffer: make([]logFormat, 0),
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(10 * time.Second)
|
||||
c.flush()
|
||||
}
|
||||
}()
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func NewCloudWatchWithDefaultClient(opts Options) (*CloudWatch, error) {
|
||||
cfg, err := config.LoadDefaultConfig(context.TODO())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client := cloudwatchlogs.NewFromConfig(cfg)
|
||||
|
||||
stream, err := client.DescribeLogStreams(context.TODO(), &cloudwatchlogs.DescribeLogStreamsInput{
|
||||
LogGroupName: &opts.GroupName,
|
||||
LogStreamNamePrefix: &opts.StreamName,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(stream.LogStreams) == 0 {
|
||||
_, err = client.CreateLogStream(context.TODO(), &cloudwatchlogs.CreateLogStreamInput{
|
||||
LogGroupName: &opts.GroupName,
|
||||
LogStreamName: &opts.StreamName,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return NewCloudWatch(client, opts), nil
|
||||
}
|
||||
|
||||
func (c *CloudWatch) Log(msg logFormat) {
|
||||
c.buffer = append(c.buffer, msg)
|
||||
|
||||
if len(c.buffer) >= c.opts.BufferLimit {
|
||||
c.flush()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CloudWatch) flush() {
|
||||
if c.buffer == nil || len(c.buffer) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
res, err := c.client.PutLogEvents(context.TODO(), &cloudwatchlogs.PutLogEventsInput{
|
||||
LogEvents: c.toLogEvents(c.buffer),
|
||||
LogGroupName: &c.opts.GroupName,
|
||||
LogStreamName: &c.opts.StreamName,
|
||||
SequenceToken: c.nextSequenceToken,
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
c.nextSequenceToken = res.NextSequenceToken
|
||||
|
||||
c.buffer = []logFormat{}
|
||||
}
|
||||
|
||||
func (c *CloudWatch) toLogEvents(logs []logFormat) []types.InputLogEvent {
|
||||
events := make([]types.InputLogEvent, len(logs))
|
||||
|
||||
for i, log := range logs {
|
||||
d, err := json.Marshal(log)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
t, err := time.Parse(time.RFC3339Nano, log.DateTime)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
events[i] = types.InputLogEvent{
|
||||
Message: aws.String(string(d)),
|
||||
Timestamp: aws.Int64(t.UnixMilli()),
|
||||
}
|
||||
}
|
||||
|
||||
// sort events by timestamp
|
||||
sort.Slice(events, func(i, j int) bool {
|
||||
return *events[i].Timestamp < *events[j].Timestamp
|
||||
})
|
||||
|
||||
return events
|
||||
}
|
||||
Reference in New Issue
Block a user