ovn-northd: Ability to detach on Windows.
[cascardo/ovs.git] / ovn / northd / ovn-northd.c
index aa58134..554dba3 100644 (file)
@@ -30,6 +30,7 @@
 #include "ovn/lib/ovn-nb-idl.h"
 #include "ovn/lib/ovn-sb-idl.h"
 #include "poll-loop.h"
+#include "smap.h"
 #include "stream.h"
 #include "stream-ssl.h"
 #include "unixctl.h"
@@ -53,6 +54,35 @@ static const char *ovnsb_db;
 
 static const char *default_db(void);
 
+
+/* Ingress pipeline stages.
+ *
+ * These must be listed in the order that the stages will be executed. */
+#define INGRESS_STAGES                         \
+    INGRESS_STAGE(PORT_SEC, port_sec)          \
+    INGRESS_STAGE(L2_LKUP, l2_lkup)
+
+enum ingress_stage {
+#define INGRESS_STAGE(NAME, STR) S_IN_##NAME,
+    INGRESS_STAGES
+#undef INGRESS_STAGE
+    INGRESS_N_STAGES
+};
+
+/* Egress pipeline stages.
+ *
+ * These must be listed in the order that the stages will be executed. */
+#define EGRESS_STAGES                         \
+    EGRESS_STAGE(ACL, acl)                    \
+    EGRESS_STAGE(PORT_SEC, port_sec)
+
+enum egress_stage {
+#define EGRESS_STAGE(NAME, STR) S_OUT_##NAME,
+    EGRESS_STAGES
+#undef EGRESS_STAGE
+    EGRESS_N_STAGES
+};
+
 static void
 usage(void)
 {
@@ -74,50 +104,475 @@ Options:\n\
     stream_usage("database", true, true, false);
 }
 \f
-static int
-compare_strings(const void *a_, const void *b_)
+struct tnlid_node {
+    struct hmap_node hmap_node;
+    uint32_t tnlid;
+};
+
+static void
+destroy_tnlids(struct hmap *tnlids)
 {
-    char *const *a = a_;
-    char *const *b = b_;
-    return strcmp(*a, *b);
+    struct tnlid_node *node, *next;
+    HMAP_FOR_EACH_SAFE (node, next, hmap_node, tnlids) {
+        hmap_remove(tnlids, &node->hmap_node);
+        free(node);
+    }
+    hmap_destroy(tnlids);
+}
+
+static void
+add_tnlid(struct hmap *set, uint32_t tnlid)
+{
+    struct tnlid_node *node = xmalloc(sizeof *node);
+    hmap_insert(set, &node->hmap_node, hash_int(tnlid, 0));
+    node->tnlid = tnlid;
 }
 
-/*
- * Determine whether 2 arrays of MAC addresses are the same.  It's possible that
- * the lists could be *very* long and this check is being done a lot (every
- * time the OVN_Northbound database changes).
- */
 static bool
-macs_equal(char **binding_macs_, size_t b_n_macs,
-           char **lport_macs_, size_t l_n_macs)
+tnlid_in_use(const struct hmap *set, uint32_t tnlid)
 {
-    char **binding_macs, **lport_macs;
-    size_t bytes, i;
+    const struct tnlid_node *node;
+    HMAP_FOR_EACH_IN_BUCKET (node, hmap_node, hash_int(tnlid, 0), set) {
+        if (node->tnlid == tnlid) {
+            return true;
+        }
+    }
+    return false;
+}
 
