# D*** -- warnings from flake8-docstrings plugin
# H*** -- warnings from flake8 hacking plugin (custom style checks beyond PEP8)
# H231 Python 3.x incompatible 'except x,y:' construct
+# H233 Python 3.x incompatible use of print operator
flake8-check: $(FLAKE8_PYFILES)
- $(AM_V_GEN) if flake8 $^ --select=H231 --ignore=E121,E123,E125,E126,E127,E128,E129,E131,W503,F811,D,H ${FLAKE8_FLAGS}; then touch $@; else exit 1; fi
+ $(AM_V_GEN) if flake8 $^ --select=H231,H233 --ignore=E121,E123,E125,E126,E127,E128,E129,E131,W503,F811,D,H ${FLAKE8_FLAGS}; then touch $@; else exit 1; fi
endif
include $(srcdir)/manpages.mk
# notice and this notice are preserved. This file is offered as-is,
# without warranty of any kind.
+from __future__ import print_function
+
import getopt
import re
import sys
self.current_record = dict()
def startDocument(self):
- print """\
+ print("""\
/* IPFIX entities. */
#ifndef IPFIX_ENTITY
#define IPFIX_ENTITY(ENUM, ID, SIZE, NAME)
#endif
-"""
+""")
def endDocument(self):
- print """
-#undef IPFIX_ENTITY"""
+ print("""
+#undef IPFIX_ENTITY""")
def startElement(self, name, attrs):
if name in self.RECORD_FIELDS:
self.current_record['dataTypeSize'] = self.DATA_TYPE_SIZE.get(
self.current_record['dataType'], 0)
- print 'IPFIX_ENTITY(%(enumName)s, %(elementId)s, ' \
- '%(dataTypeSize)i, %(name)s)' % self.current_record
+ print('IPFIX_ENTITY(%(enumName)s, %(elementId)s, '
+ '%(dataTypeSize)i, %(name)s)' % self.current_record)
self.current_record.clear()
def characters(self, content):
def usage(name):
- print """\
+ print("""\
%(name)s: IPFIX entity definition generator
Prints C macros defining IPFIX entities from the standard IANA file at
<http://www.iana.org/assignments/ipfix/ipfix.xml>
The following options are also available:
-h, --help display this help message
-V, --version display version information\
-""" % {'name': name}
+""" % {'name': name})
sys.exit(0)
if __name__ == '__main__':
if key in ['-h', '--help']:
usage()
elif key in ['-V', '--version']:
- print 'ipfix-gen-entities (Open vSwitch)'
+ print('ipfix-gen-entities (Open vSwitch)')
else:
sys.exit(0)
rpcserver is an XML RPC server that allows RPC client to initiate tests
"""
+from __future__ import print_function
+
import exceptions
import sys
import xmlrpclib
rpc_server = TestArena()
reactor.listenTCP(port, server.Site(rpc_server))
try:
- print "Starting RPC server\n"
+ print("Starting RPC server\n")
sys.stdout.flush()
# If this server was started from ovs-test client then we must flush
# STDOUT so that client would know that server is ready to accept
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import print_function
+
import math
import time
udpformat = '{0:>15} {1:>15} {2:>15} {3:>15} {4:>15}'
- print ("UDP test from %s:%u to %s:%u with target bandwidth %s" %
+ print("UDP test from %s:%u to %s:%u with target bandwidth %s" %
(sender[0], sender[1], receiver[0], receiver[1],
util.bandwidth_to_string(tbwidth)))
- print udpformat.format("Datagram Size", "Snt Datagrams", "Rcv Datagrams",
- "Datagram Loss", "Bandwidth")
+ print(udpformat.format("Datagram Size", "Snt Datagrams", "Rcv Datagrams",
+ "Datagram Loss", "Bandwidth"))
for size in port_sizes:
listen_handle = NO_HANDLE
listen_handle = server1.create_udp_listener(receiver[3])
if listen_handle == NO_HANDLE:
- print ("Server could not open UDP listening socket on port"
- " %u. Try to restart the server.\n" % receiver[3])
+ print("Server could not open UDP listening socket on port"
+ " %u. Try to restart the server.\n" % receiver[3])
return
send_handle = server2.create_udp_sender(
(util.ip_from_cidr(receiver[2]),
snt_packets) / 100
bwidth = (rcv_packets * size) / duration
- print udpformat.format(size, snt_packets, rcv_packets,
- '%.2f%%' % loss, util.bandwidth_to_string(bwidth))
+ print(udpformat.format(size, snt_packets, rcv_packets,
+ '%.2f%%' % loss, util.bandwidth_to_string(bwidth)))
finally:
if listen_handle != NO_HANDLE:
server1.close_udp_listener(listen_handle)
if send_handle != NO_HANDLE:
server2.close_udp_sender(send_handle)
- print "\n"
+ print("\n")
def do_tcp_tests(receiver, sender, duration):
server2 = util.rpc_client(sender[0], sender[1])
tcpformat = '{0:>15} {1:>15} {2:>15}'
- print "TCP test from %s:%u to %s:%u (full speed)" % (sender[0], sender[1],
- receiver[0], receiver[1])
- print tcpformat.format("Snt Bytes", "Rcv Bytes", "Bandwidth")
+ print("TCP test from %s:%u to %s:%u (full speed)" % (sender[0], sender[1],
+ receiver[0], receiver[1]))
+ print(tcpformat.format("Snt Bytes", "Rcv Bytes", "Bandwidth"))
listen_handle = NO_HANDLE
send_handle = NO_HANDLE
try:
listen_handle = server1.create_tcp_listener(receiver[3])
if listen_handle == NO_HANDLE:
- print ("Server was unable to open TCP listening socket on port"
- " %u. Try to restart the server.\n" % receiver[3])
+ print("Server was unable to open TCP listening socket on port"
+ " %u. Try to restart the server.\n" % receiver[3])
return
send_handle = server2.create_tcp_sender(util.ip_from_cidr(receiver[2]),
receiver[3], duration)
bwidth = rcv_bytes / duration
- print tcpformat.format(snt_bytes, rcv_bytes,
- util.bandwidth_to_string(bwidth))
+ print(tcpformat.format(snt_bytes, rcv_bytes,
+ util.bandwidth_to_string(bwidth)))
finally:
if listen_handle != NO_HANDLE:
server1.close_tcp_listener(listen_handle)
if send_handle != NO_HANDLE:
server2.close_tcp_sender(send_handle)
- print "\n"
+ print("\n")
def do_l3_tests(node1, node2, bandwidth, duration, ps, type):
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import print_function
+
import codecs
import getopt
import sys
def print_json(json):
if type(json) in [str, unicode]:
- print "error: %s" % json
+ print("error: %s" % json)
return False
else:
ovs.json.to_stream(json, sys.stdout)
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import print_function
+
import argparse
import errno
import os
sys.stderr.write("error waiting for reply: %s\n" % os.strerror(error))
sys.exit(1)
- print ovs.json.to_string(msg.to_json())
+ print(ovs.json.to_string(msg.to_json()))
rpc.close()
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import print_function
+
import getopt
import re
import os
def do_parse_atomic_type(type_string):
type_json = unbox_json(ovs.json.from_string(type_string))
atomic_type = ovs.db.types.AtomicType.from_json(type_json)
- print ovs.json.to_string(atomic_type.to_json(), sort_keys=True)
+ print(ovs.json.to_string(atomic_type.to_json(), sort_keys=True))
def do_parse_base_type(type_string):
type_json = unbox_json(ovs.json.from_string(type_string))
base_type = ovs.db.types.BaseType.from_json(type_json)
- print ovs.json.to_string(base_type.to_json(), sort_keys=True)
+ print(ovs.json.to_string(base_type.to_json(), sort_keys=True))
def do_parse_type(type_string):
type_json = unbox_json(ovs.json.from_string(type_string))
type_ = ovs.db.types.Type.from_json(type_json)
- print ovs.json.to_string(type_.to_json(), sort_keys=True)
+ print(ovs.json.to_string(type_.to_json(), sort_keys=True))
def do_parse_atoms(type_string, *atom_strings):
atom_json = unbox_json(ovs.json.from_string(atom_string))
try:
atom = data.Atom.from_json(base, atom_json)
- print ovs.json.to_string(atom.to_json())
+ print(ovs.json.to_string(atom.to_json()))
except error.Error as e:
- print e.args[0].encode("utf8")
+ print(e.args[0].encode("utf8"))
def do_parse_data(type_string, *data_strings):
for datum_string in data_strings:
datum_json = unbox_json(ovs.json.from_string(datum_string))
datum = data.Datum.from_json(type_, datum_json)
- print ovs.json.to_string(datum.to_json())
+ print(ovs.json.to_string(datum.to_json()))
def do_sort_atoms(type_string, atom_strings):
base = ovs.db.types.BaseType.from_json(type_json)
atoms = [data.Atom.from_json(base, atom_json)
for atom_json in unbox_json(ovs.json.from_string(atom_strings))]
- print ovs.json.to_string([data.Atom.to_json(atom)
- for atom in sorted(atoms)])
+ print(ovs.json.to_string([data.Atom.to_json(atom)
+ for atom in sorted(atoms)]))
def do_parse_column(name, column_string):
column_json = unbox_json(ovs.json.from_string(column_string))
column = ovs.db.schema.ColumnSchema.from_json(column_json, name)
- print ovs.json.to_string(column.to_json(), sort_keys=True)
+ print(ovs.json.to_string(column.to_json(), sort_keys=True))
def do_parse_table(name, table_string, default_is_root_string='false'):
default_is_root = default_is_root_string == 'true'
table_json = unbox_json(ovs.json.from_string(table_string))
table = ovs.db.schema.TableSchema.from_json(table_json, name)
- print ovs.json.to_string(table.to_json(default_is_root), sort_keys=True)
+ print(ovs.json.to_string(table.to_json(default_is_root), sort_keys=True))
def do_parse_schema(schema_string):
schema_json = unbox_json(ovs.json.from_string(schema_string))
schema = ovs.db.schema.DbSchema.from_json(schema_json)
- print ovs.json.to_string(schema.to_json(), sort_keys=True)
+ print(ovs.json.to_string(schema.to_json(), sort_keys=True))
def print_idl(idl, step):
txn.abort()
break
elif name == "destroy":
- print "%03d: destroy" % step
+ print("%03d: destroy" % step)
sys.stdout.flush()
txn.abort()
return
def usage():
- print """\
+ print("""\
%(program_name)s: test utility for Open vSwitch database Python bindings
usage: %(program_name)s [OPTIONS] COMMAND ARG...
The following options are also available:
-t, --timeout=SECS give up after SECS seconds
-h, --help display this help message\
-""" % {'program_name': ovs.util.PROGRAM_NAME}
+""" % {'program_name': ovs.util.PROGRAM_NAME})
sys.exit(0)
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import print_function
+
import errno
import sys
if action is None:
pass
elif action == ovs.reconnect.CONNECT:
- print " should connect"
+ print(" should connect")
elif action == ovs.reconnect.DISCONNECT:
- print " should disconnect"
+ print(" should disconnect")
elif action == ovs.reconnect.PROBE:
- print " should send probe"
+ print(" should send probe")
else:
assert False
global now
timeout = r.timeout(now)
if timeout >= 0:
- print " advance %d ms" % timeout
+ print(" advance %d ms" % timeout)
now += timeout
else:
- print " no timeout"
+ print(" no timeout")
def do_set_max_tries(arg):
r = ovs.reconnect.Reconnect(now)
r.set_name("remote")
prev = r.get_stats(now)
- print "### t=%d ###" % now
+ print("### t=%d ###" % now)
old_time = now
old_max_tries = r.get_max_tries()
while True:
if line == "":
break
- print line[:-1]
+ print(line[:-1])
if line[0] == "#":
continue
commands[command](op)
if old_time != now:
- print
- print "### t=%d ###" % now
+ print()
+ print("### t=%d ###" % now)
cur = r.get_stats(now)
diff_stats(prev, cur, now - old_time)
prev = cur
if r.get_max_tries() != old_max_tries:
old_max_tries = r.get_max_tries()
- print " %d tries left" % old_max_tries
+ print(" %d tries left" % old_max_tries)
old_time = now
# See the License for the specific language governing permissions and
# limitations under the License.
+from __future__ import print_function
+
import binascii
import getopt
import struct
def usage():
- print """\
+ print("""\
%(argv0)s: print pcap file packet data as hex
usage: %(argv0)s FILE
where FILE is a PCAP file.
The following options are also available:
-h, --help display this help message
-V, --version display version information\
-""" % {'argv0': argv0}
+""" % {'argv0': argv0})
sys.exit(0)
if __name__ == "__main__":
if key in ['-h', '--help']:
usage()
elif key in ['-V', '--version']:
- print "ovs-pcap (Open vSwitch) @VERSION@"
+ print("ovs-pcap (Open vSwitch) @VERSION@")
else:
sys.exit(0)
if packet is None:
break
- print binascii.hexlify(packet)
+ print(binascii.hexlify(packet))
except PcapException as e:
sys.stderr.write("%s: %s\n" % (argv0, e))