X-Git-Url: http://git.cascardo.eti.br/?a=blobdiff_plain;f=lib%2Fvconn-stream.c;h=2858462bed7ecf49dbabe81d9810213cae89a596;hb=HEAD;hp=b38c568686362b90d18bcf343b299d0c039be0dc;hpb=a5e54d9b6f8002f34cc792df69e6eda68cf95223;p=cascardo%2Fovs.git diff --git a/lib/vconn-stream.c b/lib/vconn-stream.c index b38c56868..2858462be 100644 --- a/lib/vconn-stream.c +++ b/lib/vconn-stream.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2008, 2009 Nicira Networks. + * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2016 Nicira, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,316 +15,383 @@ */ #include -#include "vconn-stream.h" -#include #include #include #include #include #include #include -#include "leak-checker.h" +#include "fatal-signal.h" #include "ofpbuf.h" #include "openflow/openflow.h" #include "poll-loop.h" #include "socket-util.h" +#include "stream.h" #include "util.h" #include "vconn-provider.h" -#include "vconn.h" +#include "openvswitch/vconn.h" +#include "openvswitch/vlog.h" -#include "vlog.h" -#define THIS_MODULE VLM_vconn_stream +VLOG_DEFINE_THIS_MODULE(vconn_stream); /* Active stream socket vconn. */ -struct stream_vconn +struct vconn_stream { struct vconn vconn; - int fd; + struct stream *stream; struct ofpbuf *rxbuf; struct ofpbuf *txbuf; - struct poll_waiter *tx_waiter; + int n_packets; }; -static struct vconn_class stream_vconn_class; +static const struct vconn_class stream_vconn_class; static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25); -static void stream_clear_txbuf(struct stream_vconn *); +static void vconn_stream_clear_txbuf(struct vconn_stream *); -int -new_stream_vconn(const char *name, int fd, int connect_status, - bool reconnectable, struct vconn **vconnp) +static struct vconn * +vconn_stream_new(struct stream *stream, int connect_status, + uint32_t allowed_versions) { - struct stream_vconn *s; + struct vconn_stream *s; s = xmalloc(sizeof *s); vconn_init(&s->vconn, &stream_vconn_class, connect_status, - name, reconnectable); - s->fd = fd; + stream_get_name(stream), allowed_versions); + s->stream = stream; s->txbuf = NULL; - s->tx_waiter = NULL; s->rxbuf = NULL; - *vconnp = &s->vconn; - return 0; + s->n_packets = 0; + return &s->vconn; +} + +/* Creates a new vconn that will send and receive data on a stream named 'name' + * and stores a pointer to the vconn in '*vconnp'. + * + * Returns 0 if successful, otherwise a positive errno value. */ +static int +vconn_stream_open(const char *name, uint32_t allowed_versions, + char *suffix OVS_UNUSED, struct vconn **vconnp, uint8_t dscp) +{ + struct stream *stream; + int error; + + error = stream_open_with_default_port(name, OFP_PORT, &stream, dscp); + if (!error) { + error = stream_connect(stream); + if (!error || error == EAGAIN) { + *vconnp = vconn_stream_new(stream, error, allowed_versions); + return 0; + } + } + + stream_close(stream); + return error; } -static struct stream_vconn * -stream_vconn_cast(struct vconn *vconn) +static struct vconn_stream * +vconn_stream_cast(struct vconn *vconn) { - vconn_assert_class(vconn, &stream_vconn_class); - return CONTAINER_OF(vconn, struct stream_vconn, vconn); + return CONTAINER_OF(vconn, struct vconn_stream, vconn); } static void -stream_close(struct vconn *vconn) +vconn_stream_close(struct vconn *vconn) { - struct stream_vconn *s = stream_vconn_cast(vconn); - poll_cancel(s->tx_waiter); - stream_clear_txbuf(s); + struct vconn_stream *s = vconn_stream_cast(vconn); + + if ((vconn->error == EPROTO || s->n_packets < 1) && s->rxbuf) { + stream_report_content(s->rxbuf->data, s->rxbuf->size, STREAM_OPENFLOW, + &this_module, vconn_get_name(vconn)); + } + + stream_close(s->stream); + vconn_stream_clear_txbuf(s); ofpbuf_delete(s->rxbuf); - close(s->fd); free(s); } static int -stream_connect(struct vconn *vconn) +vconn_stream_connect(struct vconn *vconn) { - struct stream_vconn *s = stream_vconn_cast(vconn); - return check_connection_completion(s->fd); + struct vconn_stream *s = vconn_stream_cast(vconn); + return stream_connect(s->stream); } static int -stream_recv(struct vconn *vconn, struct ofpbuf **bufferp) +vconn_stream_recv__(struct vconn_stream *s, int rx_len) { - struct stream_vconn *s = stream_vconn_cast(vconn); - struct ofpbuf *rx; - size_t want_bytes; - ssize_t retval; - - if (s->rxbuf == NULL) { - s->rxbuf = ofpbuf_new(1564); - } - rx = s->rxbuf; + struct ofpbuf *rx = s->rxbuf; + int want_bytes, retval; -again: - if (sizeof(struct ofp_header) > rx->size) { - want_bytes = sizeof(struct ofp_header) - rx->size; - } else { - struct ofp_header *oh = rx->data; - size_t length = ntohs(oh->length); - if (length < sizeof(struct ofp_header)) { - VLOG_ERR_RL(&rl, "received too-short ofp_header (%zu bytes)", - length); - return EPROTO; - } - want_bytes = length - rx->size; - if (!want_bytes) { - *bufferp = rx; - s->rxbuf = NULL; - return 0; - } - } + want_bytes = rx_len - rx->size; ofpbuf_prealloc_tailroom(rx, want_bytes); - - retval = read(s->fd, ofpbuf_tail(rx), want_bytes); + retval = stream_recv(s->stream, ofpbuf_tail(rx), want_bytes); if (retval > 0) { rx->size += retval; - if (retval == want_bytes) { - if (rx->size > sizeof(struct ofp_header)) { - *bufferp = rx; - s->rxbuf = NULL; - return 0; - } else { - goto again; - } - } - return EAGAIN; + return retval == want_bytes ? 0 : EAGAIN; } else if (retval == 0) { if (rx->size) { VLOG_ERR_RL(&rl, "connection dropped mid-packet"); return EPROTO; - } else { - return EOF; } + return EOF; } else { - return errno; + return -retval; } } -static void -stream_clear_txbuf(struct stream_vconn *s) +static int +vconn_stream_recv(struct vconn *vconn, struct ofpbuf **bufferp) { - ofpbuf_delete(s->txbuf); - s->txbuf = NULL; - s->tx_waiter = NULL; -} + struct vconn_stream *s = vconn_stream_cast(vconn); + const struct ofp_header *oh; + int rx_len; -static void -stream_do_tx(int fd UNUSED, short int revents UNUSED, void *vconn_) -{ - struct vconn *vconn = vconn_; - struct stream_vconn *s = stream_vconn_cast(vconn); - ssize_t n = write(s->fd, s->txbuf->data, s->txbuf->size); - if (n < 0) { - if (errno != EAGAIN) { - VLOG_ERR_RL(&rl, "send: %s", strerror(errno)); - stream_clear_txbuf(s); - return; + /* Allocate new receive buffer if we don't have one. */ + if (s->rxbuf == NULL) { + s->rxbuf = ofpbuf_new(1564); + } + + /* Read ofp_header. */ + if (s->rxbuf->size < sizeof(struct ofp_header)) { + int retval = vconn_stream_recv__(s, sizeof(struct ofp_header)); + if (retval) { + return retval; } - } else if (n > 0) { - ofpbuf_pull(s->txbuf, n); - if (!s->txbuf->size) { - stream_clear_txbuf(s); - return; + } + + /* Read payload. */ + oh = s->rxbuf->data; + rx_len = ntohs(oh->length); + if (rx_len < sizeof(struct ofp_header)) { + VLOG_ERR_RL(&rl, "received too-short ofp_header (%d bytes)", rx_len); + return EPROTO; + } else if (s->rxbuf->size < rx_len) { + int retval = vconn_stream_recv__(s, rx_len); + if (retval) { + return retval; } } - s->tx_waiter = poll_fd_callback(s->fd, POLLOUT, stream_do_tx, vconn); + + s->n_packets++; + *bufferp = s->rxbuf; + s->rxbuf = NULL; + return 0; +} + +static void +vconn_stream_clear_txbuf(struct vconn_stream *s) +{ + ofpbuf_delete(s->txbuf); + s->txbuf = NULL; } static int -stream_send(struct vconn *vconn, struct ofpbuf *buffer) +vconn_stream_send(struct vconn *vconn, struct ofpbuf *buffer) { - struct stream_vconn *s = stream_vconn_cast(vconn); + struct vconn_stream *s = vconn_stream_cast(vconn); ssize_t retval; if (s->txbuf) { return EAGAIN; } - retval = write(s->fd, buffer->data, buffer->size); + retval = stream_send(s->stream, buffer->data, buffer->size); if (retval == buffer->size) { ofpbuf_delete(buffer); return 0; - } else if (retval >= 0 || errno == EAGAIN) { - leak_checker_claim(buffer); + } else if (retval >= 0 || retval == -EAGAIN) { s->txbuf = buffer; if (retval > 0) { ofpbuf_pull(buffer, retval); } - s->tx_waiter = poll_fd_callback(s->fd, POLLOUT, stream_do_tx, vconn); return 0; } else { - return errno; + return -retval; + } +} + +static void +vconn_stream_run(struct vconn *vconn) +{ + struct vconn_stream *s = vconn_stream_cast(vconn); + ssize_t retval; + + stream_run(s->stream); + if (!s->txbuf) { + return; + } + + retval = stream_send(s->stream, s->txbuf->data, s->txbuf->size); + if (retval < 0) { + if (retval != -EAGAIN) { + VLOG_ERR_RL(&rl, "send: %s", ovs_strerror(-retval)); + vconn_stream_clear_txbuf(s); + return; + } + } else if (retval > 0) { + ofpbuf_pull(s->txbuf, retval); + if (!s->txbuf->size) { + vconn_stream_clear_txbuf(s); + return; + } + } +} + +static void +vconn_stream_run_wait(struct vconn *vconn) +{ + struct vconn_stream *s = vconn_stream_cast(vconn); + + stream_run_wait(s->stream); + if (s->txbuf) { + stream_send_wait(s->stream); } } static void -stream_wait(struct vconn *vconn, enum vconn_wait_type wait) +vconn_stream_wait(struct vconn *vconn, enum vconn_wait_type wait) { - struct stream_vconn *s = stream_vconn_cast(vconn); + struct vconn_stream *s = vconn_stream_cast(vconn); switch (wait) { case WAIT_CONNECT: - poll_fd_wait(s->fd, POLLOUT); + stream_connect_wait(s->stream); break; case WAIT_SEND: if (!s->txbuf) { - poll_fd_wait(s->fd, POLLOUT); + stream_send_wait(s->stream); } else { - /* Nothing to do: need to drain txbuf first. */ + /* Nothing to do: need to drain txbuf first. + * vconn_stream_run_wait() will arrange to wake up when there room + * to send data, so there's no point in calling poll_fd_wait() + * redundantly here. */ } break; case WAIT_RECV: - poll_fd_wait(s->fd, POLLIN); + stream_recv_wait(s->stream); break; default: - NOT_REACHED(); + OVS_NOT_REACHED(); } } - -static struct vconn_class stream_vconn_class = { - "stream", /* name */ - NULL, /* open */ - stream_close, /* close */ - stream_connect, /* connect */ - stream_recv, /* recv */ - stream_send, /* send */ - stream_wait, /* wait */ -}; /* Passive stream socket vconn. */ -struct pstream_pvconn +struct pvconn_pstream { struct pvconn pvconn; - int fd; - int (*accept_cb)(int fd, const struct sockaddr *, size_t sa_len, - struct vconn **); + struct pstream *pstream; }; -static struct pvconn_class pstream_pvconn_class; +static const struct pvconn_class pstream_pvconn_class; -static struct pstream_pvconn * -pstream_pvconn_cast(struct pvconn *pvconn) +static struct pvconn_pstream * +pvconn_pstream_cast(struct pvconn *pvconn) { - pvconn_assert_class(pvconn, &pstream_pvconn_class); - return CONTAINER_OF(pvconn, struct pstream_pvconn, pvconn); + return CONTAINER_OF(pvconn, struct pvconn_pstream, pvconn); } -int -new_pstream_pvconn(const char *name, int fd, - int (*accept_cb)(int fd, const struct sockaddr *, - size_t sa_len, struct vconn **), - struct pvconn **pvconnp) +/* Creates a new pvconn named 'name' that will accept new connections using + * pstream_accept() and stores a pointer to the pvconn in '*pvconnp'. + * + * Returns 0 if successful, otherwise a positive errno value. (The current + * implementation never fails.) */ +static int +pvconn_pstream_listen(const char *name, uint32_t allowed_versions, + char *suffix OVS_UNUSED, struct pvconn **pvconnp, + uint8_t dscp) { - struct pstream_pvconn *ps = xmalloc(sizeof *ps); - pvconn_init(&ps->pvconn, &pstream_pvconn_class, name); - ps->fd = fd; - ps->accept_cb = accept_cb; + struct pvconn_pstream *ps; + struct pstream *pstream; + int error; + + error = pstream_open_with_default_port(name, OFP_PORT, &pstream, dscp); + if (error) { + return error; + } + + ps = xmalloc(sizeof *ps); + pvconn_init(&ps->pvconn, &pstream_pvconn_class, name, allowed_versions); + ps->pstream = pstream; *pvconnp = &ps->pvconn; return 0; } static void -pstream_close(struct pvconn *pvconn) +pvconn_pstream_close(struct pvconn *pvconn) { - struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn); - close(ps->fd); + struct pvconn_pstream *ps = pvconn_pstream_cast(pvconn); + pstream_close(ps->pstream); free(ps); } static int -pstream_accept(struct pvconn *pvconn, struct vconn **new_vconnp) +pvconn_pstream_accept(struct pvconn *pvconn, struct vconn **new_vconnp) { - struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn); - struct sockaddr_storage ss; - socklen_t ss_len = sizeof ss; - int new_fd; - int retval; - - new_fd = accept(ps->fd, (struct sockaddr *) &ss, &ss_len); - if (new_fd < 0) { - int retval = errno; - if (retval != EAGAIN) { - VLOG_DBG_RL(&rl, "accept: %s", strerror(retval)); + struct pvconn_pstream *ps = pvconn_pstream_cast(pvconn); + struct stream *stream; + int error; + + error = pstream_accept(ps->pstream, &stream); + if (error) { + if (error != EAGAIN) { + VLOG_DBG_RL(&rl, "%s: accept: %s", + pstream_get_name(ps->pstream), ovs_strerror(error)); } - return retval; + return error; } - retval = set_nonblocking(new_fd); - if (retval) { - close(new_fd); - return retval; - } - - return ps->accept_cb(new_fd, (const struct sockaddr *) &ss, ss_len, - new_vconnp); + *new_vconnp = vconn_stream_new(stream, 0, pvconn->allowed_versions); + return 0; } static void -pstream_wait(struct pvconn *pvconn) +pvconn_pstream_wait(struct pvconn *pvconn) { - struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn); - poll_fd_wait(ps->fd, POLLIN); + struct pvconn_pstream *ps = pvconn_pstream_cast(pvconn); + pstream_wait(ps->pstream); } + +/* Stream-based vconns and pvconns. */ + +#define STREAM_INIT(NAME) \ + { \ + NAME, \ + vconn_stream_open, \ + vconn_stream_close, \ + vconn_stream_connect, \ + vconn_stream_recv, \ + vconn_stream_send, \ + vconn_stream_run, \ + vconn_stream_run_wait, \ + vconn_stream_wait, \ + } -static struct pvconn_class pstream_pvconn_class = { - "pstream", - NULL, - pstream_close, - pstream_accept, - pstream_wait -}; +#define PSTREAM_INIT(NAME) \ + { \ + NAME, \ + pvconn_pstream_listen, \ + pvconn_pstream_close, \ + pvconn_pstream_accept, \ + pvconn_pstream_wait \ + } + +static const struct vconn_class stream_vconn_class = STREAM_INIT("stream"); +static const struct pvconn_class pstream_pvconn_class = PSTREAM_INIT("pstream"); + +const struct vconn_class tcp_vconn_class = STREAM_INIT("tcp"); +const struct pvconn_class ptcp_pvconn_class = PSTREAM_INIT("ptcp"); + +const struct vconn_class unix_vconn_class = STREAM_INIT("unix"); +const struct pvconn_class punix_pvconn_class = PSTREAM_INIT("punix"); + +#ifdef HAVE_OPENSSL +const struct vconn_class ssl_vconn_class = STREAM_INIT("ssl"); +const struct pvconn_class pssl_pvconn_class = PSTREAM_INIT("pssl"); +#endif