These changes are the raw update to linux-4.4.6-rt14. Kernel sources
[kvmfornfv.git] / kernel / net / ceph / osd_client.c
index c4ec923..a28e47f 100644 (file)
@@ -120,11 +120,13 @@ static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
 }
 #endif /* CONFIG_BLOCK */
 
-#define osd_req_op_data(oreq, whch, typ, fld)  \
-       ({                                              \
-               BUG_ON(whch >= (oreq)->r_num_ops);      \
-               &(oreq)->r_ops[whch].typ.fld;           \
-       })
+#define osd_req_op_data(oreq, whch, typ, fld)                          \
+({                                                                     \
+       struct ceph_osd_request *__oreq = (oreq);                       \
+       unsigned int __whch = (whch);                                   \
+       BUG_ON(__whch >= __oreq->r_num_ops);                            \
+       &__oreq->r_ops[__whch].typ.fld;                                 \
+})
 
 static struct ceph_osd_data *
 osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
@@ -285,6 +287,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
        switch (op->op) {
        case CEPH_OSD_OP_READ:
        case CEPH_OSD_OP_WRITE:
+       case CEPH_OSD_OP_WRITEFULL:
                ceph_osd_data_release(&op->extent.osd_data);
                break;
        case CEPH_OSD_OP_CALL:
@@ -296,6 +299,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
        case CEPH_OSD_OP_CMPXATTR:
                ceph_osd_data_release(&op->xattr.osd_data);
                break;
+       case CEPH_OSD_OP_STAT:
+               ceph_osd_data_release(&op->raw_data_in);
+               break;
        default:
                break;
        }
@@ -450,7 +456,7 @@ __CEPH_FORALL_OSD_OPS(GENERATE_CASE)
  */
 static struct ceph_osd_req_op *
 _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
-                               u16 opcode)
+                u16 opcode, u32 flags)
 {
        struct ceph_osd_req_op *op;
 
@@ -460,14 +466,15 @@ _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
        op = &osd_req->r_ops[which];
        memset(op, 0, sizeof (*op));
        op->op = opcode;
+       op->flags = flags;
 
        return op;
 }
 
 void osd_req_op_init(struct ceph_osd_request *osd_req,
-                               unsigned int which, u16 opcode)
+                    unsigned int which, u16 opcode, u32 flags)
 {
-       (void)_osd_req_op_init(osd_req, which, opcode);
+       (void)_osd_req_op_init(osd_req, which, opcode, flags);
 }
 EXPORT_SYMBOL(osd_req_op_init);
 
@@ -476,17 +483,19 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
                                u64 offset, u64 length,
                                u64 truncate_size, u32 truncate_seq)
 {
-       struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+       struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
+                                                     opcode, 0);
        size_t payload_len = 0;
 
        BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
-              opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE);
+              opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
+              opcode != CEPH_OSD_OP_TRUNCATE);
 
        op->extent.offset = offset;
        op->extent.length = length;
        op->extent.truncate_size = truncate_size;
        op->extent.truncate_seq = truncate_seq;
-       if (opcode == CEPH_OSD_OP_WRITE)
+       if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
                payload_len += length;
 
        op->payload_len = payload_len;
@@ -515,7 +524,8 @@ EXPORT_SYMBOL(osd_req_op_extent_update);
 void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
                        u16 opcode, const char *class, const char *method)
 {
-       struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+       struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
+                                                     opcode, 0);
        struct ceph_pagelist *pagelist;
        size_t payload_len = 0;
        size_t size;
@@ -552,7 +562,8 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
                          u16 opcode, const char *name, const void *value,
                          size_t size, u8 cmp_op, u8 cmp_mode)
 {
-       struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+       struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
+                                                     opcode, 0);
        struct ceph_pagelist *pagelist;
        size_t payload_len;
 
@@ -585,7 +596,8 @@ void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
                                unsigned int which, u16 opcode,
                                u64 cookie, u64 version, int flag)
 {
-       struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+       struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
+                                                     opcode, 0);
 
        BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
 
@@ -602,7 +614,8 @@ void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
                                u64 expected_write_size)
 {
        struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
-                                                     CEPH_OSD_OP_SETALLOCHINT);
+                                                     CEPH_OSD_OP_SETALLOCHINT,
+                                                     0);
 
        op->alloc_hint.expected_object_size = expected_object_size;
        op->alloc_hint.expected_write_size = expected_write_size;
