1 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from ovs.db import error
25 vlog = ovs.vlog.Vlog("idl")
27 __pychecker__ = 'no-classattr no-objattrs'
35 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
37 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
38 requests to an OVSDB database server and parses the responses, converting
39 raw JSON into data structures that are easier for clients to digest.
41 The IDL also assists with issuing database transactions. The client
42 creates a transaction, manipulates the IDL data structures, and commits or
43 aborts the transaction. The IDL then composes and issues the necessary
44 JSON-RPC requests and reports to the client whether the transaction
45 completed successfully.
47 The client is allowed to access the following attributes directly, in a
50 - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
51 to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
52 annotated with a new attribute 'rows', which is a dict from a uuid.UUID
55 The client may directly read and write the Row objects referenced by the
56 'rows' map values. Refer to Row for more details.
58 - 'change_seqno': A number that represents the IDL's state. When the IDL
59 is updated (by Idl.run()), its value changes. The sequence number can
60 occasionally change even if the database does not. This happens if the
61 connection to the database drops and reconnects, which causes the
62 database contents to be reloaded even if they didn't change. (It could
63 also happen if the database server sends out a "change" that reflects
64 what the IDL already thought was in the database. The database server is
65 not supposed to do that, but bugs could in theory cause it to do so.)
67 - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
68 if no lock is configured.
70 - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
71 lock, and False otherwise.
73 Locking and unlocking happens asynchronously from the database client's
74 point of view, so the information is only useful for optimization
75 (e.g. if the client doesn't have the lock then there's no point in trying
76 to write to the database).
78 - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
79 the database server has indicated that some other client already owns the
80 requested lock, and False otherwise.
82 - 'txn': The ovs.db.idl.Transaction object for the database transaction
83 currently being constructed, if there is one, or None otherwise.
86 def __init__(self, remote, schema):
87 """Creates and returns a connection to the database named 'db_name' on
88 'remote', which should be in a form acceptable to
89 ovs.jsonrpc.Session.open(). The connection will maintain an in-memory
90 replica of the remote database.
92 'schema' should be the schema for the remote database. The caller may
93 have cut it down by removing tables or columns that are not of
94 interest. The IDL will only replicate the tables and columns that
95 remain. The caller may also add an attribute named 'alert' to selected
96 remaining columns, setting its value to False; if so, then changes to
97 those columns will not be considered changes to the database for the
98 purpose of the return value of Idl.run() and Idl.change_seqno. This is
99 useful for columns that the IDL's client will write but not read.
101 As a convenience to users, 'schema' may also be an instance of the
104 The IDL uses and modifies 'schema' directly."""
106 assert isinstance(schema, SchemaHelper)
107 schema = schema.get_idl_schema()
109 self.tables = schema.tables
110 self.readonly = schema.readonly
112 self._session = ovs.jsonrpc.Session.open(remote)
113 self._monitor_request_id = None
114 self._last_seqno = None
115 self.change_seqno = 0
118 self.lock_name = None # Name of lock we need, None if none.
119 self.has_lock = False # Has db server said we have the lock?
120 self.is_lock_contended = False # Has db server said we can't get lock?
121 self._lock_request_id = None # JSON-RPC ID of in-flight lock request.
123 # Transaction support.
125 self._outstanding_txns = {}
127 for table in schema.tables.itervalues():
128 for column in table.columns.itervalues():
129 if not hasattr(column, 'alert'):
131 table.need_table = False
136 """Closes the connection to the database. The IDL will no longer
138 self._session.close()
141 """Processes a batch of messages from the database server. Returns
142 True if the database as seen through the IDL changed, False if it did
143 not change. The initial fetch of the entire contents of the remote
144 database is considered to be one kind of change. If the IDL has been
145 configured to acquire a database lock (with Idl.set_lock()), then
146 successfully acquiring the lock is also considered to be a change.
148 This function can return occasional false positives, that is, report
149 that the database changed even though it didn't. This happens if the
150 connection to the database drops and reconnects, which causes the
151 database contents to be reloaded even if they didn't change. (It could
152 also happen if the database server sends out a "change" that reflects
153 what we already thought was in the database, but the database server is
154 not supposed to do that.)
156 As an alternative to checking the return value, the client may check
157 for changes in self.change_seqno."""
159 initial_change_seqno = self.change_seqno
164 if not self._session.is_connected():
167 seqno = self._session.get_seqno()
168 if seqno != self._last_seqno:
169 self._last_seqno = seqno
170 self.__txn_abort_all()
171 self.__send_monitor_request()
173 self.__send_lock_request()
176 msg = self._session.recv()
179 if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
180 and msg.method == "update"
181 and len(msg.params) == 2
182 and msg.params[0] is None):
183 # Database contents changed.
184 self.__parse_update(msg.params[1])
185 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
186 and self._monitor_request_id is not None
187 and self._monitor_request_id == msg.id):
188 # Reply to our "monitor" request.
190 self.change_seqno += 1
191 self._monitor_request_id = None
193 self.__parse_update(msg.result)
194 except error.Error, e:
195 vlog.err("%s: parse error in received schema: %s"
196 % (self._session.get_name(), e))
198 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
199 and self._lock_request_id is not None
200 and self._lock_request_id == msg.id):
201 # Reply to our "lock" request.
202 self.__parse_lock_reply(msg.result)
203 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
204 and msg.method == "locked"):
206 self.__parse_lock_notify(msg.params, True)
207 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
208 and msg.method == "stolen"):
209 # Someone else stole our lock.
210 self.__parse_lock_notify(msg.params, False)
211 elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
212 # Reply to our echo request. Ignore it.
214 elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
215 ovs.jsonrpc.Message.T_REPLY)
216 and self.__txn_process_reply(msg)):
217 # __txn_process_reply() did everything needed.
220 # This can happen if a transaction is destroyed before we
221 # receive the reply, so keep the log level low.
222 vlog.dbg("%s: received unexpected %s message"
223 % (self._session.get_name(),
224 ovs.jsonrpc.Message.type_to_string(msg.type)))
226 return initial_change_seqno != self.change_seqno
def wait(self, poller):
    """Registers this IDL's wake-up conditions with 'poller'.

    Afterwards poller.block() wakes up when self.run() has something to do
    or when activity occurs on a transaction on 'self'."""
    session = self._session
    session.wait(poller)
    session.recv_wait(poller)
def has_ever_connected(self):
    """Reports whether the IDL has ever obtained the database's contents.

    Returns True if the IDL successfully connected to the remote database
    and retrieved its contents, even if that connection later dropped and
    is currently being re-established.  In that case the IDL holds an
    atomic snapshot of the database's contents, although the snapshot may
    be arbitrarily old if the connection dropped.

    Returns False if the IDL has never connected or never retrieved the
    database's contents; the IDL is then empty."""
    # change_seqno is initialized to 0 and is only ever incremented.
    seqno = self.change_seqno
    return seqno != 0
