Orangefs: implement .write_iter
author Mike Marshall <hubcap@omnibond.com>
Wed, 13 Jan 2016 16:18:12 +0000 (11:18 -0500)
committer Mike Marshall <hubcap@omnibond.com>
Wed, 13 Jan 2016 16:18:12 +0000 (11:18 -0500)
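Until now, orangefs_devreq_write_iter has just been a wrapper for
the old-fashioned orangefs_devreq_writev: Linux would call
.write_iter with "struct kiocb *iocb" and "struct iov_iter *iter",
and .write_iter would just:

        return orangefs_devreq_writev(iocb->ki_filp,
                                      iter->iov,
                                      iter->nr_segs,
                                      &iocb->ki_pos);

Now .write_iter is implemented directly: it pulls the head, the
downcall and any trailer out of the iov_iter with copy_from_iter,
and orangefs_devreq_writev is removed.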

Signed-off-by: Mike Marshall <hubcap@omnibond.com>
fs/orangefs/devorangefs-req.c

index e3bb15e..0f01d3e 100644 (file)
@@ -245,304 +245,240 @@ error:
 }
 
 /*
- * Function for writev() callers into the device. Readdir related
- * operations have an extra iovec containing info about objects
- * contained in directories.
+ * Function for writev() callers into the device.
+ *
+ * Userspace should have written:
+ *  - __u32 version
+ *  - __u32 magic
+ *  - __u64 tag
+ *  - struct orangefs_downcall_s
+ *  - trailer buffer (in the case of READDIR operations)
  */
-static ssize_t orangefs_devreq_writev(struct file *file,
-                                  const struct iovec *iov,
-                                  size_t count,
-                                  loff_t *offset)
+static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
+                                     struct iov_iter *iter)
 {
+       ssize_t ret;
        struct orangefs_kernel_op_s *op = NULL;
-       void *buffer = NULL;
-       void *ptr = NULL;
-       unsigned long i = 0;
-       int num_remaining = MAX_DEV_REQ_DOWNSIZE;
-       int ret = 0;
-       /* num elements in iovec without trailer */
-       int notrailer_count = 4;
-       /*
-        * If there's a trailer, its iov index will be equal to
-        * notrailer_count.
-        */
-       int trailer_index = notrailer_count;
-       int payload_size = 0;
-       int returned_downcall_size = 0;
-       __s32 magic = 0;
-       __s32 proto_ver = 0;
-       __u64 tag = 0;
-       ssize_t total_returned_size = 0;
+       struct {
+               __u32 version;
+               __u32 magic;
+               __u64 tag;
+       } head;
+       int total = ret = iov_iter_count(iter);
+       int n;
+       int downcall_size = sizeof(struct orangefs_downcall_s);
+       int head_size = sizeof(head);
+
+       gossip_debug(GOSSIP_DEV_DEBUG, "%s: total:%d: ret:%zd:\n",
+                    __func__,
+                    total,
+                    ret);
 
-       /*
-        * There will always be at least notrailer_count iovecs, and
-        * when there's a trailer, one more than notrailer_count. Check
-        * count's sanity.
-        */
-       if (count != notrailer_count && count != (notrailer_count + 1)) {
-               gossip_err("%s: count:%zu: notrailer_count :%d:\n",
-                       __func__,
-                       count,
-                       notrailer_count);
-               return -EPROTO;
+        if (total < MAX_DEV_REQ_DOWNSIZE) {
+               gossip_err("%s: total:%d: must be at least:%lu:\n",
+                          __func__,
+                          total,
+                          MAX_DEV_REQ_DOWNSIZE);
+               ret = -EFAULT;
+               goto out;
        }
-
-
-       /* Copy the non-trailer iovec data into a device request buffer. */
-       buffer = dev_req_alloc();
-       if (!buffer) {
-               gossip_err("%s: dev_req_alloc failed.\n", __func__);
-               return -ENOMEM;
+     
+       n = copy_from_iter(&head, head_size, iter);
+       if (n < head_size) {
+               gossip_err("%s: failed to copy head.\n", __func__);
+               ret = -EFAULT;
+               goto out;
        }
-       ptr = buffer;
-       for (i = 0; i < notrailer_count; i++) {
-               if (iov[i].iov_len > num_remaining) {
-                       gossip_err
-                           ("writev error: Freeing buffer and returning\n");
-                       dev_req_release(buffer);
-                       return -EMSGSIZE;
-               }
-               ret = copy_from_user(ptr, iov[i].iov_base, iov[i].iov_len);
-               if (ret) {
-                       gossip_err("Failed to copy data from user space\n");
-                       dev_req_release(buffer);
-                       return -EIO;
-               }
-               num_remaining -= iov[i].iov_len;
-               ptr += iov[i].iov_len;
-               payload_size += iov[i].iov_len;
+
+       if (head.version < ORANGEFS_MINIMUM_USERSPACE_VERSION) {
+               gossip_err("%s: userspace claims version"
+                          "%d, minimum version required: %d.\n",
+                          __func__,
+                          head.version,
+                          ORANGEFS_MINIMUM_USERSPACE_VERSION);
+               ret = -EPROTO;
+               goto out;
        }
-       total_returned_size = payload_size;
 
-       /* these elements are currently 8 byte aligned (8 bytes for (version +
-        * magic) 8 bytes for tag).  If you add another element, either
-        * make it 8 bytes big, or use get_unaligned when asigning.
-        */
-       ptr = buffer;
-       proto_ver = *((__s32 *) ptr); /* unused */
-       ptr += sizeof(__s32);
+       if (head.magic != ORANGEFS_DEVREQ_MAGIC) {
+               gossip_err("Error: Device magic number does not match.\n");
+               ret = -EPROTO;
+               goto out;
+       }
 
-       magic = *((__s32 *) ptr);
-       ptr += sizeof(__s32);
+       op = orangefs_devreq_remove_op(head.tag);
+       if (!op) {
+               gossip_err("WARNING: No one's waiting for tag %llu\n",
+                          llu(head.tag));
+               goto out;
+       }
 
-       tag = *((__u64 *) ptr);
-       ptr += sizeof(__u64);
+       get_op(op); /* increase ref count. */
 
-       if (magic != ORANGEFS_DEVREQ_MAGIC) {
-               gossip_err("Error: Device magic number does not match.\n");
-               dev_req_release(buffer);
-               return -EPROTO;
+       n = copy_from_iter(&op->downcall, downcall_size, iter);
+       if (n != downcall_size) {
+               gossip_err("%s: failed to copy downcall.\n", __func__);
+               put_op(op);
+               ret = -EFAULT;
+               goto out;
        }
 
-       op = orangefs_devreq_remove_op(tag);
-       if (op) {
-               /* Increase ref count! */
-               get_op(op);
-
-               /* calculate the size of the returned downcall. */
-               returned_downcall_size =
-                       payload_size - (2 * sizeof(__s32) + sizeof(__u64));
-
-               /* copy the passed in downcall into the op */
-               if (returned_downcall_size ==
-                       sizeof(struct orangefs_downcall_s)) {
-                       memcpy(&op->downcall,
-                              ptr,
-                              sizeof(struct orangefs_downcall_s));
-               } else {
-                       gossip_err("%s: returned downcall size:%d: \n",
-                                  __func__,
-                                  returned_downcall_size);
-                       dev_req_release(buffer);
-                       put_op(op);
-                       return -EMSGSIZE;
-               }
+       if (op->downcall.status)
+               goto wakeup;
 
-               /* Don't tolerate an unexpected trailer iovec. */
-               if ((op->downcall.trailer_size == 0) &&
-                   (count != notrailer_count)) {
-                       gossip_err("%s: unexpected trailer iovec.\n",
-                                  __func__);
-                       dev_req_release(buffer);
-                       put_op(op);
-                       return -EPROTO;
-               }
+       /*
+        * We've successfully peeled off the head and the downcall. 
+        * Something has gone awry if total doesn't equal the
+        * sum of head_size, downcall_size and trailer_size.
+        */
+       if ((head_size + downcall_size + op->downcall.trailer_size) != total) {
+               gossip_err("%s: funky write, head_size:%d"
+                          ": downcall_size:%d: trailer_size:%lld"
+                          ": total size:%d:\n",
+                          __func__,
+                          head_size,
+                          downcall_size,
+                          op->downcall.trailer_size,
+                          total);
+               put_op(op);
+               ret = -EFAULT;
+               goto out;
+       }
 
-               /* Don't consider the trailer if there's a bad status. */
-               if (op->downcall.status != 0)
-                       goto no_trailer;
+       /* Only READDIR operations should have trailers. */
+       if ((op->downcall.type != ORANGEFS_VFS_OP_READDIR) &&
+           (op->downcall.trailer_size != 0)) {
+               gossip_err("%s: %x operation with trailer.",
+                          __func__,
+                          op->downcall.type);
+               put_op(op);
+               ret = -EFAULT;
+               goto out;
+       }
 
-               /* get the trailer if there is one. */
-               if (op->downcall.trailer_size == 0)
-                       goto no_trailer;
+       /* READDIR operations should always have trailers. */
+       if ((op->downcall.type == ORANGEFS_VFS_OP_READDIR) &&
+           (op->downcall.trailer_size == 0)) {
+               gossip_err("%s: %x operation with no trailer.",
+                          __func__,
+                          op->downcall.type);
+               put_op(op);
+               ret = -EFAULT;
+               goto out;
+       }
 
-               gossip_debug(GOSSIP_DEV_DEBUG,
-                            "%s: op->downcall.trailer_size %lld\n",
-                            __func__,
-                            op->downcall.trailer_size);
+       if (op->downcall.type != ORANGEFS_VFS_OP_READDIR)
+               goto wakeup;
 
-               /*
-                * Bail if we think think there should be a trailer, but
-                * there's no iovec for it.
-                */
-               if (count != (notrailer_count + 1)) {
-                       gossip_err("%s: trailer_size:%lld: count:%zu:\n",
-                                  __func__,
-                                  op->downcall.trailer_size,
-                                  count);
-                       dev_req_release(buffer);
-                       put_op(op);
-                       return -EPROTO;
-               }
+       op->downcall.trailer_buf =
+               vmalloc(op->downcall.trailer_size);
+       if (op->downcall.trailer_buf == NULL) {
+               gossip_err("%s: failed trailer vmalloc.\n",
+                          __func__);
+               put_op(op);
+               ret = -ENOMEM;
+               goto out;
+       }
+       memset(op->downcall.trailer_buf, 0, op->downcall.trailer_size);
+       n = copy_from_iter(op->downcall.trailer_buf,
+                          op->downcall.trailer_size,
+                          iter);
+       if (n != op->downcall.trailer_size) {
+               gossip_err("%s: failed to copy trailer.\n", __func__);
+               vfree(op->downcall.trailer_buf);
+               put_op(op);
+               ret = -EFAULT;
+               goto out;
+       }
 
-               /* Verify that trailer_size is accurate. */
-               if (op->downcall.trailer_size != iov[trailer_index].iov_len) {
-                       gossip_err("%s: trailer_size:%lld: != iov_len:%zd:\n",
-                                  __func__,
-                                  op->downcall.trailer_size,
-                                  iov[trailer_index].iov_len);
-                       dev_req_release(buffer);
-                       put_op(op);
-                       return -EMSGSIZE;
-               }
+wakeup:
 
-               total_returned_size += iov[trailer_index].iov_len;
+       /*
+        * If this operation is an I/O operation we need to wait
+        * for all data to be copied before we can return to avoid
+        * buffer corruption and races that can pull the buffers
+        * out from under us.
+        *
+        * Essentially we're synchronizing with other parts of the
+        * vfs implicitly by not allowing the user space
+        * application reading/writing this device to return until
+        * the buffers are done being used.
+        */
+       if (op->downcall.type == ORANGEFS_VFS_OP_FILE_IO) {
+               int timed_out = 0;
+               DEFINE_WAIT(wait_entry);
 
                /*
-                * Allocate a buffer, copy the trailer bytes into it and
-                * attach it to the downcall.
+                * tell the vfs op waiting on a waitqueue
+                * that this op is done
                 */
-               op->downcall.trailer_buf = vmalloc(iov[trailer_index].iov_len);
-               if (op->downcall.trailer_buf != NULL) {
-                       gossip_debug(GOSSIP_DEV_DEBUG, "vmalloc: %p\n",
-                                    op->downcall.trailer_buf);
-                       ret = copy_from_user(op->downcall.trailer_buf,
-                                            iov[trailer_index].iov_base,
-                                            iov[trailer_index].iov_len);
-                       if (ret) {
-                               gossip_err("%s: Failed to copy trailer.\n",
-                                          __func__);
-                               dev_req_release(buffer);
-                               gossip_debug(GOSSIP_DEV_DEBUG,
-                                            "vfree: %p\n",
-                                            op->downcall.trailer_buf);
-                               vfree(op->downcall.trailer_buf);
-                               op->downcall.trailer_buf = NULL;
-                               put_op(op);
-                               return -EIO;
-                       }
-               } else {
-                       gossip_err("writev: could not vmalloc for trailer!\n");
-                       dev_req_release(buffer);
-                       put_op(op);
-                       return -ENOMEM;
-               }
+               spin_lock(&op->lock);
+               set_op_state_serviced(op);
+               spin_unlock(&op->lock);
 
-no_trailer:
-
-               /* if this operation is an I/O operation we need to wait
-                * for all data to be copied before we can return to avoid
-                * buffer corruption and races that can pull the buffers
-                * out from under us.
-                *
-                * Essentially we're synchronizing with other parts of the
-                * vfs implicitly by not allowing the user space
-                * application reading/writing this device to return until
-                * the buffers are done being used.
-                */
-               if (op->upcall.type == ORANGEFS_VFS_OP_FILE_IO) {
-                       int timed_out = 0;
-                       DEFINE_WAIT(wait_entry);
+               wake_up_interruptible(&op->waitq);
 
-                       /*
-                        * tell the vfs op waiting on a waitqueue
-                        * that this op is done
-                        */
+               while (1) {
                        spin_lock(&op->lock);
-                       set_op_state_serviced(op);
-                       spin_unlock(&op->lock);
-
-                       wake_up_interruptible(&op->waitq);
-
-                       while (1) {
-                               spin_lock(&op->lock);
-                               prepare_to_wait_exclusive(
-                                       &op->io_completion_waitq,
-                                       &wait_entry,
-                                       TASK_INTERRUPTIBLE);
-                               if (op->io_completed) {
-                                       spin_unlock(&op->lock);
-                                       break;
-                               }
+                       prepare_to_wait_exclusive(
+                               &op->io_completion_waitq,
+                               &wait_entry,
+                               TASK_INTERRUPTIBLE);
+                       if (op->io_completed) {
                                spin_unlock(&op->lock);
-
-                               if (!signal_pending(current)) {
-                                       int timeout =
-                                           MSECS_TO_JIFFIES(1000 *
-                                                            op_timeout_secs);
-                                       if (!schedule_timeout(timeout)) {
-                                               gossip_debug(GOSSIP_DEV_DEBUG,
-                                                       "%s: timed out.\n",
-                                                       __func__);
-                                               timed_out = 1;
-                                               break;
-                                       }
-                                       continue;
-                               }
-
-                               gossip_debug(GOSSIP_DEV_DEBUG,
-                                       "%s: signal on I/O wait, aborting\n",
-                                       __func__);
                                break;
                        }
-
-                       spin_lock(&op->lock);
-                       finish_wait(&op->io_completion_waitq, &wait_entry);
                        spin_unlock(&op->lock);
 
-                       /* NOTE: for I/O operations we handle releasing the op
-                        * object except in the case of timeout.  the reason we
-                        * can't free the op in timeout cases is that the op
-                        * service logic in the vfs retries operations using
-                        * the same op ptr, thus it can't be freed.
-                        */
-                       if (!timed_out)
-                               op_release(op);
-               } else {
+                       if (!signal_pending(current)) {
+                               int timeout =
+                                   MSECS_TO_JIFFIES(1000 *
+                                                    op_timeout_secs);
+                               if (!schedule_timeout(timeout)) {
+                                       gossip_debug(GOSSIP_DEV_DEBUG,
+                                               "%s: timed out.\n",
+                                               __func__);
+                                       timed_out = 1;
+                                       break;
+                               }
+                               continue;
+                       }
 
-                       /*
-                        * tell the vfs op waiting on a waitqueue that
-                        * this op is done
-                        */
-                       spin_lock(&op->lock);
-                       set_op_state_serviced(op);
-                       spin_unlock(&op->lock);
-                       /*
-                        * for every other operation (i.e. non-I/O), we need to
-                        * wake up the callers for downcall completion
-                        * notification
-                        */
-                       wake_up_interruptible(&op->waitq);
+                       gossip_debug(GOSSIP_DEV_DEBUG,
+                               "%s: signal on I/O wait, aborting\n",
+                               __func__);
+                       break;
                }
+
+               spin_lock(&op->lock);
+               finish_wait(&op->io_completion_waitq, &wait_entry);
+               spin_unlock(&op->lock);
+
+               /* NOTE: for I/O operations we handle releasing the op
+                * object except in the case of timeout.  the reason we
+                * can't free the op in timeout cases is that the op
+                * service logic in the vfs retries operations using
+                * the same op ptr, thus it can't be freed.
+                */
+               if (!timed_out)
+                       op_release(op);
        } else {
-               /* ignore downcalls that we're not interested in */
-               gossip_debug(GOSSIP_DEV_DEBUG,
-                            "WARNING: No one's waiting for tag %llu\n",
-                            llu(tag));
+               /*
+                * tell the vfs op waiting on a waitqueue that
+                * this op is done
+                */
+               spin_lock(&op->lock);
+               set_op_state_serviced(op);
+               spin_unlock(&op->lock);
+               /*
+                * for every other operation (i.e. non-I/O), we need to
+                * wake up the callers for downcall completion
+                * notification
+                */
+               wake_up_interruptible(&op->waitq);
        }
-       /* put_op? */
-       dev_req_release(buffer);
-
-       return total_returned_size;
-}
-
-static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
-                                     struct iov_iter *iter)
-{
-       return orangefs_devreq_writev(iocb->ki_filp,
-                                  iter->iov,
-                                  iter->nr_segs,
-                                  &iocb->ki_pos);
+out:
+       return ret;
 }
 
 /* Returns whether any FS are still pending remounted */