Skip to content

Commit

Permalink
Adding support for TopologyAwareHints to kube-proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
robscott committed Mar 8, 2021
1 parent 1dcf09c commit f07be06
Show file tree
Hide file tree
Showing 12 changed files with 545 additions and 133 deletions.
2 changes: 1 addition & 1 deletion cmd/kube-proxy/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ func (s *ProxyServer) Run() error {
// functions must configure their shared informer event handlers first.
informerFactory.Start(wait.NeverStop)

if utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) {
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) || utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
// Make an informer that selects for our nodename.
currentNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
Expand Down
15 changes: 13 additions & 2 deletions pkg/proxy/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ type BaseEndpointInfo struct {
IsLocal bool
Topology map[string]string

// ZoneHints represent the zone hints for the endpoint. This is based on
// endpoint.hints.forZones[*].name in the EndpointSlice API.
ZoneHints sets.String
// Ready indicates whether this endpoint is ready and NOT terminating.
// For pods, this is true if a pod has a ready status and a nil deletion timestamp.
// This is only set when watching EndpointSlices. If using Endpoints, this is always
Expand Down Expand Up @@ -102,6 +105,11 @@ func (info *BaseEndpointInfo) GetTopology() map[string]string {
return info.Topology
}

// GetZoneHints returns the zone hint for the endpoint.
func (info *BaseEndpointInfo) GetZoneHints() sets.String {
return info.ZoneHints
}

// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) IP() string {
return utilproxy.IPPart(info.Endpoint)
Expand All @@ -118,14 +126,15 @@ func (info *BaseEndpointInfo) Equal(other Endpoint) bool {
}

func newBaseEndpointInfo(IP string, port int, isLocal bool, topology map[string]string,
ready, serving, terminating bool) *BaseEndpointInfo {
ready, serving, terminating bool, zoneHints sets.String) *BaseEndpointInfo {
return &BaseEndpointInfo{
Endpoint: net.JoinHostPort(IP, strconv.Itoa(port)),
IsLocal: isLocal,
Topology: topology,
Ready: ready,
Serving: serving,
Terminating: terminating,
ZoneHints: zoneHints,
}
}

Expand Down Expand Up @@ -427,8 +436,10 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
isServing := true
isTerminating := false
isLocal := addr.NodeName != nil && *addr.NodeName == ect.hostname
// Only supported with EndpointSlice API
zoneHints := sets.String{}

baseEndpointInfo := newBaseEndpointInfo(addr.IP, int(port.Port), isLocal, nil, isReady, isServing, isTerminating)
baseEndpointInfo := newBaseEndpointInfo(addr.IP, int(port.Port), isLocal, nil, isReady, isServing, isTerminating, zoneHints)
if ect.makeEndpointInfo != nil {
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], ect.makeEndpointInfo(baseEndpointInfo))
} else {
Expand Down
28 changes: 14 additions & 14 deletions pkg/proxy/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestEndpointsToEndpointsMap(t *testing.T) {
}),
expected: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "", v1.ProtocolTCP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}},
},
},
},
Expand All @@ -218,7 +218,7 @@ func TestEndpointsToEndpointsMap(t *testing.T) {
}),
expected: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "port", v1.ProtocolTCP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}},
},
},
},
Expand All @@ -241,7 +241,7 @@ func TestEndpointsToEndpointsMap(t *testing.T) {
}),
expected: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "", v1.ProtocolTCP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}},
},
},
},
Expand Down Expand Up @@ -278,12 +278,12 @@ func TestEndpointsToEndpointsMap(t *testing.T) {
}),
expected: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "p1", v1.ProtocolTCP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
{Endpoint: "2.2.2.2:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}},
{Endpoint: "2.2.2.2:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}},
},
makeServicePortName("ns1", "ep1", "p2", v1.ProtocolTCP): {
{Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false},
{Endpoint: "2.2.2.2:22", IsLocal: false, Ready: true, Serving: true, Terminating: false},
{Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}},
{Endpoint: "2.2.2.2:22", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}},
},
},
},
Expand All @@ -307,7 +307,7 @@ func TestEndpointsToEndpointsMap(t *testing.T) {
}),
expected: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "p1", v1.ProtocolTCP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}},
},
},
},
Expand All @@ -331,7 +331,7 @@ func TestEndpointsToEndpointsMap(t *testing.T) {
}),
expected: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "p2", v1.ProtocolTCP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}},
},
},
},
Expand All @@ -355,7 +355,7 @@ func TestEndpointsToEndpointsMap(t *testing.T) {
}),
expected: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "p1", v1.ProtocolTCP): {
{Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false},
{Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}},
},
},
},
Expand Down Expand Up @@ -385,10 +385,10 @@ func TestEndpointsToEndpointsMap(t *testing.T) {
}),
expected: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "p1", v1.ProtocolTCP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}},
},
makeServicePortName("ns1", "ep1", "p2", v1.ProtocolTCP): {
{Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false},
{Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}},
},
},
},
Expand Down Expand Up @@ -418,10 +418,10 @@ func TestEndpointsToEndpointsMap(t *testing.T) {
}),
expected: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "p1", v1.ProtocolTCP): {
{Endpoint: "[2001:db8:85a3:0:0:8a2e:370:7334]:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
{Endpoint: "[2001:db8:85a3:0:0:8a2e:370:7334]:11", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}},
},
makeServicePortName("ns1", "ep1", "p2", v1.ProtocolTCP): {
{Endpoint: "[2001:db8:85a3:0:0:8a2e:370:7334]:22", IsLocal: false, Ready: true, Serving: true, Terminating: false},
{Endpoint: "[2001:db8:85a3:0:0:8a2e:370:7334]:22", IsLocal: false, Ready: true, Serving: true, Terminating: false, ZoneHints: sets.String{}},
},
},
},
Expand Down
15 changes: 14 additions & 1 deletion pkg/proxy/endpointslicecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ import (
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
utilnet "k8s.io/utils/net"
)
Expand Down Expand Up @@ -78,6 +81,7 @@ type endpointInfo struct {
Addresses []string
NodeName *string
Topology map[string]string
ZoneHints sets.String

Ready bool
Serving bool
Expand Down Expand Up @@ -136,6 +140,15 @@ func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) *