def force_reconnect(self):
    """Makes the IDL drop its database connection and connect again.

    The contents of the IDL do not change while the reconnection is in
    progress."""
    session = self._session
    session.force_reconnect()
250 def set_lock(self, lock_name):
251 """If 'lock_name' is not None, configures the IDL to obtain the named
252 lock from the database server and to avoid modifying the database when
253 the lock cannot be acquired (that is, when another client has the same
256 If 'lock_name' is None, drops the locking requirement and releases the
259 assert not self._outstanding_txns
261 if self.lock_name and (not lock_name or lock_name != self.lock_name):
262 # Release previous lock.
263 self.__send_unlock_request()
264 self.lock_name = None
265 self.is_lock_contended = False
267 if lock_name and not self.lock_name:
269 self.lock_name = lock_name
270 self.__send_lock_request()
272 def notify(self, event, row, updates=None):
273 """Hook for implementing create/update/delete notifications
275 :param event: The event that was triggered
276 :type event: ROW_CREATE, ROW_UPDATE, or ROW_DELETE
277 :param row: The row as it is after the operation has occurred
279 :param updates: For updates, a Row object with just the changed columns
286 for table in self.tables.itervalues():
292 self.change_seqno += 1
294 def __update_has_lock(self, new_has_lock):
295 if new_has_lock and not self.has_lock:
296 if self._monitor_request_id is None:
297 self.change_seqno += 1
299 # We're waiting for a monitor reply, so don't signal that the
300 # database changed. The monitor reply will increment
301 # change_seqno anyhow.
303 self.is_lock_contended = False
304 self.has_lock = new_has_lock
306 def __do_send_lock_request(self, method):
307 self.__update_has_lock(False)
308 self._lock_request_id = None
309 if self._session.is_connected():
310 msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
312 self._session.send(msg)
def __send_lock_request(self):
    """Issues a "lock" request for self.lock_name.

    Whatever __do_send_lock_request() returns is stored in
    self._lock_request_id, the value that incoming replies are compared
    against to recognize the answer to this request."""
    request_id = self.__do_send_lock_request("lock")
    self._lock_request_id = request_id
def __send_unlock_request(self):
    """Issues an "unlock" request for self.lock_name.

    The value returned by __do_send_lock_request() is deliberately
    discarded: no request ID is recorded for an unlock."""
    method = "unlock"
    self.__do_send_lock_request(method)
323 def __parse_lock_reply(self, result):
324 self._lock_request_id = None
325 got_lock = type(result) == dict and result.get("locked") is True
326 self.__update_has_lock(got_lock)
328 self.is_lock_contended = True
def __parse_lock_notify(self, params, new_has_lock):
    """Handles a "locked" or "stolen" notification from the database server.

    'params' is the notification's parameter list, whose first element is
    the name of the lock concerned.  'new_has_lock' is True for a "locked"
    notification and False for a "stolen" one.

    The notification is ignored if no lock is configured, if 'params' is
    not a non-empty list or tuple, or if it names a lock other than
    self.lock_name."""
    if self.lock_name is None:
        return
    if not isinstance(params, (list, tuple)) or not params:
        return
    if params[0] != self.lock_name:
        return

    # __update_has_lock() is called as a bound method, so it receives only
    # the new lock state.  Passing 'self' explicitly as well raised
    # TypeError on every lock notification.
    self.__update_has_lock(new_has_lock)
    if not new_has_lock:
        # The lock was taken from us, so another client holds it now.
        self.is_lock_contended = True