@@ -661,9 +674,11 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
                break;
        case CEPH_OSD_OP_READ:
        case CEPH_OSD_OP_WRITE:
+       case CEPH_OSD_OP_WRITEFULL:
        case CEPH_OSD_OP_ZERO:
        case CEPH_OSD_OP_TRUNCATE:
-               if (src->op == CEPH_OSD_OP_WRITE)
+               if (src->op == CEPH_OSD_OP_WRITE ||
+                   src->op == CEPH_OSD_OP_WRITEFULL)
                        request_data_len = src->extent.length;
                dst->extent.offset = cpu_to_le64(src->extent.offset);
                dst->extent.length = cpu_to_le64(src->extent.length);
@@ -672,7 +687,8 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
                dst->extent.truncate_seq =
                        cpu_to_le32(src->extent.truncate_seq);
                osd_data = &src->extent.osd_data;
-               if (src->op == CEPH_OSD_OP_WRITE)
+               if (src->op == CEPH_OSD_OP_WRITE ||
+                   src->op == CEPH_OSD_OP_WRITEFULL)
                        ceph_osdc_msg_data_add(req->r_request, osd_data);
                else
                        ceph_osdc_msg_data_add(req->r_reply, osd_data);
@@ -786,7 +802,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        }
 
        if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
-               osd_req_op_init(req, which, opcode);
+               osd_req_op_init(req, which, opcode, 0);
        } else {
                u32 object_size = le32_to_cpu(layout->fl_object_size);
                u32 object_base = off - objoff;
@@ -1088,7 +1104,7 @@ static void __move_osd_to_lru(struct ceph_osd_client *osdc,
        BUG_ON(!list_empty(&osd->o_osd_lru));
 
        list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
-       osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
+       osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
 }
 
 static void maybe_move_osd_to_lru(struct ceph_osd_client *osdc,
@@ -1199,7 +1215,7 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
 static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
 {
        schedule_delayed_work(&osdc->timeout_work,
-                       osdc->client->options->osd_keepalive_timeout * HZ);
+                             osdc->client->options->osd_keepalive_timeout);
 }
 
 static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
@@ -1567,10 +1583,9 @@ static void handle_timeout(struct work_struct *work)
 {
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client, timeout_work.work);
+       struct ceph_options *opts = osdc->client->options;
        struct ceph_osd_request *req;
        struct ceph_osd *osd;
-       unsigned long keepalive =
-               osdc->client->options->osd_keepalive_timeout * HZ;
        struct list_head slow_osds;
        dout("timeout\n");
        down_read(&osdc->map_sem);
@@ -1586,7 +1601,8 @@ static void handle_timeout(struct work_struct *work)
         */
        INIT_LIST_HEAD(&slow_osds);
        list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
-               if (time_before(jiffies, req->r_stamp + keepalive))
+               if (time_before(jiffies,
+                               req->r_stamp + opts->osd_keepalive_timeout))
                        break;
 
                osd = req->r_osd;
@@ -1613,8 +1629,7 @@ static void handle_osds_timeout(struct work_struct *work)
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client,
                             osds_timeout_work.work);
-       unsigned long delay =
-               osdc->client->options->osd_idle_ttl * HZ >> 2;
+       unsigned long delay = osdc->client->options->osd_idle_ttl / 4;
 
        dout("osds timeout\n");
        down_read(&osdc->map_sem);
@@ -1737,8 +1752,7 @@ static void complete_request(struct ceph_osd_request *req)
  * handle osd op reply.  either call the callback if it is specified,
  * or do the completion to wake up the waiting thread.
  */
