netdev-dpdk: fix mbuf leaks
[cascardo/ovs.git] / utilities / ovs-dpctl-top.in
1 #! @PYTHON@
2 #
3 # Copyright (c) 2013 Nicira, Inc.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at:
8 #
9 #     http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17 #
18 # The approximate_size code was copied from
19 # http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
20 # which is licensed under # "Dive Into Python 3," Copyright 2011 Mark Pilgrim,
21 # used under a Creative Commons Attribution-Share-Alike license:
22 # http://creativecommons.org/licenses/by-sa/3.0/
23 #
24 #
25
26 """Top like behavior for ovs-dpctl dump-flows output.
27
28 This program summarizes ovs-dpctl flow content by aggregating the number
29 of packets, total bytes and occurrence of the following fields:
30
31   - Datapath in_port
32
33   - Ethernet type
34
35   - Source and destination MAC addresses
36
37   - IP protocol
38
39   - Source and destination IPv4 addresses
40
41   - Source and destination IPv6 addresses
42
43   - UDP and TCP destination port
44
45   - Tunnel source and destination addresses
46
47
Output shows five values:
49   - FIELDS: the flow fields for example in_port(1).
50
51   - PACKETS: the total number of packets containing the flow field.
52
53   - BYTES: the total number of bytes containing the flow field. If units are
54   not present then values are in bytes.
55
56   - AVERAGE: the average packets size (BYTES/PACKET).
57
  - COUNT: the number of lines in the dump-flow output that contain the
  flow field.
59
60 Top Behavior
61
62 While in top mode, the default behavior, the following single character
63 commands are supported:
64
65   a - toggles top in accumulate and live mode. Accumulate mode is described
66   below.
67
68   s - toggles which column is used to sort content in decreasing order. A
69   DESC title is placed over the column.
70
71   _ - a space indicating to collect dump-flow content again
72
73   h - halt output. Any character will restart sampling
74
75   f - cycle through flow fields. The initial field is in_port
76
77   q - q for quit.
78
79 Accumulate Mode
80
81 There are two supported modes: live and accumulate. The default is live.
82 The parameter --accumulate  or the 'a' character in top mode enables the
83 latter. In live mode, recent dump-flow content is presented.
Whereas accumulate mode keeps track of the prior historical
information until the flow is reset, not when the flow is purged. A flow
is considered reset when its packet count has decreased from
its previous sample. There is one caveat: the system would eventually
run out of memory, so after the accumulate-decay period any flows that
have not been refreshed are purged. The goal here is to free memory
90 of flows that are not active. Statistics are not decremented. Their purpose
91 is to reflect the overall history of the flow fields.
92
93
94 Debugging Errors
95
96 Parsing errors are counted and displayed in the status line at the beginning
97 of the output. Use the --verbose option with --script to see what output
98  was not parsed, like this:
99 $ ovs-dpctl dump-flows | ovs-dpctl-top --script --verbose
100
101 Error messages will identify content that failed to parse.
102
103
104 Access Remote Hosts
105
106 The --host must follow the format user@hostname. This script simply calls
107 'ssh user@Hostname' without checking for login credentials therefore public
108 keys should be installed on the system identified by hostname, such as:
109
110 $ ssh-copy-id user@hostname
111
112 Consult ssh-copy-id man pages for more details.
113
114
115 Expected usage
116
117 $ ovs-dpctl-top
118
119 or to run as a script:
120 $ ovs-dpctl dump-flows > dump-flows.log
121 $ ovs-dpctl-top --script --flow-file dump-flows.log
122
123 """
124
125 # pylint: disable-msg=C0103
126 # pylint: disable-msg=C0302
127 # pylint: disable-msg=R0902
128 # pylint: disable-msg=R0903
129 # pylint: disable-msg=R0904
130 # pylint: disable-msg=R0912
131 # pylint: disable-msg=R0913
132 # pylint: disable-msg=R0914
133
134 import sys
135 import os
136 try:
137     ##
138     # Arg parse is not installed on older Python distributions.
139     # ovs ships with a version in the directory mentioned below.
140     import argparse
141 except ImportError:
142     sys.path.append(os.path.join("@pkgdatadir@", "python"))
143     import argparse
144 import logging
145 import re
146 import unittest
147 import copy
148 import curses
149 import operator
150 import subprocess
151 import fcntl
152 import struct
153 import termios
154 import datetime
155 import threading
156 import time
157 import socket
158
159
160 ##
# The following two definitions provide the necessary netaddr functionality.
# The Python netaddr module is not part of the core installation. Packaging
# netaddr would be involved and seems inappropriate given that only two
# methods were used.
def ipv4_to_network(ip_str):
    """ Return the network part of an "address/netmask" IPv4 string.

    The netmask is expected in dotted-quad form (e.g. 255.255.255.0).
    A string that does not hold exactly one "/" is returned unchanged.
    """
    pieces = ip_str.split("/")
    if len(pieces) != 2:
        # just an ip address, no mask.
        return ip_str
    (address, netmask) = pieces

    # View each packed address as two network-order 16-bit words.
    words = '!HH'
    address_words = struct.unpack(words,
                                  socket.inet_pton(socket.AF_INET, address))
    netmask_words = struct.unpack(words,
                                  socket.inet_pton(socket.AF_INET, netmask))
    network_words = [addr & mask
                     for (addr, mask) in zip(address_words, netmask_words)]

    return socket.inet_ntop(socket.AF_INET,
                            struct.pack(words, *network_words))
183
184
def ipv6_to_network(ip_str):
    """ Return the network part of an "address/netmask" IPv6 string.

    The netmask is expected in address form (e.g. ffff:ffff::).
    A string that does not hold exactly one "/" is returned unchanged.
    """
    pieces = ip_str.split("/")
    if len(pieces) != 2:
        # just an ip address, no mask.
        return ip_str
    (address, netmask) = pieces

    # View each packed address as eight network-order 16-bit words.
    words = '!8H'
    address_words = struct.unpack(words,
                                  socket.inet_pton(socket.AF_INET6, address))
    netmask_words = struct.unpack(words,
                                  socket.inet_pton(socket.AF_INET6, netmask))
    network_words = [addr & mask
                     for (addr, mask) in zip(address_words, netmask_words)]

    return socket.inet_ntop(socket.AF_INET6,
                            struct.pack(words, *network_words))
208
209
210 ##
211 # columns displayed
212 ##
class Columns:
    """ Names of the displayed columns and the width of value columns.
    Titles needs to be less than 8 characters.
    """
    VALUE_WIDTH = 9
    FIELDS = "fields"
    PACKETS = "packets"
    COUNT = "count"
    BYTES = "bytes"
    AVERAGE = "average"

    def __init__(self):
        pass

    @staticmethod
    def assoc_list(obj):
        """ Return [(column name, value), ...] for obj.

        The fields column holds repr(obj); every other column holds the
        attribute of obj that carries the same name as the column.
        """
        pairs = [(Columns.FIELDS, repr(obj))]
        for column in (Columns.PACKETS, Columns.BYTES,
                       Columns.COUNT, Columns.AVERAGE):
            pairs.append((column, getattr(obj, column)))
        return pairs
236
237
def element_eth_get(field_type, element, stats_dict):
    """ Build the SumData for the Ethernet src/dst pair of a flow.

    element -- dictionary holding the "src" and "dst" addresses.
    stats_dict -- dictionary holding the flow's "packets" and "bytes".
    The same "type(src=...,dst=...)" text is used for display and as key.
    """
    summary = "%s(src=%s,dst=%s)" % (field_type,
                                     element["src"], element["dst"])
    return SumData(field_type, summary, stats_dict["packets"],
                   stats_dict["bytes"], summary)
245
246
def element_ipv4_get(field_type, element, stats_dict):
    """ Build the SumData for the IPv4 src/dst pair of a flow.

    The displayed text keeps the addresses exactly as dumped, while the
    key is built from the addresses reduced to their networks.
    """
    template = "%s(src=%s,dst=%s)"
    (src, dst) = (element["src"], element["dst"])

    shown = template % (field_type, src, dst)
    key = template % (field_type,
                      ipv4_to_network(src), ipv4_to_network(dst))

    return SumData(field_type, shown, stats_dict["packets"],
                   stats_dict["bytes"], key)
