1 # Copyright (c) 2009, 2010, 2011, 2012, 2013 Nicira, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
23 from ovs.db import error
28 vlog = ovs.vlog.Vlog("idl")
30 __pychecker__ = 'no-classattr no-objattrs'
41 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
43 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
44 requests to an OVSDB database server and parses the responses, converting
45 raw JSON into data structures that are easier for clients to digest.
47 The IDL also assists with issuing database transactions. The client
48 creates a transaction, manipulates the IDL data structures, and commits or
49 aborts the transaction. The IDL then composes and issues the necessary
50 JSON-RPC requests and reports to the client whether the transaction
51 completed successfully.
53 The client is allowed to access the following attributes directly, in a
56 - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
57 to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
58 annotated with a new attribute 'rows', which is a dict from a uuid.UUID
61 The client may directly read and write the Row objects referenced by the
62 'rows' map values. Refer to Row for more details.
64 - 'change_seqno': A number that represents the IDL's state. When the IDL
65 is updated (by Idl.run()), its value changes. The sequence number can
66 occasionally change even if the database does not. This happens if the
67 connection to the database drops and reconnects, which causes the
68 database contents to be reloaded even if they didn't change. (It could
69 also happen if the database server sends out a "change" that reflects
70 what the IDL already thought was in the database. The database server is
71 not supposed to do that, but bugs could in theory cause it to do so.)
73 - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
74 if no lock is configured.
76 - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
77 lock, and False otherwise.
79 Locking and unlocking happens asynchronously from the database client's
80 point of view, so the information is only useful for optimization
81 (e.g. if the client doesn't have the lock then there's no point in trying
82 to write to the database).
84 - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
85 the database server has indicated that some other client already owns the
86 requested lock, and False otherwise.
88 - 'txn': The ovs.db.idl.Transaction object for the database transaction
89 currently being constructed, if there is one, or None otherwise.
93 IDL_S_MONITOR_REQUESTED = 1
94 IDL_S_MONITOR_COND_REQUESTED = 2
96 def __init__(self, remote, schema):
97 """Creates and returns a connection to the database named 'db_name' on
98 'remote', which should be in a form acceptable to
99 ovs.jsonrpc.session.open(). The connection will maintain an in-memory
100 replica of the remote database.
102 'schema' should be the schema for the remote database. The caller may
103 have cut it down by removing tables or columns that are not of
104 interest. The IDL will only replicate the tables and columns that
105 remain. The caller may also add an attribute named 'alert' to selected
106 remaining columns, setting its value to False; if so, then changes to
107 those columns will not be considered changes to the database for the
108 purpose of the return value of Idl.run() and Idl.change_seqno. This is
109 useful for columns that the IDL's client will write but not read.
111 As a convenience to users, 'schema' may also be an instance of the
114 The IDL uses and modifies 'schema' directly."""
116 assert isinstance(schema, SchemaHelper)
117 schema = schema.get_idl_schema()
119 self.tables = schema.tables
120 self.readonly = schema.readonly
122 self._session = ovs.jsonrpc.Session.open(remote)
123 self._monitor_request_id = None
124 self._last_seqno = None
125 self.change_seqno = 0
126 self.uuid = uuid.uuid1()
127 self.state = self.IDL_S_INITIAL
130 self.lock_name = None # Name of lock we need, None if none.
131 self.has_lock = False # Has db server said we have the lock?
132 self.is_lock_contended = False # Has db server said we can't get lock?
133 self._lock_request_id = None # JSON-RPC ID of in-flight lock request.
135 # Transaction support.
137 self._outstanding_txns = {}
139 for table in six.itervalues(schema.tables):
140 for column in six.itervalues(table.columns):
141 if not hasattr(column, 'alert'):
143 table.need_table = False
149 """Closes the connection to the database. The IDL will no longer
151 self._session.close()
154 """Processes a batch of messages from the database server. Returns
155 True if the database as seen through the IDL changed, False if it did
156 not change. The initial fetch of the entire contents of the remote
157 database is considered to be one kind of change. If the IDL has been
158 configured to acquire a database lock (with Idl.set_lock()), then
159 successfully acquiring the lock is also considered to be a change.
161 This function can return occasional false positives, that is, report
162 that the database changed even though it didn't. This happens if the
163 connection to the database drops and reconnects, which causes the
164 database contents to be reloaded even if they didn't change. (It could
165 also happen if the database server sends out a "change" that reflects
166 what we already thought was in the database, but the database server is
167 not supposed to do that.)
169 As an alternative to checking the return value, the client may check
170 for changes in self.change_seqno."""
172 initial_change_seqno = self.change_seqno
177 if not self._session.is_connected():
180 seqno = self._session.get_seqno()
181 if seqno != self._last_seqno:
182 self._last_seqno = seqno
183 self.__txn_abort_all()
184 self.__send_monitor_request()
186 self.__send_lock_request()
189 msg = self._session.recv()
192 if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
193 and msg.method == "update2"
194 and len(msg.params) == 2):
195 # Database contents changed.
196 self.__parse_update(msg.params[1], OVSDB_UPDATE2)
197 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
198 and msg.method == "update"
199 and len(msg.params) == 2):
200 # Database contents changed.
201 self.__parse_update(msg.params[1], OVSDB_UPDATE)
202 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
203 and self._monitor_request_id is not None
204 and self._monitor_request_id == msg.id):
205 # Reply to our "monitor" request.
207 self.change_seqno += 1
208 self._monitor_request_id = None
210 if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
211 self.__parse_update(msg.result, OVSDB_UPDATE2)
213 assert self.state == self.IDL_S_MONITOR_REQUESTED
214 self.__parse_update(msg.result, OVSDB_UPDATE)
216 except error.Error as e:
217 vlog.err("%s: parse error in received schema: %s"
218 % (self._session.get_name(), e))
220 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
221 and self._lock_request_id is not None
222 and self._lock_request_id == msg.id):
223 # Reply to our "lock" request.
224 self.__parse_lock_reply(msg.result)
225 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
226 and msg.method == "locked"):
228 self.__parse_lock_notify(msg.params, True)
229 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
230 and msg.method == "stolen"):
231 # Someone else stole our lock.
232 self.__parse_lock_notify(msg.params, False)
233 elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
234 # Reply to our echo request. Ignore it.
236 elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
237 self.state == self.IDL_S_MONITOR_COND_REQUESTED and
238 self._monitor_request_id == msg.id):
239 if msg.error == "unknown method":
240 self.__send_monitor_request()
241 elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
242 ovs.jsonrpc.Message.T_REPLY)
243 and self.__txn_process_reply(msg)):
244 # __txn_process_reply() did everything needed.
247 # This can happen if a transaction is destroyed before we
248 # receive the reply, so keep the log level low.
249 vlog.dbg("%s: received unexpected %s message"
250 % (self._session.get_name(),
251 ovs.jsonrpc.Message.type_to_string(msg.type)))
253 return initial_change_seqno != self.change_seqno
255 def cond_change(self, table_name, cond):
256 """Change conditions for this IDL session. If session is not already
257 connected, add condition to table and submit it on send_monitor_request.
258 Otherwise send monitor_cond_change method with the requested
260 table = self.tables.get(table_name)
262 raise error.Error('Unknown table "%s"' % table_name)
263 if self._session.is_connected():
264 self.__send_cond_change(table, cond)
266 table.condition = cond
def wait(self, poller):
    """Registers this IDL's wake-up conditions with 'poller'.

    After this call, poller.block() returns once self.run() has work to
    do or once there is activity on a transaction belonging to 'self'.
    Both the session's general wait and its receive wait are registered,
    in that order."""
    session = self._session
    session.wait(poller)
    session.recv_wait(poller)
