Skip to content

Commit

Permalink
feat(gateway): watch Gateways for tag changes in GatewayInstance reco…
Browse files Browse the repository at this point in the history
…nciler

Signed-off-by: Mike Beaumont <[email protected]>
  • Loading branch information
michaelbeaumont committed Dec 21, 2021
1 parent dfb2ade commit 7ec7532
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type GatewayInstanceStatus struct {
const (
GatewayInstanceReady string = "Ready"

GatewayInstanceNoGatewayMatched = "NoGatewayMatched"
GatewayInstanceDeploymentNotAvailable = "DeploymentNotAvailable"

GatewayInstanceAddressNotReady = "LoadBalancerAddressNotReady"
Expand Down
94 changes: 84 additions & 10 deletions pkg/plugins/runtime/k8s/controllers/gateway_instance_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
kube_types "k8s.io/apimachinery/pkg/types"
kube_ctrl "sigs.k8s.io/controller-runtime"
kube_client "sigs.k8s.io/controller-runtime/pkg/client"
kube_handler "sigs.k8s.io/controller-runtime/pkg/handler"
kube_reconcile "sigs.k8s.io/controller-runtime/pkg/reconcile"
kube_source "sigs.k8s.io/controller-runtime/pkg/source"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
"github.com/kumahq/kuma/pkg/core/resources/manager"
Expand Down Expand Up @@ -53,16 +56,19 @@ func (r *GatewayInstanceReconciler) Reconcile(ctx context.Context, req kube_ctrl
return kube_ctrl.Result{}, err
}

deployment, err := r.createOrUpdateDeployment(ctx, gatewayInstance)
if err != nil {
return kube_ctrl.Result{}, errors.Wrap(err, "unable to create Deployment for Gateway")
}

svc, err := r.createOrUpdateService(ctx, gatewayInstance)
if err != nil {
return kube_ctrl.Result{}, errors.Wrap(err, "unable to create Service for Gateway")
}

var deployment *kube_apps.Deployment
if svc != nil {
deployment, err = r.createOrUpdateDeployment(ctx, gatewayInstance)
if err != nil {
return kube_ctrl.Result{}, errors.Wrap(err, "unable to create Deployment for Gateway")
}
}

updateStatus(gatewayInstance, svc, deployment)

if err := r.Client.Status().Update(ctx, gatewayInstance); err != nil {
Expand All @@ -76,6 +82,8 @@ func k8sSelector(name string) map[string]string {
return map[string]string{"app": name}
}

// createOrUpdateService can either return an error, a created Service or
// neither if reconciliation shouldn't continue.
func (r *GatewayInstanceReconciler) createOrUpdateService(
ctx context.Context,
gatewayInstance *mesh_k8s.GatewayInstance,
Expand All @@ -85,7 +93,8 @@ func (r *GatewayInstanceReconciler) createOrUpdateService(
})

if gateway == nil {
return nil, fmt.Errorf("no matching Gateway")
// We have an index and watch set up to requeue when this changes
return nil, nil
}

obj, err := ctrls_util.CreateOrUpdateControlled(
Expand Down Expand Up @@ -127,6 +136,8 @@ func (r *GatewayInstanceReconciler) createOrUpdateService(
return obj.(*kube_core.Service), nil
}

// createOrUpdateDeployment can either return an error, a created Deployment or
// neither if reconciliation shouldn't continue.
func (r *GatewayInstanceReconciler) createOrUpdateDeployment(
ctx context.Context,
gatewayInstance *mesh_k8s.GatewayInstance,
Expand Down Expand Up @@ -222,20 +233,68 @@ func getCombinedReadiness(svc *kube_core.Service, deployment *kube_apps.Deployme
}
}

const noGateway = "No Gateway matched by tags"

func updateStatus(gatewayInstance *mesh_k8s.GatewayInstance, svc *kube_core.Service, deployment *kube_apps.Deployment) {
status, reason := getCombinedReadiness(svc, deployment)
var status kube_meta.ConditionStatus
var reason string
var message string

readiness := kube_meta.Condition{
Type: mesh_k8s.GatewayInstanceReady, Status: status, Reason: reason, LastTransitionTime: kube_meta.Now(), ObservedGeneration: gatewayInstance.GetGeneration(),
if svc == nil {
status, reason, message = kube_meta.ConditionFalse, mesh_k8s.GatewayInstanceNoGatewayMatched, noGateway
} else {
status, reason = getCombinedReadiness(svc, deployment)
gatewayInstance.Status.LoadBalancer = &svc.Status.LoadBalancer
}

gatewayInstance.Status.LoadBalancer = &svc.Status.LoadBalancer
readiness := kube_meta.Condition{
Type: mesh_k8s.GatewayInstanceReady, Status: status, Reason: reason, Message: message, LastTransitionTime: kube_meta.Now(), ObservedGeneration: gatewayInstance.GetGeneration(),
}

gatewayInstance.Status.Conditions = []kube_meta.Condition{
readiness,
}
}

const serviceKey string = ".metadata.service"

// GatewayToInstanceMapper maps a Gateway object to GatewayInstance objects by
// using the service tag to list GatewayInstances with a matching index.
// The index is set up on GatewayInstance in SetupWithManager and holds the service
// tag from the GatewayInstance tags.
func GatewayToInstanceMapper(l logr.Logger, client kube_client.Client) kube_handler.MapFunc {
l = l.WithName("gateway-to-gateway-instance-mapper")

return func(obj kube_client.Object) []kube_reconcile.Request {
gateway := obj.(*mesh_k8s.Gateway)

var serviceNames []string
for _, selector := range gateway.Spec.GetSelectors() {
if tagValue, ok := selector.Match[mesh_proto.ServiceTag]; ok {
serviceNames = append(serviceNames, tagValue)
}
}

var req []kube_reconcile.Request
for _, serviceName := range serviceNames {
instances := &mesh_k8s.GatewayInstanceList{}
if err := client.List(
context.Background(), instances, kube_client.MatchingFields{serviceKey: serviceName},
); err != nil {
l.WithValues("gateway", obj.GetName()).Error(err, "failed to fetch GatewayInstances")
}

for _, instance := range instances.Items {
req = append(req, kube_reconcile.Request{
NamespacedName: kube_types.NamespacedName{Namespace: instance.Namespace, Name: instance.Name},
})
}
}

return req
}
}

func (r *GatewayInstanceReconciler) SetupWithManager(mgr kube_ctrl.Manager) error {
gatewayInstanceGVK := kube_schema.GroupVersionKind{
Group: mesh_k8s.GroupVersion.Group,
Expand All @@ -251,9 +310,24 @@ func (r *GatewayInstanceReconciler) SetupWithManager(mgr kube_ctrl.Manager) erro
return err
}

if err := mgr.GetFieldIndexer().IndexField(context.Background(), &mesh_k8s.GatewayInstance{}, serviceKey, func(obj kube_client.Object) []string {
instance := obj.(*mesh_k8s.GatewayInstance)

serviceName := instance.Spec.Tags[mesh_proto.ServiceTag]

return []string{serviceName}
}); err != nil {
return err
}

return kube_ctrl.NewControllerManagedBy(mgr).
For(&mesh_k8s.GatewayInstance{}).
Owns(&kube_core.Service{}).
Owns(&kube_apps.Deployment{}).
// On Update events our mapper function is called with the object both
// before the event as well as the object after. In the case of
// unbinding a Gateway from one Instance to another, we end up
// reconciling both Instances.
Watches(&kube_source.Kind{Type: &mesh_k8s.Gateway{}}, kube_handler.EnqueueRequestsFromMapFunc(GatewayToInstanceMapper(r.Log, mgr.GetClient()))).
Complete(r)
}

0 comments on commit 7ec7532

Please sign in to comment.