Skip to content

Commit

Permalink
feat(gateway) add initial gatewayapi.Gateway controller
Browse files Browse the repository at this point in the history
Signed-off-by: Mike Beaumont <[email protected]>
  • Loading branch information
michaelbeaumont committed Dec 1, 2021
1 parent b0677af commit fc5231f
Show file tree
Hide file tree
Showing 9 changed files with 791 additions and 0 deletions.
3 changes: 3 additions & 0 deletions api/mesh/v1alpha1/dataplane_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ const (
// External service tag
ExternalServiceTag = "kuma.io/external-service-name"

// Listener tag is used to select Gateway listeners
ListenerTag = "gateways.kuma.io/listener-name"

// Used for Service-less dataplanes
TCPPortReserved = 49151 // IANA Reserved
)
Expand Down
51 changes: 51 additions & 0 deletions pkg/plugins/runtime/k8s/controllers/gatewayapi/condition.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package controllers

import (
kube_apps "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kube_client "sigs.k8s.io/controller-runtime/pkg/client"
gatewayapi "sigs.k8s.io/gateway-api/apis/v1alpha2"
)

func conditionOn(
obj kube_client.Object, typ gatewayapi.GatewayConditionType, status metav1.ConditionStatus, reason gatewayapi.GatewayConditionReason,
) metav1.Condition {
return metav1.Condition{
Type: string(typ), Status: status, Reason: string(reason), LastTransitionTime: metav1.Now(), ObservedGeneration: obj.GetGeneration(),
}
}

func getCondition(deployment *kube_apps.Deployment, typ kube_apps.DeploymentConditionType) *metav1.ConditionStatus {
for _, c := range deployment.Status.Conditions {
if c.Type == typ {
status := metav1.ConditionStatus(c.Status)
return &status
}
}

return nil
}

func setConditions(gateway *gatewayapi.Gateway, deployment *kube_apps.Deployment) {
conditions := []metav1.Condition{
conditionOn(gateway, gatewayapi.GatewayConditionScheduled, metav1.ConditionTrue, gatewayapi.GatewayReasonScheduled),
}

// TODO(michaelbeaumont) it'd be nice to get more up to date info from the
// kuma-dp instance to tell whether listeners are _really_ ready
if len(gateway.Status.Addresses) == 0 {
conditions = append(conditions,
conditionOn(gateway, gatewayapi.GatewayConditionReady, metav1.ConditionFalse, gatewayapi.GatewayReasonAddressNotAssigned),
)
} else if condition := getCondition(deployment, kube_apps.DeploymentAvailable); condition == nil || *condition != metav1.ConditionTrue {
conditions = append(conditions,
conditionOn(gateway, gatewayapi.GatewayConditionReady, metav1.ConditionFalse, gatewayapi.GatewayReasonListenersNotReady),
)
} else {
conditions = append(conditions,
conditionOn(gateway, gatewayapi.GatewayConditionReady, metav1.ConditionTrue, gatewayapi.GatewayReasonReady),
)
}

gateway.Status.Conditions = conditions
}
174 changes: 174 additions & 0 deletions pkg/plugins/runtime/k8s/controllers/gatewayapi/converter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package controllers

import (
"errors"
"fmt"

gatewayapi "sigs.k8s.io/gateway-api/apis/v1alpha2"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
util_proto "github.com/kumahq/kuma/pkg/util/proto"
)

func k8sToKumaHeader(header gatewayapi.HTTPHeader) *mesh_proto.GatewayRoute_HttpRoute_Filter_RequestHeader_Header {
return &mesh_proto.GatewayRoute_HttpRoute_Filter_RequestHeader_Header{
Name: string(header.Name),
Value: header.Value,
}
}

func gapiToKumaRef(objectNamespace string, ref gatewayapi.BackendObjectReference) (map[string]string, error) {
// References to Services are required by GAPI to include a port
// TODO remove when https://github.com/kubernetes-sigs/gateway-api/pull/944
// is in master
if ref.Port == nil {
return nil, errors.New("backend reference must include port")
}

if *ref.Kind != "Service" {
return nil, errors.New("backend reference must be a Service") // TODO setappropriate status on gateway
}

namespace := objectNamespace
if ref.Namespace != nil {
namespace = string(*ref.Namespace)
}

return map[string]string{
mesh_proto.ServiceTag: fmt.Sprintf("%s_%s_svc_%d", ref.Name, namespace, *ref.Port),
}, nil
}

