vconn: Convert vconn code to modern OVS structure.
[cascardo/ovs.git] / lib / vconn-stream.c
1 /*
2  * Copyright (c) 2008, 2009 Nicira Networks.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18 #include "vconn-stream.h"
19 #include <assert.h>
20 #include <errno.h>
21 #include <poll.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <sys/types.h>
25 #include <unistd.h>
26 #include "fatal-signal.h"
27 #include "leak-checker.h"
28 #include "ofpbuf.h"
29 #include "openflow/openflow.h"
30 #include "poll-loop.h"
31 #include "socket-util.h"
32 #include "util.h"
33 #include "vconn-provider.h"
34 #include "vconn.h"
35
36 #include "vlog.h"
37 #define THIS_MODULE VLM_vconn_stream
38
39 /* Active stream socket vconn. */
40
41 struct stream_vconn
42 {
43     struct vconn vconn;
44     int fd;
45     struct ofpbuf *rxbuf;
46     struct ofpbuf *txbuf;
47     char *unlink_path;
48 };
49
50 static struct vconn_class stream_vconn_class;
51
52 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25);
53
54 static void stream_clear_txbuf(struct stream_vconn *);
55 static void maybe_unlink_and_free(char *path);
56
57 /* Creates a new vconn named 'name' that will send and receive data on 'fd' and
58  * stores a pointer to the vconn in '*vconnp'.  Initial connection status
59  * 'connect_status' is interpreted as described for vconn_init().
60  *
61  * When '*vconnp' is closed, then 'unlink_path' (if nonnull) will be passed to
62  * fatal_signal_unlink_file_now() and then freed with free().
63  *
64  * Returns 0 if successful, otherwise a positive errno value.  (The current
65  * implementation never fails.) */
66 int
67 new_stream_vconn(const char *name, int fd, int connect_status,
68                  char *unlink_path, struct vconn **vconnp)
69 {
70     struct stream_vconn *s;
71
72     s = xmalloc(sizeof *s);
73     vconn_init(&s->vconn, &stream_vconn_class, connect_status, name);
74     s->fd = fd;
75     s->txbuf = NULL;
76     s->rxbuf = NULL;
77     s->unlink_path = unlink_path;
78     *vconnp = &s->vconn;
79     return 0;
80 }
81
82 static struct stream_vconn *
83 stream_vconn_cast(struct vconn *vconn)
84 {
85     vconn_assert_class(vconn, &stream_vconn_class);
86     return CONTAINER_OF(vconn, struct stream_vconn, vconn);
87 }
88
89 static void
90 stream_close(struct vconn *vconn)
91 {
92     struct stream_vconn *s = stream_vconn_cast(vconn);
93     stream_clear_txbuf(s);
94     ofpbuf_delete(s->rxbuf);
95     close(s->fd);
96     maybe_unlink_and_free(s->unlink_path);
97     free(s);
98 }
99
100 static int
101 stream_connect(struct vconn *vconn)
102 {
103     struct stream_vconn *s = stream_vconn_cast(vconn);
104     return check_connection_completion(s->fd);
105 }
106
107 static int
108 stream_recv(struct vconn *vconn, struct ofpbuf **bufferp)
109 {
110     struct stream_vconn *s = stream_vconn_cast(vconn);
111     struct ofpbuf *rx;
112     size_t want_bytes;
113     ssize_t retval;
114
115     if (s->rxbuf == NULL) {
116         s->rxbuf = ofpbuf_new(1564);
117     }
118     rx = s->rxbuf;
119
120 again:
121     if (sizeof(struct ofp_header) > rx->size) {
122         want_bytes = sizeof(struct ofp_header) - rx->size;
123     } else {
124         struct ofp_header *oh = rx->data;
125         size_t length = ntohs(oh->length);
126         if (length < sizeof(struct ofp_header)) {
127             VLOG_ERR_RL(&rl, "received too-short ofp_header (%zu bytes)",
128                         length);
129             return EPROTO;
130         }
131         want_bytes = length - rx->size;
132         if (!want_bytes) {
133             *bufferp = rx;
134             s->rxbuf = NULL;
135             return 0;
136         }
137     }
138     ofpbuf_prealloc_tailroom(rx, want_bytes);
139
140     retval = read(s->fd, ofpbuf_tail(rx), want_bytes);
141     if (retval > 0) {
142         rx->size += retval;
143         if (retval == want_bytes) {
144             if (rx->size > sizeof(struct ofp_header)) {
145                 *bufferp = rx;
146                 s->rxbuf = NULL;
147                 return 0;
148             } else {
149                 goto again;
150             }
151         }
152         return EAGAIN;
153     } else if (retval == 0) {
154         if (rx->size) {
155             VLOG_ERR_RL(&rl, "connection dropped mid-packet");
156             return EPROTO;
157         } else {
158             return EOF;
159         }
160     } else {
161         return errno;
162     }
163 }
164
165 static void
166 stream_clear_txbuf(struct stream_vconn *s)
167 {
168     ofpbuf_delete(s->txbuf);
169     s->txbuf = NULL;
170 }
171
172 static int
173 stream_send(struct vconn *vconn, struct ofpbuf *buffer)
174 {
175     struct stream_vconn *s = stream_vconn_cast(vconn);
176     ssize_t retval;
177
178     if (s->txbuf) {
179         return EAGAIN;
180     }
181
182     retval = write(s->fd, buffer->data, buffer->size);
183     if (retval == buffer->size) {
184         ofpbuf_delete(buffer);
185         return 0;
186     } else if (retval >= 0 || errno == EAGAIN) {
187         leak_checker_claim(buffer);
188         s->txbuf = buffer;
189         if (retval > 0) {
190             ofpbuf_pull(buffer, retval);
191         }
192         return 0;
193     } else {
194         return errno;
195     }
196 }
197
198 static void
199 stream_run(struct vconn *vconn)
200 {
201     struct stream_vconn *s = stream_vconn_cast(vconn);
202     ssize_t n;
203
204     if (!s->txbuf) {
205         return;
206     }
207
208     n = write(s->fd, s->txbuf->data, s->txbuf->size);
209     if (n < 0) {
210         if (errno != EAGAIN) {
211             VLOG_ERR_RL(&rl, "send: %s", strerror(errno));
212             stream_clear_txbuf(s);
213             return;
214         }
215     } else if (n > 0) {
216         ofpbuf_pull(s->txbuf, n);
217         if (!s->txbuf->size) {
218             stream_clear_txbuf(s);
219             return;
220         }
221     }
222 }
223
224 static void
225 stream_run_wait(struct vconn *vconn)
226 {
227     struct stream_vconn *s = stream_vconn_cast(vconn);
228
229     if (s->txbuf) {
230         poll_fd_wait(s->fd, POLLOUT);
231     }
232 }
233
234 static void
235 stream_wait(struct vconn *vconn, enum vconn_wait_type wait)
236 {
237     struct stream_vconn *s = stream_vconn_cast(vconn);
238     switch (wait) {
239     case WAIT_CONNECT:
240         poll_fd_wait(s->fd, POLLOUT);
241         break;
242
243     case WAIT_SEND:
244         if (!s->txbuf) {
245             poll_fd_wait(s->fd, POLLOUT);
246         } else {
247             /* Nothing to do: need to drain txbuf first.  stream_run_wait()
248              * will arrange to wake up when there room to send data, so there's
249              * no point in calling poll_fd_wait() redundantly here. */
250         }
251         break;
252
253     case WAIT_RECV:
254         poll_fd_wait(s->fd, POLLIN);
255         break;
256
257     default:
258         NOT_REACHED();
259     }
260 }
261
262 static struct vconn_class stream_vconn_class = {
263     "stream",                   /* name */
264     NULL,                       /* open */
265     stream_close,               /* close */
266     stream_connect,             /* connect */
267     stream_recv,                /* recv */
268     stream_send,                /* send */
269     stream_run,                 /* run */
270     stream_run_wait,            /* run_wait */
271     stream_wait,                /* wait */
272 };
273 \f
274 /* Passive stream socket vconn. */
275
276 struct pstream_pvconn
277 {
278     struct pvconn pvconn;
279     int fd;
280     int (*accept_cb)(int fd, const struct sockaddr *, size_t sa_len,
281                      struct vconn **);
282     char *unlink_path;
283 };
284
285 static struct pvconn_class pstream_pvconn_class;
286
287 static struct pstream_pvconn *
288 pstream_pvconn_cast(struct pvconn *pvconn)
289 {
290     pvconn_assert_class(pvconn, &pstream_pvconn_class);
291     return CONTAINER_OF(pvconn, struct pstream_pvconn, pvconn);
292 }
293
294 /* Creates a new pvconn named 'name' that will accept new socket connections on
295  * 'fd' and stores a pointer to the vconn in '*pvconnp'.
296  *
297  * When a connection has been accepted, 'accept_cb' will be called with the new
298  * socket fd 'fd' and the remote address of the connection 'sa' and 'sa_len'.
299  * accept_cb must return 0 if the connection is successful, in which case it
300  * must initialize '*vconnp' to the new vconn, or a positive errno value on
301  * error.  In either case accept_cb takes ownership of the 'fd' passed in.
302  *
303  * When '*pvconnp' is closed, then 'unlink_path' (if nonnull) will be passed to
304  * fatal_signal_unlink_file_now() and freed with free().
305  *
306  * Returns 0 if successful, otherwise a positive errno value.  (The current
307  * implementation never fails.) */
308 int
309 new_pstream_pvconn(const char *name, int fd,
310                   int (*accept_cb)(int fd, const struct sockaddr *sa,
311                                    size_t sa_len, struct vconn **vconnp),
312                   char *unlink_path, struct pvconn **pvconnp)
313 {
314     struct pstream_pvconn *ps = xmalloc(sizeof *ps);
315     pvconn_init(&ps->pvconn, &pstream_pvconn_class, name);
316     ps->fd = fd;
317     ps->accept_cb = accept_cb;
318     ps->unlink_path = unlink_path;
319     *pvconnp = &ps->pvconn;
320     return 0;
321 }
322
323 static void
324 pstream_close(struct pvconn *pvconn)
325 {
326     struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
327     close(ps->fd);
328     maybe_unlink_and_free(ps->unlink_path);
329     free(ps);
330 }
331
332 static int
333 pstream_accept(struct pvconn *pvconn, struct vconn **new_vconnp)
334 {
335     struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
336     struct sockaddr_storage ss;
337     socklen_t ss_len = sizeof ss;
338     int new_fd;
339     int retval;
340
341     new_fd = accept(ps->fd, (struct sockaddr *) &ss, &ss_len);
342     if (new_fd < 0) {
343         int retval = errno;
344         if (retval != EAGAIN) {
345             VLOG_DBG_RL(&rl, "accept: %s", strerror(retval));
346         }
347         return retval;
348     }
349
350     retval = set_nonblocking(new_fd);
351     if (retval) {
352         close(new_fd);
353         return retval;
354     }
355
356     return ps->accept_cb(new_fd, (const struct sockaddr *) &ss, ss_len,
357                          new_vconnp);
358 }
359
360 static void
361 pstream_wait(struct pvconn *pvconn)
362 {
363     struct pstream_pvconn *ps = pstream_pvconn_cast(pvconn);
364     poll_fd_wait(ps->fd, POLLIN);
365 }
366
367 static struct pvconn_class pstream_pvconn_class = {
368     "pstream",
369     NULL,
370     pstream_close,
371     pstream_accept,
372     pstream_wait
373 };
374 \f
375 /* Helper functions. */
376 static void
377 maybe_unlink_and_free(char *path)
378 {
379     if (path) {
380         fatal_signal_unlink_file_now(path);
381         free(path);
382     }
383 }