Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support consul service health checks #102

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
confd

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

idk how the maintainers feel about this, but it isn't really necessary if you use go install instead of go build. That will build the binary and then place it in $GOPATH/bin, which should be in your $PATH.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should move this to a different commit.

237 changes: 235 additions & 2 deletions backends/consul/client.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
package consul

import (
"encoding/json"
"path"
"strconv"
"strings"

"github.com/armon/consul-api"
)

// Client provides a wrapper around the consulkv client
type Client struct {
client *consulapi.KV
client *consulapi.KV
health *consulapi.Health
catalog *consulapi.Catalog
}

// Wrapper for service information
type Service struct {
Name string
Tags []string
}

// NewConsulClient returns a new client to Consul for the given address
Expand All @@ -22,7 +32,7 @@ func NewConsulClient(nodes []string) (*Client, error) {
if err != nil {
return nil, err
}
return &Client{client.KV()}, nil
return &Client{client.KV(), client.Health(), client.Catalog()}, nil
}

// GetValues queries Consul for keys
Expand All @@ -38,5 +48,228 @@ func (c *Client) GetValues(keys []string) (map[string]string, error) {
vars[path.Join("/", p.Key)] = string(p.Value)
}
}
sd, err := ProcessConsulData(*c, keys)
if err != nil {
return vars, err
} else {
MergeMaps(vars, sd)
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will there every be conflicts between service names and key names?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potentially. I don't think there is a way to protect against that unless we move all the KV entries into it's own namespace internal to confd.

That's why I selected to prefix the service data with _consul.

return vars, nil
}

// Use the configured prefixes to load Consule data into
// the available keys
func ProcessConsulData(c Client, keys []string) (map[string]string, error) {
vars := make(map[string]string)
for _, key := range keys {
data, err := ProcessConsulDataPrefix(c, key)
if err != nil {
return vars, err
} else {
MergeMaps(vars, data)
}
}
return vars, nil
}

// Process a specific key with respect to Consul data
func ProcessConsulDataPrefix(c Client, key string) (map[string]string, error) {
// Split the prefix into tokens so we can iterate through them
tokens := strings.Split(strings.TrimPrefix(key, "/"), "/")
// if the prefix doesn't start with /_consul, then this code won't
// handle it, so exit out
if tokens[0] != "_consul" {
return make(map[string]string), nil
}
// if the prefix is simply /_consul, then get all Consul data that we can
// and return it
if cap(tokens) == 1 {
return ProcessAllConsulServiceData(c)
} else {
// if the prefix starts with /_consul then pass it to some subfunctions
// for processing. Pops off the _consul token so we can ignore it
return ProcessConsulServiceData(c, tokens[1:])
}
}

// Gets data for all services in consul by looking up services from
// the Consul catalog
func ProcessAllConsulServiceData(c Client) (map[string]string, error) {
// Get all services from the Consul catalog
services, err := ListServices(c)
if err != nil {
return nil, err
}
// Get all the health information for the services
return GetServicesHealth(c, services)
}

// Gets service data from Consul by traversing the provide prefix that
// has been tokenized
func ProcessConsulServiceData(c Client, tokens []string) (map[string]string, error) {
// only handle prefixes that start with /_consul/service
if tokens[0] != "service" {
return make(map[string]string), nil
}
if cap(tokens) == 1 {
// If the key is /_consul/service then we get all service data
// by querying the Consul catalog and iterating over the list of
// services that are known
return ProcessAllConsulServiceData(c)
} else {
// Otherwise, pop this token off and try processing for a specific service
return ProcessConsulDataForService(c, tokens[1:])
}
}

// Use the remaing tokens in the prefix to acquire service data from Consul
func ProcessConsulDataForService(c Client, tokens []string) (map[string]string, error) {
// The first token is the service name
service := tokens[0]

// If there is a second token, than it is the tag
tag := ""
if cap(tokens) > 1 {
tag = tokens[1]
}
// Get the Service from the Consul catalog
srv, err := GetService(c, service)
if err != nil {
return nil, err
}
// If a service by this name exists, get its health, otherwise return empty
if srv.Name != "" {
if tag == "" {
// If there is no tag, then get health for all tags
return GetServiceHealth(c, srv)
} else {
// Otherwise just get the health for the specfic tag
return GetServiceTagHealth(c, srv, tag)
}
} else {
return make(map[string]string), nil
}
}