func gapiToKumaMatch(match gatewayapi.HTTPRouteMatch) (*mesh_proto.GatewayRoute_HttpRoute_Match, error) {
kumaMatch := &mesh_proto.GatewayRoute_HttpRoute_Match{}

if m := match.Method; m != nil {
if kumaMethod, ok := mesh_proto.HttpMethod_value[string(*m)]; ok {
kumaMatch.Method = mesh_proto.HttpMethod(kumaMethod)
} else if *m != "" {
return nil, fmt.Errorf("unexpected HTTP method %s", *m)
}
}

if p := match.Path; p != nil {
path := &mesh_proto.GatewayRoute_HttpRoute_Match_Path{
Value: *p.Value,
}

switch *p.Type {
case gatewayapi.PathMatchExact:
path.Match = mesh_proto.GatewayRoute_HttpRoute_Match_Path_EXACT
case gatewayapi.PathMatchPathPrefix:
path.Match = mesh_proto.GatewayRoute_HttpRoute_Match_Path_PREFIX
case gatewayapi.PathMatchRegularExpression:
path.Match = mesh_proto.GatewayRoute_HttpRoute_Match_Path_REGEX
}

kumaMatch.Path = path
}

for _, header := range match.Headers {
kumaHeader := &mesh_proto.GatewayRoute_HttpRoute_Match_Header{
Name: string(header.Name),
Value: header.Value,
}

switch *header.Type {
case gatewayapi.HeaderMatchExact:
kumaHeader.Match = mesh_proto.GatewayRoute_HttpRoute_Match_Header_EXACT
case gatewayapi.HeaderMatchRegularExpression:
kumaHeader.Match = mesh_proto.GatewayRoute_HttpRoute_Match_Header_REGEX
}

kumaMatch.Headers = append(kumaMatch.Headers, kumaHeader)
}

for _, query := range match.QueryParams {
kumaQuery := &mesh_proto.GatewayRoute_HttpRoute_Match_Query{
Name: query.Name,
Value: query.Value,
}

switch *query.Type {
case gatewayapi.QueryParamMatchExact:
kumaQuery.Match = mesh_proto.GatewayRoute_HttpRoute_Match_Query_EXACT
case gatewayapi.QueryParamMatchRegularExpression:
kumaQuery.Match = mesh_proto.GatewayRoute_HttpRoute_Match_Query_REGEX
}

kumaMatch.QueryParameters = append(kumaMatch.QueryParameters, kumaQuery)
}

return kumaMatch, nil
}

func gapiToKumaFilter(namespace string, filter gatewayapi.HTTPRouteFilter) (*mesh_proto.GatewayRoute_HttpRoute_Filter, error) {
var kumaFilter mesh_proto.GatewayRoute_HttpRoute_Filter

switch filter.Type {
case gatewayapi.HTTPRouteFilterRequestHeaderModifier:
filter := filter.RequestHeaderModifier

var kumaInnerFilter mesh_proto.GatewayRoute_HttpRoute_Filter_RequestHeader

for _, set := range filter.Set {
kumaInnerFilter.Set = append(kumaInnerFilter.Set, k8sToKumaHeader(set))
}

for _, add := range filter.Add {
kumaInnerFilter.Add = append(kumaInnerFilter.Add, k8sToKumaHeader(add))
}

kumaInnerFilter.Remove = filter.Remove

kumaFilter.Filter = &mesh_proto.GatewayRoute_HttpRoute_Filter_RequestHeader_{
RequestHeader: &kumaInnerFilter,
}
case gatewayapi.HTTPRouteFilterRequestMirror:
filter := filter.RequestMirror

destinationRef, err := gapiToKumaRef(namespace, filter.BackendRef)
if err != nil {
return nil, err
}

kumaInnerFilter := mesh_proto.GatewayRoute_HttpRoute_Filter_Mirror{
Backend: &mesh_proto.GatewayRoute_Backend{
Destination: destinationRef,
},
Percentage: util_proto.Double(100),
}

kumaFilter.Filter = &mesh_proto.GatewayRoute_HttpRoute_Filter_Mirror_{
Mirror: &kumaInnerFilter,
}
case gatewayapi.HTTPRouteFilterRequestRedirect:
filter := filter.RequestRedirect

kumaInnerFilter := mesh_proto.GatewayRoute_HttpRoute_Filter_Redirect{}

if s := filter.Scheme; s != nil {
kumaInnerFilter.Scheme = *s
}

if h := filter.Hostname; h != nil {
kumaInnerFilter.Hostname = string(*h)
}

if p := filter.Port; p != nil {
kumaInnerFilter.Port = uint32(*p)
}

if sc := filter.StatusCode; sc != nil {
kumaInnerFilter.StatusCode = uint32(*sc)
}

kumaFilter.Filter = &mesh_proto.GatewayRoute_HttpRoute_Filter_Redirect_{
Redirect: &kumaInnerFilter,
}
default:
return nil, fmt.Errorf("unsupported filter type %v", filter.Type)
}

return &kumaFilter, nil
}
128 changes: 128 additions & 0 deletions pkg/plugins/runtime/k8s/controllers/gatewayapi/deployment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package controllers

