1 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
22 from ovs.db import error
27 vlog = ovs.vlog.Vlog("idl")
29 __pychecker__ = 'no-classattr no-objattrs'
37 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
39 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
40 requests to an OVSDB database server and parses the responses, converting
41 raw JSON into data structures that are easier for clients to digest.
43 The IDL also assists with issuing database transactions. The client
44 creates a transaction, manipulates the IDL data structures, and commits or
45 aborts the transaction. The IDL then composes and issues the necessary
46 JSON-RPC requests and reports to the client whether the transaction
47 completed successfully.
49 The client is allowed to access the following attributes directly, in a
52 - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
53 to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
54 annotated with a new attribute 'rows', which is a dict from a uuid.UUID
57 The client may directly read and write the Row objects referenced by the
58 'rows' map values. Refer to Row for more details.
60 - 'change_seqno': A number that represents the IDL's state. When the IDL
61 is updated (by Idl.run()), its value changes. The sequence number can
62 occasionally change even if the database does not. This happens if the
63 connection to the database drops and reconnects, which causes the
64 database contents to be reloaded even if they didn't change. (It could
65 also happen if the database server sends out a "change" that reflects
66 what the IDL already thought was in the database. The database server is
67 not supposed to do that, but bugs could in theory cause it to do so.)
69 - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
70 if no lock is configured.
72 - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
73 lock, and False otherwise.
75 Locking and unlocking happens asynchronously from the database client's
76 point of view, so the information is only useful for optimization
77 (e.g. if the client doesn't have the lock then there's no point in trying
78 to write to the database).
80 - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
81 the database server has indicated that some other client already owns the
82 requested lock, and False otherwise.
84 - 'txn': The ovs.db.idl.Transaction object for the database transaction
85 currently being constructed, if there is one, or None otherwise.
88 def __init__(self, remote, schema):
89 """Creates and returns a connection to the database named 'db_name' on
90 'remote', which should be in a form acceptable to
91 ovs.jsonrpc.session.open(). The connection will maintain an in-memory
92 replica of the remote database.
94 'schema' should be the schema for the remote database. The caller may
95 have cut it down by removing tables or columns that are not of
96 interest. The IDL will only replicate the tables and columns that
97 remain. The caller may also add a attribute named 'alert' to selected
98 remaining columns, setting its value to False; if so, then changes to
99 those columns will not be considered changes to the database for the
100 purpose of the return value of Idl.run() and Idl.change_seqno. This is
101 useful for columns that the IDL's client will write but not read.
103 As a convenience to users, 'schema' may also be an instance of the
106 The IDL uses and modifies 'schema' directly."""
108 assert isinstance(schema, SchemaHelper)
109 schema = schema.get_idl_schema()
111 self.tables = schema.tables
112 self.readonly = schema.readonly
114 self._session = ovs.jsonrpc.Session.open(remote)
115 self._monitor_request_id = None
116 self._last_seqno = None
117 self.change_seqno = 0
120 self.lock_name = None # Name of lock we need, None if none.
121 self.has_lock = False # Has db server said we have the lock?
122 self.is_lock_contended = False # Has db server said we can't get lock?
123 self._lock_request_id = None # JSON-RPC ID of in-flight lock request.
125 # Transaction support.
127 self._outstanding_txns = {}
129 for table in six.itervalues(schema.tables):
130 for column in six.itervalues(table.columns):
131 if not hasattr(column, 'alert'):
133 table.need_table = False
138 """Closes the connection to the database. The IDL will no longer
140 self._session.close()
143 """Processes a batch of messages from the database server. Returns
144 True if the database as seen through the IDL changed, False if it did
145 not change. The initial fetch of the entire contents of the remote
146 database is considered to be one kind of change. If the IDL has been
147 configured to acquire a database lock (with Idl.set_lock()), then
148 successfully acquiring the lock is also considered to be a change.
150 This function can return occasional false positives, that is, report
151 that the database changed even though it didn't. This happens if the
152 connection to the database drops and reconnects, which causes the
153 database contents to be reloaded even if they didn't change. (It could
154 also happen if the database server sends out a "change" that reflects
155 what we already thought was in the database, but the database server is
156 not supposed to do that.)
158 As an alternative to checking the return value, the client may check
159 for changes in self.change_seqno."""
161 initial_change_seqno = self.change_seqno
166 if not self._session.is_connected():
169 seqno = self._session.get_seqno()
170 if seqno != self._last_seqno:
171 self._last_seqno = seqno
172 self.__txn_abort_all()
173 self.__send_monitor_request()
175 self.__send_lock_request()
178 msg = self._session.recv()
181 if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
182 and msg.method == "update"
183 and len(msg.params) == 2
184 and msg.params[0] is None):
185 # Database contents changed.
186 self.__parse_update(msg.params[1])
187 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
188 and self._monitor_request_id is not None
189 and self._monitor_request_id == msg.id):
190 # Reply to our "monitor" request.
192 self.change_seqno += 1
193 self._monitor_request_id = None
195 self.__parse_update(msg.result)
196 except error.Error as e:
197 vlog.err("%s: parse error in received schema: %s"
198 % (self._session.get_name(), e))
200 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
201 and self._lock_request_id is not None
202 and self._lock_request_id == msg.id):
203 # Reply to our "lock" request.
204 self.__parse_lock_reply(msg.result)
205 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
206 and msg.method == "locked"):
208 self.__parse_lock_notify(msg.params, True)
209 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
210 and msg.method == "stolen"):
211 # Someone else stole our lock.
212 self.__parse_lock_notify(msg.params, False)
213 elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
214 # Reply to our echo request. Ignore it.
216 elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
217 ovs.jsonrpc.Message.T_REPLY)
218 and self.__txn_process_reply(msg)):
219 # __txn_process_reply() did everything needed.
222 # This can happen if a transaction is destroyed before we
223 # receive the reply, so keep the log level low.
224 vlog.dbg("%s: received unexpected %s message"
225 % (self._session.get_name(),
226 ovs.jsonrpc.Message.type_to_string(msg.type)))
228 return initial_change_seqno != self.change_seqno
230 def wait(self, poller):
231 """Arranges for poller.block() to wake up when self.run() has something
232 to do or when activity occurs on a transaction on 'self'."""
233 self._session.wait(poller)
234 self._session.recv_wait(poller)
236 def has_ever_connected(self):
237 """Returns True, if the IDL successfully connected to the remote
238 database and retrieved its contents (even if the connection
239 subsequently dropped and is in the process of reconnecting). If so,
240 then the IDL contains an atomic snapshot of the database's contents
241 (but it might be arbitrarily old if the connection dropped).
243 Returns False if the IDL has never connected or retrieved the
244 database's contents. If so, the IDL is empty."""
245 return self.change_seqno != 0
247 def force_reconnect(self):
248 """Forces the IDL to drop its connection to the database and reconnect.
249 In the meantime, the contents of the IDL will not change."""
250 self._session.force_reconnect()
252 def set_lock(self, lock_name):
253 """If 'lock_name' is not None, configures the IDL to obtain the named
254 lock from the database server and to avoid modifying the database when
255 the lock cannot be acquired (that is, when another client has the same
258 If 'lock_name' is None, drops the locking requirement and releases the
261 assert not self._outstanding_txns
263 if self.lock_name and (not lock_name or lock_name != self.lock_name):
264 # Release previous lock.
265 self.__send_unlock_request()
266 self.lock_name = None
267 self.is_lock_contended = False
269 if lock_name and not self.lock_name:
271 self.lock_name = lock_name
272 self.__send_lock_request()
274 def notify(self, event, row, updates=None):
275 """Hook for implementing create/update/delete notifications
277 :param event: The event that was triggered
278 :type event: ROW_CREATE, ROW_UPDATE, or ROW_DELETE
279 :param row: The row as it is after the operation has occured
281 :param updates: For updates, a Row object with just the changed columns
288 for table in six.itervalues(self.tables):
294 self.change_seqno += 1
296 def __update_has_lock(self, new_has_lock):
297 if new_has_lock and not self.has_lock:
298 if self._monitor_request_id is None:
299 self.change_seqno += 1
301 # We're waiting for a monitor reply, so don't signal that the
302 # database changed. The monitor reply will increment
303 # change_seqno anyhow.
305 self.is_lock_contended = False
306 self.has_lock = new_has_lock
308 def __do_send_lock_request(self, method):
309 self.__update_has_lock(False)
310 self._lock_request_id = None
311 if self._session.is_connected():
312 msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
314 self._session.send(msg)
319 def __send_lock_request(self):
320 self._lock_request_id = self.__do_send_lock_request("lock")
322 def __send_unlock_request(self):
323 self.__do_send_lock_request("unlock")
325 def __parse_lock_reply(self, result):
326 self._lock_request_id = None
327 got_lock = isinstance(result, dict) and result.get("locked") is True
328 self.__update_has_lock(got_lock)
330 self.is_lock_contended = True
332 def __parse_lock_notify(self, params, new_has_lock):
333 if (self.lock_name is not None
334 and isinstance(params, (list, tuple))
336 and params[0] == self.lock_name):
337 self.__update_has_lock(new_has_lock)
339 self.is_lock_contended = True
341 def __send_monitor_request(self):
342 monitor_requests = {}
343 for table in six.itervalues(self.tables):
345 for column in six.iterkeys(table.columns):
346 if ((table.name not in self.readonly) or
347 (table.name in self.readonly) and
348 (column not in self.readonly[table.name])):
349 columns.append(column)
350 monitor_requests[table.name] = {"columns": columns}
351 msg = ovs.jsonrpc.Message.create_request(
352 "monitor", [self._db.name, None, monitor_requests])
353 self._monitor_request_id = msg.id
354 self._session.send(msg)
356 def __parse_update(self, update):
358 self.__do_parse_update(update)
359 except error.Error as e:
360 vlog.err("%s: error parsing update: %s"
361 % (self._session.get_name(), e))
363 def __do_parse_update(self, table_updates):
364 if not isinstance(table_updates, dict):
365 raise error.Error("<table-updates> is not an object",
368 for table_name, table_update in six.iteritems(table_updates):
369 table = self.tables.get(table_name)
371 raise error.Error('<table-updates> includes unknown '
372 'table "%s"' % table_name)
374 if not isinstance(table_update, dict):
375 raise error.Error('<table-update> for table "%s" is not '
376 'an object' % table_name, table_update)
378 for uuid_string, row_update in six.iteritems(table_update):
379 if not ovs.ovsuuid.is_valid_string(uuid_string):
380 raise error.Error('<table-update> for table "%s" '
381 'contains bad UUID "%s" as member '
382 'name' % (table_name, uuid_string),
384 uuid = ovs.ovsuuid.from_string(uuid_string)
386 if not isinstance(row_update, dict):
387 raise error.Error('<table-update> for table "%s" '
388 'contains <row-update> for %s that '
390 % (table_name, uuid_string))
392 parser = ovs.db.parser.Parser(row_update, "row-update")
393 old = parser.get_optional("old", [dict])
394 new = parser.get_optional("new", [dict])
397 if not old and not new:
398 raise error.Error('<row-update> missing "old" and '
399 '"new" members', row_update)
401 if self.__process_update(table, uuid, old, new):
402 self.change_seqno += 1
404 def __process_update(self, table, uuid, old, new):
405 """Returns True if a column changed, False otherwise."""
406 row = table.rows.get(uuid)
413 self.notify(ROW_DELETE, row)
416 vlog.warn("cannot delete missing row %s from table %s"
417 % (uuid, table.name))
421 row = self.__create_row(table, uuid)
425 vlog.warn("cannot add existing row %s to table %s"
426 % (uuid, table.name))
427 if self.__row_update(table, row, new):
429 self.notify(ROW_CREATE, row)
433 row = self.__create_row(table, uuid)
437 vlog.warn("cannot modify missing row %s in table %s"
438 % (uuid, table.name))
439 if self.__row_update(table, row, new):
441 self.notify(op, row, Row.from_json(self, table, uuid, old))
444 def __row_update(self, table, row, row_json):
446 for column_name, datum_json in six.iteritems(row_json):
447 column = table.columns.get(column_name)
450 vlog.warn("unknown column %s updating table %s"
451 % (column_name, table.name))
455 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
456 except error.Error as e:
458 vlog.warn("error parsing column %s in table %s: %s"
459 % (column_name, table.name, e))
462 if datum != row._data[column_name]:
463 row._data[column_name] = datum
467 # Didn't really change but the OVSDB monitor protocol always
468 # includes every value in a row.
472 def __create_row(self, table, uuid):
474 for column in six.itervalues(table.columns):
475 data[column.name] = ovs.db.data.Datum.default(column.type)
476 row = table.rows[uuid] = Row(self, table, uuid, data)
480 self._session.force_reconnect()
482 def __txn_abort_all(self):
483 while self._outstanding_txns:
484 txn = self._outstanding_txns.popitem()[1]
485 txn._status = Transaction.TRY_AGAIN
487 def __txn_process_reply(self, msg):
488 txn = self._outstanding_txns.pop(msg.id, None)
490 txn._process_reply(msg)
493 def _uuid_to_row(atom, base):
495 return base.ref_table.rows.get(atom)
500 def _row_to_uuid(value):
501 if isinstance(value, Row):
508 """A row within an IDL.
510 The client may access the following attributes directly:
512 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
514 - An attribute for each column in the Row's table, named for the column,
515 whose values are as returned by Datum.to_python() for the column's type.
517 If some error occurs (e.g. the database server's idea of the column is
518 different from the IDL's idea), then the attribute values is the
519 "default" value return by Datum.default() for the column's type. (It is
520 important to know this because the default value may violate constraints
521 for the column's type, e.g. the default integer value is 0 even if column
522 contraints require the column's value to be positive.)
524 When a transaction is active, column attributes may also be assigned new
525 values. Committing the transaction will then cause the new value to be
526 stored into the database.
528 *NOTE*: In the current implementation, the value of a column is a *copy*
529 of the value in the database. This means that modifying its value
530 directly will have no useful effect. For example, the following:
531 row.mycolumn["a"] = "b" # don't do this
532 will not change anything in the database, even after commit. To modify
533 the column, instead assign the modified column value back to the column:
538 def __init__(self, idl, table, uuid, data):
539 # All of the explicit references to self.__dict__ below are required
540 # to set real attributes with invoking self.__getattr__().
541 self.__dict__["uuid"] = uuid
543 self.__dict__["_idl"] = idl
544 self.__dict__["_table"] = table
546 # _data is the committed data. It takes the following values:
548 # - A dictionary that maps every column name to a Datum, if the row
549 # exists in the committed form of the database.
551 # - None, if this row is newly inserted within the active transaction
552 # and thus has no committed form.
553 self.__dict__["_data"] = data
555 # _changes describes changes to this row within the active transaction.
556 # It takes the following values:
558 # - {}, the empty dictionary, if no transaction is active or if the
559 # row has yet not been changed within this transaction.
561 # - A dictionary that maps a column name to its new Datum, if an
562 # active transaction changes those columns' values.
564 # - A dictionary that maps every column name to a Datum, if the row
565 # is newly inserted within the active transaction.
567 # - None, if this transaction deletes this row.
568 self.__dict__["_changes"] = {}
570 # A dictionary whose keys are the names of columns that must be
571 # verified as prerequisites when the transaction commits. The values
572 # in the dictionary are all None.
573 self.__dict__["_prereqs"] = {}
575 def __getattr__(self, column_name):
576 assert self._changes is not None
578 datum = self._changes.get(column_name)
580 if self._data is None:
581 raise AttributeError("%s instance has no attribute '%s'" %
582 (self.__class__.__name__, column_name))
583 if column_name in self._data:
584 datum = self._data[column_name]
586 raise AttributeError("%s instance has no attribute '%s'" %
587 (self.__class__.__name__, column_name))
589 return datum.to_python(_uuid_to_row)
591 def __setattr__(self, column_name, value):
592 assert self._changes is not None
595 if ((self._table.name in self._idl.readonly) and
596 (column_name in self._idl.readonly[self._table.name])):
597 vlog.warn("attempting to write to readonly column %s"
601 column = self._table.columns[column_name]
603 datum = ovs.db.data.Datum.from_python(column.type, value,
605 except error.Error as e:
607 vlog.err("attempting to write bad value to column %s (%s)"
610 self._idl.txn._write(self, column, datum)
613 def from_json(cls, idl, table, uuid, row_json):
615 for column_name, datum_json in six.iteritems(row_json):
616 column = table.columns.get(column_name)
619 vlog.warn("unknown column %s in table %s"
620 % (column_name, table.name))
623 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
624 except error.Error as e:
626 vlog.warn("error parsing column %s in table %s: %s"
627 % (column_name, table.name, e))
629 data[column_name] = datum
630 return cls(idl, table, uuid, data)
632 def verify(self, column_name):
633 """Causes the original contents of column 'column_name' in this row to
634 be verified as a prerequisite to completing the transaction. That is,
635 if 'column_name' changed in this row (or if this row was deleted)
636 between the time that the IDL originally read its contents and the time
637 that the transaction commits, then the transaction aborts and
638 Transaction.commit() returns Transaction.TRY_AGAIN.
640 The intention is that, to ensure that no transaction commits based on
641 dirty reads, an application should call Row.verify() on each data item
642 read as part of a read-modify-write operation.
644 In some cases Row.verify() reduces to a no-op, because the current
645 value of the column is already known:
647 - If this row is a row created by the current transaction (returned
648 by Transaction.insert()).
650 - If the column has already been modified within the current
653 Because of the latter property, always call Row.verify() *before*
654 modifying the column, for a given read-modify-write.
656 A transaction must be in progress."""
658 assert self._changes is not None
659 if not self._data or column_name in self._changes:
662 self._prereqs[column_name] = None
665 """Deletes this row from its table.
667 A transaction must be in progress."""
669 assert self._changes is not None
670 if self._data is None:
671 del self._idl.txn._txn_rows[self.uuid]
673 self._idl.txn._txn_rows[self.uuid] = self
674 self.__dict__["_changes"] = None
675 del self._table.rows[self.uuid]
677 def fetch(self, column_name):
678 self._idl.txn._fetch(self, column_name)
680 def increment(self, column_name):
681 """Causes the transaction, when committed, to increment the value of
682 'column_name' within this row by 1. 'column_name' must have an integer
683 type. After the transaction commits successfully, the client may
684 retrieve the final (incremented) value of 'column_name' with
685 Transaction.get_increment_new_value().
687 The client could accomplish something similar by reading and writing
688 and verify()ing columns. However, increment() will never (by itself)
689 cause a transaction to fail because of a verify error.
691 The intended use is for incrementing the "next_cfg" column in
692 the Open_vSwitch table."""
693 self._idl.txn._increment(self, column_name)
696 def _uuid_name_from_uuid(uuid):
697 return "row%s" % str(uuid).replace("-", "_")
700 def _where_uuid_equals(uuid):
701 return [["_uuid", "==", ["uuid", str(uuid)]]]
704 class _InsertedRow(object):
705 def __init__(self, op_index):
706 self.op_index = op_index
710 class Transaction(object):
711 """A transaction may modify the contents of a database by modifying the
712 values of columns, deleting rows, inserting rows, or adding checks that
713 columns in the database have not changed ("verify" operations), through
716 Reading and writing columns and inserting and deleting rows are all
717 straightforward. The reasons to verify columns are less obvious.
718 Verification is the key to maintaining transactional integrity. Because
719 OVSDB handles multiple clients, it can happen that between the time that
720 OVSDB client A reads a column and writes a new value, OVSDB client B has
721 written that column. Client A's write should not ordinarily overwrite
722 client B's, especially if the column in question is a "map" column that
723 contains several more or less independent data items. If client A adds a
724 "verify" operation before it writes the column, then the transaction fails
725 in case client B modifies it first. Client A will then see the new value
726 of the column and compose a new transaction based on the new contents
729 When a transaction is complete, which must be before the next call to
730 Idl.run(), call Transaction.commit() or Transaction.abort().
732 The life-cycle of a transaction looks like this:
734 1. Create the transaction and record the initial sequence number:
736 seqno = idl.change_seqno(idl)
737 txn = Transaction(idl)
739 2. Modify the database with Row and Transaction methods.
741 3. Commit the transaction by calling Transaction.commit(). The first call
742 to this function probably returns Transaction.INCOMPLETE. The client
743 must keep calling again along as this remains true, calling Idl.run() in
744 between to let the IDL do protocol processing. (If the client doesn't
745 have anything else to do in the meantime, it can use
746 Transaction.commit_block() to avoid having to loop itself.)
748 4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
749 to change from the saved 'seqno' (it's possible that it's already
750 changed, in which case the client should not wait at all), then start
751 over from step 1. Only a call to Idl.run() will change the return value
752 of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)"""
754 # Status values that Transaction.commit() can return.
756 # Not yet committed or aborted.
757 UNCOMMITTED = "uncommitted"
758 # Transaction didn't include any changes.
759 UNCHANGED = "unchanged"
760 # Commit in progress, please wait.
761 INCOMPLETE = "incomplete"
762 # ovsdb_idl_txn_abort() called.
766 # Commit failed because a "verify" operation
767 # reported an inconsistency, due to a network
768 # problem, or other transient failure. Wait
769 # for a change, then try again.
770 TRY_AGAIN = "try again"
771 # Server hasn't given us the lock yet.
772 NOT_LOCKED = "not locked"
773 # Commit failed due to a hard error.
777 def status_to_string(status):
778 """Converts one of the status values that Transaction.commit() can
779 return into a human-readable string.
781 (The status values are in fact such strings already, so
782 there's nothing to do.)"""
785 def __init__(self, idl):
786 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
787 A given Idl may only have a single active transaction at a time.
789 A Transaction may modify the contents of a database by assigning new
790 values to columns (attributes of Row), deleting rows (with
791 Row.delete()), or inserting rows (with Transaction.insert()). It may
792 also check that columns in the database have not changed with
795 When a transaction is complete (which must be before the next call to
796 Idl.run()), call Transaction.commit() or Transaction.abort()."""
797 assert idl.txn is None
800 self._request_id = None
804 self._status = Transaction.UNCOMMITTED
809 self._inc_column = None
811 self._fetch_requests = []
813 self._inserted_rows = {} # Map from UUID to _InsertedRow
815 def add_comment(self, comment):
816 """Appends 'comment' to the comments that will be passed to the OVSDB
817 server when this transaction is committed. (The comment will be
818 committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
819 relatively human-readable form.)"""
820 self._comments.append(comment)
822 def wait(self, poller):
823 """Causes poll_block() to wake up if this transaction has completed
825 if self._status not in (Transaction.UNCOMMITTED,
826 Transaction.INCOMPLETE):
827 poller.immediate_wake()
829 def _substitute_uuids(self, json):
830 if isinstance(json, (list, tuple)):
832 and json[0] == 'uuid'
833 and ovs.ovsuuid.is_valid_string(json[1])):
834 uuid = ovs.ovsuuid.from_string(json[1])
835 row = self._txn_rows.get(uuid, None)
836 if row and row._data is None:
837 return ["named-uuid", _uuid_name_from_uuid(uuid)]
839 return [self._substitute_uuids(elem) for elem in json]
842 def __disassemble(self):
845 for row in six.itervalues(self._txn_rows):
846 if row._changes is None:
847 row._table.rows[row.uuid] = row
848 elif row._data is None:
849 del row._table.rows[row.uuid]
850 row.__dict__["_changes"] = {}
851 row.__dict__["_prereqs"] = {}
855 """Attempts to commit 'txn'. Returns the status of the commit
856 operation, one of the following constants:
858 Transaction.INCOMPLETE:
860 The transaction is in progress, but not yet complete. The caller
861 should call again later, after calling Idl.run() to let the
862 IDL do OVSDB protocol processing.
864 Transaction.UNCHANGED:
866 The transaction is complete. (It didn't actually change the
867 database, so the IDL didn't send any request to the database
872 The caller previously called Transaction.abort().
876 The transaction was successful. The update made by the
877 transaction (and possibly other changes made by other database
878 clients) should already be visible in the IDL.
880 Transaction.TRY_AGAIN:
882 The transaction failed for some transient reason, e.g. because a
883 "verify" operation reported an inconsistency or due to a network
884 problem. The caller should wait for a change to the database,
885 then compose a new transaction, and commit the new transaction.
887 Use Idl.change_seqno to wait for a change in the database. It is
888 important to use its value *before* the initial call to
889 Transaction.commit() as the baseline for this purpose, because
890 the change that one should wait for can happen after the initial
891 call but before the call that returns Transaction.TRY_AGAIN, and
892 using some other baseline value in that situation could cause an
893 indefinite wait if the database rarely changes.
895 Transaction.NOT_LOCKED:
897 The transaction failed because the IDL has been configured to
898 require a database lock (with Idl.set_lock()) but didn't
899 get it yet or has already lost it.
901 Committing a transaction rolls back all of the changes that it made to
902 the IDL's copy of the database. If the transaction commits
903 successfully, then the database server will send an update and, thus,
904 the IDL will be updated with the committed changes."""
905 # The status can only change if we're the active transaction.
906 # (Otherwise, our status will change only in Idl.run().)
907 if self != self.idl.txn:
910 # If we need a lock but don't have it, give up quickly.
911 if self.idl.lock_name and not self.idl.has_lock:
912 self._status = Transaction.NOT_LOCKED
916 operations = [self.idl._db.name]
918 # Assert that we have the required lock (avoiding a race).
919 if self.idl.lock_name:
920 operations.append({"op": "assert",
921 "lock": self.idl.lock_name})
923 # Add prerequisites and declarations of new rows.
924 for row in six.itervalues(self._txn_rows):
928 for column_name in row._prereqs:
929 columns.append(column_name)
930 rows[column_name] = row._data[column_name].to_json()
931 operations.append({"op": "wait",
932 "table": row._table.name,
934 "where": _where_uuid_equals(row.uuid),
941 for row in six.itervalues(self._txn_rows):
942 if row._changes is None:
943 if row._table.is_root:
944 operations.append({"op": "delete",
945 "table": row._table.name,
946 "where": _where_uuid_equals(row.uuid)})
949 # Let ovsdb-server decide whether to really delete it.
952 op = {"table": row._table.name}
953 if row._data is None:
955 op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
958 op_index = len(operations) - 1
959 self._inserted_rows[row.uuid] = _InsertedRow(op_index)
962 op["where"] = _where_uuid_equals(row.uuid)
967 for column_name, datum in six.iteritems(row._changes):
968 if row._data is not None or not datum.is_default():
969 row_json[column_name] = (
970 self._substitute_uuids(datum.to_json()))
972 # If anything really changed, consider it an update.
973 # We can't suppress not-really-changed values earlier
974 # or transactions would become nonatomic (see the big
975 # comment inside Transaction._write()).
976 if (not any_updates and row._data is not None and
977 row._data[column_name] != datum):
980 if row._data is None or row_json:
981 operations.append(op)
983 if self._fetch_requests:
984 for fetch in self._fetch_requests:
985 fetch["index"] = len(operations) - 1
986 operations.append({"op": "select",
987 "table": fetch["row"]._table.name,
988 "where": self._substitute_uuids(
989 _where_uuid_equals(fetch["row"].uuid)),
990 "columns": [fetch["column_name"]]})
994 if self._inc_row and any_updates:
995 self._inc_index = len(operations) - 1
997 operations.append({"op": "mutate",
998 "table": self._inc_row._table.name,
999 "where": self._substitute_uuids(
1000 _where_uuid_equals(self._inc_row.uuid)),
1001 "mutations": [[self._inc_column, "+=", 1]]})
1002 operations.append({"op": "select",
1003 "table": self._inc_row._table.name,
1004 "where": self._substitute_uuids(
1005 _where_uuid_equals(self._inc_row.uuid)),
1006 "columns": [self._inc_column]})
1010 operations.append({"op": "comment",
1011 "comment": "\n".join(self._comments)})
1015 operations.append({"op": "abort"})
1018 self._status = Transaction.UNCHANGED
1020 msg = ovs.jsonrpc.Message.create_request("transact", operations)
1021 self._request_id = msg.id
1022 if not self.idl._session.send(msg):
1023 self.idl._outstanding_txns[self._request_id] = self
1024 self._status = Transaction.INCOMPLETE
1026 self._status = Transaction.TRY_AGAIN
1028 self.__disassemble()
1031 def commit_block(self):
1032 """Attempts to commit this transaction, blocking until the commit
1033 either succeeds or fails. Returns the final commit status, which may
1034 be any Transaction.* value other than Transaction.INCOMPLETE.
1036 This function calls Idl.run() on this transaction'ss IDL, so it may
1037 cause Idl.change_seqno to change."""
1039 status = self.commit()
1040 if status != Transaction.INCOMPLETE:
1045 poller = ovs.poller.Poller()
1046 self.idl.wait(poller)
1050 def get_increment_new_value(self):
1051 """Returns the final (incremented) value of the column in this
1052 transaction that was set to be incremented by Row.increment. This
1053 transaction must have committed successfully."""
1054 assert self._status == Transaction.SUCCESS
1055 return self._inc_new_value
1058 """Aborts this transaction. If Transaction.commit() has already been
1059 called then the transaction might get committed anyhow."""
1060 self.__disassemble()
1061 if self._status in (Transaction.UNCOMMITTED,
1062 Transaction.INCOMPLETE):
1063 self._status = Transaction.ABORTED
1065 def get_error(self):
1066 """Returns a string representing this transaction's current status,
1067 suitable for use in log messages."""
1068 if self._status != Transaction.ERROR:
1069 return Transaction.status_to_string(self._status)
1073 return "no error details available"
1075 def __set_error_json(self, json):
1076 if self._error is None:
1077 self._error = ovs.json.to_string(json)
1079 def get_insert_uuid(self, uuid):
1080 """Finds and returns the permanent UUID that the database assigned to a
1081 newly inserted row, given the UUID that Transaction.insert() assigned
1082 locally to that row.
1084 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1085 or if it was assigned by that function and then deleted by Row.delete()
1086 within the same transaction. (Rows that are inserted and then deleted
1087 within a single transaction are never sent to the database server, so
1088 it never assigns them a permanent UUID.)
1090 This transaction must have completed successfully."""
1091 assert self._status in (Transaction.SUCCESS,
1092 Transaction.UNCHANGED)
1093 inserted_row = self._inserted_rows.get(uuid)
1095 return inserted_row.real
1098 def _increment(self, row, column):
1099 assert not self._inc_row
1101 self._inc_column = column
1103 def _fetch(self, row, column_name):
1104 self._fetch_requests.append({"row": row, "column_name": column_name})
1106 def _write(self, row, column, datum):
1107 assert row._changes is not None
1111 # If this is a write-only column and the datum being written is the
1112 # same as the one already there, just skip the update entirely. This
1113 # is worth optimizing because we have a lot of columns that get
1114 # periodically refreshed into the database but don't actually change
1117 # We don't do this for read/write columns because that would break
1118 # atomicity of transactions--some other client might have written a
1119 # different value in that column since we read it. (But if a whole
1120 # transaction only does writes of existing values, without making any
1121 # real changes, we will drop the whole transaction later in
1122 # ovsdb_idl_txn_commit().)
1123 if (not column.alert and row._data and
1124 row._data.get(column.name) == datum):
1125 new_value = row._changes.get(column.name)
1126 if new_value is None or new_value == datum:
1129 txn._txn_rows[row.uuid] = row
1130 row._changes[column.name] = datum.copy()
1132 def insert(self, table, new_uuid=None):
1133 """Inserts and returns a new row in 'table', which must be one of the
1134 ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1136 The new row is assigned a provisional UUID. If 'uuid' is None then one
1137 is randomly generated; otherwise 'uuid' should specify a randomly
1138 generated uuid.UUID not otherwise in use. ovsdb-server will assign a
1139 different UUID when 'txn' is committed, but the IDL will replace any
1140 uses of the provisional UUID in the data to be to be committed by the
1141 UUID assigned by ovsdb-server."""
1142 assert self._status == Transaction.UNCOMMITTED
1143 if new_uuid is None:
1144 new_uuid = uuid.uuid4()
1145 row = Row(self.idl, table, new_uuid, None)
1146 table.rows[row.uuid] = row
1147 self._txn_rows[row.uuid] = row
1150 def _process_reply(self, msg):
1151 if msg.type == ovs.jsonrpc.Message.T_ERROR:
1152 self._status = Transaction.ERROR
1153 elif not isinstance(msg.result, (list, tuple)):
1155 vlog.warn('reply to "transact" is not JSON array')
1164 # This isn't an error in itself but indicates that some
1165 # prior operation failed, so make sure that we know about
1168 elif isinstance(op, dict):
1169 error = op.get("error")
1170 if error is not None:
1171 if error == "timed out":
1173 elif error == "not owner":
1175 elif error == "aborted":
1179 self.__set_error_json(op)
1182 self.__set_error_json(op)
1184 vlog.warn("operation reply is not JSON null or object")
1186 if not soft_errors and not hard_errors and not lock_errors:
1187 if self._inc_row and not self.__process_inc_reply(ops):
1189 if self._fetch_requests:
1190 if self.__process_fetch_reply(ops):
1191 self.idl.change_seqno += 1
1195 for insert in six.itervalues(self._inserted_rows):
1196 if not self.__process_insert_reply(insert, ops):
1200 self._status = Transaction.ERROR
1202 self._status = Transaction.NOT_LOCKED
1204 self._status = Transaction.TRY_AGAIN
1206 self._status = Transaction.SUCCESS
1209 def __check_json_type(json, types, name):
1212 vlog.warn("%s is missing" % name)
1214 elif not isinstance(json, tuple(types)):
1216 vlog.warn("%s has unexpected type %s" % (name, type(json)))
1221 def __process_fetch_reply(self, ops):
1223 for fetch_request in self._fetch_requests:
1224 row = fetch_request["row"]
1225 column_name = fetch_request["column_name"]
1226 index = fetch_request["index"]
1230 fetched_rows = select.get("rows")
1231 if not Transaction.__check_json_type(fetched_rows, (list, tuple),
1232 '"select" reply "rows"'):
1234 if len(fetched_rows) != 1:
1236 vlog.warn('"select" reply "rows" has %d elements '
1237 'instead of 1' % len(fetched_rows))
1239 fetched_row = fetched_rows[0]
1240 if not Transaction.__check_json_type(fetched_row, (dict,),
1241 '"select" reply row'):
1244 column = table.columns.get(column_name)
1245 datum_json = fetched_row.get(column_name)
1246 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
1248 row._data[column_name] = datum
1253 def __process_inc_reply(self, ops):
1254 if self._inc_index + 2 > len(ops):
1256 vlog.warn("reply does not contain enough operations for "
1257 "increment (has %d, needs %d)" %
1258 (len(ops), self._inc_index + 2))
1260 # We know that this is a JSON object because the loop in
1261 # __process_reply() already checked.
1262 mutate = ops[self._inc_index]
1263 count = mutate.get("count")
1264 if not Transaction.__check_json_type(count, six.integer_types,
1265 '"mutate" reply "count"'):
1269 vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1272 select = ops[self._inc_index + 1]
1273 rows = select.get("rows")
1274 if not Transaction.__check_json_type(rows, (list, tuple),
1275 '"select" reply "rows"'):
1279 vlog.warn('"select" reply "rows" has %d elements '
1280 'instead of 1' % len(rows))
1283 if not Transaction.__check_json_type(row, (dict,),
1284 '"select" reply row'):
1286 column = row.get(self._inc_column)
1287 if not Transaction.__check_json_type(column, six.integer_types,
1288 '"select" reply inc column'):
1290 self._inc_new_value = column
1293 def __process_insert_reply(self, insert, ops):
1294 if insert.op_index >= len(ops):
1296 vlog.warn("reply does not contain enough operations "
1297 "for insert (has %d, needs %d)"
1298 % (len(ops), insert.op_index))
1301 # We know that this is a JSON object because the loop in
1302 # __process_reply() already checked.
1303 reply = ops[insert.op_index]
1304 json_uuid = reply.get("uuid")
1305 if not Transaction.__check_json_type(json_uuid, (tuple, list),
1306 '"insert" reply "uuid"'):
1310 uuid_ = ovs.ovsuuid.from_json(json_uuid)
1313 vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1320 class SchemaHelper(object):
1321 """IDL Schema helper.
1323 This class encapsulates the logic required to generate schemas suitable
1324 for creating 'ovs.db.idl.Idl' objects. Clients should register columns
1325 they are interested in using register_columns(). When finished, the
1326 get_idl_schema() function may be called.
1328 The location on disk of the schema used may be found in the
1329 'schema_location' variable."""
1331 def __init__(self, location=None, schema_json=None):
1332 """Creates a new Schema object.
1334 'location' file path to ovs schema. None means default location
1335 'schema_json' schema in json preresentation in memory
1338 if location and schema_json:
1339 raise ValueError("both location and schema_json can't be "
1340 "specified. it's ambiguous.")
1341 if schema_json is None:
1342 if location is None:
1343 location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1344 schema_json = ovs.json.from_file(location)
1346 self.schema_json = schema_json
1351 def register_columns(self, table, columns, readonly=[]):
1352 """Registers interest in the given 'columns' of 'table'. Future calls
1353 to get_idl_schema() will include 'table':column for each column in
1354 'columns'. This function automatically avoids adding duplicate entries
1356 A subset of 'columns' can be specified as 'readonly'. The readonly
1357 columns are not replicated but can be fetched on-demand by the user
1360 'table' must be a string.
1361 'columns' must be a list of strings.
1362 'readonly' must be a list of strings.
1365 assert isinstance(table, six.string_types)
1366 assert isinstance(columns, list)
1368 columns = set(columns) | self._tables.get(table, set())
1369 self._tables[table] = columns
1370 self._readonly[table] = readonly
1372 def register_table(self, table):
1373 """Registers interest in the given all columns of 'table'. Future calls
1374 to get_idl_schema() will include all columns of 'table'.
1376 'table' must be a string
1378 assert isinstance(table, six.string_types)
1379 self._tables[table] = set() # empty set means all columns in the table
1381 def register_all(self):
1382 """Registers interest in every column of every table."""
1385 def get_idl_schema(self):
1386 """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
1387 object based on columns registered using the register_columns()
1390 schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
1391 self.schema_json = None
1395 for table, columns in six.iteritems(self._tables):
1396 schema_tables[table] = (
1397 self._keep_table_columns(schema, table, columns))
1399 schema.tables = schema_tables
1400 schema.readonly = self._readonly
1403 def _keep_table_columns(self, schema, table_name, columns):
1404 assert table_name in schema.tables
1405 table = schema.tables[table_name]
1408 # empty set means all columns in the table
1412 for column_name in columns:
1413 assert isinstance(column_name, six.string_types)
1414 assert column_name in table.columns
1416 new_columns[column_name] = table.columns[column_name]
1418 table.columns = new_columns