// Get a specific service from the Consul catalog.
// Returns a zero struct if not found...Check that the service.Name != ""
func GetService(c Client, name string) (Service, error) {
// Get all services in the catalog
services, err := ListServices(c)
// return error if it occurred
if err != nil {
return Service{}, err
}
// Find the service instance with the name we are looking for and return it
// Otherwise return the zero struct
var service Service
for _, srv := range services {
if srv.Name == name {
service = srv
}
}
return service, nil
}

// ListServices queries Consul for all registered services
func ListServices(c Client) ([]Service, error) {
services := make([]Service, 0)
srvs, _, err := c.catalog.Services(nil)
if err != nil {
return services, err
}
for name, tags := range srvs {
services = append(services, Service{name, tags})
}
return services, nil
}

// Retrieve data for all provided services
func GetServicesHealth(c Client, services []Service) (map[string]string, error) {
entries := make(map[string]string)
for _, service := range services {
service_entries, err := GetServiceHealth(c, service)
if err != nil {
return entries, err
}
MergeMaps(entries, service_entries)
}
return entries, nil
}

// GetServicesHealth queries Consul for the active host data for each
// service in the input. Typically this is returned value from ListServices.
// Currently Consul is queried only for healthy hosts for the service.
//
// Returned data from Consul is stored as a JSON blob under the
// /_consul/service/<name>/<idx> where <name> is the configured
// name for the service and <idx> is the entry index in result set from Consul.
// The /_consul prefix was selected to avoid potential clashes with the
// key-value store.
//
// Additionally, Consul service tag data is evaluated and the service JSON is
// stored under a second key with the format /_consul/service/<name>/<tag>/<idx>
// where <name> and <idx> are the same as above and <tag> is the applied tag
// from Consul. This is down to allow templating on either the service or a
// subset of instances of that service with a particular tag.
//
// Currently Consul datacenters are not supported via this method.
func GetServiceHealth(c Client, service Service) (map[string]string, error) {
entries := make(map[string]string)
// For each service retrieve the list of health hosts from Consul
serviceEntries, _, err := c.health.Service(service.Name, "", true, nil)
if err != nil {
return entries, err
}
// For each service host, add the JSON data to the root list
for idx, serviceEntry := range serviceEntries {
key := path.Join("/", "_consul", "service", service.Name, strconv.Itoa(idx))
service_json, _ := json.Marshal(serviceEntry)
entries[key] = string(service_json)
}
// For each tag registered for the service, retrieve the healthy hosts
// for the service & tag
tag_entries, err := GetServiceTagsHealth(c, service)
if err != nil {
return entries, err
}
MergeMaps(entries, tag_entries)
return entries, nil
}

// Get service data for all Tags for a service
func GetServiceTagsHealth(c Client, service Service) (map[string]string, error) {
entries := make(map[string]string)
for _, tag := range service.Tags {
tag_entries, err := GetServiceTagHealth(c, service, tag)
if err != nil {
return entries, err
}
MergeMaps(entries, tag_entries)
}
return entries, nil
}

// Get service data for a single service Tag
func GetServiceTagHealth(c Client, service Service, tag string) (map[string]string, error) {
entries := make(map[string]string)
serviceEntries, _, err := c.health.Service(service.Name, tag, true, nil)
if err != nil {
return entries, err
}
// Add each service hosts for the tag, add the JSON data to the tag list
for idx, serviceEntry := range serviceEntries {
key := path.Join("/", "_consul", "service", service.Name, tag, strconv.Itoa(idx))
service_json, _ := json.Marshal(serviceEntry)
entries[key] = string(service_json)
}
return entries, nil
}

// Merge two maps together. Data from the 2nd arg will override the data
// in the original map.
func MergeMaps(base map[string]string, data map[string]string) {
for k, v := range data {
base[k] = v
}
}
3 changes: 2 additions & 1 deletion integration/confdir/conf.d/service.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ mode = "0644"
src = "service.conf.tmpl"
dest = "/tmp/confd-service-test.conf"
keys = [
"/_consul",
"/_consul/service/rails/v1",
"/_consul/service/test"
]