Skip to content

Commit

Permalink
librbd: delay commit of op start event
Browse files Browse the repository at this point in the history
If the start event is flagged as committed before the op is
actually executed, librbd won't be able to replay the event
should a crash occur.

Signed-off-by: Jason Dillaman <[email protected]>
  • Loading branch information
Jason Dillaman committed Mar 1, 2016
1 parent b0dc047 commit 9c99637
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 20 deletions.
34 changes: 24 additions & 10 deletions src/librbd/Journal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,14 @@ void Journal<I>::append_op_event(uint64_t op_tid,

// TODO: use allocated tag_id
future = m_journaler->append(0, bl);

// delay committing op event to ensure consistent replay
assert(m_op_futures.count(op_tid) == 0);
m_op_futures[op_tid] = future;
}

on_safe = create_async_context_callback(m_image_ctx, on_safe);
future.flush(new C_OpEventSafe(this, op_tid, future, on_safe));
future.flush(on_safe);

CephContext *cct = m_image_ctx.cct;
ldout(cct, 10) << this << " " << __func__ << ": "
Expand All @@ -429,16 +433,24 @@ void Journal<I>::commit_op_event(uint64_t op_tid, int r) {
bufferlist bl;
::encode(event_entry, bl);

Future future;
Future op_start_future;
Future op_finish_future;
{
Mutex::Locker locker(m_lock);
assert(m_state == STATE_READY);

// ready to commit op event
auto it = m_op_futures.find(op_tid);
assert(it != m_op_futures.end());
op_start_future = it->second;
m_op_futures.erase(it);

// TODO: use allocated tag_id
future = m_journaler->append(0, bl);
op_finish_future = m_journaler->append(0, bl);
}

future.flush(new C_OpEventSafe(this, op_tid, future, nullptr));
op_finish_future.flush(new C_OpEventSafe(this, op_tid, op_start_future,
op_finish_future));
}

template <typename I>
Expand Down Expand Up @@ -866,8 +878,9 @@ void Journal<I>::handle_io_event_safe(int r, uint64_t tid) {
}

template <typename I>
void Journal<I>::handle_op_event_safe(int r, uint64_t tid, const Future &future,
Context *on_safe) {
void Journal<I>::handle_op_event_safe(int r, uint64_t tid,
const Future &op_start_future,
const Future &op_finish_future) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
<< "tid=" << tid << dendl;
Expand All @@ -878,10 +891,11 @@ void Journal<I>::handle_op_event_safe(int r, uint64_t tid, const Future &future,
lderr(cct) << "failed to commit op event: " << cpp_strerror(r) << dendl;
}

m_journaler->committed(future);
if (on_safe != nullptr) {
on_safe->complete(r);
}
m_journaler->committed(op_start_future);
m_journaler->committed(op_finish_future);

// reduce the replay window after committing an op event
m_journaler->flush_commit_position(nullptr);
}

template <typename I>
Expand Down
23 changes: 13 additions & 10 deletions src/librbd/Journal.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include "include/atomic.h"
#include "include/Context.h"
#include "include/interval_set.h"
#include "include/unordered_map.h"
#include "include/rados/librados.hpp"
#include "common/Mutex.h"
#include "journal/Future.h"
Expand All @@ -18,6 +17,7 @@
#include <iosfwd>
#include <list>
#include <string>
#include <unordered_map>

class Context;
namespace journal {
Expand Down Expand Up @@ -173,7 +173,8 @@ class Journal {
}
};

typedef ceph::unordered_map<uint64_t, Event> Events;
typedef std::unordered_map<uint64_t, Event> Events;
typedef std::unordered_map<uint64_t, Future> TidToFutures;

struct C_IOEventSafe : public Context {
Journal *journal;
Expand All @@ -191,16 +192,17 @@ class Journal {
struct C_OpEventSafe : public Context {
Journal *journal;
uint64_t tid;
Future future;
Context *on_safe;
Future op_start_future;
Future op_finish_future;

C_OpEventSafe(Journal *journal, uint64_t tid, const Future &future,
Context *on_safe)
: journal(journal), tid(tid), future(future), on_safe(on_safe) {
C_OpEventSafe(Journal *journal, uint64_t tid, const Future &op_start_future,
const Future &op_finish_future)
: journal(journal), tid(tid), op_start_future(op_start_future),
op_finish_future(op_finish_future) {
}

virtual void finish(int r) {
journal->handle_op_event_safe(r, tid, future, on_safe);
journal->handle_op_event_safe(r, tid, op_start_future, op_finish_future);
}
};

Expand Down Expand Up @@ -251,6 +253,7 @@ class Journal {
Events m_events;

atomic_t m_op_tid;
TidToFutures m_op_futures;

bool m_blocking_writes;

Expand Down Expand Up @@ -279,8 +282,8 @@ class Journal {
void handle_journal_destroyed(int r);

void handle_io_event_safe(int r, uint64_t tid);
void handle_op_event_safe(int r, uint64_t tid, const Future &future,
Context *on_safe);
void handle_op_event_safe(int r, uint64_t tid, const Future &op_start_future,
const Future &op_finish_future);

void stop_recording();

Expand Down

0 comments on commit 9c99637

Please sign in to comment.