netdev-dpdk: fix mbuf leaks
[cascardo/ovs.git] / lib / sflow_receiver.c
1 /* Copyright (c) 2002-2009 InMon Corp. Licensed under the terms of either the
2  *   Sun Industry Standards Source License 1.1, that is available at:
3  *    http://host-sflow.sourceforge.net/sissl.html
4  * or the InMon sFlow License, that is available at:
5  *    http://www.inmon.com/technology/sflowlicense.txt
6  */
7
8 #ifndef __CHECKER__            /* Don't run sparse on anything in this file. */
9
10 #include <assert.h>
11 #include "sflow_api.h"
12
13 static void resetSampleCollector(SFLReceiver *receiver);
14 static void sendSample(SFLReceiver *receiver);
15 static void sflError(SFLReceiver *receiver, char *errm);
16 inline static void putNet32(SFLReceiver *receiver, u_int32_t val);
17 inline static void putAddress(SFLReceiver *receiver, SFLAddress *addr);
18 #ifdef SFLOW_DO_SOCKET
19 static void initSocket(SFLReceiver *receiver);
20 #endif
21
22 /*_________________--------------------------__________________
23   _________________    sfl_receiver_init     __________________
24   -----------------__________________________------------------
25 */
26
27 void sfl_receiver_init(SFLReceiver *receiver, SFLAgent *agent)
28 {
29     /* first clear everything */
30     memset(receiver, 0, sizeof(*receiver));
31
32     /* now copy in the parameters */
33     receiver->agent = agent;
34
35     /* set defaults */
36     receiver->sFlowRcvrMaximumDatagramSize = SFL_DEFAULT_DATAGRAM_SIZE;
37     receiver->sFlowRcvrPort = SFL_DEFAULT_COLLECTOR_PORT;
38
39 #ifdef SFLOW_DO_SOCKET
40     /* initialize the socket address */
41     initSocket(receiver);
42 #endif
43
44     /* preset some of the header fields */
45     receiver->sampleCollector.datap = receiver->sampleCollector.data;
46     putNet32(receiver, SFLDATAGRAM_VERSION5);
47     putAddress(receiver, &agent->myIP);
48     putNet32(receiver, agent->subId);
49
50     /* prepare to receive the first sample */
51     resetSampleCollector(receiver);
52 }
53
54 /*_________________---------------------------__________________
55   _________________      reset                __________________
56   -----------------___________________________------------------
57
58   called on timeout, or when owner string is cleared
59 */
60
61 static void reset(SFLReceiver *receiver) {
62     // ask agent to tell samplers and pollers to stop sending samples
63     sfl_agent_resetReceiver(receiver->agent, receiver);
64     // reinitialize
65     sfl_receiver_init(receiver, receiver->agent);
66 }
67
68 #ifdef SFLOW_DO_SOCKET
69 /*_________________---------------------------__________________
70   _________________      initSocket           __________________
71   -----------------___________________________------------------
72 */
73
74 static void initSocket(SFLReceiver *receiver) {
75     if(receiver->sFlowRcvrAddress.type == SFLADDRESSTYPE_IP_V6) {
76         struct sockaddr_in6 *sa6 = &receiver->receiver6;
77         sa6->sin6_port = htons((u_int16_t)receiver->sFlowRcvrPort);
78         sa6->sin6_family = AF_INET6;
79         sa6->sin6_addr = receiver->sFlowRcvrAddress.address.ip_v6;
80     }
81     else {
82         struct sockaddr_in *sa4 = &receiver->receiver4;
83         sa4->sin_port = htons((u_int16_t)receiver->sFlowRcvrPort);
84         sa4->sin_family = AF_INET;
85         sa4->sin_addr = receiver->sFlowRcvrAddress.address.ip_v4;
86     }
87 }
88 #endif
89
90 /*_________________----------------------------------------_____________
91   _________________          MIB Vars                      _____________
92   -----------------________________________________________-------------
93 */
94
95 char * sfl_receiver_get_sFlowRcvrOwner(SFLReceiver *receiver) {
96     return receiver->sFlowRcvrOwner;
97 }
98 void sfl_receiver_set_sFlowRcvrOwner(SFLReceiver *receiver, char *sFlowRcvrOwner) {
99     receiver->sFlowRcvrOwner = sFlowRcvrOwner;
100     if(sFlowRcvrOwner == NULL || sFlowRcvrOwner[0] == '\0') {
101         // reset condition! owner string was cleared
102         reset(receiver);
103     }
104 }
105 time_t sfl_receiver_get_sFlowRcvrTimeout(SFLReceiver *receiver) {
106     return receiver->sFlowRcvrTimeout;
107 }
108 void sfl_receiver_set_sFlowRcvrTimeout(SFLReceiver *receiver, time_t sFlowRcvrTimeout) {
109     receiver->sFlowRcvrTimeout =sFlowRcvrTimeout;
110 }
111 u_int32_t sfl_receiver_get_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver) {
112     return receiver->sFlowRcvrMaximumDatagramSize;
113 }
114 void sfl_receiver_set_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver, u_int32_t sFlowRcvrMaximumDatagramSize) {
115     u_int32_t mdz = sFlowRcvrMaximumDatagramSize;
116     if(mdz < SFL_MIN_DATAGRAM_SIZE) mdz = SFL_MIN_DATAGRAM_SIZE;
117     receiver->sFlowRcvrMaximumDatagramSize = mdz;
118 }
119 SFLAddress *sfl_receiver_get_sFlowRcvrAddress(SFLReceiver *receiver) {
120     return &receiver->sFlowRcvrAddress;
121 }
122 void sfl_receiver_set_sFlowRcvrAddress(SFLReceiver *receiver, SFLAddress *sFlowRcvrAddress) {
123     if(sFlowRcvrAddress) receiver->sFlowRcvrAddress = *sFlowRcvrAddress; // structure copy
124 #ifdef SFLOW_DO_SOCKET
125     initSocket(receiver);
126 #endif
127 }
128 u_int32_t sfl_receiver_get_sFlowRcvrPort(SFLReceiver *receiver) {
129     return receiver->sFlowRcvrPort;
130 }
131 void sfl_receiver_set_sFlowRcvrPort(SFLReceiver *receiver, u_int32_t sFlowRcvrPort) {
132     receiver->sFlowRcvrPort = sFlowRcvrPort;
133     // update the socket structure
134 #ifdef SFLOW_DO_SOCKET
135     initSocket(receiver);
136 #endif
137 }
138
139 /*_________________---------------------------__________________
140   _________________   sfl_receiver_tick       __________________
141   -----------------___________________________------------------
142 */
143
144 void sfl_receiver_tick(SFLReceiver *receiver, time_t now)
145 {
146     // if there are any samples to send, flush them now
147     if(receiver->sampleCollector.numSamples > 0) sendSample(receiver);
148     // check the timeout
149     if(receiver->sFlowRcvrTimeout && (u_int32_t)receiver->sFlowRcvrTimeout != 0xFFFFFFFF) {
150         // count down one tick and reset if we reach 0
151         if(--receiver->sFlowRcvrTimeout == 0) reset(receiver);
152     }
153 }
154
155 /*_________________-----------------------------__________________
156   _________________   receiver write utilities  __________________
157   -----------------_____________________________------------------
158 */
159
160 inline static void put32(SFLReceiver *receiver, u_int32_t val)
161 {
162     *receiver->sampleCollector.datap++ = val;
163 }
164
165 inline static void putNet32(SFLReceiver *receiver, u_int32_t val)
166 {
167     *receiver->sampleCollector.datap++ = htonl(val);
168 }
169
170 inline static void putNet32_run(SFLReceiver *receiver, void *obj, size_t quads)
171 {
172     u_int32_t *from = (u_int32_t *)obj;
173     while(quads--) putNet32(receiver, *from++);
174 }
175
176 inline static void putNet64(SFLReceiver *receiver, u_int64_t val64)
177 {
178     u_int32_t *firstQuadPtr = receiver->sampleCollector.datap;
179     // first copy the bytes in
180     memcpy((u_char *)firstQuadPtr, &val64, 8);
181     if(htonl(1) != 1) {
182         // swap the bytes, and reverse the quads too
183         u_int32_t tmp = *receiver->sampleCollector.datap++;
184         *firstQuadPtr = htonl(*receiver->sampleCollector.datap);
185         *receiver->sampleCollector.datap++ = htonl(tmp);
186     }
187     else receiver->sampleCollector.datap += 2;
188 }
189
190 inline static void put128(SFLReceiver *receiver, u_char *val)
191 {
192     memcpy(receiver->sampleCollector.datap, val, 16);
193     receiver->sampleCollector.datap += 4;
194 }
195
196 inline static void putString(SFLReceiver *receiver, SFLString *s)
197 {
198     putNet32(receiver, s->len);
199     memcpy(receiver->sampleCollector.datap, s->str, s->len);
200     receiver->sampleCollector.datap += (s->len + 3) / 4; /* pad to 4-byte boundary */
201     if ((s->len % 4) != 0){
202         u_int8_t padding = 4 - (s->len % 4);
203         memset(((u_int8_t*)receiver->sampleCollector.datap)-padding, 0, padding);
204     }
205 }
206
207 inline static u_int32_t stringEncodingLength(SFLString *s) {
208     // answer in bytes,  so remember to mulitply by 4 after rounding up to nearest 4-byte boundary
209     return 4 + (((s->len + 3) / 4) * 4);
210 }
211
212 inline static void putAddress(SFLReceiver *receiver, SFLAddress *addr)
213 {
214     // encode unspecified addresses as IPV4:0.0.0.0 - or should we flag this as an error?
215     if(addr->type == 0) {
216         putNet32(receiver, SFLADDRESSTYPE_IP_V4);
217         put32(receiver, 0);
218     }
219     else {
220         putNet32(receiver, addr->type);
221         if(addr->type == SFLADDRESSTYPE_IP_V4) put32(receiver, addr->address.ip_v4.addr);
222         else put128(receiver, addr->address.ip_v6.addr);
223     }
224 }
225
226 inline static u_int32_t addressEncodingLength(SFLAddress *addr) {
227     return (addr->type == SFLADDRESSTYPE_IP_V6) ? 20 : 8;  // type + address (unspecified == IPV4)
228 }
229
230 inline static void putMACAddress(SFLReceiver *receiver,
231                                  const struct eth_addr mac)
232 {
233     memcpy(receiver->sampleCollector.datap, &mac, 6);
234     receiver->sampleCollector.datap += 2;
235 }
236
237 inline static void putSwitch(SFLReceiver *receiver, SFLExtended_switch *sw)
238 {
239     putNet32(receiver, sw->src_vlan);
240     putNet32(receiver, sw->src_priority);
241     putNet32(receiver, sw->dst_vlan);
242     putNet32(receiver, sw->dst_priority);
243 }
244
245 inline static void putRouter(SFLReceiver *receiver, SFLExtended_router *router)
246 {
247     putAddress(receiver, &router->nexthop);
248     putNet32(receiver, router->src_mask);
249     putNet32(receiver, router->dst_mask);
250 }
251
252 inline static u_int32_t routerEncodingLength(SFLExtended_router *router) {
253     return addressEncodingLength(&router->nexthop) + 8;
254 }
255
256 inline static void putGateway(SFLReceiver *receiver, SFLExtended_gateway *gw)
257 {
258     putAddress(receiver, &gw->nexthop);
259     putNet32(receiver, gw->as);
260     putNet32(receiver, gw->src_as);
261     putNet32(receiver, gw->src_peer_as);
262     putNet32(receiver, gw->dst_as_path_segments);
263     {
264         u_int32_t seg = 0;
265         for(; seg < gw->dst_as_path_segments; seg++) {
266             putNet32(receiver, gw->dst_as_path[seg].type);
267             putNet32(receiver, gw->dst_as_path[seg].length);
268             putNet32_run(receiver, gw->dst_as_path[seg].as.seq, gw->dst_as_path[seg].length);
269         }
270     }
271     putNet32(receiver, gw->communities_length);
272     putNet32_run(receiver, gw->communities, gw->communities_length);
273     putNet32(receiver, gw->localpref);
274 }
275
276 inline static u_int32_t gatewayEncodingLength(SFLExtended_gateway *gw) {
277     u_int32_t elemSiz = addressEncodingLength(&gw->nexthop);
278     u_int32_t seg = 0;
279     elemSiz += 16; // as, src_as, src_peer_as, dst_as_path_segments
280     for(; seg < gw->dst_as_path_segments; seg++) {
281         elemSiz += 8; // type, length
282         elemSiz += 4 * gw->dst_as_path[seg].length; // set/seq bytes
283     }
284     elemSiz += 4; // communities_length
285     elemSiz += 4 * gw->communities_length; // communities
286     elemSiz += 4; // localpref
287     return elemSiz;
288 }
289
290 inline static void putUser(SFLReceiver *receiver, SFLExtended_user *user)
291 {
292     putNet32(receiver, user->src_charset);
293     putString(receiver, &user->src_user);
294     putNet32(receiver, user->dst_charset);
295     putString(receiver, &user->dst_user);
296 }
297
298 inline static u_int32_t userEncodingLength(SFLExtended_user *user) {
299     return 4
300         + stringEncodingLength(&user->src_user)
301         + 4
302         + stringEncodingLength(&user->dst_user);
303 }
304
305 inline static void putUrl(SFLReceiver *receiver, SFLExtended_url *url)
306 {
307     putNet32(receiver, url->direction);
308     putString(receiver, &url->url);
309     putString(receiver, &url->host);
310 }
311
312 inline static u_int32_t urlEncodingLength(SFLExtended_url *url) {
313     return 4
314         + stringEncodingLength(&url->url)
315         + stringEncodingLength(&url->host);
316 }
317
318 inline static void putLabelStack(SFLReceiver *receiver, SFLLabelStack *labelStack)
319 {
320     putNet32(receiver, labelStack->depth);
321     putNet32_run(receiver, labelStack->stack, labelStack->depth);
322 }
323
324 inline static u_int32_t labelStackEncodingLength(SFLLabelStack *labelStack) {
325     return 4 + (4 * labelStack->depth);
326 }
327
328 inline static void putMpls(SFLReceiver *receiver, SFLExtended_mpls *mpls)
329 {
330     putAddress(receiver, &mpls->nextHop);
331     putLabelStack(receiver, &mpls->in_stack);
332     putLabelStack(receiver, &mpls->out_stack);
333 }
334
335 inline static u_int32_t mplsEncodingLength(SFLExtended_mpls *mpls) {
336     return addressEncodingLength(&mpls->nextHop)
337         + labelStackEncodingLength(&mpls->in_stack)
338         + labelStackEncodingLength(&mpls->out_stack);
339 }
340
341 inline static void putNat(SFLReceiver *receiver, SFLExtended_nat *nat)
342 {
343     putAddress(receiver, &nat->src);
344     putAddress(receiver, &nat->dst);
345 }
346
347 inline static u_int32_t natEncodingLength(SFLExtended_nat *nat) {
348     return addressEncodingLength(&nat->src)
349         + addressEncodingLength(&nat->dst);
350 }
351
352 inline static void putMplsTunnel(SFLReceiver *receiver, SFLExtended_mpls_tunnel *tunnel)
353 {
354     putString(receiver, &tunnel->tunnel_lsp_name);
355     putNet32(receiver, tunnel->tunnel_id);
356     putNet32(receiver, tunnel->tunnel_cos);
357 }
358
359 inline static u_int32_t mplsTunnelEncodingLength(SFLExtended_mpls_tunnel *tunnel) {
360     return stringEncodingLength(&tunnel->tunnel_lsp_name) + 8;
361 }
362
363 inline static void putMplsVc(SFLReceiver *receiver, SFLExtended_mpls_vc *vc)
364 {
365     putString(receiver, &vc->vc_instance_name);
366     putNet32(receiver, vc->vll_vc_id);
367     putNet32(receiver, vc->vc_label_cos);
368 }
369
370 inline static u_int32_t mplsVcEncodingLength(SFLExtended_mpls_vc *vc) {
371     return stringEncodingLength( &vc->vc_instance_name) + 8;
372 }
373
374 inline static void putMplsFtn(SFLReceiver *receiver, SFLExtended_mpls_FTN *ftn)
375 {
376     putString(receiver, &ftn->mplsFTNDescr);
377     putNet32(receiver, ftn->mplsFTNMask);
378 }
379
380 inline static u_int32_t mplsFtnEncodingLength(SFLExtended_mpls_FTN *ftn) {
381     return stringEncodingLength( &ftn->mplsFTNDescr) + 4;
382 }
383
384 inline static void putMplsLdpFec(SFLReceiver *receiver, SFLExtended_mpls_LDP_FEC *ldpfec)
385 {
386     putNet32(receiver, ldpfec->mplsFecAddrPrefixLength);
387 }
388
389 inline static u_int32_t mplsLdpFecEncodingLength(SFLExtended_mpls_LDP_FEC *ldpfec) {
390     return 4;
391 }
392
393 inline static void putVlanTunnel(SFLReceiver *receiver, SFLExtended_vlan_tunnel *vlanTunnel)
394 {
395     putLabelStack(receiver, &vlanTunnel->stack);
396 }
397
398 inline static u_int32_t vlanTunnelEncodingLength(SFLExtended_vlan_tunnel *vlanTunnel) {
399     return labelStackEncodingLength(&vlanTunnel->stack);
400 }
401
402
403 inline static void putGenericCounters(SFLReceiver *receiver, SFLIf_counters *counters)
404 {
405     putNet32(receiver, counters->ifIndex);
406     putNet32(receiver, counters->ifType);
407     putNet64(receiver, counters->ifSpeed);
408     putNet32(receiver, counters->ifDirection);
409     putNet32(receiver, counters->ifStatus);
410     putNet64(receiver, counters->ifInOctets);
411     putNet32(receiver, counters->ifInUcastPkts);
412     putNet32(receiver, counters->ifInMulticastPkts);
413     putNet32(receiver, counters->ifInBroadcastPkts);
414     putNet32(receiver, counters->ifInDiscards);
415     putNet32(receiver, counters->ifInErrors);
416     putNet32(receiver, counters->ifInUnknownProtos);
417     putNet64(receiver, counters->ifOutOctets);
418     putNet32(receiver, counters->ifOutUcastPkts);
419     putNet32(receiver, counters->ifOutMulticastPkts);
420     putNet32(receiver, counters->ifOutBroadcastPkts);
421     putNet32(receiver, counters->ifOutDiscards);
422     putNet32(receiver, counters->ifOutErrors);
423     putNet32(receiver, counters->ifPromiscuousMode);
424 }
425
426
427 /*_________________-----------------------------__________________
428   _________________      computeFlowSampleSize  __________________
429   -----------------_____________________________------------------
430 */
431
432 static int computeFlowSampleSize(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
433 {
434     SFLFlow_sample_element *elem = fs->elements;
435 #ifdef SFL_USE_32BIT_INDEX
436     u_int siz = 52; /* tag, length, sequence_number, ds_class, ds_index, sampling_rate,
437                        sample_pool, drops, inputFormat, input, outputFormat, output, number of elements */
438 #else
439     u_int siz = 40; /* tag, length, sequence_number, source_id, sampling_rate,
440                        sample_pool, drops, input, output, number of elements */
441 #endif
442
443     fs->num_elements = 0; /* we're going to count them again even if this was set by the client */
444     for(; elem != NULL; elem = elem->nxt) {
445         u_int elemSiz = 0;
446         fs->num_elements++;
447         siz += 8; /* tag, length */
448         switch(elem->tag) {
449         case SFLFLOW_HEADER:
450             elemSiz = 16; /* header_protocol, frame_length, stripped, header_length */
451             elemSiz += ((elem->flowType.header.header_length + 3) / 4) * 4; /* header, rounded up to nearest 4 bytes */
452             break;
453         case SFLFLOW_ETHERNET: elemSiz = sizeof(SFLSampled_ethernet); break;
454         case SFLFLOW_IPV4: elemSiz = sizeof(SFLSampled_ipv4); break;
455         case SFLFLOW_IPV6: elemSiz = sizeof(SFLSampled_ipv6); break;
456         case SFLFLOW_EX_SWITCH: elemSiz = sizeof(SFLExtended_switch); break;
457         case SFLFLOW_EX_ROUTER: elemSiz = routerEncodingLength(&elem->flowType.router); break;
458         case SFLFLOW_EX_GATEWAY: elemSiz = gatewayEncodingLength(&elem->flowType.gateway); break;
459         case SFLFLOW_EX_USER: elemSiz = userEncodingLength(&elem->flowType.user); break;
460         case SFLFLOW_EX_URL: elemSiz = urlEncodingLength(&elem->flowType.url); break;
461         case SFLFLOW_EX_MPLS: elemSiz = mplsEncodingLength(&elem->flowType.mpls); break;
462         case SFLFLOW_EX_NAT: elemSiz = natEncodingLength(&elem->flowType.nat); break;
463         case SFLFLOW_EX_MPLS_TUNNEL: elemSiz = mplsTunnelEncodingLength(&elem->flowType.mpls_tunnel); break;
464         case SFLFLOW_EX_MPLS_VC: elemSiz = mplsVcEncodingLength(&elem->flowType.mpls_vc); break;
465         case SFLFLOW_EX_MPLS_FTN: elemSiz = mplsFtnEncodingLength(&elem->flowType.mpls_ftn); break;
466         case SFLFLOW_EX_MPLS_LDP_FEC: elemSiz = mplsLdpFecEncodingLength(&elem->flowType.mpls_ldp_fec); break;
467         case SFLFLOW_EX_VLAN_TUNNEL: elemSiz = vlanTunnelEncodingLength(&elem->flowType.vlan_tunnel); break;
468         case SFLFLOW_EX_IPV4_TUNNEL_EGRESS:
469         case SFLFLOW_EX_IPV4_TUNNEL_INGRESS:
470             elemSiz = sizeof(SFLSampled_ipv4);
471             break;
472         case SFLFLOW_EX_VNI_EGRESS:
473         case SFLFLOW_EX_VNI_INGRESS:
474             elemSiz = sizeof(SFLExtended_vni);
475             break;
476         default:
477             sflError(receiver, "unexpected packet_data_tag");
478             return -1;
479             break;
480         }
481         // cache the element size, and accumulate it into the overall FlowSample size
482         elem->length = elemSiz;
483         siz += elemSiz;
484     }
485
486     return siz;
487 }
488
489 /*_________________-------------------------------__________________
490   _________________ sfl_receiver_writeFlowSample  __________________
491   -----------------_______________________________------------------
492 */
493
494 int sfl_receiver_writeFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
495 {
496     int packedSize;
497     if(fs == NULL) return -1;
498     if((packedSize = computeFlowSampleSize(receiver, fs)) == -1) return -1;
499
500     // check in case this one sample alone is too big for the datagram
501     // in fact - if it is even half as big then we should ditch it. Very
502     // important to avoid overruning the packet buffer.
503     if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) {
504         sflError(receiver, "flow sample too big for datagram");
505         return -1;
506     }
507
508     // if the sample pkt is full enough so that this sample might put
509     // it over the limit, then we should send it now before going on.
510     if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
511         sendSample(receiver);
512
513     receiver->sampleCollector.numSamples++;
514
515 #ifdef SFL_USE_32BIT_INDEX
516     putNet32(receiver, SFLFLOW_SAMPLE_EXPANDED);
517 #else
518     putNet32(receiver, SFLFLOW_SAMPLE);
519 #endif
520
521     putNet32(receiver, packedSize - 8); // don't include tag and len
522     putNet32(receiver, fs->sequence_number);
523
524 #ifdef SFL_USE_32BIT_INDEX
525     putNet32(receiver, fs->ds_class);
526     putNet32(receiver, fs->ds_index);
527 #else
528     putNet32(receiver, fs->source_id);
529 #endif
530
531     putNet32(receiver, fs->sampling_rate);
532     putNet32(receiver, fs->sample_pool);
533     putNet32(receiver, fs->drops);
534
535 #ifdef SFL_USE_32BIT_INDEX
536     putNet32(receiver, fs->inputFormat);
537     putNet32(receiver, fs->input);
538     putNet32(receiver, fs->outputFormat);
539     putNet32(receiver, fs->output);
540 #else
541     putNet32(receiver, fs->input);
542     putNet32(receiver, fs->output);
543 #endif
544
545     putNet32(receiver, fs->num_elements);
546
547     {
548         SFLFlow_sample_element *elem = fs->elements;
549         for(; elem != NULL; elem = elem->nxt) {
550
551             putNet32(receiver, elem->tag);
552             putNet32(receiver, elem->length); // length cached in computeFlowSampleSize()
553
554             switch(elem->tag) {
555             case SFLFLOW_HEADER:
556                 putNet32(receiver, elem->flowType.header.header_protocol);
557                 putNet32(receiver, elem->flowType.header.frame_length);
558                 putNet32(receiver, elem->flowType.header.stripped);
559                 putNet32(receiver, elem->flowType.header.header_length);
560                 /* the header */
561                 memcpy(receiver->sampleCollector.datap, elem->flowType.header.header_bytes, elem->flowType.header.header_length);
562                 /* round up to multiple of 4 to preserve alignment */
563                 receiver->sampleCollector.datap += ((elem->flowType.header.header_length + 3) / 4);
564                 break;
565             case SFLFLOW_ETHERNET:
566                 putNet32(receiver, elem->flowType.ethernet.eth_len);
567                 putMACAddress(receiver, elem->flowType.ethernet.src_mac);
568                 putMACAddress(receiver, elem->flowType.ethernet.dst_mac);
569                 putNet32(receiver, elem->flowType.ethernet.eth_type);
570                 break;
571             case SFLFLOW_IPV4:
572             case SFLFLOW_EX_IPV4_TUNNEL_EGRESS:
573             case SFLFLOW_EX_IPV4_TUNNEL_INGRESS:
574                 putNet32(receiver, elem->flowType.ipv4.length);
575                 putNet32(receiver, elem->flowType.ipv4.protocol);
576                 put32(receiver, elem->flowType.ipv4.src_ip.addr);
577                 put32(receiver, elem->flowType.ipv4.dst_ip.addr);
578                 putNet32(receiver, elem->flowType.ipv4.src_port);
579                 putNet32(receiver, elem->flowType.ipv4.dst_port);
580                 putNet32(receiver, elem->flowType.ipv4.tcp_flags);
581                 putNet32(receiver, elem->flowType.ipv4.tos);
582                 break;
583             case SFLFLOW_IPV6:
584                 putNet32(receiver, elem->flowType.ipv6.length);
585                 putNet32(receiver, elem->flowType.ipv6.protocol);
586                 put128(receiver, elem->flowType.ipv6.src_ip.addr);
587                 put128(receiver, elem->flowType.ipv6.dst_ip.addr);
588                 putNet32(receiver, elem->flowType.ipv6.src_port);
589                 putNet32(receiver, elem->flowType.ipv6.dst_port);
590                 putNet32(receiver, elem->flowType.ipv6.tcp_flags);
591                 putNet32(receiver, elem->flowType.ipv6.priority);
592                 break;
593             case SFLFLOW_EX_SWITCH: putSwitch(receiver, &elem->flowType.sw); break;
594             case SFLFLOW_EX_ROUTER: putRouter(receiver, &elem->flowType.router); break;
595             case SFLFLOW_EX_GATEWAY: putGateway(receiver, &elem->flowType.gateway); break;
596             case SFLFLOW_EX_USER: putUser(receiver, &elem->flowType.user); break;
597             case SFLFLOW_EX_URL: putUrl(receiver, &elem->flowType.url); break;
598             case SFLFLOW_EX_MPLS: putMpls(receiver, &elem->flowType.mpls); break;
599             case SFLFLOW_EX_NAT: putNat(receiver, &elem->flowType.nat); break;
600             case SFLFLOW_EX_MPLS_TUNNEL: putMplsTunnel(receiver, &elem->flowType.mpls_tunnel); break;
601             case SFLFLOW_EX_MPLS_VC: putMplsVc(receiver, &elem->flowType.mpls_vc); break;
602             case SFLFLOW_EX_MPLS_FTN: putMplsFtn(receiver, &elem->flowType.mpls_ftn); break;
603             case SFLFLOW_EX_MPLS_LDP_FEC: putMplsLdpFec(receiver, &elem->flowType.mpls_ldp_fec); break;
604             case SFLFLOW_EX_VLAN_TUNNEL: putVlanTunnel(receiver, &elem->flowType.vlan_tunnel); break;
605             case SFLFLOW_EX_VNI_EGRESS:
606             case SFLFLOW_EX_VNI_INGRESS:
607                 putNet32(receiver, elem->flowType.tunnel_vni.vni);
608                 break;
609
610             default:
611                 sflError(receiver, "unexpected packet_data_tag");
612                 return -1;
613                 break;
614             }
615         }
616     }
617
618     // sanity check
619     assert(((u_char *)receiver->sampleCollector.datap
620             - (u_char *)receiver->sampleCollector.data
621             - receiver->sampleCollector.pktlen)  == (u_int32_t)packedSize);
622
623     // update the pktlen
624     receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
625     return packedSize;
626 }
627
628 /*_________________-----------------------------__________________
629   _________________ computeCountersSampleSize   __________________
630   -----------------_____________________________------------------
631 */
632
633 static int computeCountersSampleSize(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs)
634 {
635     SFLCounters_sample_element *elem = cs->elements;
636 #ifdef SFL_USE_32BIT_INDEX
637     u_int siz = 24; /* tag, length, sequence_number, ds_class, ds_index, number of elements */
638 #else
639     u_int siz = 20; /* tag, length, sequence_number, source_id, number of elements */
640 #endif
641
642     cs->num_elements = 0; /* we're going to count them again even if this was set by the client */
643     for(; elem != NULL; elem = elem->nxt) {
644         u_int elemSiz = 0;
645         cs->num_elements++;
646         siz += 8; /* tag, length */
647         switch(elem->tag) {
648         case SFLCOUNTERS_GENERIC:  elemSiz = SFL_CTR_GENERIC_XDR_SIZE; break;
649         case SFLCOUNTERS_ETHERNET: elemSiz = SFL_CTR_ETHERNET_XDR_SIZE; break;
650         case SFLCOUNTERS_TOKENRING: elemSiz = sizeof(elem->counterBlock.tokenring); break;
651         case SFLCOUNTERS_VG: elemSiz = sizeof(elem->counterBlock.vg); break;
652         case SFLCOUNTERS_VLAN: elemSiz = sizeof(elem->counterBlock.vlan); break;
653         case SFLCOUNTERS_LACP: elemSiz = SFL_CTR_LACP_XDR_SIZE; break;
654         case SFLCOUNTERS_OPENFLOWPORT: elemSiz = SFL_CTR_OPENFLOWPORT_XDR_SIZE; break;
655         case SFLCOUNTERS_PORTNAME: elemSiz = stringEncodingLength(&elem->counterBlock.portName.portName); break;
656         case SFLCOUNTERS_APP_RESOURCES: elemSiz = SFL_CTR_APP_RESOURCES_XDR_SIZE; break;
657         case SFLCOUNTERS_OVSDP: elemSiz = SFL_CTR_OVSDP_XDR_SIZE; break;
658         default:
659             sflError(receiver, "unexpected counters_tag");
660             return -1;
661             break;
662         }
663         // cache the element size, and accumulate it into the overall FlowSample size
664         elem->length = elemSiz;
665         siz += elemSiz;
666     }
667     return siz;
668 }
669
670 /*_________________----------------------------------__________________
671   _________________ sfl_receiver_writeCountersSample __________________
672   -----------------__________________________________------------------
673 */
674
675 int sfl_receiver_writeCountersSample(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs)
676 {
677     int packedSize;
678     if(cs == NULL) return -1;
679     // if the sample pkt is full enough so that this sample might put
680     // it over the limit, then we should send it now.
681     if((packedSize = computeCountersSampleSize(receiver, cs)) == -1) return -1;
682
683     // check in case this one sample alone is too big for the datagram
684     // in fact - if it is even half as big then we should ditch it. Very
685     // important to avoid overruning the packet buffer.
686     if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) {
687         sflError(receiver, "counters sample too big for datagram");
688         return -1;
689     }
690
691     if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
692         sendSample(receiver);
693
694     receiver->sampleCollector.numSamples++;
695
696 #ifdef SFL_USE_32BIT_INDEX
697     putNet32(receiver, SFLCOUNTERS_SAMPLE_EXPANDED);
698 #else
699     putNet32(receiver, SFLCOUNTERS_SAMPLE);
700 #endif
701
702     putNet32(receiver, packedSize - 8); // tag and length not included
703     putNet32(receiver, cs->sequence_number);
704
705 #ifdef SFL_USE_32BIT_INDEX
706     putNet32(receiver, cs->ds_class);
707     putNet32(receiver, cs->ds_index);
708 #else
709     putNet32(receiver, cs->source_id);
710 #endif
711
712     putNet32(receiver, cs->num_elements);
713
714     {
715         SFLCounters_sample_element *elem = cs->elements;
716         for(; elem != NULL; elem = elem->nxt) {
717
718             putNet32(receiver, elem->tag);
719             putNet32(receiver, elem->length); // length cached in computeCountersSampleSize()
720
721             switch(elem->tag) {
722             case SFLCOUNTERS_GENERIC:
723                 putGenericCounters(receiver, &(elem->counterBlock.generic));
724                 break;
725             case SFLCOUNTERS_ETHERNET:
726                 // all these counters are 32-bit
727                 putNet32_run(receiver, &elem->counterBlock.ethernet, sizeof(elem->counterBlock.ethernet) / 4);
728                 break;
729             case SFLCOUNTERS_TOKENRING:
730                 // all these counters are 32-bit
731                 putNet32_run(receiver, &elem->counterBlock.tokenring, sizeof(elem->counterBlock.tokenring) / 4);
732                 break;
733             case SFLCOUNTERS_VG:
734                 // mixed sizes
735                 putNet32(receiver, elem->counterBlock.vg.dot12InHighPriorityFrames);
736                 putNet64(receiver, elem->counterBlock.vg.dot12InHighPriorityOctets);
737                 putNet32(receiver, elem->counterBlock.vg.dot12InNormPriorityFrames);
738                 putNet64(receiver, elem->counterBlock.vg.dot12InNormPriorityOctets);
739                 putNet32(receiver, elem->counterBlock.vg.dot12InIPMErrors);
740                 putNet32(receiver, elem->counterBlock.vg.dot12InOversizeFrameErrors);
741                 putNet32(receiver, elem->counterBlock.vg.dot12InDataErrors);
742                 putNet32(receiver, elem->counterBlock.vg.dot12InNullAddressedFrames);
743                 putNet32(receiver, elem->counterBlock.vg.dot12OutHighPriorityFrames);
744                 putNet64(receiver, elem->counterBlock.vg.dot12OutHighPriorityOctets);
745                 putNet32(receiver, elem->counterBlock.vg.dot12TransitionIntoTrainings);
746                 putNet64(receiver, elem->counterBlock.vg.dot12HCInHighPriorityOctets);
747                 putNet64(receiver, elem->counterBlock.vg.dot12HCInNormPriorityOctets);
748                 putNet64(receiver, elem->counterBlock.vg.dot12HCOutHighPriorityOctets);
749                 break;
750             case SFLCOUNTERS_VLAN:
751                 // mixed sizes
752                 putNet32(receiver, elem->counterBlock.vlan.vlan_id);
753                 putNet64(receiver, elem->counterBlock.vlan.octets);
754                 putNet32(receiver, elem->counterBlock.vlan.ucastPkts);
755                 putNet32(receiver, elem->counterBlock.vlan.multicastPkts);
756                 putNet32(receiver, elem->counterBlock.vlan.broadcastPkts);
757                 putNet32(receiver, elem->counterBlock.vlan.discards);
758                 break;
759             case SFLCOUNTERS_LACP:
760                 putMACAddress(receiver, elem->counterBlock.lacp.actorSystemID);
761                 putMACAddress(receiver, elem->counterBlock.lacp.partnerSystemID);
762                 putNet32(receiver, elem->counterBlock.lacp.attachedAggID);
763                 put32(receiver, elem->counterBlock.lacp.portState.all);
764                 putNet32(receiver, elem->counterBlock.lacp.LACPDUsRx);
765                 putNet32(receiver, elem->counterBlock.lacp.markerPDUsRx);
766                 putNet32(receiver, elem->counterBlock.lacp.markerResponsePDUsRx);
767                 putNet32(receiver, elem->counterBlock.lacp.unknownRx);
768                 putNet32(receiver, elem->counterBlock.lacp.illegalRx);
769                 putNet32(receiver, elem->counterBlock.lacp.LACPDUsTx);
770                 putNet32(receiver, elem->counterBlock.lacp.markerPDUsTx);
771                 putNet32(receiver, elem->counterBlock.lacp.markerResponsePDUsTx);
772                 break;
773             case SFLCOUNTERS_OPENFLOWPORT:
774                 putNet64(receiver, elem->counterBlock.ofPort.datapath_id);
775                 putNet32(receiver, elem->counterBlock.ofPort.port_no);
776                 break;
777             case SFLCOUNTERS_PORTNAME:
778                 putString(receiver, &elem->counterBlock.portName.portName);
779                 break;
780             case SFLCOUNTERS_APP_RESOURCES:
781                 putNet32(receiver, elem->counterBlock.appResources.user_time);
782                 putNet32(receiver, elem->counterBlock.appResources.system_time);
783                 putNet64(receiver, elem->counterBlock.appResources.mem_used);
784                 putNet64(receiver, elem->counterBlock.appResources.mem_max);
785                 putNet32(receiver, elem->counterBlock.appResources.fd_open);
786                 putNet32(receiver, elem->counterBlock.appResources.fd_max);
787                 putNet32(receiver, elem->counterBlock.appResources.conn_open);
788                 putNet32(receiver, elem->counterBlock.appResources.conn_max);
789                 break;
790             case SFLCOUNTERS_OVSDP:
791                 putNet32(receiver, elem->counterBlock.ovsdp.n_hit);
792                 putNet32(receiver, elem->counterBlock.ovsdp.n_missed);
793                 putNet32(receiver, elem->counterBlock.ovsdp.n_lost);
794                 putNet32(receiver, elem->counterBlock.ovsdp.n_mask_hit);
795                 putNet32(receiver, elem->counterBlock.ovsdp.n_flows);
796                 putNet32(receiver, elem->counterBlock.ovsdp.n_masks);
797                 break;
798             default:
799                 sflError(receiver, "unexpected counters_tag");
800                 return -1;
801                 break;
802             }
803         }
804     }
805     // sanity check
806     assert(((u_char *)receiver->sampleCollector.datap
807             - (u_char *)receiver->sampleCollector.data
808             - receiver->sampleCollector.pktlen)  == (u_int32_t)packedSize);
809
810     // update the pktlen
811     receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
812     return packedSize;
813 }
814
815 /*_________________---------------------------------__________________
816   _________________ sfl_receiver_samplePacketsSent  __________________
817   -----------------_________________________________------------------
818 */
819
820 u_int32_t sfl_receiver_samplePacketsSent(SFLReceiver *receiver)
821 {
822     return receiver->sampleCollector.packetSeqNo;
823 }
824
825 /*_________________---------------------------__________________
826   _________________     sendSample            __________________
827   -----------------___________________________------------------
828 */
829
830 static void sendSample(SFLReceiver *receiver)
831 {
832     /* construct and send out the sample, then reset for the next one... */
833     /* first fill in the header with the latest values */
834     /* version, agent_address and sub_agent_id were pre-set. */
835     u_int32_t hdrIdx = (receiver->agent->myIP.type == SFLADDRESSTYPE_IP_V6) ? 7 : 4;
836     receiver->sampleCollector.data[hdrIdx++] = htonl(++receiver->sampleCollector.packetSeqNo); /* seq no */
837     receiver->sampleCollector.data[hdrIdx++] = htonl((receiver->agent->now - receiver->agent->bootTime) * 1000); /* uptime */
838     receiver->sampleCollector.data[hdrIdx++] = htonl(receiver->sampleCollector.numSamples); /* num samples */
839     /* send */
840     if(receiver->agent->sendFn) (*receiver->agent->sendFn)(receiver->agent->magic,
841                                                            receiver->agent,
842                                                            receiver,
843                                                            (u_char *)receiver->sampleCollector.data,
844                                                            receiver->sampleCollector.pktlen);
845     else {
846 #ifdef SFLOW_DO_SOCKET
847         /* send it myself */
848         if (receiver->sFlowRcvrAddress.type == SFLADDRESSTYPE_IP_V6) {
849             u_int32_t soclen = sizeof(struct sockaddr_in6);
850             int result = sendto(receiver->agent->receiverSocket6,
851                                 receiver->sampleCollector.data,
852                                 receiver->sampleCollector.pktlen,
853                                 0,
854                                 (struct sockaddr *)&receiver->receiver6,
855                                 soclen);
856             if(result == -1 && errno != EINTR) sfl_agent_sysError(receiver->agent, "receiver", "IPv6 socket sendto error");
857             if(result == 0) sfl_agent_error(receiver->agent, "receiver", "IPv6 socket sendto returned 0");
858         }
859         else {
860             u_int32_t soclen = sizeof(struct sockaddr_in);
861             int result = sendto(receiver->agent->receiverSocket4,
862                                 receiver->sampleCollector.data,
863                                 receiver->sampleCollector.pktlen,
864                                 0,
865                                 (struct sockaddr *)&receiver->receiver4,
866                                 soclen);
867             if(result == -1 && errno != EINTR) sfl_agent_sysError(receiver->agent, "receiver", "socket sendto error");
868             if(result == 0) sfl_agent_error(receiver->agent, "receiver", "socket sendto returned 0");
869         }
870 #endif
871     }
872
873     /* reset for the next time */
874     resetSampleCollector(receiver);
875 }
876
877 /*_________________---------------------------__________________
878   _________________   resetSampleCollector    __________________
879   -----------------___________________________------------------
880 */
881
882 static void resetSampleCollector(SFLReceiver *receiver)
883 {
884     receiver->sampleCollector.pktlen = 0;
885     receiver->sampleCollector.numSamples = 0;
886     /* point the datap to just after the header */
887     receiver->sampleCollector.datap = (receiver->agent->myIP.type == SFLADDRESSTYPE_IP_V6) ?
888         (receiver->sampleCollector.data + 10) :  (receiver->sampleCollector.data + 7);
889
890     receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
891 }
892
893 /*_________________---------------------------__________________
894   _________________         sflError          __________________
895   -----------------___________________________------------------
896 */
897
898 static void sflError(SFLReceiver *receiver, char *msg)
899 {
900     sfl_agent_error(receiver->agent, "receiver", msg);
901     resetSampleCollector(receiver);
902 }
903
904 #endif  /* !__CHECKER__ */