-    if (b_n_macs != l_n_macs) {
-        return false;
+static uint32_t
+allocate_tnlid(struct hmap *set, const char *name, uint32_t max,
+               uint32_t *hint)
+{
+    for (uint32_t tnlid = *hint + 1; tnlid != *hint;
+         tnlid = tnlid + 1 <= max ? tnlid + 1 : 1) {
+        if (!tnlid_in_use(set, tnlid)) {
+            add_tnlid(set, tnlid);
+            *hint = tnlid;
+            return tnlid;
+        }
     }
 
-    bytes = b_n_macs * sizeof binding_macs_[0];
-    binding_macs = xmalloc(bytes);
-    lport_macs = xmalloc(bytes);
+    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
+    VLOG_WARN_RL(&rl, "all %s tunnel ids exhausted", name);
+    return 0;
+}
+\f
+/* The 'key' comes from nb->header_.uuid or sb->external_ids:logical-switch. */
+struct ovn_datapath {
+    struct hmap_node key_node;  /* Index on 'key'. */
+    struct uuid key;            /* nb->header_.uuid. */
 
-    memcpy(binding_macs, binding_macs_, bytes);
-    memcpy(lport_macs, lport_macs_, bytes);
+    const struct nbrec_logical_switch *nb;   /* May be NULL. */
+    const struct sbrec_datapath_binding *sb; /* May be NULL. */
 
-    qsort(binding_macs, b_n_macs, sizeof binding_macs[0], compare_strings);
-    qsort(lport_macs, l_n_macs, sizeof lport_macs[0], compare_strings);
+    struct ovs_list list;       /* In list of similar records. */
 
-    for (i = 0; i < b_n_macs; i++) {
-        if (strcmp(binding_macs[i], lport_macs[i])) {
-            break;
+    struct hmap port_tnlids;
+    uint32_t port_key_hint;
+
+    bool has_unknown;
+};
+
+static struct ovn_datapath *
+ovn_datapath_create(struct hmap *datapaths, const struct uuid *key,
+                    const struct nbrec_logical_switch *nb,
+                    const struct sbrec_datapath_binding *sb)
+{
+    struct ovn_datapath *od = xzalloc(sizeof *od);
+    od->key = *key;
+    od->sb = sb;
+    od->nb = nb;
+    hmap_init(&od->port_tnlids);
+    od->port_key_hint = 0;
+    hmap_insert(datapaths, &od->key_node, uuid_hash(&od->key));
+    return od;
+}
+
+static void
+ovn_datapath_destroy(struct hmap *datapaths, struct ovn_datapath *od)
+{
+    if (od) {
+        /* Don't remove od->list.  It is used within build_datapaths() as a
+         * private list and once we've exited that function it is not safe to
+         * use it. */
+        hmap_remove(datapaths, &od->key_node);
+        destroy_tnlids(&od->port_tnlids);
+        free(od);
+    }
+}
+
+static struct ovn_datapath *
+ovn_datapath_find(struct hmap *datapaths, const struct uuid *uuid)
+{
+    struct ovn_datapath *od;
+
+    HMAP_FOR_EACH_WITH_HASH (od, key_node, uuid_hash(uuid), datapaths) {
+        if (uuid_equals(uuid, &od->key)) {
+            return od;
+        }
+    }
+    return NULL;
+}
+
+static struct ovn_datapath *
+ovn_datapath_from_sbrec(struct hmap *datapaths,
+                        const struct sbrec_datapath_binding *sb)
+{
+    struct uuid key;
+
+    if (!smap_get_uuid(&sb->external_ids, "logical-switch", &key)) {
+        return NULL;
+    }
+    return ovn_datapath_find(datapaths, &key);
+}
+
+static void
+join_datapaths(struct northd_context *ctx, struct hmap *datapaths,
+               struct ovs_list *sb_only, struct ovs_list *nb_only,
+               struct ovs_list *both)
+{
+    hmap_init(datapaths);
+    list_init(sb_only);
+    list_init(nb_only);
+    list_init(both);
+
+    const struct sbrec_datapath_binding *sb, *sb_next;
+    SBREC_DATAPATH_BINDING_FOR_EACH_SAFE (sb, sb_next, ctx->ovnsb_idl) {
+        struct uuid key;
+        if (!smap_get_uuid(&sb->external_ids, "logical-switch", &key)) {
+            ovsdb_idl_txn_add_comment(ctx->ovnsb_txn,
+                                      "deleting Datapath_Binding "UUID_FMT" that "
+                                      "lacks external-ids:logical-switch",
+                         UUID_ARGS(&sb->header_.uuid));
+            sbrec_datapath_binding_delete(sb);
+            continue;
+        }
+
+        if (ovn_datapath_find(datapaths, &key)) {
+            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 1);
+            VLOG_INFO_RL(&rl, "deleting Datapath_Binding "UUID_FMT" with "
+                         "duplicate external-ids:logical-switch "UUID_FMT,
+                         UUID_ARGS(&sb->header_.uuid), UUID_ARGS(&key));
+            sbrec_datapath_binding_delete(sb);
+            continue;
+        }
+
+        struct ovn_datapath *od = ovn_datapath_create(datapaths, &key,
+                                                      NULL, sb);
+        list_push_back(sb_only, &od->list);
+    }
+
+    const struct nbrec_logical_switch *nb;
+    NBREC_LOGICAL_SWITCH_FOR_EACH (nb, ctx->ovnnb_idl) {
+        struct ovn_datapath *od = ovn_datapath_find(datapaths,
+                                                    &nb->header_.uuid);
+        if (od) {
+            od->nb = nb;
+            list_remove(&od->list);
+            list_push_back(both, &od->list);
+        } else {
+            od = ovn_datapath_create(datapaths, &nb->header_.uuid, nb, NULL);
+            list_push_back(nb_only, &od->list);
+        }
+    }
+}
+
+static uint32_t
+ovn_datapath_allocate_key(struct hmap *dp_tnlids)
+{
+    static uint32_t hint;
+    return allocate_tnlid(dp_tnlids, "datapath", (1u << 24) - 1, &hint);
+}
+
+static void
+build_datapaths(struct northd_context *ctx, struct hmap *datapaths)
+{
+    struct ovs_list sb_only, nb_only, both;
+
+    join_datapaths(ctx, datapaths, &sb_only, &nb_only, &both);
+
+    if (!list_is_empty(&nb_only)) {
+        /* First index the in-use datapath tunnel IDs. */
+        struct hmap dp_tnlids = HMAP_INITIALIZER(&dp_tnlids);
+        struct ovn_datapath *od;
+        LIST_FOR_EACH (od, list, &both) {
+            add_tnlid(&dp_tnlids, od->sb->tunnel_key);
+        }
+
+        /* Add southbound record for each unmatched northbound record. */
+        LIST_FOR_EACH (od, list, &nb_only) {
+            uint16_t tunnel_key = ovn_datapath_allocate_key(&dp_tnlids);
+            if (!tunnel_key) {
+                break;
+            }
+
+            od->sb = sbrec_datapath_binding_insert(ctx->ovnsb_txn);
+
+            struct smap external_ids = SMAP_INITIALIZER(&external_ids);
+            char uuid_s[UUID_LEN + 1];
+            sprintf(uuid_s, UUID_FMT, UUID_ARGS(&od->nb->header_.uuid));
+            smap_add(&external_ids, "logical-switch", uuid_s);
+            sbrec_datapath_binding_set_external_ids(od->sb, &external_ids);
+            smap_destroy(&external_ids);
+
+            sbrec_datapath_binding_set_tunnel_key(od->sb, tunnel_key);
+        }
+        destroy_tnlids(&dp_tnlids);
+    }
+
+    /* Delete southbound records without northbound matches. */
+    struct ovn_datapath *od, *next;
+    LIST_FOR_EACH_SAFE (od, next, list, &sb_only) {
+        list_remove(&od->list);
+        sbrec_datapath_binding_delete(od->sb);
+        ovn_datapath_destroy(datapaths, od);
+    }
+}
+\f
+struct ovn_port {
+    struct hmap_node key_node;  /* Index on 'key'. */
+    const char *key;            /* nb->name and sb->logical_port */
+
+    const struct nbrec_logical_port *nb; /* May be NULL. */
+    const struct sbrec_port_binding *sb; /* May be NULL. */
+
+    struct ovn_datapath *od;
+
+    struct ovs_list list;       /* In list of similar records. */
+};
+
+static struct ovn_port *
+ovn_port_create(struct hmap *ports, const char *key,
+                const struct nbrec_logical_port *nb,
+                const struct sbrec_port_binding *sb)
+{
+    struct ovn_port *op = xzalloc(sizeof *op);
+    op->key = key;
+    op->sb = sb;
+    op->nb = nb;
+    hmap_insert(ports, &op->key_node, hash_string(op->key, 0));
+    return op;
+}
+
+static void
+ovn_port_destroy(struct hmap *ports, struct ovn_port *port)
+{
+    if (port) {
+        /* Don't remove port->list.  It is used within build_ports() as a
+         * private list and once we've exited that function it is not safe to
+         * use it. */
+        hmap_remove(ports, &port->key_node);
+        free(port);
+    }
+}
+
+static struct ovn_port *
+ovn_port_find(struct hmap *ports, const char *name)
+{
+    struct ovn_port *op;
+
+    HMAP_FOR_EACH_WITH_HASH (op, key_node, hash_string(name, 0), ports) {
+        if (!strcmp(op->key, name)) {
+            return op;
+        }
+    }
+    return NULL;
+}
+
+static uint32_t
+ovn_port_allocate_key(struct ovn_datapath *od)
+{
+    return allocate_tnlid(&od->port_tnlids, "port",
+                          (1u << 15) - 1, &od->port_key_hint);
+}
+
+static void
+join_logical_ports(struct northd_context *ctx,
+                   struct hmap *datapaths, struct hmap *ports,
+                   struct ovs_list *sb_only, struct ovs_list *nb_only,
+                   struct ovs_list *both)
+{
+    hmap_init(ports);
+    list_init(sb_only);
+    list_init(nb_only);
+    list_init(both);
+
+    const struct sbrec_port_binding *sb;
+    SBREC_PORT_BINDING_FOR_EACH (sb, ctx->ovnsb_idl) {
+        struct ovn_port *op = ovn_port_create(ports, sb->logical_port,
+                                              NULL, sb);
+        list_push_back(sb_only, &op->list);
+    }
+
+    struct ovn_datapath *od;
+    HMAP_FOR_EACH (od, key_node, datapaths) {
+        for (size_t i = 0; i < od->nb->n_ports; i++) {
+            const struct nbrec_logical_port *nb = od->nb->ports[i];
+            struct ovn_port *op = ovn_port_find(ports, nb->name);
+            if (op) {
+                op->nb = nb;
+                list_remove(&op->list);
+                list_push_back(both, &op->list);
+            } else {
+                op = ovn_port_create(ports, nb->name, nb, NULL);
+                list_push_back(nb_only, &op->list);
+            }
+            op->od = od;
+        }
+    }
+}
+
+static void
+ovn_port_update_sbrec(const struct ovn_port *op)
+{
+    sbrec_port_binding_set_type(op->sb, op->nb->type);
+    sbrec_port_binding_set_options(op->sb, &op->nb->options);
+    sbrec_port_binding_set_datapath(op->sb, op->od->sb);
+    sbrec_port_binding_set_parent_port(op->sb, op->nb->parent_name);
+    sbrec_port_binding_set_tag(op->sb, op->nb->tag, op->nb->n_tag);
+    sbrec_port_binding_set_mac(op->sb, (const char **) op->nb->macs,
+                               op->nb->n_macs);
+}
+
+static void
+build_ports(struct northd_context *ctx, struct hmap *datapaths,
+            struct hmap *ports)
+{
+    struct ovs_list sb_only, nb_only, both;
+
+    join_logical_ports(ctx, datapaths, ports, &sb_only, &nb_only, &both);
+
+    /* For logical ports that are in both databases, update the southbound
+     * record based on northbound data.  Also index the in-use tunnel_keys. */
+    struct ovn_port *op, *next;
+    LIST_FOR_EACH_SAFE (op, next, list, &both) {
+        ovn_port_update_sbrec(op);
+
+        add_tnlid(&op->od->port_tnlids, op->sb->tunnel_key);
+        if (op->sb->tunnel_key > op->od->port_key_hint) {
+            op->od->port_key_hint = op->sb->tunnel_key;
+        }
+    }
+
+    /* Add southbound record for each unmatched northbound record. */
+    LIST_FOR_EACH_SAFE (op, next, list, &nb_only) {
+        uint16_t tunnel_key = ovn_port_allocate_key(op->od);
+        if (!tunnel_key) {
+            continue;
+        }
+
+        op->sb = sbrec_port_binding_insert(ctx->ovnsb_txn);
+        ovn_port_update_sbrec(op);
+
+        sbrec_port_binding_set_logical_port(op->sb, op->key);
+        sbrec_port_binding_set_tunnel_key(op->sb, tunnel_key);
+    }
+
+    /* Delete southbound records without northbound matches. */
+    LIST_FOR_EACH_SAFE(op, next, list, &sb_only) {
+        list_remove(&op->list);
+        sbrec_port_binding_delete(op->sb);
+        ovn_port_destroy(ports, op);
+    }
+}
+\f
+#define OVN_MIN_MULTICAST 32768
+#define OVN_MAX_MULTICAST 65535
+
+struct multicast_group {
+    const char *name;
+    uint16_t key;               /* OVN_MIN_MULTICAST...OVN_MAX_MULTICAST. */
+};
+
+#define MC_FLOOD "_MC_flood"
+static const struct multicast_group mc_flood = { MC_FLOOD, 65535 };
+
+#define MC_UNKNOWN "_MC_unknown"
+static const struct multicast_group mc_unknown = { MC_UNKNOWN, 65534 };
+
+static bool
+multicast_group_equal(const struct multicast_group *a,
+                      const struct multicast_group *b)
+{
+    return !strcmp(a->name, b->name) && a->key == b->key;
+}
+
+/* Multicast group entry. */
+struct ovn_multicast {
+    struct hmap_node hmap_node; /* Index on 'datapath' and 'key'. */
+    struct ovn_datapath *datapath;
+    const struct multicast_group *group;
+
+    struct ovn_port **ports;
+    size_t n_ports, allocated_ports;
+};
+
+static uint32_t
+ovn_multicast_hash(const struct ovn_datapath *datapath,
+                   const struct multicast_group *group)
+{
+    return hash_pointer(datapath, group->key);
+}
+
+static struct ovn_multicast *
+ovn_multicast_find(struct hmap *mcgroups, struct ovn_datapath *datapath,
+                   const struct multicast_group *group)
+{
+    struct ovn_multicast *mc;
+
+    HMAP_FOR_EACH_WITH_HASH (mc, hmap_node,
+                             ovn_multicast_hash(datapath, group), mcgroups) {
+        if (mc->datapath == datapath
+            && multicast_group_equal(mc->group, group)) {
+            return mc;
         }
     }
+    return NULL;
+}
 
-    free(binding_macs);
-    free(lport_macs);
+static void
+ovn_multicast_add(struct hmap *mcgroups, const struct multicast_group *group,
+                  struct ovn_port *port)
+{
+    struct ovn_datapath *od = port->od;
+    struct ovn_multicast *mc = ovn_multicast_find(mcgroups, od, group);
+    if (!mc) {
+        mc = xmalloc(sizeof *mc);
+        hmap_insert(mcgroups, &mc->hmap_node, ovn_multicast_hash(od, group));
+        mc->datapath = od;
+        mc->group = group;
+        mc->n_ports = 0;
+        mc->allocated_ports = 4;
+        mc->ports = xmalloc(mc->allocated_ports * sizeof *mc->ports);
+    }
+    if (mc->n_ports >= mc->allocated_ports) {
+        mc->ports = x2nrealloc(mc->ports, &mc->allocated_ports,
+                               sizeof *mc->ports);
+    }
+    mc->ports[mc->n_ports++] = port;
+}
 
-    return (i == b_n_macs) ? true : false;
+static void
+ovn_multicast_destroy(struct hmap *mcgroups, struct ovn_multicast *mc)
+{
+    if (mc) {
+        hmap_remove(mcgroups, &mc->hmap_node);
+        free(mc->ports);
+        free(mc);
+    }
+}
+
+static void
+ovn_multicast_update_sbrec(const struct ovn_multicast *mc,
+                           const struct sbrec_multicast_group *sb)
+{
+    struct sbrec_port_binding **ports = xmalloc(mc->n_ports * sizeof *ports);
+    for (size_t i = 0; i < mc->n_ports; i++) {
+        ports[i] = CONST_CAST(struct sbrec_port_binding *, mc->ports[i]->sb);
+    }
+    sbrec_multicast_group_set_ports(sb, ports, mc->n_ports);
+    free(ports);
 }
 \f
 /* Logical flow generation.
@@ -126,83 +581,110 @@ macs_equal(char **binding_macs_, size_t b_n_macs,
  * function of most of the northbound database.
  */
 
-/* Enough context to add a Logical_Flow row, using lflow_add(). */
-struct lflow_ctx {
-    /* From northd_context. */
-    struct ovsdb_idl *ovnsb_idl;
-    struct ovsdb_idl_txn *ovnsb_txn;
-
-    /* Contains "struct lflow_hash_node"s.  Used to figure out what existing
-     * Logical_Flow rows should be deleted: we index all of the Logical_Flow
-     * rows into this data structure, then as existing rows are generated we
-     * remove them.  After generating all the rows, any remaining in
-     * 'lflow_hmap' must be deleted from the database. */
-    struct hmap lflow_hmap;
-};
+struct ovn_lflow {
+    struct hmap_node hmap_node;
 
-/* A row in the Logical_Flow table, indexed by its full contents, */
-struct lflow_hash_node {
-    struct hmap_node node;
-    const struct sbrec_logical_flow *lflow;
+    struct ovn_datapath *od;
+    enum ovn_pipeline { P_IN, P_OUT } pipeline;
+    uint8_t table_id;
+    uint16_t priority;
+    char *match;
+    char *actions;
 };
 
 static size_t
-lflow_hash(const struct uuid *logical_datapath, uint8_t table_id,
-          uint16_t priority, const char *match, const char *actions)
+ovn_lflow_hash(const struct ovn_lflow *lflow)
 {
-    size_t hash = uuid_hash(logical_datapath);
-    hash = hash_2words((table_id << 16) | priority, hash);
-    hash = hash_string(match, hash);
-    return hash_string(actions, hash);
+    size_t hash = uuid_hash(&lflow->od->key);
+    hash = hash_2words((lflow->table_id << 16) | lflow->priority, hash);
+    hash = hash_string(lflow->match, hash);
+    return hash_string(lflow->actions, hash);
 }
 
-static size_t
-lflow_hash_rec(const struct sbrec_logical_flow *lflow)
+static bool
+ovn_lflow_equal(const struct ovn_lflow *a, const struct ovn_lflow *b)
+{
+    return (a->od == b->od
+            && a->pipeline == b->pipeline
+            && a->table_id == b->table_id
+            && a->priority == b->priority
+            && !strcmp(a->match, b->match)
+            && !strcmp(a->actions, b->actions));
+}
+
+static void
+ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
+              enum ovn_pipeline pipeline, uint8_t table_id, uint16_t priority,
+              char *match, char *actions)
 {
-    return lflow_hash(&lflow->logical_datapath, lflow->table_id,
-                      lflow->priority, lflow->match,
-                      lflow->actions);
+    lflow->od = od;
+    lflow->pipeline = pipeline;
+    lflow->table_id = table_id;
+    lflow->priority = priority;
+    lflow->match = match;
+    lflow->actions = actions;
+}
+
+static const char *
+ingress_stage_to_str(int stage) {
+    switch (stage) {
+#define INGRESS_STAGE(NAME, STR) case S_IN_##NAME: return #STR;
+    INGRESS_STAGES
+#undef INGRESS_STAGE
+        default: return "<unknown>";
+    }
+}
+
+static const char *
+egress_stage_to_str(int stage) {
+    switch (stage) {
+#define EGRESS_STAGE(NAME, STR) case S_OUT_##NAME: return #STR;
+    EGRESS_STAGES
+#undef EGRESS_STAGE
+        default: return "<unknown>";
+    }
 }
 
 /* Adds a row with the specified contents to the Logical_Flow table. */
 static void
-lflow_add(struct lflow_ctx *ctx,
-          const struct nbrec_logical_switch *logical_datapath,
-          uint8_t table_id,
-          uint16_t priority,
-          const char *match,
-          const char *actions)
-{
-    struct lflow_hash_node *hash_node;
-
-    /* Check whether such a row already exists in the Logical_Flow table.  If
-     * so, remove it from 'ctx->lflow_hmap' and we're done. */
-    HMAP_FOR_EACH_WITH_HASH (hash_node, node,
-                             lflow_hash(&logical_datapath->header_.uuid,
-                                        table_id, priority, match, actions),
-                             &ctx->lflow_hmap) {
-        const struct sbrec_logical_flow *lflow = hash_node->lflow;
-        if (uuid_equals(&lflow->logical_datapath,
-                        &logical_datapath->header_.uuid)
-            && lflow->table_id == table_id
-            && lflow->priority == priority
-            && !strcmp(lflow->match, match)
-            && !strcmp(lflow->actions, actions)) {
-            hmap_remove(&ctx->lflow_hmap, &hash_node->node);
-            free(hash_node);
-            return;
-        }
-    }
-
-    /* No such Logical_Flow row.  Add one. */
-    const struct sbrec_logical_flow *lflow;
-    lflow = sbrec_logical_flow_insert(ctx->ovnsb_txn);
-    sbrec_logical_flow_set_logical_datapath(lflow,
-                                            logical_datapath->header_.uuid);
-    sbrec_logical_flow_set_table_id(lflow, table_id);
-    sbrec_logical_flow_set_priority(lflow, priority);
-    sbrec_logical_flow_set_match(lflow, match);
-    sbrec_logical_flow_set_actions(lflow, actions);
+ovn_lflow_add(struct hmap *lflow_map, struct ovn_datapath *od,
+              enum ovn_pipeline pipeline, uint8_t table_id, uint16_t priority,
+              const char *match, const char *actions)
+{
+    struct ovn_lflow *lflow = xmalloc(sizeof *lflow);
+    ovn_lflow_init(lflow, od, pipeline, table_id, priority,
+                   xstrdup(match), xstrdup(actions));
+    hmap_insert(lflow_map, &lflow->hmap_node, ovn_lflow_hash(lflow));
+}
+
+static struct ovn_lflow *
+ovn_lflow_find(struct hmap *lflows, struct ovn_datapath *od,
+               enum ovn_pipeline pipeline, uint8_t table_id, uint16_t priority,
+               const char *match, const char *actions)
+{
+    struct ovn_lflow target;
+    ovn_lflow_init(&target, od, pipeline, table_id, priority,
+                   CONST_CAST(char *, match), CONST_CAST(char *, actions));
+
+    struct ovn_lflow *lflow;
+    HMAP_FOR_EACH_WITH_HASH (lflow, hmap_node, ovn_lflow_hash(&target),
+                             lflows) {
+        if (ovn_lflow_equal(lflow, &target)) {
+            return lflow;
+        }
+    }
+    return NULL;
+}
+
+static void
+ovn_lflow_destroy(struct hmap *lflows, struct ovn_lflow *lflow)
+{
+    if (lflow) {
+        hmap_remove(lflows, &lflow->hmap_node);
+        free(lflow->match);
+        free(lflow->actions);
+        free(lflow);
+    }
 }
 
 /* Appends port security constraints on L2 address field 'eth_addr_field'
@@ -241,394 +723,236 @@ lport_is_enabled(const struct nbrec_logical_port *lport)
     return !lport->enabled || *lport->enabled;
 }
 
-/* Updates the Logical_Flow table in the OVN_SB database, constructing its
- * contents based on the OVN_NB database. */
+/* Updates the Logical_Flow and Multicast_Group tables in the OVN_SB database,
+ * constructing their contents based on the OVN_NB database. */
 static void
-build_lflow(struct northd_context *ctx)
+build_lflows(struct northd_context *ctx, struct hmap *datapaths,
+             struct hmap *ports)
 {
-    struct lflow_ctx pc = {
-        .ovnsb_idl = ctx->ovnsb_idl,
-        .ovnsb_txn = ctx->ovnsb_txn,
-        .lflow_hmap = HMAP_INITIALIZER(&pc.lflow_hmap)
-    };
+    struct hmap lflows = HMAP_INITIALIZER(&lflows);
+    struct hmap mcgroups = HMAP_INITIALIZER(&mcgroups);
 
-    /* Add all the Logical_Flow entries currently in the southbound database to
-     * 'pc.lflow_hmap'.  We remove entries that we generate from the hmap,
-     * thus by the time we're done only entries that need to be removed
-     * remain. */
-    const struct sbrec_logical_flow *lflow;
-    SBREC_LOGICAL_FLOW_FOR_EACH (lflow, ctx->ovnsb_idl) {
-        struct lflow_hash_node *hash_node = xzalloc(sizeof *hash_node);
-        hash_node->lflow = lflow;
-        hmap_insert(&pc.lflow_hmap, &hash_node->node,
-                    lflow_hash_rec(lflow));
-    }
-
-    /* Table 0: Admission control framework. */
-    const struct nbrec_logical_switch *lswitch;
-    NBREC_LOGICAL_SWITCH_FOR_EACH (lswitch, ctx->ovnnb_idl) {
+    /* Ingress table 0: Admission control framework (priorities 0 and 100). */
+    struct ovn_datapath *od;
+    HMAP_FOR_EACH (od, key_node, datapaths) {
         /* Logical VLANs not supported. */
-        lflow_add(&pc, lswitch, 0, 100, "vlan.present", "drop;");
+        ovn_lflow_add(&lflows, od, P_IN, S_IN_PORT_SEC, 100, "vlan.present",
+                      "drop;");
 
         /* Broadcast/multicast source address is invalid. */
-        lflow_add(&pc, lswitch, 0, 100, "eth.src[40]", "drop;");
+        ovn_lflow_add(&lflows, od, P_IN, S_IN_PORT_SEC, 100, "eth.src[40]",
+                      "drop;");
 
         /* Port security flows have priority 50 (see below) and will continue
          * to the next table if packet source is acceptable. */
 
         /* Otherwise drop the packet. */
-        lflow_add(&pc, lswitch, 0, 0, "1", "drop;");
+        ovn_lflow_add(&lflows, od, P_IN, S_IN_PORT_SEC, 0, "1", "drop;");
     }
 
-    /* Table 0: Ingress port security. */
-    NBREC_LOGICAL_SWITCH_FOR_EACH (lswitch, ctx->ovnnb_idl) {
-        for (size_t i = 0; i < lswitch->n_ports; i++) {
-            const struct nbrec_logical_port *lport = lswitch->ports[i];
-            struct ds match = DS_EMPTY_INITIALIZER;
-            ds_put_cstr(&match, "inport == ");
-            json_string_escape(lport->name, &match);
-            build_port_security("eth.src",
-                                lport->port_security, lport->n_port_security,
-                                &match);
-            lflow_add(&pc, lswitch, 0, 50, ds_cstr(&match),
-                      lport_is_enabled(lport) ? "next;" : "drop;");
-            ds_destroy(&match);
-        }
+    /* Ingress table 0: Ingress port security (priority 50). */
+    struct ovn_port *op;
+    HMAP_FOR_EACH (op, key_node, ports) {
+        struct ds match = DS_EMPTY_INITIALIZER;
+        ds_put_cstr(&match, "inport == ");
+        json_string_escape(op->key, &match);
+        build_port_security("eth.src",
+                            op->nb->port_security, op->nb->n_port_security,
+                            &match);
+        ovn_lflow_add(&lflows, op->od, P_IN, S_IN_PORT_SEC, 50, ds_cstr(&match),
+                      lport_is_enabled(op->nb) ? "next;" : "drop;");
+        ds_destroy(&match);
     }
 
-    /* Table 1: Destination lookup:
-     *
-     *   - Broadcast and multicast handling (priority 100).
-     *   - Unicast handling (priority 50).
-     *   - Unknown unicast address handling (priority 0).
-     *   */
-    NBREC_LOGICAL_SWITCH_FOR_EACH (lswitch, ctx->ovnnb_idl) {
-        struct ds bcast;        /* Actions for broadcast on 'lswitch'. */
-        struct ds unknown;      /* Actions for unknown MACs on 'lswitch'. */
-
-        ds_init(&bcast);
-        ds_init(&unknown);
-        for (size_t i = 0; i < lswitch->n_ports; i++) {
-            const struct nbrec_logical_port *lport = lswitch->ports[i];
-
-            ds_put_cstr(&bcast, "outport = ");
-            json_string_escape(lport->name, &bcast);
-            ds_put_cstr(&bcast, "; next; ");
-
-            for (size_t j = 0; j < lport->n_macs; j++) {
-                const char *s = lport->macs[j];
-                uint8_t mac[ETH_ADDR_LEN];
-
-                if (eth_addr_from_string(s, mac)) {
-                    struct ds match, unicast;
-
-                    ds_init(&match);
-                    ds_put_format(&match, "eth.dst == %s", s);
-
-                    ds_init(&unicast);
-                    ds_put_cstr(&unicast, "outport = ");
-                    json_string_escape(lport->name, &unicast);
-                    ds_put_cstr(&unicast, "; next;");
-                    lflow_add(&pc, lswitch, 1, 50,
-                             ds_cstr(&match), ds_cstr(&unicast));
-                    ds_destroy(&unicast);
-                    ds_destroy(&match);
-                } else if (!strcmp(s, "unknown")) {
-                    ds_put_cstr(&unknown, "outport = ");
-                    json_string_escape(lport->name, &unknown);
-                    ds_put_cstr(&unknown, "; next; ");
-                } else {
-                    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
-
-                    VLOG_INFO_RL(&rl, "%s: invalid syntax '%s' in macs column",
-                                 lport->name, s);
-                }
-            }
-        }
-
-        ds_chomp(&bcast, ' ');
-        lflow_add(&pc, lswitch, 1, 100, "eth.dst[40]", ds_cstr(&bcast));
-        ds_destroy(&bcast);
-
-        if (unknown.length) {
-            ds_chomp(&unknown, ' ');
-            lflow_add(&pc, lswitch, 1, 0, "1", ds_cstr(&unknown));
+    /* Ingress table 1: Destination lookup, broadcast and multicast handling
+     * (priority 100). */
+    HMAP_FOR_EACH (op, key_node, ports) {
+        if (lport_is_enabled(op->nb)) {
+            ovn_multicast_add(&mcgroups, &mc_flood, op);
         }
-        ds_destroy(&unknown);
+    }
+    HMAP_FOR_EACH (od, key_node, datapaths) {
+        ovn_lflow_add(&lflows, od, P_IN, S_IN_L2_LKUP, 100, "eth.dst[40]",
+                      "outport = \""MC_FLOOD"\"; output;");
     }
 
-    /* Table 2: ACLs. */
-    NBREC_LOGICAL_SWITCH_FOR_EACH (lswitch, ctx->ovnnb_idl) {
-        for (size_t i = 0; i < lswitch->n_acls; i++) {
-            const struct nbrec_acl *acl = lswitch->acls[i];
+    /* Ingress table 1: Destination lookup, unicast handling (priority 50), */
+    HMAP_FOR_EACH (op, key_node, ports) {
+        for (size_t i = 0; i < op->nb->n_macs; i++) {
+            uint8_t mac[ETH_ADDR_LEN];
+
+            if (eth_addr_from_string(op->nb->macs[i], mac)) {
+                struct ds match, actions;
+
+                ds_init(&match);
+                ds_put_format(&match, "eth.dst == %s", op->nb->macs[i]);
+
+                ds_init(&actions);
+                ds_put_cstr(&actions, "outport = ");
+                json_string_escape(op->nb->name, &actions);
+                ds_put_cstr(&actions, "; output;");
+                ovn_lflow_add(&lflows, op->od, P_IN, S_IN_L2_LKUP, 50,
+                              ds_cstr(&match), ds_cstr(&actions));
+                ds_destroy(&actions);
+                ds_destroy(&match);
+            } else if (!strcmp(op->nb->macs[i], "unknown")) {
+                ovn_multicast_add(&mcgroups, &mc_unknown, op);
+                op->od->has_unknown = true;
+            } else {
+                static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
 
-            NBREC_ACL_FOR_EACH (acl, ctx->ovnnb_idl) {
-                lflow_add(&pc, lswitch, 2, acl->priority, acl->match,
-                          (!strcmp(acl->action, "allow") ||
-                           !strcmp(acl->action, "allow-related")
-                           ? "next;" : "drop;"));
+                VLOG_INFO_RL(&rl, "%s: invalid syntax '%s' in macs column",
+                             op->nb->name, op->nb->macs[i]);
             }
         }
-
-        lflow_add(&pc, lswitch, 2, 0, "1", "next;");
     }
 
-    /* Table 3: Egress port security. */
-    NBREC_LOGICAL_SWITCH_FOR_EACH (lswitch, ctx->ovnnb_idl) {
-        lflow_add(&pc, lswitch, 3, 100, "eth.dst[40]", "output;");
-
-        for (size_t i = 0; i < lswitch->n_ports; i++) {
-            const struct nbrec_logical_port *lport = lswitch->ports[i];
-            struct ds match;
-
-            ds_init(&match);
-            ds_put_cstr(&match, "outport == ");
-            json_string_escape(lport->name, &match);
-            build_port_security("eth.dst",
-                                lport->port_security, lport->n_port_security,
-                                &match);
-
-            lflow_add(&pc, lswitch, 3, 50, ds_cstr(&match),
-                      lport_is_enabled(lport) ? "output;" : "drop;");
-
-            ds_destroy(&match);
+    /* Ingress table 1: Destination lookup for unknown MACs (priority 0). */
+    HMAP_FOR_EACH (od, key_node, datapaths) {
+        if (od->has_unknown) {
+            ovn_lflow_add(&lflows, od, P_IN, S_IN_L2_LKUP, 0, "1",
+                          "outport = \""MC_UNKNOWN"\"; output;");
         }
     }
 
-    /* Delete any existing Logical_Flow rows that were not re-generated.  */
-    struct lflow_hash_node *hash_node, *next_hash_node;
-    HMAP_FOR_EACH_SAFE (hash_node, next_hash_node, node, &pc.lflow_hmap) {
-        hmap_remove(&pc.lflow_hmap, &hash_node->node);
-        sbrec_logical_flow_delete(hash_node->lflow);
-        free(hash_node);
+    /* Egress table 0: ACLs (any priority). */
+    HMAP_FOR_EACH (od, key_node, datapaths) {
+        for (size_t i = 0; i < od->nb->n_acls; i++) {
+            const struct nbrec_acl *acl = od->nb->acls[i];
+            const char *action;
+
+            action = (!strcmp(acl->action, "allow") ||
+                      !strcmp(acl->action, "allow-related"))
+                ? "next;" : "drop;";
+            ovn_lflow_add(&lflows, od, P_OUT, S_OUT_ACL, acl->priority,
+                          acl->match, action);
+        }
     }
-    hmap_destroy(&pc.lflow_hmap);
-}
-\f
-/*
- * Take two string columns and return true if:
- *  - neither are set
- *  - both are set and the strings are equal
- */
-static bool
-strings_equal(const char *s1, const char *s2)
-{
-    if (!!s1 != !!s2) {
-        /* One is set and the other is not. */
-        return false;
+    HMAP_FOR_EACH (od, key_node, datapaths) {
+        ovn_lflow_add(&lflows, od, P_OUT, S_OUT_ACL, 0, "1", "next;");
     }
 
-    if (s1) {
-        /* Both are set. */
-        return strcmp(s1, s2) ? false : true;
+    /* Egress table 1: Egress port security multicast/broadcast (priority
+     * 100). */
+    HMAP_FOR_EACH (od, key_node, datapaths) {
+        ovn_lflow_add(&lflows, od, P_OUT, S_OUT_PORT_SEC, 100, "eth.dst[40]",
+                      "output;");
     }
 
-    /* Both are NULL. */
-    return true;
-}
+    /* Egress table 1: Egress port security (priority 50). */
+    HMAP_FOR_EACH (op, key_node, ports) {
+        struct ds match;
 
-/*
- * Take two int64_t columns and return true if either:
- *  - neither are set
- *  - both are set and the first integers in each are equal
- */
-static bool
-int64_first_equal(int64_t *i1, size_t n_i1, int64_t *i2, size_t n_i2)
-{
-    if (n_i1 != n_i2) {
-        return false;
-    }
+        ds_init(&match);
+        ds_put_cstr(&match, "outport == ");
+        json_string_escape(op->key, &match);
+        build_port_security("eth.dst",
+                            op->nb->port_security, op->nb->n_port_security,
+                            &match);
 
-    return i1 ? (i1[0] == i2[0]) : true;
-}
+        ovn_lflow_add(&lflows, op->od, P_OUT, S_OUT_PORT_SEC, 50,
+                      ds_cstr(&match),
+                      lport_is_enabled(op->nb) ? "output;" : "drop;");
 
-struct port_binding_hash_node {
-    struct hmap_node lp_node; /* In 'lp_map', by binding->logical_port. */
-    struct hmap_node tk_node; /* In 'tk_map', by binding->tunnel_key. */
-    const struct sbrec_port_binding *binding;
-};
-
-static bool
-tunnel_key_in_use(const struct hmap *tk_hmap, uint16_t tunnel_key)
-{
-    const struct port_binding_hash_node *hash_node;
-
-    HMAP_FOR_EACH_IN_BUCKET (hash_node, tk_node, hash_int(tunnel_key, 0),
-                             tk_hmap) {
-        if (hash_node->binding->tunnel_key == tunnel_key) {
-            return true;
-        }
+        ds_destroy(&match);
     }
-    return false;
-}
 
-/* Chooses and returns a positive tunnel key that is not already in use in
- * 'tk_hmap'.  Returns 0 if all tunnel keys are in use. */
-static uint16_t
-choose_tunnel_key(const struct hmap *tk_hmap)
-{
-    static uint16_t prev;
-
-    for (uint16_t key = prev + 1; key != prev; key++) {
-        if (!tunnel_key_in_use(tk_hmap, key)) {
-            prev = key;
-            return key;
+    /* Push changes to the Logical_Flow table to database. */
+    const struct sbrec_logical_flow *sbflow, *next_sbflow;
+    SBREC_LOGICAL_FLOW_FOR_EACH_SAFE (sbflow, next_sbflow, ctx->ovnsb_idl) {
+        struct ovn_datapath *od
+            = ovn_datapath_from_sbrec(datapaths, sbflow->logical_datapath);
+        if (!od) {
+            sbrec_logical_flow_delete(sbflow);
+            continue;
         }
-    }
-
-    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
-    VLOG_WARN_RL(&rl, "all tunnel keys exhausted");
-    return 0;
-}
 
-/*
- * When a change has occurred in the OVN_Northbound database, we go through and
- * make sure that the contents of the Port_Binding table in the OVN_Southbound
- * database are up to date with the logical ports defined in the
- * OVN_Northbound database.
- */
-static void
-set_port_bindings(struct northd_context *ctx)
-{
-    const struct sbrec_port_binding *binding;
-
-    /*
-     * We will need to look up a port binding for every logical port.  We don't
-     * want to have to do an O(n) search for every binding, so start out by
-     * hashing them on the logical port.
-     *
-     * As we go through every logical port, we will update the binding if it
-     * exists or create one otherwise.  When the update is done, we'll remove
-     * it from the hashmap.  At the end, any bindings left in the hashmap are
-     * for logical ports that have been deleted.
-     *
-     * We index the logical_port column because that's the shared key between
-     * the OVN_NB and OVN_SB databases.  We index the tunnel_key column to
-     * allow us to choose a unique tunnel key for any Port_Binding rows we have
-     * to add.
-     */
-    struct hmap lp_hmap = HMAP_INITIALIZER(&lp_hmap);
-    struct hmap tk_hmap = HMAP_INITIALIZER(&tk_hmap);
-
-    SBREC_PORT_BINDING_FOR_EACH(binding, ctx->ovnsb_idl) {
-        struct port_binding_hash_node *hash_node = xzalloc(sizeof *hash_node);
-        hash_node->binding = binding;
-        hmap_insert(&lp_hmap, &hash_node->lp_node,
-                    hash_string(binding->logical_port, 0));
-        hmap_insert(&tk_hmap, &hash_node->tk_node,
-                    hash_int(binding->tunnel_key, 0));
-    }
-
-    const struct nbrec_logical_switch *lswitch;
-    NBREC_LOGICAL_SWITCH_FOR_EACH (lswitch, ctx->ovnnb_idl) {
-        const struct uuid *logical_datapath = &lswitch->header_.uuid;
-
-        for (size_t i = 0; i < lswitch->n_ports; i++) {
-            const struct nbrec_logical_port *lport = lswitch->ports[i];
-            struct port_binding_hash_node *hash_node;
-            binding = NULL;
-            HMAP_FOR_EACH_WITH_HASH(hash_node, lp_node,
-                                    hash_string(lport->name, 0), &lp_hmap) {
-                if (!strcmp(lport->name, hash_node->binding->logical_port)) {
-                    binding = hash_node->binding;
-                    break;
-                }
-            }
-
-            if (binding) {
-                /* We found an existing binding for this logical port.  Update
-                 * its contents. */
-
-                hmap_remove(&lp_hmap, &hash_node->lp_node);
-
-                if (!macs_equal(binding->mac, binding->n_mac,
-                                lport->macs, lport->n_macs)) {
-                    sbrec_port_binding_set_mac(binding,
-                                               (const char **) lport->macs,
-                                               lport->n_macs);
-                }
-                if (!strings_equal(binding->parent_port, lport->parent_name)) {
-                    sbrec_port_binding_set_parent_port(binding,
-                                                       lport->parent_name);
-                }
-                if (!int64_first_equal(binding->tag, binding->n_tag,
-                                       lport->tag, lport->n_tag)) {
-                    sbrec_port_binding_set_tag(binding,
-                                               lport->tag, lport->n_tag);
-                }
-                if (!uuid_equals(&binding->logical_datapath,
-                                 logical_datapath)) {
-                    sbrec_port_binding_set_logical_datapath(binding,
-                                                            *logical_datapath);
-                }
-                if (!strings_equal(binding->type, lport->type)) {
-                    sbrec_port_binding_set_type(binding, lport->type);
-                }
-                if (!smap_equal(&binding->options, &lport->options)) {
-                    sbrec_port_binding_set_options(binding, &lport->options);
-                }
-            } else {
-                /* There is no binding for this logical port, so create one. */
-
-                uint16_t tunnel_key = choose_tunnel_key(&tk_hmap);
-                if (!tunnel_key) {
-                    continue;
-                }
-
-                binding = sbrec_port_binding_insert(ctx->ovnsb_txn);
-                sbrec_port_binding_set_logical_port(binding, lport->name);
-                sbrec_port_binding_set_mac(binding,
-                                           (const char **) lport->macs,
-                                           lport->n_macs);
-                if (lport->parent_name && lport->n_tag > 0) {
-                    sbrec_port_binding_set_parent_port(binding,
-                                                       lport->parent_name);
-                    sbrec_port_binding_set_tag(binding,
-                                               lport->tag, lport->n_tag);
-                }
-
-                sbrec_port_binding_set_tunnel_key(binding, tunnel_key);
-                sbrec_port_binding_set_logical_datapath(binding,
-                                                        *logical_datapath);
-
-                sbrec_port_binding_set_type(binding, lport->type);
-                sbrec_port_binding_set_options(binding, &lport->options);
-
-                /* Add the tunnel key to the tk_hmap so that we don't try to
-                 * use it for another port.  (We don't want it in the lp_hmap
-                 * because that would just get the Binding record deleted
-                 * later.) */
-                struct port_binding_hash_node *hash_node
-                    = xzalloc(sizeof *hash_node);
-                hash_node->binding = binding;
-                hmap_insert(&tk_hmap, &hash_node->tk_node,
-                            hash_int(binding->tunnel_key, 0));
-            }
+        struct ovn_lflow *lflow = ovn_lflow_find(
+            &lflows, od, (!strcmp(sbflow->pipeline, "ingress") ? P_IN : P_OUT),
+            sbflow->table_id, sbflow->priority,
+            sbflow->match, sbflow->actions);
+        if (lflow) {
+            ovn_lflow_destroy(&lflows, lflow);
+        } else {
+            sbrec_logical_flow_delete(sbflow);
         }
     }
-
-    struct port_binding_hash_node *hash_node;
-    HMAP_FOR_EACH (hash_node, lp_node, &lp_hmap) {
-        hmap_remove(&lp_hmap, &hash_node->lp_node);
-        sbrec_port_binding_delete(hash_node->binding);
+    struct ovn_lflow *lflow, *next_lflow;
+    HMAP_FOR_EACH_SAFE (lflow, next_lflow, hmap_node, &lflows) {
+        sbflow = sbrec_logical_flow_insert(ctx->ovnsb_txn);
+        sbrec_logical_flow_set_logical_datapath(sbflow, lflow->od->sb);
+        sbrec_logical_flow_set_pipeline(
+            sbflow, lflow->pipeline == P_IN ? "ingress" : "egress");
+        sbrec_logical_flow_set_table_id(sbflow, lflow->table_id);
+        sbrec_logical_flow_set_priority(sbflow, lflow->priority);
+        sbrec_logical_flow_set_match(sbflow, lflow->match);
+        sbrec_logical_flow_set_actions(sbflow, lflow->actions);
+
+        struct smap external_ids = SMAP_INITIALIZER(&external_ids);
+        smap_add(&external_ids, "stage-name",
+                 lflow->pipeline == P_IN ?
+                  ingress_stage_to_str(lflow->table_id) :
+                  egress_stage_to_str(lflow->table_id));
+        sbrec_logical_flow_set_external_ids(sbflow, &external_ids);
+        smap_destroy(&external_ids);
+
+        ovn_lflow_destroy(&lflows, lflow);
     }
-    hmap_destroy(&lp_hmap);
+    hmap_destroy(&lflows);
+
+    /* Push changes to the Multicast_Group table to database. */
+    const struct sbrec_multicast_group *sbmc, *next_sbmc;
+    SBREC_MULTICAST_GROUP_FOR_EACH_SAFE (sbmc, next_sbmc, ctx->ovnsb_idl) {
+        struct ovn_datapath *od = ovn_datapath_from_sbrec(datapaths,
+                                                          sbmc->datapath);
+        if (!od) {
+            sbrec_multicast_group_delete(sbmc);
+            continue;
+        }
 
-    struct port_binding_hash_node *hash_node_next;
-    HMAP_FOR_EACH_SAFE (hash_node, hash_node_next, tk_node, &tk_hmap) {
-        hmap_remove(&tk_hmap, &hash_node->tk_node);
-        free(hash_node);
+        struct multicast_group group = { .name = sbmc->name,
+                                         .key = sbmc->tunnel_key };
+        struct ovn_multicast *mc = ovn_multicast_find(&mcgroups, od, &group);
+        if (mc) {
+            ovn_multicast_update_sbrec(mc, sbmc);
+            ovn_multicast_destroy(&mcgroups, mc);
+        } else {
+            sbrec_multicast_group_delete(sbmc);
+        }
+    }
+    struct ovn_multicast *mc, *next_mc;
+    HMAP_FOR_EACH_SAFE (mc, next_mc, hmap_node, &mcgroups) {
+        sbmc = sbrec_multicast_group_insert(ctx->ovnsb_txn);
+        sbrec_multicast_group_set_datapath(sbmc, mc->datapath->sb);
+        sbrec_multicast_group_set_name(sbmc, mc->group->name);
+        sbrec_multicast_group_set_tunnel_key(sbmc, mc->group->key);
+        ovn_multicast_update_sbrec(mc, sbmc);
+        ovn_multicast_destroy(&mcgroups, mc);
     }
-    hmap_destroy(&tk_hmap);
+    hmap_destroy(&mcgroups);
 }
-
+\f
 static void
 ovnnb_db_changed(struct northd_context *ctx)
 {
     VLOG_DBG("ovn-nb db contents have changed.");
 
-    set_port_bindings(ctx);
-    build_lflow(ctx);
+    struct hmap datapaths, ports;
+    build_datapaths(ctx, &datapaths);
+    build_ports(ctx, &datapaths, &ports);
+    build_lflows(ctx, &datapaths, &ports);
+
+    struct ovn_datapath *dp, *next_dp;
+    HMAP_FOR_EACH_SAFE (dp, next_dp, key_node, &datapaths) {
+        ovn_datapath_destroy(&datapaths, dp);
+    }
+    hmap_destroy(&datapaths);
+
+    struct ovn_port *port, *next_port;
+    HMAP_FOR_EACH_SAFE (port, next_port, key_node, &ports) {
+        ovn_port_destroy(&ports, port);
+    }
+    hmap_destroy(&ports);
 }
 
 /*
@@ -640,48 +964,48 @@ static void
 ovnsb_db_changed(struct northd_context *ctx)
 {
     struct hmap lports_hmap;
-    const struct sbrec_port_binding *binding;
-    const struct nbrec_logical_port *lport;
+    const struct sbrec_port_binding *sb;
+    const struct nbrec_logical_port *nb;
 
     struct lport_hash_node {
         struct hmap_node node;
-        const struct nbrec_logical_port *lport;
+        const struct nbrec_logical_port *nb;
     } *hash_node, *hash_node_next;
 
     VLOG_DBG("Recalculating port up states for ovn-nb db.");
 
     hmap_init(&lports_hmap);
 
-    NBREC_LOGICAL_PORT_FOR_EACH(lport, ctx->ovnnb_idl) {
+    NBREC_LOGICAL_PORT_FOR_EACH(nb, ctx->ovnnb_idl) {
         hash_node = xzalloc(sizeof *hash_node);
-        hash_node->lport = lport;
-        hmap_insert(&lports_hmap, &hash_node->node,
-                hash_string(lport->name, 0));
+        hash_node->nb = nb;
+        hmap_insert(&lports_hmap, &hash_node->node, hash_string(nb->name, 0));
     }
 
-    SBREC_PORT_BINDING_FOR_EACH(binding, ctx->ovnsb_idl) {
-        lport = NULL;
+    SBREC_PORT_BINDING_FOR_EACH(sb, ctx->ovnsb_idl) {
+        nb = NULL;
         HMAP_FOR_EACH_WITH_HASH(hash_node, node,
-                hash_string(binding->logical_port, 0), &lports_hmap) {
-            if (!strcmp(binding->logical_port, hash_node->lport->name)) {
-                lport = hash_node->lport;
+                                hash_string(sb->logical_port, 0),
+                                &lports_hmap) {
+            if (!strcmp(sb->logical_port, hash_node->nb->name)) {
+                nb = hash_node->nb;
                 break;
             }
         }
 
-        if (!lport) {
+        if (!nb) {
             /* The logical port doesn't exist for this port binding.  This can
              * happen under normal circumstances when ovn-northd hasn't gotten
              * around to pruning the Port_Binding yet. */
             continue;
         }
 
-        if (binding->chassis && (!lport->up || !*lport->up)) {
+        if (sb->chassis && (!nb->up || !*nb->up)) {
             bool up = true;
-            nbrec_logical_port_set_up(lport, &up, 1);
-        } else if (!binding->chassis && (!lport->up || *lport->up)) {
+            nbrec_logical_port_set_up(nb, &up, 1);
+        } else if (!sb->chassis && (!nb->up || *nb->up)) {
             bool up = false;
-            nbrec_logical_port_set_up(lport, &up, 1);
+            nbrec_logical_port_set_up(nb, &up, 1);
         }
     }
 
@@ -771,6 +1095,14 @@ parse_options(int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
     free(short_options);
 }
 
+static void
+add_column_noalert(struct ovsdb_idl *idl,
+                   const struct ovsdb_idl_column *column)
+{
+    ovsdb_idl_add_column(idl, column);
+    ovsdb_idl_omit_alert(idl, column);
+}
+
 int
 main(int argc, char *argv[])
 {
@@ -789,6 +1121,7 @@ main(int argc, char *argv[])
 
     fatal_ignore_sigpipe();
     set_program_name(argv[0]);
+    service_start(&argc, &argv);
     vlog_set_levels(NULL, VLF_CONSOLE, VLL_WARN);
     vlog_set_levels(&VLM_reconnect, VLF_ANY_DESTINATION, VLL_WARN);
     parse_options(argc, argv);
@@ -810,30 +1143,37 @@ main(int argc, char *argv[])
     ctx.ovnnb_idl = ovnnb_idl = ovsdb_idl_create(ovnnb_db,
             &nbrec_idl_class, true, true);
 
-    /* There is only a small subset of changes to the ovn-sb db that ovn-northd
-     * has to care about, so we'll enable monitoring those directly. */
     ctx.ovnsb_idl = ovnsb_idl = ovsdb_idl_create(ovnsb_db,
             &sbrec_idl_class, false, true);
+
+    ovsdb_idl_add_table(ovnsb_idl, &sbrec_table_logical_flow);
+    add_column_noalert(ovnsb_idl, &sbrec_logical_flow_col_logical_datapath);
+    add_column_noalert(ovnsb_idl, &sbrec_logical_flow_col_pipeline);
+    add_column_noalert(ovnsb_idl, &sbrec_logical_flow_col_table_id);
+    add_column_noalert(ovnsb_idl, &sbrec_logical_flow_col_priority);
+    add_column_noalert(ovnsb_idl, &sbrec_logical_flow_col_match);
+    add_column_noalert(ovnsb_idl, &sbrec_logical_flow_col_actions);
+
+    ovsdb_idl_add_table(ovnsb_idl, &sbrec_table_multicast_group);
+    add_column_noalert(ovnsb_idl, &sbrec_multicast_group_col_datapath);
+    add_column_noalert(ovnsb_idl, &sbrec_multicast_group_col_tunnel_key);
+    add_column_noalert(ovnsb_idl, &sbrec_multicast_group_col_name);
+    add_column_noalert(ovnsb_idl, &sbrec_multicast_group_col_ports);
+
+    ovsdb_idl_add_table(ovnsb_idl, &sbrec_table_datapath_binding);
+    add_column_noalert(ovnsb_idl, &sbrec_datapath_binding_col_tunnel_key);
+    add_column_noalert(ovnsb_idl, &sbrec_datapath_binding_col_external_ids);
+
     ovsdb_idl_add_table(ovnsb_idl, &sbrec_table_port_binding);
-    ovsdb_idl_add_column(ovnsb_idl, &sbrec_port_binding_col_logical_port);
+    add_column_noalert(ovnsb_idl, &sbrec_port_binding_col_datapath);
+    add_column_noalert(ovnsb_idl, &sbrec_port_binding_col_logical_port);
+    add_column_noalert(ovnsb_idl, &sbrec_port_binding_col_tunnel_key);
+    add_column_noalert(ovnsb_idl, &sbrec_port_binding_col_parent_port);
+    add_column_noalert(ovnsb_idl, &sbrec_port_binding_col_tag);
+    add_column_noalert(ovnsb_idl, &sbrec_port_binding_col_type);
+    add_column_noalert(ovnsb_idl, &sbrec_port_binding_col_options);
+    add_column_noalert(ovnsb_idl, &sbrec_port_binding_col_mac);
     ovsdb_idl_add_column(ovnsb_idl, &sbrec_port_binding_col_chassis);
-    ovsdb_idl_add_column(ovnsb_idl, &sbrec_port_binding_col_mac);
-    ovsdb_idl_add_column(ovnsb_idl, &sbrec_port_binding_col_tag);
-    ovsdb_idl_add_column(ovnsb_idl, &sbrec_port_binding_col_parent_port);
-    ovsdb_idl_add_column(ovnsb_idl, &sbrec_port_binding_col_logical_datapath);
-    ovsdb_idl_add_column(ovnsb_idl, &sbrec_port_binding_col_tunnel_key);
-    ovsdb_idl_add_column(ovnsb_idl, &sbrec_port_binding_col_type);
-    ovsdb_idl_add_column(ovnsb_idl, &sbrec_port_binding_col_options);
-    ovsdb_idl_add_column(ovnsb_idl, &sbrec_logical_flow_col_logical_datapath);
-    ovsdb_idl_omit_alert(ovnsb_idl, &sbrec_logical_flow_col_logical_datapath);
-    ovsdb_idl_add_column(ovnsb_idl, &sbrec_logical_flow_col_table_id);
-    ovsdb_idl_omit_alert(ovnsb_idl, &sbrec_logical_flow_col_table_id);
-    ovsdb_idl_add_column(ovnsb_idl, &sbrec_logical_flow_col_priority);
-    ovsdb_idl_omit_alert(ovnsb_idl, &sbrec_logical_flow_col_priority);
-    ovsdb_idl_add_column(ovnsb_idl, &sbrec_logical_flow_col_match);
-    ovsdb_idl_omit_alert(ovnsb_idl, &sbrec_logical_flow_col_match);
-    ovsdb_idl_add_column(ovnsb_idl, &sbrec_logical_flow_col_actions);
-    ovsdb_idl_omit_alert(ovnsb_idl, &sbrec_logical_flow_col_actions);
 
     /*
      * The loop here just runs the IDL in a loop waiting for the seqno to
@@ -972,11 +1312,15 @@ main(int argc, char *argv[])
             }
             poll_block();
         }
+        if (should_service_stop()) {
+            exiting = true;
+        }
     }
 
     unixctl_server_destroy(unixctl);
     ovsdb_idl_destroy(ovnsb_idl);
     ovsdb_idl_destroy(ovnnb_idl);
+    service_stop();
 
     exit(res);
 }