Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dns: reapply "dns: stop polling for updates; use UpdateState API" #3228

Merged
merged 4 commits into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 52 additions & 101 deletions internal/resolver/dns/dns_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ import (
"sync"
"time"

"google.golang.org/grpc/backoff"
"google.golang.org/grpc/grpclog"
internalbackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)

// EnableSRVLookups controls whether the DNS resolver attempts to fetch gRPCLB
Expand All @@ -49,7 +48,6 @@ func init() {

const (
defaultPort = "443"
defaultFreq = time.Minute * 30
defaultDNSSvrPort = "53"
golang = "GO"
// txtPrefix is the prefix string to be prepended to the host name for txt record lookup.
Expand Down Expand Up @@ -99,13 +97,10 @@ var customAuthorityResolver = func(authority string) (netResolver, error) {

// NewBuilder creates a dnsBuilder which is used to factory DNS resolvers.
func NewBuilder() resolver.Builder {
return &dnsBuilder{minFreq: defaultFreq}
return &dnsBuilder{}
}

type dnsBuilder struct {
// minimum frequency of polling the DNS server.
minFreq time.Duration
}
type dnsBuilder struct{}

// Build creates and starts a DNS resolver that watches the name resolution of the target.
func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
Expand All @@ -115,33 +110,20 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
}

// IP address.
if net.ParseIP(host) != nil {
host, _ = formatIP(host)
addr := []resolver.Address{{Addr: host + ":" + port}}
i := &ipResolver{
cc: cc,
ip: addr,
rn: make(chan struct{}, 1),
q: make(chan struct{}),
}
cc.NewAddress(addr)
go i.watcher()
return i, nil
if ipAddr, ok := formatIP(host); ok {
addr := []resolver.Address{{Addr: ipAddr + ":" + port}}
cc.UpdateState(resolver.State{Addresses: addr})
return deadResolver{}, nil
}

// DNS address (non-IP).
ctx, cancel := context.WithCancel(context.Background())
bc := backoff.DefaultConfig
bc.MaxDelay = b.minFreq
d := &dnsResolver{
freq: b.minFreq,
backoff: internalbackoff.Exponential{Config: bc},
host: host,
port: port,
ctx: ctx,
cancel: cancel,
cc: cc,
t: time.NewTimer(0),
rn: make(chan struct{}, 1),
disableServiceConfig: opts.DisableServiceConfig,
}
Expand All @@ -157,6 +139,7 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts

d.wg.Add(1)
go d.watcher()
d.ResolveNow(resolver.ResolveNowOptions{})
return d, nil
}

Expand All @@ -171,53 +154,23 @@ type netResolver interface {
LookupTXT(ctx context.Context, name string) (txts []string, err error)
}

// ipResolver watches for the name resolution update for an IP address.
type ipResolver struct {
cc resolver.ClientConn
ip []resolver.Address
// rn channel is used by ResolveNow() to force an immediate resolution of the target.
rn chan struct{}
q chan struct{}
}
// deadResolver is a resolver that does nothing.
type deadResolver struct{}

// ResolveNow resend the address it stores, no resolution is needed.
func (i *ipResolver) ResolveNow(opt resolver.ResolveNowOptions) {
select {
case i.rn <- struct{}{}:
default:
}
}
func (deadResolver) ResolveNow(resolver.ResolveNowOptions) {}

// Close closes the ipResolver.
func (i *ipResolver) Close() {
close(i.q)
}

func (i *ipResolver) watcher() {
for {
select {
case <-i.rn:
i.cc.NewAddress(i.ip)
case <-i.q:
return
}
}
}
func (deadResolver) Close() {}

// dnsResolver watches for the name resolution update for a non-IP target.
type dnsResolver struct {
freq time.Duration
backoff internalbackoff.Exponential
retryCount int
host string
port string
resolver netResolver
ctx context.Context
cancel context.CancelFunc
cc resolver.ClientConn
host string
port string
resolver netResolver
ctx context.Context
cancel context.CancelFunc
cc resolver.ClientConn
// rn channel is used by ResolveNow() to force an immediate resolution of the target.
rn chan struct{}
t *time.Timer
// wg is used to enforce Close() to return after the watcher() goroutine has finished.
// Otherwise, data race will be possible. [Race Example] in dns_resolver_test we
// replace the real lookup functions with mocked ones to facilitate testing.
Expand All @@ -229,7 +182,7 @@ type dnsResolver struct {
}

// ResolveNow invoke an immediate resolution of the target that this dnsResolver watches.
func (d *dnsResolver) ResolveNow(opt resolver.ResolveNowOptions) {
func (d *dnsResolver) ResolveNow(resolver.ResolveNowOptions) {
select {
case d.rn <- struct{}{}:
default:
Expand All @@ -240,7 +193,6 @@ func (d *dnsResolver) ResolveNow(opt resolver.ResolveNowOptions) {
func (d *dnsResolver) Close() {
d.cancel()
d.wg.Wait()
d.t.Stop()
}

func (d *dnsResolver) watcher() {
Expand All @@ -249,29 +201,11 @@ func (d *dnsResolver) watcher() {
select {
case <-d.ctx.Done():
return
case <-d.t.C:
case <-d.rn:
if !d.t.Stop() {
// Before resetting a timer, it should be stopped to prevent racing with
// reads on it's channel.
<-d.t.C
}
}

result, sc := d.lookup()
// Next lookup should happen within an interval defined by d.freq. It may be
// more often due to exponential retry on empty address list.
if len(result) == 0 {
d.retryCount++
d.t.Reset(d.backoff.Backoff(d.retryCount))
} else {
d.retryCount = 0
d.t.Reset(d.freq)
}
if sc != "" { // We get empty string when disabled or the TXT lookup failed.
d.cc.NewServiceConfig(sc)
}
d.cc.NewAddress(result)
state := d.lookup()
d.cc.UpdateState(*state)

// Sleep to prevent excessive re-resolutions. Incoming resolution requests
// will be queued in d.rn.
Expand Down Expand Up @@ -314,11 +248,26 @@ func (d *dnsResolver) lookupSRV() []resolver.Address {
return newAddrs
}

func (d *dnsResolver) lookupTXT() string {
var filterError = func(err error) error {
if dnsErr, ok := err.(*net.DNSError); ok && !dnsErr.IsTimeout && !dnsErr.IsTemporary {
// Timeouts and temporary errors should be communicated to gRPC to
// attempt another DNS query (with backoff). Other errors should be
// suppressed (they may represent the absence of a TXT record).
return nil
}
return err
}

func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult {
ss, err := d.resolver.LookupTXT(d.ctx, txtPrefix+d.host)
if err != nil {
grpclog.Infof("grpc: failed dns TXT record lookup due to %v.\n", err)
return ""
err = filterError(err)
if err != nil {
err = fmt.Errorf("error from DNS TXT record lookup: %v", err)
grpclog.Infoln("grpc:", err)
return &serviceconfig.ParseResult{Err: err}
}
return nil
}
var res string
for _, s := range ss {
Expand All @@ -327,10 +276,12 @@ func (d *dnsResolver) lookupTXT() string {

// TXT record must have "grpc_config=" attribute in order to be used as service config.
if !strings.HasPrefix(res, txtAttribute) {
grpclog.Warningf("grpc: TXT record %v missing %v attribute", res, txtAttribute)
return ""
grpclog.Warningf("grpc: DNS TXT record %v missing %v attribute", res, txtAttribute)
// This is not an error; it is the equivalent of not having a service config.
return nil
}
return strings.TrimPrefix(res, txtAttribute)
sc := canaryingSC(strings.TrimPrefix(res, txtAttribute))
return d.cc.ParseServiceConfig(sc)
}

func (d *dnsResolver) lookupHost() []resolver.Address {
Expand All @@ -352,15 +303,15 @@ func (d *dnsResolver) lookupHost() []resolver.Address {
return newAddrs
}

func (d *dnsResolver) lookup() ([]resolver.Address, string) {
newAddrs := d.lookupSRV()
// Support fallback to non-balancer address.
newAddrs = append(newAddrs, d.lookupHost()...)
if d.disableServiceConfig {
return newAddrs, ""
func (d *dnsResolver) lookup() *resolver.State {
srv := d.lookupSRV()
state := &resolver.State{
Addresses: append(d.lookupHost(), srv...),
}
if !d.disableServiceConfig {
state.ServiceConfig = d.lookupTXT()
}
sc := d.lookupTXT()
return newAddrs, canaryingSC(sc)
return state
}

// formatIP returns ok = false if addr is not a valid textual representation of an IP address.
Expand Down
Loading