that this function accepts."""
is_map = type_.is_map()
if (is_map or
- (type(json) == list and len(json) > 0 and json[0] == "set")):
+ (isinstance(json, list) and len(json) > 0 and json[0] == "set")):
if is_map:
class_ = "map"
else:
Raises ovs.db.error.Error if 'value' is not in an appropriate form for
'type_'."""
d = {}
- if type(value) == dict:
+ if isinstance(value, dict):
for k, v in six.iteritems(value):
ka = Atom.from_python(type_.key, row_to_uuid(k))
va = Atom.from_python(type_.value, row_to_uuid(v))
d[ka] = va
- elif type(value) in (list, tuple):
+ elif isinstance(value, (list, tuple)):
for k in value:
ka = Atom.from_python(type_.key, row_to_uuid(k))
d[ka] = None
def __parse_lock_reply(self, result):
self._lock_request_id = None
- got_lock = type(result) == dict and result.get("locked") is True
+ got_lock = isinstance(result, dict) and result.get("locked") is True
self.__update_has_lock(got_lock)
if not got_lock:
self.is_lock_contended = True
def __parse_lock_notify(self, params, new_has_lock):
if (self.lock_name is not None
- and type(params) in (list, tuple)
+ and isinstance(params, (list, tuple))
and params
and params[0] == self.lock_name):
self.__update_has_lock(new_has_lock)
% (self._session.get_name(), e))
def __do_parse_update(self, table_updates):
- if type(table_updates) != dict:
+ if not isinstance(table_updates, dict):
raise error.Error("<table-updates> is not an object",
table_updates)
raise error.Error('<table-updates> includes unknown '
'table "%s"' % table_name)
- if type(table_update) != dict:
+ if not isinstance(table_update, dict):
raise error.Error('<table-update> for table "%s" is not '
'an object' % table_name, table_update)
table_update)
uuid = ovs.ovsuuid.from_string(uuid_string)
- if type(row_update) != dict:
+ if not isinstance(row_update, dict):
raise error.Error('<table-update> for table "%s" '
'contains <row-update> for %s that '
'is not an object'
def _row_to_uuid(value):
- if type(value) == Row:
+ if isinstance(value, Row):
return value.uuid
else:
return value
poller.immediate_wake()
def _substitute_uuids(self, json):
- if type(json) in (list, tuple):
+ if isinstance(json, (list, tuple)):
if (len(json) == 2
and json[0] == 'uuid'
and ovs.ovsuuid.is_valid_string(json[1])):
def _process_reply(self, msg):
if msg.type == ovs.jsonrpc.Message.T_ERROR:
self._status = Transaction.ERROR
- elif type(msg.result) not in (list, tuple):
+ elif not isinstance(msg.result, (list, tuple)):
# XXX rate-limit
vlog.warn('reply to "transact" is not JSON array')
else:
# prior operation failed, so make sure that we know about
# it.
soft_errors = True
- elif type(op) == dict:
+ elif isinstance(op, dict):
error = op.get("error")
if error is not None:
if error == "timed out":
# XXX rate-limit
vlog.warn("%s is missing" % name)
return False
- elif type(json) not in types:
+ elif not isinstance(json, tuple(types)):
# XXX rate-limit
vlog.warn("%s has unexpected type %s" % (name, type(json)))
return False
'readonly' must be a list of strings.
"""
- assert type(table) is str
- assert type(columns) is list
+ assert isinstance(table, six.string_types)
+ assert isinstance(columns, list)
columns = set(columns) | self._tables.get(table, set())
self._tables[table] = columns
'table' must be a string
"""
- assert type(table) is str
+ assert isinstance(table, six.string_types)
self._tables[table] = set() # empty set means all columns in the table
def register_all(self):
new_columns = {}
for column_name in columns:
- assert type(column_name) is str
+ assert isinstance(column_name, six.string_types)
assert column_name in table.columns
new_columns[column_name] = table.columns[column_name]