lib: Move compiler.h to <openvswitch/compiler.h>
[cascardo/ovs.git] / tests / test-sflow.c
1 /*
2  * Copyright (c) 2011, 2012, 2013, 2014 Nicira, Inc.
3  * Copyright (c) 2013 InMon Corp.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #include <config.h>
19 #undef NDEBUG
20 #include "netflow.h"
21 #include <arpa/inet.h>
22 #include <errno.h>
23 #include <getopt.h>
24 #include <signal.h>
25 #include <stdlib.h>
26 #include <unistd.h>
27 #include <setjmp.h>
28 #include "command-line.h"
29 #include "daemon.h"
30 #include "dynamic-string.h"
31 #include "ofpbuf.h"
32 #include "ovstest.h"
33 #include "packets.h"
34 #include "poll-loop.h"
35 #include "socket-util.h"
36 #include "unixctl.h"
37 #include "util.h"
38 #include "vlog.h"
39
40 OVS_NO_RETURN static void usage(void);
41 static void parse_options(int argc, char *argv[]);
42
43 static unixctl_cb_func test_sflow_exit;
44
45 /* Datagram. */
46 #define SFLOW_VERSION_5 5
47 #define SFLOW_MIN_LEN 36
48
49 /* Sample tag numbers. */
50 #define SFLOW_FLOW_SAMPLE 1
51 #define SFLOW_COUNTERS_SAMPLE 2
52 #define SFLOW_FLOW_SAMPLE_EXPANDED 3
53 #define SFLOW_COUNTERS_SAMPLE_EXPANDED 4
54
55 /* Structure element tag numbers. */
56 #define SFLOW_TAG_CTR_IFCOUNTERS 1
57 #define SFLOW_TAG_CTR_LACPCOUNTERS 7
58 #define SFLOW_TAG_CTR_OPENFLOWPORT 1004
59 #define SFLOW_TAG_CTR_PORTNAME 1005
60 #define SFLOW_TAG_PKT_HEADER 1
61 #define SFLOW_TAG_PKT_SWITCH 1001
62 #define SFLOW_TAG_PKT_TUNNEL4_OUT 1023
63 #define SFLOW_TAG_PKT_TUNNEL4_IN 1024
64 #define SFLOW_TAG_PKT_TUNNEL_VNI_OUT 1029
65 #define SFLOW_TAG_PKT_TUNNEL_VNI_IN 1030
66
67 /* string sizes */
68 #define SFL_MAX_PORTNAME_LEN 255
69
70 struct sflow_addr {
71     enum {
72         SFLOW_ADDRTYPE_undefined = 0,
73         SFLOW_ADDRTYPE_IP4,
74         SFLOW_ADDRTYPE_IP6
75     } type;
76
77     union {
78         ovs_be32 ip4;
79         ovs_be32 ip6[4];
80     } a;
81 };
82
83 struct sflow_xdr {
84     /* Exceptions. */
85     jmp_buf env;
86     int errline;
87
88     /* Cursor. */
89     ovs_be32 *datap;
90     uint32_t i;
91     uint32_t quads;
92
93     /* Agent. */
94     struct sflow_addr agentAddr;
95     char agentIPStr[INET6_ADDRSTRLEN + 2];
96     uint32_t subAgentId;
97     uint32_t uptime_mS;
98
99     /* Datasource. */
100     uint32_t dsClass;
101     uint32_t dsIndex;
102
103     /* Sequence numbers. */
104     uint32_t dgramSeqNo;
105     uint32_t fsSeqNo;
106     uint32_t csSeqNo;
107
108     /* Structure offsets. */
109     struct {
110         uint32_t HEADER;
111         uint32_t SWITCH;
112         uint32_t TUNNEL4_OUT;
113         uint32_t TUNNEL4_IN;
114         uint32_t TUNNEL_VNI_OUT;
115         uint32_t TUNNEL_VNI_IN;
116         uint32_t IFCOUNTERS;
117         uint32_t LACPCOUNTERS;
118         uint32_t OPENFLOWPORT;
119         uint32_t PORTNAME;
120     } offset;
121
122     /* Flow sample fields. */
123     uint32_t meanSkipCount;
124     uint32_t samplePool;
125     uint32_t dropEvents;
126     uint32_t inputPortFormat;
127     uint32_t inputPort;
128     uint32_t outputPortFormat;
129     uint32_t outputPort;
130 };
131
132 #define SFLOWXDR_try(x) ((x->errline = setjmp(x->env)) == 0)
133 #define SFLOWXDR_throw(x) longjmp(x->env, __LINE__)
134 #define SFLOWXDR_assert(x, t) if (!(t)) SFLOWXDR_throw(x)
135
136 static void
137 sflowxdr_init(struct sflow_xdr *x, void *buf, size_t len)
138 {
139     x->datap = buf;
140     x->quads = len >> 2;
141 }
142
143 static uint32_t
144 sflowxdr_next(struct sflow_xdr *x)
145 {
146     return ntohl(x->datap[x->i++]);
147 }
148
149 static ovs_be32
150 sflowxdr_next_n(struct sflow_xdr *x)
151 {
152     return x->datap[x->i++];
153 }
154
155 static bool
156 sflowxdr_more(const struct sflow_xdr *x, uint32_t q)
157 {
158     return q + x->i <= x->quads;
159 }
160
161 static void
162 sflowxdr_skip(struct sflow_xdr *x, uint32_t q)
163 {
164     x->i += q;
165 }
166
167 static uint32_t
168 sflowxdr_mark(const struct sflow_xdr *x, uint32_t q)
169 {
170     return x->i + q;
171 }
172
173 static bool
174 sflowxdr_mark_ok(const struct sflow_xdr *x, uint32_t m)
175 {
176     return m == x->i;
177 }
178
179 static void
180 sflowxdr_mark_unique(struct sflow_xdr *x, uint32_t *pi)
181 {
182     if (*pi) {
183         SFLOWXDR_throw(x);
184     }
185     *pi = x->i;
186 }
187
188 static void
189 sflowxdr_setc(struct sflow_xdr *x, uint32_t j)
190 {
191     x->i = j;
192 }
193
194 static const char *
195 sflowxdr_str(const struct sflow_xdr *x)
196 {
197     return (const char *) (x->datap + x->i);
198 }
199
200 static uint64_t
201 sflowxdr_next_int64(struct sflow_xdr *x)
202 {
203     uint64_t scratch;
204     scratch = sflowxdr_next(x);
205     scratch <<= 32;
206     scratch += sflowxdr_next(x);
207     return scratch;
208 }
209
210 static void
211 process_counter_sample(struct sflow_xdr *x)
212 {
213     if (x->offset.IFCOUNTERS) {
214         sflowxdr_setc(x, x->offset.IFCOUNTERS);
215         printf("IFCOUNTERS");
216         printf(" dgramSeqNo=%"PRIu32, x->dgramSeqNo);
217         printf(" ds=%s>%"PRIu32":%"PRIu32,
218                x->agentIPStr, x->dsClass, x->dsIndex);
219         printf(" csSeqNo=%"PRIu32, x->csSeqNo);
220         printf(" ifindex=%"PRIu32, sflowxdr_next(x));
221         printf(" type=%"PRIu32, sflowxdr_next(x));
222         printf(" ifspeed=%"PRIu64, sflowxdr_next_int64(x));
223         printf(" direction=%"PRIu32, sflowxdr_next(x));
224         printf(" status=%"PRIu32, sflowxdr_next(x));
225         printf(" in_octets=%"PRIu64, sflowxdr_next_int64(x));
226         printf(" in_unicasts=%"PRIu32, sflowxdr_next(x));
227         printf(" in_multicasts=%"PRIu32, sflowxdr_next(x));
228         printf(" in_broadcasts=%"PRIu32, sflowxdr_next(x));
229         printf(" in_discards=%"PRIu32, sflowxdr_next(x));
230         printf(" in_errors=%"PRIu32, sflowxdr_next(x));
231         printf(" in_unknownprotos=%"PRIu32, sflowxdr_next(x));
232         printf(" out_octets=%"PRIu64, sflowxdr_next_int64(x));
233         printf(" out_unicasts=%"PRIu32, sflowxdr_next(x));
234         printf(" out_multicasts=%"PRIu32, sflowxdr_next(x));
235         printf(" out_broadcasts=%"PRIu32, sflowxdr_next(x));
236         printf(" out_discards=%"PRIu32, sflowxdr_next(x));
237         printf(" out_errors=%"PRIu32, sflowxdr_next(x));
238         printf(" promiscuous=%"PRIu32, sflowxdr_next(x));
239         printf("\n");
240     }
241     if (x->offset.LACPCOUNTERS) {
242         uint8_t *mac;
243         union {
244             ovs_be32 all;
245             struct {
246                 uint8_t actorAdmin;
247                 uint8_t actorOper;
248                 uint8_t partnerAdmin;
249                 uint8_t partnerOper;
250             } v;
251         } state;
252
253         sflowxdr_setc(x, x->offset.LACPCOUNTERS);
254         printf("LACPCOUNTERS");
255         mac = (uint8_t *)sflowxdr_str(x);
256         printf(" sysID="ETH_ADDR_FMT, ETH_ADDR_ARGS(mac));
257         sflowxdr_skip(x, 2);
258         mac = (uint8_t *)sflowxdr_str(x);
259         printf(" partnerID="ETH_ADDR_FMT, ETH_ADDR_ARGS(mac));
260         sflowxdr_skip(x, 2);
261         printf(" aggID=%"PRIu32, sflowxdr_next(x));
262         state.all = sflowxdr_next_n(x);
263         printf(" actorAdmin=0x%"PRIx32, state.v.actorAdmin);
264         printf(" actorOper=0x%"PRIx32, state.v.actorOper);
265         printf(" partnerAdmin=0x%"PRIx32, state.v.partnerAdmin);
266         printf(" partnerOper=0x%"PRIx32, state.v.partnerOper);
267         printf(" LACPUDsRx=%"PRIu32, sflowxdr_next(x));
268         printf(" markerPDUsRx=%"PRIu32, sflowxdr_next(x));
269         printf(" markerRespPDUsRx=%"PRIu32, sflowxdr_next(x));
270         printf(" unknownRx=%"PRIu32, sflowxdr_next(x));
271         printf(" illegalRx=%"PRIu32, sflowxdr_next(x));
272         printf(" LACPUDsTx=%"PRIu32, sflowxdr_next(x));
273         printf(" markerPDUsTx=%"PRIu32, sflowxdr_next(x));
274         printf(" markerRespPDUsTx=%"PRIu32, sflowxdr_next(x));
275         printf("\n");
276     }
277     if (x->offset.OPENFLOWPORT) {
278         sflowxdr_setc(x, x->offset.OPENFLOWPORT);
279         printf("OPENFLOWPORT");
280         printf(" datapath_id=%"PRIu64, sflowxdr_next_int64(x));
281         printf(" port_no=%"PRIu32, sflowxdr_next(x));
282         printf("\n");
283     }
284     if (x->offset.PORTNAME) {
285         uint32_t pnLen;
286         const char *pnBytes;
287         char portName[SFL_MAX_PORTNAME_LEN + 1];
288         sflowxdr_setc(x, x->offset.PORTNAME);
289         printf("PORTNAME");
290         pnLen = sflowxdr_next(x);
291         SFLOWXDR_assert(x, (pnLen <= SFL_MAX_PORTNAME_LEN));
292         pnBytes = sflowxdr_str(x);
293         memcpy(portName, pnBytes, pnLen);
294         portName[pnLen] = '\0';
295         printf(" portName=%s", portName);
296         printf("\n");
297     }
298 }
299
300 static char
301 bin_to_hex(int hexit)
302 {
303     return "0123456789ABCDEF"[hexit];
304 }
305
306 static int
307 print_hex(const char *a, int len, char *buf, int bufLen)
308 {
309     unsigned char nextByte;
310     int b = 0;
311     int i;
312
313     for (i = 0; i < len; i++) {
314         if (b > bufLen - 10) {
315             break;
316         }
317         nextByte = a[i];
318         buf[b++] = bin_to_hex(nextByte >> 4);
319         buf[b++] = bin_to_hex(nextByte & 0x0f);
320         if (i < len - 1) {
321             buf[b++] = '-';
322         }
323     }
324     buf[b] = '\0';
325     return b;
326 }
327
328 static void
329 print_struct_ipv4(struct sflow_xdr *x, const char *prefix)
330 {
331     ovs_be32 src, dst;
332
333     printf(" %s_length=%"PRIu32,    prefix, sflowxdr_next(x));
334     printf(" %s_protocol=%"PRIu32,  prefix, sflowxdr_next(x));
335
336     src = sflowxdr_next_n(x);
337     dst = sflowxdr_next_n(x);
338     printf(" %s_src="IP_FMT,        prefix, IP_ARGS(src));
339     printf(" %s_dst="IP_FMT,        prefix, IP_ARGS(dst));
340
341     printf(" %s_src_port=%"PRIu32,  prefix, sflowxdr_next(x));
342     printf(" %s_dst_port=%"PRIu32,  prefix, sflowxdr_next(x));
343     printf(" %s_tcp_flags=%"PRIu32, prefix, sflowxdr_next(x));
344     printf(" %s_tos=%"PRIu32,       prefix, sflowxdr_next(x));
345 }
346
347 #define SFLOW_HEX_SCRATCH 1024
348
349 static void
350 process_flow_sample(struct sflow_xdr *x)
351 {
352     if (x->offset.HEADER) {
353         uint32_t headerLen;
354         char scratch[SFLOW_HEX_SCRATCH];
355
356         printf("HEADER");
357         printf(" dgramSeqNo=%"PRIu32, x->dgramSeqNo);
358         printf(" ds=%s>%"PRIu32":%"PRIu32,
359                x->agentIPStr, x->dsClass, x->dsIndex);
360         printf(" fsSeqNo=%"PRIu32, x->fsSeqNo);
361
362         if (x->offset.TUNNEL4_IN) {
363             sflowxdr_setc(x, x->offset.TUNNEL4_IN);
364             print_struct_ipv4(x, "tunnel4_in");
365         }
366
367         if (x->offset.TUNNEL4_OUT) {
368             sflowxdr_setc(x, x->offset.TUNNEL4_OUT);
369             print_struct_ipv4(x, "tunnel4_out");
370         }
371
372         if (x->offset.TUNNEL_VNI_IN) {
373             sflowxdr_setc(x, x->offset.TUNNEL_VNI_IN);
374             printf( " tunnel_in_vni=%"PRIu32, sflowxdr_next(x));
375         }
376
377         if (x->offset.TUNNEL_VNI_OUT) {
378             sflowxdr_setc(x, x->offset.TUNNEL_VNI_OUT);
379             printf( " tunnel_out_vni=%"PRIu32, sflowxdr_next(x));
380         }
381
382         if (x->offset.SWITCH) {
383             sflowxdr_setc(x, x->offset.SWITCH);
384             printf(" in_vlan=%"PRIu32, sflowxdr_next(x));
385             printf(" in_priority=%"PRIu32, sflowxdr_next(x));
386             printf(" out_vlan=%"PRIu32, sflowxdr_next(x));
387             printf(" out_priority=%"PRIu32, sflowxdr_next(x));
388         }
389
390         sflowxdr_setc(x, x->offset.HEADER);
391         printf(" meanSkip=%"PRIu32, x->meanSkipCount);
392         printf(" samplePool=%"PRIu32, x->samplePool);
393         printf(" dropEvents=%"PRIu32, x->dropEvents);
394         printf(" in_ifindex=%"PRIu32, x->inputPort);
395         printf(" in_format=%"PRIu32, x->inputPortFormat);
396         printf(" out_ifindex=%"PRIu32, x->outputPort);
397         printf(" out_format=%"PRIu32, x->outputPortFormat);
398         printf(" hdr_prot=%"PRIu32, sflowxdr_next(x));
399         printf(" pkt_len=%"PRIu32, sflowxdr_next(x));
400         printf(" stripped=%"PRIu32, sflowxdr_next(x));
401         headerLen = sflowxdr_next(x);
402         printf(" hdr_len=%"PRIu32, headerLen);
403         print_hex(sflowxdr_str(x), headerLen, scratch, SFLOW_HEX_SCRATCH);
404         printf(" hdr=%s", scratch);
405         printf("\n");
406     }
407 }
408
409 static void
410 process_datagram(struct sflow_xdr *x)
411 {
412     uint32_t samples, s;
413
414     SFLOWXDR_assert(x, (sflowxdr_next(x) == SFLOW_VERSION_5));
415
416     /* Read the sFlow header. */
417     x->agentAddr.type = sflowxdr_next(x);
418     switch (x->agentAddr.type) {
419     case SFLOW_ADDRTYPE_IP4:
420         x->agentAddr.a.ip4 = sflowxdr_next_n(x);
421         break;
422
423     case SFLOW_ADDRTYPE_IP6:
424         x->agentAddr.a.ip6[0] = sflowxdr_next_n(x);
425         x->agentAddr.a.ip6[1] = sflowxdr_next_n(x);
426         x->agentAddr.a.ip6[2] = sflowxdr_next_n(x);
427         x->agentAddr.a.ip6[3] = sflowxdr_next_n(x);
428         break;
429
430     case SFLOW_ADDRTYPE_undefined:
431     default:
432         SFLOWXDR_throw(x);
433         break;
434     }
435     x->subAgentId = sflowxdr_next(x);
436     x->dgramSeqNo = sflowxdr_next(x);
437     x->uptime_mS = sflowxdr_next(x);
438
439     /* Store the agent address as a string. */
440     if (x->agentAddr.type == SFLOW_ADDRTYPE_IP6) {
441         char ipstr[INET6_ADDRSTRLEN];
442         inet_ntop(AF_INET6, (const void *) &x->agentAddr.a.ip6,
443                   ipstr, INET6_ADDRSTRLEN);
444         snprintf(x->agentIPStr, sizeof x->agentIPStr, "[%s]", ipstr);
445     } else {
446         snprintf(x->agentIPStr, sizeof x->agentIPStr,
447                  IP_FMT, IP_ARGS(x->agentAddr.a.ip4));
448     }
449
450     /* Array of flow/counter samples. */
451     samples = sflowxdr_next(x);
452     for (s = 0; s < samples; s++) {
453         uint32_t sType = sflowxdr_next(x);
454         uint32_t sQuads = sflowxdr_next(x) >> 2;
455         uint32_t sMark = sflowxdr_mark(x, sQuads);
456         SFLOWXDR_assert(x, sflowxdr_more(x, sQuads));
457
458         switch (sType) {
459         case SFLOW_COUNTERS_SAMPLE_EXPANDED:
460         case SFLOW_COUNTERS_SAMPLE:
461         {
462             uint32_t csElements, e;
463             uint32_t ceTag, ceQuads, ceMark, csEnd;
464
465             x->csSeqNo = sflowxdr_next(x);
466             if (sType == SFLOW_COUNTERS_SAMPLE_EXPANDED) {
467                 x->dsClass = sflowxdr_next(x);
468                 x->dsIndex = sflowxdr_next(x);
469             } else {
470                 uint32_t dsCombined = sflowxdr_next(x);
471                 x->dsClass = dsCombined >> 24;
472                 x->dsIndex = dsCombined & 0x00FFFFFF;
473             }
474
475             csElements = sflowxdr_next(x);
476             for (e = 0; e < csElements; e++) {
477                 SFLOWXDR_assert(x, sflowxdr_more(x,2));
478                 ceTag = sflowxdr_next(x);
479                 ceQuads = sflowxdr_next(x) >> 2;
480                 ceMark = sflowxdr_mark(x, ceQuads);
481                 SFLOWXDR_assert(x, sflowxdr_more(x,ceQuads));
482                 /* Only care about selected structures.  Just record their
483                  * offsets here. We'll read the fields out later. */
484                 switch (ceTag) {
485                 case SFLOW_TAG_CTR_IFCOUNTERS:
486                     sflowxdr_mark_unique(x, &x->offset.IFCOUNTERS);
487                     break;
488                 case SFLOW_TAG_CTR_LACPCOUNTERS:
489                     sflowxdr_mark_unique(x, &x->offset.LACPCOUNTERS);
490                     break;
491                 case SFLOW_TAG_CTR_PORTNAME:
492                     sflowxdr_mark_unique(x, &x->offset.PORTNAME);
493                     break;
494                 case SFLOW_TAG_CTR_OPENFLOWPORT:
495                     sflowxdr_mark_unique(x, &x->offset.OPENFLOWPORT);
496                     break;
497
498                     /* Add others here... */
499                 }
500
501                 sflowxdr_skip(x, ceQuads);
502                 SFLOWXDR_assert(x, sflowxdr_mark_ok(x, ceMark));
503             }
504
505             csEnd = sflowxdr_mark(x, 0);
506             process_counter_sample(x);
507             /* Make sure we pick up the decoding where we left off. */
508             sflowxdr_setc(x, csEnd);
509
510             /* Clear the offsets for the next sample. */
511             memset(&x->offset, 0, sizeof x->offset);
512         }
513         break;
514
515         case SFLOW_FLOW_SAMPLE:
516         case SFLOW_FLOW_SAMPLE_EXPANDED:
517         {
518             uint32_t fsElements, e;
519             uint32_t feTag, feQuads, feMark, fsEnd;
520             x->fsSeqNo = sflowxdr_next(x);
521             if (sType == SFLOW_FLOW_SAMPLE_EXPANDED) {
522                 x->dsClass = sflowxdr_next(x);
523                 x->dsIndex = sflowxdr_next(x);
524             } else {
525                 uint32_t dsCombined = sflowxdr_next(x);
526                 x->dsClass = dsCombined >> 24;
527                 x->dsIndex = dsCombined & 0x00FFFFFF;
528             }
529             x->meanSkipCount = sflowxdr_next(x);
530             x->samplePool = sflowxdr_next(x);
531             x->dropEvents = sflowxdr_next(x);
532             if (sType == SFLOW_FLOW_SAMPLE_EXPANDED) {
533                 x->inputPortFormat = sflowxdr_next(x);
534                 x->inputPort = sflowxdr_next(x);
535                 x->outputPortFormat = sflowxdr_next(x);
536                 x->outputPort = sflowxdr_next(x);
537             } else {
538                 uint32_t inp, outp;
539
540                 inp = sflowxdr_next(x);
541                 outp = sflowxdr_next(x);
542                 x->inputPortFormat = inp >> 30;
543                 x->inputPort = inp & 0x3fffffff;
544                 x->outputPortFormat = outp >> 30;
545                 x->outputPort = outp & 0x3fffffff;
546             }
547             fsElements = sflowxdr_next(x);
548             for (e = 0; e < fsElements; e++) {
549                 SFLOWXDR_assert(x, sflowxdr_more(x,2));
550                 feTag = sflowxdr_next(x);
551                 feQuads = sflowxdr_next(x) >> 2;
552                 feMark = sflowxdr_mark(x, feQuads);
553                 SFLOWXDR_assert(x, sflowxdr_more(x,feQuads));
554                 /* Only care about selected structures.  Just record their
555                  * offsets here. We'll read the fields out below. */
556                 switch (feTag) {
557                 case SFLOW_TAG_PKT_HEADER:
558                     sflowxdr_mark_unique(x, &x->offset.HEADER);
559                     break;
560
561                 case SFLOW_TAG_PKT_SWITCH:
562                     sflowxdr_mark_unique(x, &x->offset.SWITCH);
563                     break;
564
565                 case SFLOW_TAG_PKT_TUNNEL4_OUT:
566                     sflowxdr_mark_unique(x, &x->offset.TUNNEL4_OUT);
567                     break;
568
569                 case SFLOW_TAG_PKT_TUNNEL4_IN:
570                     sflowxdr_mark_unique(x, &x->offset.TUNNEL4_IN);
571                     break;
572
573                 case SFLOW_TAG_PKT_TUNNEL_VNI_OUT:
574                     sflowxdr_mark_unique(x, &x->offset.TUNNEL_VNI_OUT);
575                     break;
576
577                 case SFLOW_TAG_PKT_TUNNEL_VNI_IN:
578                     sflowxdr_mark_unique(x, &x->offset.TUNNEL_VNI_IN);
579                     break;
580
581                     /* Add others here... */
582                 }
583
584                 sflowxdr_skip(x, feQuads);
585                 SFLOWXDR_assert(x, sflowxdr_mark_ok(x, feMark));
586             }
587
588             fsEnd = sflowxdr_mark(x, 0);
589             process_flow_sample(x);
590             /* Make sure we pick up the decoding where we left off. */
591             sflowxdr_setc(x, fsEnd);
592
593             /* Clear the offsets for the next counter/flow sample. */
594             memset(&x->offset, 0, sizeof x->offset);
595         }
596         break;
597
598         default:
599             /* Skip other sample types. */
600             sflowxdr_skip(x, sQuads);
601         }
602         SFLOWXDR_assert(x, sflowxdr_mark_ok(x, sMark));
603     }
604 }
605
606 static void
607 print_sflow(struct ofpbuf *buf)
608 {
609     char *dgram_buf;
610     int dgram_len = ofpbuf_size(buf);
611     struct sflow_xdr xdrDatagram;
612     struct sflow_xdr *x = &xdrDatagram;
613
614     memset(x, 0, sizeof *x);
615     if (SFLOWXDR_try(x)) {
616         SFLOWXDR_assert(x, (dgram_buf = ofpbuf_try_pull(buf, ofpbuf_size(buf))));
617         sflowxdr_init(x, dgram_buf, dgram_len);
618         SFLOWXDR_assert(x, dgram_len >= SFLOW_MIN_LEN);
619         process_datagram(x);
620     } else {
621         // CATCH
622         printf("\n>>>>> ERROR in " __FILE__ " at line %u\n", x->errline);
623     }
624 }
625
626 static void
627 test_sflow_main(int argc, char *argv[])
628 {
629     struct unixctl_server *server;
630     enum { MAX_RECV = 1500 };
631     const char *target;
632     struct ofpbuf buf;
633     bool exiting = false;
634     int error;
635     int sock;
636
637     proctitle_init(argc, argv);
638     set_program_name(argv[0]);
639     service_start(&argc, &argv);
640     parse_options(argc, argv);
641
642     if (argc - optind != 1) {
643         ovs_fatal(0, "exactly one non-option argument required "
644                   "(use --help for help)");
645     }
646     target = argv[optind];
647
648     sock = inet_open_passive(SOCK_DGRAM, target, 0, NULL, 0, true);
649     if (sock < 0) {
650         ovs_fatal(0, "%s: failed to open (%s)", argv[1], ovs_strerror(-sock));
651     }
652
653     daemon_save_fd(STDOUT_FILENO);
654     daemonize_start();
655
656     error = unixctl_server_create(NULL, &server);
657     if (error) {
658         ovs_fatal(error, "failed to create unixctl server");
659     }
660     unixctl_command_register("exit", "", 0, 0, test_sflow_exit, &exiting);
661
662     daemonize_complete();
663
664     ofpbuf_init(&buf, MAX_RECV);
665     for (;;) {
666         int retval;
667
668         unixctl_server_run(server);
669
670         ofpbuf_clear(&buf);
671         do {
672             retval = recv(sock, ofpbuf_data(&buf), buf.allocated, 0);
673         } while (retval < 0 && errno == EINTR);
674         if (retval > 0) {
675             ofpbuf_put_uninit(&buf, retval);
676             print_sflow(&buf);
677             fflush(stdout);
678         }
679
680         if (exiting) {
681             break;
682         }
683
684         poll_fd_wait(sock, POLLIN);
685         unixctl_server_wait(server);
686         poll_block();
687     }
688 }
689
690 static void
691 parse_options(int argc, char *argv[])
692 {
693     enum {
694         DAEMON_OPTION_ENUMS,
695         VLOG_OPTION_ENUMS
696     };
697     static const struct option long_options[] = {
698         {"verbose", optional_argument, NULL, 'v'},
699         {"help", no_argument, NULL, 'h'},
700         DAEMON_LONG_OPTIONS,
701         VLOG_LONG_OPTIONS,
702         {NULL, 0, NULL, 0},
703     };
704     char *short_options = long_options_to_short_options(long_options);
705
706     for (;;) {
707         int c = getopt_long(argc, argv, short_options, long_options, NULL);
708         if (c == -1) {
709             break;
710         }
711
712         switch (c) {
713         case 'h':
714             usage();
715
716         DAEMON_OPTION_HANDLERS
717         VLOG_OPTION_HANDLERS
718
719         case '?':
720             exit(EXIT_FAILURE);
721
722         default:
723             abort();
724         }
725     }
726     free(short_options);
727 }
728
729 static void
730 usage(void)
731 {
732     printf("%s: sflow collector test utility\n"
733            "usage: %s [OPTIONS] PORT[:IP]\n"
734            "where PORT is the UDP port to listen on and IP is optionally\n"
735            "the IP address to listen on.\n",
736            program_name, program_name);
737     daemon_usage();
738     vlog_usage();
739     printf("\nOther options:\n"
740            "  -h, --help                  display this help message\n");
741     exit(EXIT_SUCCESS);
742 }
743
744 static void
745 test_sflow_exit(struct unixctl_conn *conn,
746                 int argc OVS_UNUSED, const char *argv[] OVS_UNUSED,
747                 void *exiting_)
748 {
749     bool *exiting = exiting_;
750     *exiting = true;
751     unixctl_command_reply(conn, NULL);
752 }
753
754 OVSTEST_REGISTER("test-sflow", test_sflow_main);