Skip to content

Commit

Permalink
Merge pull request ceph#12140 from iain-buclaw-sociomantic:pybind-aio…
Browse files Browse the repository at this point in the history
…exec

python-rados: implement new aio_execute

Reviewed-by: Josh Durgin <[email protected]>
  • Loading branch information
jdurgin committed Dec 2, 2016
2 parents 53b3f6b + 7aa80fd commit 1515bd0
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 2 deletions.
73 changes: 72 additions & 1 deletion src/pybind/rados/rados.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ cdef extern from "rados/librados.h" nogil:

int rados_exec(rados_ioctx_t io, const char * oid, const char * cls, const char * method,
const char * in_buf, size_t in_len, char * buf, size_t out_len)
int rados_aio_exec(rados_ioctx_t io, const char * oid, rados_completion_t completion, const char * cls, const char * method,
const char * in_buf, size_t in_len, char * buf, size_t out_len)

int rados_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, const char * oid, time_t * mtime, int flags)
int rados_aio_write_op_operate(rados_write_op_t write_op, rados_ioctx_t io, rados_completion_t completion, const char *oid, time_t *mtime, int flags)
Expand Down Expand Up @@ -2224,6 +2226,75 @@ cdef class Ioctx(object):
raise make_ex(ret, "error reading %s" % object_name)
return completion

@requires(('object_name', str_type), ('cls', str_type), ('method', str_type), ('data', bytes))
def aio_execute(self, object_name, cls, method, data,
length=8192, oncomplete=None, onsafe=None):
"""
Asynchronously execute an OSD class method on an object.
oncomplete and onsafe will be called with the data returned from
the plugin as well as the completion:
oncomplete(completion, data)
onsafe(completion, data)
:param object_name: name of the object
:type object_name: str
:param cls: name of the object class
:type cls: str
:param method: name of the method
:type method: str
:param data: input data
:type data: bytes
:param length: size of output buffer in bytes (default=8192)
:type length: int
:param oncomplete: what to do when the execution is complete
:type oncomplete: completion
:param onsafe: what to do when the execution is safe and complete
:type onsafe: completion
:raises: :class:`Error`
:returns: completion object
"""

object_name = cstr(object_name, 'object_name')
cls = cstr(cls, 'cls')
method = cstr(method, 'method')
cdef:
Completion completion
char *_object_name = object_name
char *_cls = cls
char *_method = method
char *_data = data
size_t _data_len = len(data)

char *ref_buf
size_t _length = length

def oncomplete_(completion_v):
cdef Completion _completion_v = completion_v
return_value = _completion_v.get_return_value()
if return_value > 0 and return_value != length:
_PyBytes_Resize(&_completion_v.buf, return_value)
return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

def onsafe_(completion_v):
cdef Completion _completion_v = completion_v
return_value = _completion_v.get_return_value()
return onsafe(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)

completion = self.__get_completion(oncomplete_ if oncomplete else None, onsafe_ if onsafe else None)
completion.buf = PyBytes_FromStringAndSize(NULL, length)
ret_buf = PyBytes_AsString(completion.buf)
self.__track_completion(completion)
with nogil:
ret = rados_aio_exec(self.io, _object_name, completion.rados_comp,
_cls, _method, _data, _data_len, ret_buf, _length)
if ret < 0:
completion._cleanup()
raise make_ex(ret, "error executing %s::%s on %s" % (cls, method, object_name))
return completion

def aio_remove(self, object_name, oncomplete=None, onsafe=None):
"""
Asychronously remove an object
Expand Down Expand Up @@ -2541,7 +2612,7 @@ returned %d, but should return zero on success." % (self.name, ret))
:type method: str
:param data: input data
:type data: bytes
:param length: size of output buffer in bytes (default=8291)
:param length: size of output buffer in bytes (default=8192)
:type length: int
:raises: :class:`TypeError`
Expand Down
33 changes: 32 additions & 1 deletion src/test/pybind/test_rados.py
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,37 @@ def test_execute(self):
ret, buf = self.ioctx.execute("foo", "hello", "say_hello", b"nose")
eq(buf, b"Hello, nose!")

def test_aio_execute(self):
count = [0]
retval = [None]
lock = threading.Condition()
def cb(_, buf):
with lock:
if retval[0] is None:
retval[0] = buf
count[0] += 1
lock.notify()
self.ioctx.write("foo", b"") # ensure object exists

comp = self.ioctx.aio_execute("foo", "hello", "say_hello", b"", 32, cb, cb)
comp.wait_for_complete()
with lock:
while count[0] < 2:
lock.wait()
eq(comp.get_return_value(), 13)
eq(retval[0], b"Hello, world!")

retval[0] = None
comp = self.ioctx.aio_execute("foo", "hello", "say_hello", b"nose", 32, cb, cb)
comp.wait_for_complete()
with lock:
while count[0] < 4:
lock.wait()
eq(comp.get_return_value(), 12)
eq(retval[0], b"Hello, nose!")

[i.remove() for i in self.ioctx.list_objects()]

class TestObject(object):

def setUp(self):
Expand Down Expand Up @@ -935,4 +966,4 @@ def test_ceph_osd_pool_create_utf8(self):
ret, buf, out = self.rados.mon_command(json.dumps(cmd), b'')
eq(ret, 0)
assert len(out) > 0
eq(u"pool '\u9ec5' created", out)
eq(u"pool '\u9ec5' created", out)

0 comments on commit 1515bd0

Please sign in to comment.