Skip to content

Commit

Permalink
journal: add support for trimming committed journal objects
Browse files Browse the repository at this point in the history
As clients periodically mark payloads as committed, the trimmer will
determine the new minimum object set and delete all objects before the
new minimum.

Signed-off-by: Jason Dillaman <[email protected]>
  • Loading branch information
Jason Dillaman committed Nov 6, 2015
1 parent 6bd8c0a commit 65f86fe
Show file tree
Hide file tree
Showing 7 changed files with 426 additions and 17 deletions.
78 changes: 65 additions & 13 deletions src/journal/JournalMetadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ using namespace cls::journal;

JournalMetadata::JournalMetadata(librados::IoCtx &ioctx,
const std::string &oid,
const std::string &client_id)
: m_cct(NULL), m_oid(oid), m_client_id(client_id), m_order(0),
m_splay_width(0), m_initialized(false), m_timer(NULL),
const std::string &client_id,
double commit_interval)
: m_cct(NULL), m_oid(oid), m_client_id(client_id),
m_commit_interval(commit_interval), m_order(0), m_splay_width(0),
m_initialized(false), m_timer(NULL),
m_timer_lock("JournalMetadata::m_timer_lock"),
m_lock("JournalMetadata::m_lock"), m_watch_ctx(this), m_watch_handle(0),
m_update_notifications(0) {
m_update_notifications(0), m_commit_position_pending(false),
m_commit_position_ctx(NULL) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
}
Expand Down Expand Up @@ -74,6 +77,7 @@ int JournalMetadata::init() {
int JournalMetadata::register_client(const std::string &description) {
assert(!m_client_id.empty());

ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
int r = client::client_register(m_ioctx, m_oid, m_client_id, description);
if (r < 0) {
lderr(m_cct) << "failed to register journal client '" << m_client_id
Expand All @@ -88,6 +92,7 @@ int JournalMetadata::register_client(const std::string &description) {
int JournalMetadata::unregister_client() {
assert(!m_client_id.empty());

ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
int r = client::client_unregister(m_ioctx, m_oid, m_client_id);
if (r < 0) {
lderr(m_cct) << "failed to unregister journal client '" << m_client_id
Expand Down Expand Up @@ -117,6 +122,9 @@ void JournalMetadata::remove_listener(Listener *listener) {

void JournalMetadata::set_minimum_set(uint64_t object_set) {
Mutex::Locker locker(m_lock);

ldout(m_cct, 20) << __func__ << ": current=" << m_minimum_set
<< ", new=" << object_set << dendl;
if (m_minimum_set >= object_set) {
return;
}
Expand All @@ -137,6 +145,9 @@ void JournalMetadata::set_minimum_set(uint64_t object_set) {

void JournalMetadata::set_active_set(uint64_t object_set) {
Mutex::Locker locker(m_lock);

ldout(m_cct, 20) << __func__ << ": current=" << m_active_set
<< ", new=" << object_set << dendl;
if (m_active_set >= object_set) {
return;
}
Expand All @@ -157,18 +168,25 @@ void JournalMetadata::set_active_set(uint64_t object_set) {

void JournalMetadata::set_commit_position(
const ObjectSetPosition &commit_position, Context *on_safe) {
assert(on_safe != NULL);

Mutex::Locker locker(m_lock);
ldout(m_cct, 20) << __func__ << ": current=" << m_client.commit_position
<< ", new=" << commit_position << dendl;
if (commit_position <= m_client.commit_position ||
commit_position <= m_commit_position) {
on_safe->complete(-ESTALE);
return;
}

librados::ObjectWriteOperation op;
client::client_commit(&op, m_client_id, commit_position);
if (m_commit_position_ctx != NULL) {
m_commit_position_ctx->complete(-ESTALE);
}

C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_safe);
librados::AioCompletion *comp =
librados::Rados::aio_create_completion(ctx, NULL,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, comp, &op);
assert(r == 0);
comp->release();
m_client.commit_position = commit_position;
m_commit_position = commit_position;
m_commit_position_ctx = on_safe;
schedule_commit_task();
}

void JournalMetadata::reserve_tid(const std::string &tag, uint64_t tid) {
Expand Down Expand Up @@ -235,6 +253,33 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
}
}

void JournalMetadata::schedule_commit_task() {
assert(m_lock.is_locked());

Mutex::Locker timer_locker(m_timer_lock);
if (!m_commit_position_pending) {
m_commit_position_pending = true;
m_timer->add_event_after(m_commit_interval, new C_CommitPositionTask(this));
}
}

void JournalMetadata::handle_commit_position_task() {
Mutex::Locker locker(m_lock);

librados::ObjectWriteOperation op;
client::client_commit(&op, m_client_id, m_commit_position);

C_NotifyUpdate *ctx = new C_NotifyUpdate(this, m_commit_position_ctx);
m_commit_position_ctx = NULL;

librados::AioCompletion *comp =
librados::Rados::aio_create_completion(ctx, NULL,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, comp, &op);
assert(r == 0);
comp->release();
}

void JournalMetadata::schedule_watch_reset() {
Mutex::Locker locker(m_timer_lock);
m_timer->add_event_after(0.1, new C_WatchReset(this));
Expand Down Expand Up @@ -269,6 +314,13 @@ void JournalMetadata::handle_watch_error(int err) {
void JournalMetadata::notify_update() {
ldout(m_cct, 10) << "notifying journal header update" << dendl;

bufferlist bl;
m_ioctx.notify2(m_oid, bl, 5000, NULL);
}

void JournalMetadata::async_notify_update() {
ldout(m_cct, 10) << "async notifying journal header update" << dendl;

librados::AioCompletion *comp =
librados::Rados::aio_create_completion(NULL, NULL, NULL);

Expand Down
32 changes: 30 additions & 2 deletions src/journal/JournalMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable {
};

JournalMetadata(librados::IoCtx &ioctx, const std::string &oid,
const std::string &client_id);
const std::string &client_id, double commit_interval);
~JournalMetadata();

int init();
Expand All @@ -50,6 +50,9 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable {
int register_client(const std::string &description);
int unregister_client();

// Returns a reference to the client id that was supplied at construction.
inline const std::string &get_client_id() const { return m_client_id; }
// Returns the stored m_order value (zero-initialized by the constructor).
inline uint8_t get_order() const { return m_order; }
Expand Down Expand Up @@ -83,6 +86,11 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable {
*commit_position = m_client.commit_position;
}

// Copies m_registered_clients into *registered_clients while holding m_lock.
// registered_clients is dereferenced unconditionally and must not be NULL.
void get_registered_clients(RegisteredClients *registered_clients) {
  Mutex::Locker guard(m_lock);
  *registered_clients = m_registered_clients;
}

inline uint64_t allocate_tid(const std::string &tag) {
Mutex::Locker locker(m_lock);
return m_allocated_tids[tag]++;
Expand All @@ -91,6 +99,7 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable {
bool get_last_allocated_tid(const std::string &tag, uint64_t *tid) const;

void notify_update();
void async_notify_update();

private:
typedef std::map<std::string, uint64_t> AllocatedTids;
Expand Down Expand Up @@ -122,6 +131,17 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable {
}
};

// Context that forwards its completion to
// JournalMetadata::handle_commit_position_task().
//
// Holds a JournalMetadataPtr to the owning object (presumably a ref-counting
// pointer that keeps the JournalMetadata alive until the task runs -- confirm
// against the JournalMetadataPtr typedef).
struct C_CommitPositionTask : public Context {
  JournalMetadataPtr journal_metadata;

  C_CommitPositionTask(JournalMetadata *metadata)
    : journal_metadata(metadata) {
  }

  // The completion code r is not used.
  virtual void finish(int r) {
    journal_metadata->handle_commit_position_task();
  }
};

struct C_NotifyUpdate : public Context {
JournalMetadataPtr journal_metadata;
Context *on_safe;
Expand All @@ -131,7 +151,7 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable {

virtual void finish(int r) {
if (r == 0) {
journal_metadata->notify_update();
journal_metadata->async_notify_update();
}
if (on_safe != NULL) {
on_safe->complete(r);
Expand Down Expand Up @@ -159,6 +179,7 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable {
CephContext *m_cct;
std::string m_oid;
std::string m_client_id;
double m_commit_interval;

uint8_t m_order;
uint8_t m_splay_width;
Expand All @@ -184,9 +205,16 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable {
size_t m_update_notifications;
Cond m_update_cond;

bool m_commit_position_pending;
ObjectSetPosition m_commit_position;
Context *m_commit_position_ctx;

void refresh(Context *on_finish);
void handle_refresh_complete(C_Refresh *refresh, int r);

void schedule_commit_task();
void handle_commit_position_task();

void schedule_watch_reset();
void handle_watch_reset();
void handle_watch_notify(uint64_t notify_id, uint64_t cookie);
Expand Down
Loading

0 comments on commit 65f86fe

Please sign in to comment.