1 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
23 from ovs.db import error
28 vlog = ovs.vlog.Vlog("idl")
30 __pychecker__ = 'no-classattr no-objattrs'
41 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
43 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
44 requests to an OVSDB database server and parses the responses, converting
45 raw JSON into data structures that are easier for clients to digest.
47 The IDL also assists with issuing database transactions. The client
48 creates a transaction, manipulates the IDL data structures, and commits or
49 aborts the transaction. The IDL then composes and issues the necessary
50 JSON-RPC requests and reports to the client whether the transaction
51 completed successfully.
53 The client is allowed to access the following attributes directly, in a
56 - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
57 to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
58 annotated with a new attribute 'rows', which is a dict from a uuid.UUID
61 The client may directly read and write the Row objects referenced by the
62 'rows' map values. Refer to Row for more details.
64 - 'change_seqno': A number that represents the IDL's state. When the IDL
65 is updated (by Idl.run()), its value changes. The sequence number can
66 occasionally change even if the database does not. This happens if the
67 connection to the database drops and reconnects, which causes the
68 database contents to be reloaded even if they didn't change. (It could
69 also happen if the database server sends out a "change" that reflects
70 what the IDL already thought was in the database. The database server is
71 not supposed to do that, but bugs could in theory cause it to do so.)
73 - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
74 if no lock is configured.
76 - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
77 lock, and False otherwise.
79 Locking and unlocking happens asynchronously from the database client's
80 point of view, so the information is only useful for optimization
81 (e.g. if the client doesn't have the lock then there's no point in trying
82 to write to the database).
84 - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
85 the database server has indicated that some other client already owns the
86 requested lock, and False otherwise.
88 - 'txn': The ovs.db.idl.Transaction object for the database transaction
89 currently being constructed, if there is one, or None otherwise.
93 IDL_S_MONITOR_REQUESTED = 1
94 IDL_S_MONITOR_COND_REQUESTED = 2
96 def __init__(self, remote, schema):
97 """Creates and returns a connection to the database named 'db_name' on
98 'remote', which should be in a form acceptable to
99 ovs.jsonrpc.session.open(). The connection will maintain an in-memory
100 replica of the remote database.
102 'schema' should be the schema for the remote database. The caller may
103 have cut it down by removing tables or columns that are not of
104 interest. The IDL will only replicate the tables and columns that
105 remain. The caller may also add a attribute named 'alert' to selected
106 remaining columns, setting its value to False; if so, then changes to
107 those columns will not be considered changes to the database for the
108 purpose of the return value of Idl.run() and Idl.change_seqno. This is
109 useful for columns that the IDL's client will write but not read.
111 As a convenience to users, 'schema' may also be an instance of the
114 The IDL uses and modifies 'schema' directly."""
116 assert isinstance(schema, SchemaHelper)
117 schema = schema.get_idl_schema()
119 self.tables = schema.tables
120 self.readonly = schema.readonly
122 self._session = ovs.jsonrpc.Session.open(remote)
123 self._monitor_request_id = None
124 self._last_seqno = None
125 self.change_seqno = 0
126 self.uuid = uuid.uuid1()
127 self.state = self.IDL_S_INITIAL
130 self.lock_name = None # Name of lock we need, None if none.
131 self.has_lock = False # Has db server said we have the lock?
132 self.is_lock_contended = False # Has db server said we can't get lock?
133 self._lock_request_id = None # JSON-RPC ID of in-flight lock request.
135 # Transaction support.
137 self._outstanding_txns = {}
139 for table in six.itervalues(schema.tables):
140 for column in six.itervalues(table.columns):
141 if not hasattr(column, 'alert'):
143 table.need_table = False
147 table.cond_changed = False
150 """Closes the connection to the database. The IDL will no longer
152 self._session.close()
155 """Processes a batch of messages from the database server. Returns
156 True if the database as seen through the IDL changed, False if it did
157 not change. The initial fetch of the entire contents of the remote
158 database is considered to be one kind of change. If the IDL has been
159 configured to acquire a database lock (with Idl.set_lock()), then
160 successfully acquiring the lock is also considered to be a change.
162 This function can return occasional false positives, that is, report
163 that the database changed even though it didn't. This happens if the
164 connection to the database drops and reconnects, which causes the
165 database contents to be reloaded even if they didn't change. (It could
166 also happen if the database server sends out a "change" that reflects
167 what we already thought was in the database, but the database server is
168 not supposed to do that.)
170 As an alternative to checking the return value, the client may check
171 for changes in self.change_seqno."""
173 initial_change_seqno = self.change_seqno
175 self.send_cond_change()
180 if not self._session.is_connected():
183 seqno = self._session.get_seqno()
184 if seqno != self._last_seqno:
185 self._last_seqno = seqno
186 self.__txn_abort_all()
187 self.__send_monitor_request()
189 self.__send_lock_request()
192 msg = self._session.recv()
195 if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
196 and msg.method == "update2"
197 and len(msg.params) == 2):
198 # Database contents changed.
199 self.__parse_update(msg.params[1], OVSDB_UPDATE2)
200 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
201 and msg.method == "update"
202 and len(msg.params) == 2):
203 # Database contents changed.
204 self.__parse_update(msg.params[1], OVSDB_UPDATE)
205 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
206 and self._monitor_request_id is not None
207 and self._monitor_request_id == msg.id):
208 # Reply to our "monitor" request.
210 self.change_seqno += 1
211 self._monitor_request_id = None
213 if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
214 self.__parse_update(msg.result, OVSDB_UPDATE2)
216 assert self.state == self.IDL_S_MONITOR_REQUESTED
217 self.__parse_update(msg.result, OVSDB_UPDATE)
219 except error.Error as e:
220 vlog.err("%s: parse error in received schema: %s"
221 % (self._session.get_name(), e))
223 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
224 and self._lock_request_id is not None
225 and self._lock_request_id == msg.id):
226 # Reply to our "lock" request.
227 self.__parse_lock_reply(msg.result)
228 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
229 and msg.method == "locked"):
231 self.__parse_lock_notify(msg.params, True)
232 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
233 and msg.method == "stolen"):
234 # Someone else stole our lock.
235 self.__parse_lock_notify(msg.params, False)
236 elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
237 # Reply to our echo request. Ignore it.
239 elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
240 self.state == self.IDL_S_MONITOR_COND_REQUESTED and
241 self._monitor_request_id == msg.id):
242 if msg.error == "unknown method":
243 self.__send_monitor_request()
244 elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
245 ovs.jsonrpc.Message.T_REPLY)
246 and self.__txn_process_reply(msg)):
247 # __txn_process_reply() did everything needed.
250 # This can happen if a transaction is destroyed before we
251 # receive the reply, so keep the log level low.
252 vlog.dbg("%s: received unexpected %s message"
253 % (self._session.get_name(),
254 ovs.jsonrpc.Message.type_to_string(msg.type)))
256 return initial_change_seqno != self.change_seqno
258 def send_cond_change(self):
259 if not self._session.is_connected():
262 for table in six.itervalues(self.tables):
263 if table.cond_changed:
264 self.__send_cond_change(table, table.condition)
265 table.cond_changed = False
267 def cond_change(self, table_name, add_cmd, cond):
268 """Change conditions for this IDL session. If session is not already
269 connected, add condtion to table and submit it on send_monitor_request.
270 Otherwise send monitor_cond_change method with the requested
273 table = self.tables.get(table_name)
275 raise error.Error('Unknown table "%s"' % table_name)
278 table.condition += cond
281 table.condition.remove(c)
283 table.cond_changed = True
285 def wait(self, poller):
286 """Arranges for poller.block() to wake up when self.run() has something
287 to do or when activity occurs on a transaction on 'self'."""
288 self._session.wait(poller)
289 self._session.recv_wait(poller)
291 def has_ever_connected(self):
292 """Returns True, if the IDL successfully connected to the remote
293 database and retrieved its contents (even if the connection
294 subsequently dropped and is in the process of reconnecting). If so,
295 then the IDL contains an atomic snapshot of the database's contents
296 (but it might be arbitrarily old if the connection dropped).
298 Returns False if the IDL has never connected or retrieved the
299 database's contents. If so, the IDL is empty."""
300 return self.change_seqno != 0
302 def force_reconnect(self):
303 """Forces the IDL to drop its connection to the database and reconnect.
304 In the meantime, the contents of the IDL will not change."""
305 self._session.force_reconnect()
307 def set_lock(self, lock_name):
308 """If 'lock_name' is not None, configures the IDL to obtain the named
309 lock from the database server and to avoid modifying the database when
310 the lock cannot be acquired (that is, when another client has the same
313 If 'lock_name' is None, drops the locking requirement and releases the
316 assert not self._outstanding_txns
318 if self.lock_name and (not lock_name or lock_name != self.lock_name):
319 # Release previous lock.
320 self.__send_unlock_request()
321 self.lock_name = None
322 self.is_lock_contended = False
324 if lock_name and not self.lock_name:
326 self.lock_name = lock_name
327 self.__send_lock_request()
329 def notify(self, event, row, updates=None):
330 """Hook for implementing create/update/delete notifications
332 :param event: The event that was triggered
333 :type event: ROW_CREATE, ROW_UPDATE, or ROW_DELETE
334 :param row: The row as it is after the operation has occured
336 :param updates: For updates, row with only updated columns
340 def __send_cond_change(self, table, cond):
341 monitor_cond_change = {table.name: [{"where": cond}]}
342 old_uuid = str(self.uuid)
343 self.uuid = uuid.uuid1()
344 params = [old_uuid, str(self.uuid), monitor_cond_change]
345 msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
346 self._session.send(msg)
351 for table in six.itervalues(self.tables):
357 self.change_seqno += 1
359 def __update_has_lock(self, new_has_lock):
360 if new_has_lock and not self.has_lock:
361 if self._monitor_request_id is None:
362 self.change_seqno += 1
364 # We're waiting for a monitor reply, so don't signal that the
365 # database changed. The monitor reply will increment
366 # change_seqno anyhow.
368 self.is_lock_contended = False
369 self.has_lock = new_has_lock
371 def __do_send_lock_request(self, method):
372 self.__update_has_lock(False)
373 self._lock_request_id = None
374 if self._session.is_connected():
375 msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
377 self._session.send(msg)
382 def __send_lock_request(self):
383 self._lock_request_id = self.__do_send_lock_request("lock")
385 def __send_unlock_request(self):
386 self.__do_send_lock_request("unlock")
388 def __parse_lock_reply(self, result):
389 self._lock_request_id = None
390 got_lock = isinstance(result, dict) and result.get("locked") is True
391 self.__update_has_lock(got_lock)
393 self.is_lock_contended = True
395 def __parse_lock_notify(self, params, new_has_lock):
396 if (self.lock_name is not None
397 and isinstance(params, (list, tuple))
399 and params[0] == self.lock_name):
400 self.__update_has_lock(new_has_lock)
402 self.is_lock_contended = True
404 def __send_monitor_request(self):
405 if self.state == self.IDL_S_INITIAL:
406 self.state = self.IDL_S_MONITOR_COND_REQUESTED
407 method = "monitor_cond"
409 self.state = self.IDL_S_MONITOR_REQUESTED
412 monitor_requests = {}
413 for table in six.itervalues(self.tables):
415 for column in six.iterkeys(table.columns):
416 if ((table.name not in self.readonly) or
417 (table.name in self.readonly) and
418 (column not in self.readonly[table.name])):
419 columns.append(column)
420 monitor_requests[table.name] = {"columns": columns}
421 if method == "monitor_cond" and table.cond_changed and \
423 monitor_requests[table.name]["where"] = table.condition
424 table.cond_change = False
426 msg = ovs.jsonrpc.Message.create_request(
427 method, [self._db.name, str(self.uuid), monitor_requests])
428 self._monitor_request_id = msg.id
429 self._session.send(msg)
431 def __parse_update(self, update, version):
433 self.__do_parse_update(update, version)
434 except error.Error as e:
435 vlog.err("%s: error parsing update: %s"
436 % (self._session.get_name(), e))
438 def __do_parse_update(self, table_updates, version):
439 if not isinstance(table_updates, dict):
440 raise error.Error("<table-updates> is not an object",
443 for table_name, table_update in six.iteritems(table_updates):
444 table = self.tables.get(table_name)
446 raise error.Error('<table-updates> includes unknown '
447 'table "%s"' % table_name)
449 if not isinstance(table_update, dict):
450 raise error.Error('<table-update> for table "%s" is not '
451 'an object' % table_name, table_update)
453 for uuid_string, row_update in six.iteritems(table_update):
454 if not ovs.ovsuuid.is_valid_string(uuid_string):
455 raise error.Error('<table-update> for table "%s" '
456 'contains bad UUID "%s" as member '
457 'name' % (table_name, uuid_string),
459 uuid = ovs.ovsuuid.from_string(uuid_string)
461 if not isinstance(row_update, dict):
462 raise error.Error('<table-update> for table "%s" '
463 'contains <row-update> for %s that '
465 % (table_name, uuid_string))
467 if version == OVSDB_UPDATE2:
468 if self.__process_update2(table, uuid, row_update):
469 self.change_seqno += 1
472 parser = ovs.db.parser.Parser(row_update, "row-update")
473 old = parser.get_optional("old", [dict])
474 new = parser.get_optional("new", [dict])
477 if not old and not new:
478 raise error.Error('<row-update> missing "old" and '
479 '"new" members', row_update)
481 if self.__process_update(table, uuid, old, new):
482 self.change_seqno += 1
484 def __process_update2(self, table, uuid, row_update):
485 row = table.rows.get(uuid)
487 if "delete" in row_update:
490 self.notify(ROW_DELETE, row)
494 vlog.warn("cannot delete missing row %s from table"
495 "%s" % (uuid, table.name))
496 elif "insert" in row_update or "initial" in row_update:
498 vlog.warn("cannot add existing row %s from table"
499 " %s" % (uuid, table.name))
501 row = self.__create_row(table, uuid)
502 if "insert" in row_update:
503 row_update = row_update['insert']
505 row_update = row_update['initial']
506 self.__add_default(table, row_update)
507 if self.__row_update(table, row, row_update):
509 self.notify(ROW_CREATE, row)
510 elif "modify" in row_update:
512 raise error.Error('Modify non-existing row')
514 self.__apply_diff(table, row, row_update['modify'])
515 self.notify(ROW_UPDATE, row,
516 Row.from_json(self, table, uuid, row_update['modify']))
519 raise error.Error('<row-update> unknown operation',
523 def __process_update(self, table, uuid, old, new):
524 """Returns True if a column changed, False otherwise."""
525 row = table.rows.get(uuid)
532 self.notify(ROW_DELETE, row)
535 vlog.warn("cannot delete missing row %s from table %s"
536 % (uuid, table.name))
540 row = self.__create_row(table, uuid)
544 vlog.warn("cannot add existing row %s to table %s"
545 % (uuid, table.name))
546 if self.__row_update(table, row, new):
548 self.notify(ROW_CREATE, row)
552 row = self.__create_row(table, uuid)
556 vlog.warn("cannot modify missing row %s in table %s"
557 % (uuid, table.name))
558 if self.__row_update(table, row, new):
560 self.notify(op, row, Row.from_json(self, table, uuid, old))
563 def __column_name(self, column):
564 if column.type.key.type == ovs.db.types.UuidType:
565 return ovs.ovsuuid.to_json(column.type.key.type.default)
567 return column.type.key.type.default
569 def __add_default(self, table, row_update):
570 for column in six.itervalues(table.columns):
571 if column.name not in row_update:
572 if ((table.name not in self.readonly) or
573 (table.name in self.readonly) and
574 (column.name not in self.readonly[table.name])):
575 if column.type.n_min != 0 and not column.type.is_map():
576 row_update[column.name] = self.__column_name(column)
578 def __apply_diff(self, table, row, row_diff):
579 for column_name, datum_json in six.iteritems(row_diff):
580 column = table.columns.get(column_name)
583 vlog.warn("unknown column %s updating table %s"
584 % (column_name, table.name))
588 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
589 except error.Error as e:
591 vlog.warn("error parsing column %s in table %s: %s"
592 % (column_name, table.name, e))
595 datum = row._data[column_name].diff(datum)
596 if datum != row._data[column_name]:
597 row._data[column_name] = datum
599 def __row_update(self, table, row, row_json):
601 for column_name, datum_json in six.iteritems(row_json):
602 column = table.columns.get(column_name)
605 vlog.warn("unknown column %s updating table %s"
606 % (column_name, table.name))
610 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
611 except error.Error as e:
613 vlog.warn("error parsing column %s in table %s: %s"
614 % (column_name, table.name, e))
617 if datum != row._data[column_name]:
618 row._data[column_name] = datum
622 # Didn't really change but the OVSDB monitor protocol always
623 # includes every value in a row.
627 def __create_row(self, table, uuid):
629 for column in six.itervalues(table.columns):
630 data[column.name] = ovs.db.data.Datum.default(column.type)
631 row = table.rows[uuid] = Row(self, table, uuid, data)
635 self._session.force_reconnect()
637 def __txn_abort_all(self):
638 while self._outstanding_txns:
639 txn = self._outstanding_txns.popitem()[1]
640 txn._status = Transaction.TRY_AGAIN
642 def __txn_process_reply(self, msg):
643 txn = self._outstanding_txns.pop(msg.id, None)
645 txn._process_reply(msg)
648 def _uuid_to_row(atom, base):
650 return base.ref_table.rows.get(atom)
655 def _row_to_uuid(value):
656 if isinstance(value, Row):
662 @functools.total_ordering
664 """A row within an IDL.
666 The client may access the following attributes directly:
668 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
670 - An attribute for each column in the Row's table, named for the column,
671 whose values are as returned by Datum.to_python() for the column's type.
673 If some error occurs (e.g. the database server's idea of the column is
674 different from the IDL's idea), then the attribute values is the
675 "default" value return by Datum.default() for the column's type. (It is
676 important to know this because the default value may violate constraints
677 for the column's type, e.g. the default integer value is 0 even if column
678 contraints require the column's value to be positive.)
680 When a transaction is active, column attributes may also be assigned new
681 values. Committing the transaction will then cause the new value to be
682 stored into the database.
684 *NOTE*: In the current implementation, the value of a column is a *copy*
685 of the value in the database. This means that modifying its value
686 directly will have no useful effect. For example, the following:
687 row.mycolumn["a"] = "b" # don't do this
688 will not change anything in the database, even after commit. To modify
689 the column, instead assign the modified column value back to the column:
694 def __init__(self, idl, table, uuid, data):
695 # All of the explicit references to self.__dict__ below are required
696 # to set real attributes with invoking self.__getattr__().
697 self.__dict__["uuid"] = uuid
699 self.__dict__["_idl"] = idl
700 self.__dict__["_table"] = table
702 # _data is the committed data. It takes the following values:
704 # - A dictionary that maps every column name to a Datum, if the row
705 # exists in the committed form of the database.
707 # - None, if this row is newly inserted within the active transaction
708 # and thus has no committed form.
709 self.__dict__["_data"] = data
711 # _changes describes changes to this row within the active transaction.
712 # It takes the following values:
714 # - {}, the empty dictionary, if no transaction is active or if the
715 # row has yet not been changed within this transaction.
717 # - A dictionary that maps a column name to its new Datum, if an
718 # active transaction changes those columns' values.
720 # - A dictionary that maps every column name to a Datum, if the row
721 # is newly inserted within the active transaction.
723 # - None, if this transaction deletes this row.
724 self.__dict__["_changes"] = {}
726 # A dictionary whose keys are the names of columns that must be
727 # verified as prerequisites when the transaction commits. The values
728 # in the dictionary are all None.
729 self.__dict__["_prereqs"] = {}
731 def __lt__(self, other):
732 if not isinstance(other, Row):
733 return NotImplemented
734 return bool(self.__dict__['uuid'] < other.__dict__['uuid'])
736 def __eq__(self, other):
737 if not isinstance(other, Row):
738 return NotImplemented
739 return bool(self.__dict__['uuid'] == other.__dict__['uuid'])
742 return int(self.__dict__['uuid'])
744 def __getattr__(self, column_name):
745 assert self._changes is not None
747 datum = self._changes.get(column_name)
749 if self._data is None:
750 raise AttributeError("%s instance has no attribute '%s'" %
751 (self.__class__.__name__, column_name))
752 if column_name in self._data:
753 datum = self._data[column_name]
755 raise AttributeError("%s instance has no attribute '%s'" %
756 (self.__class__.__name__, column_name))
758 return datum.to_python(_uuid_to_row)
760 def __setattr__(self, column_name, value):
761 assert self._changes is not None
764 if ((self._table.name in self._idl.readonly) and
765 (column_name in self._idl.readonly[self._table.name])):
766 vlog.warn("attempting to write to readonly column %s"
770 column = self._table.columns[column_name]
772 datum = ovs.db.data.Datum.from_python(column.type, value,
774 except error.Error as e:
776 vlog.err("attempting to write bad value to column %s (%s)"
779 self._idl.txn._write(self, column, datum)
782 def from_json(cls, idl, table, uuid, row_json):
784 for column_name, datum_json in six.iteritems(row_json):
785 column = table.columns.get(column_name)
788 vlog.warn("unknown column %s in table %s"
789 % (column_name, table.name))
792 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
793 except error.Error as e:
795 vlog.warn("error parsing column %s in table %s: %s"
796 % (column_name, table.name, e))
798 data[column_name] = datum
799 return cls(idl, table, uuid, data)
801 def verify(self, column_name):
802 """Causes the original contents of column 'column_name' in this row to
803 be verified as a prerequisite to completing the transaction. That is,
804 if 'column_name' changed in this row (or if this row was deleted)
805 between the time that the IDL originally read its contents and the time
806 that the transaction commits, then the transaction aborts and
807 Transaction.commit() returns Transaction.TRY_AGAIN.
809 The intention is that, to ensure that no transaction commits based on
810 dirty reads, an application should call Row.verify() on each data item
811 read as part of a read-modify-write operation.
813 In some cases Row.verify() reduces to a no-op, because the current
814 value of the column is already known:
816 - If this row is a row created by the current transaction (returned
817 by Transaction.insert()).
819 - If the column has already been modified within the current
822 Because of the latter property, always call Row.verify() *before*
823 modifying the column, for a given read-modify-write.
825 A transaction must be in progress."""
827 assert self._changes is not None
828 if not self._data or column_name in self._changes:
831 self._prereqs[column_name] = None
834 """Deletes this row from its table.
836 A transaction must be in progress."""
838 assert self._changes is not None
839 if self._data is None:
840 del self._idl.txn._txn_rows[self.uuid]
842 self._idl.txn._txn_rows[self.uuid] = self
843 self.__dict__["_changes"] = None
844 del self._table.rows[self.uuid]
846 def fetch(self, column_name):
847 self._idl.txn._fetch(self, column_name)
849 def increment(self, column_name):
850 """Causes the transaction, when committed, to increment the value of
851 'column_name' within this row by 1. 'column_name' must have an integer
852 type. After the transaction commits successfully, the client may
853 retrieve the final (incremented) value of 'column_name' with
854 Transaction.get_increment_new_value().
856 The client could accomplish something similar by reading and writing
857 and verify()ing columns. However, increment() will never (by itself)
858 cause a transaction to fail because of a verify error.
860 The intended use is for incrementing the "next_cfg" column in
861 the Open_vSwitch table."""
862 self._idl.txn._increment(self, column_name)
865 def _uuid_name_from_uuid(uuid):
866 return "row%s" % str(uuid).replace("-", "_")
869 def _where_uuid_equals(uuid):
870 return [["_uuid", "==", ["uuid", str(uuid)]]]
873 class _InsertedRow(object):
874 def __init__(self, op_index):
875 self.op_index = op_index
879 class Transaction(object):
880 """A transaction may modify the contents of a database by modifying the
881 values of columns, deleting rows, inserting rows, or adding checks that
882 columns in the database have not changed ("verify" operations), through
885 Reading and writing columns and inserting and deleting rows are all
886 straightforward. The reasons to verify columns are less obvious.
887 Verification is the key to maintaining transactional integrity. Because
888 OVSDB handles multiple clients, it can happen that between the time that
889 OVSDB client A reads a column and writes a new value, OVSDB client B has
890 written that column. Client A's write should not ordinarily overwrite
891 client B's, especially if the column in question is a "map" column that
892 contains several more or less independent data items. If client A adds a
893 "verify" operation before it writes the column, then the transaction fails
894 in case client B modifies it first. Client A will then see the new value
895 of the column and compose a new transaction based on the new contents
898 When a transaction is complete, which must be before the next call to
899 Idl.run(), call Transaction.commit() or Transaction.abort().
901 The life-cycle of a transaction looks like this:
903 1. Create the transaction and record the initial sequence number:
905 seqno = idl.change_seqno(idl)
906 txn = Transaction(idl)
908 2. Modify the database with Row and Transaction methods.
910 3. Commit the transaction by calling Transaction.commit(). The first call
911 to this function probably returns Transaction.INCOMPLETE. The client
912 must keep calling again along as this remains true, calling Idl.run() in
913 between to let the IDL do protocol processing. (If the client doesn't
914 have anything else to do in the meantime, it can use
915 Transaction.commit_block() to avoid having to loop itself.)
917 4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
918 to change from the saved 'seqno' (it's possible that it's already
919 changed, in which case the client should not wait at all), then start
920 over from step 1. Only a call to Idl.run() will change the return value
921 of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)"""
923 # Status values that Transaction.commit() can return.
925 # Not yet committed or aborted.
926 UNCOMMITTED = "uncommitted"
927 # Transaction didn't include any changes.
928 UNCHANGED = "unchanged"
929 # Commit in progress, please wait.
930 INCOMPLETE = "incomplete"
931 # ovsdb_idl_txn_abort() called.
935 # Commit failed because a "verify" operation
936 # reported an inconsistency, due to a network
937 # problem, or other transient failure. Wait
938 # for a change, then try again.
939 TRY_AGAIN = "try again"
940 # Server hasn't given us the lock yet.
941 NOT_LOCKED = "not locked"
942 # Commit failed due to a hard error.
946 def status_to_string(status):
947 """Converts one of the status values that Transaction.commit() can
948 return into a human-readable string.
950 (The status values are in fact such strings already, so
951 there's nothing to do.)"""
954 def __init__(self, idl):
955 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
956 A given Idl may only have a single active transaction at a time.
958 A Transaction may modify the contents of a database by assigning new
959 values to columns (attributes of Row), deleting rows (with
960 Row.delete()), or inserting rows (with Transaction.insert()). It may
961 also check that columns in the database have not changed with
964 When a transaction is complete (which must be before the next call to
965 Idl.run()), call Transaction.commit() or Transaction.abort()."""
966 assert idl.txn is None
969 self._request_id = None
973 self._status = Transaction.UNCOMMITTED
978 self._inc_column = None
980 self._fetch_requests = []
982 self._inserted_rows = {} # Map from UUID to _InsertedRow
984 def add_comment(self, comment):
985 """Appends 'comment' to the comments that will be passed to the OVSDB
986 server when this transaction is committed. (The comment will be
987 committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
988 relatively human-readable form.)"""
989 self._comments.append(comment)
991 def wait(self, poller):
992 """Causes poll_block() to wake up if this transaction has completed
994 if self._status not in (Transaction.UNCOMMITTED,
995 Transaction.INCOMPLETE):
996 poller.immediate_wake()
998 def _substitute_uuids(self, json):
999 if isinstance(json, (list, tuple)):
1001 and json[0] == 'uuid'
1002 and ovs.ovsuuid.is_valid_string(json[1])):
1003 uuid = ovs.ovsuuid.from_string(json[1])
1004 row = self._txn_rows.get(uuid, None)
1005 if row and row._data is None:
1006 return ["named-uuid", _uuid_name_from_uuid(uuid)]
1008 return [self._substitute_uuids(elem) for elem in json]
1011 def __disassemble(self):
1014 for row in six.itervalues(self._txn_rows):
1015 if row._changes is None:
1016 row._table.rows[row.uuid] = row
1017 elif row._data is None:
1018 del row._table.rows[row.uuid]
1019 row.__dict__["_changes"] = {}
1020 row.__dict__["_prereqs"] = {}
1024 """Attempts to commit 'txn'. Returns the status of the commit
1025 operation, one of the following constants:
1027 Transaction.INCOMPLETE:
1029 The transaction is in progress, but not yet complete. The caller
1030 should call again later, after calling Idl.run() to let the
1031 IDL do OVSDB protocol processing.
1033 Transaction.UNCHANGED:
1035 The transaction is complete. (It didn't actually change the
1036 database, so the IDL didn't send any request to the database
1039 Transaction.ABORTED:
1041 The caller previously called Transaction.abort().
1043 Transaction.SUCCESS:
1045 The transaction was successful. The update made by the
1046 transaction (and possibly other changes made by other database
1047 clients) should already be visible in the IDL.
1049 Transaction.TRY_AGAIN:
1051 The transaction failed for some transient reason, e.g. because a
1052 "verify" operation reported an inconsistency or due to a network
1053 problem. The caller should wait for a change to the database,
1054 then compose a new transaction, and commit the new transaction.
1056 Use Idl.change_seqno to wait for a change in the database. It is
1057 important to use its value *before* the initial call to
1058 Transaction.commit() as the baseline for this purpose, because
1059 the change that one should wait for can happen after the initial
1060 call but before the call that returns Transaction.TRY_AGAIN, and
1061 using some other baseline value in that situation could cause an
1062 indefinite wait if the database rarely changes.
1064 Transaction.NOT_LOCKED:
1066 The transaction failed because the IDL has been configured to
1067 require a database lock (with Idl.set_lock()) but didn't
1068 get it yet or has already lost it.
1070 Committing a transaction rolls back all of the changes that it made to
1071 the IDL's copy of the database. If the transaction commits
1072 successfully, then the database server will send an update and, thus,
1073 the IDL will be updated with the committed changes."""
1074 # The status can only change if we're the active transaction.
1075 # (Otherwise, our status will change only in Idl.run().)
1076 if self != self.idl.txn:
1079 # If we need a lock but don't have it, give up quickly.
1080 if self.idl.lock_name and not self.idl.has_lock:
1081 self._status = Transaction.NOT_LOCKED
1082 self.__disassemble()
1085 operations = [self.idl._db.name]
1087 # Assert that we have the required lock (avoiding a race).
1088 if self.idl.lock_name:
1089 operations.append({"op": "assert",
1090 "lock": self.idl.lock_name})
1092 # Add prerequisites and declarations of new rows.
1093 for row in six.itervalues(self._txn_rows):
1097 for column_name in row._prereqs:
1098 columns.append(column_name)
1099 rows[column_name] = row._data[column_name].to_json()
1100 operations.append({"op": "wait",
1101 "table": row._table.name,
1103 "where": _where_uuid_equals(row.uuid),
1110 for row in six.itervalues(self._txn_rows):
1111 if row._changes is None:
1112 if row._table.is_root:
1113 operations.append({"op": "delete",
1114 "table": row._table.name,
1115 "where": _where_uuid_equals(row.uuid)})
1118 # Let ovsdb-server decide whether to really delete it.
1121 op = {"table": row._table.name}
1122 if row._data is None:
1124 op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
1127 op_index = len(operations) - 1
1128 self._inserted_rows[row.uuid] = _InsertedRow(op_index)
1131 op["where"] = _where_uuid_equals(row.uuid)
1134 op["row"] = row_json
1136 for column_name, datum in six.iteritems(row._changes):
1137 if row._data is not None or not datum.is_default():
1138 row_json[column_name] = (
1139 self._substitute_uuids(datum.to_json()))
1141 # If anything really changed, consider it an update.
1142 # We can't suppress not-really-changed values earlier
1143 # or transactions would become nonatomic (see the big
1144 # comment inside Transaction._write()).
1145 if (not any_updates and row._data is not None and
1146 row._data[column_name] != datum):
1149 if row._data is None or row_json:
1150 operations.append(op)
1152 if self._fetch_requests:
1153 for fetch in self._fetch_requests:
1154 fetch["index"] = len(operations) - 1
1155 operations.append({"op": "select",
1156 "table": fetch["row"]._table.name,
1157 "where": self._substitute_uuids(
1158 _where_uuid_equals(fetch["row"].uuid)),
1159 "columns": [fetch["column_name"]]})
1163 if self._inc_row and any_updates:
1164 self._inc_index = len(operations) - 1
1166 operations.append({"op": "mutate",
1167 "table": self._inc_row._table.name,
1168 "where": self._substitute_uuids(
1169 _where_uuid_equals(self._inc_row.uuid)),
1170 "mutations": [[self._inc_column, "+=", 1]]})
1171 operations.append({"op": "select",
1172 "table": self._inc_row._table.name,
1173 "where": self._substitute_uuids(
1174 _where_uuid_equals(self._inc_row.uuid)),
1175 "columns": [self._inc_column]})
1179 operations.append({"op": "comment",
1180 "comment": "\n".join(self._comments)})
1184 operations.append({"op": "abort"})
1187 self._status = Transaction.UNCHANGED
1189 msg = ovs.jsonrpc.Message.create_request("transact", operations)
1190 self._request_id = msg.id
1191 if not self.idl._session.send(msg):
1192 self.idl._outstanding_txns[self._request_id] = self
1193 self._status = Transaction.INCOMPLETE
1195 self._status = Transaction.TRY_AGAIN
1197 self.__disassemble()
1200 def commit_block(self):
1201 """Attempts to commit this transaction, blocking until the commit
1202 either succeeds or fails. Returns the final commit status, which may
1203 be any Transaction.* value other than Transaction.INCOMPLETE.
1205 This function calls Idl.run() on this transaction'ss IDL, so it may
1206 cause Idl.change_seqno to change."""
1208 status = self.commit()
1209 if status != Transaction.INCOMPLETE:
1214 poller = ovs.poller.Poller()
1215 self.idl.wait(poller)
1219 def get_increment_new_value(self):
1220 """Returns the final (incremented) value of the column in this
1221 transaction that was set to be incremented by Row.increment. This
1222 transaction must have committed successfully."""
1223 assert self._status == Transaction.SUCCESS
1224 return self._inc_new_value
1227 """Aborts this transaction. If Transaction.commit() has already been
1228 called then the transaction might get committed anyhow."""
1229 self.__disassemble()
1230 if self._status in (Transaction.UNCOMMITTED,
1231 Transaction.INCOMPLETE):
1232 self._status = Transaction.ABORTED
1234 def get_error(self):
1235 """Returns a string representing this transaction's current status,
1236 suitable for use in log messages."""
1237 if self._status != Transaction.ERROR:
1238 return Transaction.status_to_string(self._status)
1242 return "no error details available"
1244 def __set_error_json(self, json):
1245 if self._error is None:
1246 self._error = ovs.json.to_string(json)
1248 def get_insert_uuid(self, uuid):
1249 """Finds and returns the permanent UUID that the database assigned to a
1250 newly inserted row, given the UUID that Transaction.insert() assigned
1251 locally to that row.
1253 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1254 or if it was assigned by that function and then deleted by Row.delete()
1255 within the same transaction. (Rows that are inserted and then deleted
1256 within a single transaction are never sent to the database server, so
1257 it never assigns them a permanent UUID.)
1259 This transaction must have completed successfully."""
1260 assert self._status in (Transaction.SUCCESS,
1261 Transaction.UNCHANGED)
1262 inserted_row = self._inserted_rows.get(uuid)
1264 return inserted_row.real
1267 def _increment(self, row, column):
1268 assert not self._inc_row
1270 self._inc_column = column
1272 def _fetch(self, row, column_name):
1273 self._fetch_requests.append({"row": row, "column_name": column_name})
1275 def _write(self, row, column, datum):
1276 assert row._changes is not None
1280 # If this is a write-only column and the datum being written is the
1281 # same as the one already there, just skip the update entirely. This
1282 # is worth optimizing because we have a lot of columns that get
1283 # periodically refreshed into the database but don't actually change
1286 # We don't do this for read/write columns because that would break
1287 # atomicity of transactions--some other client might have written a
1288 # different value in that column since we read it. (But if a whole
1289 # transaction only does writes of existing values, without making any
1290 # real changes, we will drop the whole transaction later in
1291 # ovsdb_idl_txn_commit().)
1292 if (not column.alert and row._data and
1293 row._data.get(column.name) == datum):
1294 new_value = row._changes.get(column.name)
1295 if new_value is None or new_value == datum:
1298 txn._txn_rows[row.uuid] = row
1299 row._changes[column.name] = datum.copy()
1301 def insert(self, table, new_uuid=None):
1302 """Inserts and returns a new row in 'table', which must be one of the
1303 ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1305 The new row is assigned a provisional UUID. If 'uuid' is None then one
1306 is randomly generated; otherwise 'uuid' should specify a randomly
1307 generated uuid.UUID not otherwise in use. ovsdb-server will assign a
1308 different UUID when 'txn' is committed, but the IDL will replace any
1309 uses of the provisional UUID in the data to be to be committed by the
1310 UUID assigned by ovsdb-server."""
1311 assert self._status == Transaction.UNCOMMITTED
1312 if new_uuid is None:
1313 new_uuid = uuid.uuid4()
1314 row = Row(self.idl, table, new_uuid, None)
1315 table.rows[row.uuid] = row
1316 self._txn_rows[row.uuid] = row
1319 def _process_reply(self, msg):
1320 if msg.type == ovs.jsonrpc.Message.T_ERROR:
1321 self._status = Transaction.ERROR
1322 elif not isinstance(msg.result, (list, tuple)):
1324 vlog.warn('reply to "transact" is not JSON array')
1333 # This isn't an error in itself but indicates that some
1334 # prior operation failed, so make sure that we know about
1337 elif isinstance(op, dict):
1338 error = op.get("error")
1339 if error is not None:
1340 if error == "timed out":
1342 elif error == "not owner":
1344 elif error == "aborted":
1348 self.__set_error_json(op)
1351 self.__set_error_json(op)
1353 vlog.warn("operation reply is not JSON null or object")
1355 if not soft_errors and not hard_errors and not lock_errors:
1356 if self._inc_row and not self.__process_inc_reply(ops):
1358 if self._fetch_requests:
1359 if self.__process_fetch_reply(ops):
1360 self.idl.change_seqno += 1
1364 for insert in six.itervalues(self._inserted_rows):
1365 if not self.__process_insert_reply(insert, ops):
1369 self._status = Transaction.ERROR
1371 self._status = Transaction.NOT_LOCKED
1373 self._status = Transaction.TRY_AGAIN
1375 self._status = Transaction.SUCCESS
1378 def __check_json_type(json, types, name):
1381 vlog.warn("%s is missing" % name)
1383 elif not isinstance(json, tuple(types)):
1385 vlog.warn("%s has unexpected type %s" % (name, type(json)))
1390 def __process_fetch_reply(self, ops):
1392 for fetch_request in self._fetch_requests:
1393 row = fetch_request["row"]
1394 column_name = fetch_request["column_name"]
1395 index = fetch_request["index"]
1399 fetched_rows = select.get("rows")
1400 if not Transaction.__check_json_type(fetched_rows, (list, tuple),
1401 '"select" reply "rows"'):
1403 if len(fetched_rows) != 1:
1405 vlog.warn('"select" reply "rows" has %d elements '
1406 'instead of 1' % len(fetched_rows))
1408 fetched_row = fetched_rows[0]
1409 if not Transaction.__check_json_type(fetched_row, (dict,),
1410 '"select" reply row'):
1413 column = table.columns.get(column_name)
1414 datum_json = fetched_row.get(column_name)
1415 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
1417 row._data[column_name] = datum
1422 def __process_inc_reply(self, ops):
1423 if self._inc_index + 2 > len(ops):
1425 vlog.warn("reply does not contain enough operations for "
1426 "increment (has %d, needs %d)" %
1427 (len(ops), self._inc_index + 2))
1429 # We know that this is a JSON object because the loop in
1430 # __process_reply() already checked.
1431 mutate = ops[self._inc_index]
1432 count = mutate.get("count")
1433 if not Transaction.__check_json_type(count, six.integer_types,
1434 '"mutate" reply "count"'):
1438 vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1441 select = ops[self._inc_index + 1]
1442 rows = select.get("rows")
1443 if not Transaction.__check_json_type(rows, (list, tuple),
1444 '"select" reply "rows"'):
1448 vlog.warn('"select" reply "rows" has %d elements '
1449 'instead of 1' % len(rows))
1452 if not Transaction.__check_json_type(row, (dict,),
1453 '"select" reply row'):
1455 column = row.get(self._inc_column)
1456 if not Transaction.__check_json_type(column, six.integer_types,
1457 '"select" reply inc column'):
1459 self._inc_new_value = column
1462 def __process_insert_reply(self, insert, ops):
1463 if insert.op_index >= len(ops):
1465 vlog.warn("reply does not contain enough operations "
1466 "for insert (has %d, needs %d)"
1467 % (len(ops), insert.op_index))
1470 # We know that this is a JSON object because the loop in
1471 # __process_reply() already checked.
1472 reply = ops[insert.op_index]
1473 json_uuid = reply.get("uuid")
1474 if not Transaction.__check_json_type(json_uuid, (tuple, list),
1475 '"insert" reply "uuid"'):
1479 uuid_ = ovs.ovsuuid.from_json(json_uuid)
1482 vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1489 class SchemaHelper(object):
1490 """IDL Schema helper.
1492 This class encapsulates the logic required to generate schemas suitable
1493 for creating 'ovs.db.idl.Idl' objects. Clients should register columns
1494 they are interested in using register_columns(). When finished, the
1495 get_idl_schema() function may be called.
1497 The location on disk of the schema used may be found in the
1498 'schema_location' variable."""
1500 def __init__(self, location=None, schema_json=None):
1501 """Creates a new Schema object.
1503 'location' file path to ovs schema. None means default location
1504 'schema_json' schema in json preresentation in memory
1507 if location and schema_json:
1508 raise ValueError("both location and schema_json can't be "
1509 "specified. it's ambiguous.")
1510 if schema_json is None:
1511 if location is None:
1512 location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1513 schema_json = ovs.json.from_file(location)
1515 self.schema_json = schema_json
1520 def register_columns(self, table, columns, readonly=[]):
1521 """Registers interest in the given 'columns' of 'table'. Future calls
1522 to get_idl_schema() will include 'table':column for each column in
1523 'columns'. This function automatically avoids adding duplicate entries
1525 A subset of 'columns' can be specified as 'readonly'. The readonly
1526 columns are not replicated but can be fetched on-demand by the user
1529 'table' must be a string.
1530 'columns' must be a list of strings.
1531 'readonly' must be a list of strings.
1534 assert isinstance(table, six.string_types)
1535 assert isinstance(columns, list)
1537 columns = set(columns) | self._tables.get(table, set())
1538 self._tables[table] = columns
1539 self._readonly[table] = readonly
1541 def register_table(self, table):
1542 """Registers interest in the given all columns of 'table'. Future calls
1543 to get_idl_schema() will include all columns of 'table'.
1545 'table' must be a string
1547 assert isinstance(table, six.string_types)
1548 self._tables[table] = set() # empty set means all columns in the table
1550 def register_all(self):
1551 """Registers interest in every column of every table."""
1554 def get_idl_schema(self):
1555 """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1556 object based on columns registered using the register_columns()
1559 schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
1560 self.schema_json = None
1564 for table, columns in six.iteritems(self._tables):
1565 schema_tables[table] = (
1566 self._keep_table_columns(schema, table, columns))
1568 schema.tables = schema_tables
1569 schema.readonly = self._readonly
1572 def _keep_table_columns(self, schema, table_name, columns):
1573 assert table_name in schema.tables
1574 table = schema.tables[table_name]
1577 # empty set means all columns in the table
1581 for column_name in columns:
1582 assert isinstance(column_name, six.string_types)
1583 assert column_name in table.columns
1585 new_columns[column_name] = table.columns[column_name]
1587 table.columns = new_columns