Extend remote config watch with context and callback

This commit is contained in:
robin.hubbig 2024-04-05 10:16:02 +02:00
parent 947eb59667
commit ba872b6d0d

View file

@ -21,6 +21,7 @@ package viper
import (
"bytes"
"context"
"encoding/csv"
"errors"
"fmt"
@ -143,6 +144,16 @@ func DecodeHook(hook mapstructure.DecodeHookFunc) DecoderConfigOption {
}
}
type RemoteConfigEvent uint
const (
RemoteConfigEvent_Unknown RemoteConfigEvent = iota
// Remote config was updated
RemoteConfigEvent_Updated
// Remote config watch routine stopped
RemoteConfigEvent_Stopped
)
// Viper is a prioritized configuration registry. It
// maintains a set of configuration sources, fetches
// values to populate those, and provides them according
@ -217,7 +228,8 @@ type Viper struct {
aliases map[string]string
typeByDefValue bool
onConfigChange func(fsnotify.Event)
onConfigChange func(fsnotify.Event)
onRemoteConfigChange func(RemoteConfigEvent)
logger *slog.Logger
@ -432,6 +444,14 @@ func (v *Viper) OnConfigChange(run func(in fsnotify.Event)) {
v.onConfigChange = run
}
// OnRemoteConfigChange sets the event handler that is called when a remote config file changes.
func OnRemoteConfigChange(run func(RemoteConfigEvent)) { v.OnRemoteConfigChange(run) }
// OnRemoteConfigChange sets the event handler that is called when a remote config file changes.
func (v *Viper) OnRemoteConfigChange(run func(RemoteConfigEvent)) {
v.onRemoteConfigChange = run
}
// WatchConfig starts watching a config file for changes.
func WatchConfig() { v.WatchConfig() }
@ -1973,7 +1993,11 @@ func (v *Viper) WatchRemoteConfig() error {
}
func (v *Viper) WatchRemoteConfigOnChannel() error {
return v.watchKeyValueConfigOnChannel()
return v.watchKeyValueConfigOnChannelWithContext(context.TODO())
}
func (v *Viper) WatchRemoteConfigOnChannelWithContext(ctx context.Context) error {
return v.watchKeyValueConfigOnChannelWithContext(ctx)
}
// Retrieve the first found remote configuration.
@ -2010,20 +2034,32 @@ func (v *Viper) getRemoteConfig(provider RemoteProvider) (map[string]any, error)
return v.kvstore, err
}
// Retrieve the first found remote configuration.
func (v *Viper) watchKeyValueConfigOnChannel() error {
// Watch the first found remote configuration.
func (v *Viper) watchKeyValueConfigOnChannelWithContext(ctx context.Context) error {
if len(v.remoteProviders) == 0 {
return RemoteConfigError("No Remote Providers")
}
for _, rp := range v.remoteProviders {
respc, _ := RemoteConfig.WatchChannel(rp)
// Todo: Add quit channel
go func(rc <-chan *RemoteResponse) {
for {
b := <-rc
reader := bytes.NewReader(b.Value)
v.unmarshalReader(reader, v.kvstore)
select {
case b := <-rc:
reader := bytes.NewReader(b.Value)
if err := v.unmarshalReader(reader, v.kvstore); err != nil {
v.logger.Error(fmt.Errorf("unmarshal remote config update: %w", err).Error())
continue
}
if v.onRemoteConfigChange != nil {
v.onRemoteConfigChange(RemoteConfigEvent_Updated)
}
case <-ctx.Done():
if v.onRemoteConfigChange != nil {
v.onRemoteConfigChange(RemoteConfigEvent_Stopped)
}
return
}
}
}(respc)
return nil