epInfo.NodeName = endpoint.NodeName

if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 {
epInfo.ZoneHints = sets.String{}
for _, zone := range endpoint.Hints.ForZones {
epInfo.ZoneHints.Insert(zone.Name)
}
}
}

esInfo.Endpoints = append(esInfo.Endpoints, epInfo)
}

Expand Down Expand Up @@ -275,7 +288,7 @@ func (cache *EndpointSliceCache) addEndpointsByIP(serviceNN types.NamespacedName
}

endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal, endpoint.Topology,
endpoint.Ready, endpoint.Serving, endpoint.Terminating)
endpoint.Ready, endpoint.Serving, endpoint.Terminating, endpoint.ZoneHints)

// This logic ensures we're deduping potential overlapping endpoints
// isLocal should not vary between matching IPs, but if it does, we
Expand Down
20 changes: 5 additions & 15 deletions pkg/proxy/iptables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,22 +1020,12 @@ func (proxier *Proxier) syncProxyRules() {

allEndpoints := proxier.endpointsMap[svcName]

// Service Topology will not be enabled in the following cases:
// 1. externalTrafficPolicy=Local (mutually exclusive with service topology).
// 2. ServiceTopology is not enabled.
// 3. EndpointSlice is not enabled (service topology depends on endpoint slice
// to get topology information).
if !svcInfo.NodeLocalExternal() && utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) {
allEndpoints = proxy.FilterTopologyEndpoint(proxier.nodeLabels, svcInfo.TopologyKeys(), allEndpoints)
}
// Filtering for topology aware endpoints. This function will only
// filter endpoints if appropriate feature gates are enabled and the
// Service does not have conflicting configuration such as
// externalTrafficPolicy=Local.
allEndpoints = proxy.FilterEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)

// Service InternalTrafficPolicy is only enabled when all of the
// following are true:
// 1. InternalTrafficPolicy is Local
// 2. ServiceInternalTrafficPolicy feature gate is on
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && svcInfo.NodeLocalInternal() {
allEndpoints = proxy.FilterLocalEndpoint(svcInfo.InternalTrafficPolicy(), allEndpoints)
}
readyEndpoints := make([]proxy.Endpoint, 0, len(allEndpoints))
for _, endpoint := range allEndpoints {
if !endpoint.IsReady() {
Expand Down
24 changes: 9 additions & 15 deletions pkg/proxy/ipvs/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -2057,21 +2057,15 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode

endpoints := proxier.endpointsMap[svcPortName]

// Service Topology will not be enabled in the following cases:
// 1. externalTrafficPolicy=Local (mutually exclusive with service topology).
// 2. ServiceTopology is not enabled.
// 3. EndpointSlice is not enabled (service topology depends on endpoint slice
// to get topology information).
if !onlyNodeLocalEndpoints && utilfeature.DefaultFeatureGate.Enabled(features.ServiceTopology) && utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) {
endpoints = proxy.FilterTopologyEndpoint(proxier.nodeLabels, proxier.serviceMap[svcPortName].TopologyKeys(), endpoints)
}

// Service InternalTrafficPolicy is only enabled when all of the
// following are true:
// 1. InternalTrafficPolicy is PreferLocal or Local
// 2. ServiceInternalTrafficPolicy feature gate is on
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && onlyNodeLocalEndpointsForInternal {
endpoints = proxy.FilterLocalEndpoint(proxier.serviceMap[svcPortName].InternalTrafficPolicy(), endpoints)
// Filtering for topology aware endpoints. This function will only
// filter endpoints if appropriate feature gates are enabled and the
// Service does not have conflicting configuration such as
// externalTrafficPolicy=Local.
svcInfo, ok := proxier.serviceMap[svcPortName]
if !ok {
klog.Warningf("Unable to filter endpoints due to missing Service info for %s", svcPortName)
} else {
endpoints = proxy.FilterEndpoints(endpoints, svcInfo, proxier.nodeLabels)
}

for _, epInfo := range endpoints {
Expand Down
Loading

0 comments on commit f07be06

Please sign in to comment.