257
258
def element_tunnel_get(field_type, element, stats_dict):
    """ Build the SumData for a tunnel's src/dst pair.

    Tunnel endpoints are handled exactly like IPv4 src/dst addresses.
    """
    tunnel_summary = element_ipv4_get(field_type, element, stats_dict)
    return tunnel_summary
262
263
def element_ipv6_get(field_type, element, stats_dict):
    """ Build the SumData for the IPv6 src/dst pair of a flow.

    The displayed text keeps the addresses exactly as dumped, while the
    key is built from the addresses reduced to their networks.
    """
    template = "%s(src=%s,dst=%s)"
    (src, dst) = (element["src"], element["dst"])

    shown = template % (field_type, src, dst)
    key = template % (field_type,
                      ipv6_to_network(src), ipv6_to_network(dst))

    return SumData(field_type, shown, stats_dict["packets"],
                   stats_dict["bytes"], key)
275
276
def element_dst_port_get(field_type, element, stats_dict):
    """ Build the SumData for the destination port of a flow.

    Only element["dst"] is used; the "type(dst=...)" text serves both as
    the displayed field and as the key.
    """
    port_text = "%s(dst=%s)" % (field_type, element["dst"])
    (packets, flow_bytes) = (stats_dict["packets"], stats_dict["bytes"])
    return SumData(field_type, port_text, packets, flow_bytes, port_text)
282
283
def element_passthrough_get(field_type, element, stats_dict):
    """ Build the SumData for a field whose value is used as is.

    The "type(value)" text serves both as the displayed field and as
    the key.
    """
    raw_text = "%s(%s)" % (field_type, element)
    (packets, flow_bytes) = (stats_dict["packets"], stats_dict["bytes"])
    return SumData(field_type, raw_text, packets, flow_bytes, raw_text)
289
290
291 # pylint: disable-msg=R0903
class OutputFormat:
    """ Pairs a flow field type with the function that extracts it.

    field_type -- name of the flow field, for example "eth".
    generator -- callable invoked as generator(field_type, element,
    stats_dict) to produce the entry for that field.
    """
    def __init__(self, field_type, generator):
        (self.field_type, self.generator) = (field_type, generator)
297
298 ##
299 # The order below is important. The initial flow field depends on whether
300 # --script or top mode is used. In top mode, the expected behavior, in_port
301 # flow fields are shown first. A future feature will allow users to
302 # filter output by selecting a row. Filtering by in_port is a natural
303 # filtering starting point.
304 #
305 # In script mode, all fields are shown. The expectation is that users could
306 # filter output by piping through grep.
307 #
308 # In top mode, the default flow field is in_port. In --script mode,
309 # the default flow field is all.
310 #
311 # All is added to the end of the OUTPUT_FORMAT list.
312 ##
# Each entry pairs a flow field name with the function that turns the
# parsed value of that field into a SumData. flow_aggregate walks this
# list in order, and Render.FLOW_FIELDS is derived from it.
OUTPUT_FORMAT = [
    OutputFormat("in_port", element_passthrough_get),
    OutputFormat("eth", element_eth_get),
    OutputFormat("eth_type", element_passthrough_get),
    OutputFormat("ipv4", element_ipv4_get),
    OutputFormat("ipv6", element_ipv6_get),
    OutputFormat("udp", element_dst_port_get),
    OutputFormat("tcp", element_dst_port_get),
    OutputFormat("tunnel", element_tunnel_get),
    ]
323 ##
324
325
# Maps the "udp" and "tcp" field types to a dotted "<protocol>.dst" name.
# NOTE(review): no reader of this table is visible in this part of the
# file -- confirm how it is used before changing it.
ELEMENT_KEY = {
    "udp": "udp.dst",
    "tcp": "tcp.dst"
    }