339 def __send_monitor_request(self):
340 monitor_requests = {}
341 for table in self.tables.itervalues():
343 for column in table.columns.keys():
344 if ((table.name not in self.readonly) or
345 (table.name in self.readonly) and
346 (column not in self.readonly[table.name])):
347 columns.append(column)
348 monitor_requests[table.name] = {"columns": columns}
349 msg = ovs.jsonrpc.Message.create_request(
350 "monitor", [self._db.name, None, monitor_requests])
351 self._monitor_request_id = msg.id
352 self._session.send(msg)
354 def __parse_update(self, update):
356 self.__do_parse_update(update)
357 except error.Error, e:
358 vlog.err("%s: error parsing update: %s"
359 % (self._session.get_name(), e))
361 def __do_parse_update(self, table_updates):
362 if type(table_updates) != dict:
363 raise error.Error("<table-updates> is not an object",
366 for table_name, table_update in table_updates.iteritems():
367 table = self.tables.get(table_name)
369 raise error.Error('<table-updates> includes unknown '
370 'table "%s"' % table_name)
372 if type(table_update) != dict:
373 raise error.Error('<table-update> for table "%s" is not '
374 'an object' % table_name, table_update)
376 for uuid_string, row_update in table_update.iteritems():
377 if not ovs.ovsuuid.is_valid_string(uuid_string):
378 raise error.Error('<table-update> for table "%s" '
379 'contains bad UUID "%s" as member '
380 'name' % (table_name, uuid_string),
382 uuid = ovs.ovsuuid.from_string(uuid_string)
384 if type(row_update) != dict:
385 raise error.Error('<table-update> for table "%s" '
386 'contains <row-update> for %s that '
388 % (table_name, uuid_string))
390 parser = ovs.db.parser.Parser(row_update, "row-update")
391 old = parser.get_optional("old", [dict])
392 new = parser.get_optional("new", [dict])
395 if not old and not new:
396 raise error.Error('<row-update> missing "old" and '
397 '"new" members', row_update)
399 if self.__process_update(table, uuid, old, new):
400 self.change_seqno += 1
402 def __process_update(self, table, uuid, old, new):
403 """Returns True if a column changed, False otherwise."""
404 row = table.rows.get(uuid)
411 self.notify(ROW_DELETE, row)
414 vlog.warn("cannot delete missing row %s from table %s"
415 % (uuid, table.name))
419 row = self.__create_row(table, uuid)
423 vlog.warn("cannot add existing row %s to table %s"
424 % (uuid, table.name))
425 if self.__row_update(table, row, new):
427 self.notify(ROW_CREATE, row)
431 row = self.__create_row(table, uuid)
435 vlog.warn("cannot modify missing row %s in table %s"
436 % (uuid, table.name))
437 if self.__row_update(table, row, new):
439 self.notify(op, row, Row.from_json(self, table, uuid, old))
442 def __row_update(self, table, row, row_json):
444 for column_name, datum_json in row_json.iteritems():
445 column = table.columns.get(column_name)
448 vlog.warn("unknown column %s updating table %s"
449 % (column_name, table.name))
453 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
454 except error.Error, e:
456 vlog.warn("error parsing column %s in table %s: %s"
457 % (column_name, table.name, e))
460 if datum != row._data[column_name]:
461 row._data[column_name] = datum
465 # Didn't really change but the OVSDB monitor protocol always
466 # includes every value in a row.
470 def __create_row(self, table, uuid):
472 for column in table.columns.itervalues():
473 data[column.name] = ovs.db.data.Datum.default(column.type)
474 row = table.rows[uuid] = Row(self, table, uuid, data)
478 self._session.force_reconnect()
def __txn_abort_all(self):
    """Gives up on every outstanding transaction.

    Each transaction is removed from self._outstanding_txns and its status
    is set to Transaction.TRY_AGAIN."""
    pending = self._outstanding_txns
    while pending:
        _request_id, abandoned = pending.popitem()
        abandoned._status = Transaction.TRY_AGAIN
485 def __txn_process_reply(self, msg):
486 txn = self._outstanding_txns.pop(msg.id, None)
488 txn._process_reply(msg)
491 def _uuid_to_row(atom, base):
493 return base.ref_table.rows.get(atom)
498 def _row_to_uuid(value):
499 if type(value) == Row:
506 """A row within an IDL.
508 The client may access the following attributes directly:
510 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
512 - An attribute for each column in the Row's table, named for the column,
513 whose values are as returned by Datum.to_python() for the column's type.
515 If some error occurs (e.g. the database server's idea of the column is
516 different from the IDL's idea), then the attribute value is the
517 "default" value returned by Datum.default() for the column's type. (It is
518 important to know this because the default value may violate constraints
519 for the column's type, e.g. the default integer value is 0 even if column
520 constraints require the column's value to be positive.)
522 When a transaction is active, column attributes may also be assigned new
523 values. Committing the transaction will then cause the new value to be
524 stored into the database.
526 *NOTE*: In the current implementation, the value of a column is a *copy*
527 of the value in the database. This means that modifying its value
528 directly will have no useful effect. For example, the following:
529 row.mycolumn["a"] = "b" # don't do this
530 will not change anything in the database, even after commit. To modify
531 the column, instead assign the modified column value back to the column:
def __init__(self, idl, table, uuid, data):
    """Initializes the row of 'table' identified by 'uuid', owned by 'idl'.

    'data' is the row's committed contents: a dictionary that maps every
    column name to a Datum, or None for a row that is newly inserted within
    the active transaction and so has no committed form."""
    # This class defines __getattr__() and __setattr__() in terms of column
    # names, so the real instance attributes are stored by writing to the
    # instance dictionary directly instead of using attribute assignment.
    attrs = self.__dict__
    attrs["uuid"] = uuid
    attrs["_idl"] = idl
    attrs["_table"] = table

    # Committed data, one of:
    #
    #   - A dictionary mapping every column name to a Datum, when the row
    #     exists in the committed form of the database.
    #
    #   - None, when the row is newly inserted within the active
    #     transaction and thus has no committed form.
    attrs["_data"] = data

    # Changes made to this row within the active transaction, one of:
    #
    #   - {} (the empty dictionary), when no transaction is active or the
    #     row has not yet been changed within this transaction.
    #
    #   - A dictionary mapping a column name to its new Datum, when the
    #     active transaction changes those columns' values.
    #
    #   - A dictionary mapping every column name to a Datum, when the row
    #     is newly inserted within the active transaction.
    #
    #   - None, when this transaction deletes this row.
    attrs["_changes"] = {}

    # Names of the columns that must be verified as prerequisites when the
    # transaction commits.  Only the keys matter; every value is None.
    attrs["_prereqs"] = {}
573 def __getattr__(self, column_name):
574 assert self._changes is not None
576 datum = self._changes.get(column_name)
578 if self._data is None:
579 raise AttributeError("%s instance has no attribute '%s'" %
580 (self.__class__.__name__, column_name))
581 if column_name in self._data:
582 datum = self._data[column_name]
584 raise AttributeError("%s instance has no attribute '%s'" %
585 (self.__class__.__name__, column_name))
587 return datum.to_python(_uuid_to_row)
589 def __setattr__(self, column_name, value):
590 assert self._changes is not None
593 if ((self._table.name in self._idl.readonly) and
594 (column_name in self._idl.readonly[self._table.name])):
595 vlog.warn("attempting to write to readonly column %s" % column_name)
598 column = self._table.columns[column_name]
600 datum = ovs.db.data.Datum.from_python(column.type, value,
602 except error.Error, e:
604 vlog.err("attempting to write bad value to column %s (%s)"
607 self._idl.txn._write(self, column, datum)
610 def from_json(cls, idl, table, uuid, row_json):
612 for column_name, datum_json in row_json.iteritems():
613 column = table.columns.get(column_name)
616 vlog.warn("unknown column %s in table %s"
617 % (column_name, table.name))
620 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
621 except error.Error, e:
623 vlog.warn("error parsing column %s in table %s: %s"
624 % (column_name, table.name, e))
626 data[column_name] = datum
627 return cls(idl, table, uuid, data)
629 def verify(self, column_name):
630 """Causes the original contents of column 'column_name' in this row to
631 be verified as a prerequisite to completing the transaction. That is,
632 if 'column_name' changed in this row (or if this row was deleted)
633 between the time that the IDL originally read its contents and the time
634 that the transaction commits, then the transaction aborts and
635 Transaction.commit() returns Transaction.TRY_AGAIN.
637 The intention is that, to ensure that no transaction commits based on
638 dirty reads, an application should call Row.verify() on each data item
639 read as part of a read-modify-write operation.
641 In some cases Row.verify() reduces to a no-op, because the current
642 value of the column is already known:
644 - If this row is a row created by the current transaction (returned
645 by Transaction.insert()).
647 - If the column has already been modified within the current
650 Because of the latter property, always call Row.verify() *before*
651 modifying the column, for a given read-modify-write.
653 A transaction must be in progress."""
655 assert self._changes is not None
656 if not self._data or column_name in self._changes:
659 self._prereqs[column_name] = None
662 """Deletes this row from its table.
664 A transaction must be in progress."""
666 assert self._changes is not None
667 if self._data is None:
668 del self._idl.txn._txn_rows[self.uuid]
670 self._idl.txn._txn_rows[self.uuid] = self
671 self.__dict__["_changes"] = None
672 del self._table.rows[self.uuid]
def fetch(self, column_name):
    """Asks the active transaction to fetch 'column_name' of this row.

    The request is handed to the _fetch() method of self._idl.txn, so a
    transaction must be in progress.
    NOTE(review): presumably the value is read back with a "select"
    operation when the transaction commits -- confirm against
    Transaction._fetch()."""
    active_txn = self._idl.txn
    active_txn._fetch(self, column_name)
def increment(self, column_name):
    """Arranges for 'column_name' in this row to be incremented by 1 when
    the transaction commits.

    'column_name' must have an integer type.  Once the transaction has
    committed successfully, the final (incremented) value can be read with
    Transaction.get_increment_new_value().

    A client could get a similar effect by reading, writing, and
    verify()ing the column itself, but increment() never (by itself)
    makes a transaction fail because of a verify error.

    The intended use is incrementing the "next_cfg" column in the
    Open_vSwitch table."""
    active_txn = self._idl.txn
    active_txn._increment(self, column_name)
def _uuid_name_from_uuid(uuid):
    """Returns the symbolic name for the row whose UUID is 'uuid'.

    The name is "row" followed by the text of 'uuid' with every hyphen
    replaced by an underscore."""
    pieces = str(uuid).split("-")
    return "row" + "_".join(pieces)
def _where_uuid_equals(uuid):
    """Returns a "where" clause that selects the row whose "_uuid" column
    equals 'uuid'.

    The result is a list holding the single condition
    ["_uuid", "==", ["uuid", <text of uuid>]]."""
    uuid_value = ["uuid", str(uuid)]
    condition = ["_uuid", "==", uuid_value]
    return [condition]
701 class _InsertedRow(object):
702 def __init__(self, op_index):
703 self.op_index = op_index
707 class Transaction(object):
708 """A transaction may modify the contents of a database by modifying the
709 values of columns, deleting rows, inserting rows, or adding checks that
710 columns in the database have not changed ("verify" operations), through
713 Reading and writing columns and inserting and deleting rows are all
714 straightforward. The reasons to verify columns are less obvious.
715 Verification is the key to maintaining transactional integrity. Because
716 OVSDB handles multiple clients, it can happen that between the time that
717 OVSDB client A reads a column and writes a new value, OVSDB client B has
718 written that column. Client A's write should not ordinarily overwrite
719 client B's, especially if the column in question is a "map" column that
720 contains several more or less independent data items. If client A adds a
721 "verify" operation before it writes the column, then the transaction fails
722 in case client B modifies it first. Client A will then see the new value
723 of the column and compose a new transaction based on the new contents
726 When a transaction is complete, which must be before the next call to
727 Idl.run(), call Transaction.commit() or Transaction.abort().
729 The life-cycle of a transaction looks like this:
731 1. Create the transaction and record the initial sequence number:
733 seqno = idl.change_seqno
734 txn = Transaction(idl)
736 2. Modify the database with Row and Transaction methods.
738 3. Commit the transaction by calling Transaction.commit(). The first call
739 to this function probably returns Transaction.INCOMPLETE. The client
740 must keep calling again as long as this remains true, calling Idl.run() in
741 between to let the IDL do protocol processing. (If the client doesn't
742 have anything else to do in the meantime, it can use
743 Transaction.commit_block() to avoid having to loop itself.)
745 4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
746 to change from the saved 'seqno' (it's possible that it's already
747 changed, in which case the client should not wait at all), then start
748 over from step 1. Only a call to Idl.run() will change the return value
749 of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)"""
# Possible results of Transaction.commit().

# Neither committed nor aborted so far.
UNCOMMITTED = "uncommitted"
# The transaction did not include any changes.
UNCHANGED = "unchanged"
# The commit is still in progress; the caller has to wait.
INCOMPLETE = "incomplete"
# The transaction was aborted.
ABORTED = "aborted"
# The commit succeeded.
SUCCESS = "success"
# The commit failed for a transient reason: a "verify" operation reported
# an inconsistency, there was a network problem, or similar.  Wait for a
# change, then try again.
TRY_AGAIN = "try again"
# The server has not given us the lock yet.
NOT_LOCKED = "not locked"
# The commit failed because of a hard error.
ERROR = "error"
765 def status_to_string(status):
766 """Converts one of the status values that Transaction.commit() can
767 return into a human-readable string.
769 (The status values are in fact such strings already, so
770 there's nothing to do.)"""
773 def __init__(self, idl):
774 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
775 A given Idl may only have a single active transaction at a time.
777 A Transaction may modify the contents of a database by assigning new
778 values to columns (attributes of Row), deleting rows (with
779 Row.delete()), or inserting rows (with Transaction.insert()). It may
780 also check that columns in the database have not changed with
783 When a transaction is complete (which must be before the next call to
784 Idl.run()), call Transaction.commit() or Transaction.abort()."""
785 assert idl.txn is None
788 self._request_id = None
792 self._status = Transaction.UNCOMMITTED
797 self._inc_column = None
799 self._fetch_requests = []
801 self._inserted_rows = {} # Map from UUID to _InsertedRow
def add_comment(self, comment):
    """Adds 'comment' to the end of the comments that are passed to the
    OVSDB server when this transaction is committed.

    (The comments end up in the OVSDB log, which "ovsdb-tool show-log" can
    print in a relatively human-readable form.)"""
    comments = self._comments
    comments.append(comment)
def wait(self, poller):
    """Makes 'poller' wake up immediately if this transaction has finished
    committing.

    A transaction whose status is still Transaction.UNCOMMITTED or
    Transaction.INCOMPLETE does not register any wake-up."""
    status = self._status
    still_pending = (status == Transaction.UNCOMMITTED
                     or status == Transaction.INCOMPLETE)
    if still_pending:
        return
    poller.immediate_wake()
817 def _substitute_uuids(self, json):
818 if type(json) in (list, tuple):
820 and json[0] == 'uuid'
821 and ovs.ovsuuid.is_valid_string(json[1])):
822 uuid = ovs.ovsuuid.from_string(json[1])
823 row = self._txn_rows.get(uuid, None)
824 if row and row._data is None:
825 return ["named-uuid", _uuid_name_from_uuid(uuid)]
827 return [self._substitute_uuids(elem) for elem in json]
830 def __disassemble(self):
833 for row in self._txn_rows.itervalues():
834 if row._changes is None:
835 row._table.rows[row.uuid] = row
836 elif row._data is None:
837 del row._table.rows[row.uuid]
838 row.__dict__["_changes"] = {}
839 row.__dict__["_prereqs"] = {}
843 """Attempts to commit 'txn'. Returns the status of the commit
844 operation, one of the following constants:
846 Transaction.INCOMPLETE:
848 The transaction is in progress, but not yet complete. The caller
849 should call again later, after calling Idl.run() to let the
850 IDL do OVSDB protocol processing.
852 Transaction.UNCHANGED:
854 The transaction is complete. (It didn't actually change the
855 database, so the IDL didn't send any request to the database
860 The caller previously called Transaction.abort().
864 The transaction was successful. The update made by the
865 transaction (and possibly other changes made by other database
866 clients) should already be visible in the IDL.
868 Transaction.TRY_AGAIN:
870 The transaction failed for some transient reason, e.g. because a
871 "verify" operation reported an inconsistency or due to a network
872 problem. The caller should wait for a change to the database,
873 then compose a new transaction, and commit the new transaction.
875 Use Idl.change_seqno to wait for a change in the database. It is
876 important to use its value *before* the initial call to
877 Transaction.commit() as the baseline for this purpose, because
878 the change that one should wait for can happen after the initial
879 call but before the call that returns Transaction.TRY_AGAIN, and
880 using some other baseline value in that situation could cause an
881 indefinite wait if the database rarely changes.
883 Transaction.NOT_LOCKED:
885 The transaction failed because the IDL has been configured to
886 require a database lock (with Idl.set_lock()) but didn't
887 get it yet or has already lost it.
889 Committing a transaction rolls back all of the changes that it made to
890 the IDL's copy of the database. If the transaction commits
891 successfully, then the database server will send an update and, thus,
892 the IDL will be updated with the committed changes."""
893 # The status can only change if we're the active transaction.
894 # (Otherwise, our status will change only in Idl.run().)
895 if self != self.idl.txn:
898 # If we need a lock but don't have it, give up quickly.
899 if self.idl.lock_name and not self.idl.has_lock:
900 self._status = Transaction.NOT_LOCKED
904 operations = [self.idl._db.name]
906 # Assert that we have the required lock (avoiding a race).
907 if self.idl.lock_name:
908 operations.append({"op": "assert",
909 "lock": self.idl.lock_name})
911 # Add prerequisites and declarations of new rows.
912 for row in self._txn_rows.itervalues():
916 for column_name in row._prereqs:
917 columns.append(column_name)
918 rows[column_name] = row._data[column_name].to_json()
919 operations.append({"op": "wait",
920 "table": row._table.name,
922 "where": _where_uuid_equals(row.uuid),
929 for row in self._txn_rows.itervalues():
930 if row._changes is None:
931 if row._table.is_root:
932 operations.append({"op": "delete",
933 "table": row._table.name,
934 "where": _where_uuid_equals(row.uuid)})
937 # Let ovsdb-server decide whether to really delete it.
940 op = {"table": row._table.name}
941 if row._data is None:
943 op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
946 op_index = len(operations) - 1
947 self._inserted_rows[row.uuid] = _InsertedRow(op_index)
950 op["where"] = _where_uuid_equals(row.uuid)
955 for column_name, datum in row._changes.iteritems():
956 if row._data is not None or not datum.is_default():
957 row_json[column_name] = (
958 self._substitute_uuids(datum.to_json()))
960 # If anything really changed, consider it an update.
961 # We can't suppress not-really-changed values earlier
962 # or transactions would become nonatomic (see the big
963 # comment inside Transaction._write()).
964 if (not any_updates and row._data is not None and
965 row._data[column_name] != datum):
968 if row._data is None or row_json:
969 operations.append(op)
971 if self._fetch_requests:
972 for fetch in self._fetch_requests:
973 fetch["index"] = len(operations) - 1
974 operations.append({"op": "select",
975 "table": fetch["row"]._table.name,
976 "where": self._substitute_uuids(
977 _where_uuid_equals(fetch["row"].uuid)),
978 "columns": [fetch["column_name"]]})
982 if self._inc_row and any_updates:
983 self._inc_index = len(operations) - 1
985 operations.append({"op": "mutate",
986 "table": self._inc_row._table.name,
987 "where": self._substitute_uuids(
988 _where_uuid_equals(self._inc_row.uuid)),
989 "mutations": [[self._inc_column, "+=", 1]]})
990 operations.append({"op": "select",
991 "table": self._inc_row._table.name,
992 "where": self._substitute_uuids(
993 _where_uuid_equals(self._inc_row.uuid)),
994 "columns": [self._inc_column]})
998 operations.append({"op": "comment",
999 "comment": "\n".join(self._comments)})
1003 operations.append({"op": "abort"})
1006 self._status = Transaction.UNCHANGED
1008 msg = ovs.jsonrpc.Message.create_request("transact", operations)
1009 self._request_id = msg.id
1010 if not self.idl._session.send(msg):
1011 self.idl._outstanding_txns[self._request_id] = self
1012 self._status = Transaction.INCOMPLETE
1014 self._status = Transaction.TRY_AGAIN
1016 self.__disassemble()
def commit_block(self):
    """Commits this transaction, blocking until the outcome is known.

    commit() is called repeatedly.  For as long as it reports
    Transaction.INCOMPLETE, the IDL is run and then the caller sleeps in a
    poller that both the IDL and this transaction have registered with,
    after which the commit is attempted again.

    Returns the final commit status, which may be any Transaction.* value
    other than Transaction.INCOMPLETE.

    Because this calls Idl.run() on this transaction's IDL, it may cause
    Idl.change_seqno to change."""
    status = self.commit()
    while status == Transaction.INCOMPLETE:
        self.idl.run()

        # Sleep until the IDL or this transaction has something to do.
        poller = ovs.poller.Poller()
        self.idl.wait(poller)
        self.wait(poller)
        poller.block()

        status = self.commit()
    return status
def get_increment_new_value(self):
    """Returns the value, after incrementing, of the column that
    Row.increment registered with this transaction.

    Only meaningful once the transaction has committed successfully: the
    status is asserted to be Transaction.SUCCESS."""
    committed = self._status == Transaction.SUCCESS
    assert committed
    return self._inc_new_value
def abort(self):
    """Aborts this transaction.

    If Transaction.commit() has already been called, the transaction might
    get committed anyhow.  The status becomes Transaction.ABORTED only when
    it was previously Transaction.UNCOMMITTED or Transaction.INCOMPLETE; any
    other status is left untouched."""
    self.__disassemble()
    abortable = (Transaction.UNCOMMITTED, Transaction.INCOMPLETE)
    if self._status in abortable:
        self._status = Transaction.ABORTED
def get_error(self):
    """Describes this transaction's current status as a string suitable
    for log messages.

    For any status other than Transaction.ERROR this is the status name.
    For Transaction.ERROR it is the recorded error text, or a fixed
    placeholder when no error text was recorded."""
    if self._status != Transaction.ERROR:
        return Transaction.status_to_string(self._status)
    if self._error:
        return self._error
    return "no error details available"
def __set_error_json(self, json):
    """Records 'json', serialized to a string, as this transaction's error
    text.  Only the first error is kept: later calls are no-ops once an
    error has been recorded."""
    if self._error is not None:
        return
    self._error = ovs.json.to_string(json)
def get_insert_uuid(self, uuid):
    """Maps a provisional row UUID to the permanent one chosen by the
    database.

    'uuid' is the UUID that Transaction.insert() assigned locally to a newly
    inserted row; the return value is the permanent UUID that the database
    assigned to that row.

    Returns None if 'uuid' was not assigned by Transaction.insert(), or if
    it was but the row was then deleted by Row.delete() within the same
    transaction.  (Rows that are inserted and then deleted within a single
    transaction are never sent to the database server, so it never assigns
    them a permanent UUID.)

    This transaction must have completed successfully."""
    finished = self._status in (Transaction.SUCCESS, Transaction.UNCHANGED)
    assert finished
    record = self._inserted_rows.get(uuid)
    return record.real if record else None
def _increment(self, row, column):
    """Registers 'column' of 'row' as the column this transaction will
    increment.  A transaction supports a single increment, so it is an
    error (assertion failure) to call this when one is already set."""
    already_registered = bool(self._inc_row)
    assert not already_registered
    self._inc_row, self._inc_column = row, column
def _fetch(self, row, column_name):
    """Queues a request to fetch 'column_name' of 'row' when this
    transaction is committed.  Each queued request is a dict with "row" and
    "column_name" keys, appended to self._fetch_requests."""
    request = {"row": row, "column_name": column_name}
    self._fetch_requests.append(request)
def _write(self, row, column, datum):
    """Stages 'datum' as the new value of 'column' in 'row'.

    A copy of 'datum' is stored in row._changes under the column's name, and
    'row' is added to the rows tracked by the transaction that is current on
    the row's IDL.  'row' must not have been deleted (row._changes must not
    be None)."""
    assert row._changes is not None

    txn = row._idl.txn

    # A write-only column (one without alerting) that is being set to the
    # value it already holds does not need to be sent at all.  This matters
    # because many such columns are refreshed into the database periodically
    # without their contents changing very often.
    #
    # Read/write columns never take this shortcut: doing so would break the
    # atomicity of transactions, because some other client might have
    # written a different value to the column since we read it.  (If a whole
    # transaction consists only of writes of existing values, with no real
    # changes, the whole transaction is dropped later, in
    # ovsdb_idl_txn_commit().)
    same_as_stored = (not column.alert and row._data
                      and row._data.get(column.name) == datum)
    if same_as_stored:
        pending = row._changes.get(column.name)
        if pending is None or pending == datum:
            return

    txn._txn_rows[row.uuid] = row
    row._changes[column.name] = datum.copy()
def insert(self, table, new_uuid=None):
    """Creates a new row in 'table' and returns it.

    'table' must be one of the ovs.db.schema.TableSchema objects in the
    Idl's 'tables' dict.

    The new row is given a provisional UUID.  If 'new_uuid' is None then one
    is randomly generated; otherwise 'new_uuid' should be a randomly
    generated uuid.UUID that is not otherwise in use.  ovsdb-server will
    assign a different UUID when the transaction is committed, but the IDL
    will replace any uses of the provisional UUID in the data to be
    committed by the UUID assigned by ovsdb-server.

    The transaction must still be uncommitted."""
    assert self._status == Transaction.UNCOMMITTED
    provisional = uuid.uuid4() if new_uuid is None else new_uuid
    row = Row(self.idl, table, provisional, None)
    # The row is visible both through the table and through this transaction.
    for registry in (table.rows, self._txn_rows):
        registry[row.uuid] = row
    return row
def _process_reply(self, msg):
    """Digests the JSON-RPC reply 'msg' to this transaction's "transact"
    request and updates self._status accordingly.

    An error reply yields Transaction.ERROR.  A reply whose result is not a
    JSON array is only logged.  Otherwise each per-operation result is
    classified, the increment, fetch and insert results are processed if no
    operation failed, and the status becomes ERROR, NOT_LOCKED, TRY_AGAIN or
    SUCCESS, in that order of precedence."""
    if msg.type == ovs.jsonrpc.Message.T_ERROR:
        self._status = Transaction.ERROR
        return

    if type(msg.result) not in (list, tuple):
        # XXX rate-limit
        vlog.warn('reply to "transact" is not JSON array')
        return

    hard_errors = soft_errors = lock_errors = False

    ops = msg.result
    for op in ops:
        if op is None:
            # This isn't an error in itself but indicates that some
            # prior operation failed, so make sure that we know about
            # it.
            soft_errors = True
        elif type(op) == dict:
            reason = op.get("error")
            if reason is None:
                continue
            if reason == "timed out":
                soft_errors = True
            elif reason == "not owner":
                lock_errors = True
            elif reason != "aborted":
                # "aborted" needs no action; anything else is fatal.
                hard_errors = True
                self.__set_error_json(op)
        else:
            hard_errors = True
            self.__set_error_json(op)
            # XXX rate-limit
            vlog.warn("operation reply is not JSON null or object")

    if not (soft_errors or hard_errors or lock_errors):
        if self._inc_row and not self.__process_inc_reply(ops):
            hard_errors = True
        if self._fetch_requests:
            if self.__process_fetch_reply(ops):
                self.idl.change_seqno += 1
            else:
                hard_errors = True

        for insert in self._inserted_rows.itervalues():
            if not self.__process_insert_reply(insert, ops):
                hard_errors = True

    if hard_errors:
        self._status = Transaction.ERROR
    elif lock_errors:
        self._status = Transaction.NOT_LOCKED
    elif soft_errors:
        self._status = Transaction.TRY_AGAIN
    else:
        self._status = Transaction.SUCCESS
@staticmethod
def __check_json_type(json, types, name):
    """Validates one piece of a server reply.

    Returns True if 'json' is present (truthy) and its exact type is one of
    'types'.  Otherwise logs a warning that refers to the value as 'name'
    and returns False.  Note that any falsy value (None, 0, an empty list)
    is reported as missing."""
    if not json:
        problem = "%s is missing" % name
    elif type(json) not in types:
        problem = "%s has unexpected type %s" % (name, type(json))
    else:
        return True

    # XXX rate-limit
    vlog.warn(problem)
    return False
def __process_fetch_reply(self, ops):
    """Applies the "select" results in 'ops' that answer this transaction's
    fetch requests.

    For each request, the reply at the request's recorded index must carry
    a "rows" array; if it does not, processing stops and False is returned.
    A reply that does not hold exactly one well-formed row is skipped.
    Otherwise the fetched column is parsed and stored in the row's data.

    Returns True if at least one row was updated, otherwise False."""
    refreshed = False
    for request in self._fetch_requests:
        target = request["row"]
        column_name = request["column_name"]
        index = request["index"]
        table = target._table

        reply = ops[index]
        fetched_rows = reply.get("rows")
        if not Transaction.__check_json_type(fetched_rows, (list, tuple),
                                             '"select" reply "rows"'):
            return False
        if len(fetched_rows) != 1:
            # XXX rate-limit
            vlog.warn('"select" reply "rows" has %d elements '
                      'instead of 1' % len(fetched_rows))
            continue
        fetched_row = fetched_rows[0]
        if not Transaction.__check_json_type(fetched_row, (dict,),
                                             '"select" reply row'):
            continue

        column = table.columns.get(column_name)
        raw_value = fetched_row.get(column_name)
        target._data[column_name] = ovs.db.data.Datum.from_json(
            column.type, raw_value)
        refreshed = True

    return refreshed
def __process_inc_reply(self, ops):
    """Validates the "mutate" and "select" results in 'ops' that answer
    this transaction's increment request, and records the new value.

    The two results are expected at indexes self._inc_index and
    self._inc_index + 1.  The "mutate" result must report a count of 1 and
    the "select" result must contain exactly one row whose increment column
    is an integer; that integer is stored in self._inc_new_value.

    Returns True on success.  On any malformed reply, logs a warning and
    returns False."""
    needed = self._inc_index + 2
    if needed > len(ops):
        # XXX rate-limit
        vlog.warn("reply does not contain enough operations for "
                  "increment (has %d, needs %d)" %
                  (len(ops), needed))
        # Bail out here: indexing 'ops' below would raise IndexError.
        return False

    # We know that this is a JSON object because the loop in
    # __process_reply() already checked.
    mutate = ops[self._inc_index]
    count = mutate.get("count")
    if not Transaction.__check_json_type(count, (int, long),
                                         '"mutate" reply "count"'):
        return False
    if count != 1:
        # XXX rate-limit
        vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
        return False

    select = ops[self._inc_index + 1]
    rows = select.get("rows")
    if not Transaction.__check_json_type(rows, (list, tuple),
                                         '"select" reply "rows"'):
        return False
    if len(rows) != 1:
        # XXX rate-limit
        vlog.warn('"select" reply "rows" has %d elements '
                  'instead of 1' % len(rows))
        return False
    row = rows[0]
    if not Transaction.__check_json_type(row, (dict,),
                                         '"select" reply row'):
        return False
    column = row.get(self._inc_column)
    if not Transaction.__check_json_type(column, (int, long),
                                         '"select" reply inc column'):
        return False
    self._inc_new_value = column
    return True
def __process_insert_reply(self, insert, ops):
    """Extracts the permanent UUID for one inserted row from 'ops'.

    'insert' is the bookkeeping record for the inserted row; its 'op_index'
    is the index of the corresponding "insert" result in 'ops'.  On success
    the parsed UUID is stored in 'insert.real' and True is returned.  If the
    reply is too short, or its "uuid" member is missing or malformed, a
    warning is logged and False is returned."""
    if insert.op_index >= len(ops):
        # XXX rate-limit
        # A reply at index N requires at least N + 1 results.
        vlog.warn("reply does not contain enough operations "
                  "for insert (has %d, needs %d)"
                  % (len(ops), insert.op_index + 1))
        return False

    # We know that this is a JSON object because the loop in
    # __process_reply() already checked.
    reply = ops[insert.op_index]
    json_uuid = reply.get("uuid")
    if not Transaction.__check_json_type(json_uuid, (tuple, list),
                                         '"insert" reply "uuid"'):
        return False

    try:
        uuid_ = ovs.ovsuuid.from_json(json_uuid)
    except error.Error:
        # XXX rate-limit
        vlog.warn('"insert" reply "uuid" is not a JSON UUID')
        return False

    insert.real = uuid_
    return True
class SchemaHelper(object):
    """IDL Schema helper.

    This class encapsulates the logic required to generate schemas suitable
    for creating 'ovs.db.idl.Idl' objects.  Clients should register the
    columns they are interested in using register_columns(),
    register_table() or register_all().  When finished, the
    get_idl_schema() function may be called.

    The schema in JSON form is held in the 'schema_json' attribute until
    get_idl_schema() consumes it."""

    def __init__(self, location=None, schema_json=None):
        """Creates a new Schema object.

        'location' file path to ovs schema. None means default location
        'schema_json' schema in json representation in memory

        At most one of the two may be given; ValueError is raised if both
        are.
        """

        if location and schema_json:
            raise ValueError("both location and schema_json can't be "
                             "specified. it's ambiguous.")
        if schema_json is None:
            if location is None:
                location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
            schema_json = ovs.json.from_file(location)

        self.schema_json = schema_json
        # Maps table name -> set of registered column names.  An empty set
        # means all columns in the table.
        self._tables = {}
        # Maps table name -> list of read-only column names.
        self._readonly = {}
        # True once register_all() has been called.
        self._all = False

    def register_columns(self, table, columns, readonly=None):
        """Registers interest in the given 'columns' of 'table'.  Future calls
        to get_idl_schema() will include 'table':column for each column in
        'columns'. This function automatically avoids adding duplicate entries
        to the schema.

        A subset of 'columns' can be specified as 'readonly'. The readonly
        columns are not replicated but can be fetched on-demand by the user.
        Each call replaces the read-only list previously registered for
        'table'.

        'table' must be a string.
        'columns' must be a list of strings.
        'readonly' must be a list of strings, or None for no read-only
        columns.
        """

        assert type(table) is str
        assert type(columns) is list

        # A fresh list per call: a shared default list would be stored for
        # every table and any mutation of it would leak across tables.
        if readonly is None:
            readonly = []

        columns = set(columns) | self._tables.get(table, set())
        self._tables[table] = columns
        self._readonly[table] = readonly

    def register_table(self, table):
        """Registers interest in all columns of 'table'. Future calls
        to get_idl_schema() will include all columns of 'table'.

        'table' must be a string
        """
        assert type(table) is str
        self._tables[table] = set()  # empty set means all columns in the table

    def register_all(self):
        """Registers interest in every column of every table."""
        self._all = True

    def get_idl_schema(self):
        """Gets a schema appropriate for the creation of an 'ovs.db.idl.Idl'
        object based on columns registered using the register_columns()
        function.

        The stored JSON schema is released (set to None) once it has been
        parsed."""

        schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
        self.schema_json = None

        if not self._all:
            schema_tables = {}
            # items() works on both Python 2 and Python 3.
            for table, columns in self._tables.items():
                schema_tables[table] = (
                    self._keep_table_columns(schema, table, columns))

            schema.tables = schema_tables
        schema.readonly = self._readonly
        return schema

    def _keep_table_columns(self, schema, table_name, columns):
        """Returns the table named 'table_name' from 'schema', restricted in
        place to 'columns'.  An empty 'columns' keeps every column.  Both
        the table and each requested column must exist in the schema."""
        assert table_name in schema.tables
        table = schema.tables[table_name]

        if not columns:
            # empty set means all columns in the table
            return table

        new_columns = {}
        for column_name in columns:
            assert type(column_name) is str
            assert column_name in table.columns

            new_columns[column_name] = table.columns[column_name]

        table.columns = new_columns
        return table