Merge pull request red-hat-storage#303 from egafford/persistent-failure-domain

Preserve rack FailureDomain if previously assigned
openshift-merge-robot committed Nov 19, 2019
2 parents 1aa8010 + cfb52e8 commit b854efa
Showing 2 changed files with 59 additions and 18 deletions.
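
In brief: determineFailureDomain used to recompute the failure domain from the node topology map on every reconcile, so a cluster originally deployed with rack-based failure domains could silently switch to zone-based once three zone labels became visible. After this merge the function receives the whole StorageCluster and returns any failure domain already recorded in Status before looking at topology. A minimal, runnable sketch of that behavior (the types and the zone label key below are simplified stand-ins, not the real ocsv1 API):

package main

import (
    "fmt"
    "strings"
)

// Simplified stand-ins for the operator's ocsv1 types (illustrative only).
type NodeTopologyMap struct {
    Labels map[string][]string
}

type StorageClusterStatus struct {
    FailureDomain  string
    NodeTopologies *NodeTopologyMap
}

type StorageCluster struct {
    Status StorageClusterStatus
}

// determineFailureDomain mirrors the patched logic: a failure domain already
// recorded in Status wins; otherwise default to "rack", upgrading to "zone"
// only when some zone-like label carries at least three values.
func determineFailureDomain(sc *StorageCluster) string {
    if sc.Status.FailureDomain != "" {
        return sc.Status.FailureDomain // sticky once assigned
    }
    failureDomain := "rack"
    for label, values := range sc.Status.NodeTopologies.Labels {
        if strings.Contains(label, "zone") && len(values) >= 3 {
            failureDomain = "zone"
            break
        }
    }
    return failureDomain
}

func main() {
    topo := &NodeTopologyMap{Labels: map[string][]string{
        "example.io/zone": {"zone1", "zone2", "zone3"}, // hypothetical label key
    }}

    fresh := &StorageCluster{Status: StorageClusterStatus{NodeTopologies: topo}}
    fmt.Println(determineFailureDomain(fresh)) // "zone": computed from topology

    pinned := &StorageCluster{Status: StorageClusterStatus{
        FailureDomain:  "rack",
        NodeTopologies: topo,
    }}
    fmt.Println(determineFailureDomain(pinned)) // "rack": preserved, not recomputed
}

The new TestNodeTopologyMapPreexistingRack test below pins down exactly this case: three zones present, but a pre-set "rack" status that must survive reconciliation.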
30 changes: 17 additions & 13 deletions pkg/controller/storagecluster/reconcile.go
@@ -190,7 +190,7 @@ func (r *ReconcileStorageCluster) Reconcile(request reconcile.Request) (reconcil
         instance.Status.CephBlockPoolsCreated = false
         instance.Status.CephObjectStoreUsersCreated = false
         instance.Status.CephFilesystemsCreated = false
-        instance.Status.FailureDomain = determineFailureDomain(instance.Status.NodeTopologies)
+        instance.Status.FailureDomain = determineFailureDomain(instance)
         err = r.client.Status().Update(context.TODO(), instance)
         if err != nil {
             return reconcile.Result{}, err
@@ -371,17 +371,17 @@ func (r *ReconcileStorageCluster) reconcileNodeTopologyMap(sc *ocsv1.StorageClus
                     updated = true
                 }
             }
-                if strings.Contains(label, "rack") {
-                    if !nodeRacks.Contains(value, node.Name) {
-                        nodeRacks.Add(value, node.Name)
-                    }
-                }
+            if strings.Contains(label, "rack") {
+                if !nodeRacks.Contains(value, node.Name) {
+                    nodeRacks.Add(value, node.Name)
+                }
+            }
         }
 
     }
 
-    if determineFailureDomain(topologyMap) == "rack" {
+    if determineFailureDomain(sc) == "rack" {
         err = r.ensureNodeRacks(nodes, minNodes, nodeRacks, topologyMap, reqLogger)
         if err != nil {
             return err
@@ -664,18 +664,19 @@ func (r *ReconcileStorageCluster) ensureCephCluster(sc *ocsv1.StorageCluster, re
 
 // determineFailureDomain determines the appropriate Ceph failure domain based
 // on the storage cluster's topology map
-func determineFailureDomain(topologyMap *ocsv1.NodeTopologyMap) string {
+func determineFailureDomain(sc *ocsv1.StorageCluster) string {
+    if sc.Status.FailureDomain != "" {
+        return sc.Status.FailureDomain
+    }
+    topologyMap := sc.Status.NodeTopologies
     failureDomain := "rack"
 
     for label, labelValues := range topologyMap.Labels {
         if strings.Contains(label, "zone") {
             if len(labelValues) >= 3 {
                 failureDomain = "zone"
                 break
             }
         }
     }
 
     return failureDomain
 }

@@ -722,7 +723,7 @@ func newCephCluster(sc *ocsv1.StorageCluster, cephImage string) *cephv1.CephClus
             RulesNamespace: "openshift-storage",
         },
         Storage: rook.StorageScopeSpec{
-            StorageClassDeviceSets: newStorageClassDeviceSets(sc.Spec.StorageDeviceSets, sc.Status.NodeTopologies),
+            StorageClassDeviceSets: newStorageClassDeviceSets(sc),
             TopologyAware:          true,
         },
         Placement: rook.PlacementSpec{
@@ -770,7 +771,10 @@ func newCephDaemonResources(custom map[string]corev1.ResourceRequirements) map[s
 }
 
 // newStorageClassDeviceSets converts a list of StorageDeviceSets into a list of Rook StorageClassDeviceSets
-func newStorageClassDeviceSets(storageDeviceSets []ocsv1.StorageDeviceSet, topologyMap *ocsv1.NodeTopologyMap) []rook.StorageClassDeviceSet {
+func newStorageClassDeviceSets(sc *ocsv1.StorageCluster) []rook.StorageClassDeviceSet {
+    storageDeviceSets := sc.Spec.StorageDeviceSets
+    topologyMap := sc.Status.NodeTopologies
+
     var storageClassDeviceSets []rook.StorageClassDeviceSet
 
     for _, ds := range storageDeviceSets {
@@ -785,7 +789,7 @@ func newStorageClassDeviceSets(storageDeviceSets []ocsv1.StorageDeviceSet, topol
 
         if noPlacement {
             if topologyKey == "" {
-                topologyKey = determineFailureDomain(topologyMap)
+                topologyKey = determineFailureDomain(sc)
             }
             if topologyMap != nil {
                 topologyKey, topologyKeyValues = topologyMap.GetKeyValues(topologyKey)
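
Worth noting about the reconcile.go changes as a whole: helpers such as newStorageClassDeviceSets no longer take pre-extracted fields (the device-set list plus the topology map) but the full *ocsv1.StorageCluster, deriving what they need internally. That is what lets them reach the Status-aware determineFailureDomain(sc). A small sketch of the refactor shape, with hypothetical names and heavily trimmed types rather than the operator's real signatures:

package main

import "fmt"

// Trimmed, hypothetical stand-ins for the ocsv1 types.
type StorageDeviceSet struct{ Name string }

type StorageCluster struct {
    Spec struct {
        StorageDeviceSets []StorageDeviceSet
    }
    Status struct {
        FailureDomain string
    }
}

// Old shape: the helper sees only what the caller extracted, so it cannot
// consult Status without growing yet another parameter.
func describeDeviceSetsOld(sets []StorageDeviceSet, failureDomain string) string {
    return fmt.Sprintf("%d device sets, failure domain %q", len(sets), failureDomain)
}

// New shape: the helper receives the CR and derives its inputs itself,
// mirroring how newStorageClassDeviceSets(sc) can now call
// determineFailureDomain(sc) internally.
func describeDeviceSetsNew(sc *StorageCluster) string {
    fd := sc.Status.FailureDomain
    if fd == "" {
        fd = "rack" // same default the operator falls back to
    }
    return fmt.Sprintf("%d device sets, failure domain %q", len(sc.Spec.StorageDeviceSets), fd)
}

func main() {
    sc := &StorageCluster{}
    sc.Spec.StorageDeviceSets = []StorageDeviceSet{{Name: "set0"}}
    sc.Status.FailureDomain = "rack"

    fmt.Println(describeDeviceSetsOld(sc.Spec.StorageDeviceSets, sc.Status.FailureDomain))
    fmt.Println(describeDeviceSetsNew(sc))
}

The trade-off is a wider dependency surface (the helper can now read anything on the CR), accepted here so that failure-domain decisions stay consistent with what was previously persisted in Status.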
47 changes: 42 additions & 5 deletions pkg/controller/storagecluster/storagecluster_controller_test.go
@@ -220,6 +220,38 @@ func TestNodeTopologyMapNoNodes(t *testing.T) {
     assert.Equal(t, err, fmt.Errorf("Not enough nodes found: Expected %d, found %d", defaults.DeviceSetReplica, len(nodeList.Items)))
 }
 
+func TestNodeTopologyMapPreexistingRack(t *testing.T) {
+    sc := &api.StorageCluster{}
+    mockStorageCluster.DeepCopyInto(sc)
+    nodeList := &corev1.NodeList{}
+    mockNodeList.DeepCopyInto(nodeList)
+    sc.Status.FailureDomain = "rack"
+
+    nodeTopologyMap := &api.NodeTopologyMap{
+        Labels: map[string]api.TopologyLabelValues{
+            zoneTopologyLabel: []string{
+                "zone1",
+                "zone2",
+                "zone3",
+            },
+            defaults.RackTopologyKey: []string{
+                "rack0",
+                "rack1",
+                "rack2",
+            },
+        },
+    }
+
+    reconciler := createFakeStorageClusterReconciler(t, sc, nodeList)
+    err := reconciler.reconcileNodeTopologyMap(sc, reconciler.reqLogger)
+    assert.NoError(t, err)
+
+    actual := &api.StorageCluster{}
+    err = reconciler.client.Get(nil, mockStorageClusterRequest.NamespacedName, actual)
+    assert.NoError(t, err)
+    assert.Equal(t, nodeTopologyMap, actual.Status.NodeTopologies)
+}
+
 func TestNodeTopologyMapTwoAZ(t *testing.T) {
     sc := &api.StorageCluster{}
     mockStorageCluster.DeepCopyInto(sc)
@@ -277,11 +309,13 @@ func TestNodeTopologyMapThreeAZ(t *testing.T) {
 }
 
 func TestFailureDomain(t *testing.T) {
+    sc := &api.StorageCluster{}
     nodeTopologyMap := &api.NodeTopologyMap{
         Labels: map[string]api.TopologyLabelValues{},
     }
+    sc.Status.NodeTopologies = nodeTopologyMap
 
-    failureDomain := determineFailureDomain(nodeTopologyMap)
+    failureDomain := determineFailureDomain(sc)
     assert.Equal(t, "rack", failureDomain)
 
     nodeTopologyMap.Labels[zoneTopologyLabel] = []string{
@@ -290,7 +324,7 @@
         "zone2",
         "zone3",
     }
 
-    failureDomain = determineFailureDomain(nodeTopologyMap)
+    failureDomain = determineFailureDomain(sc)
     assert.Equal(t, "zone", failureDomain)
 }
}

Expand Down Expand Up @@ -369,7 +403,8 @@ func TestStorageClusterCephClusterCreation(t *testing.T) {
}

func TestStorageClassDeviceSetCreation(t *testing.T) {
deviceSet := mockDeviceSets[0]
sc := &api.StorageCluster{}
sc.Spec.StorageDeviceSets = mockDeviceSets

nodeTopologyMap := &api.NodeTopologyMap{
Labels: map[string]api.TopologyLabelValues{
Expand All @@ -379,10 +414,12 @@ func TestStorageClassDeviceSetCreation(t *testing.T) {
             },
         },
     }
+    sc.Status.NodeTopologies = nodeTopologyMap
 
-    actual := newStorageClassDeviceSets(mockDeviceSets, nodeTopologyMap)
+    actual := newStorageClassDeviceSets(sc)
     assert.Equal(t, defaults.DeviceSetReplica, len(actual))
 
+    deviceSet := sc.Spec.StorageDeviceSets[0]
     for i, scds := range actual {
         assert.Equal(t, fmt.Sprintf("%s-%d", deviceSet.Name, i), scds.Name)
         // TODO: Change this when OCP console is updated
@@ -395,7 +432,7 @@
 
     nodeTopologyMap.Labels[zoneTopologyLabel] = append(nodeTopologyMap.Labels[zoneTopologyLabel], "zone3")
 
-    actual = newStorageClassDeviceSets(mockDeviceSets, nodeTopologyMap)
+    actual = newStorageClassDeviceSets(sc)
     assert.Equal(t, defaults.DeviceSetReplica, len(actual))
 
     for i, scds := range actual {