-static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
-                        struct ceph_connection *con)
+static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 {
        void *p, *end;
        struct ceph_osd_request *req;
@@ -2619,7 +2633,7 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        osdc->event_count = 0;
 
        schedule_delayed_work(&osdc->osds_timeout_work,
-          round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
+           round_jiffies_relative(osdc->client->options->osd_idle_ttl));
 
        err = -ENOMEM;
        osdc->req_mempool = mempool_create_kmalloc_pool(10,
@@ -2794,7 +2808,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
                ceph_osdc_handle_map(osdc, msg);
                break;
        case CEPH_MSG_OSD_OPREPLY:
-               handle_reply(osdc, msg, con);
+               handle_reply(osdc, msg);
                break;
        case CEPH_MSG_WATCH_NOTIFY:
                handle_watch_notify(osdc, msg);
@@ -2809,8 +2823,9 @@ out:
 }
 
 /*
- * lookup and return message for incoming reply.  set up reply message
- * pages.
+ * Lookup and return message for incoming reply.  Don't try to do
+ * anything about a larger than preallocated data portion of the
+ * message at the moment - for now, just skip the message.
  */
 static struct ceph_msg *get_reply(struct ceph_connection *con,
                                  struct ceph_msg_header *hdr,
@@ -2828,23 +2843,19 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
        mutex_lock(&osdc->request_mutex);
        req = __lookup_request(osdc, tid);
        if (!req) {
-               *skip = 1;
+               dout("%s osd%d tid %llu unknown, skipping\n", __func__,
+                    osd->o_osd, tid);
                m = NULL;
-               dout("get_reply unknown tid %llu from osd%d\n", tid,
-                    osd->o_osd);
+               *skip = 1;
                goto out;
        }
 
-       if (req->r_reply->con)
-               dout("%s revoking msg %p from old con %p\n", __func__,
-                    req->r_reply, req->r_reply->con);
        ceph_msg_revoke_incoming(req->r_reply);
 
        if (front_len > req->r_reply->front_alloc_len) {
-               pr_warn("get_reply front %d > preallocated %d (%u#%llu)\n",
-                       front_len, req->r_reply->front_alloc_len,
-                       (unsigned int)con->peer_name.type,
-                       le64_to_cpu(con->peer_name.num));
+               pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
+                       __func__, osd->o_osd, req->r_tid, front_len,
+                       req->r_reply->front_alloc_len);
                m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
                                 false);
                if (!m)
@@ -2852,37 +2863,22 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
                ceph_msg_put(req->r_reply);
                req->r_reply = m;
        }
-       m = ceph_msg_get(req->r_reply);
-
-       if (data_len > 0) {
-               struct ceph_osd_data *osd_data;
 
-               /*
-                * XXX This is assuming there is only one op containing
-                * XXX page data.  Probably OK for reads, but this
-                * XXX ought to be done more generally.
-                */
-               osd_data = osd_req_op_extent_osd_data(req, 0);
-               if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
-                       if (osd_data->pages &&
-                               unlikely(osd_data->length < data_len)) {
-
-                               pr_warn("tid %lld reply has %d bytes we had only %llu bytes ready\n",
-                                       tid, data_len, osd_data->length);
-                               *skip = 1;
-                               ceph_msg_put(m);
-                               m = NULL;
-                               goto out;
-                       }
-               }
+       if (data_len > req->r_reply->data_length) {
+               pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
+                       __func__, osd->o_osd, req->r_tid, data_len,
+                       req->r_reply->data_length);
+               m = NULL;
+               *skip = 1;
+               goto out;
        }
-       *skip = 0;
+
+       m = ceph_msg_get(req->r_reply);
        dout("get_reply tid %lld %p\n", tid, m);
 
 out:
        mutex_unlock(&osdc->request_mutex);
        return m;
-
 }
 
 static struct ceph_msg *alloc_msg(struct ceph_connection *con,
@@ -2980,17 +2976,19 @@ static int invalidate_authorizer(struct ceph_connection *con)
        return ceph_monc_validate_auth(&osdc->client->monc);
 }
 
-static int sign_message(struct ceph_connection *con, struct ceph_msg *msg)
+static int osd_sign_message(struct ceph_msg *msg)
 {
-       struct ceph_osd *o = con->private;
+       struct ceph_osd *o = msg->con->private;
        struct ceph_auth_handshake *auth = &o->o_auth;
+
        return ceph_auth_sign_message(auth, msg);
 }
 
-static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg)
+static int osd_check_message_signature(struct ceph_msg *msg)
 {
-       struct ceph_osd *o = con->private;
+       struct ceph_osd *o = msg->con->private;
        struct ceph_auth_handshake *auth = &o->o_auth;
+
        return ceph_auth_check_message_signature(auth, msg);
 }
 
@@ -3002,7 +3000,7 @@ static const struct ceph_connection_operations osd_con_ops = {
        .verify_authorizer_reply = verify_authorizer_reply,
        .invalidate_authorizer = invalidate_authorizer,
        .alloc_msg = alloc_msg,
-       .sign_message = sign_message,
-       .check_message_signature = check_message_signature,
+       .sign_message = osd_sign_message,
+       .check_message_signature = osd_check_message_signature,
        .fault = osd_reset,
 };