def has_ever_connected(self):
    """Reports whether the IDL has ever held a copy of the database.

    Returns True once the IDL has connected to the remote database and
    retrieved its contents, even if that connection has since dropped and
    a reconnect is under way.  In that case the IDL holds an atomic
    snapshot of the database, although the snapshot may be arbitrarily
    old if the connection dropped.

    Returns False if the IDL has never connected or never retrieved the
    database's contents; the IDL is then empty.

    The answer is derived from 'change_seqno', which is still 0 until
    the first update has been processed."""
    never_updated = self.change_seqno == 0
    return not never_updated
def force_reconnect(self):
    """Makes the IDL drop its database connection and connect again.

    The request is handed straight to the underlying JSON-RPC session.
    Until the reconnection completes, the contents of the IDL do not
    change."""
    session = self._session
    session.force_reconnect()
290 def set_lock(self, lock_name):
291 """If 'lock_name' is not None, configures the IDL to obtain the named
292 lock from the database server and to avoid modifying the database when
293 the lock cannot be acquired (that is, when another client has the same
296 If 'lock_name' is None, drops the locking requirement and releases the
299 assert not self._outstanding_txns
301 if self.lock_name and (not lock_name or lock_name != self.lock_name):
302 # Release previous lock.
303 self.__send_unlock_request()
304 self.lock_name = None
305 self.is_lock_contended = False
307 if lock_name and not self.lock_name:
309 self.lock_name = lock_name
310 self.__send_lock_request()
312 def notify(self, event, row, updates=None):
313 """Hook for implementing create/update/delete notifications
315 :param event: The event that was triggered
316 :type event: ROW_CREATE, ROW_UPDATE, or ROW_DELETE
317 :param row: The row as it is after the operation has occurred
319 :param updates: For updates, row with only updated columns
def __send_cond_change(self, table, cond):
    """Sends a "monitor_cond_change" request that sets a new condition.

    'table' is the table whose monitored rows are being restricted and
    'cond' is the new "where" clause for it.  The request carries the
    IDL's current monitor id and a freshly generated replacement id;
    self.uuid is updated to the replacement before the request is sent."""
    previous_id = str(self.uuid)
    self.uuid = uuid.uuid1()
    request = ovs.jsonrpc.Message.create_request(
        "monitor_cond_change",
        [previous_id, str(self.uuid), {table.name: [{"where": cond}]}])
    self._session.send(request)
334 for table in six.itervalues(self.tables):
340 self.change_seqno += 1
342 def __update_has_lock(self, new_has_lock):
343 if new_has_lock and not self.has_lock:
344 if self._monitor_request_id is None:
345 self.change_seqno += 1
347 # We're waiting for a monitor reply, so don't signal that the
348 # database changed. The monitor reply will increment
349 # change_seqno anyhow.
351 self.is_lock_contended = False
352 self.has_lock = new_has_lock
354 def __do_send_lock_request(self, method):
355 self.__update_has_lock(False)
356 self._lock_request_id = None
357 if self._session.is_connected():
358 msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
360 self._session.send(msg)
def __send_lock_request(self):
    """Issues a "lock" request for the configured lock.

    Whatever the shared lock/unlock helper returns is recorded in
    _lock_request_id, the attribute run() compares against incoming
    reply ids to recognise the answer to this request."""
    request_id = self.__do_send_lock_request("lock")
    self._lock_request_id = request_id
def __send_unlock_request(self):
    """Issues an "unlock" request for the configured lock.

    The helper's return value is deliberately not stored: no reply to an
    unlock request is tracked."""
    method = "unlock"
    self.__do_send_lock_request(method)
