diff --git a/remote/consul/consul.go b/remote/consul/consul.go index fc9b8d3..e35e6d8 100644 --- a/remote/consul/consul.go +++ b/remote/consul/consul.go @@ -6,45 +6,39 @@ import ( "github.com/hashicorp/consul/watch" "github.com/spf13/viper" "io" + "sync" ) -type consulConfigProvider struct{} +type consulConfigProvider struct { + mu sync.Mutex + idxMap map[string]uint64 +} -func (rc consulConfigProvider) Get(rp viper.RemoteProvider) (io.Reader, error) { +func (rc *consulConfigProvider) Get(rp viper.RemoteProvider) (io.Reader, error) { config := api.DefaultConfig() config.Address = rp.Endpoint() client, err := api.NewClient(config) if err != nil { return nil, err } - kv, _, err := client.KV().Get(rp.Path(), nil) + kv, meta, err := client.KV().Get(rp.Path(), nil) if err != nil { return nil, err } + rc.updateIndex(rp, meta.LastIndex) return bytes.NewReader(kv.Value), nil } -func (rc consulConfigProvider) Watch(rp viper.RemoteProvider) (io.Reader, error) { - // TODO same as Get(), but behave like before(viper/remote), maybe record LastIndex in rp? - resp, quit := newWatcher(rp.Path(), rp.Endpoint()) - r := <-resp - close(quit) - return bytes.NewReader(r.Value), r.Error +func (rc *consulConfigProvider) Watch(rp viper.RemoteProvider) (io.Reader, error) { + return rc.watch(rp) } -func (rc consulConfigProvider) WatchChannel( - rp viper.RemoteProvider) (resp <-chan *viper.RemoteResponse, quit chan bool) { - return newWatcher(rp.Path(), rp.Endpoint()) -} - -// To stop watch, just close(quit) -func newWatcher(key, addr string) (<-chan *viper.RemoteResponse, chan bool) { - p, err := watch.Parse(map[string]interface{}{"type": "key", "key": key}) +func (rc *consulConfigProvider) watch(rp viper.RemoteProvider) (r io.Reader, err error) { + p, err := watch.Parse(map[string]interface{}{"type": "key", "key": rp.Path()}) if err != nil { - return nil, nil + return nil, err } - quit := make(chan bool) - viperResponseCh := make(chan *viper.RemoteResponse) + // handler p.Handler = func(index uint64, data interface{}) { if data == nil { return @@ -53,12 +47,49 @@ func newWatcher(key, addr string) (<-chan *viper.RemoteResponse, chan bool) { if !ok { return } + if !rc.updateIndex(rp, index) { + return + } + r = bytes.NewReader(kv.Value) + p.Stop() + } + // start watch + p.Run(rp.Endpoint()) + return +} + +func (rc *consulConfigProvider) WatchChannel( + rp viper.RemoteProvider) (resp <-chan *viper.RemoteResponse, quit chan bool) { + return rc.watchChannel(rp) +} + +func (rc *consulConfigProvider) watchChannel(rp viper.RemoteProvider) (<-chan *viper.RemoteResponse, chan bool) { + p, err := watch.Parse(map[string]interface{}{"type": "key", "key": rp.Path()}) + if err != nil { + // this should not happen + return nil, nil + } + quit := make(chan bool) + viperResponseCh := make(chan *viper.RemoteResponse) + // handler + p.Handler = func(index uint64, data interface{}) { + if data == nil { + return + } + kv, ok := data.(*api.KVPair) + if !ok { + return + } + if !rc.updateIndex(rp, index) { + return + } select { case viperResponseCh <- &viper.RemoteResponse{Value: kv.Value}: case <-quit: } } - go p.Run(addr) + // start watcher + go p.Run(rp.Endpoint()) // wait quit go func() { <-quit @@ -67,6 +98,22 @@ func newWatcher(key, addr string) (<-chan *viper.RemoteResponse, chan bool) { return viperResponseCh, quit } -func init() { - viper.RemoteConfig = &consulConfigProvider{} +func makeIndexKey(rp viper.RemoteProvider) string { + return rp.Endpoint() + "_" + rp.Path() +} + +func (rc *consulConfigProvider) updateIndex( + rp viper.RemoteProvider, lastIndex uint64) (updated bool) { + rc.mu.Lock() + oldLastIndex := rc.idxMap[makeIndexKey(rp)] + if oldLastIndex < lastIndex { + rc.idxMap[makeIndexKey(rp)] = lastIndex + updated = true + } + rc.mu.Unlock() + return +} + +func init() { + viper.RemoteConfig = &consulConfigProvider{idxMap: make(map[string]uint64)} } diff --git a/remote/consul/doc.go b/remote/consul/doc.go new file mode 100644 index 0000000..e09afa1 --- /dev/null +++ b/remote/consul/doc.go @@ -0,0 +1,50 @@ +package consul + +// Integrates the consul's remote features of Viper. + +// viper/remote's problems: +// 1.consul watch do not work +// 2.outdated consul's lib + +// use this ConsulRemoteConfigProvider, just replace +// _ "github.com/spf13/viper/remote" => _ "github.com/spf13/viper/remote/consul" + +// usage example 1: +//func main() { +// v := viper.New() +// err := v.AddRemoteProvider("consul", "127.0.0.1:8500", "foo/bar") +// if err != nil { +// panic(err) +// } +// v.SetConfigType("json") +// err = v.ReadRemoteConfig() +// if err != nil { +// panic(err) +// } +// for { +// // blocking until consul kv updated +// err = v.WatchRemoteConfig() +// if err != nil { +// panic(err) +// } +// // your config is updated now. +// // ... +// } +//} + +// usage example 2: +//func main() { +// v := viper.New() +// err := v.AddRemoteProvider("consul", "127.0.0.1:8500", "foo/bar") +// if err != nil { +// panic(err) +// } +// v.SetConfigType("json") +// err = v.ReadRemoteConfig() +// if err != nil { +// panic(err) +// } +// // your config will be update async +// v.WatchRemoteConfigOnChannel() +// // .... +//} \ No newline at end of file