netdev-dpdk: fix mbuf leaks
[cascardo/ovs.git] / tests / test-sflow.c
1 /*
2  * Copyright (c) 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
3  * Copyright (c) 2013 InMon Corp.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #include <config.h>
19 #undef NDEBUG
20 #include "netflow.h"
21 #include <arpa/inet.h>
22 #include <errno.h>
23 #include <getopt.h>
24 #include <signal.h>
25 #include <stdlib.h>
26 #include <unistd.h>
27 #include <setjmp.h>
28 #include "command-line.h"
29 #include "daemon.h"
30 #include "dynamic-string.h"
31 #include "ofpbuf.h"
32 #include "ovstest.h"
33 #include "packets.h"
34 #include "poll-loop.h"
35 #include "socket-util.h"
36 #include "unixctl.h"
37 #include "util.h"
38 #include "openvswitch/vlog.h"
39
40 OVS_NO_RETURN static void usage(void);
41 static void parse_options(int argc, char *argv[]);
42
43 static unixctl_cb_func test_sflow_exit;
44
45 /* Datagram. */
46 #define SFLOW_VERSION_5 5
47 #define SFLOW_MIN_LEN 36
48
49 /* Sample tag numbers. */
50 #define SFLOW_FLOW_SAMPLE 1
51 #define SFLOW_COUNTERS_SAMPLE 2
52 #define SFLOW_FLOW_SAMPLE_EXPANDED 3
53 #define SFLOW_COUNTERS_SAMPLE_EXPANDED 4
54
55 /* Structure element tag numbers. */
56 #define SFLOW_TAG_CTR_IFCOUNTERS 1
57 #define SFLOW_TAG_CTR_LACPCOUNTERS 7
58 #define SFLOW_TAG_CTR_OPENFLOWPORT 1004
59 #define SFLOW_TAG_CTR_PORTNAME 1005
60 #define SFLOW_TAG_PKT_HEADER 1
61 #define SFLOW_TAG_PKT_SWITCH 1001
62 #define SFLOW_TAG_PKT_TUNNEL4_OUT 1023
63 #define SFLOW_TAG_PKT_TUNNEL4_IN 1024
64 #define SFLOW_TAG_PKT_TUNNEL_VNI_OUT 1029
65 #define SFLOW_TAG_PKT_TUNNEL_VNI_IN 1030
66 #define SFLOW_TAG_PKT_MPLS 1006
67
68 /* string sizes */
69 #define SFL_MAX_PORTNAME_LEN 255
70
71 struct sflow_addr {
72     enum {
73         SFLOW_ADDRTYPE_undefined = 0,
74         SFLOW_ADDRTYPE_IP4,
75         SFLOW_ADDRTYPE_IP6
76     } type;
77
78     union {
79         ovs_be32 ip4;
80         ovs_be32 ip6[4];
81     } a;
82 };
83
84 struct sflow_xdr {
85     /* Exceptions. */
86     jmp_buf env;
87     int errline;
88
89     /* Cursor. */
90     ovs_be32 *datap;
91     uint32_t i;
92     uint32_t quads;
93
94     /* Agent. */
95     struct sflow_addr agentAddr;
96     char agentIPStr[INET6_ADDRSTRLEN + 2];
97     uint32_t subAgentId;
98     uint32_t uptime_mS;
99
100     /* Datasource. */
101     uint32_t dsClass;
102     uint32_t dsIndex;
103
104     /* Sequence numbers. */
105     uint32_t dgramSeqNo;
106     uint32_t fsSeqNo;
107     uint32_t csSeqNo;
108
109     /* Structure offsets. */
110     struct {
111         uint32_t HEADER;
112         uint32_t SWITCH;
113         uint32_t TUNNEL4_OUT;
114         uint32_t TUNNEL4_IN;
115         uint32_t TUNNEL_VNI_OUT;
116         uint32_t TUNNEL_VNI_IN;
117         uint32_t MPLS;
118         uint32_t IFCOUNTERS;
119         uint32_t LACPCOUNTERS;
120         uint32_t OPENFLOWPORT;
121         uint32_t PORTNAME;
122     } offset;
123
124     /* Flow sample fields. */
125     uint32_t meanSkipCount;
126     uint32_t samplePool;
127     uint32_t dropEvents;
128     uint32_t inputPortFormat;
129     uint32_t inputPort;
130     uint32_t outputPortFormat;
131     uint32_t outputPort;
132 };
133
134 #define SFLOWXDR_try(x) ((x->errline = setjmp(x->env)) == 0)
135 #define SFLOWXDR_throw(x) longjmp(x->env, __LINE__)
136 #define SFLOWXDR_assert(x, t) if (!(t)) SFLOWXDR_throw(x)
137
138 static void
139 sflowxdr_init(struct sflow_xdr *x, void *buf, size_t len)
140 {
141     x->datap = buf;
142     x->quads = len >> 2;
143 }
144
145 static uint32_t
146 sflowxdr_next(struct sflow_xdr *x)
147 {
148     return ntohl(x->datap[x->i++]);
149 }
150
151 static ovs_be32
152 sflowxdr_next_n(struct sflow_xdr *x)
153 {
154     return x->datap[x->i++];
155 }
156
157 static bool
158 sflowxdr_more(const struct sflow_xdr *x, uint32_t q)
159 {
160     return q + x->i <= x->quads;
161 }
162
163 static void
164 sflowxdr_skip(struct sflow_xdr *x, uint32_t q)
165 {
166     x->i += q;
167 }
168
169 static uint32_t
170 sflowxdr_mark(const struct sflow_xdr *x, uint32_t q)
171 {
172     return x->i + q;
173 }
174
175 static bool
176 sflowxdr_mark_ok(const struct sflow_xdr *x, uint32_t m)
177 {
178     return m == x->i;
179 }
180
181 static void
182 sflowxdr_mark_unique(struct sflow_xdr *x, uint32_t *pi)
183 {
184     if (*pi) {
185         SFLOWXDR_throw(x);
186     }
187     *pi = x->i;
188 }
189
190 static void
191 sflowxdr_setc(struct sflow_xdr *x, uint32_t j)
192 {
193     x->i = j;
194 }
195
196 static const char *
197 sflowxdr_str(const struct sflow_xdr *x)
198 {
199     return (const char *) (x->datap + x->i);
200 }
201
202 static uint64_t
203 sflowxdr_next_int64(struct sflow_xdr *x)
204 {
205     uint64_t scratch;
206     scratch = sflowxdr_next(x);
207     scratch <<= 32;
208     scratch += sflowxdr_next(x);
209     return scratch;
210 }
211
212 static void
213 process_counter_sample(struct sflow_xdr *x)
214 {
215     if (x->offset.IFCOUNTERS) {
216         sflowxdr_setc(x, x->offset.IFCOUNTERS);
217         printf("IFCOUNTERS");
218         printf(" dgramSeqNo=%"PRIu32, x->dgramSeqNo);
219         printf(" ds=%s>%"PRIu32":%"PRIu32,
220                x->agentIPStr, x->dsClass, x->dsIndex);
221         printf(" csSeqNo=%"PRIu32, x->csSeqNo);
222         printf(" ifindex=%"PRIu32, sflowxdr_next(x));
223         printf(" type=%"PRIu32, sflowxdr_next(x));
224         printf(" ifspeed=%"PRIu64, sflowxdr_next_int64(x));
225         printf(" direction=%"PRIu32, sflowxdr_next(x));
226         printf(" status=%"PRIu32, sflowxdr_next(x));
227         printf(" in_octets=%"PRIu64, sflowxdr_next_int64(x));
228         printf(" in_unicasts=%"PRIu32, sflowxdr_next(x));
229         printf(" in_multicasts=%"PRIu32, sflowxdr_next(x));
230         printf(" in_broadcasts=%"PRIu32, sflowxdr_next(x));
231         printf(" in_discards=%"PRIu32, sflowxdr_next(x));
232         printf(" in_errors=%"PRIu32, sflowxdr_next(x));
233         printf(" in_unknownprotos=%"PRIu32, sflowxdr_next(x));
234         printf(" out_octets=%"PRIu64, sflowxdr_next_int64(x));
235         printf(" out_unicasts=%"PRIu32, sflowxdr_next(x));
236         printf(" out_multicasts=%"PRIu32, sflowxdr_next(x));
237         printf(" out_broadcasts=%"PRIu32, sflowxdr_next(x));
238         printf(" out_discards=%"PRIu32, sflowxdr_next(x));
239         printf(" out_errors=%"PRIu32, sflowxdr_next(x));
240         printf(" promiscuous=%"PRIu32, sflowxdr_next(x));
241         printf("\n");
242     }
243     if (x->offset.LACPCOUNTERS) {
244         struct eth_addr *mac;
245         union {
246             ovs_be32 all;
247             struct {
248                 uint8_t actorAdmin;
249                 uint8_t actorOper;
250                 uint8_t partnerAdmin;
251                 uint8_t partnerOper;
252             } v;
253         } state;
254
255         sflowxdr_setc(x, x->offset.LACPCOUNTERS);
256         printf("LACPCOUNTERS");
257         mac = (void *)sflowxdr_str(x);
258         printf(" sysID="ETH_ADDR_FMT, ETH_ADDR_ARGS(*mac));
259         sflowxdr_skip(x, 2);
260         mac = (void *)sflowxdr_str(x);
261         printf(" partnerID="ETH_ADDR_FMT, ETH_ADDR_ARGS(*mac));
262         sflowxdr_skip(x, 2);
263         printf(" aggID=%"PRIu32, sflowxdr_next(x));
264         state.all = sflowxdr_next_n(x);
265         printf(" actorAdmin=0x%"PRIx32, state.v.actorAdmin);
266         printf(" actorOper=0x%"PRIx32, state.v.actorOper);
267         printf(" partnerAdmin=0x%"PRIx32, state.v.partnerAdmin);
268         printf(" partnerOper=0x%"PRIx32, state.v.partnerOper);
269         printf(" LACPDUsRx=%"PRIu32, sflowxdr_next(x));
270         printf(" markerPDUsRx=%"PRIu32, sflowxdr_next(x));
271         printf(" markerRespPDUsRx=%"PRIu32, sflowxdr_next(x));
272         printf(" unknownRx=%"PRIu32, sflowxdr_next(x));
273         printf(" illegalRx=%"PRIu32, sflowxdr_next(x));
274         printf(" LACPDUsTx=%"PRIu32, sflowxdr_next(x));
275         printf(" markerPDUsTx=%"PRIu32, sflowxdr_next(x));
276         printf(" markerRespPDUsTx=%"PRIu32, sflowxdr_next(x));
277         printf("\n");
278     }
279     if (x->offset.OPENFLOWPORT) {
280         sflowxdr_setc(x, x->offset.OPENFLOWPORT);
281         printf("OPENFLOWPORT");
282         printf(" datapath_id=%"PRIu64, sflowxdr_next_int64(x));
283         printf(" port_no=%"PRIu32, sflowxdr_next(x));
284         printf("\n");
285     }
286     if (x->offset.PORTNAME) {
287         uint32_t pnLen;
288         const char *pnBytes;
289         char portName[SFL_MAX_PORTNAME_LEN + 1];
290         sflowxdr_setc(x, x->offset.PORTNAME);
291         printf("PORTNAME");
292         pnLen = sflowxdr_next(x);
293         SFLOWXDR_assert(x, (pnLen <= SFL_MAX_PORTNAME_LEN));
294         pnBytes = sflowxdr_str(x);
295         memcpy(portName, pnBytes, pnLen);
296         portName[pnLen] = '\0';
297         printf(" portName=%s", portName);
298         printf("\n");
299     }
300 }
301
302 static char
303 bin_to_hex(int hexit)
304 {
305     return "0123456789ABCDEF"[hexit];
306 }
307
308 static int
309 print_hex(const char *a, int len, char *buf, int bufLen)
310 {
311     unsigned char nextByte;
312     int b = 0;
313     int i;
314
315     for (i = 0; i < len; i++) {
316         if (b > bufLen - 10) {
317             break;
318         }
319         nextByte = a[i];
320         buf[b++] = bin_to_hex(nextByte >> 4);
321         buf[b++] = bin_to_hex(nextByte & 0x0f);
322         if (i < len - 1) {
323             buf[b++] = '-';
324         }
325     }
326     buf[b] = '\0';
327     return b;
328 }
329
330 static void
331 print_struct_ipv4(struct sflow_xdr *x, const char *prefix)
332 {
333     ovs_be32 src, dst;
334
335     printf(" %s_length=%"PRIu32,    prefix, sflowxdr_next(x));
336     printf(" %s_protocol=%"PRIu32,  prefix, sflowxdr_next(x));
337
338     src = sflowxdr_next_n(x);
339     dst = sflowxdr_next_n(x);
340     printf(" %s_src="IP_FMT,        prefix, IP_ARGS(src));
341     printf(" %s_dst="IP_FMT,        prefix, IP_ARGS(dst));
342
343     printf(" %s_src_port=%"PRIu32,  prefix, sflowxdr_next(x));
344     printf(" %s_dst_port=%"PRIu32,  prefix, sflowxdr_next(x));
345     printf(" %s_tcp_flags=%"PRIu32, prefix, sflowxdr_next(x));
346     printf(" %s_tos=%"PRIu32,       prefix, sflowxdr_next(x));
347 }
348
349 #define SFLOW_HEX_SCRATCH 1024
350
351 static void
352 process_flow_sample(struct sflow_xdr *x)
353 {
354     if (x->offset.HEADER) {
355         uint32_t headerLen;
356         char scratch[SFLOW_HEX_SCRATCH];
357
358         printf("HEADER");
359         printf(" dgramSeqNo=%"PRIu32, x->dgramSeqNo);
360         printf(" ds=%s>%"PRIu32":%"PRIu32,
361                x->agentIPStr, x->dsClass, x->dsIndex);
362         printf(" fsSeqNo=%"PRIu32, x->fsSeqNo);
363
364         if (x->offset.TUNNEL4_IN) {
365             sflowxdr_setc(x, x->offset.TUNNEL4_IN);
366             print_struct_ipv4(x, "tunnel4_in");
367         }
368
369         if (x->offset.TUNNEL4_OUT) {
370             sflowxdr_setc(x, x->offset.TUNNEL4_OUT);
371             print_struct_ipv4(x, "tunnel4_out");
372         }
373
374         if (x->offset.TUNNEL_VNI_IN) {
375             sflowxdr_setc(x, x->offset.TUNNEL_VNI_IN);
376             printf( " tunnel_in_vni=%"PRIu32, sflowxdr_next(x));
377         }
378
379         if (x->offset.TUNNEL_VNI_OUT) {
380             sflowxdr_setc(x, x->offset.TUNNEL_VNI_OUT);
381             printf( " tunnel_out_vni=%"PRIu32, sflowxdr_next(x));
382         }
383
384         if (x->offset.MPLS) {
385             uint32_t addr_type, stack_depth, ii;
386             ovs_be32 mpls_lse;
387             sflowxdr_setc(x, x->offset.MPLS);
388             /* OVS only sets the out_stack. The rest will be blank. */
389             /* skip next hop address */
390             addr_type = sflowxdr_next(x);
391             sflowxdr_skip(x, addr_type == SFLOW_ADDRTYPE_IP6 ? 4 : 1);
392             /* skip in_stack */
393             stack_depth = sflowxdr_next(x);
394             sflowxdr_skip(x, stack_depth);
395             /* print out_stack */
396             stack_depth = sflowxdr_next(x);
397             for(ii = 0; ii < stack_depth; ii++) {
398                 mpls_lse=sflowxdr_next_n(x);
399                 printf(" mpls_label_%"PRIu32"=%"PRIu32,
400                        ii, mpls_lse_to_label(mpls_lse));
401                 printf(" mpls_tc_%"PRIu32"=%"PRIu32,
402                        ii, mpls_lse_to_tc(mpls_lse));
403                 printf(" mpls_ttl_%"PRIu32"=%"PRIu32,
404                        ii, mpls_lse_to_ttl(mpls_lse));
405                 printf(" mpls_bos_%"PRIu32"=%"PRIu32,
406                        ii, mpls_lse_to_bos(mpls_lse));
407             }
408         }
409
410         if (x->offset.SWITCH) {
411             sflowxdr_setc(x, x->offset.SWITCH);
412             printf(" in_vlan=%"PRIu32, sflowxdr_next(x));
413             printf(" in_priority=%"PRIu32, sflowxdr_next(x));
414             printf(" out_vlan=%"PRIu32, sflowxdr_next(x));
415             printf(" out_priority=%"PRIu32, sflowxdr_next(x));
416         }
417
418         sflowxdr_setc(x, x->offset.HEADER);
419         printf(" meanSkip=%"PRIu32, x->meanSkipCount);
420         printf(" samplePool=%"PRIu32, x->samplePool);
421         printf(" dropEvents=%"PRIu32, x->dropEvents);
422         printf(" in_ifindex=%"PRIu32, x->inputPort);
423         printf(" in_format=%"PRIu32, x->inputPortFormat);
424         printf(" out_ifindex=%"PRIu32, x->outputPort);
425         printf(" out_format=%"PRIu32, x->outputPortFormat);
426         printf(" hdr_prot=%"PRIu32, sflowxdr_next(x));
427         printf(" pkt_len=%"PRIu32, sflowxdr_next(x));
428         printf(" stripped=%"PRIu32, sflowxdr_next(x));
429         headerLen = sflowxdr_next(x);
430         printf(" hdr_len=%"PRIu32, headerLen);
431         print_hex(sflowxdr_str(x), headerLen, scratch, SFLOW_HEX_SCRATCH);
432         printf(" hdr=%s", scratch);
433         printf("\n");
434     }
435 }
436
437 static void
438 process_datagram(struct sflow_xdr *x)
439 {
440     uint32_t samples, s;
441
442     SFLOWXDR_assert(x, (sflowxdr_next(x) == SFLOW_VERSION_5));
443
444     /* Read the sFlow header. */
445     x->agentAddr.type = sflowxdr_next(x);
446     switch (x->agentAddr.type) {
447     case SFLOW_ADDRTYPE_IP4:
448         x->agentAddr.a.ip4 = sflowxdr_next_n(x);
449         break;
450
451     case SFLOW_ADDRTYPE_IP6:
452         x->agentAddr.a.ip6[0] = sflowxdr_next_n(x);
453         x->agentAddr.a.ip6[1] = sflowxdr_next_n(x);
454         x->agentAddr.a.ip6[2] = sflowxdr_next_n(x);
455         x->agentAddr.a.ip6[3] = sflowxdr_next_n(x);
456         break;
457
458     case SFLOW_ADDRTYPE_undefined:
459     default:
460         SFLOWXDR_throw(x);
461         break;
462     }
463     x->subAgentId = sflowxdr_next(x);
464     x->dgramSeqNo = sflowxdr_next(x);
465     x->uptime_mS = sflowxdr_next(x);
466
467     /* Store the agent address as a string. */
468     if (x->agentAddr.type == SFLOW_ADDRTYPE_IP6) {
469         char ipstr[INET6_ADDRSTRLEN];
470         inet_ntop(AF_INET6, (const void *) &x->agentAddr.a.ip6,
471                   ipstr, INET6_ADDRSTRLEN);
472         snprintf(x->agentIPStr, sizeof x->agentIPStr, "[%s]", ipstr);
473     } else {
474         snprintf(x->agentIPStr, sizeof x->agentIPStr,
475                  IP_FMT, IP_ARGS(x->agentAddr.a.ip4));
476     }
477
478     /* Array of flow/counter samples. */
479     samples = sflowxdr_next(x);
480     for (s = 0; s < samples; s++) {
481         uint32_t sType = sflowxdr_next(x);
482         uint32_t sQuads = sflowxdr_next(x) >> 2;
483         uint32_t sMark = sflowxdr_mark(x, sQuads);
484         SFLOWXDR_assert(x, sflowxdr_more(x, sQuads));
485
486         switch (sType) {
487         case SFLOW_COUNTERS_SAMPLE_EXPANDED:
488         case SFLOW_COUNTERS_SAMPLE:
489         {
490             uint32_t csElements, e;
491             uint32_t ceTag, ceQuads, ceMark, csEnd;
492
493             x->csSeqNo = sflowxdr_next(x);
494             if (sType == SFLOW_COUNTERS_SAMPLE_EXPANDED) {
495                 x->dsClass = sflowxdr_next(x);
496                 x->dsIndex = sflowxdr_next(x);
497             } else {
498                 uint32_t dsCombined = sflowxdr_next(x);
499                 x->dsClass = dsCombined >> 24;
500                 x->dsIndex = dsCombined & 0x00FFFFFF;
501             }
502
503             csElements = sflowxdr_next(x);
504             for (e = 0; e < csElements; e++) {
505                 SFLOWXDR_assert(x, sflowxdr_more(x,2));
506                 ceTag = sflowxdr_next(x);
507                 ceQuads = sflowxdr_next(x) >> 2;
508                 ceMark = sflowxdr_mark(x, ceQuads);
509                 SFLOWXDR_assert(x, sflowxdr_more(x,ceQuads));
510                 /* Only care about selected structures.  Just record their
511                  * offsets here. We'll read the fields out later. */
512                 switch (ceTag) {
513                 case SFLOW_TAG_CTR_IFCOUNTERS:
514                     sflowxdr_mark_unique(x, &x->offset.IFCOUNTERS);
515                     break;
516                 case SFLOW_TAG_CTR_LACPCOUNTERS:
517                     sflowxdr_mark_unique(x, &x->offset.LACPCOUNTERS);
518                     break;
519                 case SFLOW_TAG_CTR_PORTNAME:
520                     sflowxdr_mark_unique(x, &x->offset.PORTNAME);
521                     break;
522                 case SFLOW_TAG_CTR_OPENFLOWPORT:
523                     sflowxdr_mark_unique(x, &x->offset.OPENFLOWPORT);
524                     break;
525
526                     /* Add others here... */
527                 }
528
529                 sflowxdr_skip(x, ceQuads);
530                 SFLOWXDR_assert(x, sflowxdr_mark_ok(x, ceMark));
531             }
532
533             csEnd = sflowxdr_mark(x, 0);
534             process_counter_sample(x);
535             /* Make sure we pick up the decoding where we left off. */
536             sflowxdr_setc(x, csEnd);
537
538             /* Clear the offsets for the next sample. */
539             memset(&x->offset, 0, sizeof x->offset);
540         }
541         break;
542
543         case SFLOW_FLOW_SAMPLE:
544         case SFLOW_FLOW_SAMPLE_EXPANDED:
545         {
546             uint32_t fsElements, e;
547             uint32_t feTag, feQuads, feMark, fsEnd;
548             x->fsSeqNo = sflowxdr_next(x);
549             if (sType == SFLOW_FLOW_SAMPLE_EXPANDED) {
550                 x->dsClass = sflowxdr_next(x);
551                 x->dsIndex = sflowxdr_next(x);
552             } else {
553                 uint32_t dsCombined = sflowxdr_next(x);
554                 x->dsClass = dsCombined >> 24;
555                 x->dsIndex = dsCombined & 0x00FFFFFF;
556             }
557             x->meanSkipCount = sflowxdr_next(x);
558             x->samplePool = sflowxdr_next(x);
559             x->dropEvents = sflowxdr_next(x);
560             if (sType == SFLOW_FLOW_SAMPLE_EXPANDED) {
561                 x->inputPortFormat = sflowxdr_next(x);
562                 x->inputPort = sflowxdr_next(x);
563                 x->outputPortFormat = sflowxdr_next(x);
564                 x->outputPort = sflowxdr_next(x);
565             } else {
566                 uint32_t inp, outp;
567
568                 inp = sflowxdr_next(x);
569                 outp = sflowxdr_next(x);
570                 x->inputPortFormat = inp >> 30;
571                 x->inputPort = inp & 0x3fffffff;
572                 x->outputPortFormat = outp >> 30;
573                 x->outputPort = outp & 0x3fffffff;
574             }
575             fsElements = sflowxdr_next(x);
576             for (e = 0; e < fsElements; e++) {
577                 SFLOWXDR_assert(x, sflowxdr_more(x,2));
578                 feTag = sflowxdr_next(x);
579                 feQuads = sflowxdr_next(x) >> 2;
580                 feMark = sflowxdr_mark(x, feQuads);
581                 SFLOWXDR_assert(x, sflowxdr_more(x,feQuads));
582                 /* Only care about selected structures.  Just record their
583                  * offsets here. We'll read the fields out below. */
584                 switch (feTag) {
585                 case SFLOW_TAG_PKT_HEADER:
586                     sflowxdr_mark_unique(x, &x->offset.HEADER);
587                     break;
588
589                 case SFLOW_TAG_PKT_SWITCH:
590                     sflowxdr_mark_unique(x, &x->offset.SWITCH);
591                     break;
592
593                 case SFLOW_TAG_PKT_TUNNEL4_OUT:
594                     sflowxdr_mark_unique(x, &x->offset.TUNNEL4_OUT);
595                     break;
596
597                 case SFLOW_TAG_PKT_TUNNEL4_IN:
598                     sflowxdr_mark_unique(x, &x->offset.TUNNEL4_IN);
599                     break;
600
601                 case SFLOW_TAG_PKT_TUNNEL_VNI_OUT:
602                     sflowxdr_mark_unique(x, &x->offset.TUNNEL_VNI_OUT);
603                     break;
604
605                 case SFLOW_TAG_PKT_TUNNEL_VNI_IN:
606                     sflowxdr_mark_unique(x, &x->offset.TUNNEL_VNI_IN);
607                     break;
608
609                 case SFLOW_TAG_PKT_MPLS:
610                     sflowxdr_mark_unique(x, &x->offset.MPLS);
611                     break;
612
613                     /* Add others here... */
614                 }
615
616                 sflowxdr_skip(x, feQuads);
617                 SFLOWXDR_assert(x, sflowxdr_mark_ok(x, feMark));
618             }
619
620             fsEnd = sflowxdr_mark(x, 0);
621             process_flow_sample(x);
622             /* Make sure we pick up the decoding where we left off. */
623             sflowxdr_setc(x, fsEnd);
624
625             /* Clear the offsets for the next counter/flow sample. */
626             memset(&x->offset, 0, sizeof x->offset);
627         }
628         break;
629
630         default:
631             /* Skip other sample types. */
632             sflowxdr_skip(x, sQuads);
633         }
634         SFLOWXDR_assert(x, sflowxdr_mark_ok(x, sMark));
635     }
636 }
637
638 static void
639 print_sflow(struct ofpbuf *buf)
640 {
641     char *dgram_buf;
642     int dgram_len = buf->size;
643     struct sflow_xdr xdrDatagram;
644     struct sflow_xdr *x = &xdrDatagram;
645
646     memset(x, 0, sizeof *x);
647     if (SFLOWXDR_try(x)) {
648         SFLOWXDR_assert(x, (dgram_buf = ofpbuf_try_pull(buf, buf->size)));
649         sflowxdr_init(x, dgram_buf, dgram_len);
650         SFLOWXDR_assert(x, dgram_len >= SFLOW_MIN_LEN);
651         process_datagram(x);
652     } else {
653         // CATCH
654         printf("\n>>>>> ERROR in " __FILE__ " at line %u\n", x->errline);
655     }
656 }
657
658 static void
659 test_sflow_main(int argc, char *argv[])
660 {
661     struct unixctl_server *server;
662     enum { MAX_RECV = 1500 };
663     const char *target;
664     struct ofpbuf buf;
665     bool exiting = false;
666     int error;
667     int sock;
668
669     ovs_cmdl_proctitle_init(argc, argv);
670     set_program_name(argv[0]);
671     service_start(&argc, &argv);
672     parse_options(argc, argv);
673
674     if (argc - optind != 1) {
675         ovs_fatal(0, "exactly one non-option argument required "
676                   "(use --help for help)");
677     }
678     target = argv[optind];
679
680     sock = inet_open_passive(SOCK_DGRAM, target, 0, NULL, 0, true);
681     if (sock < 0) {
682         ovs_fatal(0, "%s: failed to open (%s)", target, ovs_strerror(-sock));
683     }
684
685     daemon_save_fd(STDOUT_FILENO);
686     daemonize_start(false);
687
688     error = unixctl_server_create(NULL, &server);
689     if (error) {
690         ovs_fatal(error, "failed to create unixctl server");
691     }
692     unixctl_command_register("exit", "", 0, 0, test_sflow_exit, &exiting);
693
694     daemonize_complete();
695
696     ofpbuf_init(&buf, MAX_RECV);
697     for (;;) {
698         int retval;
699
700         unixctl_server_run(server);
701
702         ofpbuf_clear(&buf);
703         do {
704             retval = recv(sock, buf.data, buf.allocated, 0);
705         } while (retval < 0 && errno == EINTR);
706         if (retval > 0) {
707             ofpbuf_put_uninit(&buf, retval);
708             print_sflow(&buf);
709             fflush(stdout);
710         }
711
712         if (exiting) {
713             break;
714         }
715
716         poll_fd_wait(sock, POLLIN);
717         unixctl_server_wait(server);
718         poll_block();
719     }
720     ofpbuf_uninit(&buf);
721     unixctl_server_destroy(server);
722 }
723
724 static void
725 parse_options(int argc, char *argv[])
726 {
727     enum {
728         DAEMON_OPTION_ENUMS,
729         VLOG_OPTION_ENUMS
730     };
731     static const struct option long_options[] = {
732         {"verbose", optional_argument, NULL, 'v'},
733         {"help", no_argument, NULL, 'h'},
734         DAEMON_LONG_OPTIONS,
735         VLOG_LONG_OPTIONS,
736         {NULL, 0, NULL, 0},
737     };
738     char *short_options = ovs_cmdl_long_options_to_short_options(long_options);
739
740     for (;;) {
741         int c = getopt_long(argc, argv, short_options, long_options, NULL);
742         if (c == -1) {
743             break;
744         }
745
746         switch (c) {
747         case 'h':
748             usage();
749
750         DAEMON_OPTION_HANDLERS
751         VLOG_OPTION_HANDLERS
752
753         case '?':
754             exit(EXIT_FAILURE);
755
756         default:
757             abort();
758         }
759     }
760     free(short_options);
761 }
762
763 static void
764 usage(void)
765 {
766     printf("%s: sflow collector test utility\n"
767            "usage: %s [OPTIONS] PORT[:IP]\n"
768            "where PORT is the UDP port to listen on and IP is optionally\n"
769            "the IP address to listen on.\n",
770            program_name, program_name);
771     daemon_usage();
772     vlog_usage();
773     printf("\nOther options:\n"
774            "  -h, --help                  display this help message\n");
775     exit(EXIT_SUCCESS);
776 }
777
778 static void
779 test_sflow_exit(struct unixctl_conn *conn,
780                 int argc OVS_UNUSED, const char *argv[] OVS_UNUSED,
781                 void *exiting_)
782 {
783     bool *exiting = exiting_;
784     *exiting = true;
785     unixctl_command_reply(conn, NULL);
786 }
787
788 OVSTEST_REGISTER("test-sflow", test_sflow_main);