330
331
def top_input_get(args):
    """ Start "ovs-dpctl dump-flows" and return the pipe with its output.

    When args.host is set the command is run on that host through ssh.
    The command's stderr is merged into the returned stdout stream.
    """
    remote_prefix = ["ssh", args.host] if args.host else []
    command = remote_prefix + ["ovs-dpctl", "dump-flows"]

    child = subprocess.Popen(command, stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
    return child.stdout
341
342
def args_get():
    """ Read program parameters and handle any necessary validation.

    Parses sys.argv and returns the argparse namespace with the
    attributes flowFiles, verbose, top, host, accumulate, accumulateDecay
    and delay. As a side effect the logging module is configured with
    the selected verbosity.
    """

    parser = argparse.ArgumentParser(
                          formatter_class=argparse.RawDescriptionHelpFormatter,
                          description=__doc__)
    parser.add_argument("-v", "--version", version="@VERSION@",
                        action="version", help="show version")
    ##
    # None is a special value indicating to read flows from stdin.
    # This handles the case
    #   ovs-dpctl dump-flows | ovs-dpctl-flows.py
    parser.add_argument("-f", "--flow-file", dest="flowFiles", default=None,
                        action="append",
                        help="file containing flows from ovs-dpctl dump-flow")
    parser.add_argument("-V", "--verbose", dest="verbose",
                        default=logging.CRITICAL,
                        action="store_const", const=logging.DEBUG,
                        help="enable debug level verbosity")
    # --script clears "top", which is True by default.
    parser.add_argument("-s", "--script", dest="top", action="store_false",
                        help="Run from a script (no user interface)")
    # The two literals are concatenated, so the first one must end with a
    # space to keep "see" and "Accessing" apart in the rendered help.
    parser.add_argument("--host", dest="host",
                        help="Specify a user@host for retrieving flows see "
                             "Accessing Remote Hosts for more information")

    parser.add_argument("-a", "--accumulate", dest="accumulate",
                        action="store_true", default=False,
                        help="Accumulate dump-flow content")
    parser.add_argument("--accumulate-decay", dest="accumulateDecay",
                        default=5.0 * 60, type=float,
                        help="Decay old accumulated flows. "
                        "The default is 5 minutes. "
                        "A value of 0 disables decay.")
    parser.add_argument("-d", "--delay", dest="delay", type=int,
                        default=1000,
                        help="Delay in milliseconds to collect dump-flow "
                             "content (sample rate).")

    args = parser.parse_args()

    logging.basicConfig(level=args.verbose)

    return args
386
387 ###
388 # Code to parse a single line in dump-flow
389 ###
# Raw strings keep the regular expression escapes (\w, \(, \.) away from
# Python's own string escape processing; the compiled patterns are the
# same as before.
# key(values), for example eth(src=...,dst=...)
FIELDS_CMPND = re.compile(r"([\w]+)\((.+)\)")
# key=value, found inside a compound element, for example src=10.0.0.1
FIELDS_CMPND_ELEMENT = re.compile(r"([\w:]+)=([/\.\w:]+)")
# key:value, for example packets:3
FIELDS_ELEMENT = re.compile(r"([\w]+):([-\.\w]+)")
395
396
def flow_line_iter(line):
    """ Split a flow dump line into its top level elements.

    Returns a list of strings. The line is cut at every comma that is
    not enclosed in parentheses and all spaces are dropped. Raises
    ValueError(line) when the opening and closing parentheses do not
    cancel out by the end of the line.
    """
    # The actions element is not split properly but it is not needed.
    elements = []
    pending = []
    depth = 0

    for char in line:
        if char == '(':
            depth += 1
        elif char == ')':
            depth -= 1

        if char == ' ':
            # ignore white space.
            continue
        if char == ',' and depth == 0:
            elements.append("".join(pending))
            pending = []
        else:
            pending.append(char)

    if depth != 0:
        raise ValueError(line)
    if pending:
        elements.append("".join(pending))
    return elements
429
430
def flow_line_compound_parse(compound):
    """ Parse the content of a compound element into a dictionary.

    For example the content
    src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03
    of
    eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)
    Nested key(values) elements are parsed recursively. When nothing in
    compound can be parsed, compound itself is returned.
    """
    parsed = {}
    for item in flow_line_iter(compound):
        pair = FIELDS_CMPND_ELEMENT.search(item)
        if pair is not None:
            parsed[pair.group(1)] = pair.group(2)

        # An item is also checked for the key(values) form, even when a
        # key=value pair was already found in it.
        nested = FIELDS_CMPND.search(item)
        if nested is not None:
            parsed[nested.group(1)] = \
                flow_line_compound_parse(nested.group(2))

    if not parsed:
        return compound
    return parsed
456
457
def flow_line_split(line):
    """ Convert a flow dump line into a ([fields], [stats], actions) tuple.
    Where fields and stats are lists.
    This function relies on the following ovs-dpctl dump-flow
    output characteristics:
    1. The dumped flow line consists of a list of frame fields, a list of
       stats and the action.
    2. The list of frame fields, each stat and the action field are
       delimited by ', '.
    3. No other non stat field is delimited by ', '.

    """
    pieces = line.split(', ')

    # First piece: frame fields; last piece: action; in between: stats.
    return (flow_line_iter(pieces[0]), pieces[1:-1], pieces[-1])
476
477
def elements_to_dict(elements):
    """ Convert a list of elements to a hierarchy of dictionaries.

    key(values) elements become nested entries, key:value elements
    become plain entries. Raises ValueError for any other element.
    """
    parsed = {}
    for item in elements:
        compound = FIELDS_CMPND.search(item)
        if compound is not None:
            parsed[compound.group(1)] = \
                flow_line_compound_parse(compound.group(2))
            continue

        simple = FIELDS_ELEMENT.search(item)
        if simple is None:
            raise ValueError("can't parse >%s<" % item)
        parsed[simple.group(1)] = simple.group(2)
    return parsed
497
498
499 # pylint: disable-msg=R0903
class SumData(object):
    """ Interface that all data going into SumDb must implement.
    Holds the flow field and its corresponding count, total packets,
    total bytes and calculates average.

    __repr__ is used as key into SumData singleton.
    __str__ is used as human readable output.
    """

    def __init__(self, field_type, field, packets, flow_bytes, key):
        """ packets and flow_bytes may be numbers or numeric strings;
        both are converted with int(). """
        self.field_type = field_type
        self.field = field
        # Count is the number of lines in the dump-flow log.
        self.count = 1
        self.packets = int(packets)
        self.bytes = int(flow_bytes)
        self.key = key

    def decrement(self, decr_packets, decr_bytes, decr_count):
        """ Decrement content to calculate delta from previous flow sample."""
        self.packets -= decr_packets
        self.bytes -= decr_bytes
        self.count -= decr_count

    def __iadd__(self, other):
        """ Add the counters of other to this object.

        Raises ValueError when the two objects have different keys.
        """
        if (self.key != other.key):
            raise ValueError("adding two unrelated types")

        self.count += other.count
        self.packets += other.packets
        self.bytes += other.bytes
        return self

    def __isub__(self, other):
        """ Subtract the counters of other from this object.

        Raises ValueError when the two objects have different keys.
        """
        if (self.key != other.key):
            raise ValueError("subtracting two unrelated types")

        self.count -= other.count
        self.packets -= other.packets
        self.bytes -= other.bytes
        return self

    @property
    def average(self):
        """ Average packet size (bytes / packets); 0.0 without packets. """
        if (self.packets == 0):
            return 0.0
        return float(self.bytes) / float(self.packets)

    def __str__(self):
        """ Used for debugging. """
        return "%s %s %s %s" % (self.field, self.count,
                                self.packets, self.bytes)

    def __repr__(self):
        """ Used as key in the FlowDB table. """
        return self.key
563
564
def flow_aggregate(fields_dict, stats_dict):
    """ Collect one entry per known field type present in a flow.

    fields_dict -- the parsed flow fields of a dump-flows line.
    stats_dict -- the stats of the same line (packets, bytes, etc).
    Returns the list of objects produced by the OUTPUT_FORMAT
    generators, in OUTPUT_FORMAT order. Field types that are missing
    from fields_dict, or whose value is empty, are skipped.
    """
    summaries = []

    for entry in OUTPUT_FORMAT:
        value = fields_dict.get(entry.field_type, None)
        if not value:
            continue
        summaries.append(entry.generator(entry.field_type,
                                         value, stats_dict))

    return summaries
580
581
def flows_read(ihdl, flow_db):
    """ Read flow content from ihdl and insert it into flow_db.

    ihdl -- object with a readline() method; an empty result ends input.
    flow_db -- object whose flow_line_add(line) is called for each line.
    A ValueError raised for a line is logged and reading continues.
    Returns flow_db.
    """

    while True:
        # readline() rather than iterating over ihdl, so each line is
        # handled as soon as it is available.
        line = ihdl.readline()
        if not line:
            # end of input
            break

        try:
            flow_db.flow_line_add(line)
        except ValueError as arg:
            logging.error(arg)

    return flow_db
598
599
def get_terminal_size():
    """
    Return the size of the terminal as the pair of values reported by
    the TIOCGWINSZ ioctl, that is (rows, columns).

    stdin, stdout and stderr are asked in turn and the first usable
    answer wins, so redirecting one of them does not hide the terminal
    attached to another. Falls back to (25, 80) when none of them
    reports a size.
    """
    result = None
    for fd_io in (0, 1, 2):
        try:
            result = struct.unpack('hh',
                                   fcntl.ioctl(fd_io, termios.TIOCGWINSZ,
                                               '1234'))
        except IOError:
            # Not a terminal (or not usable); try the next descriptor.
            continue
        if (result != (0, 0)):
            break

    if (result is None or result == (0, 0)):
        # Maybe we can't get the width. In that case assume (25, 80)
        result = (25, 80)

    return result
618
619 ##
620 # Content derived from:
621 # http://getpython3.com/diveintopython3/your-first-python-program.html#divingin
622 ##
SUFFIXES = {
    1000: ['KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'],
    1024: ['KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'],
}


def approximate_size(size, a_kilobyte_is_1024_bytes=True):
    """Convert a file size to human-readable form.

    Keyword arguments:
    size -- file size in bytes
    a_kilobyte_is_1024_bytes -- if True (default), use multiples of 1024
                                    if False, use multiples of 1000

    Returns: string made of the scaled size with one decimal and a unit.
    Raises: ValueError when size is negative or beyond the largest unit.

    """
    remaining = float(size)
    if remaining < 0:
        raise ValueError('number must be non-negative')

    multiple = 1024 if a_kilobyte_is_1024_bytes else 1000

    # Scale down one unit at a time; the smallest unit reported is the
    # first suffix, even for sizes below one multiple.
    for suffix in SUFFIXES[multiple]:
        remaining /= multiple
        if remaining < multiple:
            return "%.1f %s" % (remaining, suffix)

    raise ValueError('number too large')
652
653
654 ##
655 # End copied content
656 ##
class ColMeta:
    """ Per column settings.

    sortable -- whether the output may be sorted by this column.
    width -- number of characters given to the column.
    """
    def __init__(self, sortable, width):
        (self.sortable, self.width) = (sortable, width)
662
663
class RowMeta:
    """ Per cell rendering settings of a row.

    label -- text of the cell, for rows that carry their own text.
    fmt -- callable invoked as fmt(value, width) to render the cell.
    """
    def __init__(self, label, fmt):
        (self.label, self.fmt) = (label, fmt)
669
670
def fmt_packet(obj, width):
    """ Return obj.packets as text, right justified to width."""
    packets_text = str(obj.packets)
    return packets_text.rjust(width)
674
675
def fmt_count(obj, width):
    """ Return obj.count as text, right justified to width."""
    count_text = str(obj.count)
    return count_text.rjust(width)
679
680
def fmt_avg(obj, width):
    """ Return obj.average truncated to an integer, as text right
    justified to width."""
    whole_average = int(obj.average)
    return str(whole_average).rjust(width)
684
685
def fmt_field(obj, width):
    """ Return obj.field left justified to width.

    A field longer than width is cut and ends with an ellipses marker
    so that the truncation is visible.
    """
    marker = " ... "
    text = obj.field
    if len(text) > width:
        keep = width - len(marker)
        text = text[:keep] + marker
    return text.ljust(width)
696
697
def fmt_bytes(obj, width):
    """ Return obj.bytes as text, right justified to width.

    The plain number is used when it fits in width, otherwise the
    human readable form produced by approximate_size is used.
    """
    plain = str(obj.bytes)
    if len(plain) > width:
        text = approximate_size(obj.bytes)
    else:
        text = plain
    return text.rjust(width)
705
706
def title_center(value, width):
    """ Return the upper case title centered in width characters."""
    heading = value.upper()
    return heading.center(width)
710
711
def title_rjust(value, width):
    """ Return the upper case title right justified in width characters."""
    heading = value.upper()
    return heading.rjust(width)
715
716
def column_picker(order, obj):
    """ Return the value of obj shown in column number order.

    1 is count, 2 is packets, 3 is bytes and 4 is average. Any other
    order raises ValueError.
    """
    columns = ((1, "count"), (2, "packets"), (3, "bytes"), (4, "average"))
    for (position, attribute) in columns:
        if order == position:
            return getattr(obj, attribute)
    raise ValueError("order outside of range %s" % order)
729
730
class Render:
    """ Turns the content of a flow database into lines of text.

    FIELD_SELECT_SCRIPT and FIELD_SELECT_TOP are one less than the index
    (into FLOW_FIELDS) of the field type that is shown first, because the
    constructor advances the selection once while it initializes.
    """
    FLOW_FIELDS = [_field.field_type for _field in OUTPUT_FORMAT] + ["all"]

    FIELD_SELECT_SCRIPT = 7
    FIELD_SELECT_TOP = -1

    def __init__(self, console_width, field_select):
        """ Set up the column metadata, the titles and the formatters.

        console_width -- number of characters available per line.
        field_select -- one less than the FLOW_FIELDS index to show
        first (see FIELD_SELECT_SCRIPT and FIELD_SELECT_TOP).
        """

        self._start_time = datetime.datetime.now()

        # The order of every list below dictates the order of the columns.
        # Column 0 is the flow field column: it is not sortable and its
        # width is a place holder that console_width_set calculates.
        # The four value columns are sortable and have a fixed width.
        self._cols = [ColMeta(False, 0)]
        for _ in range(4):
            self._cols.append(ColMeta(True, Columns.VALUE_WIDTH))
        self._console_width = console_width
        self.console_width_set(console_width)

        # One empty label per column; column_select_event labels the
        # column that is used for sorting.
        self._descs = [RowMeta("", title_rjust) for _ in range(5)]
        self._column_sort_select = 0
        self.column_select_event()

        value_titles = (Columns.COUNT, Columns.PACKETS,
                        Columns.BYTES, Columns.AVERAGE)
        self._titles = [RowMeta(Columns.FIELDS, title_center)]
        self._titles += [RowMeta(title, title_rjust)
                         for title in value_titles]

        formatters = (fmt_field, fmt_count, fmt_packet, fmt_bytes, fmt_avg)
        self._datas = [RowMeta(None, formatter) for formatter in formatters]

        ##
        # _field_types hold which fields are displayed in the field
        # column, with the keyword all implying all fields.
        ##
        self._field_types = Render.FLOW_FIELDS

        # Move from the given start value to the first displayed field.
        self._field_type_select = field_select
        self.field_type_toggle()

    def _field_type_select_get(self):
        """ Return the name of the field type currently displayed. """
        selected = self._field_type_select
        return self._field_types[selected]

    def field_type_toggle(self):
        """ Move to the next field type, wrapping around after the last
        one, and show its name in the title of the first column. """
        following = self._field_type_select + 1
        if (following >= len(self._field_types)):
            following = 0
        self._field_type_select = following

        title = Columns.FIELDS + " (%s)" % self._field_type_select_get()
        self._titles[0].label = title

    def column_select_event(self):
        """ Move the sort marker to the next sortable column. """

        self._descs[self._column_sort_select].label = ""

        column_count = len(self._cols)
        for _ in range(column_count):
            candidate = self._column_sort_select + 1
            if (candidate >= column_count):
                candidate = 0
            self._column_sort_select = candidate

            # Stop at the first sortable column.
            if (self._cols[candidate].sortable):
                break

        self._descs[self._column_sort_select].label = "DESC"

    def console_width_set(self, console_width):
        """ Adjust the column widths to the new console_width. """
        self._console_width = console_width

        ##
        # The value columns (packets, bytes, average and count) keep
        # their fixed width and the flow field column gets whatever
        # remains, since the flow field value can be long. When numbers
        # get large, the bytes output switches to the form generated by
        # approximate_size, which limits output to ###.# XiB in other
        # words 9 characters.
        ##
        separators = len(self._cols) - 1
        self._cols[0].width = 0
        reserved = sum(col.width for col in self._cols) + separators
        self._cols[0].width = console_width - reserved

    def format(self, flow_db):
        """ Return the report for flow_db as a list of lines.

        The list holds a title line, a status line, two column heading
        lines (sort marker and titles) and one line per value returned
        by flow_db.field_values_in_order.
        """
        ##
        # Title
        ##
        lines = ["Flow Summary".center(self._console_width)]

        ##
        # Statistics and status
        ##
        status = " Total: %(flow_total)s  errors: %(flow_errors)s " % \
                   flow_db.flow_stats_get()
        if (flow_db.accumulate_get()):
            status += "Accumulate: on "
        else:
            status += "Accumulate: off "

        elapsed = datetime.datetime.now() - self._start_time
        status += "Duration: %s " % str(elapsed)
        lines.append(status.ljust(self._console_width))

        ##
        # 2 rows for columns: the first indicates which column is in
        # descending order, the second holds the titles.
        ##
        for heading in (self._descs, self._titles):
            cells = [cell.fmt(cell.label, col.width)
                     for (cell, col) in zip(heading, self._cols)]
            lines.append(" ".join(cells))

        ##
        # Data.
        ##
        for entry in flow_db.field_values_in_order(
                self._field_type_select_get(), self._column_sort_select):
            cells = [cell.fmt(entry, col.width)
                     for (cell, col) in zip(self._datas, self._cols)]
            lines.append(" ".join(cells))

        return lines
890
891
def curses_screen_begin():
    """ Take control of the screen with curses.

    Initializes curses, enters cbreak mode, turns echo off and enables
    keypad handling. Returns the window object created by curses.
    """
    screen = curses.initscr()
    curses.cbreak()
    curses.noecho()
    screen.keypad(1)
    return screen
899
900
def curses_screen_end(stdscr):
    """ end curses screen control.

    Leaves cbreak mode, disables keypad handling on stdscr, turns echo
    back on and calls curses.endwin().

    stdscr -- the curses window whose keypad handling is switched off.
    """
    curses.nocbreak()
    stdscr.keypad(0)
    curses.echo()
    curses.endwin()
907
908
class FlowDB:
    """ Implements live vs accumulate mode.

    Flows are stored as key value pairs. The key consists of the content
    prior to stat fields. The value portion consists of stats in a dictionary
    form.

    A second dictionary holds the per-field aggregates, keyed by the repr()
    of the aggregate object.  decay() may be called from another thread
    (see DecayThread), so begin(), flow_line_add() and decay() modify both
    dictionaries with self._flow_lock held.

    @todo future add filtering here.
    """
    def __init__(self, accumulate):
        """ accumulate: when true flows are kept from one sample to the
        next, otherwise begin() empties the database. """
        self._accumulate = accumulate
        self._error_count = 0
        # Guards self._flows and self._fields.  This is a plain
        # (non-reentrant) lock: methods called with it held must not try
        # to take it again.
        self._flow_lock = threading.Lock()
        # This dictionary holds individual flows.
        # Values are (stats, last update time.)
        # The last update time is used for aging.
        self._flows = {}
        # This dictionary holds aggregate of flow fields.
        self._fields = {}

    def accumulate_get(self):
        """ Return the current accumulate state. """
        return self._accumulate

    def accumulate_toggle(self):
        """ toggle accumulate flow behavior. """
        self._accumulate = not self._accumulate

    def begin(self):
        """ Indicate the beginning of processing flow content.
        if accumulate is false clear current set of flows. """

        if (self._accumulate):
            return

        # Flows and their aggregates are dropped together under the lock so
        # that a concurrent decay() never sees one without the other.
        with self._flow_lock:
            self._flows.clear()
            self._fields.clear()

    def flow_line_add(self, line):
        """ Split a line from a ovs-dpctl dump-flow into key and stats.
        The order of the content in the flow should be:
        - flow content
        - stats for the flow
        - actions

        This method also assumes that the dump flow output does not
        change order of fields of the same flow.

        Raises ValueError when the flow fields or the statistics are
        missing, or when processing the flow raises ValueError.  The error
        is logged and counted before it is re-raised.
        """

        line = line.rstrip("\n")
        (fields, stats, _) = flow_line_split(line)

        try:
            fields_dict = elements_to_dict(fields)

            if (len(fields_dict) == 0):
                raise ValueError("flow fields are missing %s" % line)

            stats_dict = elements_to_dict(stats)
            if (len(stats_dict) == 0):
                raise ValueError("statistics are missing %s." % line)

            ##
            # In accumulate mode, the Flow database can reach 10,000's of
            # persistent flows. The interaction of the script with this many
            # flows is too slow. Instead, delta are sent to the flow_db
            # database allow incremental changes to be done in O(m) time
            # where m is the current flow list, instead of iterating over
            # all flows in O(n) time where n is the entire history of flows.
            key = ",".join(fields)

            # Look up the previous stats, update the aggregates and store
            # the new stats as one step so decay() cannot interleave.
            with self._flow_lock:
                (stats_old_dict, _) = self._flows.get(key, (None, None))
                self.flow_event(fields_dict, stats_old_dict, stats_dict)
                self._flows[key] = (stats_dict, datetime.datetime.now())

        except ValueError as arg:
            logging.error(arg)
            self._error_count += 1
            raise

    def decay(self, decayTimeInSeconds):
        """ Decay content.

        Remove every flow whose last update is more than
        decayTimeInSeconds whole seconds old and subtract it from the
        field aggregates.
        """
        now = datetime.datetime.now()

        # Work from a snapshot: the dictionary must not change size while
        # it is being iterated.
        with self._flow_lock:
            snapshot = list(self._flows.items())

        for (key, value) in snapshot:
            (stats_dict, update_time) = value
            delta = now - update_time
            # timedelta.seconds alone ignores whole days and is large and
            # positive for a negative delta; fold the days in.
            age = delta.days * 86400 + delta.seconds
            if (age <= decayTimeInSeconds):
                continue

            with self._flow_lock:
                # The flow may have been cleared or refreshed since the
                # snapshot was taken; only retire the entry we inspected.
                if (self._flows.get(key) is not value):
                    continue
                del self._flows[key]

                fields_dict = elements_to_dict(flow_line_iter(key))
                matches = flow_aggregate(fields_dict, stats_dict)
                for match in matches:
                    self.field_dec(match)

    def flow_stats_get(self):
        """ Return statistics in a form of a dictionary.

        Keys are "flow_total" (number of stored flows) and "flow_errors"
        (number of lines rejected by flow_line_add).
        """
        with self._flow_lock:
            return {"flow_total": len(self._flows),
                    "flow_errors": self._error_count}

    def field_types_get(self):
        """ Return the set of field types currently aggregated. """
        return set(ii.field_type for ii in self._fields.values())

    def field_add(self, data):
        """ Collect dump-flow data to sum number of times item appears.

        The first occurrence stores a copy of data; later occurrences are
        added to the stored aggregate with "+=".
        """
        current = self._fields.get(repr(data), None)
        if (current is None):
            current = copy.copy(data)
        else:
            current += data
        self._fields[repr(current)] = current

    def field_dec(self, data):
        """ Subtract dump-flow data from its aggregate.

        The aggregate is dropped once its count reaches zero.  Raises
        ValueError when no aggregate exists for data.
        """
        current = self._fields.get(repr(data), None)
        if (current is None):
            raise ValueError("decrementing field missing %s" % repr(data))

        current -= data
        key = repr(current)
        self._fields[key] = current
        if (current.count == 0):
            del self._fields[key]

    def field_values_in_order(self, field_type_select, column_order):
        """ Return a list of items in order maximum first.

        field_type_select: "all" or the single field type to keep.
        column_order: passed to column_picker() to obtain the sort value.
        """
        values = self._fields.values()
        if (field_type_select != "all"):
            # If a field type other than "all" then reduce the list.
            values = [ii for ii in values
                      if (ii.field_type == field_type_select)]
        # Sort on the picked column only, then flip to get maximum first.
        values = [(column_picker(column_order, ii), ii) for ii in values]
        values.sort(key=operator.itemgetter(0))
        values.reverse()
        return [ii[1] for ii in values]

    def flow_event(self, fields_dict, stats_old_dict, stats_new_dict):
        """ Receives new flow information.

        stats_old_dict is None for a flow that has not been seen before,
        otherwise it holds the stats stored for the flow previously.
        """

        # In order to avoid processing every flow at every sample
        # period, changes in flow packet count is used to determine the
        # delta in the flow statistics. This delta is applied with
        # match.decrement prior to self.field_add

        if (stats_old_dict is None):
            # This is a new flow
            matches = flow_aggregate(fields_dict, stats_new_dict)
            for match in matches:
                self.field_add(match)
            return

        old_packets = int(stats_old_dict.get("packets", 0))
        new_packets = int(stats_new_dict.get("packets", 0))
        if (old_packets == new_packets):
            # ignore. same data.
            return

        old_bytes = int(stats_old_dict.get("bytes", 0))
        # old_packets != new_packets
        # if old_packets > new_packets then we end up decrementing
        # packets and bytes.
        matches = flow_aggregate(fields_dict, stats_new_dict)
        for match in matches:
            # NOTE(review): decrement() is defined outside this class;
            # presumably it removes the previously counted packets, bytes
            # and one occurrence so that only the change is added.
            match.decrement(old_packets, old_bytes, 1)
            self.field_add(match)
1098
1099
class DecayThread(threading.Thread):
    """ Periodically call flow database to see if any flows are old. """
    def __init__(self, flow_db, interval):
        """ Prepare the decay thread; the caller starts it with start().

        flow_db: object whose decay(seconds) method is called periodically.
        interval: age in seconds passed to decay(); values below one are
        raised to one.
        """
        threading.Thread.__init__(self)

        self._interval = max(1, interval)
        # Wake up every tenth of the interval but at least once a second.
        # The division must be a float division: with an integer interval
        # below ten an integer division yields zero, and run() would spin
        # without ever sleeping.
        self._min_interval = min(1, interval / 10.0)
        self._flow_db = flow_db
        self._event = threading.Event()
        self._running = True

        # Never keep the application alive just for this thread.
        self.daemon = True

    def run(self):
        """ Worker thread which handles decaying accumulated flows. """

        while (self._running):
            # The event is only set by stop(), so this doubles as an
            # interruptible sleep.
            self._event.wait(self._min_interval)
            if (self._running):
                self._flow_db.decay(self._interval)

    def stop(self):
        """ Stop thread. """
        self._running = False
        self._event.set()
        ##
        # Give the worker thread time to terminate but not too long.
        # this thread is a daemon so the application will terminate if
        # we timeout during the join. This is just a cleaner way to
        # release resources.
        self.join(2.0)
1132
1133
def flow_top_command(stdscr, render, flow_db):
    """ Read one key from stdscr in top mode and act on it.

    'h' halts the output until another key arrives, and that key is then
    handled in its place.  's' selects the next sort column, 'a' toggles
    accumulate mode and 'f' toggles the displayed field type.  Every other
    key, including space, only triggers a new sample.

    Returns the key code that was handled.
    """
    key = stdscr.getch()

    if (key == ord('h')):
        # Output is halted: keep polling until a real key shows up
        # (getch() reports -1 when nothing was pressed).
        key = stdscr.getch()
        while (key == -1):
            key = stdscr.getch()

    # key code -> (object, name of the method to invoke on it)
    dispatch = {
        ord('s'): (render, "column_select_event"),
        ord('a'): (flow_db, "accumulate_toggle"),
        ord('f'): (render, "field_type_toggle"),
    }
    if (key in dispatch):
        (target, method_name) = dispatch[key]
        getattr(target, method_name)()

    return key
1157
1158
def decay_timer_start(flow_db, accumulateDecay):
    """ Start a DecayThread for flow_db when accumulateDecay is positive.

    Returns the running thread, or None when no thread was started.
    """
    if (not (accumulateDecay > 0)):
        return None

    worker = DecayThread(flow_db, accumulateDecay)
    worker.start()
    return worker
1167
1168
def flows_top(args):
    """ handles top like behavior when --script is not specified.

    Repeatedly samples the flows, renders them on a curses screen and
    handles one key press per sample until 'q' is pressed, the input
    cannot be read (OSError) or the user interrupts the program.  The last
    rendered screen is printed again once curses has released the
    terminal.

    args: parsed command line; accumulate, accumulateDecay and delay are
    read here and args is also handed to top_input_get().
    """

    flow_db = FlowDB(args.accumulate)
    render = Render(0, Render.FIELD_SELECT_TOP)

    decay_timer = decay_timer_start(flow_db, args.accumulateDecay)
    # (row, text) pairs of the most recently drawn screen.
    lines = []

    try:
        try:
            stdscr = curses_screen_begin()
            try:
                ch = 'X'
                stdscr.timeout(args.delay)

                while (ch != ord('q')):
                    flow_db.begin()

                    try:
                        ihdl = top_input_get(args)
                        try:
                            flows_read(ihdl, flow_db)
                        finally:
                            ihdl.close()
                    except OSError as arg:
                        logging.critical(arg)
                        break

                    (console_height, console_width) = stdscr.getmaxyx()
                    render.console_width_set(console_width)

                    # The bottom row of the screen is left unused.
                    output_height = console_height - 1
                    line_output = render.format(flow_db)
                    # Keep a real list: it is walked once to draw and once
                    # more after the loop to repeat the output.
                    lines = list(zip(range(output_height),
                                     line_output[:output_height]))

                    stdscr.erase()
                    for (count, line) in lines:
                        stdscr.addstr(count, 0, line[:console_width])
                    stdscr.refresh()

                    ch = flow_top_command(stdscr, render, flow_db)

            finally:
                curses_screen_end(stdscr)
        except KeyboardInterrupt:
            pass
    finally:
        # Stop the decay thread on every exit path, not only on a clean
        # exit or a keyboard interrupt.
        if (decay_timer):
            decay_timer.stop()

    # repeat output
    for (count, line) in lines:
        print(line)
1223
1224
def flows_script(args):
    """ handles --script option.

    Reads the flows from every file in args.flowFiles, or from stdin when
    no files were given, then prints the rendered summary once.
    """

    flow_db = FlowDB(args.accumulate)
    flow_db.begin()

    flow_files = args.flowFiles
    if (flow_files is not None):
        for flow_file in flow_files:
            logging.info("reading flows from %s", flow_file)
            with open(flow_file, "r") as ihdl:
                flow_db = flows_read(ihdl, flow_db)
    else:
        logging.info("reading flows from stdin")
        # stdin is reopened with buffering disabled (third argument 0).
        with os.fdopen(sys.stdin.fileno(), 'r', 0) as ihdl:
            flow_db = flows_read(ihdl, flow_db)

    (_, console_width) = get_terminal_size()
    render = Render(console_width, Render.FIELD_SELECT_SCRIPT)

    for output_line in render.format(flow_db):
        print(output_line)
1252
1253
def main():
    """ Return 0 on success or 1 on failure.

    Algorithm
    The ovs-dpctl dump-flow content is processed in four stages:
    1. Retrieve the current input.
    2. Store it in FlowDB and maintain history.
    3. Iterate over FlowDB, aggregating stats for each flow field.
    4. Present the data.

    Retrieving the input is currently trivial: ovs-dpctl dump-flow is
    called.  Future versions will have more elaborate means of collecting
    dump-flow content.  FlowDB returns all data in the form of a
    hierarchical dictionary.  Input will vary.

    In accumulate mode flows are not purged from the FlowDB manager; at
    the very least the latest statistics of each flow are kept.  For live
    output the FlowDB is purged before each sample.

    Aggregating results requires identifying the flow fields to aggregate
    out of each flow and summing their stats.

    Failure here means the run was cut short by a keyboard interrupt.
    """
    args = args_get()

    # Interactive top mode or one-shot script mode.
    if (args.top):
        handler = flows_top
    else:
        handler = flows_script

    try:
        handler(args)
    except KeyboardInterrupt:
        return 1
    return 0
1288
1289 if __name__ == '__main__':
1290     sys.exit(main())
1291 elif __name__ == 'ovs-dpctl-top':
1292     # pylint: disable-msg=R0915
1293
1294     ##
1295     # Test case beyond this point.
1296     # pylint: disable-msg=R0904
1297     class TestsuiteFlowParse(unittest.TestCase):
1298         """
1299         parse flow into hierarchy of dictionaries.
1300         """
1301         def test_flow_parse(self):
1302             """ test_flow_parse. """
1303             line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1304                    "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1305                    "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1306                    "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1307                    "udp(src=61252,dst=5355), packets:1, bytes:92, "\
1308                    "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1309                    "38,41,44,47,50,53,56,59,62,65"
1310
1311             (fields, stats, _) = flow_line_split(line)
1312             flow_dict = elements_to_dict(fields + stats)
1313             self.assertEqual(flow_dict["eth"]["src"], "00:50:56:b4:4e:f8")
1314             self.assertEqual(flow_dict["eth"]["dst"], "33:33:00:01:00:03")
1315             self.assertEqual(flow_dict["ipv6"]["src"],
1316                              "fe80::55bf:fe42:bc96:2812")
1317             self.assertEqual(flow_dict["ipv6"]["dst"], "ff02::1:3")
1318             self.assertEqual(flow_dict["packets"], "1")
1319             self.assertEqual(flow_dict["bytes"], "92")
1320
1321             line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1322                    "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1323                    "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1324                    "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1325                    "udp(src=61252,dst=5355), packets:1, bytes:92, "\
1326                    "used:-0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1327                    "38,41,44,47,50,53,56,59,62,65"
1328
1329             (fields, stats, _) = flow_line_split(line)
1330             flow_dict = elements_to_dict(fields + stats)
1331             self.assertEqual(flow_dict["used"], "-0.703s")
1332             self.assertEqual(flow_dict["packets"], "1")
1333             self.assertEqual(flow_dict["bytes"], "92")
1334
1335         def test_flow_sum(self):
1336             """ test_flow_sum. """
1337             line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1338                    "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1339                    "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1340                    "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1341                    "udp(src=61252,dst=5355), packets:2, bytes:92, "\
1342                    "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1343                    "38,41,44,47,50,53,56,59,62,65"
1344
1345             (fields, stats, _) = flow_line_split(line)
1346             stats_dict = elements_to_dict(stats)
1347             fields_dict = elements_to_dict(fields)
1348             ##
1349             # Test simple case of one line.
1350             flow_db = FlowDB(False)
1351             matches = flow_aggregate(fields_dict, stats_dict)
1352             for match in matches:
1353                 flow_db.field_add(match)
1354
1355             flow_types = flow_db.field_types_get()
1356             expected_flow_types = ["eth", "eth_type", "udp", "in_port", "ipv6"]
1357             self.assert_(len(flow_types) == len(expected_flow_types))
1358             for flow_type in flow_types:
1359                 self.assertTrue(flow_type in expected_flow_types)
1360
1361             for flow_type in flow_types:
1362                 sum_value = flow_db.field_values_in_order("all", 1)
1363                 self.assert_(len(sum_value) == 5)
1364                 self.assert_(sum_value[0].packets == 2)
1365                 self.assert_(sum_value[0].count == 1)
1366                 self.assert_(sum_value[0].bytes == 92)
1367
1368             ##
1369             # Add line again just to see counts go up.
1370             matches = flow_aggregate(fields_dict, stats_dict)
1371             for match in matches:
1372                 flow_db.field_add(match)
1373
1374             flow_types = flow_db.field_types_get()
1375             self.assert_(len(flow_types) == len(expected_flow_types))
1376             for flow_type in flow_types:
1377                 self.assertTrue(flow_type in expected_flow_types)
1378
1379             for flow_type in flow_types:
1380                 sum_value = flow_db.field_values_in_order("all", 1)
1381                 self.assert_(len(sum_value) == 5)
1382                 self.assert_(sum_value[0].packets == 4)
1383                 self.assert_(sum_value[0].count == 2)
1384                 self.assert_(sum_value[0].bytes == 2 * 92)
1385
1386         def test_assoc_list(self):
1387             """ test_assoc_list. """
1388             line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1389                    "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1390                    "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1391                    "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1392                    "udp(src=61252,dst=5355), packets:2, bytes:92, "\
1393                    "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1394                    "38,41,44,47,50,53,56,59,62,65"
1395
1396             valid_flows = [
1397                 'eth_type(0x86dd)',
1398                 'udp(dst=5355)',
1399                 'in_port(4)',
1400                 'ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3)',
1401                 'eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)'
1402                 ]
1403
1404             (fields, stats, _) = flow_line_split(line)
1405             stats_dict = elements_to_dict(stats)
1406             fields_dict = elements_to_dict(fields)
1407
1408             ##
1409             # Test simple case of one line.
1410             flow_db = FlowDB(False)
1411             matches = flow_aggregate(fields_dict, stats_dict)
1412             for match in matches:
1413                 flow_db.field_add(match)
1414
1415             for sum_value in flow_db.field_values_in_order("all", 1):
1416                 assoc_list = Columns.assoc_list(sum_value)
1417                 for item in assoc_list:
1418                     if (item[0] == "fields"):
1419                         self.assertTrue(item[1] in valid_flows)
1420                     elif (item[0] == "packets"):
1421                         self.assertTrue(item[1] == 2)
1422                     elif (item[0] == "count"):
1423                         self.assertTrue(item[1] == 1)
1424                     elif (item[0] == "average"):
1425                         self.assertTrue(item[1] == 46.0)
1426                     elif (item[0] == "bytes"):
1427                         self.assertTrue(item[1] == 92)
1428                     else:
1429                         raise ValueError("unknown %s", item[0])
1430
1431         def test_human_format(self):
1432             """ test_assoc_list. """
1433
1434             self.assertEqual(approximate_size(0.0), "0.0 KiB")
1435             self.assertEqual(approximate_size(1024), "1.0 KiB")
1436             self.assertEqual(approximate_size(1024 * 1024), "1.0 MiB")
1437             self.assertEqual(approximate_size((1024 * 1024) + 100000),
1438                              "1.1 MiB")
1439             value = (1024 * 1024 * 1024) + 100000000
1440             self.assertEqual(approximate_size(value), "1.1 GiB")
1441
1442         def test_flow_line_split(self):
1443             """ Splitting a flow line is not trivial.
1444             There is no clear delimiter. Comma is used liberally."""
1445             expected_fields = ["in_port(4)",
1446                             "eth(src=00:50:56:b4:4e:f8,dst=33:33:00:01:00:03)",
1447                             "eth_type(0x86dd)",
1448                            "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"
1449                            "label=0,proto=17,tclass=0,hlimit=1,frag=no)",
1450                            "udp(src=61252,dst=5355)"]
1451             expected_stats = ["packets:2", "bytes:92", "used:0.703s"]
1452             expected_actions = "actions:3,8,11,14,17,20,23,26,29,32,35," \
1453                                "38,41,44,47,50,53,56,59,62,65"
1454
1455             line = "in_port(4),eth(src=00:50:56:b4:4e:f8,"\
1456                    "dst=33:33:00:01:00:03),eth_type(0x86dd),"\
1457                    "ipv6(src=fe80::55bf:fe42:bc96:2812,dst=ff02::1:3,"\
1458                    "label=0,proto=17,tclass=0,hlimit=1,frag=no),"\
1459                    "udp(src=61252,dst=5355), packets:2, bytes:92, "\
1460                    "used:0.703s, actions:3,8,11,14,17,20,23,26,29,32,35,"\
1461                    "38,41,44,47,50,53,56,59,62,65"
1462
1463             (fields, stats, actions) = flow_line_split(line)
1464
1465             self.assertEqual(fields, expected_fields)
1466             self.assertEqual(stats, expected_stats)
1467             self.assertEqual(actions, expected_actions)
1468
1469         def test_accumulate_decay(self):
1470             """ test_accumulate_decay: test accumulated decay. """
1471             lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b,"
1472                      "dst=ff:ff:ff:ff:ff:ff),"
1473                      "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
1474                      "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
1475                      "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
1476                      "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
1477                      "packets:1, bytes:120, used:0.004s, actions:1"]
1478
1479             flow_db = FlowDB(True)
1480             flow_db.begin()
1481             flow_db.flow_line_add(lines[0])
1482
1483             # Make sure we decay
1484             time.sleep(4)
1485             self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
1486             flow_db.decay(1)
1487             self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)
1488
1489             flow_db.flow_line_add(lines[0])
1490             self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
1491             flow_db.decay(30)
1492             # Should not be deleted.
1493             self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
1494
1495             flow_db.flow_line_add(lines[0])
1496             self.assertEqual(flow_db.flow_stats_get()["flow_total"], 1)
1497             timer = decay_timer_start(flow_db, 2)
1498             time.sleep(10)
1499             self.assertEqual(flow_db.flow_stats_get()["flow_total"], 0)
1500             timer.stop()
1501
1502         def test_accumulate(self):
1503             """ test_accumulate test that FlowDB supports accumulate. """
1504
1505             lines = ["in_port(1),eth(src=00:50:56:4f:dc:3b,"
1506                      "dst=ff:ff:ff:ff:ff:ff),"
1507                      "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
1508                      "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
1509                      "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
1510                      "tha=00:00:00:00:00:00/00:00:00:00:00:00), "
1511                      "packets:1, bytes:120, used:0.004s, actions:1",
1512                      "in_port(2),"
1513                      "eth(src=68:ef:bd:25:ef:c0,dst=33:33:00:00:00:66),"
1514                      "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
1515                      "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
1516                      "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029), "
1517                      "packets:2, bytes:5026, used:0.348s, actions:1",
1518                      "in_port(1),eth(src=ee:ee:ee:ee:ee:ee,"
1519                      "dst=ff:ff:ff:ff:ff:ff),"
1520                      "eth_type(0x0806),arp(sip=10.24.105.107/255.255.255.255,"
1521                      "tip=10.24.104.230/255.255.255.255,op=1/0xff,"
1522                      "sha=00:50:56:4f:dc:3b/00:00:00:00:00:00,"
1523                      "tha=00:00:00:00:00:00/00:00:00:00:00:00), packets:2, "
1524                      "bytes:240, used:0.004s, actions:1"]
1525
1526             lines = [
1527                 "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
1528                 "in_port(2),eth_type(0x0806), packets:2, bytes:126, actions:1",
1529                 "in_port(1),eth_type(0x0806), packets:2, bytes:240, actions:1",
1530                 "in_port(1),eth_type(0x0800), packets:1, bytes:120, actions:1",
1531                 "in_port(1),eth_type(0x0800), packets:2, bytes:240, actions:1",
1532                 "in_port(1),eth_type(0x0806), packets:1, bytes:120, actions:1",
1533                 ]
1534
1535             # Turn on accumulate.
1536             flow_db = FlowDB(True)
1537             flow_db.begin()
1538
1539             flow_db.flow_line_add(lines[0])
1540
1541             # Test one flow exist.
1542             sum_values = flow_db.field_values_in_order("all", 1)
1543             in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1544             self.assertEqual(len(in_ports), 1)
1545             self.assertEqual(in_ports[0].packets, 1)
1546             self.assertEqual(in_ports[0].bytes, 120)
1547             self.assertEqual(in_ports[0].count, 1)
1548
1549             # simulate another sample
1550             # Test two different flows exist.
1551             flow_db.begin()
1552             flow_db.flow_line_add(lines[1])
1553             sum_values = flow_db.field_values_in_order("all", 1)
1554             in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1555             self.assertEqual(len(in_ports), 1)
1556             self.assertEqual(in_ports[0].packets, 1)
1557             self.assertEqual(in_ports[0].bytes, 120)
1558             self.assertEqual(in_ports[0].count, 1)
1559
1560             in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
1561             self.assertEqual(len(in_ports), 1)
1562             self.assertEqual(in_ports[0].packets, 2)
1563             self.assertEqual(in_ports[0].bytes, 126)
1564             self.assertEqual(in_ports[0].count, 1)
1565
1566             # Test first flow increments packets.
1567             flow_db.begin()
1568             flow_db.flow_line_add(lines[2])
1569             sum_values = flow_db.field_values_in_order("all", 1)
1570             in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1571             self.assertEqual(len(in_ports), 1)
1572             self.assertEqual(in_ports[0].packets, 2)
1573             self.assertEqual(in_ports[0].bytes, 240)
1574             self.assertEqual(in_ports[0].count, 1)
1575
1576             in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
1577             self.assertEqual(len(in_ports), 1)
1578             self.assertEqual(in_ports[0].packets, 2)
1579             self.assertEqual(in_ports[0].bytes, 126)
1580             self.assertEqual(in_ports[0].count, 1)
1581
1582             # Test third flow but with the same in_port(1) as the first flow.
1583             flow_db.begin()
1584             flow_db.flow_line_add(lines[3])
1585             sum_values = flow_db.field_values_in_order("all", 1)
1586             in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1587             self.assertEqual(len(in_ports), 1)
1588             self.assertEqual(in_ports[0].packets, 3)
1589             self.assertEqual(in_ports[0].bytes, 360)
1590             self.assertEqual(in_ports[0].count, 2)
1591
1592             in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
1593             self.assertEqual(len(in_ports), 1)
1594             self.assertEqual(in_ports[0].packets, 2)
1595             self.assertEqual(in_ports[0].bytes, 126)
1596             self.assertEqual(in_ports[0].count, 1)
1597
1598             # Third flow has changes.
1599             flow_db.begin()
1600             flow_db.flow_line_add(lines[4])
1601             sum_values = flow_db.field_values_in_order("all", 1)
1602             in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1603             self.assertEqual(len(in_ports), 1)
1604             self.assertEqual(in_ports[0].packets, 4)
1605             self.assertEqual(in_ports[0].bytes, 480)
1606             self.assertEqual(in_ports[0].count, 2)
1607
1608             in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
1609             self.assertEqual(len(in_ports), 1)
1610             self.assertEqual(in_ports[0].packets, 2)
1611             self.assertEqual(in_ports[0].bytes, 126)
1612             self.assertEqual(in_ports[0].count, 1)
1613
1614             # First flow reset.
1615             flow_db.begin()
1616             flow_db.flow_line_add(lines[5])
1617             sum_values = flow_db.field_values_in_order("all", 1)
1618             in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1619             self.assertEqual(len(in_ports), 1)
1620             self.assertEqual(in_ports[0].packets, 3)
1621             self.assertEqual(in_ports[0].bytes, 360)
1622             self.assertEqual(in_ports[0].count, 2)
1623
1624             in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(2)")]
1625             self.assertEqual(len(in_ports), 1)
1626             self.assertEqual(in_ports[0].packets, 2)
1627             self.assertEqual(in_ports[0].bytes, 126)
1628             self.assertEqual(in_ports[0].count, 1)
1629
1630         def test_parse_character_errors(self):
1631             """ test_parsing errors.
1632             The flow parses is purposely loose. Its not designed to validate
1633             input. Merely pull out what it can but there are situations
1634             that a parse error can be detected.
1635             """
1636
1637             lines = ["complete garbage",
1638                      "in_port(2),eth(src=68:ef:bd:25:ef:c0,"
1639                      "dst=33:33:00:00:00:66),"
1640                      "eth_type(0x86dd),ipv6(src=fe80::6aef:bdff:fe25:efc0/::,"
1641                      "dst=ff02::66/::,label=0/0,proto=17/0xff,tclass=0xe0/0,"
1642                      "hlimit=255/0,frag=no/0),udp(src=2029,dst=2029),"
1643                      "packets:2,bytes:5026,actions:1"]
1644
1645             flow_db = FlowDB(False)
1646             flow_db.begin()
1647             for line in lines:
1648                 try:
1649                     flow_db.flow_line_add(line)
1650                 except ValueError:
1651                     # We want an exception. That is how we know we have
1652                     # correctly found a simple parsing error. We are not
1653                     # looking to validate flow output just catch simple issues.
1654                     continue
1655                 self.assertTrue(False)
1656
1657         def test_tunnel_parsing(self):
1658             """ test_tunnel_parsing test parse flows with tunnel. """
1659             lines = [
1660                 "tunnel(tun_id=0x0,src=192.168.1.1,dst=192.168.1.10,"
1661                 "tos=0x0,ttl=64,flags(key)),in_port(1),"
1662                 "eth(src=9e:40:f5:ef:ec:ee,dst=01:23:20:00:00:30),"
1663                 "eth_type(0x8902), packets:6, bytes:534, used:0.128s, "
1664                 "actions:userspace(pid=4294962691,slow_path(cfm))"
1665                 ]
1666             flow_db = FlowDB(False)
1667             flow_db.begin()
1668             flow_db.flow_line_add(lines[0])
1669             sum_values = flow_db.field_values_in_order("all", 1)
1670             in_ports = [ii for ii in sum_values if (repr(ii) == "in_port(1)")]
1671             self.assertEqual(len(in_ports), 1)
1672             self.assertEqual(in_ports[0].packets, 6)
1673             self.assertEqual(in_ports[0].bytes, 534)
1674             self.assertEqual(in_ports[0].count, 1)
1675
1676         def test_flow_multiple_paren(self):
1677             """ test_flow_multiple_paren. """
1678             line = "tunnel(tun_id=0x0,src=192.168.1.1,flags(key)),in_port(2)"
1679             valid = ["tunnel(tun_id=0x0,src=192.168.1.1,flags(key))",
1680                      "in_port(2)"]
1681             rc = flow_line_iter(line)
1682             self.assertEqual(valid, rc)
1683
1684         def test_to_network(self):
1685             """ test_to_network test ipv4_to_network and ipv6_to_network. """
1686             ipv4s = [
1687                 ("192.168.0.1", "192.168.0.1"),
1688                 ("192.168.0.1/255.255.255.255", "192.168.0.1"),
1689                 ("192.168.0.1/255.255.255.0", "192.168.0.0"),
1690                 ("192.168.0.1/255.255.0.0", "192.168.0.0"),
1691                 ("192.168.0.1/255.0.0.0", "192.0.0.0"),
1692                 ("192.168.0.1/0.0.0.0", "0.0.0.0"),
1693                 ("10.24.106.230/255.255.255.255", "10.24.106.230"),
1694                 ("10.24.106.230/255.255.255.0", "10.24.106.0"),
1695                 ("10.24.106.0/255.255.255.0", "10.24.106.0"),
1696                 ("10.24.106.0/255.255.252.0", "10.24.104.0")
1697                 ]
1698
1699             ipv6s = [
1700                 ("1::192:168:0:1", "1::192:168:0:1"),
1701                 ("1::192:168:0:1/1::ffff:ffff:ffff:ffff", "1::192:168:0:1"),
1702                 ("1::192:168:0:1/1::ffff:ffff:ffff:0", "1::192:168:0:0"),
1703                 ("1::192:168:0:1/1::ffff:ffff:0:0", "1::192:168:0:0"),
1704                 ("1::192:168:0:1/1::ffff:0:0:0", "1::192:0:0:0"),
1705                 ("1::192:168:0:1/1::0:0:0:0", "1::"),
1706                 ("1::192:168:0:1/::", "::")
1707                 ]
1708
1709             for (ipv4_test, ipv4_check) in ipv4s:
1710                 self.assertEqual(ipv4_to_network(ipv4_test), ipv4_check)
1711
1712             for (ipv6_test, ipv6_check) in ipv6s:
1713                 self.assertEqual(ipv6_to_network(ipv6_test), ipv6_check)
1714
1715         def test_ui(self):
1716             """ test_ui: test expected ui behavior. """
1717             #pylint: disable=W0212
1718             top_render = Render(80, Render.FIELD_SELECT_TOP)
1719             script_render = Render(80, Render.FIELD_SELECT_SCRIPT)
1720             self.assertEqual(top_render._field_type_select_get(), "in_port")
1721             self.assertEqual(script_render._field_type_select_get(), "all")
1722             #pylint: enable=W0212