Skip to content

Commit

Permalink
libceph: MOSDOp v8 encoding (actual spgid + full hash)
Browse files Browse the repository at this point in the history
Signed-off-by: Ilya Dryomov <[email protected]>
  • Loading branch information
idryomov committed Jul 7, 2017
1 parent 98ad5eb commit 8cb441c
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 20 deletions.
17 changes: 17 additions & 0 deletions include/linux/ceph/osd_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,23 @@ struct ceph_request_redirect {
struct ceph_object_locator oloc;
};

/*
* osd request identifier
*
* caller name + incarnation# + tid to unique identify this request
*/
struct ceph_osd_reqid {
struct ceph_entity_name name;
__le64 tid;
__le32 inc;
} __packed;

struct ceph_blkin_trace_info {
__le64 trace_id;
__le64 span_id;
__le64 parent_span_id;
} __packed;

typedef void (*rados_watchcb2_t)(void *arg, u64 notify_id, u64 cookie,
u64 notifier_id, void *data, size_t data_len);
typedef void (*rados_watcherrcb_t)(void *arg, u64 cookie, int err);
Expand Down
4 changes: 3 additions & 1 deletion include/linux/ceph/osdmap.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,13 @@ static inline struct ceph_entity_addr *ceph_osd_addr(struct ceph_osdmap *map,
return &map->osd_addr[osd];
}

#define CEPH_PGID_ENCODING_LEN (1 + 8 + 4 + 4)

static inline int ceph_decode_pgid(void **p, void *end, struct ceph_pg *pgid)
{
__u8 version;

if (!ceph_has_room(p, end, 1 + 8 + 4 + 4)) {
if (!ceph_has_room(p, end, CEPH_PGID_ENCODING_LEN)) {
pr_warn("incomplete pg encoding\n");
return -EINVAL;
}
Expand Down
153 changes: 134 additions & 19 deletions net/ceph/osd_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <linux/bio.h>
#endif

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
Expand Down Expand Up @@ -555,17 +556,21 @@ int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
WARN_ON(ceph_oloc_empty(&req->r_base_oloc));

/* create request message */
msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
msg_size = CEPH_ENCODING_START_BLK_LEN +
CEPH_PGID_ENCODING_LEN + 1; /* spgid */
msg_size += 4 + 4 + 4; /* hash, osdmap_epoch, flags */
msg_size += CEPH_ENCODING_START_BLK_LEN +
sizeof(struct ceph_osd_reqid); /* reqid */
msg_size += sizeof(struct ceph_blkin_trace_info); /* trace */
msg_size += 4 + sizeof(struct ceph_timespec); /* client_inc, mtime */
msg_size += CEPH_ENCODING_START_BLK_LEN +
ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
msg_size += 1 + 8 + 4 + 4; /* pgid */
msg_size += 4 + req->r_base_oid.name_len; /* oid */
msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
msg_size += 8; /* snapid */
msg_size += 8; /* snap_seq */
msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
msg_size += 4; /* retry_attempt */
msg_size += 4 + 8; /* retry_attempt, features */

if (req->r_mempool)
msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
Expand Down Expand Up @@ -1493,6 +1498,13 @@ static void encode_pgid(void **p, const struct ceph_pg *pgid)
ceph_encode_32(p, -1); /* preferred */
}

static void encode_spgid(void **p, const struct ceph_spg *spgid)
{
ceph_start_encoding(p, 1, 1, CEPH_PGID_ENCODING_LEN + 1);
encode_pgid(p, &spgid->pgid);
ceph_encode_8(p, spgid->shard);
}

static void encode_oloc(void **p, void *end,
const struct ceph_object_locator *oloc)
{
Expand All @@ -1507,7 +1519,8 @@ static void encode_oloc(void **p, void *end,
ceph_encode_32(p, 0);
}

static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
static void encode_request_partial(struct ceph_osd_request *req,
struct ceph_msg *msg)
{
void *p = msg->front.iov_base;
void *const end = p + msg->front_alloc_len;
Expand All @@ -1524,18 +1537,25 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)

setup_request_data(req, msg);

ceph_encode_32(&p, 1); /* client_inc, always 1 */
encode_spgid(&p, &req->r_t.spgid); /* actual spg */
ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
ceph_encode_32(&p, req->r_flags);

/* reqid */
ceph_start_encoding(&p, 2, 2, sizeof(struct ceph_osd_reqid));
memset(p, 0, sizeof(struct ceph_osd_reqid));
p += sizeof(struct ceph_osd_reqid);

/* trace */
memset(p, 0, sizeof(struct ceph_blkin_trace_info));
p += sizeof(struct ceph_blkin_trace_info);

ceph_encode_32(&p, 0); /* client_inc, always 0 */
ceph_encode_timespec(p, &req->r_mtime);
p += sizeof(struct ceph_timespec);

/* reassert_version */
memset(p, 0, sizeof(struct ceph_eversion));
p += sizeof(struct ceph_eversion);

encode_oloc(&p, end, &req->r_t.target_oloc);
encode_pgid(&p, &req->r_t.pgid);
ceph_encode_string(&p, end, req->r_t.target_oid.name,
req->r_t.target_oid.name_len);

Expand All @@ -1558,11 +1578,10 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
}

ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
BUG_ON(p != end - 8); /* space for features */

BUG_ON(p > end);
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
msg->hdr.version = cpu_to_le16(8); /* MOSDOp v8 */
/* front_len is finalized in encode_request_finish() */
msg->hdr.data_len = cpu_to_le32(data_len);
/*
* The header "data_off" is a hint to the receiver allowing it
Expand All @@ -1571,9 +1590,99 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
*/
msg->hdr.data_off = cpu_to_le16(req->r_data_offset);

dout("%s req %p oid %s oid_len %d front %zu data %u\n", __func__,
req, req->r_t.target_oid.name, req->r_t.target_oid.name_len,
msg->front.iov_len, data_len);
dout("%s req %p msg %p oid %s oid_len %d\n", __func__, req, msg,
req->r_t.target_oid.name, req->r_t.target_oid.name_len);
}