import (
"context"
"fmt"
"strconv"

"github.com/pkg/errors"
kube_apps "k8s.io/api/apps/v1"
kube_core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kube_types "k8s.io/apimachinery/pkg/types"
kube_controllerutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
gatewayapi "sigs.k8s.io/gateway-api/apis/v1alpha2"

core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
"github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/plugins/runtime/k8s/metadata"
util_k8s "github.com/kumahq/kuma/pkg/plugins/runtime/k8s/util"
)

func k8sResourceName(name string) string {
return fmt.Sprintf("%s-kuma-gateway", name)
}

func k8sSelector(name string) map[string]string {
return map[string]string{"app": k8sResourceName(name)}
}

func (r *GatewayReconciler) createOrUpdateService(
ctx context.Context,
gateway *core_mesh.GatewayResource,
k8sGateway *gatewayapi.Gateway,
) (*kube_core.Service, error) {
service := &kube_core.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: k8sGateway.Namespace,
Name: k8sResourceName(k8sGateway.Name),
},
}

ns := kube_core.Namespace{}
if err := r.Client.Get(context.Background(), kube_types.NamespacedName{Name: k8sGateway.Namespace}, &ns); err != nil {
return nil, errors.Wrap(err, "unable to get Namespace for gateway")
}

if _, err := kube_controllerutil.CreateOrUpdate(ctx, r.Client, service, func() error {
var ports []kube_core.ServicePort

for _, listener := range gateway.Spec.GetConf().GetListeners() {
ports = append(ports, kube_core.ServicePort{
Name: strconv.Itoa(int(listener.Port)),
Protocol: kube_core.ProtocolTCP,
Port: int32(listener.Port),
})
}

service.Spec = kube_core.ServiceSpec{
Selector: k8sSelector(k8sGateway.Name),
Ports: ports,
Type: kube_core.ServiceTypeLoadBalancer,
}

err := kube_controllerutil.SetControllerReference(k8sGateway, service, r.Scheme)
return errors.Wrap(err, "unable to set Service's controller reference to Gateway")
}); err != nil {
return nil, errors.Wrap(err, "unable to create or update Service for Gateway")
}

return service, nil
}

func (r *GatewayReconciler) createOrUpdateDeployment(
ctx context.Context,
gateway *core_mesh.GatewayResource,
k8sGateway *gatewayapi.Gateway,
) (*kube_apps.Deployment, error) {
ns := kube_core.Namespace{}
if err := r.Client.Get(context.Background(), kube_types.NamespacedName{Name: k8sGateway.Namespace}, &ns); err != nil {
return nil, errors.Wrap(err, "unable to get Namespace for gateway")
}

deployment := &kube_apps.Deployment{
ObjectMeta: metav1.ObjectMeta{
Namespace: k8sGateway.GetNamespace(),
Name: k8sResourceName(k8sGateway.GetName()),
},
}

if _, err := kube_controllerutil.CreateOrUpdate(ctx, r.Client, deployment, func() error {
// TODO(michaelbeaumont) fix the resource limits ro fit use as a gateway
// proxy
podSpec, err := r.Injector.NewGatewayContainer(k8sGateway.Annotations, &ns)
if err != nil {
return errors.Wrap(err, "unable to create Gateway container")
}

annotations := map[string]string{
metadata.KumaGatewayAnnotation: metadata.AnnotationBuiltin,
metadata.KumaSidecarInjectionAnnotation: metadata.AnnotationDisabled,
}

if mesh := util_k8s.MeshFor(k8sGateway); mesh != model.DefaultMesh {
annotations[metadata.KumaMeshAnnotation] = mesh
}

var replicas int32 = 1

deployment.Spec.Replicas = &replicas
deployment.Spec.Selector = &metav1.LabelSelector{
MatchLabels: k8sSelector(k8sGateway.Name),
}
deployment.Spec.Template = kube_core.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: k8sSelector(k8sGateway.Name),
Annotations: annotations,
},
Spec: podSpec,
}

err = kube_controllerutil.SetControllerReference(k8sGateway, deployment, r.Scheme)
return errors.Wrap(err, "unable to set Deployments's controller reference to Gateway")
}); err != nil {
return nil, errors.Wrap(err, "unable to create or update Deployment for Gateway")
}

return deployment, nil
}
Loading

0 comments on commit fc5231f

Please sign in to comment.