Merge tag 'perf-urgent-for-mingo' of git://git.kernel.org/pub/scm/linux/kernel/git...
[cascardo/linux.git] / net / ceph / osd_client.c
index 30f6faf..f3fc54e 100644 (file)
@@ -30,8 +30,11 @@ static void __send_queued(struct ceph_osd_client *osdc);
 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
 static void __register_request(struct ceph_osd_client *osdc,
                               struct ceph_osd_request *req);
+static void __unregister_request(struct ceph_osd_client *osdc,
+                                struct ceph_osd_request *req);
 static void __unregister_linger_request(struct ceph_osd_client *osdc,
                                        struct ceph_osd_request *req);
+static void __enqueue_request(struct ceph_osd_request *req);
 static void __send_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req);
 
@@ -428,68 +431,9 @@ EXPORT_SYMBOL(ceph_osdc_alloc_request);
 static bool osd_req_opcode_valid(u16 opcode)
 {
        switch (opcode) {
-       case CEPH_OSD_OP_READ:
-       case CEPH_OSD_OP_STAT:
-       case CEPH_OSD_OP_MAPEXT:
-       case CEPH_OSD_OP_MASKTRUNC:
-       case CEPH_OSD_OP_SPARSE_READ:
-       case CEPH_OSD_OP_NOTIFY:
-       case CEPH_OSD_OP_NOTIFY_ACK:
-       case CEPH_OSD_OP_ASSERT_VER:
-       case CEPH_OSD_OP_WRITE:
-       case CEPH_OSD_OP_WRITEFULL:
-       case CEPH_OSD_OP_TRUNCATE:
-       case CEPH_OSD_OP_ZERO:
-       case CEPH_OSD_OP_DELETE:
-       case CEPH_OSD_OP_APPEND:
-       case CEPH_OSD_OP_STARTSYNC:
-       case CEPH_OSD_OP_SETTRUNC:
-       case CEPH_OSD_OP_TRIMTRUNC:
-       case CEPH_OSD_OP_TMAPUP:
-       case CEPH_OSD_OP_TMAPPUT:
-       case CEPH_OSD_OP_TMAPGET:
-       case CEPH_OSD_OP_CREATE:
-       case CEPH_OSD_OP_ROLLBACK:
-       case CEPH_OSD_OP_WATCH:
-       case CEPH_OSD_OP_OMAPGETKEYS:
-       case CEPH_OSD_OP_OMAPGETVALS:
-       case CEPH_OSD_OP_OMAPGETHEADER:
-       case CEPH_OSD_OP_OMAPGETVALSBYKEYS:
-       case CEPH_OSD_OP_OMAPSETVALS:
-       case CEPH_OSD_OP_OMAPSETHEADER:
-       case CEPH_OSD_OP_OMAPCLEAR:
-       case CEPH_OSD_OP_OMAPRMKEYS:
-       case CEPH_OSD_OP_OMAP_CMP:
-       case CEPH_OSD_OP_SETALLOCHINT:
-       case CEPH_OSD_OP_CLONERANGE:
-       case CEPH_OSD_OP_ASSERT_SRC_VERSION:
-       case CEPH_OSD_OP_SRC_CMPXATTR:
-       case CEPH_OSD_OP_GETXATTR:
-       case CEPH_OSD_OP_GETXATTRS:
-       case CEPH_OSD_OP_CMPXATTR:
-       case CEPH_OSD_OP_SETXATTR:
-       case CEPH_OSD_OP_SETXATTRS:
-       case CEPH_OSD_OP_RESETXATTRS:
-       case CEPH_OSD_OP_RMXATTR:
-       case CEPH_OSD_OP_PULL:
-       case CEPH_OSD_OP_PUSH:
-       case CEPH_OSD_OP_BALANCEREADS:
-       case CEPH_OSD_OP_UNBALANCEREADS:
-       case CEPH_OSD_OP_SCRUB:
-       case CEPH_OSD_OP_SCRUB_RESERVE:
-       case CEPH_OSD_OP_SCRUB_UNRESERVE:
-       case CEPH_OSD_OP_SCRUB_STOP:
-       case CEPH_OSD_OP_SCRUB_MAP:
-       case CEPH_OSD_OP_WRLOCK:
-       case CEPH_OSD_OP_WRUNLOCK:
-       case CEPH_OSD_OP_RDLOCK:
-       case CEPH_OSD_OP_RDUNLOCK:
-       case CEPH_OSD_OP_UPLOCK:
-       case CEPH_OSD_OP_DNLOCK:
-       case CEPH_OSD_OP_CALL:
-       case CEPH_OSD_OP_PGLS:
-       case CEPH_OSD_OP_PGLS_FILTER:
-               return true;
+#define GENERATE_CASE(op, opcode, str) case CEPH_OSD_OP_##op: return true;
+__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
+#undef GENERATE_CASE
        default:
                return false;
        }
@@ -892,6 +836,37 @@ __lookup_request_ge(struct ceph_osd_client *osdc,
        return NULL;
 }
 
+static void __kick_linger_request(struct ceph_osd_request *req)
+{
+       struct ceph_osd_client *osdc = req->r_osdc;
+       struct ceph_osd *osd = req->r_osd;
+
+       /*
+        * Linger requests need to be resent with a new tid to avoid
+        * the dup op detection logic on the OSDs.  Achieve this with
+        * a re-register dance instead of open-coding.
+        */
+       ceph_osdc_get_request(req);
+       if (!list_empty(&req->r_linger_item))
+               __unregister_linger_request(osdc, req);
+       else
+               __unregister_request(osdc, req);
+       __register_request(osdc, req);
+       ceph_osdc_put_request(req);
+
+       /*
+        * Unless request has been registered as both normal and
+        * lingering, __unregister{,_linger}_request clears r_osd.
+        * However, here we need to preserve r_osd to make sure we
+        * requeue on the same OSD.
+        */
+       WARN_ON(req->r_osd || !osd);
+       req->r_osd = osd;
+
+       dout("%s requeueing %p tid %llu\n", __func__, req, req->r_tid);
+       __enqueue_request(req);
+}
+
 /*
  * Resubmit requests pending on the given osd.
  */
@@ -900,12 +875,14 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
 {
        struct ceph_osd_request *req, *nreq;
        LIST_HEAD(resend);
+       LIST_HEAD(resend_linger);
        int err;
 
-       dout("__kick_osd_requests osd%d\n", osd->o_osd);
+       dout("%s osd%d\n", __func__, osd->o_osd);
        err = __reset_osd(osdc, osd);
        if (err)
                return;
+
        /*
         * Build up a list of requests to resend by traversing the
         * osd's list of requests.  Requests for a given object are
@@ -926,33 +903,32 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
        list_for_each_entry(req, &osd->o_requests, r_osd_item) {
                if (!req->r_sent)
                        break;
-               list_move_tail(&req->r_req_lru_item, &resend);
-               dout("requeueing %p tid %llu osd%d\n", req, req->r_tid,
-                    osd->o_osd);
-               if (!req->r_linger)
+
+               if (!req->r_linger) {
+                       dout("%s requeueing %p tid %llu\n", __func__, req,
+                            req->r_tid);
+                       list_move_tail(&req->r_req_lru_item, &resend);
                        req->r_flags |= CEPH_OSD_FLAG_RETRY;
+               } else {
+                       list_move_tail(&req->r_req_lru_item, &resend_linger);
+               }
        }
        list_splice(&resend, &osdc->req_unsent);
 
        /*
-        * Linger requests are re-registered before sending, which
-        * sets up a new tid for each.  We add them to the unsent
-        * list at the end to keep things in tid order.
+        * Both registered and not yet registered linger requests are
+        * enqueued with a new tid on the same OSD.  We add/move them
+        * to req_unsent/o_requests at the end to keep things in tid
+        * order.
         */
        list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
                                 r_linger_osd_item) {
-               /*
-                * reregister request prior to unregistering linger so
-                * that r_osd is preserved.
-                */
-               BUG_ON(!list_empty(&req->r_req_lru_item));
-               __register_request(osdc, req);
-               list_add_tail(&req->r_req_lru_item, &osdc->req_unsent);
-               list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
-               __unregister_linger_request(osdc, req);
-               dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
-                    osd->o_osd);
+               WARN_ON(!list_empty(&req->r_req_lru_item));
+               __kick_linger_request(req);
        }