static void encode_request_finish(struct ceph_msg *msg)
{
void *p = msg->front.iov_base;
void *const end = p + msg->front_alloc_len;

if (CEPH_HAVE_FEATURE(msg->con->peer_features, RESEND_ON_SPLIT)) {
/* luminous OSD -- encode features and be done */
p = end - 8;
ceph_encode_64(&p, msg->con->peer_features);
} else {
struct {
char spgid[CEPH_ENCODING_START_BLK_LEN +
CEPH_PGID_ENCODING_LEN + 1];
__le32 hash;
__le32 epoch;
__le32 flags;
char reqid[CEPH_ENCODING_START_BLK_LEN +
sizeof(struct ceph_osd_reqid)];
char trace[sizeof(struct ceph_blkin_trace_info)];
__le32 client_inc;
struct ceph_timespec mtime;
} __packed head;
struct ceph_pg pgid;
void *oloc, *oid, *tail;
int oloc_len, oid_len, tail_len;
int len;

/*
* Pre-luminous OSD -- reencode v8 into v4 using @head
* as a temporary buffer. Encode the raw PG; the rest
* is just a matter of moving oloc, oid and tail blobs
* around.
*/
memcpy(&head, p, sizeof(head));
p += sizeof(head);

oloc = p;
p += CEPH_ENCODING_START_BLK_LEN;
pgid.pool = ceph_decode_64(&p);
p += 4 + 4; /* preferred, key len */
len = ceph_decode_32(&p);
p += len; /* nspace */
oloc_len = p - oloc;

oid = p;
len = ceph_decode_32(&p);
p += len;
oid_len = p - oid;

tail = p;
tail_len = (end - p) - 8;

p = msg->front.iov_base;
ceph_encode_copy(&p, &head.client_inc, sizeof(head.client_inc));
ceph_encode_copy(&p, &head.epoch, sizeof(head.epoch));
ceph_encode_copy(&p, &head.flags, sizeof(head.flags));
ceph_encode_copy(&p, &head.mtime, sizeof(head.mtime));

/* reassert_version */
memset(p, 0, sizeof(struct ceph_eversion));
p += sizeof(struct ceph_eversion);

BUG_ON(p >= oloc);
memmove(p, oloc, oloc_len);
p += oloc_len;

pgid.seed = le32_to_cpu(head.hash);
encode_pgid(&p, &pgid); /* raw pg */

BUG_ON(p >= oid);
memmove(p, oid, oid_len);
p += oid_len;

/* tail -- ops, snapid, snapc, retry_attempt */
BUG_ON(p >= tail);
memmove(p, tail, tail_len);
p += tail_len;

msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
}

BUG_ON(p > end);
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

dout("%s msg %p tid %llu %u+%u+%u v%d\n", __func__, msg,
le64_to_cpu(msg->hdr.tid), le32_to_cpu(msg->hdr.front_len),
le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len),
le16_to_cpu(msg->hdr.version));
}

/*
Expand All @@ -1599,7 +1708,7 @@ static void send_request(struct ceph_osd_request *req)
else
WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);

encode_request(req, req->r_request);
encode_request_partial(req, req->r_request);

dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d flags 0x%x attempt %d\n",
__func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
Expand Down Expand Up @@ -4577,6 +4686,11 @@ static int invalidate_authorizer(struct ceph_connection *con)
return ceph_monc_validate_auth(&osdc->client->monc);
}

static void osd_reencode_message(struct ceph_msg *msg)
{
encode_request_finish(msg);
}

static int osd_sign_message(struct ceph_msg *msg)
{
struct ceph_osd *o = msg->con->private;
Expand All @@ -4601,6 +4715,7 @@ static const struct ceph_connection_operations osd_con_ops = {
.verify_authorizer_reply = verify_authorizer_reply,
.invalidate_authorizer = invalidate_authorizer,
.alloc_msg = alloc_msg,
.reencode_message = osd_reencode_message,
.sign_message = osd_sign_message,
.check_message_signature = osd_check_message_signature,
.fault = osd_fault,
Expand Down

0 comments on commit 8cb441c

Please sign in to comment.