Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support join & leave for member controllers #184

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cmd/mcs-controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
//+kubebuilder:scaffold:imports
clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1"
fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
"go.goms.io/fleet/pkg/utils/controller"

fleetnetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1"
"go.goms.io/fleet-networking/pkg/common/hubconfig"
Expand Down Expand Up @@ -241,12 +242,13 @@ func setupControllersWithManager(_ context.Context, hubMgr, memberMgr manager.Ma
hubClient := hubMgr.GetClient()

klog.V(1).InfoS("Create multiclusterservice reconciler")
if err := (&multiclusterservice.Reconciler{
mcs := &multiclusterservice.Reconciler{
Client: memberClient,
Scheme: memberMgr.GetScheme(),
FleetSystemNamespace: *fleetSystemNamespace,
Recorder: memberMgr.GetEventRecorderFor(multiclusterservice.ControllerName),
}).SetupWithManager(memberMgr); err != nil {
}
if err := mcs.SetupWithManager(memberMgr); err != nil {
klog.ErrorS(err, "Unable to create multiclusterservice reconciler")
return err
}
Expand All @@ -269,6 +271,7 @@ func setupControllersWithManager(_ context.Context, hubMgr, memberMgr manager.Ma
MemberClient: memberClient,
HubClient: hubClient,
AgentType: clusterv1beta1.MultiClusterServiceAgent,
Controllers: []controller.MemberController{mcs},
}).SetupWithManager(hubMgr); err != nil {
klog.ErrorS(err, "Unable to create internalmembercluster (v1beta1 API) reconciler")
return err
Expand Down
45 changes: 31 additions & 14 deletions cmd/member-net-controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
//+kubebuilder:scaffold:imports
clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1"
fleetv1alpha1 "go.goms.io/fleet/apis/v1alpha1"
"go.goms.io/fleet/pkg/utils/controller"

fleetnetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1"
"go.goms.io/fleet-networking/pkg/common/env"
Expand Down Expand Up @@ -259,79 +260,94 @@ func setupControllersWithManager(ctx context.Context, hubMgr, memberMgr manager.
memberClient := memberMgr.GetClient()
hubClient := hubMgr.GetClient()

var controllers []controller.MemberController
klog.V(1).InfoS("Create endpointslice controller")
if err := (&endpointslice.Reconciler{
endpointSliceController := &endpointslice.Reconciler{
MemberClusterID: mcName,
MemberClient: memberClient,
HubClient: hubClient,
HubNamespace: mcHubNamespace,
}).SetupWithManager(ctx, memberMgr); err != nil {
}
if err := endpointSliceController.SetupWithManager(ctx, memberMgr); err != nil {
klog.ErrorS(err, "Unable to create endpointslice controller")
return err
}
controllers = append(controllers, endpointSliceController)

klog.V(1).InfoS("Create endpointsliceexport controller")
if err := (&endpointsliceexport.Reconciler{
endpointSliceExportController := &endpointsliceexport.Reconciler{
MemberClient: memberClient,
HubClient: hubClient,
}).SetupWithManager(hubMgr); err != nil {
}
if err := endpointSliceExportController.SetupWithManager(hubMgr); err != nil {
klog.ErrorS(err, "Unable to create endpointsliceexport controller")
return err
}
controllers = append(controllers, endpointSliceExportController)

klog.V(1).InfoS("Create endpointsliceimport controller")
if err := (&endpointsliceimport.Reconciler{
endpointSliceImportController := &endpointsliceimport.Reconciler{
MemberClusterID: mcName,
MemberClient: memberClient,
HubClient: hubClient,
FleetSystemNamespace: *fleetSystemNamespace,
}).SetupWithManager(ctx, memberMgr, hubMgr); err != nil {
}
if err := endpointSliceImportController.SetupWithManager(ctx, memberMgr, hubMgr); err != nil {
klog.ErrorS(err, "Unable to create endpointsliceimport controller")
return err
}
controllers = append(controllers, endpointSliceImportController)

klog.V(1).InfoS("Create internalserviceexport controller")
if err := (&internalserviceexport.Reconciler{
internalServiceExportController := &internalserviceexport.Reconciler{
MemberClusterID: mcName,
MemberClient: memberClient,
HubClient: hubClient,
Recorder: memberMgr.GetEventRecorderFor(internalserviceexport.ControllerName),
}).SetupWithManager(hubMgr); err != nil {
}
if err := internalServiceExportController.SetupWithManager(hubMgr); err != nil {
klog.ErrorS(err, "Unable to create internalserviceexport controller")
return err
}
controllers = append(controllers, internalServiceExportController)

klog.V(1).InfoS("Create internalserviceimport controller")
if err := (&internalserviceimport.Reconciler{
internalServiceImportController := &internalserviceimport.Reconciler{
MemberClient: memberClient,
HubClient: hubClient,
}).SetupWithManager(hubMgr); err != nil {
}
if err := internalServiceImportController.SetupWithManager(hubMgr); err != nil {
klog.ErrorS(err, "Unable to create internalserviceimport controller")
return err
}
controllers = append(controllers, internalServiceImportController)

klog.V(1).InfoS("Create serviceexport reconciler")
if err := (&serviceexport.Reconciler{
serviceExportController := &serviceexport.Reconciler{
MemberClient: memberClient,
HubClient: hubClient,
MemberClusterID: mcName,
HubNamespace: mcHubNamespace,
Recorder: memberMgr.GetEventRecorderFor(serviceexport.ControllerName),
}).SetupWithManager(memberMgr); err != nil {
}
if err := serviceExportController.SetupWithManager(memberMgr); err != nil {
klog.ErrorS(err, "Unable to create serviceexport reconciler")
return err
}
controllers = append(controllers, serviceExportController)

klog.V(1).InfoS("Create serviceimport reconciler")
if err := (&serviceimport.Reconciler{
serviceImportController := &serviceimport.Reconciler{
MemberClient: memberClient,
HubClient: hubClient,
MemberClusterID: mcName,
HubNamespace: mcHubNamespace,
}).SetupWithManager(memberMgr); err != nil {
}
if err := serviceImportController.SetupWithManager(memberMgr); err != nil {
klog.ErrorS(err, "Unable to create serviceimport reconciler")
return err
}
controllers = append(controllers, serviceImportController)

if *isV1Alpha1APIEnabled {
klog.V(1).InfoS("Create internalmembercluster (v1alpha1 API) reconciler")
Expand All @@ -351,6 +367,7 @@ func setupControllersWithManager(ctx context.Context, hubMgr, memberMgr manager.
MemberClient: memberClient,
HubClient: hubClient,
AgentType: clusterv1beta1.ServiceExportImportAgent,
Controllers: controllers,
}).SetupWithManager(hubMgr); err != nil {
klog.ErrorS(err, "Unable to create internalmembercluster (v1beta1 API) reconciler")
return err
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ require (

require (
github.com/stretchr/testify v1.9.0
go.goms.io/fleet v0.10.5
go.goms.io/fleet v0.10.8
golang.org/x/sync v0.7.0
)

require (
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.goms.io/fleet v0.10.5 h1:Zc+pLk77zWv0hAqBbFZEMMd05MVw9P8jp8YHTy7WPdI=
go.goms.io/fleet v0.10.5/go.mod h1:FpVP3YsiewmyGH77Yx6sLngHbZKgepnmJDIibz2pjZo=
go.goms.io/fleet v0.10.8 h1:AAK4wr4uKB8ATMhC4cpCKYAq9lMr9XLYE5QE+vkBf5M=
go.goms.io/fleet v0.10.8/go.mod h1:2MaaOUGGespUMwgy64MBIMXELv8lDJq+0/NyS3OGzTw=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
Expand All @@ -120,6 +120,8 @@ golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbht
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
30 changes: 30 additions & 0 deletions pkg/controllers/member/endpointslice/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"context"
"fmt"
"strconv"
"sync/atomic"
"time"

discoveryv1 "k8s.io/api/discovery/v1"
Expand Down Expand Up @@ -53,6 +54,8 @@ type Reconciler struct {
HubClient client.Client
// The namespace reserved for the current member cluster in the hub cluster.
HubNamespace string
// whether to start exporting an EndpointSlice
joined atomic.Bool
}

//+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=endpointsliceexports,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -110,6 +113,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
return ctrl.Result{}, nil
}

// When the member leaves, the controller will continue to unexport the endpointSlice as intended.
if !r.joined.Load() {
klog.V(2).InfoS("EndpointSlice controller has not joined yet, skip exporting the endpointSlice and requeue the request", "endpointSlice", endpointSliceRef)
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
zhiying-lin marked this conversation as resolved.
Show resolved Hide resolved
}

// Retrieve the unique name assigned; if none has been assigned, or the one assigned is not valid, possibly due
// to user tampering with the annotation, assign a new unique name.
fleetUniqueName, ok := endpointSlice.Annotations[objectmeta.ExportedObjectAnnotationUniqueName]
Expand Down Expand Up @@ -439,3 +448,24 @@ func (r *Reconciler) annotateLastSeenGenerationAndTimestamp(ctx context.Context,
endpointSlice.Annotations[metrics.MetricsAnnotationLastSeenTimestamp] = startTime.Format(metrics.MetricsLastSeenTimestampFormat)
return r.MemberClient.Update(ctx, endpointSlice)
}

// Join marks the joined status as true.
func (r *Reconciler) Join(_ context.Context) error {
if r.joined.Load() {
return nil
}
klog.InfoS("Mark the endpointSlice controller joined")
r.joined.Store(true)
return nil
}

// Leave marks the joined status as false.
// When the controller is in the leave state, it will only handle the delete events.
func (r *Reconciler) Leave(_ context.Context) error {
if !r.joined.Load() {
return nil
}
klog.InfoS("Mark the endpointSlice controller left")
r.joined.Store(false)
return nil
}
8 changes: 6 additions & 2 deletions pkg/controllers/member/endpointslice/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
hubClient client.Client
ctx context.Context
cancel context.CancelFunc
reconciler *Reconciler
)

// setUpResources help set up resources in the test environment.
Expand Down Expand Up @@ -99,13 +100,15 @@ var _ = BeforeSuite(func() {
ctrlMgr, err := ctrl.NewManager(memberCfg, ctrl.Options{Scheme: scheme.Scheme})
Expect(err).NotTo(HaveOccurred())

err = (&Reconciler{
reconciler = &Reconciler{
MemberClusterID: memberClusterID,
MemberClient: memberClient,
HubClient: hubClient,
HubNamespace: hubNSForMember,
}).SetupWithManager(ctx, ctrlMgr)
}
err = reconciler.SetupWithManager(ctx, ctrlMgr)
Expect(err).NotTo(HaveOccurred())
Expect(reconciler.Join(ctx)).Should(Succeed())

go func() {
defer GinkgoRecover()
Expand All @@ -116,6 +119,7 @@ var _ = BeforeSuite(func() {

var _ = AfterSuite(func() {
defer klog.Flush()
Expect(reconciler.Leave(ctx)).Should(Succeed())
cancel()

By("tearing down the test environment")
Expand Down
14 changes: 14 additions & 0 deletions pkg/controllers/member/endpointsliceexport/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,17 @@ func isEndpointSliceExportLinkedWithEndpointSlice(endpointSliceExport *fleetnetv
}
return true
}

// Join does nothing.
// There is no need to start or stop the controller as this controller is designed to clean up any invalid
// EndpointSliceExport in the hub cluster.
func (r *Reconciler) Join(_ context.Context) error {
// do nothing
return nil
}

// Leave does nothing.
func (r *Reconciler) Leave(_ context.Context) error {
// do nothing
return nil
}
30 changes: 30 additions & 0 deletions pkg/controllers/member/endpointsliceimport/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package endpointsliceimport
import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -82,6 +83,8 @@ type Reconciler struct {
HubClient client.Client
// The namespace reserved for fleet resources in the member cluster.
FleetSystemNamespace string
// whether to start imports an endpointSlice
joined atomic.Bool
}

//+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=endpointsliceimports,verbs=get;list;watch;update;patch
Expand Down Expand Up @@ -130,6 +133,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
return ctrl.Result{}, nil
}

// When the member leaves, the controller will continue to handle the deleted endpointSliceImport as intended.
if !r.joined.Load() {
klog.V(2).InfoS("EndpointSliceImport controller has not joined yet, skip importing the endpointSlice and requeue the request", "endpointSliceImport", endpointSliceImportRef)
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
}

// Import the EndpointSlice, or update an imported EndpointSlice.

// Inquire the corresponding MCS to find out which Service the imported EndpointSlice should associate with.
Expand Down Expand Up @@ -428,3 +437,24 @@ func (r *Reconciler) observeMetrics(ctx context.Context, endpointSliceImport *fl
"isFirstImport", isFirstImport)
return nil
}

// Join marks the joined status as true.
func (r *Reconciler) Join(_ context.Context) error {
if r.joined.Load() {
return nil
}
klog.InfoS("Mark the endpointSliceImport controller joined")
r.joined.Store(true)
return nil
}

// Leave marks the joined status as false.
// When the controller is in the leave state, it will only handle the delete events.
func (r *Reconciler) Leave(_ context.Context) error {
if !r.joined.Load() {
return nil
}
klog.InfoS("Mark the endpointSliceImport controller left")
r.joined.Store(false)
return nil
}
8 changes: 6 additions & 2 deletions pkg/controllers/member/endpointsliceimport/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
hubClient client.Client
ctx context.Context
cancel context.CancelFunc
reconciler *Reconciler
)

// setUpResources help set up resources in the test environment.
Expand Down Expand Up @@ -116,12 +117,14 @@ var _ = BeforeSuite(func() {
hubClient = hubCtrlMgr.GetClient()
Expect(hubClient).NotTo(BeNil())

err = (&Reconciler{
reconciler = &Reconciler{
MemberClient: memberClient,
HubClient: hubClient,
FleetSystemNamespace: fleetSystemNS,
}).SetupWithManager(ctx, memberCtrlMgr, hubCtrlMgr)
}
err = reconciler.SetupWithManager(ctx, memberCtrlMgr, hubCtrlMgr)
Expect(err).NotTo(HaveOccurred())
Expect(reconciler.Join(ctx)).Should(Succeed())

go func() {
defer GinkgoRecover()
Expand All @@ -141,6 +144,7 @@ var _ = BeforeSuite(func() {

var _ = AfterSuite(func() {
defer klog.Flush()
Expect(reconciler.Leave(ctx)).Should(Succeed())
cancel()

By("tearing down the test environment")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ var _ = BeforeSuite(func() {
filepath.Join("../../../../../", "config", "crd", "bases"),
// need to make sure the version matches the one in the go.mod
// workaround mentioned in https://github.com/kubernetes-sigs/controller-runtime/issues/1191
filepath.Join(build.Default.GOPATH, "pkg", "mod", "go.goms.io", "[email protected].5", "config", "crd", "bases"),
filepath.Join(build.Default.GOPATH, "pkg", "mod", "go.goms.io", "[email protected].8", "config", "crd", "bases"),
},
ErrorIfCRDPathMissing: true,
}
Expand Down
Loading
Loading