schema = schema.get_idl_schema()
self.tables = schema.tables
+ self.readonly = schema.readonly
self._db = schema
self._session = ovs.jsonrpc.Session.open(remote)
self._monitor_request_id = None
def __send_monitor_request(self):
monitor_requests = {}
for table in self.tables.itervalues():
- monitor_requests[table.name] = {"columns": table.columns.keys()}
+ columns = []
+ for column in table.columns.keys():
+ if ((table.name not in self.readonly) or
+ (table.name in self.readonly) and
+ (column not in self.readonly[table.name])):
+ columns.append(column)
+ monitor_requests[table.name] = {"columns": columns}
msg = ovs.jsonrpc.Message.create_request(
"monitor", [self._db.name, None, monitor_requests])
self._monitor_request_id = msg.id
if self._data is None:
raise AttributeError("%s instance has no attribute '%s'" %
(self.__class__.__name__, column_name))
- datum = self._data[column_name]
+ if column_name in self._data:
+ datum = self._data[column_name]
+ else:
+ raise AttributeError("%s instance has no attribute '%s'" %
+ (self.__class__.__name__, column_name))
return datum.to_python(_uuid_to_row)
assert self._changes is not None
assert self._idl.txn
+ if ((self._table.name in self._idl.readonly) and
+ (column_name in self._idl.readonly[self._table.name])):
+ vlog.warn("attempting to write to readonly column %s" % column_name)
+ return
+
column = self._table.columns[column_name]
try:
datum = ovs.db.data.Datum.from_python(column.type, value,
self.__dict__["_changes"] = None
del self._table.rows[self.uuid]
+ def fetch(self, column_name):
+ self._idl.txn._fetch(self, column_name)
+
def increment(self, column_name):
"""Causes the transaction, when committed, to increment the value of
'column_name' within this row by 1. 'column_name' must have an integer
self._inc_row = None
self._inc_column = None
+ self._fetch_requests = []
+
self._inserted_rows = {} # Map from UUID to _InsertedRow
def add_comment(self, comment):
- """Appens 'comment' to the comments that will be passed to the OVSDB
+ """Appends 'comment' to the comments that will be passed to the OVSDB
server when this transaction is committed. (The comment will be
committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
relatively human-readable form.)"""
if row._data is None or row_json:
operations.append(op)
+ if self._fetch_requests:
+ for fetch in self._fetch_requests:
+ fetch["index"] = len(operations) - 1
+ operations.append({"op": "select",
+ "table": fetch["row"]._table.name,
+ "where": self._substitute_uuids(
+ _where_uuid_equals(fetch["row"].uuid)),
+ "columns": [fetch["column_name"]]})
+ any_updates = True
+
# Add increment.
if self._inc_row and any_updates:
self._inc_index = len(operations) - 1
self._inc_row = row
self._inc_column = column
+ def _fetch(self, row, column_name):
+ self._fetch_requests.append({"row":row, "column_name":column_name})
+
def _write(self, row, column, datum):
assert row._changes is not None
if not soft_errors and not hard_errors and not lock_errors:
if self._inc_row and not self.__process_inc_reply(ops):
hard_errors = True
+ if self._fetch_requests:
+ if self.__process_fetch_reply(ops):
+ self.idl.change_seqno += 1
+ else:
+ hard_errors = True
for insert in self._inserted_rows.itervalues():
if not self.__process_insert_reply(insert, ops):
else:
return True
+ def __process_fetch_reply(self, ops):
+ update = False
+ for fetch_request in self._fetch_requests:
+ row = fetch_request["row"]
+ column_name = fetch_request["column_name"]
+ index = fetch_request["index"]
+ table = row._table
+
+ select = ops[index]
+ fetched_rows = select.get("rows")
+ if not Transaction.__check_json_type(fetched_rows, (list, tuple),
+ '"select" reply "rows"'):
+ return False
+ if len(fetched_rows) != 1:
+ # XXX rate-limit
+ vlog.warn('"select" reply "rows" has %d elements '
+ 'instead of 1' % len(rows))
+ continue
+ fetched_row = fetched_rows[0]
+ if not Transaction.__check_json_type(fetched_row, (dict,),
+ '"select" reply row'):
+ continue
+
+ column = table.columns.get(column_name)
+ datum_json = fetched_row.get(column_name)
+ datum = ovs.db.data.Datum.from_json(column.type, datum_json)
+
+ row._data[column_name] = datum
+ update = True
+
+ return update
+
def __process_inc_reply(self, ops):
if self._inc_index + 2 > len(ops):
# XXX rate-limit
self.schema_json = schema_json
self._tables = {}
+ self._readonly = {}
self._all = False
- def register_columns(self, table, columns):
+ def register_columns(self, table, columns, readonly=[]):
"""Registers interest in the given 'columns' of 'table'. Future calls
to get_idl_schema() will include 'table':column for each column in
'columns'. This function automatically avoids adding duplicate entries
to the schema.
+ A subset of 'columns' can be specified as 'readonly'. The readonly
+ columns are not replicated but can be fetched on-demand by the user
+ with Row.fetch().
'table' must be a string.
'columns' must be a list of strings.
+ 'readonly' must be a list of strings.
"""
assert type(table) is str
columns = set(columns) | self._tables.get(table, set())
self._tables[table] = columns
+ self._readonly[table] = readonly
def register_table(self, table):
"""Registers interest in the given all columns of 'table'. Future calls
self._keep_table_columns(schema, table, columns))
schema.tables = schema_tables
+ schema.readonly = self._readonly
return schema
def _keep_table_columns(self, schema, table_name, columns):
])
OVSDB_SERVER_SHUTDOWN
AT_CLEANUP
+
+m4_define([OVSDB_CHECK_IDL_FETCH_COLUMNS_PY],
+ [AT_SETUP([$1 - Python fetch])
+ AT_SKIP_IF([test $HAVE_PYTHON = no])
+ AT_KEYWORDS([ovsdb server idl positive Python increment fetch $6])
+ AT_CHECK([ovsdb-tool create db $abs_srcdir/idltest.ovsschema],
+ [0], [stdout], [ignore])
+ AT_CHECK([ovsdb-server '-vPATTERN:console:ovsdb-server|%c|%m' --detach --no-chdir --pidfile="`pwd`"/pid --remote=punix:socket --unixctl="`pwd`"/unixctl db], [0], [ignore], [ignore])
+ m4_if([$2], [], [],
+ [AT_CHECK([ovsdb-client transact unix:socket $2], [0], [ignore], [ignore], [kill `cat pid`])])
+ AT_CHECK([$PYTHON $srcdir/test-ovsdb.py -t10 idl $srcdir/idltest.ovsschema unix:socket [$3] $4],
+ [0], [stdout], [ignore], [kill `cat pid`])
+ AT_CHECK([sort stdout | ${PERL} $srcdir/uuidfilt.pl]m4_if([$7],,, [[| $7]]),
+ [0], [$5], [], [kill `cat pid`])
+ OVSDB_SERVER_SHUTDOWN
+ AT_CLEANUP])
+
+m4_define([OVSDB_CHECK_IDL_FETCH_COLUMNS],
+ [OVSDB_CHECK_IDL_FETCH_COLUMNS_PY($@)])
+
+OVSDB_CHECK_IDL_FETCH_COLUMNS([simple idl, initially populated],
+ [['["idltest",
+ {"op": "insert",
+ "table": "simple",
+ "row": {"i": 1,
+ "r": 2.0,
+ "b": true,
+ "s": "mystring",
+ "u": ["uuid", "84f5c8f5-ac76-4dbc-a24f-8860eb407fc1"],
+ "ia": ["set", [1, 2, 3]],
+ "ra": ["set", [-0.5]],
+ "ba": ["set", [true]],
+ "sa": ["set", ["abc", "def"]],
+ "ua": ["set", [["uuid", "69443985-7806-45e2-b35f-574a04e720f9"],
+ ["uuid", "aad11ef0-816a-4b01-93e6-03b8b4256b98"]]]}},
+ {"op": "insert",
+ "table": "simple",
+ "row": {}}]']],
+ [?simple:i,r!],
+ ['fetch 0 r'],
+ [[000: i=0 uuid=<0>
+000: i=1 uuid=<1>
+001: commit, status=success
+002: i=0 r=0 uuid=<0>
+002: i=1 uuid=<1>
+003: done
+]])
def print_idl(idl, step):
- simple = idl.tables["simple"].rows
- l1 = idl.tables["link1"].rows
- l2 = idl.tables["link2"].rows
-
n = 0
- for row in simple.itervalues():
- s = ("%03d: i=%s r=%s b=%s s=%s u=%s "
- "ia=%s ra=%s ba=%s sa=%s ua=%s uuid=%s"
- % (step, row.i, row.r, row.b, row.s, row.u,
- row.ia, row.ra, row.ba, row.sa, row.ua, row.uuid))
- s = re.sub('""|,|u?\'', "", s)
- s = re.sub('UUID\(([^)]+)\)', r'\1', s)
- s = re.sub('False', 'false', s)
- s = re.sub('True', 'true', s)
- s = re.sub(r'(ba)=([^[][^ ]*) ', r'\1=[\2] ', s)
- print(s)
- n += 1
-
- for row in l1.itervalues():
- s = ["%03d: i=%s k=" % (step, row.i)]
- if row.k:
- s.append(str(row.k.i))
- s.append(" ka=[")
- s.append(' '.join(sorted(str(ka.i) for ka in row.ka)))
- s.append("] l2=")
- if row.l2:
- s.append(str(row.l2[0].i))
- s.append(" uuid=%s" % row.uuid)
- print(''.join(s))
- n += 1
-
- for row in l2.itervalues():
- s = ["%03d: i=%s l1=" % (step, row.i)]
- if row.l1:
- s.append(str(row.l1[0].i))
- s.append(" uuid=%s" % row.uuid)
- print(''.join(s))
- n += 1
+ if "simple" in idl.tables:
+ simple_columns = ["i", "r", "b", "s", "u", "ia",
+ "ra", "ba", "sa", "ua", "uuid"]
+ simple = idl.tables["simple"].rows
+ for row in simple.itervalues():
+ s = "%03d:" % step
+ for column in simple_columns:
+ if hasattr(row, column) and not (type(getattr(row, column))
+ is ovs.db.data.Atom):
+ s += " %s=%s" % (column, getattr(row, column))
+ s = re.sub('""|,|u?\'', "", s)
+ s = re.sub('UUID\(([^)]+)\)', r'\1', s)
+ s = re.sub('False', 'false', s)
+ s = re.sub('True', 'true', s)
+ s = re.sub(r'(ba)=([^[][^ ]*) ', r'\1=[\2] ', s)
+ print(s)
+ n += 1
+
+ if "link1" in idl.tables:
+ l1 = idl.tables["link1"].rows
+ for row in l1.itervalues():
+ s = ["%03d: i=%s k=" % (step, row.i)]
+ if hasattr(row, "k") and row.k:
+ s.append(str(row.k.i))
+ if hasattr(row, "ka"):
+ s.append(" ka=[")
+ s.append(' '.join(sorted(str(ka.i) for ka in row.ka)))
+ s.append("] l2=")
+ if hasattr(row, "l2") and row.l2:
+ s.append(str(row.l2[0].i))
+ if hasattr(row, "uuid"):
+ s.append(" uuid=%s" % row.uuid)
+ print(''.join(s))
+ n += 1
+
+ if "link2" in idl.tables:
+ l2 = idl.tables["link2"].rows
+ for row in l2.itervalues():
+ s = ["%03d:" % step]
+ s.append(" i=%s l1=" % row.i)
+ if hasattr(row, "l1") and row.l1:
+ s.append(str(row.l1[0].i))
+ if hasattr(row, "uuid"):
+ s.append(" uuid=%s" % row.uuid)
+ print(''.join(s))
+ n += 1
if not n:
print("%03d: empty" % step)
def idl_set(idl, commands, step):
txn = ovs.db.idl.Transaction(idl)
increment = False
+ fetch_cmds = []
events = []
for command in commands.split(','):
words = command.split()
sys.stderr.write('"verify" command asks for unknown column '
'"%s"\n' % args[1])
sys.exit(1)
+ elif name == "fetch":
+ if len(args) != 2:
+ sys.stderr.write('"fetch" command requires 2 argument\n')
+ sys.exit(1)
+
+ row = idltest_find_simple(idl, int(args[0]))
+ if not row:
+ sys.stderr.write('"fetch" command asks for nonexistent i=%d\n'
+ % int(args[0]))
+ sys.exit(1)
+
+ column = args[1]
+ row.fetch(column)
+ fetch_cmds.append([row, column])
elif name == "increment":
if len(args) != 1:
sys.stderr.write('"increment" command requires 1 argument\n')
schema_helper = ovs.db.idl.SchemaHelper(schema_file)
if commands and commands[0].startswith("?"):
monitor = {}
+ readonly = {}
for x in commands[0][1:].split("?"):
+ readonly = []
table, columns = x.split(":")
- monitor[table] = columns.split(",")
- schema_helper.register_columns(table, monitor[table])
+ columns = columns.split(",")
+ for index, column in enumerate(columns):
+ if column[-1] == '!':
+ columns[index] = columns[index][:-1]
+ readonly.append(columns[index])
+ schema_helper.register_columns(table, columns, readonly)
commands = commands[1:]
else:
schema_helper.register_all()
e.g.:
?simple:b?link1:i,k - Monitor column "b" in table "simple",
and column "i", "k" in table "link1"
+ Readonly columns: Suffixing a "!" after a column indicates that the
+ column is to be registered "readonly".
+ e.g.:
+ ?simple:i,b! - Register interest in column "i" (monitoring) and
+ column "b" (readonly).
+
The following options are also available:
-t, --timeout=SECS give up after SECS seconds