libceph: introduce BVECS data type

In preparation for rbd "fancy" striping, introduce ceph_bvec_iter for
working with bio_vec array data buffers.  The wrappers are trivial, but
make it look similar to ceph_bio_iter.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
This commit is contained in:
Ilya Dryomov
2018-01-20 10:30:11 +01:00
parent f9dcbc44cd
commit b9e281c2b3
4 changed files with 164 additions and 0 deletions

View File

@@ -894,6 +894,58 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
}
#endif /* CONFIG_BLOCK */
static void ceph_msg_data_bvecs_cursor_init(struct ceph_msg_data_cursor *cursor,
size_t length)
{
struct ceph_msg_data *data = cursor->data;
struct bio_vec *bvecs = data->bvec_pos.bvecs;
cursor->resid = min_t(size_t, length, data->bvec_pos.iter.bi_size);
cursor->bvec_iter = data->bvec_pos.iter;
cursor->bvec_iter.bi_size = cursor->resid;
BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
cursor->last_piece =
cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
}
static struct page *ceph_msg_data_bvecs_next(struct ceph_msg_data_cursor *cursor,
size_t *page_offset,
size_t *length)
{
struct bio_vec bv = bvec_iter_bvec(cursor->data->bvec_pos.bvecs,
cursor->bvec_iter);
*page_offset = bv.bv_offset;
*length = bv.bv_len;
return bv.bv_page;
}
static bool ceph_msg_data_bvecs_advance(struct ceph_msg_data_cursor *cursor,
size_t bytes)
{
struct bio_vec *bvecs = cursor->data->bvec_pos.bvecs;
BUG_ON(bytes > cursor->resid);
BUG_ON(bytes > bvec_iter_len(bvecs, cursor->bvec_iter));
cursor->resid -= bytes;
bvec_iter_advance(bvecs, &cursor->bvec_iter, bytes);
if (!cursor->resid) {
BUG_ON(!cursor->last_piece);
return false; /* no more data */
}
if (!bytes || cursor->bvec_iter.bi_bvec_done)
return false; /* more bytes to process in this segment */
BUG_ON(cursor->last_piece);
BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
cursor->last_piece =
cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
return true;
}
/*
* For a page array, a piece comes from the first page in the array
* that has not already been fully consumed.
@@ -1077,6 +1129,9 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
ceph_msg_data_bio_cursor_init(cursor, length);
break;
#endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_BVECS:
ceph_msg_data_bvecs_cursor_init(cursor, length);
break;
case CEPH_MSG_DATA_NONE:
default:
/* BUG(); */
@@ -1125,6 +1180,9 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
page = ceph_msg_data_bio_next(cursor, page_offset, length);
break;
#endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_BVECS:
page = ceph_msg_data_bvecs_next(cursor, page_offset, length);
break;
case CEPH_MSG_DATA_NONE:
default:
page = NULL;
@@ -1163,6 +1221,9 @@ static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
new_piece = ceph_msg_data_bio_advance(cursor, bytes);
break;
#endif /* CONFIG_BLOCK */
case CEPH_MSG_DATA_BVECS:
new_piece = ceph_msg_data_bvecs_advance(cursor, bytes);
break;
case CEPH_MSG_DATA_NONE:
default:
BUG();
@@ -3247,6 +3308,20 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
EXPORT_SYMBOL(ceph_msg_data_add_bio);
#endif /* CONFIG_BLOCK */
void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
struct ceph_bvec_iter *bvec_pos)
{
struct ceph_msg_data *data;
data = ceph_msg_data_create(CEPH_MSG_DATA_BVECS);
BUG_ON(!data);
data->bvec_pos = *bvec_pos;
list_add_tail(&data->links, &msg->data);
msg->data_length += bvec_pos->iter.bi_size;
}
EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
/*
* construct a new message with given type, size
* the new msg has a ref count of 1.

View File

@@ -155,6 +155,13 @@ static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
}
#endif /* CONFIG_BLOCK */
static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
struct ceph_bvec_iter *bvec_pos)
{
osd_data->type = CEPH_OSD_DATA_TYPE_BVECS;
osd_data->bvec_pos = *bvec_pos;
}
#define osd_req_op_data(oreq, whch, typ, fld) \
({ \
struct ceph_osd_request *__oreq = (oreq); \
@@ -229,6 +236,17 @@ void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
#endif /* CONFIG_BLOCK */
void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
unsigned int which,
struct ceph_bvec_iter *bvec_pos)
{
struct ceph_osd_data *osd_data;
osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
ceph_osd_data_bvecs_init(osd_data, bvec_pos);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);
static void osd_req_op_cls_request_info_pagelist(
struct ceph_osd_request *osd_req,
unsigned int which, struct ceph_pagelist *pagelist)
@@ -266,6 +284,23 @@ void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
unsigned int which,
struct bio_vec *bvecs, u32 bytes)
{
struct ceph_osd_data *osd_data;
struct ceph_bvec_iter it = {
.bvecs = bvecs,
.iter = { .bi_size = bytes },
};
osd_data = osd_req_op_data(osd_req, which, cls, request_data);
ceph_osd_data_bvecs_init(osd_data, &it);
osd_req->r_ops[which].cls.indata_len += bytes;
osd_req->r_ops[which].indata_len += bytes;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);
void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
unsigned int which, struct page **pages, u64 length,
u32 alignment, bool pages_from_pool, bool own_pages)
@@ -291,6 +326,8 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
case CEPH_OSD_DATA_TYPE_BIO:
return (u64)osd_data->bio_length;
#endif /* CONFIG_BLOCK */
case CEPH_OSD_DATA_TYPE_BVECS:
return osd_data->bvec_pos.iter.bi_size;
default:
WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
return 0;
@@ -831,6 +868,8 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
#endif
} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
} else {
BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
}