+
+       list_for_each_entry_safe(req, nreq, &resend_linger, r_req_lru_item)
+               __kick_linger_request(req);
 }
 
 /*
@@ -1346,6 +1322,22 @@ static int __calc_request_pg(struct ceph_osdmap *osdmap,
                                   &req->r_target_oid, pg_out);
 }
 
+static void __enqueue_request(struct ceph_osd_request *req)
+{
+       struct ceph_osd_client *osdc = req->r_osdc;
+
+       dout("%s %p tid %llu to osd%d\n", __func__, req, req->r_tid,
+            req->r_osd ? req->r_osd->o_osd : -1);
+
+       if (req->r_osd) {
+               __remove_osd_from_lru(req->r_osd);
+               list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
+               list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
+       } else {
+               list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
+       }
+}
+
 /*
  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
  * (as needed), and set the request r_osd appropriately.  If there is
@@ -1423,13 +1415,7 @@ static int __map_request(struct ceph_osd_client *osdc,
                              &osdc->osdmap->osd_addr[o]);
        }
 
-       if (req->r_osd) {
-               __remove_osd_from_lru(req->r_osd);
-               list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
-               list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
-       } else {
-               list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
-       }
+       __enqueue_request(req);
        err = 1;   /* osd or pg changed */
 
 out:
