
Commit

allow pvmigrate to scale down prometheus pods for openebs volume placement (replicatedhq#4874)

* allow pvmigrate to scale down prometheus pods for openebs volume placement

* reset ekco scale if migration fails
laverya committed Oct 9, 2023
1 parent 702c100 commit 1d3d474
Showing 3 changed files with 32 additions and 176 deletions.
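
This commit replaces the old approach of patching the Prometheus custom resource to zero replicas with scaling down the prometheus-operator deployment: the Prometheus pods themselves are left for pvmigrate to scale down via its annotation, which lets it place the replacement PVCs on the correct nodes when migrating to OpenEBS local volumes. In outline, the new shell flow looks roughly like the simplified sketch below. The commands and helpers are taken from the hunks that follow; the $ekcoScaledDown guard and the monitoring-namespace checks from the real scripts are omitted here for brevity, so this is a sketch, not the actual script.

    # Stop the operator so it does not reconcile the Prometheus StatefulSet mid-migration;
    # pvmigrate scales the prometheus pods itself and records the original replica count.
    kubectl scale deployment -n monitoring prometheus-operator --replicas=0
    spinner_until 300 prometheus_operator_pods_gone

    if ! $BIN_PVMIGRATE --source-sc "$longhornStorageClass" --dest-sc "$destStorageClass" --rsync-image "$KURL_UTIL_IMAGE"; then
        # On failure, put ekco back so it resumes normal reconciliation.
        kubectl -n kurl scale deploy ekc-operator --replicas=1
        return 1
    fi

    # On success, restore the operators that were scaled down.
    kubectl -n kurl scale deploy ekc-operator --replicas=1
    kubectl scale deployment -n monitoring prometheus-operator --replicas=1
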
169 changes: 0 additions & 169 deletions pkg/cli/longhorn.go
@@ -2,20 +2,15 @@ package cli

import (
"context"
"encoding/json"
"fmt"
"log"
"strconv"
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"

@@ -27,11 +22,6 @@ import (
var scaleDownReplicasWaitTime = 5 * time.Minute

const (
prometheusNamespace = "monitoring"
prometheusName = "k8s"
prometheusStatefulSetName = "prometheus-k8s"
ekcoNamespace = "kurl"
ekcoDeploymentName = "ekc-operator"
pvmigrateScaleDownAnnotation = "kurl.sh/pvcmigrate-scale"
longhornNamespace = "longhorn-system"
overProvisioningSetting = "storage-over-provisioning-percentage"
@@ -114,9 +104,6 @@ func NewLonghornRollbackMigrationReplicas(cli CLI) *cobra.Command {
}
logger.Print("Longhorn volumes have been rolled back to their original replica count.")

if err := scaleUpPodsUsingLonghorn(context.Background(), logger, cli); err != nil {
return fmt.Errorf("error scaling up pods using longhorn: %w", err)
}
return nil
},
}
@@ -190,162 +177,6 @@ func NewLonghornPrepareForMigration(cli CLI) *cobra.Command {
}
}

// scaleUpPodsUsingLonghorn scales up any deployment or statefulset that has been previously
// scaled down by scaleDownPodsUsingLonghorn. uses the default annotation used by pvmigrate.
func scaleUpPodsUsingLonghorn(ctx context.Context, logger *log.Logger, cli client.Client) error {
if err := scaleEkco(ctx, logger, cli, 1); err != nil {
return fmt.Errorf("error scaling ekco operator back up: %w", err)
}
if err := scaleUpPrometheus(ctx, cli); err != nil {
return fmt.Errorf("error scaling prometheus back up: %w", err)
}

logger.Print("Scaling up pods using Longhorn volumes.")
var deps appsv1.DeploymentList
if err := cli.List(ctx, &deps); err != nil {
return fmt.Errorf("error listing longhorn deployments: %w", err)
}
for _, dep := range deps.Items {
if _, ok := dep.Annotations[pvmigrateScaleDownAnnotation]; !ok {
continue
}
replicas, err := strconv.Atoi(dep.Annotations[pvmigrateScaleDownAnnotation])
if err != nil {
return fmt.Errorf("error parsing replica count for deployment %s/%s: %w", dep.Namespace, dep.Name, err)
}
dep.Spec.Replicas = ptr.To(int32(replicas))
delete(dep.Annotations, pvmigrateScaleDownAnnotation)
logger.Printf("Scaling up deployment %s/%s", dep.Namespace, dep.Name)
if err := cli.Update(ctx, &dep); err != nil {
return fmt.Errorf("error scaling up deployment %s/%s: %w", dep.Namespace, dep.Name, err)
}
}

var sts appsv1.StatefulSetList
if err := cli.List(ctx, &sts); err != nil {
return fmt.Errorf("error listing longhorn statefulsets: %w", err)
}
for _, st := range sts.Items {
if _, ok := st.Annotations[pvmigrateScaleDownAnnotation]; !ok {
continue
}
replicas, err := strconv.Atoi(st.Annotations[pvmigrateScaleDownAnnotation])
if err != nil {
return fmt.Errorf("error parsing replica count for statefulset %s/%s: %w", st.Namespace, st.Name, err)
}
st.Spec.Replicas = ptr.To(int32(replicas))
delete(st.Annotations, pvmigrateScaleDownAnnotation)
logger.Printf("Scaling up statefulset %s/%s", st.Namespace, st.Name)
if err := cli.Update(ctx, &st); err != nil {
return fmt.Errorf("error scaling up statefulset %s/%s: %w", st.Namespace, st.Name, err)
}
}

logger.Print("Pods using Longhorn volumes have been scaled up.")
return nil
}

func isPrometheusInstalled(ctx context.Context, cli client.Client) (bool, error) {
nsn := types.NamespacedName{Name: prometheusNamespace}
if err := cli.Get(ctx, nsn, &corev1.Namespace{}); err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return false, fmt.Errorf("error getting prometheus namespace: %w", err)
}
return true, nil
}

// scaleUpPrometheus scales up prometheus.
func scaleUpPrometheus(ctx context.Context, cli client.Client) error {
if installed, err := isPrometheusInstalled(ctx, cli); err != nil {
return fmt.Errorf("error scaling down prometheus: %w", err)
} else if !installed {
return nil
}

nsn := types.NamespacedName{Namespace: prometheusNamespace, Name: prometheusName}
var prometheus promv1.Prometheus
if err := cli.Get(ctx, nsn, &prometheus); err != nil {
if errors.IsNotFound(err) {
return nil
}
return fmt.Errorf("error getting prometheus: %w", err)
}
replicasStr, ok := prometheus.Annotations[pvmigrateScaleDownAnnotation]
if !ok {
return fmt.Errorf("error reading original replicas from the prometheus annotation: not found")
}
origReplicas, err := strconv.Atoi(replicasStr)
if err != nil {
return fmt.Errorf("error converting replicas annotation to integer: %w", err)
}
patch := map[string]interface{}{
"metadata": map[string]interface{}{
"annotations": map[string]interface{}{
pvmigrateScaleDownAnnotation: nil,
},
},
"spec": map[string]interface{}{
"replicas": origReplicas,
},
}
rawPatch, err := json.Marshal(patch)
if err != nil {
return fmt.Errorf("error creating prometheus patch: %w", err)
}
if err := cli.Patch(ctx, &prometheus, client.RawPatch(types.MergePatchType, rawPatch)); err != nil {
return fmt.Errorf("error scaling prometheus: %w", err)
}
return nil
}

// scaleEkco scales ekco operator to the number of provided replicas.
func scaleEkco(ctx context.Context, logger *log.Logger, cli client.Client, replicas int32) error {
nsn := types.NamespacedName{Namespace: ekcoNamespace, Name: ekcoDeploymentName}
var dep appsv1.Deployment
if err := cli.Get(ctx, nsn, &dep); err != nil {
if errors.IsNotFound(err) {
return nil
}
return fmt.Errorf("error reading ekco deployment: %w", err)
}
dep.Spec.Replicas = &replicas
if err := cli.Update(ctx, &dep); err != nil {
return fmt.Errorf("error scaling ekco deployment: %w", err)
}
if replicas != 0 {
return nil
}
logger.Print("Waiting for ekco operator to scale down.")
if err := waitForPodsToBeScaledDown(
ctx, logger, cli, ekcoNamespace, labels.SelectorFromSet(dep.Spec.Selector.MatchLabels),
); err != nil {
return fmt.Errorf("error waiting for ekco operator to scale down: %w", err)
}
return nil
}

// waitForPodsToBeScaledDown waits for all pods using matching the provided selector to disappear in the provided
// namespace.
func waitForPodsToBeScaledDown(ctx context.Context, logger *log.Logger, cli client.Client, ns string, sel labels.Selector) error {
return wait.PollUntilContextTimeout(ctx, 3*time.Second, 5*time.Minute, true, func(ctx2 context.Context) (bool, error) {
var pods corev1.PodList
opts := []client.ListOption{
client.InNamespace(ns),
client.MatchingLabelsSelector{Selector: sel},
}
if err := cli.List(ctx2, &pods, opts...); err != nil {
return false, fmt.Errorf("error listing pods: %w", err)
}
if len(pods.Items) > 0 {
logger.Printf("%d pods found, waiting for them to be scaled down.", len(pods.Items))
return false, nil
}
return true, nil
})
}

// scaleDownReplicas scales down the number of replicas for all volumes to 1. Returns a bool indicating if any
// of the volumes were scaled down.
func scaleDownReplicas(ctx context.Context, logger *log.Logger, cli client.Client) (bool, error) {
21 changes: 18 additions & 3 deletions scripts/common/longhorn.sh
@@ -83,6 +83,7 @@ function longhorn_run_pvmigrate() {

if ! $BIN_PVMIGRATE --source-sc "$longhornStorageClass" --dest-sc "$destStorageClass" --rsync-image "$KURL_UTIL_IMAGE" "$skipFreeSpaceCheckFlag" "$skipPreflightValidationFlag" "$setDefaultsFlag"; then
longhorn_restore_migration_replicas
kubectl -n kurl scale deploy ekc-operator --replicas=1
return 1
fi
return 0
@@ -118,9 +119,11 @@ function longhorn_to_sc_migration() {
fi
fi

kubectl -n monitoring patch prometheus k8s --type='json' --patch '[{"op": "replace", "path": "/spec/replicas", value: 0}]'
log "Waiting for prometheus pods to be removed"
spinner_until 300 prometheus_pods_gone
# scale down prometheus operator pods, not the actual prometheus pods
# this way pvmigrate can place PVCs on the correct nodes if migrating to OpenEBS
kubectl scale deployment -n monitoring prometheus-operator --replicas=0
log "Waiting for prometheus operator pods to be removed"
spinner_until 300 prometheus_operator_pods_gone
fi
fi

@@ -157,6 +160,18 @@ function longhorn_to_sc_migration() {

longhorn_restore_migration_replicas

# reset ekco scale
if [ "$ekcoScaledDown" = "1" ] ; then
kubectl -n kurl scale deploy ekc-operator --replicas=1
fi

# reset prometheus scale
if kubectl get namespace monitoring &>/dev/null; then
if kubectl get prometheus -n monitoring k8s &>/dev/null; then
kubectl scale deployment -n monitoring prometheus-operator --replicas=1
fi
fi

# print success message
logSuccess "Migration from longhorn to $scProvisioner completed successfully!"
report_addon_success "longhorn-to-$scProvisioner-migration" "v1"
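
The success path above only scales ekco back up when $ekcoScaledDown is "1"; that flag is presumably set earlier in longhorn_to_sc_migration, outside the lines shown in this diff. A hypothetical sketch of how such a flag would be set when the operator is scaled down before the migration (the ekco_pods_gone, log, and spinner_until helpers appear elsewhere in these scripts; the rest is illustrative and not part of this commit):

    ekcoScaledDown=0
    if kubectl -n kurl get deployment ekc-operator &>/dev/null; then
        kubectl -n kurl scale deploy ekc-operator --replicas=0
        log "Waiting for ekco pods to be removed"
        spinner_until 300 ekco_pods_gone
        ekcoScaledDown=1
    fi
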
18 changes: 14 additions & 4 deletions scripts/common/rook.sh
@@ -38,6 +38,14 @@ function prometheus_pods_gone() {
return 0
}

function prometheus_operator_pods_gone() {
if kubectl -n monitoring get pods -l app=kube-prometheus-stack-operator 2>/dev/null | grep 'prometheus' &>/dev/null ; then
return 1
fi

return 0
}

function ekco_pods_gone() {
pods_gone_by_selector kurl app=ekc-operator
}
@@ -225,9 +233,11 @@ function rook_ceph_to_sc_migration() {
fi
fi

kubectl -n monitoring patch prometheus k8s --type='json' --patch '[{"op": "replace", "path": "/spec/replicas", value: 0}]'
echo "Waiting for prometheus pods to be removed"
spinner_until 300 prometheus_pods_gone
# scale down prometheus operator pods, not the actual prometheus pods
# this way pvmigrate can place PVCs on the correct nodes if migrating to OpenEBS
kubectl scale deployment -n monitoring prometheus-operator --replicas=0
log "Waiting for prometheus operator pods to be removed"
spinner_until 300 prometheus_operator_pods_gone
fi
fi

@@ -280,7 +290,7 @@ function rook_ceph_to_sc_migration() {
# reset prometheus scale
if kubectl get namespace monitoring &>/dev/null; then
if kubectl get prometheus -n monitoring k8s &>/dev/null; then
kubectl patch prometheus -n monitoring k8s --type='json' --patch '[{"op": "replace", "path": "/spec/replicas", value: 2}]'
kubectl scale deployment -n monitoring prometheus-operator --replicas=1
fi
fi

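
Once the prometheus-operator deployment is scaled back to 1 it should reconcile the Prometheus resource and bring the prometheus-k8s pods back on the migrated storage class. A hypothetical post-migration check (the prometheus-k8s StatefulSet name comes from the constants removed from pkg/cli/longhorn.go; this verification step is not part of the commit):

    kubectl -n monitoring scale deployment prometheus-operator --replicas=1
    kubectl -n monitoring rollout status statefulset prometheus-k8s --timeout=300s
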
