Skip to content

Commit

Permalink
Make loki syncer use internal atomic map
Browse files Browse the repository at this point in the history
  • Loading branch information
dongxuny committed Jan 29, 2022
1 parent a3e692c commit 86ee807
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 96 deletions.
238 changes: 146 additions & 92 deletions syncer_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ func isValidLabelName(name string) bool {
}

// LokiSyncerOption options for lokiSyncer
type LokiSyncerOption func(syncer *lokiSyncer)
type LokiSyncerOption func(syncer *LokiSyncer)

// WithLokiAddr provide loki address
func WithLokiAddr(addr string) LokiSyncerOption {
return func(syncer *lokiSyncer) {
return func(syncer *LokiSyncer) {
if len(addr) > 0 {
syncer.addr = addr
}
Expand All @@ -41,7 +41,7 @@ func WithLokiAddr(addr string) LokiSyncerOption {

// WithLokiPath provide loki path
func WithLokiPath(in string) LokiSyncerOption {
return func(syncer *lokiSyncer) {
return func(syncer *LokiSyncer) {
if len(in) > 0 {
syncer.path = in
}
Expand All @@ -50,37 +50,37 @@ func WithLokiPath(in string) LokiSyncerOption {

// WithLokiUsername provide loki username
func WithLokiUsername(name string) LokiSyncerOption {
return func(syncer *lokiSyncer) {
return func(syncer *LokiSyncer) {
syncer.username = name
}
}

// WithLokiPassword provide loki password
func WithLokiPassword(pass string) LokiSyncerOption {
return func(syncer *lokiSyncer) {
return func(syncer *LokiSyncer) {
syncer.password = pass
}
}

// WithLokiClientTls provide loki http client TLS config
func WithLokiClientTls(conf *tls.Config) LokiSyncerOption {
return func(syncer *lokiSyncer) {
return func(syncer *LokiSyncer) {
syncer.tlsConfig = conf
}
}

// WithLokiLabel provide labels, should follow isValidLabelName()
func WithLokiLabel(key, value string) LokiSyncerOption {
return func(syncer *lokiSyncer) {
if len(key) > 0 && len(value) > 0 {
syncer.labels[key] = value
return func(syncer *LokiSyncer) {
if len(key) > 0 && len(value) > 0 && isValidLabelName(key) {
syncer.labels.Set(key, value)
}
}
}

// WithLokiMaxBatchWaitMs provide max batch wait time in milli
func WithLokiMaxBatchWaitMs(in time.Duration) LokiSyncerOption {
return func(syncer *lokiSyncer) {
return func(syncer *LokiSyncer) {
if in.Milliseconds() > 0 {
syncer.maxBatchWaitMs = in
}
Expand All @@ -89,19 +89,19 @@ func WithLokiMaxBatchWaitMs(in time.Duration) LokiSyncerOption {

// WithLokiMaxBatchSize provide max batch size
func WithLokiMaxBatchSize(batchSize int) LokiSyncerOption {
return func(syncer *lokiSyncer) {
return func(syncer *LokiSyncer) {
if batchSize > 0 {
syncer.maxBatchSize = batchSize
}
}
}

// NewLokiSyncer create new lokiSyncer
func NewLokiSyncer(opts ...LokiSyncerOption) *lokiSyncer {
syncer := &lokiSyncer{
func NewLokiSyncer(opts ...LokiSyncerOption) *LokiSyncer {
syncer := &LokiSyncer{
addr: "localhost:3100",
path: "/loki/api/v1/push",
labels: make(map[string]string),
labels: newAtomicMap(),
maxBatchWaitMs: 3000 * time.Millisecond,
maxBatchSize: 1000,
quitChannel: make(chan struct{}),
Expand All @@ -113,12 +113,7 @@ func NewLokiSyncer(opts ...LokiSyncerOption) *lokiSyncer {
}

// convert label key if illegal
syncer.labels["rk_logger"] = "v1"
for k := range syncer.labels {
if !isValidLabelName(k) {
delete(syncer.labels, k)
}
}
syncer.labels.Set("rk_logger", "v1")

// init http client
syncer.initHttpClient()
Expand All @@ -129,13 +124,11 @@ func NewLokiSyncer(opts ...LokiSyncerOption) *lokiSyncer {
// add wait group
syncer.waitGroup.Add(1)

go syncer.run()

return syncer
}

// Init http client
func (syncer *lokiSyncer) initHttpClient() {
func (syncer *LokiSyncer) initHttpClient() {
// adjust loki addr
strings.TrimPrefix(syncer.addr, "http://")
strings.TrimPrefix(syncer.addr, "https://")
Expand All @@ -154,70 +147,32 @@ func (syncer *lokiSyncer) initHttpClient() {
}

// Init basic auth header
func (syncer *lokiSyncer) initBasicAuth() {
func (syncer *LokiSyncer) initBasicAuth() {
if len(syncer.username) > 0 && len(syncer.password) > 0 {
auth := syncer.username + ":" + syncer.password
syncer.basicAuthHeader = "Basic " + base64.StdEncoding.EncodeToString([]byte(auth))
}
}

// Loki syncer which will periodically send logs to Loki
type lokiSyncer struct {
addr string `yaml:"addr" json:"addr"`
path string `yaml:"path" json:"path"`
username string `yaml:"username" json:"username"`
password string `yaml:"password" json:"password"`
basicAuthHeader string `yaml:"-" json:"-"`
tlsConfig *tls.Config `yaml:"-" json:"-"`
maxBatchWaitMs time.Duration `yaml:"maxBatchWaitMs" json:"maxBatchWaitMs"`
maxBatchSize int `yaml:"maxBatchSize" json:"maxBatchSize"`
labels map[string]string `yaml:"-" json:"-"`
logChannel chan *lokiValue `yaml:"-" json:"-"`
quitChannel chan struct{} `yaml:"-" json:"-"`
waitGroup sync.WaitGroup `yaml:"-" json:"-"`
httpClient *http.Client `yaml:"-" json:"-"`
}

// run periodic jobs
func (syncer *lokiSyncer) run() {
var batch []*lokiValue
batchSize := 0
waitChannel := time.NewTimer(syncer.maxBatchWaitMs)

defer func() {
if batchSize > 0 {
syncer.send(batch)
}

syncer.waitGroup.Done()
}()

for {
select {
case <-syncer.quitChannel:
return
case entry := <-syncer.logChannel:
batch = append(batch, entry)
batchSize++
if batchSize >= syncer.maxBatchSize {
syncer.send(batch)
batch = []*lokiValue{}
batchSize = 0
waitChannel.Reset(syncer.maxBatchWaitMs)
}
case <-waitChannel.C:
if batchSize > 0 {
syncer.send(batch)
batch = []*lokiValue{}
batchSize = 0
}
waitChannel.Reset(syncer.maxBatchWaitMs)
}
}
// LokiSyncer which will periodically send logs to Loki
type LokiSyncer struct {
addr string `yaml:"addr" json:"addr"`
path string `yaml:"path" json:"path"`
username string `yaml:"username" json:"username"`
password string `yaml:"-" json:"-"`
basicAuthHeader string `yaml:"-" json:"-"`
tlsConfig *tls.Config `yaml:"-" json:"-"`
maxBatchWaitMs time.Duration `yaml:"maxBatchWaitMs" json:"maxBatchWaitMs"`
maxBatchSize int `yaml:"maxBatchSize" json:"maxBatchSize"`
labels *atomicMap `yaml:"-" json:"-"`
logChannel chan *lokiValue `yaml:"-" json:"-"`
quitChannel chan struct{} `yaml:"-" json:"-"`
waitGroup sync.WaitGroup `yaml:"-" json:"-"`
httpClient *http.Client `yaml:"-" json:"-"`
}

// Send message to remote loki server
func (syncer *lokiSyncer) send(entries []*lokiValue) {
func (syncer *LokiSyncer) send(entries []*lokiValue) {
streams := syncer.newLokiStreamList(entries)

req, _ := http.NewRequest(http.MethodPost, syncer.addr+syncer.path, bytes.NewBuffer(streams))
Expand All @@ -239,25 +194,76 @@ func (syncer *lokiSyncer) send(entries []*lokiValue) {
}
}

// ************* Interrupter *************
// ************* Bootstrap & Interrupt *************

// Bootstrap run periodic jobs
func (syncer *LokiSyncer) Bootstrap(context.Context) {
go func() {
var batch []*lokiValue
batchSize := 0
waitChannel := time.NewTimer(syncer.maxBatchWaitMs)

defer func() {
if batchSize > 0 {
syncer.send(batch)
}

syncer.waitGroup.Done()
}()

for {
select {
case <-syncer.quitChannel:
return
case entry := <-syncer.logChannel:
batch = append(batch, entry)
batchSize++
if batchSize >= syncer.maxBatchSize {
syncer.send(batch)
batch = []*lokiValue{}
batchSize = 0
waitChannel.Reset(syncer.maxBatchWaitMs)
}
case <-waitChannel.C:
if batchSize > 0 {
syncer.send(batch)
batch = []*lokiValue{}
batchSize = 0
}
waitChannel.Reset(syncer.maxBatchWaitMs)
}
}
}()
}

// Interrupt goroutine
func (syncer *lokiSyncer) Interrupt(context.Context) {
func (syncer *LokiSyncer) Interrupt(context.Context) {
close(syncer.quitChannel)
syncer.waitGroup.Wait()
}

// ************* Model *************
func (syncer *LokiSyncer) AddLabel(key, value string) {
syncer.labels.Set(key, value)
}

// Create new lokiStreamList
func (syncer *lokiSyncer) newLokiStreamList(values []*lokiValue) []byte {
func (syncer *LokiSyncer) newLokiStreamList(values []*lokiValue) []byte {
msg := &lokiStreamList{
Streams: []*lokiStream{
{
Stream: syncer.labels,
Values: values,
},
},
Streams: []*lokiStream{},
}

for i := range values {
val := values[i]
labels := syncer.labels.Copy()
for k, v := range val.Labels {
labels[k] = v
}

msg.Streams = append(msg.Streams, &lokiStream{
Stream: labels,
Values: [][]string{val.Values},
})
}

bytes, _ := json.Marshal(msg)
Expand All @@ -266,12 +272,15 @@ func (syncer *lokiSyncer) newLokiStreamList(values []*lokiValue) []byte {
}

// Refer https://grafana.com/docs/loki/latest/api/#post-lokiapiv1push
type lokiValue []string
type lokiValue struct {
Values []string `json:"-"`
Labels map[string]string `json:"-"`
}

// Refer https://grafana.com/docs/loki/latest/api/#post-lokiapiv1push
type lokiStream struct {
Stream map[string]string `json:"stream"`
Values []*lokiValue `json:"values"`
Values [][]string `json:"values"`
}

// Refer https://grafana.com/docs/loki/latest/api/#post-lokiapiv1push
Expand All @@ -282,15 +291,60 @@ type lokiStreamList struct {
// ************* Implementation of zapcore.WriteSyncer *************

// Write to logChannel
func (syncer *lokiSyncer) Write(p []byte) (n int, err error) {
func (syncer *LokiSyncer) Write(p []byte) (n int, err error) {
syncer.logChannel <- &lokiValue{
fmt.Sprintf("%d", time.Now().UnixNano()), string(p),
Values: []string{fmt.Sprintf("%d", time.Now().UnixNano()), string(p)},
}

return len(p), nil
}

// Noop
func (syncer *lokiSyncer) Sync() error {
func (syncer *LokiSyncer) Sync() error {
return nil
}

func newAtomicMap() *atomicMap {
return &atomicMap{
mutex: sync.Mutex{},
m: make(map[string]string),
}
}

type atomicMap struct {
mutex sync.Mutex
m map[string]string
}

func (a *atomicMap) Set(k, v string) {
a.mutex.Lock()
defer a.mutex.Unlock()

a.m[k] = v
}

func (a *atomicMap) Get(k string) string {
a.mutex.Lock()
defer a.mutex.Unlock()

return a.m[k]
}

func (a *atomicMap) Copy() map[string]string {
a.mutex.Lock()
defer a.mutex.Unlock()

res := map[string]string{}
for k, v := range a.m {
res[k] = v
}

return res
}

func (a *atomicMap) Delete(k string) {
a.mutex.Lock()
defer a.mutex.Unlock()

delete(a.m, k)
}
Loading

0 comments on commit 86ee807

Please sign in to comment.