@@ -1774,8 +1760,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        }
        bytes = le32_to_cpu(msg->hdr.data_len);
        if (payload_len != bytes) {
-               pr_warning("sum of op payload lens %d != data_len %d",
-                          payload_len, bytes);
+               pr_warn("sum of op payload lens %d != data_len %d\n",
+                       payload_len, bytes);
                goto bad_put;
        }
 
@@ -2313,24 +2299,19 @@ static void handle_watch_notify(struct ceph_osd_client *osdc,
        if (event) {
                event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
                if (!event_work) {
-                       dout("ERROR: could not allocate event_work\n");
-                       goto done_err;
+                       pr_err("couldn't allocate event_work\n");
+                       ceph_osdc_put_event(event);
+                       return;
                }
                INIT_WORK(&event_work->work, do_event_work);
                event_work->event = event;
                event_work->ver = ver;
                event_work->notify_id = notify_id;
                event_work->opcode = opcode;
-               if (!queue_work(osdc->notify_wq, &event_work->work)) {
-                       dout("WARNING: failed to queue notify event work\n");
-                       goto done_err;
-               }
-       }
 
-       return;
+               queue_work(osdc->notify_wq, &event_work->work);
+       }
 
-done_err:
-       ceph_osdc_put_event(event);
        return;
 
 bad:
@@ -2797,10 +2778,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
        ceph_msg_revoke_incoming(req->r_reply);
 
        if (front_len > req->r_reply->front_alloc_len) {
-               pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n",
-                          front_len, req->r_reply->front_alloc_len,
-                          (unsigned int)con->peer_name.type,
-                          le64_to_cpu(con->peer_name.num));
+               pr_warn("get_reply front %d > preallocated %d (%u#%llu)\n",
+                       front_len, req->r_reply->front_alloc_len,
+                       (unsigned int)con->peer_name.type,
+                       le64_to_cpu(con->peer_name.num));
                m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
                                 false);
                if (!m)
@@ -2823,8 +2804,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
                        if (osd_data->pages &&
                                unlikely(osd_data->length < data_len)) {
 
-                               pr_warning("tid %lld reply has %d bytes "
-                                       "we had only %llu bytes ready\n",
+                               pr_warn("tid %lld reply has %d bytes we had only %llu bytes ready\n",
                                        tid, data_len, osd_data->length);
                                *skip = 1;
                                ceph_msg_put(m);