diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 5f62fb7..71c073f 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -480,6 +480,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
                mdsc->max_sessions = newmax;
        }
        mdsc->sessions[mds] = s;
+       atomic_inc(&mdsc->num_sessions);
        atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
 
        ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
@@ -503,6 +504,7 @@ static void __unregister_session(struct ceph_mds_client *mdsc,
        mdsc->sessions[s->s_mds] = NULL;
        ceph_con_close(&s->s_con);
        ceph_put_mds_session(s);
+       atomic_dec(&mdsc->num_sessions);
 }
 
 /*
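
The two hunks above pair the sessions[] bookkeeping with an atomic counter, so the shutdown path can answer "any sessions left?" without taking mdsc->mutex (see the simplified done_closing_sessions() near the end of this diff). Below is a minimal userspace sketch of that pattern using C11 atomics in place of the kernel's atomic_t; the struct and function names are illustrative stand-ins, not the kernel's.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

/* Illustrative stand-in for ceph_mds_client: only the counter matters here. */
struct mds_client {
    atomic_int num_sessions;
};

static void register_session(struct mds_client *mdsc)
{
    /* ... insert into sessions[] under the client mutex ... */
    atomic_fetch_add(&mdsc->num_sessions, 1);
}

static void unregister_session(struct mds_client *mdsc)
{
    /* ... remove from sessions[] under the client mutex ... */
    atomic_fetch_sub(&mdsc->num_sessions, 1);
}

/* Lock-free emptiness check, analogous to the simplified done_closing_sessions(). */
static bool done_closing_sessions(struct mds_client *mdsc)
{
    return atomic_load(&mdsc->num_sessions) == 0;
}

int main(void)
{
    struct mds_client mdsc = { .num_sessions = 0 };

    register_session(&mdsc);
    unregister_session(&mdsc);
    printf("done closing: %d\n", done_closing_sessions(&mdsc));
    return 0;
}

The array itself is still only touched under the mutex; the atomic only answers the cheap "is it empty yet" question.
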
@@ -842,8 +844,9 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6
        struct ceph_options *opt = mdsc->fsc->client->options;
        void *p;
 
-       const char* metadata[3][2] = {
+       const char* metadata[][2] = {
                {"hostname", utsname()->nodename},
+               {"kernel_version", utsname()->release},
                {"entity_id", opt->name ? opt->name : ""},
                {NULL, NULL}
        };
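
Dropping the explicit bound from the metadata array lets the initializer size it, and the {NULL, NULL} sentinel still terminates the walk, so a new entry such as "kernel_version" needs no other changes. A hedged sketch of sizing and iterating such a sentinel-terminated key/value table in plain C (the values are made up for illustration):

#include <stdio.h>
#include <string.h>

int main(void)
{
    /* Sentinel-terminated key/value table, sized by its initializer. */
    const char *metadata[][2] = {
        {"hostname",       "client-host"},
        {"kernel_version", "4.0.0-example"},
        {"entity_id",      "admin"},
        {NULL, NULL}
    };
    size_t total = 0;

    /* First pass: compute the encoded payload size (key + value lengths). */
    for (int i = 0; metadata[i][0]; i++)
        total += strlen(metadata[i][0]) + strlen(metadata[i][1]);

    /* Second pass: emit the entries. */
    for (int i = 0; metadata[i][0]; i++)
        printf("%s=%s\n", metadata[i][0], metadata[i][1]);

    printf("payload bytes (strings only): %zu\n", total);
    return 0;
}
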
@@ -1464,19 +1467,33 @@ out_unlocked:
        return err;
 }
 
+static int check_cap_flush(struct inode *inode, u64 want_flush_seq)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       int ret;
+       spin_lock(&ci->i_ceph_lock);
+       if (ci->i_flushing_caps)
+               ret = ci->i_cap_flush_seq >= want_flush_seq;
+       else
+               ret = 1;
+       spin_unlock(&ci->i_ceph_lock);
+       return ret;
+}
+
 /*
  * flush all dirty inode data to disk.
  *
  * returns true if we've flushed through want_flush_seq
  */
-static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
+static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
 {
-       int mds, ret = 1;
+       int mds;
 
        dout("check_cap_flush want %lld\n", want_flush_seq);
        mutex_lock(&mdsc->mutex);
-       for (mds = 0; ret && mds < mdsc->max_sessions; mds++) {
+       for (mds = 0; mds < mdsc->max_sessions; mds++) {
                struct ceph_mds_session *session = mdsc->sessions[mds];
+               struct inode *inode = NULL;
 
                if (!session)
                        continue;
@@ -1489,29 +1506,29 @@ static int check_cap_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq)
                                list_entry(session->s_cap_flushing.next,
                                           struct ceph_inode_info,
                                           i_flushing_item);
-                       struct inode *inode = &ci->vfs_inode;
 
-                       spin_lock(&ci->i_ceph_lock);
-                       if (ci->i_cap_flush_seq <= want_flush_seq) {
+                       if (!check_cap_flush(&ci->vfs_inode, want_flush_seq)) {
                                dout("check_cap_flush still flushing %p "
-                                    "seq %lld <= %lld to mds%d\n", inode,
-                                    ci->i_cap_flush_seq, want_flush_seq,
-                                    session->s_mds);
-                               ret = 0;
+                                    "seq %lld <= %lld to mds%d\n",
+                                    &ci->vfs_inode, ci->i_cap_flush_seq,
+                                    want_flush_seq, session->s_mds);
+                               inode = igrab(&ci->vfs_inode);
                        }
-                       spin_unlock(&ci->i_ceph_lock);
                }
                mutex_unlock(&session->s_mutex);
                ceph_put_mds_session(session);
 
-               if (!ret)
-                       return ret;
+               if (inode) {
+                       wait_event(mdsc->cap_flushing_wq,
+                                  check_cap_flush(inode, want_flush_seq));
+                       iput(inode);
+               }
+
                mutex_lock(&mdsc->mutex);
        }
 
        mutex_unlock(&mdsc->mutex);
        dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq);
-       return ret;
 }
 
 /*
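
The refactor above splits the old all-in-one scan into a small per-inode predicate (check_cap_flush) and a driver loop (wait_caps_flush) that pins one still-flushing inode per session with igrab(), drops the locks, and sleeps on cap_flushing_wq until the predicate holds before moving on. Here is a userspace analogue of that "sleep until a predicate becomes true" shape, with a pthread condition variable standing in for the kernel wait queue; all names are illustrative.

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

/* Illustrative stand-in for the per-inode flush state. */
struct flush_state {
    pthread_mutex_t lock;
    pthread_cond_t  wq;        /* plays the role of mdsc->cap_flushing_wq */
    unsigned long   flush_seq; /* plays the role of ci->i_cap_flush_seq  */
    bool            flushing;  /* plays the role of ci->i_flushing_caps  */
};

/* Predicate, analogous to check_cap_flush(): done once we've flushed
 * through want_flush_seq, or if nothing is flushing at all. */
static bool flushed_through(struct flush_state *st, unsigned long want)
{
    return !st->flushing || st->flush_seq >= want;
}

static void wait_caps_flush(struct flush_state *st, unsigned long want)
{
    pthread_mutex_lock(&st->lock);
    while (!flushed_through(st, want))
        pthread_cond_wait(&st->wq, &st->lock);
    pthread_mutex_unlock(&st->lock);
}

static void *flusher(void *arg)
{
    struct flush_state *st = arg;

    usleep(10000);                      /* pretend to write back data */
    pthread_mutex_lock(&st->lock);
    st->flush_seq = 5;
    st->flushing = false;
    pthread_cond_broadcast(&st->wq);    /* like waking cap_flushing_wq */
    pthread_mutex_unlock(&st->lock);
    return NULL;
}

int main(void)
{
    struct flush_state st = {
        .lock = PTHREAD_MUTEX_INITIALIZER,
        .wq = PTHREAD_COND_INITIALIZER,
        .flush_seq = 0,
        .flushing = true,
    };
    pthread_t t;

    pthread_create(&t, NULL, flusher, &st);
    wait_caps_flush(&st, 5);
    pthread_join(t, NULL);
    printf("flushed through seq 5\n");
    return 0;
}

Re-checking the predicate under the lock inside a loop mirrors what wait_event() does on the kernel side.
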
@@ -1923,7 +1940,11 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
        head->num_releases = cpu_to_le16(releases);
 
        /* time stamp */
-       ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp));
+       {
+               struct ceph_timespec ts;
+               ceph_encode_timespec(&ts, &req->r_stamp);
+               ceph_encode_copy(&p, &ts, sizeof(ts));
+       }
 
        BUG_ON(p > end);
        msg->front.iov_len = p - msg->front.iov_base;
@@ -2012,7 +2033,11 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
 
                /* time stamp */
                p = msg->front.iov_base + req->r_request_release_offset;
-               ceph_encode_copy(&p, &req->r_stamp, sizeof(req->r_stamp));
+               {
+                       struct ceph_timespec ts;
+                       ceph_encode_timespec(&ts, &req->r_stamp);
+                       ceph_encode_copy(&p, &ts, sizeof(ts));
+               }
 
                msg->front.iov_len = p - msg->front.iov_base;
                msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
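
Both timestamp hunks stop copying a raw struct timespec, whose field widths depend on the architecture, and instead encode through struct ceph_timespec, a fixed-width little-endian pair, before copying it into the message. A hedged sketch of that kind of conversion in plain C; struct wire_timespec and the helpers below are illustrative stand-ins, not the kernel definitions.

#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <time.h>

/* Illustrative fixed-layout wire format: 32-bit LE seconds + nanoseconds. */
struct wire_timespec {
    uint32_t tv_sec_le;
    uint32_t tv_nsec_le;
};

/* Store a host-order value in little-endian byte order, whatever the host is. */
static uint32_t to_le32(uint32_t v)
{
    uint8_t b[4] = { v & 0xff, (v >> 8) & 0xff, (v >> 16) & 0xff, (v >> 24) & 0xff };
    uint32_t out;

    memcpy(&out, b, sizeof(out));
    return out;
}

/* Analogous shape to an encode_timespec helper: host timespec -> fixed wire layout.
 * Seconds are deliberately truncated to 32 bits to match the fixed format. */
static void encode_timespec(struct wire_timespec *dst, const struct timespec *src)
{
    dst->tv_sec_le  = to_le32((uint32_t)src->tv_sec);
    dst->tv_nsec_le = to_le32((uint32_t)src->tv_nsec);
}

int main(void)
{
    struct timespec now;
    struct wire_timespec wire;
    unsigned char buf[sizeof(wire)];

    clock_gettime(CLOCK_REALTIME, &now);
    encode_timespec(&wire, &now);
    memcpy(buf, &wire, sizeof(wire));   /* like copying the encoded bytes into the message */
    printf("encoded %zu bytes\n", sizeof(buf));
    return 0;
}
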
@@ -2159,6 +2184,8 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds)
                p = rb_next(p);
                if (req->r_got_unsafe)
                        continue;
+               if (req->r_attempts > 0)
+                       continue; /* only new requests */
                if (req->r_session &&
                    req->r_session->s_mds == mds) {
                        dout(" kicking tid %llu\n", req->r_tid);
@@ -2286,6 +2313,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        struct ceph_mds_request *req;
        struct ceph_mds_reply_head *head = msg->front.iov_base;
        struct ceph_mds_reply_info_parsed *rinfo;  /* parsed reply info */
+       struct ceph_snap_realm *realm;
        u64 tid;
        int err, result;
        int mds = session->s_mds;
@@ -2401,11 +2429,13 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        }
 
        /* snap trace */
+       realm = NULL;
        if (rinfo->snapblob_len) {
                down_write(&mdsc->snap_rwsem);
                ceph_update_snap_trace(mdsc, rinfo->snapblob,
-                              rinfo->snapblob + rinfo->snapblob_len,
-                              le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
+                               rinfo->snapblob + rinfo->snapblob_len,
+                               le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP,
+                               &realm);
                downgrade_write(&mdsc->snap_rwsem);
        } else {
                down_read(&mdsc->snap_rwsem);
@@ -2423,6 +2453,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        mutex_unlock(&req->r_fill_mutex);
 
        up_read(&mdsc->snap_rwsem);
+       if (realm)
+               ceph_put_snap_realm(mdsc, realm);
 out_err:
        mutex_lock(&mdsc->mutex);
        if (!req->r_aborted) {
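
With the extra out parameter, ceph_update_snap_trace() now hands the caller a referenced snap realm, and handle_reply() drops that reference with ceph_put_snap_realm() once the reply has been processed. A minimal sketch of that "callee returns a referenced object, caller puts it when done" ownership pattern with a plain reference count (userspace C, illustrative names only):

#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

/* Illustrative stand-in for a refcounted snap realm. */
struct realm {
    atomic_int ref;
    long ino;
};

static struct realm *realm_get(struct realm *r)
{
    atomic_fetch_add(&r->ref, 1);
    return r;
}

static void realm_put(struct realm *r)
{
    if (atomic_fetch_sub(&r->ref, 1) == 1)
        free(r);                      /* last reference dropped */
}

/* The callee returns a referenced object via *out; the caller must put it. */
static void update_snap_trace(struct realm **out)
{
    struct realm *r = calloc(1, sizeof(*r));

    atomic_init(&r->ref, 1);
    r->ino = 1;
    *out = realm_get(r);              /* reference handed to the caller */
    realm_put(r);                     /* drop the local reference */
}

int main(void)
{
    struct realm *realm = NULL;

    update_snap_trace(&realm);
    /* ... use realm while filling in state from the reply ... */
    if (realm)
        realm_put(realm);             /* mirrors dropping the realm after use */
    printf("done\n");
    return 0;
}
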
@@ -2487,6 +2519,7 @@ static void handle_forward(struct ceph_mds_client *mdsc,
                dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
                BUG_ON(req->r_err);
                BUG_ON(req->r_got_result);
+               req->r_attempts = 0;
                req->r_num_fwd = fwd_seq;
                req->r_resend_mds = next_mds;
                put_request_session(req);
@@ -2580,6 +2613,14 @@ static void handle_session(struct ceph_mds_session *session,
                send_flushmsg_ack(mdsc, session, seq);
                break;
 
+       case CEPH_SESSION_FORCE_RO:
+               dout("force_session_readonly %p\n", session);
+               spin_lock(&session->s_cap_lock);
+               session->s_readonly = true;
+               spin_unlock(&session->s_cap_lock);
+               wake_up_session_caps(session, 0);
+               break;
+
        default:
                pr_err("mdsc_handle_session bad op %d mds%d\n", op, mds);
                WARN_ON(1);
@@ -2610,6 +2651,7 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
                                   struct ceph_mds_session *session)
 {
        struct ceph_mds_request *req, *nreq;
+       struct rb_node *p;
        int err;
 
        dout("replay_unsafe_requests mds%d\n", session->s_mds);
@@ -2622,6 +2664,28 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
                        ceph_con_send(&session->s_con, req->r_request);
                }
        }
+
+       /*
+        * also re-send old requests when the MDS enters the reconnect stage,
+        * so that the MDS can process completed requests in the clientreplay stage.
+        */
+       p = rb_first(&mdsc->request_tree);
+       while (p) {
+               req = rb_entry(p, struct ceph_mds_request, r_node);
+               p = rb_next(p);
+               if (req->r_got_unsafe)
+                       continue;
+               if (req->r_attempts == 0)
+                       continue; /* only old requests */
+               if (req->r_session &&
+                   req->r_session->s_mds == session->s_mds) {
+                       err = __prepare_send_request(mdsc, req, session->s_mds);
+                       if (!err) {
+                               ceph_msg_get(req->r_request);
+                               ceph_con_send(&session->s_con, req->r_request);
+                       }
+               }
+       }
        mutex_unlock(&mdsc->mutex);
 }
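
This walk is the counterpart of the kick_requests() change earlier in the diff: kick_requests() now skips requests that already have attempts, while this replay path re-sends only those, so each request is driven by exactly one of the two paths after an MDS restart. A toy sketch of the complementary filters (the struct below is an illustrative stand-in for ceph_mds_request):

#include <stdbool.h>
#include <stdio.h>

/* Illustrative request record: only the fields the filters look at. */
struct req {
    unsigned long long tid;
    int  attempts;      /* like r_attempts   */
    bool got_unsafe;    /* like r_got_unsafe */
};

/* kick_requests()-style filter: only requests never sent before. */
static bool should_kick(const struct req *r)
{
    return !r->got_unsafe && r->attempts == 0;
}

/* Replay-path filter: only requests that were already sent at least once. */
static bool should_replay(const struct req *r)
{
    return !r->got_unsafe && r->attempts > 0;
}

int main(void)
{
    struct req reqs[] = {
        { .tid = 1, .attempts = 0, .got_unsafe = false },
        { .tid = 2, .attempts = 2, .got_unsafe = false },
        { .tid = 3, .attempts = 1, .got_unsafe = true  },
    };

    for (size_t i = 0; i < sizeof(reqs) / sizeof(reqs[0]); i++)
        printf("tid %llu: kick=%d replay=%d\n", reqs[i].tid,
               should_kick(&reqs[i]), should_replay(&reqs[i]));
    return 0;
}
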
 
@@ -2787,6 +2851,8 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
        spin_unlock(&session->s_gen_ttl_lock);
 
        spin_lock(&session->s_cap_lock);
+       /* don't know if session is readonly */
+       session->s_readonly = 0;
        /*
         * notify __ceph_remove_cap() that we are composing cap reconnect.
         * If a cap get released before being added to the cap reconnect,
@@ -2933,9 +2999,6 @@ static void check_new_map(struct ceph_mds_client *mdsc,
                                mutex_unlock(&s->s_mutex);
                                s->s_state = CEPH_MDS_SESSION_RESTARTING;
                        }
-
-                       /* kick any requests waiting on the recovering mds */
-                       kick_requests(mdsc, i);
                } else if (oldstate == newstate) {
                        continue;  /* nothing new with this mds */
                }
@@ -3295,6 +3358,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
        init_waitqueue_head(&mdsc->session_close_wq);
        INIT_LIST_HEAD(&mdsc->waiting_for_map);
        mdsc->sessions = NULL;
+       atomic_set(&mdsc->num_sessions, 0);
        mdsc->max_sessions = 0;
        mdsc->stopping = 0;
        init_rwsem(&mdsc->snap_rwsem);
@@ -3428,14 +3492,17 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
        dout("sync\n");
        mutex_lock(&mdsc->mutex);
        want_tid = mdsc->last_tid;
-       want_flush = mdsc->cap_flush_seq;
        mutex_unlock(&mdsc->mutex);
-       dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
 
        ceph_flush_dirty_caps(mdsc);
+       spin_lock(&mdsc->cap_dirty_lock);
+       want_flush = mdsc->cap_flush_seq;
+       spin_unlock(&mdsc->cap_dirty_lock);
+
+       dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush);
 
        wait_unsafe_requests(mdsc, want_tid);
-       wait_event(mdsc->cap_flushing_wq, check_cap_flush(mdsc, want_flush));
+       wait_caps_flush(mdsc, want_flush);
 }
 
 /*
@@ -3443,17 +3510,9 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
  */
 static bool done_closing_sessions(struct ceph_mds_client *mdsc)
 {
-       int i, n = 0;
-
        if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN)
                return true;
-
-       mutex_lock(&mdsc->mutex);
-       for (i = 0; i < mdsc->max_sessions; i++)
-               if (mdsc->sessions[i])
-                       n++;
-       mutex_unlock(&mdsc->mutex);
-       return n == 0;
+       return atomic_read(&mdsc->num_sessions) == 0;
 }
 
 /*