mgr/rook: Make use of rook-client-python when talking to Rook
Fixes:

* `CephFilesystem.spec.onlyManageDaemons` does not exist
* `CephObjectStore.spec.gateway.allNodes` does not exist
* Adding directory OSDs to existing nodes was broken
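
The first two fixes follow directly from the switch to rook-client-python: the CRs were previously assembled as raw dicts, so keys that the CRDs do not define (such as `onlyManageDaemons` and `allNodes`) were silently sent to Rook. The generated classes only expose real CRD fields. A minimal sketch of the idea, assuming the classes are importable as in the diff below (`rook_client.ceph.cephfilesystem`) and that the generated `__init__` rejects unknown keyword arguments:

from rook_client.ceph import cephfilesystem as cfs

# Constructing the spec through the generated classes: only fields that the
# CephFilesystem CRD actually defines are available.
ok = cfs.Spec(
    metadataServer=cfs.MetadataServer(
        activeCount=1,
        activeStandby=True,
    ),
)

# The raw-dict approach silently accepted a bogus "onlyManageDaemons" key;
# here it should fail up front (assumption: the generated __init__ raises
# TypeError for unknown keyword arguments).
try:
    cfs.Spec(
        onlyManageDaemons=True,
        metadataServer=cfs.MetadataServer(activeCount=1, activeStandby=True),
    )
except TypeError as exc:
    print("rejected:", exc)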

Signed-off-by: Sebastian Wagner <[email protected]>
sebastian-philipp committed Feb 13, 2020
1 parent 6153816 commit 846761e
Showing 2 changed files with 151 additions and 166 deletions.
src/pybind/mgr/rook/requirements.txt (2 additions, 0 deletions)
@@ -0,0 +1,2 @@
kubernetes
jsonpatch
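
jsonpatch is the other new dependency: rook_cluster.py now derives RFC 6902 patch operations by diffing a current and a desired JSON document instead of writing the operation lists by hand. A minimal, Rook-independent sketch of the calls used below (values are illustrative):

import jsonpatch

old = {'spec': {'mon': {'count': 3}}}
new = {'spec': {'mon': {'count': 5}}}

# make_patch() compares the two documents and yields RFC 6902 operations,
# e.g. [{'op': 'replace', 'path': '/spec/mon/count', 'value': 5}].
patch = jsonpatch.make_patch(old, new)
print(list(patch))

# Applying the patch to the old document reproduces the new one.
assert patch.apply(old) == new

The new `_patch()` helper at the bottom of rook_cluster.py is built around `make_patch()`.
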
src/pybind/mgr/rook/rook_cluster.py (149 additions, 166 deletions)
@@ -11,6 +11,7 @@
import json
from contextlib import contextmanager

import jsonpatch
from six.moves.urllib.parse import urljoin # pylint: disable=import-error

# Optional kubernetes imports to enable MgrModule.can_run
@@ -32,6 +33,11 @@
class ApiException(Exception): # type: ignore
status = 0

from .rook_client.ceph import cephfilesystem as cfs
from .rook_client.ceph import cephnfs as cnfs
from .rook_client.ceph import cephobjectstore as cos
from .rook_client.ceph import cephcluster as ccl


import orchestrator

@@ -341,86 +347,81 @@ def add_filesystem(self, spec):
# TODO warn if spec.extended has entries we don't know how
# to action.

rook_fs = {
"apiVersion": self.rook_env.api_name,
"kind": "CephFilesystem",
"metadata": {
"name": spec.name,
"namespace": self.rook_env.namespace
},
"spec": {
"onlyManageDaemons": True,
"metadataServer": {
"activeCount": spec.count,
"activeStandby": True

}
}
}
rook_fs = cfs.CephFilesystem(
apiVersion=self.rook_env.api_name,
metadata=dict(
name=spec.name,
namespace=self.rook_env.namespace,
),
spec=cfs.Spec(
metadataServer=cfs.MetadataServer(
activeCount=spec.count,
activeStandby=True
)
)
)

with self.ignore_409("CephFilesystem '{0}' already exists".format(spec.name)):
self.rook_api_post("cephfilesystems/", body=rook_fs)
self.rook_api_post("cephfilesystems/", body=rook_fs.to_json())

def add_nfsgw(self, spec):
# type: (orchestrator.NFSServiceSpec) -> None
# TODO use spec.placement
# TODO warn if spec.extended has entries we don't know how
# to action.

rook_nfsgw = {
"apiVersion": self.rook_env.api_name,
"kind": "CephNFS",
"metadata": {
"name": spec.name,
"namespace": self.rook_env.namespace
},
"spec": {
"rados": {
"pool": spec.pool
},
"server": {
"active": spec.count,
}
}
}
rook_nfsgw = cnfs.CephNFS(
apiVersion=self.rook_env.api_name,
metadata=dict(
name=spec.name,
namespace=self.rook_env.namespace,
),
spec=cnfs.Spec(
rados=cnfs.Rados(
pool=spec.pool
),
server=cnfs.Server(
active=spec.count
)
)
)

if spec.namespace:
rook_nfsgw["spec"]["rados"]["namespace"] = spec.namespace # type: ignore
rook_nfsgw.spec.rados.namespace = spec.namespace

with self.ignore_409("NFS cluster '{0}' already exists".format(spec.name)):
self.rook_api_post("cephnfses/", body=rook_nfsgw)
self.rook_api_post("cephnfses/", body=rook_nfsgw.to_json())

def add_objectstore(self, spec):
# type: (orchestrator.RGWSpec) -> None
rook_os = {
"apiVersion": self.rook_env.api_name,
"kind": "CephObjectStore",
"metadata": {
"name": spec.name,
"namespace": self.rook_env.namespace
},
"spec": {
"metadataPool": {
"failureDomain": "host",
"replicated": {
"size": 1
}
},
"dataPool": {
"failureDomain": "osd",
"replicated": {
"size": 1
}
},
"gateway": {
"type": "s3",
"port": spec.rgw_frontend_port if spec.rgw_frontend_port is not None else 80,
"instances": spec.count,
"allNodes": False
}
}
}

rook_os = cos.CephObjectStore(
apiVersion=self.rook_env.api_name,
metadata=dict(
name=spec.name,
namespace=self.rook_env.namespace
),
spec=cos.Spec(
metadataPool=cos.MetadataPool(
failureDomain='host',
replicated=cos.Replicated(
size=1
)
),
dataPool=cos.DataPool(
failureDomain='osd',
replicated=cos.Replicated(
size=1
)
),
gateway=cos.Gateway(
type='s3',
port=spec.rgw_frontend_port if spec.rgw_frontend_port is not None else 80,
instances=spec.count
)
)
)

with self.ignore_409("CephObjectStore '{0}' already exists".format(spec.name)):
self.rook_api_post("cephobjectstores/", body=rook_os)
self.rook_api_post("cephobjectstores/", body=rook_os.to_json())

def rm_service(self, rooktype, service_id):

@@ -448,45 +449,25 @@ def node_exists(self, node_name):
return node_name in self.get_node_names()

def update_mon_count(self, newcount):
patch = [{"op": "replace", "path": "/spec/mon/count", "value": newcount}]

try:
self.rook_api_patch(
"cephclusters/{0}".format(self.rook_env.cluster_name),
body=patch)
except ApiException as e:
log.exception("API exception: {0}".format(e))
raise ApplyException(
"Failed to update mon count in Cluster CRD: {0}".format(e))

return "Updated mon count to {0}".format(newcount)
def _update_mon_count(current, new):
# type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
new.spec.mon.count = newcount
return new
return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)

def update_mds_count(self, svc_id, newcount):
patch = [{"op": "replace", "path": "/spec/metadataServer/activeCount",
"value": newcount}]

try:
self.rook_api_patch(
"cephfilesystems/{0}".format(svc_id),
body=patch)
except ApiException as e:
log.exception("API exception: {0}".format(e))
raise ApplyException(
"Failed to update NFS server count for {0}: {1}".format(svc_id, e))
return "Updated NFS server count for {0} to {1}".format(svc_id, newcount)
def _update_mds_count(current, new):
# type: (cfs.CephFilesystem, cfs.CephFilesystem) -> cfs.CephFilesystem
new.spec.metadataServer.activeCount = newcount
return new
return self._patch(cfs.CephFilesystem, 'cephfilesystems', svc_id, _update_mds_count)

def update_nfs_count(self, svc_id, newcount):
patch = [{"op": "replace", "path": "/spec/server/active", "value": newcount}]

try:
self.rook_api_patch(
"cephnfses/{0}".format(svc_id),
body=patch)
except ApiException as e:
log.exception("API exception: {0}".format(e))
raise ApplyException(
"Failed to update NFS server count for {0}: {1}".format(svc_id, e))
return "Updated NFS server count for {0} to {1}".format(svc_id, newcount)
def _update_nfs_count(current, new):
# type: (cnfs.CephNFS, cnfs.CephNFS) -> cnfs.CephNFS
new.spec.server.active = newcount
return new
return self._patch(cnfs.CephNFS, 'cephnfses', svc_id, _update_nfs_count)

def add_osds(self, drive_group, all_hosts):
# type: (orchestrator.DriveGroupSpec, List[str]) -> str
@@ -499,80 +480,82 @@ def add_osds(self, drive_group, all_hosts):

assert drive_group.objectstore in ("bluestore", "filestore")

# The CRD looks something like this:
# nodes:
# - name: "gravel1.rockery"
# devices:
# - name: "sdb"
# config:
# storeType: bluestore

current_cluster = self.rook_api_get(
"cephclusters/{0}".format(self.rook_env.cluster_name))

patch = []

# FIXME: this is all not really atomic, because jsonpatch doesn't
# let us do "test" operations that would check if items with
# matching names were in existing lists.

if 'nodes' not in current_cluster['spec']['storage']:
patch.append({
'op': 'add', 'path': '/spec/storage/nodes', 'value': []
})

current_nodes = current_cluster['spec']['storage'].get('nodes', [])

if drive_group.hosts(all_hosts)[0] not in [n['name'] for n in current_nodes]:
pd = { "name": drive_group.hosts(all_hosts)[0],
"config": { "storeType": drive_group.objectstore }} # type: dict

if block_devices:
pd["devices"] = [{'name': d.path} for d in block_devices]
if directories:
pd["directories"] = [{'path': p} for p in directories]

patch.append({ "op": "add", "path": "/spec/storage/nodes/-", "value": pd }) # type: ignore
else:
# Extend existing node
node_idx = None
current_node = None
for i, c in enumerate(current_nodes):
if c['name'] == drive_group.hosts(all_hosts)[0]:
current_node = c
node_idx = i
break

assert node_idx is not None
assert current_node is not None

new_devices = list(set(block_devices) - set([d['name'] for d in current_node['devices']]))
for n in new_devices:
patch.append({
"op": "add",
"path": "/spec/storage/nodes/{0}/devices/-".format(node_idx),
"value": {'name': n.path} # type: ignore
})
if directories:
new_dirs = list(set(directories) - set(current_node['directories']))
for p in new_dirs:
patch.append({
"op": "add",
"path": "/spec/storage/nodes/{0}/directories/-".format(node_idx),
"value": {'path': p} # type: ignore
})
def _add_osds(current_cluster, new_cluster):
# type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster

# FIXME: this is all not really atomic, because jsonpatch doesn't
# let us do "test" operations that would check if items with
# matching names were in existing lists.

if not hasattr(new_cluster.spec.storage, 'nodes'):
new_cluster.spec.storage.nodes = ccl.NodesList()

current_nodes = getattr(current_cluster.spec.storage, 'nodes', ccl.NodesList())

if drive_group.hosts(all_hosts)[0] not in [n.name for n in current_nodes]:
pd = ccl.NodesItem(
name=drive_group.hosts(all_hosts)[0],
config=ccl.Config(
storeType=drive_group.objectstore
)
)

if block_devices:
pd.devices = ccl.DevicesList(
ccl.DevicesItem(name=d.path) for d in block_devices
)
if directories:
pd.directories = ccl.DirectoriesList(
ccl.DirectoriesItem(path=p) for p in directories
)
new_cluster.spec.storage.nodes.append(pd)
else:
for _node in new_cluster.spec.storage.nodes:
current_node = _node # type: ccl.NodesItem
if current_node.name == drive_group.hosts(all_hosts)[0]:
if block_devices:
if not hasattr(current_node, 'devices'):
current_node.devices = ccl.DevicesList()
new_devices = list(set(block_devices) - set([d.name for d in current_node.devices]))
current_node.devices.extend(
ccl.DevicesItem(name=n.path) for n in new_devices
)

if directories:
if not hasattr(current_node, 'directories'):
current_node.directories = ccl.DirectoriesList()
new_dirs = list(set(directories) - set([d.path for d in current_node.directories]))
current_node.directories.extend(
ccl.DirectoriesItem(path=n) for n in new_dirs
)
return new_cluster

return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _add_osds)

def _patch(self, crd, crd_name, cr_name, func):
current_json = self.rook_api_get(
"{}/{}".format(crd_name, cr_name)
)

current = crd.from_json(current_json)
new = crd.from_json(current_json) # no deepcopy.

new = func(current, new)

patch = list(jsonpatch.make_patch(current_json, new.to_json()))

log.info('patch for {}/{}: \n{}'.format(crd_name, cr_name, patch))

if len(patch) == 0:
return "No change"

try:
self.rook_api_patch(
"cephclusters/{0}".format(self.rook_env.cluster_name),
"{}/{}".format(crd_name, cr_name),
body=patch)
except ApiException as e:
log.exception("API exception: {0}".format(e))
raise ApplyException(
"Failed to create OSD entries in Cluster CRD: {0}".format(
e))
"Failed to update {}/{}: {}".format(crd_name, cr_name, e))

return "Success"
