ovn-northd: Remove info log in extract_lport_addresses().
[cascardo/ovs.git] / ovn / northd / ovn-northd.c
index 253ee59..63f3fcd 100644 (file)
@@ -53,37 +53,105 @@ static const char *ovnnb_db;
 static const char *ovnsb_db;
 
 static const char *default_db(void);
+\f
+/* Pipeline stages. */
 
+/* The two pipelines in an OVN logical flow table. */
+enum ovn_pipeline {
+    P_IN,                       /* Ingress pipeline. */
+    P_OUT                       /* Egress pipeline. */
+};
 
-/* Ingress pipeline stages.
- *
- * These must be listed in the order that the stages will be executed. */
-#define INGRESS_STAGES                         \
-    INGRESS_STAGE(PORT_SEC, port_sec)          \
-    INGRESS_STAGE(ACL, acl)                    \
-    INGRESS_STAGE(L2_LKUP, l2_lkup)
-
-enum ingress_stage {
-#define INGRESS_STAGE(NAME, STR) S_IN_##NAME,
-    INGRESS_STAGES
-#undef INGRESS_STAGE
-    INGRESS_N_STAGES
+/* The two purposes for which ovn-northd uses OVN logical datapaths. */
+enum ovn_datapath_type {
+    DP_SWITCH,                  /* OVN logical switch. */
+    DP_ROUTER                   /* OVN logical router. */
 };
 
-/* Egress pipeline stages.
+/* Returns an "enum ovn_stage" built from the arguments.
+ *
+ * (It's better to use ovn_stage_build() for type-safety reasons, but inline
+ * functions can't be used in enums or switch cases.) */
+#define OVN_STAGE_BUILD(DP_TYPE, PIPELINE, TABLE) \
+    (((DP_TYPE) << 9) | ((PIPELINE) << 8) | (TABLE))
+
+/* A stage within an OVN logical switch or router.
  *
- * These must be listed in the order that the stages will be executed. */
-#define EGRESS_STAGES                         \
-    EGRESS_STAGE(ACL, acl)                    \
-    EGRESS_STAGE(PORT_SEC, port_sec)
-
-enum egress_stage {
-#define EGRESS_STAGE(NAME, STR) S_OUT_##NAME,
-    EGRESS_STAGES
-#undef EGRESS_STAGE
-    EGRESS_N_STAGES
+ * An "enum ovn_stage" indicates whether the stage is part of a logical switch
+ * or router, whether the stage is part of the ingress or egress pipeline, and
+ * the table within that pipeline.  The first three components are combined to
+ * form the stage's full name, e.g. S_SWITCH_IN_PORT_SEC,
+ * S_ROUTER_OUT_DELIVERY. */
+enum ovn_stage {
+#define PIPELINE_STAGES                                               \
+    /* Logical switch ingress stages. */                              \
+    PIPELINE_STAGE(SWITCH, IN,  PORT_SEC,    0, "ls_in_port_sec")     \
+    PIPELINE_STAGE(SWITCH, IN,  PRE_ACL,     1, "ls_in_pre_acl")      \
+    PIPELINE_STAGE(SWITCH, IN,  ACL,         2, "ls_in_acl")          \
+    PIPELINE_STAGE(SWITCH, IN,  L2_LKUP,     3, "ls_in_l2_lkup")      \
+                                                                      \
+    /* Logical switch egress stages. */                               \
+    PIPELINE_STAGE(SWITCH, OUT, PRE_ACL,     0, "ls_out_pre_acl")     \
+    PIPELINE_STAGE(SWITCH, OUT, ACL,         1, "ls_out_acl")         \
+    PIPELINE_STAGE(SWITCH, OUT, PORT_SEC,    2, "ls_out_port_sec")    \
+                                                                      \
+    /* Logical router ingress stages. */                              \
+    PIPELINE_STAGE(ROUTER, IN,  ADMISSION,   0, "lr_in_admission")    \
+    PIPELINE_STAGE(ROUTER, IN,  IP_INPUT,    1, "lr_in_ip_input")     \
+    PIPELINE_STAGE(ROUTER, IN,  IP_ROUTING,  2, "lr_in_ip_routing")   \
+    PIPELINE_STAGE(ROUTER, IN,  ARP,         3, "lr_in_arp")          \
+                                                                      \
+    /* Logical router egress stages. */                               \
+    PIPELINE_STAGE(ROUTER, OUT, DELIVERY,    0, "lr_out_delivery")
+
+#define PIPELINE_STAGE(DP_TYPE, PIPELINE, STAGE, TABLE, NAME)   \
+    S_##DP_TYPE##_##PIPELINE##_##STAGE                          \
+        = OVN_STAGE_BUILD(DP_##DP_TYPE, P_##PIPELINE, TABLE),
+    PIPELINE_STAGES
+#undef PIPELINE_STAGE
 };
 
+/* Due to various hard-coded priorities need to implement ACLs, the
+ * northbound database supports a smaller range of ACL priorities than
+ * are available to logical flows.  This value is added to an ACL
+ * priority to determine the ACL's logical flow priority. */
+#define OVN_ACL_PRI_OFFSET 1000
+
+/* Returns an "enum ovn_stage" built from the arguments. */
+static enum ovn_stage
+ovn_stage_build(enum ovn_datapath_type dp_type, enum ovn_pipeline pipeline,
+                uint8_t table)
+{
+    return OVN_STAGE_BUILD(dp_type, pipeline, table);
+}
+
+/* Returns the pipeline to which 'stage' belongs. */
+static enum ovn_pipeline
+ovn_stage_get_pipeline(enum ovn_stage stage)
+{
+    return (stage >> 8) & 1;
+}
+
+/* Returns the table to which 'stage' belongs. */
+static uint8_t
+ovn_stage_get_table(enum ovn_stage stage)
+{
+    return stage & 0xff;
+}
+
+/* Returns a string name for 'stage'. */
+static const char *
+ovn_stage_to_str(enum ovn_stage stage)
+{
+    switch (stage) {
+#define PIPELINE_STAGE(DP_TYPE, PIPELINE, STAGE, TABLE, NAME)       \
+        case S_##DP_TYPE##_##PIPELINE##_##STAGE: return NAME;
+    PIPELINE_STAGES
+#undef PIPELINE_STAGE
+        default: return "<unknown>";
+    }
+}
+\f
 static void
 usage(void)
 {
@@ -159,16 +227,25 @@ allocate_tnlid(struct hmap *set, const char *name, uint32_t max,
     return 0;
 }
 \f
-/* The 'key' comes from nb->header_.uuid or sb->external_ids:logical-switch. */
+/* The 'key' comes from nbs->header_.uuid or nbr->header_.uuid or
+ * sb->external_ids:logical-switch. */
 struct ovn_datapath {
     struct hmap_node key_node;  /* Index on 'key'. */
-    struct uuid key;            /* nb->header_.uuid. */
+    struct uuid key;            /* (nbs/nbr)->header_.uuid. */
 
-    const struct nbrec_logical_switch *nb;   /* May be NULL. */
+    const struct nbrec_logical_switch *nbs;  /* May be NULL. */
+    const struct nbrec_logical_router *nbr;  /* May be NULL. */
     const struct sbrec_datapath_binding *sb; /* May be NULL. */
 
     struct ovs_list list;       /* In list of similar records. */
 
+    /* Logical router data (digested from nbr). */
+    ovs_be32 gateway;
+
+    /* Logical switch data. */
+    struct ovn_port **router_ports;
+    size_t n_router_ports;
+
     struct hmap port_tnlids;
     uint32_t port_key_hint;
 
@@ -177,13 +254,15 @@ struct ovn_datapath {
 
 static struct ovn_datapath *
 ovn_datapath_create(struct hmap *datapaths, const struct uuid *key,
-                    const struct nbrec_logical_switch *nb,
+                    const struct nbrec_logical_switch *nbs,
+                    const struct nbrec_logical_router *nbr,
                     const struct sbrec_datapath_binding *sb)
 {
     struct ovn_datapath *od = xzalloc(sizeof *od);
     od->key = *key;
     od->sb = sb;
-    od->nb = nb;
+    od->nbs = nbs;
+    od->nbr = nbr;
     hmap_init(&od->port_tnlids);
     od->port_key_hint = 0;
     hmap_insert(datapaths, &od->key_node, uuid_hash(&od->key));
@@ -199,6 +278,7 @@ ovn_datapath_destroy(struct hmap *datapaths, struct ovn_datapath *od)
          * use it. */
         hmap_remove(datapaths, &od->key_node);
         destroy_tnlids(&od->port_tnlids);
+        free(od->router_ports);
         free(od);
     }
 }
@@ -222,7 +302,8 @@ ovn_datapath_from_sbrec(struct hmap *datapaths,
 {
     struct uuid key;
 
-    if (!smap_get_uuid(&sb->external_ids, "logical-switch", &key)) {
+    if (!smap_get_uuid(&sb->external_ids, "logical-switch", &key) &&
+        !smap_get_uuid(&sb->external_ids, "logical-router", &key)) {
         return NULL;
     }
     return ovn_datapath_find(datapaths, &key);
@@ -241,42 +322,85 @@ join_datapaths(struct northd_context *ctx, struct hmap *datapaths,
     const struct sbrec_datapath_binding *sb, *sb_next;
     SBREC_DATAPATH_BINDING_FOR_EACH_SAFE (sb, sb_next, ctx->ovnsb_idl) {
         struct uuid key;
-        if (!smap_get_uuid(&sb->external_ids, "logical-switch", &key)) {
-            ovsdb_idl_txn_add_comment(ctx->ovnsb_txn,
-                                      "deleting Datapath_Binding "UUID_FMT" that "
-                                      "lacks external-ids:logical-switch",
-                         UUID_ARGS(&sb->header_.uuid));
+        if (!smap_get_uuid(&sb->external_ids, "logical-switch", &key) &&
+            !smap_get_uuid(&sb->external_ids, "logical-router", &key)) {
+            ovsdb_idl_txn_add_comment(
+                ctx->ovnsb_txn,
+                "deleting Datapath_Binding "UUID_FMT" that lacks "
+                "external-ids:logical-switch and "
+                "external-ids:logical-router",
+                UUID_ARGS(&sb->header_.uuid));
             sbrec_datapath_binding_delete(sb);
             continue;
         }
 
         if (ovn_datapath_find(datapaths, &key)) {
             static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 1);
-            VLOG_INFO_RL(&rl, "deleting Datapath_Binding "UUID_FMT" with "
-                         "duplicate external-ids:logical-switch "UUID_FMT,
-                         UUID_ARGS(&sb->header_.uuid), UUID_ARGS(&key));
+            VLOG_INFO_RL(
+                &rl, "deleting Datapath_Binding "UUID_FMT" with "
+                "duplicate external-ids:logical-switch/router "UUID_FMT,
+                UUID_ARGS(&sb->header_.uuid), UUID_ARGS(&key));
             sbrec_datapath_binding_delete(sb);
             continue;
         }
 
         struct ovn_datapath *od = ovn_datapath_create(datapaths, &key,
-                                                      NULL, sb);
+                                                      NULL, NULL, sb);
         list_push_back(sb_only, &od->list);
     }
 
-    const struct nbrec_logical_switch *nb;
-    NBREC_LOGICAL_SWITCH_FOR_EACH (nb, ctx->ovnnb_idl) {
+    const struct nbrec_logical_switch *nbs;
+    NBREC_LOGICAL_SWITCH_FOR_EACH (nbs, ctx->ovnnb_idl) {
         struct ovn_datapath *od = ovn_datapath_find(datapaths,
-                                                    &nb->header_.uuid);
+                                                    &nbs->header_.uuid);
         if (od) {
-            od->nb = nb;
+            od->nbs = nbs;
             list_remove(&od->list);
             list_push_back(both, &od->list);
         } else {
-            od = ovn_datapath_create(datapaths, &nb->header_.uuid, nb, NULL);
+            od = ovn_datapath_create(datapaths, &nbs->header_.uuid,
+                                     nbs, NULL, NULL);
             list_push_back(nb_only, &od->list);
         }
     }
+
+    const struct nbrec_logical_router *nbr;
+    NBREC_LOGICAL_ROUTER_FOR_EACH (nbr, ctx->ovnnb_idl) {
+        struct ovn_datapath *od = ovn_datapath_find(datapaths,
+                                                    &nbr->header_.uuid);
+        if (od) {
+            if (!od->nbs) {
+                od->nbr = nbr;
+                list_remove(&od->list);
+                list_push_back(both, &od->list);
+            } else {
+                /* Can't happen! */
+                static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 1);
+                VLOG_WARN_RL(&rl,
+                             "duplicate UUID "UUID_FMT" in OVN_Northbound",
+                             UUID_ARGS(&nbr->header_.uuid));
+                continue;
+            }
+        } else {
+            od = ovn_datapath_create(datapaths, &nbr->header_.uuid,
+                                     NULL, nbr, NULL);
+            list_push_back(nb_only, &od->list);
+        }
+
+        od->gateway = 0;
+        if (nbr->default_gw) {
+            ovs_be32 ip, mask;
+            char *error = ip_parse_masked(nbr->default_gw, &ip, &mask);
+            if (error || !ip || mask != OVS_BE32_MAX) {
+                static struct vlog_rate_limit rl
+                    = VLOG_RATE_LIMIT_INIT(5, 1);
+                VLOG_WARN_RL(&rl, "bad 'gateway' %s", nbr->default_gw);
+                free(error);
+            } else {
+                od->gateway = ip;
+            }
+        }
+    }
 }
 
 static uint32_t
@@ -311,8 +435,9 @@ build_datapaths(struct northd_context *ctx, struct hmap *datapaths)
             od->sb = sbrec_datapath_binding_insert(ctx->ovnsb_txn);
 
             char uuid_s[UUID_LEN + 1];
-            sprintf(uuid_s, UUID_FMT, UUID_ARGS(&od->nb->header_.uuid));
-            const struct smap id = SMAP_CONST1(&id, "logical-switch", uuid_s);
+            sprintf(uuid_s, UUID_FMT, UUID_ARGS(&od->key));
+            const char *key = od->nbs ? "logical-switch" : "logical-router";
+            const struct smap id = SMAP_CONST1(&id, key, uuid_s);
             sbrec_datapath_binding_set_external_ids(od->sb, &id);
 
             sbrec_datapath_binding_set_tunnel_key(od->sb, tunnel_key);
@@ -331,10 +456,19 @@ build_datapaths(struct northd_context *ctx, struct hmap *datapaths)
 \f
 struct ovn_port {
     struct hmap_node key_node;  /* Index on 'key'. */
-    const char *key;            /* nb->name and sb->logical_port */
+    char *key;                  /* nbs->name, nbr->name, sb->logical_port. */
+    char *json_key;             /* 'key', quoted for use in JSON. */
 
-    const struct nbrec_logical_port *nb; /* May be NULL. */
-    const struct sbrec_port_binding *sb; /* May be NULL. */
+    const struct nbrec_logical_port *nbs;        /* May be NULL. */
+    const struct nbrec_logical_router_port *nbr; /* May be NULL. */
+    const struct sbrec_port_binding *sb;         /* May be NULL. */
+
+    /* Logical router port data. */
+    ovs_be32 ip, mask;          /* 192.168.10.123/24. */
+    ovs_be32 network;           /* 192.168.10.0. */
+    ovs_be32 bcast;             /* 192.168.10.255. */
+    struct eth_addr mac;
+    struct ovn_port *peer;
 
     struct ovn_datapath *od;
 
@@ -343,13 +477,20 @@ struct ovn_port {
 
 static struct ovn_port *
 ovn_port_create(struct hmap *ports, const char *key,
-                const struct nbrec_logical_port *nb,
+                const struct nbrec_logical_port *nbs,
+                const struct nbrec_logical_router_port *nbr,
                 const struct sbrec_port_binding *sb)
 {
     struct ovn_port *op = xzalloc(sizeof *op);
-    op->key = key;
+
+    struct ds json_key = DS_EMPTY_INITIALIZER;
+    json_string_escape(key, &json_key);
+    op->json_key = ds_steal_cstr(&json_key);
+
+    op->key = xstrdup(key);
     op->sb = sb;
-    op->nb = nb;
+    op->nbs = nbs;
+    op->nbr = nbr;
     hmap_insert(ports, &op->key_node, hash_string(op->key, 0));
     return op;
 }
@@ -362,6 +503,8 @@ ovn_port_destroy(struct hmap *ports, struct ovn_port *port)
          * private list and once we've exited that function it is not safe to
          * use it. */
         hmap_remove(ports, &port->key_node);
+        free(port->json_key);
+        free(port->key);
         free(port);
     }
 }
@@ -400,24 +543,108 @@ join_logical_ports(struct northd_context *ctx,
     const struct sbrec_port_binding *sb;
     SBREC_PORT_BINDING_FOR_EACH (sb, ctx->ovnsb_idl) {
         struct ovn_port *op = ovn_port_create(ports, sb->logical_port,
-                                              NULL, sb);
+                                              NULL, NULL, sb);
         list_push_back(sb_only, &op->list);
     }
 
     struct ovn_datapath *od;
     HMAP_FOR_EACH (od, key_node, datapaths) {
-        for (size_t i = 0; i < od->nb->n_ports; i++) {
-            const struct nbrec_logical_port *nb = od->nb->ports[i];
-            struct ovn_port *op = ovn_port_find(ports, nb->name);
-            if (op) {
-                op->nb = nb;
-                list_remove(&op->list);
-                list_push_back(both, &op->list);
-            } else {
-                op = ovn_port_create(ports, nb->name, nb, NULL);
-                list_push_back(nb_only, &op->list);
+        if (od->nbs) {
+            for (size_t i = 0; i < od->nbs->n_ports; i++) {
+                const struct nbrec_logical_port *nbs = od->nbs->ports[i];
+                struct ovn_port *op = ovn_port_find(ports, nbs->name);
+                if (op) {
+                    if (op->nbs || op->nbr) {
+                        static struct vlog_rate_limit rl
+                            = VLOG_RATE_LIMIT_INIT(5, 1);
+                        VLOG_WARN_RL(&rl, "duplicate logical port %s",
+                                     nbs->name);
+                        continue;
+                    }
+                    op->nbs = nbs;
+                    list_remove(&op->list);
+                    list_push_back(both, &op->list);
+                } else {
+                    op = ovn_port_create(ports, nbs->name, nbs, NULL, NULL);
+                    list_push_back(nb_only, &op->list);
+                }
+
+                op->od = od;
+            }
+        } else {
+            for (size_t i = 0; i < od->nbr->n_ports; i++) {
+                const struct nbrec_logical_router_port *nbr
+                    = od->nbr->ports[i];
+
+                struct eth_addr mac;
+                if (!eth_addr_from_string(nbr->mac, &mac)) {
+                    static struct vlog_rate_limit rl
+                        = VLOG_RATE_LIMIT_INIT(5, 1);
+                    VLOG_WARN_RL(&rl, "bad 'mac' %s", nbr->mac);
+                    continue;
+                }
+
+                ovs_be32 ip, mask;
+                char *error = ip_parse_masked(nbr->network, &ip, &mask);
+                if (error || mask == OVS_BE32_MAX || !ip_is_cidr(mask)) {
+                    static struct vlog_rate_limit rl
+                        = VLOG_RATE_LIMIT_INIT(5, 1);
+                    VLOG_WARN_RL(&rl, "bad 'network' %s", nbr->network);
+                    free(error);
+                    continue;
+                }
+
+                struct ovn_port *op = ovn_port_find(ports, nbr->name);
+                if (op) {
+                    if (op->nbs || op->nbr) {
+                        static struct vlog_rate_limit rl
+                            = VLOG_RATE_LIMIT_INIT(5, 1);
+                        VLOG_WARN_RL(&rl, "duplicate logical router port %s",
+                                     nbr->name);
+                        continue;
+                    }
+                    op->nbr = nbr;
+                    list_remove(&op->list);
+                    list_push_back(both, &op->list);
+                } else {
+                    op = ovn_port_create(ports, nbr->name, NULL, nbr, NULL);
+                    list_push_back(nb_only, &op->list);
+                }
+
+                op->ip = ip;
+                op->mask = mask;
+                op->network = ip & mask;
+                op->bcast = ip | ~mask;
+                op->mac = mac;
+
+                op->od = od;
             }
-            op->od = od;
+        }
+    }
+
+    /* Connect logical router ports, and logical switch ports of type "router",
+     * to their peers. */
+    struct ovn_port *op;
+    HMAP_FOR_EACH (op, key_node, ports) {
+        if (op->nbs && !strcmp(op->nbs->type, "router")) {
+            const char *peer_name = smap_get(&op->nbs->options, "router-port");
+            if (!peer_name) {
+                continue;
+            }
+
+            struct ovn_port *peer = ovn_port_find(ports, peer_name);
+            if (!peer || !peer->nbr) {
+                continue;
+            }
+
+            peer->peer = op;
+            op->peer = peer;
+            op->od->router_ports = xrealloc(
+                op->od->router_ports,
+                sizeof *op->od->router_ports * (op->od->n_router_ports + 1));
+            op->od->router_ports[op->od->n_router_ports++] = op;
+        } else if (op->nbr && op->nbr->peer) {
+            op->peer = ovn_port_find(ports, op->nbr->name);
         }
     }
 }
@@ -425,13 +652,37 @@ join_logical_ports(struct northd_context *ctx,
 static void
 ovn_port_update_sbrec(const struct ovn_port *op)
 {
-    sbrec_port_binding_set_type(op->sb, op->nb->type);
-    sbrec_port_binding_set_options(op->sb, &op->nb->options);
     sbrec_port_binding_set_datapath(op->sb, op->od->sb);
-    sbrec_port_binding_set_parent_port(op->sb, op->nb->parent_name);
-    sbrec_port_binding_set_tag(op->sb, op->nb->tag, op->nb->n_tag);
-    sbrec_port_binding_set_mac(op->sb, (const char **) op->nb->macs,
-                               op->nb->n_macs);
+    if (op->nbr) {
+        sbrec_port_binding_set_type(op->sb, "patch");
+
+        const char *peer = op->peer ? op->peer->key : "<error>";
+        const struct smap ids = SMAP_CONST1(&ids, "peer", peer);
+        sbrec_port_binding_set_options(op->sb, &ids);
+
+        sbrec_port_binding_set_parent_port(op->sb, NULL);
+        sbrec_port_binding_set_tag(op->sb, NULL, 0);
+        sbrec_port_binding_set_mac(op->sb, NULL, 0);
+    } else {
+        if (strcmp(op->nbs->type, "router")) {
+            sbrec_port_binding_set_type(op->sb, op->nbs->type);
+            sbrec_port_binding_set_options(op->sb, &op->nbs->options);
+        } else {
+            sbrec_port_binding_set_type(op->sb, "patch");
+
+            const char *router_port = smap_get(&op->nbs->options,
+                                               "router-port");
+            if (!router_port) {
+                router_port = "<error>";
+            }
+            const struct smap ids = SMAP_CONST1(&ids, "peer", router_port);
+            sbrec_port_binding_set_options(op->sb, &ids);
+        }
+        sbrec_port_binding_set_parent_port(op->sb, op->nbs->parent_name);
+        sbrec_port_binding_set_tag(op->sb, op->nbs->tag, op->nbs->n_tag);
+        sbrec_port_binding_set_mac(op->sb, (const char **) op->nbs->addresses,
+                                   op->nbs->n_addresses);
+    }
 }
 
 static void
@@ -584,8 +835,7 @@ struct ovn_lflow {
     struct hmap_node hmap_node;
 
     struct ovn_datapath *od;
-    enum ovn_pipeline { P_IN, P_OUT } pipeline;
-    uint8_t table_id;
+    enum ovn_stage stage;
     uint16_t priority;
     char *match;
     char *actions;
@@ -595,7 +845,7 @@ static size_t
 ovn_lflow_hash(const struct ovn_lflow *lflow)
 {
     size_t hash = uuid_hash(&lflow->od->key);
-    hash = hash_2words((lflow->table_id << 16) | lflow->priority, hash);
+    hash = hash_2words((lflow->stage << 16) | lflow->priority, hash);
     hash = hash_string(lflow->match, hash);
     return hash_string(lflow->actions, hash);
 }
@@ -604,8 +854,7 @@ static bool
 ovn_lflow_equal(const struct ovn_lflow *a, const struct ovn_lflow *b)
 {
     return (a->od == b->od
-            && a->pipeline == b->pipeline
-            && a->table_id == b->table_id
+            && a->stage == b->stage
             && a->priority == b->priority
             && !strcmp(a->match, b->match)
             && !strcmp(a->actions, b->actions));
@@ -613,56 +862,35 @@ ovn_lflow_equal(const struct ovn_lflow *a, const struct ovn_lflow *b)
 
 static void
 ovn_lflow_init(struct ovn_lflow *lflow, struct ovn_datapath *od,
-              enum ovn_pipeline pipeline, uint8_t table_id, uint16_t priority,
+              enum ovn_stage stage, uint16_t priority,
               char *match, char *actions)
 {
     lflow->od = od;
-    lflow->pipeline = pipeline;
-    lflow->table_id = table_id;
+    lflow->stage = stage;
     lflow->priority = priority;
     lflow->match = match;
     lflow->actions = actions;
 }
 
-static const char *
-ingress_stage_to_str(int stage) {
-    switch (stage) {
-#define INGRESS_STAGE(NAME, STR) case S_IN_##NAME: return #STR;
-    INGRESS_STAGES
-#undef INGRESS_STAGE
-        default: return "<unknown>";
-    }
-}
-
-static const char *
-egress_stage_to_str(int stage) {
-    switch (stage) {
-#define EGRESS_STAGE(NAME, STR) case S_OUT_##NAME: return #STR;
-    EGRESS_STAGES
-#undef EGRESS_STAGE
-        default: return "<unknown>";
-    }
-}
-
 /* Adds a row with the specified contents to the Logical_Flow table. */
 static void
 ovn_lflow_add(struct hmap *lflow_map, struct ovn_datapath *od,
-              enum ovn_pipeline pipeline, uint8_t table_id, uint16_t priority,
+              enum ovn_stage stage, uint16_t priority,
               const char *match, const char *actions)
 {
     struct ovn_lflow *lflow = xmalloc(sizeof *lflow);
-    ovn_lflow_init(lflow, od, pipeline, table_id, priority,
+    ovn_lflow_init(lflow, od, stage, priority,
                    xstrdup(match), xstrdup(actions));
     hmap_insert(lflow_map, &lflow->hmap_node, ovn_lflow_hash(lflow));
 }
 
 static struct ovn_lflow *
 ovn_lflow_find(struct hmap *lflows, struct ovn_datapath *od,
-               enum ovn_pipeline pipeline, uint8_t table_id, uint16_t priority,
+               enum ovn_stage stage, uint16_t priority,
                const char *match, const char *actions)
 {
     struct ovn_lflow target;
-    ovn_lflow_init(&target, od, pipeline, table_id, priority,
+    ovn_lflow_init(&target, od, stage, priority,
                    CONST_CAST(char *, match), CONST_CAST(char *, actions));
 
     struct ovn_lflow *lflow;
@@ -686,6 +914,109 @@ ovn_lflow_destroy(struct hmap *lflows, struct ovn_lflow *lflow)
     }
 }
 
+struct ipv4_netaddr {
+    ovs_be32 addr;
+    unsigned int plen;
+};
+
+struct ipv6_netaddr {
+    struct in6_addr addr;
+    unsigned int plen;
+};
+
+struct lport_addresses {
+    struct eth_addr ea;
+    size_t n_ipv4_addrs;
+    struct ipv4_netaddr *ipv4_addrs;
+    size_t n_ipv6_addrs;
+    struct ipv6_netaddr *ipv6_addrs;
+};
+
+/*
+ * Extracts the mac, ipv4 and ipv6 addresses from the input param 'address'
+ * which should be of the format 'MAC [IP1 IP2 ..]" where IPn should be
+ * a valid IPv4 or IPv6 address and stores them in the 'ipv4_addrs' and
+ * 'ipv6_addrs' fields of input param 'laddrs'.
+ * The caller has to free the 'ipv4_addrs' and 'ipv6_addrs' fields.
+ * If input param 'store_ipv6' is true only then extracted ipv6 addresses
+ * are stored in 'ipv6_addrs' fields.
+ * Return true if at least 'MAC' is found in 'address', false otherwise.
+ * Eg 1.
+ * If 'address' = '00:00:00:00:00:01 10.0.0.4 fe80::ea2a:eaff:fe28:3390/64
+ *                 30.0.0.3/23' and 'store_ipv6' = true
+ * then returns true with laddrs->n_ipv4_addrs = 2, naddrs->n_ipv6_addrs = 1.
+ *
+ * Eg. 2
+ * If 'address' = '00:00:00:00:00:01 10.0.0.4 fe80::ea2a:eaff:fe28:3390/64
+ *                 30.0.0.3/23' and 'store_ipv6' = false
+ * then returns true with laddrs->n_ipv4_addrs = 2, naddrs->n_ipv6_addrs = 0.
+ *
+ * Eg 3. If 'address' = '00:00:00:00:00:01 10.0.0.4 addr 30.0.0.4', then
+ * returns true with laddrs->n_ipv4_addrs = 1 and laddrs->n_ipv6_addrs = 0.
+ */
+static bool
+extract_lport_addresses(char *address, struct lport_addresses *laddrs,
+                        bool store_ipv6)
+{
+    char *buf = address;
+    int buf_index = 0;
+    char *buf_end = buf + strlen(address);
+    if (!ovs_scan_len(buf, &buf_index, ETH_ADDR_SCAN_FMT,
+                      ETH_ADDR_SCAN_ARGS(laddrs->ea))) {
+        return false;
+    }
+
+    ovs_be32 ip4;
+    struct in6_addr ip6;
+    unsigned int plen;
+    char *error;
+
+    laddrs->n_ipv4_addrs = 0;
+    laddrs->n_ipv6_addrs = 0;
+    laddrs->ipv4_addrs = NULL;
+    laddrs->ipv6_addrs = NULL;
+
+    /* Loop through the buffer and extract the IPv4/IPv6 addresses
+     * and store in the 'laddrs'. Break the loop if invalid data is found.
+     */
+    buf += buf_index;
+    while (buf < buf_end) {
+        buf_index = 0;
+        error = ip_parse_cidr_len(buf, &buf_index, &ip4, &plen);
+        if (!error) {
+            laddrs->n_ipv4_addrs++;
+            laddrs->ipv4_addrs = xrealloc(
+                laddrs->ipv4_addrs,
+                sizeof (struct ipv4_netaddr) * laddrs->n_ipv4_addrs);
+            laddrs->ipv4_addrs[laddrs->n_ipv4_addrs - 1].addr = ip4;
+            laddrs->ipv4_addrs[laddrs->n_ipv4_addrs - 1].plen = plen;
+            buf += buf_index;
+            continue;
+        }
+        free(error);
+        error = ipv6_parse_cidr_len(buf, &buf_index, &ip6, &plen);
+        if (!error && store_ipv6) {
+            laddrs->n_ipv6_addrs++;
+            laddrs->ipv6_addrs = xrealloc(
+                laddrs->ipv6_addrs,
+                sizeof(struct ipv6_netaddr) * laddrs->n_ipv6_addrs);
+            memcpy(&laddrs->ipv6_addrs[laddrs->n_ipv6_addrs - 1].addr, &ip6,
+                   sizeof(struct in6_addr));
+            laddrs->ipv6_addrs[laddrs->n_ipv6_addrs - 1].plen = plen;
+        }
+
+        if (error) {
+            static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
+            VLOG_INFO_RL(&rl, "invalid syntax '%s' in address", address);
+            free(error);
+            break;
+        }
+        buf += buf_index;
+    }
+
+    return true;
+}
+
 /* Appends port security constraints on L2 address field 'eth_addr_field'
  * (e.g. "eth.src" or "eth.dst") to 'match'.  'port_security', with
  * 'n_port_security' elements, is the collection of port_security constraints
@@ -722,165 +1053,683 @@ lport_is_enabled(const struct nbrec_logical_port *lport)
     return !lport->enabled || *lport->enabled;
 }
 
-/* Updates the Logical_Flow and Multicast_Group tables in the OVN_SB database,
- * constructing their contents based on the OVN_NB database. */
+static bool
+lport_is_up(const struct nbrec_logical_port *lport)
+{
+    return !lport->up || *lport->up;
+}
+
+static bool
+has_stateful_acl(struct ovn_datapath *od)
+{
+    for (size_t i = 0; i < od->nbs->n_acls; i++) {
+        struct nbrec_acl *acl = od->nbs->acls[i];
+        if (!strcmp(acl->action, "allow-related")) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
 static void
-build_lflows(struct northd_context *ctx, struct hmap *datapaths,
-             struct hmap *ports)
+build_acls(struct ovn_datapath *od, struct hmap *lflows, struct hmap *ports)
 {
-    struct hmap lflows = HMAP_INITIALIZER(&lflows);
-    struct hmap mcgroups = HMAP_INITIALIZER(&mcgroups);
+    bool has_stateful = has_stateful_acl(od);
+    struct ovn_port *op;
+    struct ds match_in, match_out;
+
+    /* Ingress and Egress Pre-ACL Table (Priority 0): Packets are
+     * allowed by default. */
+    ovn_lflow_add(lflows, od, S_SWITCH_IN_PRE_ACL, 0, "1", "next;");
+    ovn_lflow_add(lflows, od, S_SWITCH_OUT_PRE_ACL, 0, "1", "next;");
+
+    /* Ingress and Egress ACL Table (Priority 0): Packets are allowed by
+     * default.  A related rule at priority 1 is added below if there
+     * are any stateful ACLs in this datapath. */
+    ovn_lflow_add(lflows, od, S_SWITCH_IN_ACL, 0, "1", "next;");
+    ovn_lflow_add(lflows, od, S_SWITCH_OUT_ACL, 0, "1", "next;");
+
+    /* If there are any stateful ACL rules in this dapapath, we must
+     * send all IP packets through the conntrack action, which handles
+     * defragmentation, in order to match L4 headers. */
+    if (has_stateful) {
+        HMAP_FOR_EACH (op, key_node, ports) {
+            if (op->od == od && !strcmp(op->nbs->type, "router")) {
+                /* Can't use ct() for router ports. Consider the following configuration:
+                lp1(10.0.0.2) on hostA--ls1--lr0--ls2--lp2(10.0.1.2) on hostB,
+                For a ping from lp1 to lp2, First, the response will go through ct()
+                with a zone for lp2 in the ls2 ingress pipeline on hostB.
+                That ct zone knows about this connection. Next, it goes through ct()
+                with the zone for the router port in the egress pipeline of ls2 on hostB.
+                This zone does not know about the connection, as the icmp request
+                went through the logical router on hostA, not hostB. This would only work
+                with distributed conntrack state across all chassis. */
+
+                ds_init(&match_in);
+                ds_init(&match_out);
+                ds_put_format(&match_in, "ip && inport == %s", op->json_key);
+                ds_put_format(&match_out, "ip && outport == %s", op->json_key);
+                ovn_lflow_add(lflows, od, S_SWITCH_IN_PRE_ACL, 110, ds_cstr(&match_in), "next;");
+                ovn_lflow_add(lflows, od, S_SWITCH_OUT_PRE_ACL, 110, ds_cstr(&match_out), "next;");
+
+                ds_destroy(&match_in);
+                ds_destroy(&match_out);
+            }
+        }
+
+        /* Ingress and Egress Pre-ACL Table (Priority 100).
+         *
+         * Regardless of whether the ACL is "from-lport" or "to-lport",
+         * we need rules in both the ingress and egress table, because
+         * the return traffic needs to be followed. */
+        ovn_lflow_add(lflows, od, S_SWITCH_IN_PRE_ACL, 100, "ip", "ct_next;");
+        ovn_lflow_add(lflows, od, S_SWITCH_OUT_PRE_ACL, 100, "ip", "ct_next;");
+
+        /* Ingress and Egress ACL Table (Priority 1).
+         *
+         * By default, traffic is allowed.  This is partially handled by
+         * the Priority 0 ACL flows added earlier, but we also need to
+         * commit IP flows.  This is because, while the initiater's
+         * direction may not have any stateful rules, the server's may
+         * and then its return traffic would not have an associated
+         * conntrack entry and would return "+invalid". */
+        ovn_lflow_add(lflows, od, S_SWITCH_IN_ACL, 1, "ip",
+                      "ct_commit; next;");
+        ovn_lflow_add(lflows, od, S_SWITCH_OUT_ACL, 1, "ip",
+                      "ct_commit; next;");
+
+        /* Ingress and Egress ACL Table (Priority 65535).
+         *
+         * Always drop traffic that's in an invalid state.  This is
+         * enforced at a higher priority than ACLs can be defined. */
+        ovn_lflow_add(lflows, od, S_SWITCH_IN_ACL, UINT16_MAX,
+                      "ct.inv", "drop;");
+        ovn_lflow_add(lflows, od, S_SWITCH_OUT_ACL, UINT16_MAX,
+                      "ct.inv", "drop;");
+
+        /* Ingress and Egress ACL Table (Priority 65535).
+         *
+         * Always allow traffic that is established to a committed
+         * conntrack entry.  This is enforced at a higher priority than
+         * ACLs can be defined. */
+        ovn_lflow_add(lflows, od, S_SWITCH_IN_ACL, UINT16_MAX,
+                      "ct.est && !ct.rel && !ct.new && !ct.inv",
+                      "next;");
+        ovn_lflow_add(lflows, od, S_SWITCH_OUT_ACL, UINT16_MAX,
+                      "ct.est && !ct.rel && !ct.new && !ct.inv",
+                      "next;");
+
+        /* Ingress and Egress ACL Table (Priority 65535).
+         *
+         * Always allow traffic that is related to an existing conntrack
+         * entry.  This is enforced at a higher priority than ACLs can
+         * be defined.
+         *
+         * NOTE: This does not support related data sessions (eg,
+         * a dynamically negotiated FTP data channel), but will allow
+         * related traffic such as an ICMP Port Unreachable through
+         * that's generated from a non-listening UDP port.  */
+        ovn_lflow_add(lflows, od, S_SWITCH_IN_ACL, UINT16_MAX,
+                      "!ct.est && ct.rel && !ct.new && !ct.inv",
+                      "next;");
+        ovn_lflow_add(lflows, od, S_SWITCH_OUT_ACL, UINT16_MAX,
+                      "!ct.est && ct.rel && !ct.new && !ct.inv",
+                      "next;");
+    }
 
-    /* Ingress table 0: Admission control framework (priorities 0 and 100). */
+    /* Ingress or Egress ACL Table (Various priorities). */
+    for (size_t i = 0; i < od->nbs->n_acls; i++) {
+        struct nbrec_acl *acl = od->nbs->acls[i];
+        bool ingress = !strcmp(acl->direction, "from-lport") ? true :false;
+        enum ovn_stage stage = ingress ? S_SWITCH_IN_ACL : S_SWITCH_OUT_ACL;
+
+        if (!strcmp(acl->action, "allow")) {
+            /* If there are any stateful flows, we must even commit "allow"
+             * actions.  This is because, while the initiater's
+             * direction may not have any stateful rules, the server's
+             * may and then its return traffic would not have an
+             * associated conntrack entry and would return "+invalid". */
+            const char *actions = has_stateful ? "ct_commit; next;" : "next;";
+            ovn_lflow_add(lflows, od, stage,
+                          acl->priority + OVN_ACL_PRI_OFFSET,
+                          acl->match, actions);
+        } else if (!strcmp(acl->action, "allow-related")) {
+            struct ds match = DS_EMPTY_INITIALIZER;
+
+            /* Commit the connection tracking entry, which allows all
+             * other traffic related to this entry to flow due to the
+             * 65535 priority flow defined earlier. */
+            ds_put_format(&match, "ct.new && (%s)", acl->match);
+            ovn_lflow_add(lflows, od, stage,
+                          acl->priority + OVN_ACL_PRI_OFFSET,
+                          ds_cstr(&match), "ct_commit; next;");
+
+            ds_destroy(&match);
+        } else if (!strcmp(acl->action, "drop")) {
+            ovn_lflow_add(lflows, od, stage,
+                          acl->priority + OVN_ACL_PRI_OFFSET,
+                          acl->match, "drop;");
+        } else if (!strcmp(acl->action, "reject")) {
+            /* xxx Need to support "reject". */
+            VLOG_INFO("reject is not a supported action");
+            ovn_lflow_add(lflows, od, stage,
+                          acl->priority + OVN_ACL_PRI_OFFSET,
+                          acl->match, "drop;");
+        }
+    }
+}
+
+static void
+build_lswitch_flows(struct hmap *datapaths, struct hmap *ports,
+                    struct hmap *lflows, struct hmap *mcgroups)
+{
+    /* This flow table structure is documented in ovn-northd(8), so please
+     * update ovn-northd.8.xml if you change anything. */
+
+    /* Build pre-ACL and ACL tables for both ingress and egress.
+     * Ingress tables 1 and 2.  Egress tables 0 and 1. */
     struct ovn_datapath *od;
     HMAP_FOR_EACH (od, key_node, datapaths) {
+        if (!od->nbs) {
+            continue;
+        }
+
+        build_acls(od, lflows, ports);
+    }
+
+    /* Logical switch ingress table 0: Admission control framework (priority
+     * 100). */
+    HMAP_FOR_EACH (od, key_node, datapaths) {
+        if (!od->nbs) {
+            continue;
+        }
+
         /* Logical VLANs not supported. */
-        ovn_lflow_add(&lflows, od, P_IN, S_IN_PORT_SEC, 100, "vlan.present",
+        ovn_lflow_add(lflows, od, S_SWITCH_IN_PORT_SEC, 100, "vlan.present",
                       "drop;");
 
         /* Broadcast/multicast source address is invalid. */
-        ovn_lflow_add(&lflows, od, P_IN, S_IN_PORT_SEC, 100, "eth.src[40]",
+        ovn_lflow_add(lflows, od, S_SWITCH_IN_PORT_SEC, 100, "eth.src[40]",
                       "drop;");
 
         /* Port security flows have priority 50 (see below) and will continue
          * to the next table if packet source is acceptable. */
-
-        /* Otherwise drop the packet. */
-        ovn_lflow_add(&lflows, od, P_IN, S_IN_PORT_SEC, 0, "1", "drop;");
     }
 
-    /* Ingress table 0: Ingress port security (priority 50). */
+    /* Logical switch ingress table 0: Ingress port security (priority 50). */
     struct ovn_port *op;
     HMAP_FOR_EACH (op, key_node, ports) {
+        if (!op->nbs) {
+            continue;
+        }
+
+        if (!lport_is_enabled(op->nbs)) {
+            /* Drop packets from disabled logical ports (since logical flow
+             * tables are default-drop). */
+            continue;
+        }
+
         struct ds match = DS_EMPTY_INITIALIZER;
-        ds_put_cstr(&match, "inport == ");
-        json_string_escape(op->key, &match);
+        ds_put_format(&match, "inport == %s", op->json_key);
         build_port_security("eth.src",
-                            op->nb->port_security, op->nb->n_port_security,
+                            op->nbs->port_security, op->nbs->n_port_security,
                             &match);
-        ovn_lflow_add(&lflows, op->od, P_IN, S_IN_PORT_SEC, 50, ds_cstr(&match),
-                      lport_is_enabled(op->nb) ? "next;" : "drop;");
+        ovn_lflow_add(lflows, op->od, S_SWITCH_IN_PORT_SEC, 50,
+                      ds_cstr(&match), "next;");
         ds_destroy(&match);
     }
 
-    /* Ingress table 1: ACLs (any priority). */
-    HMAP_FOR_EACH (od, key_node, datapaths) {
-        for (size_t i = 0; i < od->nb->n_acls; i++) {
-            const struct nbrec_acl *acl = od->nb->acls[i];
-            const char *action;
+    /* Ingress table 3: Destination lookup, ARP reply for known IPs.
+     * (priority 150). */
+    HMAP_FOR_EACH (op, key_node, ports) {
+        if (!op->nbs) {
+            continue;
+        }
+
+        /*
+         * Add ARP reply flows if either the
+         *  - port is up or
+         *  - port type is router
+         */
+        if (!lport_is_up(op->nbs) && strcmp(op->nbs->type, "router")) {
+            continue;
+        }
 
-            if (strcmp(acl->direction, "from-lport")) {
+        for (size_t i = 0; i < op->nbs->n_addresses; i++) {
+            struct lport_addresses laddrs;
+            if (!extract_lport_addresses(op->nbs->addresses[i], &laddrs,
+                                         false)) {
                 continue;
             }
+            for (size_t j = 0; j < laddrs.n_ipv4_addrs; j++) {
+                char *match = xasprintf(
+                    "arp.tpa == "IP_FMT" && arp.op == 1",
+                    IP_ARGS(laddrs.ipv4_addrs[j].addr));
+                char *actions = xasprintf(
+                    "eth.dst = eth.src; "
+                    "eth.src = "ETH_ADDR_FMT"; "
+                    "arp.op = 2; /* ARP reply */ "
+                    "arp.tha = arp.sha; "
+                    "arp.sha = "ETH_ADDR_FMT"; "
+                    "arp.tpa = arp.spa; "
+                    "arp.spa = "IP_FMT"; "
+                    "outport = inport; "
+                    "inport = \"\"; /* Allow sending out inport. */ "
+                    "output;",
+                    ETH_ADDR_ARGS(laddrs.ea),
+                    ETH_ADDR_ARGS(laddrs.ea),
+                    IP_ARGS(laddrs.ipv4_addrs[j].addr));
+                ovn_lflow_add(lflows, op->od, S_SWITCH_IN_L2_LKUP, 150,
+                              match, actions);
+                free(match);
+                free(actions);
+            }
 
-            action = (!strcmp(acl->action, "allow") ||
-                      !strcmp(acl->action, "allow-related"))
-                ? "next;" : "drop;";
-            ovn_lflow_add(&lflows, od, P_IN, S_IN_ACL, acl->priority,
-                          acl->match, action);
+            free(laddrs.ipv4_addrs);
         }
     }
-    HMAP_FOR_EACH (od, key_node, datapaths) {
-        ovn_lflow_add(&lflows, od, P_IN, S_IN_ACL, 0, "1", "next;");
-    }
 
-    /* Ingress table 2: Destination lookup, broadcast and multicast handling
+    /* Ingress table 3: Destination lookup, broadcast and multicast handling
      * (priority 100). */
     HMAP_FOR_EACH (op, key_node, ports) {
-        if (lport_is_enabled(op->nb)) {
-            ovn_multicast_add(&mcgroups, &mc_flood, op);
+        if (!op->nbs) {
+            continue;
+        }
+
+        if (lport_is_enabled(op->nbs)) {
+            ovn_multicast_add(mcgroups, &mc_flood, op);
         }
     }
     HMAP_FOR_EACH (od, key_node, datapaths) {
-        ovn_lflow_add(&lflows, od, P_IN, S_IN_L2_LKUP, 100, "eth.dst[40]",
+        if (!od->nbs) {
+            continue;
+        }
+
+        ovn_lflow_add(lflows, od, S_SWITCH_IN_L2_LKUP, 100, "eth.mcast",
                       "outport = \""MC_FLOOD"\"; output;");
     }
 
-    /* Ingress table 2: Destination lookup, unicast handling (priority 50), */
+    /* Ingress table 3: Destination lookup, unicast handling (priority 50), */
     HMAP_FOR_EACH (op, key_node, ports) {
-        for (size_t i = 0; i < op->nb->n_macs; i++) {
+        if (!op->nbs) {
+            continue;
+        }
+
+        for (size_t i = 0; i < op->nbs->n_addresses; i++) {
             struct eth_addr mac;
 
-            if (eth_addr_from_string(op->nb->macs[i], &mac)) {
+            if (eth_addr_from_string(op->nbs->addresses[i], &mac)) {
                 struct ds match, actions;
 
                 ds_init(&match);
-                ds_put_format(&match, "eth.dst == %s", op->nb->macs[i]);
+                ds_put_format(&match, "eth.dst == "ETH_ADDR_FMT,
+                              ETH_ADDR_ARGS(mac));
 
                 ds_init(&actions);
-                ds_put_cstr(&actions, "outport = ");
-                json_string_escape(op->nb->name, &actions);
-                ds_put_cstr(&actions, "; output;");
-                ovn_lflow_add(&lflows, op->od, P_IN, S_IN_L2_LKUP, 50,
+                ds_put_format(&actions, "outport = %s; output;", op->json_key);
+                ovn_lflow_add(lflows, op->od, S_SWITCH_IN_L2_LKUP, 50,
                               ds_cstr(&match), ds_cstr(&actions));
                 ds_destroy(&actions);
                 ds_destroy(&match);
-            } else if (!strcmp(op->nb->macs[i], "unknown")) {
-                ovn_multicast_add(&mcgroups, &mc_unknown, op);
-                op->od->has_unknown = true;
+            } else if (!strcmp(op->nbs->addresses[i], "unknown")) {
+                if (lport_is_enabled(op->nbs)) {
+                    ovn_multicast_add(mcgroups, &mc_unknown, op);
+                    op->od->has_unknown = true;
+                }
             } else {
                 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
 
-                VLOG_INFO_RL(&rl, "%s: invalid syntax '%s' in macs column",
-                             op->nb->name, op->nb->macs[i]);
+                VLOG_INFO_RL(&rl,
+                             "%s: invalid syntax '%s' in addresses column",
+                             op->nbs->name, op->nbs->addresses[i]);
             }
         }
     }
 
-    /* Ingress table 2: Destination lookup for unknown MACs (priority 0). */
+    /* Ingress table 3: Destination lookup for unknown MACs (priority 0). */
     HMAP_FOR_EACH (od, key_node, datapaths) {
+        if (!od->nbs) {
+            continue;
+        }
+
         if (od->has_unknown) {
-            ovn_lflow_add(&lflows, od, P_IN, S_IN_L2_LKUP, 0, "1",
+            ovn_lflow_add(lflows, od, S_SWITCH_IN_L2_LKUP, 0, "1",
                           "outport = \""MC_UNKNOWN"\"; output;");
         }
     }
 
-    /* Egress table 0: ACLs (any priority). */
+    /* Egress table 2: Egress port security multicast/broadcast (priority
+     * 100). */
     HMAP_FOR_EACH (od, key_node, datapaths) {
-        for (size_t i = 0; i < od->nb->n_acls; i++) {
-            const struct nbrec_acl *acl = od->nb->acls[i];
-            const char *action;
+        if (!od->nbs) {
+            continue;
+        }
 
-            if (strcmp(acl->direction, "to-lport")) {
-                continue;
-            }
+        ovn_lflow_add(lflows, od, S_SWITCH_OUT_PORT_SEC, 100, "eth.mcast",
+                      "output;");
+    }
 
-            action = (!strcmp(acl->action, "allow") ||
-                      !strcmp(acl->action, "allow-related"))
-                ? "next;" : "drop;";
-            ovn_lflow_add(&lflows, od, P_OUT, S_OUT_ACL, acl->priority,
-                          acl->match, action);
+    /* Egress table 2: Egress port security (priorities 50 and 150).
+     *
+     * Priority 50 rules implement port security for enabled logical port.
+     *
+     * Priority 150 rules drop packets to disabled logical ports, so that they
+     * don't even receive multicast or broadcast packets. */
+    HMAP_FOR_EACH (op, key_node, ports) {
+        if (!op->nbs) {
+            continue;
         }
+
+        struct ds match = DS_EMPTY_INITIALIZER;
+        ds_put_format(&match, "outport == %s", op->json_key);
+        if (lport_is_enabled(op->nbs)) {
+            build_port_security("eth.dst", op->nbs->port_security,
+                                op->nbs->n_port_security, &match);
+            ovn_lflow_add(lflows, op->od, S_SWITCH_OUT_PORT_SEC, 50,
+                          ds_cstr(&match), "output;");
+        } else {
+            ovn_lflow_add(lflows, op->od, S_SWITCH_OUT_PORT_SEC, 150,
+                          ds_cstr(&match), "drop;");
+        }
+
+        ds_destroy(&match);
     }
+}
+
+static bool
+lrport_is_enabled(const struct nbrec_logical_router_port *lrport)
+{
+    return !lrport->enabled || *lrport->enabled;
+}
+
+static void
+add_route(struct hmap *lflows, struct ovn_datapath *od,
+          ovs_be32 network, ovs_be32 mask, ovs_be32 gateway)
+{
+    char *match = xasprintf("ip4.dst == "IP_FMT"/"IP_FMT,
+                            IP_ARGS(network), IP_ARGS(mask));
+
+    struct ds actions = DS_EMPTY_INITIALIZER;
+    ds_put_cstr(&actions, "ip.ttl--; reg0 = ");
+    if (gateway) {
+        ds_put_format(&actions, IP_FMT, IP_ARGS(gateway));
+    } else {
+        ds_put_cstr(&actions, "ip4.dst");
+    }
+    ds_put_cstr(&actions, "; next;");
+
+    /* The priority here is calculated to implement longest-prefix-match
+     * routing. */
+    ovn_lflow_add(lflows, od, S_ROUTER_IN_IP_ROUTING,
+                  count_1bits(ntohl(mask)), match, ds_cstr(&actions));
+    ds_destroy(&actions);
+    free(match);
+}
+
+static void
+build_lrouter_flows(struct hmap *datapaths, struct hmap *ports,
+                    struct hmap *lflows)
+{
+    /* This flow table structure is documented in ovn-northd(8), so please
+     * update ovn-northd.8.xml if you change anything. */
+
+    /* Logical router ingress table 0: Admission control framework. */
+    struct ovn_datapath *od;
     HMAP_FOR_EACH (od, key_node, datapaths) {
-        ovn_lflow_add(&lflows, od, P_OUT, S_OUT_ACL, 0, "1", "next;");
+        if (!od->nbr) {
+            continue;
+        }
+
+        /* Logical VLANs not supported.
+         * Broadcast/multicast source address is invalid. */
+        ovn_lflow_add(lflows, od, S_ROUTER_IN_ADMISSION, 100,
+                      "vlan.present || eth.src[40]", "drop;");
     }
 
-    /* Egress table 1: Egress port security multicast/broadcast (priority
-     * 100). */
+    /* Logical router ingress table 0: match (priority 50). */
+    struct ovn_port *op;
+    HMAP_FOR_EACH (op, key_node, ports) {
+        if (!op->nbr) {
+            continue;
+        }
+
+        if (!lrport_is_enabled(op->nbr)) {
+            /* Drop packets from disabled logical ports (since logical flow
+             * tables are default-drop). */
+            continue;
+        }
+
+        char *match = xasprintf(
+            "(eth.mcast || eth.dst == "ETH_ADDR_FMT") && inport == %s",
+            ETH_ADDR_ARGS(op->mac), op->json_key);
+        ovn_lflow_add(lflows, op->od, S_ROUTER_IN_ADMISSION, 50,
+                      match, "next;");
+        free(match);
+    }
+
+    /* Logical router ingress table 1: IP Input. */
     HMAP_FOR_EACH (od, key_node, datapaths) {
-        ovn_lflow_add(&lflows, od, P_OUT, S_OUT_PORT_SEC, 100, "eth.dst[40]",
-                      "output;");
+        if (!od->nbr) {
+            continue;
+        }
+
+        /* L3 admission control: drop multicast and broadcast source, localhost
+         * source or destination, and zero network source or destination
+         * (priority 100). */
+        ovn_lflow_add(lflows, od, S_ROUTER_IN_IP_INPUT, 100,
+                      "ip4.mcast || "
+                      "ip4.src == 255.255.255.255 || "
+                      "ip4.src == 127.0.0.0/8 || "
+                      "ip4.dst == 127.0.0.0/8 || "
+                      "ip4.src == 0.0.0.0/8 || "
+                      "ip4.dst == 0.0.0.0/8",
+                      "drop;");
+
+        /* Drop Ethernet local broadcast.  By definition this traffic should
+         * not be forwarded.*/
+        ovn_lflow_add(lflows, od, S_ROUTER_IN_IP_INPUT, 50,
+                      "eth.bcast", "drop;");
+
+        /* Drop IP multicast. */
+        ovn_lflow_add(lflows, od, S_ROUTER_IN_IP_INPUT, 50,
+                      "ip4.mcast", "drop;");
+
+        /* TTL discard.
+         *
+         * XXX Need to send ICMP time exceeded if !ip.later_frag. */
+        char *match = xasprintf("ip4 && ip.ttl == {0, 1}");
+        ovn_lflow_add(lflows, od, S_ROUTER_IN_IP_INPUT, 30, match, "drop;");
+        free(match);
+
+        /* Pass other traffic not already handled to the next table for
+         * routing. */
+        ovn_lflow_add(lflows, od, S_ROUTER_IN_IP_INPUT, 0, "1", "next;");
     }
 
-    /* Egress table 1: Egress port security (priority 50). */
     HMAP_FOR_EACH (op, key_node, ports) {
-        struct ds match;
+        if (!op->nbr) {
+            continue;
+        }
 
-        ds_init(&match);
-        ds_put_cstr(&match, "outport == ");
-        json_string_escape(op->key, &match);
-        build_port_security("eth.dst",
-                            op->nb->port_security, op->nb->n_port_security,
-                            &match);
+        /* L3 admission control: drop packets that originate from an IP address
+         * owned by the router or a broadcast address known to the router
+         * (priority 100). */
+        char *match = xasprintf("ip4.src == {"IP_FMT", "IP_FMT"}",
+                                IP_ARGS(op->ip), IP_ARGS(op->bcast));
+        ovn_lflow_add(lflows, op->od, S_ROUTER_IN_IP_INPUT, 100,
+                      match, "drop;");
+        free(match);
+
+        /* ICMP echo reply.  These flows reply to ICMP echo requests
+         * received for the router's IP address. */
+        match = xasprintf(
+            "inport == %s && (ip4.dst == "IP_FMT" || ip4.dst == "IP_FMT") && "
+            "icmp4.type == 8 && icmp4.code == 0",
+            op->json_key, IP_ARGS(op->ip), IP_ARGS(op->bcast));
+        char *actions = xasprintf(
+            "ip4.dst = ip4.src; "
+            "ip4.src = "IP_FMT"; "
+            "ip.ttl = 255; "
+            "icmp4.type = 0; "
+            "inport = \"\"; /* Allow sending out inport. */ "
+            "next; ",
+            IP_ARGS(op->ip));
+        ovn_lflow_add(lflows, op->od, S_ROUTER_IN_IP_INPUT, 90,
+                      match, actions);
+        free(match);
+        free(actions);
+
+        /* ARP reply.  These flows reply to ARP requests for the router's own
+         * IP address. */
+        match = xasprintf(
+            "inport == %s && arp.tpa == "IP_FMT" && arp.op == 1",
+            op->json_key, IP_ARGS(op->ip));
+        actions = xasprintf(
+            "eth.dst = eth.src; "
+            "eth.src = "ETH_ADDR_FMT"; "
+            "arp.op = 2; /* ARP reply */ "
+            "arp.tha = arp.sha; "
+            "arp.sha = "ETH_ADDR_FMT"; "
+            "arp.tpa = arp.spa; "
+            "arp.spa = "IP_FMT"; "
+            "outport = %s; "
+            "inport = \"\"; /* Allow sending out inport. */ "
+            "output;",
+            ETH_ADDR_ARGS(op->mac),
+            ETH_ADDR_ARGS(op->mac),
+            IP_ARGS(op->ip),
+            op->json_key);
+        ovn_lflow_add(lflows, op->od, S_ROUTER_IN_IP_INPUT, 90,
+                      match, actions);
+        free(match);
+        free(actions);
+
+        /* Drop IP traffic to this router. */
+        match = xasprintf("ip4.dst == "IP_FMT, IP_ARGS(op->ip));
+        ovn_lflow_add(lflows, op->od, S_ROUTER_IN_IP_INPUT, 60,
+                      match, "drop;");
+        free(match);
+    }
 
-        ovn_lflow_add(&lflows, op->od, P_OUT, S_OUT_PORT_SEC, 50,
-                      ds_cstr(&match),
-                      lport_is_enabled(op->nb) ? "output;" : "drop;");
+    /* Logical router ingress table 2: IP Routing.
+     *
+     * A packet that arrives at this table is an IP packet that should be
+     * routed to the address in ip4.dst. This table sets reg0 to the next-hop
+     * IP address (leaving ip4.dst, the packet’s final destination, unchanged)
+     * and advances to the next table for ARP resolution. */
+    HMAP_FOR_EACH (op, key_node, ports) {
+        if (!op->nbr) {
+            continue;
+        }
 
-        ds_destroy(&match);
+        add_route(lflows, op->od, op->network, op->mask, 0);
+    }
+    HMAP_FOR_EACH (od, key_node, datapaths) {
+        if (!od->nbr) {
+            continue;
+        }
+
+        if (od->gateway) {
+            add_route(lflows, od, 0, 0, od->gateway);
+        }
+    }
+    /* XXX destination unreachable */
+
+    /* Local router ingress table 3: ARP Resolution.
+     *
+     * Any packet that reaches this table is an IP packet whose next-hop IP
+     * address is in reg0. (ip4.dst is the final destination.) This table
+     * resolves the IP address in reg0 into an output port in outport and an
+     * Ethernet address in eth.dst. */
+    HMAP_FOR_EACH (op, key_node, ports) {
+        if (op->nbr) {
+            /* XXX ARP for neighboring router */
+        } else if (op->od->n_router_ports) {
+            for (size_t i = 0; i < op->nbs->n_addresses; i++) {
+                struct lport_addresses laddrs;
+                if (!extract_lport_addresses(op->nbs->addresses[i], &laddrs,
+                                             false)) {
+                    continue;
+                }
+
+                for (size_t k = 0; k < laddrs.n_ipv4_addrs; k++) {
+                    ovs_be32 ip = laddrs.ipv4_addrs[k].addr;
+                    for (size_t j = 0; j < op->od->n_router_ports; j++) {
+                        /* Get the Logical_Router_Port that the Logical_Port is
+                         * connected to, as 'peer'. */
+                        const char *peer_name = smap_get(
+                            &op->od->router_ports[j]->nbs->options,
+                            "router-port");
+                        if (!peer_name) {
+                            continue;
+                        }
+
+                        struct ovn_port *peer
+                            = ovn_port_find(ports, peer_name);
+                        if (!peer || !peer->nbr) {
+                            continue;
+                        }
+
+                        /* Make sure that 'ip' is in 'peer''s network. */
+                        if ((ip ^ peer->network) & peer->mask) {
+                            continue;
+                        }
+
+                        char *match = xasprintf("reg0 == "IP_FMT, IP_ARGS(ip));
+                        char *actions = xasprintf("eth.src = "ETH_ADDR_FMT"; "
+                                                  "eth.dst = "ETH_ADDR_FMT"; "
+                                                  "outport = %s; "
+                                                  "output;",
+                                                  ETH_ADDR_ARGS(peer->mac),
+                                                  ETH_ADDR_ARGS(laddrs.ea),
+                                                  peer->json_key);
+                        ovn_lflow_add(lflows, peer->od,
+                                      S_ROUTER_IN_ARP, 200, match, actions);
+                        free(actions);
+                        free(match);
+                        break;
+                    }
+                }
+
+                free(laddrs.ipv4_addrs);
+            }
+        }
     }
 
+    /* Logical router egress table 0: Delivery (priority 100).
+     *
+     * Priority 100 rules deliver packets to enabled logical ports. */
+    HMAP_FOR_EACH (op, key_node, ports) {
+        if (!op->nbr) {
+            continue;
+        }
+
+        if (!lrport_is_enabled(op->nbr)) {
+            /* Drop packets to disabled logical ports (since logical flow
+             * tables are default-drop). */
+            continue;
+        }
+
+        char *match = xasprintf("outport == %s", op->json_key);
+        ovn_lflow_add(lflows, op->od, S_ROUTER_OUT_DELIVERY, 100,
+                      match, "output;");
+        free(match);
+    }
+}
+
+/* Updates the Logical_Flow and Multicast_Group tables in the OVN_SB database,
+ * constructing their contents based on the OVN_NB database. */
+static void
+build_lflows(struct northd_context *ctx, struct hmap *datapaths,
+             struct hmap *ports)
+{
+    struct hmap lflows = HMAP_INITIALIZER(&lflows);
+    struct hmap mcgroups = HMAP_INITIALIZER(&mcgroups);
+
+    build_lswitch_flows(datapaths, ports, &lflows, &mcgroups);
+    build_lrouter_flows(datapaths, ports, &lflows);
+
     /* Push changes to the Logical_Flow table to database. */
     const struct sbrec_logical_flow *sbflow, *next_sbflow;
     SBREC_LOGICAL_FLOW_FOR_EACH_SAFE (sbflow, next_sbflow, ctx->ovnsb_idl) {
@@ -891,10 +1740,12 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths,
             continue;
         }
 
+        enum ovn_datapath_type dp_type = od->nbs ? DP_SWITCH : DP_ROUTER;
+        enum ovn_pipeline pipeline
+            = !strcmp(sbflow->pipeline, "ingress") ? P_IN : P_OUT;
         struct ovn_lflow *lflow = ovn_lflow_find(
-            &lflows, od, (!strcmp(sbflow->pipeline, "ingress") ? P_IN : P_OUT),
-            sbflow->table_id, sbflow->priority,
-            sbflow->match, sbflow->actions);
+            &lflows, od, ovn_stage_build(dp_type, pipeline, sbflow->table_id),
+            sbflow->priority, sbflow->match, sbflow->actions);
         if (lflow) {
             ovn_lflow_destroy(&lflows, lflow);
         } else {
@@ -903,20 +1754,20 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths,
     }
     struct ovn_lflow *lflow, *next_lflow;
     HMAP_FOR_EACH_SAFE (lflow, next_lflow, hmap_node, &lflows) {
+        enum ovn_pipeline pipeline = ovn_stage_get_pipeline(lflow->stage);
+        uint8_t table = ovn_stage_get_table(lflow->stage);
+
         sbflow = sbrec_logical_flow_insert(ctx->ovnsb_txn);
         sbrec_logical_flow_set_logical_datapath(sbflow, lflow->od->sb);
         sbrec_logical_flow_set_pipeline(
-            sbflow, lflow->pipeline == P_IN ? "ingress" : "egress");
-        sbrec_logical_flow_set_table_id(sbflow, lflow->table_id);
+            sbflow, pipeline == P_IN ? "ingress" : "egress");
+        sbrec_logical_flow_set_table_id(sbflow, table);
         sbrec_logical_flow_set_priority(sbflow, lflow->priority);
         sbrec_logical_flow_set_match(sbflow, lflow->match);
         sbrec_logical_flow_set_actions(sbflow, lflow->actions);
 
-        const struct smap ids = SMAP_CONST1(
-            &ids, "stage-name",
-            (lflow->pipeline == P_IN
-             ? ingress_stage_to_str(lflow->table_id)
-             : egress_stage_to_str(lflow->table_id)));
+        const struct smap ids = SMAP_CONST1(&ids, "stage-name",
+                                            ovn_stage_to_str(lflow->stage));
         sbrec_logical_flow_set_external_ids(sbflow, &ids);
 
         ovn_lflow_destroy(&lflows, lflow);
@@ -956,10 +1807,12 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths,
 }
 \f
 static void
-ovnnb_db_changed(struct northd_context *ctx)
+ovnnb_db_run(struct northd_context *ctx)
 {
-    VLOG_DBG("ovn-nb db contents have changed.");
-
+    if (!ctx->ovnsb_txn) {
+        return;
+    }
+    VLOG_DBG("ovn-nb db contents may have changed.");
     struct hmap datapaths, ports;
     build_datapaths(ctx, &datapaths);
     build_ports(ctx, &datapaths, &ports);
@@ -984,8 +1837,11 @@ ovnnb_db_changed(struct northd_context *ctx)
  * need to set the corresponding logical port as 'up' in the northbound DB.
  */
 static void
-ovnsb_db_changed(struct northd_context *ctx)
+ovnsb_db_run(struct northd_context *ctx)
 {
+    if (!ctx->ovnnb_txn) {
+        return;
+    }
     struct hmap lports_hmap;
     const struct sbrec_port_binding *sb;
     const struct nbrec_logical_port *nb;
@@ -1131,15 +1987,7 @@ add_column_noalert(struct ovsdb_idl *idl,
 int
 main(int argc, char *argv[])
 {
-    extern struct vlog_module VLM_reconnect;
-    struct ovsdb_idl *ovnnb_idl, *ovnsb_idl;
-    unsigned int ovnnb_seqno, ovn_seqno;
     int res = EXIT_SUCCESS;
-    struct northd_context ctx = {
-        .ovnsb_txn = NULL,
-    };
-    bool ovnnb_changes_pending = false;
-    bool ovn_changes_pending = false;
     struct unixctl_server *unixctl;
     int retval;
     bool exiting;
@@ -1147,11 +1995,9 @@ main(int argc, char *argv[])
     fatal_ignore_sigpipe();
     set_program_name(argv[0]);
     service_start(&argc, &argv);
-    vlog_set_levels(NULL, VLF_CONSOLE, VLL_WARN);
-    vlog_set_levels(&VLM_reconnect, VLF_ANY_DESTINATION, VLL_WARN);
     parse_options(argc, argv);
 
-    daemonize_start();
+    daemonize_start(false);
 
     retval = unixctl_server_create(NULL, &unixctl);
     if (retval) {
@@ -1165,190 +2011,82 @@ main(int argc, char *argv[])
     sbrec_init();
 
     /* We want to detect all changes to the ovn-nb db. */
-    ctx.ovnnb_idl = ovnnb_idl = ovsdb_idl_create(ovnnb_db,
-            &nbrec_idl_class, true, true);
-
-    ctx.ovnsb_idl = ovnsb_idl = ovsdb_idl_create(ovnsb_db,
-            &sbrec_idl_class, false, true);
-
-    ovsdb_idl_add_table(ovnsb_idl, &sbrec_table_logical_flow);
-    add_column_noalert(ovnsb_idl, &sbrec_logical_flow_col_logical_datapath);
-    add_column_noalert(ovnsb_idl, &sbrec_logical_flow_col_pipeline);
-    add_column_noalert(ovnsb_idl, &sbrec_logical_flow_col_table_id);
-    add_column_noalert(ovnsb_idl, &sbrec_logical_flow_col_priority);
-    add_column_noalert(ovnsb_idl, &sbrec_logical_flow_col_match);
-    add_column_noalert(ovnsb_idl, &sbrec_logical_flow_col_actions);
-
-    ovsdb_idl_add_table(ovnsb_idl, &sbrec_table_multicast_group);
-    add_column_noalert(ovnsb_idl, &sbrec_multicast_group_col_datapath);
-    add_column_noalert(ovnsb_idl, &sbrec_multicast_group_col_tunnel_key);
-    add_column_noalert(ovnsb_idl, &sbrec_multicast_group_col_name);
-    add_column_noalert(ovnsb_idl, &sbrec_multicast_group_col_ports);
-
-    ovsdb_idl_add_table(ovnsb_idl, &sbrec_table_datapath_binding);
-    add_column_noalert(ovnsb_idl, &sbrec_datapath_binding_col_tunnel_key);
-    add_column_noalert(ovnsb_idl, &sbrec_datapath_binding_col_external_ids);
-
-    ovsdb_idl_add_table(ovnsb_idl, &sbrec_table_port_binding);
-    add_column_noalert(ovnsb_idl, &sbrec_port_binding_col_datapath);
-    add_column_noalert(ovnsb_idl, &sbrec_port_binding_col_logical_port);
-    add_column_noalert(ovnsb_idl, &sbrec_port_binding_col_tunnel_key);
-    add_column_noalert(ovnsb_idl, &sbrec_port_binding_col_parent_port);
-    add_column_noalert(ovnsb_idl, &sbrec_port_binding_col_tag);
-    add_column_noalert(ovnsb_idl, &sbrec_port_binding_col_type);
-    add_column_noalert(ovnsb_idl, &sbrec_port_binding_col_options);
-    add_column_noalert(ovnsb_idl, &sbrec_port_binding_col_mac);
-    ovsdb_idl_add_column(ovnsb_idl, &sbrec_port_binding_col_chassis);
-
-    /*
-     * The loop here just runs the IDL in a loop waiting for the seqno to
-     * change, which indicates that the contents of the db have changed.
-     *
-     * If the contents of the ovn-nb db change, the mappings to the ovn-sb
-     * db must be recalculated.
-     *
-     * If the contents of the ovn-sb db change, it means the 'up' state of
-     * a port may have changed, as that's the only type of change ovn-northd is
-     * watching for.
-     */
-
-    ovnnb_seqno = ovsdb_idl_get_seqno(ovnnb_idl);
-    ovn_seqno = ovsdb_idl_get_seqno(ovnsb_idl);
+    struct ovsdb_idl_loop ovnnb_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(
+        ovsdb_idl_create(ovnnb_db, &nbrec_idl_class, true, true));
+
+    struct ovsdb_idl_loop ovnsb_idl_loop = OVSDB_IDL_LOOP_INITIALIZER(
+        ovsdb_idl_create(ovnsb_db, &sbrec_idl_class, false, true));
+
+    ovsdb_idl_add_table(ovnsb_idl_loop.idl, &sbrec_table_logical_flow);
+    add_column_noalert(ovnsb_idl_loop.idl,
+                       &sbrec_logical_flow_col_logical_datapath);
+    add_column_noalert(ovnsb_idl_loop.idl, &sbrec_logical_flow_col_pipeline);
+    add_column_noalert(ovnsb_idl_loop.idl, &sbrec_logical_flow_col_table_id);
+    add_column_noalert(ovnsb_idl_loop.idl, &sbrec_logical_flow_col_priority);
+    add_column_noalert(ovnsb_idl_loop.idl, &sbrec_logical_flow_col_match);
+    add_column_noalert(ovnsb_idl_loop.idl, &sbrec_logical_flow_col_actions);
+
+    ovsdb_idl_add_table(ovnsb_idl_loop.idl, &sbrec_table_multicast_group);
+    add_column_noalert(ovnsb_idl_loop.idl,
+                       &sbrec_multicast_group_col_datapath);
+    add_column_noalert(ovnsb_idl_loop.idl,
+                       &sbrec_multicast_group_col_tunnel_key);
+    add_column_noalert(ovnsb_idl_loop.idl, &sbrec_multicast_group_col_name);
+    add_column_noalert(ovnsb_idl_loop.idl, &sbrec_multicast_group_col_ports);
+
+    ovsdb_idl_add_table(ovnsb_idl_loop.idl, &sbrec_table_datapath_binding);
+    add_column_noalert(ovnsb_idl_loop.idl,
+                       &sbrec_datapath_binding_col_tunnel_key);
+    add_column_noalert(ovnsb_idl_loop.idl,
+                       &sbrec_datapath_binding_col_external_ids);
+
+    ovsdb_idl_add_table(ovnsb_idl_loop.idl, &sbrec_table_port_binding);
+    add_column_noalert(ovnsb_idl_loop.idl, &sbrec_port_binding_col_datapath);
+    add_column_noalert(ovnsb_idl_loop.idl,
+                       &sbrec_port_binding_col_logical_port);
+    add_column_noalert(ovnsb_idl_loop.idl,
+                       &sbrec_port_binding_col_tunnel_key);
+    add_column_noalert(ovnsb_idl_loop.idl,
+                       &sbrec_port_binding_col_parent_port);
+    add_column_noalert(ovnsb_idl_loop.idl, &sbrec_port_binding_col_tag);
+    add_column_noalert(ovnsb_idl_loop.idl, &sbrec_port_binding_col_type);
+    add_column_noalert(ovnsb_idl_loop.idl, &sbrec_port_binding_col_options);
+    add_column_noalert(ovnsb_idl_loop.idl, &sbrec_port_binding_col_mac);
+    ovsdb_idl_add_column(ovnsb_idl_loop.idl, &sbrec_port_binding_col_chassis);
+
+    /* Main loop. */
     exiting = false;
     while (!exiting) {
-        ovsdb_idl_run(ovnnb_idl);
-        ovsdb_idl_run(ovnsb_idl);
-        unixctl_server_run(unixctl);
+        struct northd_context ctx = {
+            .ovnnb_idl = ovnnb_idl_loop.idl,
+            .ovnnb_txn = ovsdb_idl_loop_run(&ovnnb_idl_loop),
+            .ovnsb_idl = ovnsb_idl_loop.idl,
+            .ovnsb_txn = ovsdb_idl_loop_run(&ovnsb_idl_loop),
+        };
 
-        if (!ovsdb_idl_is_alive(ovnnb_idl)) {
-            int retval = ovsdb_idl_get_last_error(ovnnb_idl);
-            VLOG_ERR("%s: database connection failed (%s)",
-                    ovnnb_db, ovs_retval_to_string(retval));
-            res = EXIT_FAILURE;
-            break;
-        }
+        ovnnb_db_run(&ctx);
+        ovnsb_db_run(&ctx);
 
-        if (!ovsdb_idl_is_alive(ovnsb_idl)) {
-            int retval = ovsdb_idl_get_last_error(ovnsb_idl);
-            VLOG_ERR("%s: database connection failed (%s)",
-                    ovnsb_db, ovs_retval_to_string(retval));
-            res = EXIT_FAILURE;
-            break;
-        }
-
-        if (ovnnb_seqno != ovsdb_idl_get_seqno(ovnnb_idl)) {
-            ovnnb_seqno = ovsdb_idl_get_seqno(ovnnb_idl);
-            ovnnb_changes_pending = true;
-        }
-
-        if (ovn_seqno != ovsdb_idl_get_seqno(ovnsb_idl)) {
-            ovn_seqno = ovsdb_idl_get_seqno(ovnsb_idl);
-            ovn_changes_pending = true;
-        }
-
-        /*
-         * If there are any pending changes, we delay recalculating the
-         * necessary updates until after an existing transaction finishes.
-         * This avoids the possibility of rapid updates causing ovn-northd to
-         * never be able to successfully make the corresponding updates to the
-         * other db.  Instead, pending changes are batched up until the next
-         * time we get a chance to calculate the new state and apply it.
-         */
-
-        if (ovnnb_changes_pending && !ctx.ovnsb_txn) {
-            /*
-             * The OVN-nb db contents have changed, so create a transaction for
-             * updating the OVN-sb DB.
-             */
-            ctx.ovnsb_txn = ovsdb_idl_txn_create(ctx.ovnsb_idl);
-            ovsdb_idl_txn_add_comment(ctx.ovnsb_txn,
-                                      "ovn-northd: northbound db changed");
-            ovnnb_db_changed(&ctx);
-            ovnnb_changes_pending = false;
-        }
-
-        if (ovn_changes_pending && !ctx.ovnnb_txn) {
-            /*
-             * The OVN-sb db contents have changed, so create a transaction for
-             * updating the northbound DB.
-             */
-            ctx.ovnnb_txn = ovsdb_idl_txn_create(ctx.ovnnb_idl);
-            ovsdb_idl_txn_add_comment(ctx.ovnnb_txn,
-                                      "ovn-northd: southbound db changed");
-            ovnsb_db_changed(&ctx);
-            ovn_changes_pending = false;
-        }
-
-        if (ctx.ovnnb_txn) {
-            enum ovsdb_idl_txn_status txn_status;
-            txn_status = ovsdb_idl_txn_commit(ctx.ovnnb_txn);
-            switch (txn_status) {
-            case TXN_UNCOMMITTED:
-            case TXN_INCOMPLETE:
-                /* Come back around and try to commit this transaction again */
-                break;
-            case TXN_ABORTED:
-            case TXN_TRY_AGAIN:
-            case TXN_NOT_LOCKED:
-            case TXN_ERROR:
-                /* Something went wrong, so try creating a new transaction. */
-                ovn_changes_pending = true;
-            case TXN_UNCHANGED:
-            case TXN_SUCCESS:
-                ovsdb_idl_txn_destroy(ctx.ovnnb_txn);
-                ctx.ovnnb_txn = NULL;
-            }
-        }
-
-        if (ctx.ovnsb_txn) {
-            enum ovsdb_idl_txn_status txn_status;
-            txn_status = ovsdb_idl_txn_commit(ctx.ovnsb_txn);
-            switch (txn_status) {
-            case TXN_UNCOMMITTED:
-            case TXN_INCOMPLETE:
-                /* Come back around and try to commit this transaction again */
-                break;
-            case TXN_ABORTED:
-            case TXN_TRY_AGAIN:
-            case TXN_NOT_LOCKED:
-            case TXN_ERROR:
-                /* Something went wrong, so try creating a new transaction. */
-                ovnnb_changes_pending = true;
-            case TXN_UNCHANGED:
-            case TXN_SUCCESS:
-                ovsdb_idl_txn_destroy(ctx.ovnsb_txn);
-                ctx.ovnsb_txn = NULL;
-            }
+        unixctl_server_run(unixctl);
+        unixctl_server_wait(unixctl);
+        if (exiting) {
+            poll_immediate_wake();
         }
+        ovsdb_idl_loop_commit_and_wait(&ovnnb_idl_loop);
+        ovsdb_idl_loop_commit_and_wait(&ovnsb_idl_loop);
 
-        if (ovnnb_seqno == ovsdb_idl_get_seqno(ovnnb_idl) &&
-                ovn_seqno == ovsdb_idl_get_seqno(ovnsb_idl)) {
-            ovsdb_idl_wait(ovnnb_idl);
-            ovsdb_idl_wait(ovnsb_idl);
-            if (ctx.ovnnb_txn) {
-                ovsdb_idl_txn_wait(ctx.ovnnb_txn);
-            }
-            if (ctx.ovnsb_txn) {
-                ovsdb_idl_txn_wait(ctx.ovnsb_txn);
-            }
-            unixctl_server_wait(unixctl);
-            if (exiting) {
-                poll_immediate_wake();
-            }
-            poll_block();
-        }
+        poll_block();
         if (should_service_stop()) {
             exiting = true;
         }
     }
 
     unixctl_server_destroy(unixctl);
-    ovsdb_idl_destroy(ovnsb_idl);
-    ovsdb_idl_destroy(ovnnb_idl);
+    ovsdb_idl_loop_destroy(&ovnnb_idl_loop);
+    ovsdb_idl_loop_destroy(&ovnsb_idl_loop);
     service_stop();
 
     free(default_db_);
-
     exit(res);
 }