371 def __parse_lock_reply(self, result):
372 self._lock_request_id = None
373 got_lock = isinstance(result, dict) and result.get("locked") is True
374 self.__update_has_lock(got_lock)
376 self.is_lock_contended = True
378 def __parse_lock_notify(self, params, new_has_lock):
379 if (self.lock_name is not None
380 and isinstance(params, (list, tuple))
382 and params[0] == self.lock_name):
383 self.__update_has_lock(new_has_lock)
385 self.is_lock_contended = True
387 def __send_monitor_request(self):
388 if self.state == self.IDL_S_INITIAL:
389 self.state = self.IDL_S_MONITOR_COND_REQUESTED
390 method = "monitor_cond"
392 self.state = self.IDL_S_MONITOR_REQUESTED
395 monitor_requests = {}
396 for table in six.itervalues(self.tables):
398 for column in six.iterkeys(table.columns):
399 if ((table.name not in self.readonly) or
400 (table.name in self.readonly) and
401 (column not in self.readonly[table.name])):
402 columns.append(column)
403 monitor_requests[table.name] = {"columns": columns}
404 if method == "monitor_cond" and table.condition:
405 monitor_requests[table.name]["where"] = table.condition
406 table.condition = None
408 msg = ovs.jsonrpc.Message.create_request(
409 method, [self._db.name, str(self.uuid), monitor_requests])
410 self._monitor_request_id = msg.id
411 self._session.send(msg)
413 def __parse_update(self, update, version):
415 self.__do_parse_update(update, version)
416 except error.Error as e:
417 vlog.err("%s: error parsing update: %s"
418 % (self._session.get_name(), e))
420 def __do_parse_update(self, table_updates, version):
421 if not isinstance(table_updates, dict):
422 raise error.Error("<table-updates> is not an object",
425 for table_name, table_update in six.iteritems(table_updates):
426 table = self.tables.get(table_name)
428 raise error.Error('<table-updates> includes unknown '
429 'table "%s"' % table_name)
431 if not isinstance(table_update, dict):
432 raise error.Error('<table-update> for table "%s" is not '
433 'an object' % table_name, table_update)
435 for uuid_string, row_update in six.iteritems(table_update):
436 if not ovs.ovsuuid.is_valid_string(uuid_string):
437 raise error.Error('<table-update> for table "%s" '
438 'contains bad UUID "%s" as member '
439 'name' % (table_name, uuid_string),
441 uuid = ovs.ovsuuid.from_string(uuid_string)
443 if not isinstance(row_update, dict):
444 raise error.Error('<table-update> for table "%s" '
445 'contains <row-update> for %s that '
447 % (table_name, uuid_string))
449 if version == OVSDB_UPDATE2:
450 if self.__process_update2(table, uuid, row_update):
451 self.change_seqno += 1
454 parser = ovs.db.parser.Parser(row_update, "row-update")
455 old = parser.get_optional("old", [dict])
456 new = parser.get_optional("new", [dict])
459 if not old and not new:
460 raise error.Error('<row-update> missing "old" and '
461 '"new" members', row_update)
463 if self.__process_update(table, uuid, old, new):
464 self.change_seqno += 1
466 def __process_update2(self, table, uuid, row_update):
467 row = table.rows.get(uuid)
469 if "delete" in row_update:
472 self.notify(ROW_DELETE, row)
476 vlog.warn("cannot delete missing row %s from table"
477 "%s" % (uuid, table.name))
478 elif "insert" in row_update or "initial" in row_update:
480 vlog.warn("cannot add existing row %s from table"
481 " %s" % (uuid, table.name))
483 row = self.__create_row(table, uuid)
484 if "insert" in row_update:
485 row_update = row_update['insert']
487 row_update = row_update['initial']
488 self.__add_default(table, row_update)
489 if self.__row_update(table, row, row_update):
491 self.notify(ROW_CREATE, row)
492 elif "modify" in row_update:
494 raise error.Error('Modify non-existing row')
496 self.__apply_diff(table, row, row_update['modify'])
497 self.notify(ROW_UPDATE, row,
498 Row.from_json(self, table, uuid, row_update['modify']))
501 raise error.Error('<row-update> unknown operation',
505 def __process_update(self, table, uuid, old, new):
506 """Returns True if a column changed, False otherwise."""
507 row = table.rows.get(uuid)
514 self.notify(ROW_DELETE, row)
517 vlog.warn("cannot delete missing row %s from table %s"
518 % (uuid, table.name))
522 row = self.__create_row(table, uuid)
526 vlog.warn("cannot add existing row %s to table %s"
527 % (uuid, table.name))
528 if self.__row_update(table, row, new):
530 self.notify(ROW_CREATE, row)
534 row = self.__create_row(table, uuid)
538 vlog.warn("cannot modify missing row %s in table %s"
539 % (uuid, table.name))
540 if self.__row_update(table, row, new):
542 self.notify(op, row, Row.from_json(self, table, uuid, old))
545 def __column_name(self, column):
546 if column.type.key.type == ovs.db.types.UuidType:
547 return ovs.ovsuuid.to_json(column.type.key.type.default)
549 return column.type.key.type.default
def __add_default(self, table, row_update):
    """Fills in 'row_update' for columns of 'table' it does not mention.

    'row_update' is modified in place.  A column is left alone when any
    of the following holds:

      - 'row_update' already has an entry for it;
      - it is listed in self.readonly for this table;
      - its type has n_min == 0, or it is a map column.

    Every remaining column gets the value produced by the
    __column_name() helper for that column."""
    for column in six.itervalues(table.columns):
        if column.name in row_update:
            continue
        # Only columns that are not marked read-only for this table
        # receive a filled-in value.
        if (table.name in self.readonly
                and column.name in self.readonly[table.name]):
            continue
        if column.type.n_min == 0 or column.type.is_map():
            continue
        row_update[column.name] = self.__column_name(column)
560 def __apply_diff(self, table, row, row_diff):
561 for column_name, datum_json in six.iteritems(row_diff):
562 column = table.columns.get(column_name)
565 vlog.warn("unknown column %s updating table %s"
566 % (column_name, table.name))
570 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
571 except error.Error as e:
573 vlog.warn("error parsing column %s in table %s: %s"
574 % (column_name, table.name, e))
577 datum = row._data[column_name].diff(datum)
578 if datum != row._data[column_name]:
579 row._data[column_name] = datum
581 def __row_update(self, table, row, row_json):
583 for column_name, datum_json in six.iteritems(row_json):
584 column = table.columns.get(column_name)
587 vlog.warn("unknown column %s updating table %s"
588 % (column_name, table.name))
592 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
593 except error.Error as e:
595 vlog.warn("error parsing column %s in table %s: %s"
596 % (column_name, table.name, e))
599 if datum != row._data[column_name]:
600 row._data[column_name] = datum
604 # Didn't really change but the OVSDB monitor protocol always
605 # includes every value in a row.
609 def __create_row(self, table, uuid):
611 for column in six.itervalues(table.columns):
612 data[column.name] = ovs.db.data.Datum.default(column.type)
613 row = table.rows[uuid] = Row(self, table, uuid, data)
617 self._session.force_reconnect()
def __txn_abort_all(self):
    """Empties the outstanding-transaction map, failing every entry.

    Each transaction removed from self._outstanding_txns has its status
    set to Transaction.TRY_AGAIN."""
    pending = self._outstanding_txns
    while pending:
        _, txn = pending.popitem()
        txn._status = Transaction.TRY_AGAIN
624 def __txn_process_reply(self, msg):
625 txn = self._outstanding_txns.pop(msg.id, None)
627 txn._process_reply(msg)
630 def _uuid_to_row(atom, base):
632 return base.ref_table.rows.get(atom)
637 def _row_to_uuid(value):
638 if isinstance(value, Row):
644 @functools.total_ordering
646 """A row within an IDL.
648 The client may access the following attributes directly:
650 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
652 - An attribute for each column in the Row's table, named for the column,
653 whose values are as returned by Datum.to_python() for the column's type.
655 If some error occurs (e.g. the database server's idea of the column is
656 different from the IDL's idea), then the attribute value is the
657 "default" value returned by Datum.default() for the column's type. (It is
658 important to know this because the default value may violate constraints
659 for the column's type, e.g. the default integer value is 0 even if column
660 constraints require the column's value to be positive.)
662 When a transaction is active, column attributes may also be assigned new
663 values. Committing the transaction will then cause the new value to be
664 stored into the database.
666 *NOTE*: In the current implementation, the value of a column is a *copy*
667 of the value in the database. This means that modifying its value
668 directly will have no useful effect. For example, the following:
669 row.mycolumn["a"] = "b" # don't do this
670 will not change anything in the database, even after commit. To modify
671 the column, instead assign the modified column value back to the column:
def __init__(self, idl, table, uuid, data):
    # Every attribute is stored through the instance dictionary directly.
    # This class overrides __setattr__() to treat assignments as column
    # writes, so ordinary "self.x = ..." assignment must be avoided here.
    state = self.__dict__

    state["uuid"] = uuid
    state["_idl"] = idl
    state["_table"] = table

    # _data holds the committed contents of the row:
    #
    #   - a dictionary mapping every column name to a Datum, when the row
    #     exists in the committed form of the database;
    #
    #   - None, when the row was newly inserted by the active transaction
    #     and therefore has no committed form.
    state["_data"] = data

    # _changes holds what the active transaction has done to the row:
    #
    #   - {} when no transaction is active, or the row has not yet been
    #     changed within this transaction;
    #
    #   - a dictionary mapping a column name to its new Datum, for the
    #     columns the active transaction has changed;
    #
    #   - a dictionary mapping every column name to a Datum, when the row
    #     is newly inserted within the active transaction;
    #
    #   - None, when this transaction deletes the row.
    state["_changes"] = {}

    # _prereqs is keyed by the names of the columns that must be verified
    # as prerequisites when the transaction commits.  All of its values
    # are None.
    state["_prereqs"] = {}
def __lt__(self, other):
    """Orders rows by their 'uuid' values.

    Returns NotImplemented when 'other' is not a Row, so that Python can
    try the reflected comparison."""
    if isinstance(other, Row):
        mine = self.__dict__['uuid']
        theirs = other.__dict__['uuid']
        return bool(mine < theirs)
    return NotImplemented
def __eq__(self, other):
    """Treats two rows as equal when their 'uuid' values are equal.

    Returns NotImplemented when 'other' is not a Row, so that Python can
    try the reflected comparison."""
    if isinstance(other, Row):
        mine = self.__dict__['uuid']
        theirs = other.__dict__['uuid']
        return bool(mine == theirs)
    return NotImplemented
724 return int(self.__dict__['uuid'])
726 def __getattr__(self, column_name):
727 assert self._changes is not None
729 datum = self._changes.get(column_name)
731 if self._data is None:
732 raise AttributeError("%s instance has no attribute '%s'" %
733 (self.__class__.__name__, column_name))
734 if column_name in self._data:
735 datum = self._data[column_name]
737 raise AttributeError("%s instance has no attribute '%s'" %
738 (self.__class__.__name__, column_name))
740 return datum.to_python(_uuid_to_row)
742 def __setattr__(self, column_name, value):
743 assert self._changes is not None
746 if ((self._table.name in self._idl.readonly) and
747 (column_name in self._idl.readonly[self._table.name])):
748 vlog.warn("attempting to write to readonly column %s"
752 column = self._table.columns[column_name]
754 datum = ovs.db.data.Datum.from_python(column.type, value,
756 except error.Error as e:
758 vlog.err("attempting to write bad value to column %s (%s)"
761 self._idl.txn._write(self, column, datum)
764 def from_json(cls, idl, table, uuid, row_json):
766 for column_name, datum_json in six.iteritems(row_json):
767 column = table.columns.get(column_name)
770 vlog.warn("unknown column %s in table %s"
771 % (column_name, table.name))
774 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
775 except error.Error as e:
777 vlog.warn("error parsing column %s in table %s: %s"
778 % (column_name, table.name, e))
780 data[column_name] = datum
781 return cls(idl, table, uuid, data)
783 def verify(self, column_name):
784 """Causes the original contents of column 'column_name' in this row to
785 be verified as a prerequisite to completing the transaction. That is,
786 if 'column_name' changed in this row (or if this row was deleted)
787 between the time that the IDL originally read its contents and the time
788 that the transaction commits, then the transaction aborts and
789 Transaction.commit() returns Transaction.TRY_AGAIN.
791 The intention is that, to ensure that no transaction commits based on
792 dirty reads, an application should call Row.verify() on each data item
793 read as part of a read-modify-write operation.
795 In some cases Row.verify() reduces to a no-op, because the current
796 value of the column is already known:
798 - If this row is a row created by the current transaction (returned
799 by Transaction.insert()).
801 - If the column has already been modified within the current
804 Because of the latter property, always call Row.verify() *before*
805 modifying the column, for a given read-modify-write.
807 A transaction must be in progress."""
809 assert self._changes is not None
810 if not self._data or column_name in self._changes:
813 self._prereqs[column_name] = None
816 """Deletes this row from its table.
818 A transaction must be in progress."""
820 assert self._changes is not None
821 if self._data is None:
822 del self._idl.txn._txn_rows[self.uuid]
824 self._idl.txn._txn_rows[self.uuid] = self
825 self.__dict__["_changes"] = None
826 del self._table.rows[self.uuid]
def fetch(self, column_name):
    """Hands a fetch of 'column_name' in this row to the transaction.

    The work is delegated to the _fetch() method of the IDL's current
    transaction (self._idl.txn), which is dereferenced unconditionally,
    so a transaction must be in progress."""
    txn = self._idl.txn
    txn._fetch(self, column_name)
def increment(self, column_name):
    """Arranges for 'column_name' in this row to be incremented by 1 when
    the transaction is committed.

    'column_name' must name a column with an integer type.  Once the
    transaction has committed successfully, the final (incremented) value
    is available from Transaction.get_increment_new_value().

    A client could get a similar effect by reading, writing and
    verify()ing the column itself.  Unlike that approach, increment()
    never (by itself) makes a transaction fail because of a verify
    error.

    The intended use is for incrementing the "next_cfg" column in the
    Open_vSwitch table."""
    txn = self._idl.txn
    txn._increment(self, column_name)
def _uuid_name_from_uuid(uuid):
    """Returns the symbolic name used for 'uuid': the text "row" followed
    by str(uuid) with every "-" replaced by "_"."""
    suffix = str(uuid).replace("-", "_")
    return "row" + suffix
def _where_uuid_equals(uuid):
    """Returns a one-clause "where" list that matches the row whose
    "_uuid" column equals 'uuid'."""
    uuid_json = ["uuid", str(uuid)]
    clause = ["_uuid", "==", uuid_json]
    return [clause]
855 class _InsertedRow(object):
856 def __init__(self, op_index):
857 self.op_index = op_index
861 class Transaction(object):
862 """A transaction may modify the contents of a database by modifying the
863 values of columns, deleting rows, inserting rows, or adding checks that
864 columns in the database have not changed ("verify" operations), through
867 Reading and writing columns and inserting and deleting rows are all
868 straightforward. The reasons to verify columns are less obvious.
869 Verification is the key to maintaining transactional integrity. Because
870 OVSDB handles multiple clients, it can happen that between the time that
871 OVSDB client A reads a column and writes a new value, OVSDB client B has
872 written that column. Client A's write should not ordinarily overwrite
873 client B's, especially if the column in question is a "map" column that
874 contains several more or less independent data items. If client A adds a
875 "verify" operation before it writes the column, then the transaction fails
876 in case client B modifies it first. Client A will then see the new value
877 of the column and compose a new transaction based on the new contents
880 When a transaction is complete, which must be before the next call to
881 Idl.run(), call Transaction.commit() or Transaction.abort().
883 The life-cycle of a transaction looks like this:
885 1. Create the transaction and record the initial sequence number:
887 seqno = idl.change_seqno
888 txn = Transaction(idl)
890 2. Modify the database with Row and Transaction methods.
892 3. Commit the transaction by calling Transaction.commit(). The first call
893 to this function probably returns Transaction.INCOMPLETE. The client
894 must keep calling again as long as this remains true, calling Idl.run() in
895 between to let the IDL do protocol processing. (If the client doesn't
896 have anything else to do in the meantime, it can use
897 Transaction.commit_block() to avoid having to loop itself.)
899 4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
900 to change from the saved 'seqno' (it's possible that it's already
901 changed, in which case the client should not wait at all), then start
902 over from step 1. Only a call to Idl.run() will change the return value
903 of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)"""
905 # Status values that Transaction.commit() can return.
907 # Not yet committed or aborted.
908 UNCOMMITTED = "uncommitted"
909 # Transaction didn't include any changes.
910 UNCHANGED = "unchanged"
911 # Commit in progress, please wait.
912 INCOMPLETE = "incomplete"
913 # ovsdb_idl_txn_abort() called.
917 # Commit failed because a "verify" operation
918 # reported an inconsistency, due to a network
919 # problem, or other transient failure. Wait
920 # for a change, then try again.
921 TRY_AGAIN = "try again"
922 # Server hasn't given us the lock yet.
923 NOT_LOCKED = "not locked"
924 # Commit failed due to a hard error.
928 def status_to_string(status):
929 """Converts one of the status values that Transaction.commit() can
930 return into a human-readable string.
932 (The status values are in fact such strings already, so
933 there's nothing to do.)"""
936 def __init__(self, idl):
937 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
938 A given Idl may only have a single active transaction at a time.
940 A Transaction may modify the contents of a database by assigning new
941 values to columns (attributes of Row), deleting rows (with
942 Row.delete()), or inserting rows (with Transaction.insert()). It may
943 also check that columns in the database have not changed with
946 When a transaction is complete (which must be before the next call to
947 Idl.run()), call Transaction.commit() or Transaction.abort()."""
948 assert idl.txn is None
951 self._request_id = None
955 self._status = Transaction.UNCOMMITTED
960 self._inc_column = None
962 self._fetch_requests = []
964 self._inserted_rows = {} # Map from UUID to _InsertedRow
def add_comment(self, comment):
    """Adds 'comment' to the end of the list of comments that are passed
    to the OVSDB server when this transaction is committed.

    (The comment ends up in the OVSDB log, which "ovsdb-tool show-log"
    can print in a relatively human-readable form.)"""
    comments = self._comments
    comments.append(comment)
def wait(self, poller):
    """Makes 'poller' wake up immediately if this transaction is finished.

    A transaction counts as finished when its status is anything other
    than Transaction.UNCOMMITTED or Transaction.INCOMPLETE.  While it is
    still in one of those two states, nothing is registered with
    'poller'."""
    still_pending = self._status in (Transaction.UNCOMMITTED,
                                     Transaction.INCOMPLETE)
    if not still_pending:
        poller.immediate_wake()
980 def _substitute_uuids(self, json):
981 if isinstance(json, (list, tuple)):
983 and json[0] == 'uuid'
984 and ovs.ovsuuid.is_valid_string(json[1])):
985 uuid = ovs.ovsuuid.from_string(json[1])
986 row = self._txn_rows.get(uuid, None)
987 if row and row._data is None:
988 return ["named-uuid", _uuid_name_from_uuid(uuid)]
990 return [self._substitute_uuids(elem) for elem in json]
993 def __disassemble(self):
996 for row in six.itervalues(self._txn_rows):
997 if row._changes is None:
998 row._table.rows[row.uuid] = row
999 elif row._data is None:
1000 del row._table.rows[row.uuid]
1001 row.__dict__["_changes"] = {}
1002 row.__dict__["_prereqs"] = {}
1006 """Attempts to commit 'txn'. Returns the status of the commit
1007 operation, one of the following constants:
1009 Transaction.INCOMPLETE:
1011 The transaction is in progress, but not yet complete. The caller
1012 should call again later, after calling Idl.run() to let the
1013 IDL do OVSDB protocol processing.
1015 Transaction.UNCHANGED:
1017 The transaction is complete. (It didn't actually change the
1018 database, so the IDL didn't send any request to the database
1021 Transaction.ABORTED:
1023 The caller previously called Transaction.abort().
1025 Transaction.SUCCESS:
1027 The transaction was successful. The update made by the
1028 transaction (and possibly other changes made by other database
1029 clients) should already be visible in the IDL.
1031 Transaction.TRY_AGAIN:
1033 The transaction failed for some transient reason, e.g. because a
1034 "verify" operation reported an inconsistency or due to a network
1035 problem. The caller should wait for a change to the database,
1036 then compose a new transaction, and commit the new transaction.
1038 Use Idl.change_seqno to wait for a change in the database. It is
1039 important to use its value *before* the initial call to
1040 Transaction.commit() as the baseline for this purpose, because
1041 the change that one should wait for can happen after the initial
1042 call but before the call that returns Transaction.TRY_AGAIN, and
1043 using some other baseline value in that situation could cause an
1044 indefinite wait if the database rarely changes.
1046 Transaction.NOT_LOCKED:
1048 The transaction failed because the IDL has been configured to
1049 require a database lock (with Idl.set_lock()) but didn't
1050 get it yet or has already lost it.
1052 Committing a transaction rolls back all of the changes that it made to
1053 the IDL's copy of the database. If the transaction commits
1054 successfully, then the database server will send an update and, thus,
1055 the IDL will be updated with the committed changes."""
1056 # The status can only change if we're the active transaction.
1057 # (Otherwise, our status will change only in Idl.run().)
1058 if self != self.idl.txn:
1061 # If we need a lock but don't have it, give up quickly.
1062 if self.idl.lock_name and not self.idl.has_lock:
1063 self._status = Transaction.NOT_LOCKED
1064 self.__disassemble()
1067 operations = [self.idl._db.name]
1069 # Assert that we have the required lock (avoiding a race).
1070 if self.idl.lock_name:
1071 operations.append({"op": "assert",
1072 "lock": self.idl.lock_name})
1074 # Add prerequisites and declarations of new rows.
1075 for row in six.itervalues(self._txn_rows):
1079 for column_name in row._prereqs:
1080 columns.append(column_name)
1081 rows[column_name] = row._data[column_name].to_json()
1082 operations.append({"op": "wait",
1083 "table": row._table.name,
1085 "where": _where_uuid_equals(row.uuid),
1092 for row in six.itervalues(self._txn_rows):
1093 if row._changes is None:
1094 if row._table.is_root:
1095 operations.append({"op": "delete",
1096 "table": row._table.name,
1097 "where": _where_uuid_equals(row.uuid)})
1100 # Let ovsdb-server decide whether to really delete it.
1103 op = {"table": row._table.name}
1104 if row._data is None:
1106 op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
1109 op_index = len(operations) - 1
1110 self._inserted_rows[row.uuid] = _InsertedRow(op_index)
1113 op["where"] = _where_uuid_equals(row.uuid)
1116 op["row"] = row_json
1118 for column_name, datum in six.iteritems(row._changes):
1119 if row._data is not None or not datum.is_default():
1120 row_json[column_name] = (
1121 self._substitute_uuids(datum.to_json()))
1123 # If anything really changed, consider it an update.
1124 # We can't suppress not-really-changed values earlier
1125 # or transactions would become nonatomic (see the big
1126 # comment inside Transaction._write()).
1127 if (not any_updates and row._data is not None and
1128 row._data[column_name] != datum):
1131 if row._data is None or row_json:
1132 operations.append(op)
1134 if self._fetch_requests:
1135 for fetch in self._fetch_requests:
1136 fetch["index"] = len(operations) - 1
1137 operations.append({"op": "select",
1138 "table": fetch["row"]._table.name,
1139 "where": self._substitute_uuids(
1140 _where_uuid_equals(fetch["row"].uuid)),
1141 "columns": [fetch["column_name"]]})
1145 if self._inc_row and any_updates:
1146 self._inc_index = len(operations) - 1
1148 operations.append({"op": "mutate",
1149 "table": self._inc_row._table.name,
1150 "where": self._substitute_uuids(
1151 _where_uuid_equals(self._inc_row.uuid)),
1152 "mutations": [[self._inc_column, "+=", 1]]})
1153 operations.append({"op": "select",
1154 "table": self._inc_row._table.name,
1155 "where": self._substitute_uuids(
1156 _where_uuid_equals(self._inc_row.uuid)),
1157 "columns": [self._inc_column]})
1161 operations.append({"op": "comment",
1162 "comment": "\n".join(self._comments)})
1166 operations.append({"op": "abort"})
1169 self._status = Transaction.UNCHANGED
1171 msg = ovs.jsonrpc.Message.create_request("transact", operations)
1172 self._request_id = msg.id
1173 if not self.idl._session.send(msg):
1174 self.idl._outstanding_txns[self._request_id] = self
1175 self._status = Transaction.INCOMPLETE
1177 self._status = Transaction.TRY_AGAIN
1179 self.__disassemble()
1182 def commit_block(self):
1183 """Attempts to commit this transaction, blocking until the commit
1184 either succeeds or fails. Returns the final commit status, which may
1185 be any Transaction.* value other than Transaction.INCOMPLETE.
1187 This function calls Idl.run() on this transaction's IDL, so it may
1188 cause Idl.change_seqno to change."""
1190 status = self.commit()
1191 if status != Transaction.INCOMPLETE:
1196 poller = ovs.poller.Poller()
1197 self.idl.wait(poller)
1201 def get_increment_new_value(self):
1202 """Returns the final (incremented) value of the column in this
1203 transaction that was set to be incremented by Row.increment. This
1204 transaction must have committed successfully."""
1205 assert self._status == Transaction.SUCCESS
1206 return self._inc_new_value
def abort(self):
    """Aborts this transaction.  If Transaction.commit() has already been
    called then the transaction might get committed anyhow."""
    self.__disassemble()
    # Only a transaction that has not yet reached a final status is marked
    # ABORTED; any other status is left untouched.
    if self._status in (Transaction.UNCOMMITTED, Transaction.INCOMPLETE):
        self._status = Transaction.ABORTED
1216 def get_error(self):
1217 """Returns a string representing this transaction's current status,
1218 suitable for use in log messages."""
1219 if self._status != Transaction.ERROR:
1220 return Transaction.status_to_string(self._status)
1224 return "no error details available"
1226 def __set_error_json(self, json):
1227 if self._error is None:
1228 self._error = ovs.json.to_string(json)
1230 def get_insert_uuid(self, uuid):
1231 """Finds and returns the permanent UUID that the database assigned to a
1232 newly inserted row, given the UUID that Transaction.insert() assigned
1233 locally to that row.
1235 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1236 or if it was assigned by that function and then deleted by Row.delete()
1237 within the same transaction. (Rows that are inserted and then deleted
1238 within a single transaction are never sent to the database server, so
1239 it never assigns them a permanent UUID.)
1241 This transaction must have completed successfully."""
1242 assert self._status in (Transaction.SUCCESS,
1243 Transaction.UNCHANGED)
1244 inserted_row = self._inserted_rows.get(uuid)
1246 return inserted_row.real
1249 def _increment(self, row, column):
1250 assert not self._inc_row
1252 self._inc_column = column
1254 def _fetch(self, row, column_name):
1255 self._fetch_requests.append({"row": row, "column_name": column_name})
1257 def _write(self, row, column, datum):
1258 assert row._changes is not None
1262 # If this is a write-only column and the datum being written is the
1263 # same as the one already there, just skip the update entirely. This
1264 # is worth optimizing because we have a lot of columns that get
1265 # periodically refreshed into the database but don't actually change
1268 # We don't do this for read/write columns because that would break
1269 # atomicity of transactions--some other client might have written a
1270 # different value in that column since we read it. (But if a whole
1271 # transaction only does writes of existing values, without making any
1272 # real changes, we will drop the whole transaction later in
1273 # ovsdb_idl_txn_commit().)
1274 if (not column.alert and row._data and
1275 row._data.get(column.name) == datum):
1276 new_value = row._changes.get(column.name)
1277 if new_value is None or new_value == datum:
1280 txn._txn_rows[row.uuid] = row
1281 row._changes[column.name] = datum.copy()
def insert(self, table, new_uuid=None):
    """Inserts and returns a new row in 'table', which must be one of the
    ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.

    The new row is assigned a provisional UUID.  If 'new_uuid' is None then
    one is randomly generated; otherwise 'new_uuid' should specify a
    randomly generated uuid.UUID not otherwise in use.  ovsdb-server will
    assign a different UUID when this transaction is committed, but the IDL
    will replace any uses of the provisional UUID in the data to be
    committed by the UUID assigned by ovsdb-server."""
    assert self._status == Transaction.UNCOMMITTED
    if new_uuid is None:
        new_uuid = uuid.uuid4()
    # The trailing None is presumably the row's (absent) committed data; the
    # commit code treats rows whose _data is None as newly inserted.
    row = Row(self.idl, table, new_uuid, None)
    # Make the row visible both in the table's replica and in the set of
    # rows touched by this transaction.
    table.rows[row.uuid] = row
    self._txn_rows[row.uuid] = row
    return row
1301 def _process_reply(self, msg):
1302 if msg.type == ovs.jsonrpc.Message.T_ERROR:
1303 self._status = Transaction.ERROR
1304 elif not isinstance(msg.result, (list, tuple)):
1306 vlog.warn('reply to "transact" is not JSON array')
1315 # This isn't an error in itself but indicates that some
1316 # prior operation failed, so make sure that we know about
1319 elif isinstance(op, dict):
1320 error = op.get("error")
1321 if error is not None:
1322 if error == "timed out":
1324 elif error == "not owner":
1326 elif error == "aborted":
1330 self.__set_error_json(op)
1333 self.__set_error_json(op)
1335 vlog.warn("operation reply is not JSON null or object")
1337 if not soft_errors and not hard_errors and not lock_errors:
1338 if self._inc_row and not self.__process_inc_reply(ops):
1340 if self._fetch_requests:
1341 if self.__process_fetch_reply(ops):
1342 self.idl.change_seqno += 1
1346 for insert in six.itervalues(self._inserted_rows):
1347 if not self.__process_insert_reply(insert, ops):
1351 self._status = Transaction.ERROR
1353 self._status = Transaction.NOT_LOCKED
1355 self._status = Transaction.TRY_AGAIN
1357 self._status = Transaction.SUCCESS
1360 def __check_json_type(json, types, name):
1363 vlog.warn("%s is missing" % name)
1365 elif not isinstance(json, tuple(types)):
1367 vlog.warn("%s has unexpected type %s" % (name, type(json)))
1372 def __process_fetch_reply(self, ops):
1374 for fetch_request in self._fetch_requests:
1375 row = fetch_request["row"]
1376 column_name = fetch_request["column_name"]
1377 index = fetch_request["index"]
1381 fetched_rows = select.get("rows")
1382 if not Transaction.__check_json_type(fetched_rows, (list, tuple),
1383 '"select" reply "rows"'):
1385 if len(fetched_rows) != 1:
1387 vlog.warn('"select" reply "rows" has %d elements '
1388 'instead of 1' % len(fetched_rows))
1390 fetched_row = fetched_rows[0]
1391 if not Transaction.__check_json_type(fetched_row, (dict,),
1392 '"select" reply row'):
1395 column = table.columns.get(column_name)
1396 datum_json = fetched_row.get(column_name)
1397 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
1399 row._data[column_name] = datum
1404 def __process_inc_reply(self, ops):
1405 if self._inc_index + 2 > len(ops):
1407 vlog.warn("reply does not contain enough operations for "
1408 "increment (has %d, needs %d)" %
1409 (len(ops), self._inc_index + 2))
1411 # We know that this is a JSON object because the loop in
1412 # _process_reply() already checked.
1413 mutate = ops[self._inc_index]
1414 count = mutate.get("count")
1415 if not Transaction.__check_json_type(count, six.integer_types,
1416 '"mutate" reply "count"'):
1420 vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1423 select = ops[self._inc_index + 1]
1424 rows = select.get("rows")
1425 if not Transaction.__check_json_type(rows, (list, tuple),
1426 '"select" reply "rows"'):
1430 vlog.warn('"select" reply "rows" has %d elements '
1431 'instead of 1' % len(rows))
1434 if not Transaction.__check_json_type(row, (dict,),
1435 '"select" reply row'):
1437 column = row.get(self._inc_column)
1438 if not Transaction.__check_json_type(column, six.integer_types,
1439 '"select" reply inc column'):
1441 self._inc_new_value = column
1444 def __process_insert_reply(self, insert, ops):
1445 if insert.op_index >= len(ops):
1447 vlog.warn("reply does not contain enough operations "
1448 "for insert (has %d, needs %d)"
1449 % (len(ops), insert.op_index))
1452 # We know that this is a JSON object because the loop in
1453 # _process_reply() already checked.
1454 reply = ops[insert.op_index]
1455 json_uuid = reply.get("uuid")
1456 if not Transaction.__check_json_type(json_uuid, (tuple, list),
1457 '"insert" reply "uuid"'):
1461 uuid_ = ovs.ovsuuid.from_json(json_uuid)
1464 vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1471 class SchemaHelper(object):
1472 """IDL Schema helper.
1474 This class encapsulates the logic required to generate schemas suitable
1475 for creating 'ovs.db.idl.Idl' objects. Clients should register columns
1476 they are interested in using register_columns(). When finished, the
1477 get_idl_schema() function may be called.
1479 The location on disk of the schema used may be found in the
1480 'schema_location' variable."""
1482 def __init__(self, location=None, schema_json=None):
1483 """Creates a new Schema object.
1485 'location' file path to ovs schema. None means default location
1486 'schema_json' schema in JSON representation in memory
1489 if location and schema_json:
1490 raise ValueError("both location and schema_json can't be "
1491 "specified. it's ambiguous.")
1492 if schema_json is None:
1493 if location is None:
1494 location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
1495 schema_json = ovs.json.from_file(location)
1497 self.schema_json = schema_json
def register_columns(self, table, columns, readonly=None):
    """Registers interest in the given 'columns' of 'table'.  Future calls
    to get_idl_schema() will include 'table':column for each column in
    'columns'.  Columns are kept in a set, so registering the same column
    more than once does not add duplicate entries.

    A subset of 'columns' can be specified as 'readonly'.  The readonly
    columns are not replicated but can be fetched on-demand by the user.
    Each call replaces any 'readonly' list registered earlier for 'table'.

    'table' must be a string.
    'columns' must be a list of strings.
    'readonly' must be a list of strings; omitting it (or passing None)
    means no read-only columns.
    """
    if readonly is None:
        # Use a fresh list per call.  A shared default list would be stored
        # in self._readonly and aliased by every table registered without
        # an explicit 'readonly'.
        readonly = []

    assert isinstance(table, six.string_types)
    assert isinstance(columns, list)
    assert isinstance(readonly, list)

    # Merge with whatever was registered earlier for this table.
    columns = set(columns) | self._tables.get(table, set())
    self._tables[table] = columns
    self._readonly[table] = readonly
def register_table(self, table):
    """Registers interest in all columns of 'table'.  Future calls to
    get_idl_schema() will include all columns of 'table'.

    'table' must be a string.
    """
    assert isinstance(table, six.string_types)
    self._tables[table] = set()  # empty set means all columns in the table
1532 def register_all(self):
1533 """Registers interest in every column of every table."""
1536 def get_idl_schema(self):
1537 """Gets a schema appropriate for the creation of an 'ovs.db.idl.Idl'
1538 object based on columns registered using the register_columns()
1541 schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
1542 self.schema_json = None
1546 for table, columns in six.iteritems(self._tables):
1547 schema_tables[table] = (
1548 self._keep_table_columns(schema, table, columns))
1550 schema.tables = schema_tables
1551 schema.readonly = self._readonly
1554 def _keep_table_columns(self, schema, table_name, columns):
1555 assert table_name in schema.tables
1556 table = schema.tables[table_name]
1559 # empty set means all columns in the table
1563 for column_name in columns:
1564 assert isinstance(column_name, six.string_types)
1565 assert column_name in table.columns
1567 new_columns[column_name] = table.columns[column_name]
1569 table.columns = new_columns