diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 2d5ca952c3193..f7b57b2582bcf 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -750,7 +750,7 @@ func (s *ProxyServer) Run() error { // functions must configure their shared informer event handlers first. informerFactory.Start(wait.NeverStop) - if utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) { + if utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) || utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) { // Make an informer that selects for our nodename. currentNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod, informers.WithTweakListOptions(func(options *metav1.ListOptions) { diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index 0ef4a73e1b576..8224f30954daf 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -50,6 +50,9 @@ type BaseEndpointInfo struct { IsLocal bool Topology map[string]string + // ZoneHints represent the zone hints for the endpoint. This is based on + // endpoint.hints.forZones[*].name in the EndpointSlice API. + ZoneHints sets.String // Ready indicates whether this endpoint is ready and NOT terminating. // For pods, this is true if a pod has a ready status and a nil deletion timestamp. // This is only set when watching EndpointSlices. If using Endpoints, this is always @@ -102,6 +105,11 @@ func (info *BaseEndpointInfo) GetTopology() map[string]string { return info.Topology } +// GetZoneHints returns the zone hint for the endpoint. +func (info *BaseEndpointInfo) GetZoneHints() sets.String { + return info.ZoneHints +} + // IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface. func (info *BaseEndpointInfo) IP() string { return utilproxy.IPPart(info.Endpoint) @@ -118,7 +126,7 @@ func (info *BaseEndpointInfo) Equal(other Endpoint) bool { } func newBaseEndpointInfo(IP string, port int, isLocal bool, topology map[string]string, - ready, serving, terminating bool) *BaseEndpointInfo { + ready, serving, terminating bool, zoneHints sets.String) *BaseEndpointInfo { return &BaseEndpointInfo{ Endpoint: net.JoinHostPort(IP, strconv.Itoa(port)), IsLocal: isLocal, @@ -126,6 +134,7 @@ func newBaseEndpointInfo(IP string, port int, isLocal bool, topology map[string] Ready: ready, Serving: serving, Terminating: terminating, + ZoneHints: zoneHints, } } @@ -427,8 +436,10 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint isServing := true isTerminating := false isLocal := addr.NodeName != nil && *addr.NodeName == ect.hostname + // Only supported with EndpointSlice API + zoneHints := sets.String{} - baseEndpointInfo := newBaseEndpointInfo(addr.IP, int(port.Port), isLocal, nil, isReady, isServing, isTerminating) + baseEndpointInfo := newBaseEndpointInfo(addr.IP, int(port.Port), isLocal, nil, isReady, isServing, isTerminating, zoneHints) if ect.makeEndpointInfo != nil { endpointsMap[svcPortName] = append(endpointsMap[svcPortName], ect.makeEndpointInfo(baseEndpointInfo)) } else { diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 469521e8514ee..2f75a0d6d872d 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -194,7 +194,7 @@ func TestEndpointsToEndpointsMap(t *testing.T) { }), expected: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "", v1.ProtocolTCP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, }, @@ -218,7 +218,7 @@ func TestEndpointsToEndpointsMap(t *testing.T) { }), expected: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "port", v1.ProtocolTCP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, }, @@ -241,7 +241,7 @@ func TestEndpointsToEndpointsMap(t *testing.T) { }), expected: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "", v1.ProtocolTCP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, }, @@ -278,12 +278,12 @@ func TestEndpointsToEndpointsMap(t *testing.T) { }), expected: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p1", v1.ProtocolTCP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "2.2.2.2:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "2.2.2.2:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p2", v1.ProtocolTCP): { - {Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "2.2.2.2:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "2.2.2.2:22", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, }, @@ -307,7 +307,7 @@ func TestEndpointsToEndpointsMap(t *testing.T) { }), expected: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p1", v1.ProtocolTCP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, }, @@ -331,7 +331,7 @@ func TestEndpointsToEndpointsMap(t *testing.T) { }), expected: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p2", v1.ProtocolTCP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, }, @@ -355,7 +355,7 @@ func TestEndpointsToEndpointsMap(t *testing.T) { }), expected: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p1", v1.ProtocolTCP): { - {Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, }, @@ -385,10 +385,10 @@ func TestEndpointsToEndpointsMap(t *testing.T) { }), expected: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p1", v1.ProtocolTCP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p2", v1.ProtocolTCP): { - {Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, }, @@ -418,10 +418,10 @@ func TestEndpointsToEndpointsMap(t *testing.T) { }), expected: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p1", v1.ProtocolTCP): { - {Endpoint: "[2001:db8:85a3:0:0:8a2e:370:7334]:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "[2001:db8:85a3:0:0:8a2e:370:7334]:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p2", v1.ProtocolTCP): { - {Endpoint: "[2001:db8:85a3:0:0:8a2e:370:7334]:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "[2001:db8:85a3:0:0:8a2e:370:7334]:22", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, }, diff --git a/pkg/proxy/endpointslicecache.go b/pkg/proxy/endpointslicecache.go index 572b68b7ba16e..ec3bb8b3292a0 100644 --- a/pkg/proxy/endpointslicecache.go +++ b/pkg/proxy/endpointslicecache.go @@ -26,8 +26,11 @@ import ( v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1beta1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/features" utilproxy "k8s.io/kubernetes/pkg/proxy/util" utilnet "k8s.io/utils/net" ) @@ -78,6 +81,7 @@ type endpointInfo struct { Addresses []string NodeName *string Topology map[string]string + ZoneHints sets.String Ready bool Serving bool @@ -136,6 +140,15 @@ func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) * epInfo.NodeName = endpoint.NodeName + if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) { + if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 { + epInfo.ZoneHints = sets.String{} + for _, zone := range endpoint.Hints.ForZones { + epInfo.ZoneHints.Insert(zone.Name) + } + } + } + esInfo.Endpoints = append(esInfo.Endpoints, epInfo) } @@ -275,7 +288,7 @@ func (cache *EndpointSliceCache) addEndpointsByIP(serviceNN types.NamespacedName } endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal, endpoint.Topology, - endpoint.Ready, endpoint.Serving, endpoint.Terminating) + endpoint.Ready, endpoint.Serving, endpoint.Terminating, endpoint.ZoneHints) // This logic ensures we're deduping potential overlapping endpoints // isLocal should not vary between matching IPs, but if it does, we diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 292e05aa85226..1d6b5551036eb 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -1020,22 +1020,12 @@ func (proxier *Proxier) syncProxyRules() { allEndpoints := proxier.endpointsMap[svcName] - // Service Topology will not be enabled in the following cases: - // 1. externalTrafficPolicy=Local (mutually exclusive with service topology). - // 2. ServiceTopology is not enabled. - // 3. EndpointSlice is not enabled (service topology depends on endpoint slice - // to get topology information). - if !svcInfo.NodeLocalExternal() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) { - allEndpoints = proxy.FilterTopologyEndpoint(proxier.nodeLabels, svcInfo.TopologyKeys(), allEndpoints) - } + // Filtering for topology aware endpoints. This function will only + // filter endpoints if appropriate feature gates are enabled and the + // Service does not have conflicting configuration such as + // externalTrafficPolicy=Local. + allEndpoints = proxy.FilterEndpoints(allEndpoints, svcInfo, proxier.nodeLabels) - // Service InternalTrafficPolicy is only enabled when all of the - // following are true: - // 1. InternalTrafficPolicy is Local - // 2. ServiceInternalTrafficPolicy feature gate is on - if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && svcInfo.NodeLocalInternal() { - allEndpoints = proxy.FilterLocalEndpoint(svcInfo.InternalTrafficPolicy(), allEndpoints) - } readyEndpoints := make([]proxy.Endpoint, 0, len(allEndpoints)) for _, endpoint := range allEndpoints { if !endpoint.IsReady() { diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 2656b52caa20a..4961381c8ae2f 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -2057,21 +2057,15 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode endpoints := proxier.endpointsMap[svcPortName] - // Service Topology will not be enabled in the following cases: - // 1. externalTrafficPolicy=Local (mutually exclusive with service topology). - // 2. ServiceTopology is not enabled. - // 3. EndpointSlice is not enabled (service topology depends on endpoint slice - // to get topology information). - if !onlyNodeLocalEndpoints && utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) { - endpoints = proxy.FilterTopologyEndpoint(proxier.nodeLabels, proxier.serviceMap[svcPortName].TopologyKeys(), endpoints) - } - - // Service InternalTrafficPolicy is only enabled when all of the - // following are true: - // 1. InternalTrafficPolicy is PreferLocal or Local - // 2. ServiceInternalTrafficPolicy feature gate is on - if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && onlyNodeLocalEndpointsForInternal { - endpoints = proxy.FilterLocalEndpoint(proxier.serviceMap[svcPortName].InternalTrafficPolicy(), endpoints) + // Filtering for topology aware endpoints. This function will only + // filter endpoints if appropriate feature gates are enabled and the + // Service does not have conflicting configuration such as + // externalTrafficPolicy=Local. + svcInfo, ok := proxier.serviceMap[svcPortName] + if !ok { + klog.Warningf("Unable to filter endpoints due to missing Service info for %s", svcPortName) + } else { + endpoints = proxy.FilterEndpoints(endpoints, svcInfo, proxier.nodeLabels) } for _, epInfo := range endpoints { diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index a60cee03c2e58..ac20a461b9dbd 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -2952,12 +2952,12 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -2973,12 +2973,12 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -2996,18 +2996,18 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { - {Endpoint: "1.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { - {Endpoint: "1.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -3023,24 +3023,24 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:12", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): { - {Endpoint: "1.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:12", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): { - {Endpoint: "1.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -3060,54 +3060,54 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "1.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "1.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "1.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:12", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "1.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): { - {Endpoint: "1.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "1.1.1.4:13", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "1.1.1.4:13", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p14", v1.ProtocolUDP): { - {Endpoint: "1.1.1.3:14", IsLocal: false, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "1.1.1.4:14", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.3:14", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "1.1.1.4:14", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns2", "ep2", "p21", v1.ProtocolUDP): { - {Endpoint: "2.2.2.1:21", IsLocal: false, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "2.2.2.2:21", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "2.2.2.1:21", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "2.2.2.2:21", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): { - {Endpoint: "2.2.2.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "2.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "2.2.2.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "2.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "1.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "1.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "1.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:12", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "1.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p13", v1.ProtocolUDP): { - {Endpoint: "1.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "1.1.1.4:13", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "1.1.1.4:13", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p14", v1.ProtocolUDP): { - {Endpoint: "1.1.1.3:14", IsLocal: false, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "1.1.1.4:14", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.3:14", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "1.1.1.4:14", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns2", "ep2", "p21", v1.ProtocolUDP): { - {Endpoint: "2.2.2.1:21", IsLocal: false, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "2.2.2.2:21", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "2.2.2.1:21", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "2.2.2.2:21", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): { - {Endpoint: "2.2.2.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "2.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "2.2.2.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "2.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -3127,7 +3127,7 @@ func Test_updateEndpointsMap(t *testing.T) { oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{}, expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -3147,7 +3147,7 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{}, @@ -3167,17 +3167,17 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "1.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "1.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "1.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:12", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "1.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -3197,17 +3197,17 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "1.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "1.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "1.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:12", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "1.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{{ @@ -3232,15 +3232,15 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { - {Endpoint: "1.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, @@ -3260,15 +3260,15 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { - {Endpoint: "1.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{{ @@ -3287,12 +3287,12 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{{ @@ -3313,12 +3313,12 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{{ @@ -3343,39 +3343,39 @@ func Test_updateEndpointsMap(t *testing.T) { }, oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP): { - {Endpoint: "2.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "2.2.2.22:22", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "2.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "2.2.2.22:22", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP): { - {Endpoint: "2.2.2.3:23", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "2.2.2.3:23", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP): { - {Endpoint: "4.4.4.4:44", IsLocal: true, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "4.4.4.5:44", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "4.4.4.4:44", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "4.4.4.5:44", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP): { - {Endpoint: "4.4.4.6:45", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "4.4.4.6:45", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, - {Endpoint: "1.1.1.11:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, + {Endpoint: "1.1.1.11:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): { - {Endpoint: "1.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): { - {Endpoint: "1.1.1.2:122", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.2:122", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): { - {Endpoint: "3.3.3.3:33", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "3.3.3.3:33", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP): { - {Endpoint: "4.4.4.4:44", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "4.4.4.4:44", IsLocal: true, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{{ @@ -3413,7 +3413,7 @@ func Test_updateEndpointsMap(t *testing.T) { oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{}, expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): { - {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}}, }, }, expectedStaleEndpoints: []proxy.ServiceEndpoint{}, diff --git a/pkg/proxy/service.go b/pkg/proxy/service.go index a78930944a14f..d56e0aed81a04 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/service.go @@ -55,6 +55,7 @@ type BaseServiceInfo struct { nodeLocalInternal bool internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType topologyKeys []string + hintsAnnotation string } var _ ServicePort = &BaseServiceInfo{} @@ -138,6 +139,11 @@ func (info *BaseServiceInfo) TopologyKeys() []string { return info.topologyKeys } +// HintsAnnotation is part of ServicePort interface. +func (info *BaseServiceInfo) HintsAnnotation() string { + return info.hintsAnnotation +} + func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo { nodeLocalExternal := false if apiservice.RequestsOnlyLocalTraffic(service) { @@ -165,6 +171,7 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic nodeLocalInternal: nodeLocalInternal, internalTrafficPolicy: service.Spec.InternalTrafficPolicy, topologyKeys: service.Spec.TopologyKeys, + hintsAnnotation: service.Annotations[v1.AnnotationTopologyAwareHints], } loadBalancerSourceRanges := make([]string, len(service.Spec.LoadBalancerSourceRanges)) diff --git a/pkg/proxy/topology.go b/pkg/proxy/topology.go index b272f3652575e..036571858224a 100644 --- a/pkg/proxy/topology.go +++ b/pkg/proxy/topology.go @@ -19,11 +19,80 @@ package proxy import ( v1 "k8s.io/api/core/v1" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/features" ) -// FilterTopologyEndpoint returns the appropriate endpoints based on the cluster -// topology. +// FilterEndpoints filters endpoints based on Service configuration, node +// labels, and enabled feature gates. This is primarily used to enable topology +// aware routing. +func FilterEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) []Endpoint { + if svcInfo.NodeLocalExternal() || !utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) { + return endpoints + } + + if utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) { + return deprecatedTopologyFilter(nodeLabels, svcInfo.TopologyKeys(), endpoints) + } + + if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && svcInfo.NodeLocalInternal() { + return filterEndpointsInternalTrafficPolicy(svcInfo.InternalTrafficPolicy(), endpoints) + } + + if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) { + return filterEndpointsWithHints(endpoints, svcInfo.HintsAnnotation(), nodeLabels) + } + + return endpoints +} + +// filterEndpointsWithHints provides filtering based on the hints included in +// EndpointSlices. If any of the following are true, the full list of endpoints +// will be returned without any filtering: +// * The AnnotationTopologyAwareHints annotation is not set to "auto" for this +// Service. +// * No zone is specified in node labels. +// * No endpoints for this Service have a hint pointing to the zone this +// instance of kube-proxy is running in. +// * One or more endpoints for this Service do not have hints specified. +func filterEndpointsWithHints(endpoints []Endpoint, hintsAnnotation string, nodeLabels map[string]string) []Endpoint { + if hintsAnnotation != "auto" { + if hintsAnnotation != "" && hintsAnnotation != "disabled" { + klog.Warningf("Skipping topology aware endpoint filtering since Service has unexpected value for %s annotation: %s", v1.AnnotationTopologyAwareHints, hintsAnnotation) + } + return endpoints + } + + zone, ok := nodeLabels[v1.LabelTopologyZone] + if !ok || zone == "" { + klog.Warningf("Skipping topology aware endpoint filtering since node is missing %s label", v1.LabelTopologyZone) + return endpoints + } + + filteredEndpoints := []Endpoint{} + + for _, endpoint := range endpoints { + if endpoint.GetZoneHints().Len() == 0 { + klog.Warningf("Skipping topology aware endpoint filtering since one or more endpoints is missing a zone hint") + return endpoints + } + if endpoint.GetZoneHints().Has(zone) { + filteredEndpoints = append(filteredEndpoints, endpoint) + } + } + + if len(filteredEndpoints) > 0 { + klog.Warningf("Skipping topology aware endpoint filtering since no hints were provided for zone %s", zone) + return filteredEndpoints + } + + return endpoints +} + +// deprecatedTopologyFilter returns the appropriate endpoints based on the +// cluster topology. This will be removed in an upcoming release along with the +// ServiceTopology feature gate. +// // This uses the current node's labels, which contain topology information, and // the required topologyKeys to find appropriate endpoints. If both the endpoint's // topology and the current node have matching values for topologyKeys[0], the @@ -40,7 +109,7 @@ import ( // // If topologyKeys is not specified or empty, no topology constraints will be // applied and this will return all endpoints. -func FilterTopologyEndpoint(nodeLabels map[string]string, topologyKeys []string, endpoints []Endpoint) []Endpoint { +func deprecatedTopologyFilter(nodeLabels map[string]string, topologyKeys []string, endpoints []Endpoint) []Endpoint { // Do not filter endpoints if service has no topology keys. if len(topologyKeys) == 0 { return endpoints @@ -81,13 +150,13 @@ func FilterTopologyEndpoint(nodeLabels map[string]string, topologyKeys []string, return filteredEndpoints } -// FilterLocalEndpoint returns the node local endpoints based on configured -// InternalTrafficPolicy. +// filterEndpointsInternalTrafficPolicy returns the node local endpoints based +// on configured InternalTrafficPolicy. // // If ServiceInternalTrafficPolicy feature gate is off, returns the original -// endpoints slice. +// EndpointSlice. // Otherwise, if InternalTrafficPolicy is Local, only return the node local endpoints. -func FilterLocalEndpoint(internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType, endpoints []Endpoint) []Endpoint { +func filterEndpointsInternalTrafficPolicy(internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType, endpoints []Endpoint) []Endpoint { if !utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) { return endpoints } diff --git a/pkg/proxy/topology_test.go b/pkg/proxy/topology_test.go index 9f501152f3ae6..24bcad9887e3a 100644 --- a/pkg/proxy/topology_test.go +++ b/pkg/proxy/topology_test.go @@ -22,12 +22,328 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/features" ) -func TestFilterTopologyEndpoint(t *testing.T) { +func TestFilterEndpoints(t *testing.T) { + type endpoint struct { + ip string + zoneHints sets.String + } + testCases := []struct { + name string + epsProxyingEnabled bool + serviceTopologyEnabled bool + hintsEnabled bool + nodeLabels map[string]string + serviceInfo ServicePort + endpoints []endpoint + expectedEndpoints []endpoint + }{{ + name: "hints + eps proxying enabled, hints annotation == auto", + hintsEnabled: true, + epsProxyingEnabled: true, + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + serviceInfo: &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "auto"}, + endpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, + {ip: "10.1.2.5", zoneHints: sets.NewString("zone-c")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + expectedEndpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + }, { + name: "hints + eps proxying enabled, hints annotation == disabled, hints ignored", + hintsEnabled: true, + epsProxyingEnabled: true, + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + serviceInfo: &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "disabled"}, + endpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, + {ip: "10.1.2.5", zoneHints: sets.NewString("zone-c")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + expectedEndpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, + {ip: "10.1.2.5", zoneHints: sets.NewString("zone-c")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + }, { + name: "hints + eps proxying enabled, hints annotation == Auto (wrong capitalization), hints ignored", + hintsEnabled: true, + epsProxyingEnabled: true, + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + serviceInfo: &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "Auto"}, + endpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, + {ip: "10.1.2.5", zoneHints: sets.NewString("zone-c")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + expectedEndpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, + {ip: "10.1.2.5", zoneHints: sets.NewString("zone-c")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + }, { + name: "hints + eps proxying enabled, hints annotation empty, hints ignored", + hintsEnabled: true, + epsProxyingEnabled: true, + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + serviceInfo: &BaseServiceInfo{nodeLocalExternal: false}, + endpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, + {ip: "10.1.2.5", zoneHints: sets.NewString("zone-c")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + expectedEndpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, + {ip: "10.1.2.5", zoneHints: sets.NewString("zone-c")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + }, { + name: "hints enabled, eps proxying not, hints are ignored", + hintsEnabled: true, + epsProxyingEnabled: false, + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + serviceInfo: &BaseServiceInfo{nodeLocalExternal: false}, + endpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, + {ip: "10.1.2.5", zoneHints: sets.NewString("zone-c")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + expectedEndpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, + {ip: "10.1.2.5", zoneHints: sets.NewString("zone-c")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + }, { + name: "node local endpoints, hints are ignored", + hintsEnabled: true, + epsProxyingEnabled: true, + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + serviceInfo: &BaseServiceInfo{nodeLocalExternal: true}, + endpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, + {ip: "10.1.2.5", zoneHints: sets.NewString("zone-c")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + expectedEndpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, + {ip: "10.1.2.5", zoneHints: sets.NewString("zone-c")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + }, { + name: "all gates enabled, serviceTopology gate takes precedence and hints are ignored", + hintsEnabled: true, + epsProxyingEnabled: true, + serviceTopologyEnabled: true, + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + serviceInfo: &BaseServiceInfo{nodeLocalExternal: true}, + endpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, + {ip: "10.1.2.5", zoneHints: sets.NewString("zone-c")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + expectedEndpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, + {ip: "10.1.2.5", zoneHints: sets.NewString("zone-c")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + }} + + endpointsToStringArray := func(endpoints []Endpoint) []string { + result := make([]string, 0, len(endpoints)) + for _, ep := range endpoints { + result = append(result, ep.String()) + } + return result + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EndpointSliceProxying, tc.epsProxyingEnabled)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceTopology, tc.serviceTopologyEnabled)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.TopologyAwareHints, tc.hintsEnabled)() + + endpoints := []Endpoint{} + for _, ep := range tc.endpoints { + endpoints = append(endpoints, &BaseEndpointInfo{Endpoint: ep.ip, ZoneHints: ep.zoneHints}) + } + + expectedEndpoints := []Endpoint{} + for _, ep := range tc.expectedEndpoints { + expectedEndpoints = append(expectedEndpoints, &BaseEndpointInfo{Endpoint: ep.ip, ZoneHints: ep.zoneHints}) + } + + filteredEndpoints := FilterEndpoints(endpoints, tc.serviceInfo, tc.nodeLabels) + if len(filteredEndpoints) != len(expectedEndpoints) { + t.Errorf("expected %d filtered endpoints, got %d", len(expectedEndpoints), len(filteredEndpoints)) + } + if !reflect.DeepEqual(filteredEndpoints, expectedEndpoints) { + t.Errorf("expected %v, got %v", endpointsToStringArray(expectedEndpoints), endpointsToStringArray(filteredEndpoints)) + } + }) + } +} + +func Test_filterEndpointsWithHints(t *testing.T) { + type endpoint struct { + ip string + zoneHints sets.String + } + testCases := []struct { + name string + nodeLabels map[string]string + hintsAnnotation string + endpoints []endpoint + expectedEndpoints []endpoint + }{{ + name: "empty node labels", + nodeLabels: map[string]string{}, + hintsAnnotation: "auto", + endpoints: []endpoint{{ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}}, + expectedEndpoints: []endpoint{{ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}}, + }, { + name: "empty zone label", + nodeLabels: map[string]string{v1.LabelTopologyZone: ""}, + hintsAnnotation: "auto", + endpoints: []endpoint{{ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}}, + expectedEndpoints: []endpoint{{ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}}, + }, { + name: "node in different zone, no endpoint filtering", + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-b"}, + hintsAnnotation: "auto", + endpoints: []endpoint{{ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}}, + expectedEndpoints: []endpoint{{ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}}, + }, { + name: "normal endpoint filtering", + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + hintsAnnotation: "auto", + endpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, + {ip: "10.1.2.5", zoneHints: sets.NewString("zone-c")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + expectedEndpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + }, { + name: "hintsAnnotation empty, no filtering applied", + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + hintsAnnotation: "", + endpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, + {ip: "10.1.2.5", zoneHints: sets.NewString("zone-c")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + expectedEndpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, + {ip: "10.1.2.5", zoneHints: sets.NewString("zone-c")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + }, { + name: "hintsAnnotation disabled, no filtering applied", + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + hintsAnnotation: "disabled", + endpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, + {ip: "10.1.2.5", zoneHints: sets.NewString("zone-c")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + expectedEndpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, + {ip: "10.1.2.5", zoneHints: sets.NewString("zone-c")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + }, { + name: "missing hints, no filtering applied", + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + hintsAnnotation: "auto", + endpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, + {ip: "10.1.2.5"}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + expectedEndpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, + {ip: "10.1.2.5"}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, + }, + }, { + name: "multiple hints per endpoint, filtering includes any endpoint with zone included", + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-c"}, + hintsAnnotation: "auto", + endpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a", "zone-b", "zone-c")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b", "zone-c")}, + {ip: "10.1.2.5", zoneHints: sets.NewString("zone-b", "zone-d")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-c")}, + }, + expectedEndpoints: []endpoint{ + {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a", "zone-b", "zone-c")}, + {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b", "zone-c")}, + {ip: "10.1.2.6", zoneHints: sets.NewString("zone-c")}, + }, + }} + + endpointsToStringArray := func(endpoints []Endpoint) []string { + result := make([]string, 0, len(endpoints)) + for _, ep := range endpoints { + result = append(result, ep.String()) + } + return result + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + endpoints := []Endpoint{} + for _, ep := range tc.endpoints { + endpoints = append(endpoints, &BaseEndpointInfo{Endpoint: ep.ip, ZoneHints: ep.zoneHints}) + } + + expectedEndpoints := []Endpoint{} + for _, ep := range tc.expectedEndpoints { + expectedEndpoints = append(expectedEndpoints, &BaseEndpointInfo{Endpoint: ep.ip, ZoneHints: ep.zoneHints}) + } + + filteredEndpoints := filterEndpointsWithHints(endpoints, tc.hintsAnnotation, tc.nodeLabels) + if len(filteredEndpoints) != len(expectedEndpoints) { + t.Errorf("expected %d filtered endpoints, got %d", len(expectedEndpoints), len(filteredEndpoints)) + } + if !reflect.DeepEqual(filteredEndpoints, expectedEndpoints) { + t.Errorf("expected %v, got %v", endpointsToStringArray(expectedEndpoints), endpointsToStringArray(filteredEndpoints)) + } + }) + } +} + +func Test_deprecatedTopologyFilter(t *testing.T) { type endpoint struct { Endpoint string NodeName types.NodeName @@ -470,7 +786,7 @@ func TestFilterTopologyEndpoint(t *testing.T) { } currentNodeLabels := tc.nodeLabels[tc.currentNodeName] filteredEndpoint := []endpoint{} - for _, ep := range FilterTopologyEndpoint(currentNodeLabels, tc.topologyKeys, endpoints) { + for _, ep := range deprecatedTopologyFilter(currentNodeLabels, tc.topologyKeys, endpoints) { filteredEndpoint = append(filteredEndpoint, m[ep]) } if !reflect.DeepEqual(filteredEndpoint, tc.expected) { @@ -480,7 +796,7 @@ func TestFilterTopologyEndpoint(t *testing.T) { } } -func TestFilterLocalEndpoint(t *testing.T) { +func Test_filterEndpointsInternalTrafficPolicy(t *testing.T) { cluster := v1.ServiceInternalTrafficPolicyCluster local := v1.ServiceInternalTrafficPolicyLocal @@ -566,7 +882,7 @@ func TestFilterLocalEndpoint(t *testing.T) { for _, tc := range testCases { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceInternalTrafficPolicy, tc.featureGateOn)() t.Run(tc.name, func(t *testing.T) { - filteredEndpoint := FilterLocalEndpoint(tc.internalTrafficPolicy, tc.endpoints) + filteredEndpoint := filterEndpointsInternalTrafficPolicy(tc.internalTrafficPolicy, tc.endpoints) if !reflect.DeepEqual(filteredEndpoint, tc.expected) { t.Errorf("expected %v, got %v", tc.expected, filteredEndpoint) } diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index 9085bf0c7434d..a33d6ba145e63 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -22,6 +22,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/proxy/config" ) @@ -91,6 +92,8 @@ type ServicePort interface { InternalTrafficPolicy() *v1.ServiceInternalTrafficPolicyType // TopologyKeys returns service TopologyKeys as a string array. TopologyKeys() []string + // HintsAnnotation returns the value of the v1.AnnotationTopologyAwareHints annotation. + HintsAnnotation() string } // Endpoint in an interface which abstracts information about an endpoint. @@ -117,6 +120,9 @@ type Endpoint interface { IsTerminating() bool // GetTopology returns the topology information of the endpoint. GetTopology() map[string]string + // GetZoneHint returns the zone hint for the endpoint. This is based on + // endpoint.hints.forZones[0].name in the EndpointSlice API. + GetZoneHints() sets.String // IP returns IP part of the endpoint. IP() string // Port returns the Port part of the endpoint. diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index fa585a62403ba..6ff9444d97919 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -37,6 +37,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" @@ -201,6 +202,11 @@ func (info *endpointsInfo) GetTopology() map[string]string { return nil } +// GetZoneHint returns the zone hint for the endpoint. +func (info *endpointsInfo) GetZoneHints() sets.String { + return sets.String{} +} + // IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface. func (info *endpointsInfo) IP() string { return info.ip