e55e5246077d3d0ce501205c0e25930bb29c32bc
[cascardo/ovs.git] / ovn / controller / ofctrl.c
1 /* Copyright (c) 2015 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <config.h>
17 #include "ofctrl.h"
18 #include "dirs.h"
19 #include "dynamic-string.h"
20 #include "hmap.h"
21 #include "match.h"
22 #include "ofp-actions.h"
23 #include "ofp-msgs.h"
24 #include "ofp-print.h"
25 #include "ofp-util.h"
26 #include "ofpbuf.h"
27 #include "openflow/openflow.h"
28 #include "openvswitch/vlog.h"
29 #include "ovn-controller.h"
30 #include "physical.h"
31 #include "rconn.h"
32 #include "socket-util.h"
33 #include "vswitch-idl.h"
34
35 VLOG_DEFINE_THIS_MODULE(ofctrl);
36
37 /* An OpenFlow flow. */
38 struct ovn_flow {
39     /* Key. */
40     struct hmap_node hmap_node;
41     uint8_t table_id;
42     uint16_t priority;
43     struct match match;
44
45     /* Data. */
46     struct ofpact *ofpacts;
47     size_t ofpacts_len;
48 };
49
50 static uint32_t ovn_flow_hash(const struct ovn_flow *);
51 static struct ovn_flow *ovn_flow_lookup(struct hmap *flow_table,
52                                         const struct ovn_flow *target);
53 static char *ovn_flow_to_string(const struct ovn_flow *);
54 static void ovn_flow_log(const struct ovn_flow *, const char *action);
55 static void ovn_flow_destroy(struct ovn_flow *);
56
57 static ovs_be32 queue_msg(struct ofpbuf *);
58 static void queue_flow_mod(struct ofputil_flow_mod *);
59
60 /* OpenFlow connection to the switch. */
61 static struct rconn *swconn;
62
63 /* Last seen sequence number for 'swconn'.  When this differs from
64  * rconn_get_connection_seqno(rconn), 'swconn' has reconnected. */
65 static unsigned int seqno;
66
67 /* Connection state machine. */
68 #define STATES                                  \
69     STATE(S_NEW)                                \
70     STATE(S_GENEVE_TABLE_REQUESTED)             \
71     STATE(S_GENEVE_TABLE_MOD_SENT)              \
72     STATE(S_CLEAR_FLOWS)                        \
73     STATE(S_UPDATE_FLOWS)
74 enum ofctrl_state {
75 #define STATE(NAME) NAME,
76     STATES
77 #undef STATE
78 };
79
80 /* Current state. */
81 static enum ofctrl_state state;
82
83 /* Transaction IDs for messages in flight to the switch. */
84 static ovs_be32 xid, xid2;
85
86 /* Counter for in-flight OpenFlow messages on 'swconn'.  We only send a new
87  * round of flow table modifications to the switch when the counter falls to
88  * zero, to avoid unbounded buffering. */
89 static struct rconn_packet_counter *tx_counter;
90
91 /* Flow table of "struct ovn_flow"s, that holds the flow table currently
92  * installed in the switch. */
93 static struct hmap installed_flows;
94
95 /* MFF_* field ID for our Geneve option.  In S_GENEVE_TABLE_MOD_SENT, this is
96  * the option we requested (we don't know whether we obtained it yet).  In
97  * S_CLEAR_FLOWS or S_UPDATE_FLOWS, this is really the option we have. */
98 static enum mf_field_id mff_ovn_geneve;
99
100 static void ovn_flow_table_clear(struct hmap *flow_table);
101 static void ovn_flow_table_destroy(struct hmap *flow_table);
102
103 static void ofctrl_recv(const struct ofp_header *, enum ofptype);
104
105 void
106 ofctrl_init(void)
107 {
108     swconn = rconn_create(5, 0, DSCP_DEFAULT, 1 << OFP13_VERSION);
109     tx_counter = rconn_packet_counter_create();
110     hmap_init(&installed_flows);
111 }
112 \f
113 /* S_NEW, for a new connection.
114  *
115  * Sends NXT_GENEVE_TABLE_REQUEST and transitions to
116  * S_GENEVE_TABLE_REQUESTED. */
117
118 static void
119 run_S_NEW(void)
120 {
121     struct ofpbuf *buf = ofpraw_alloc(OFPRAW_NXT_GENEVE_TABLE_REQUEST,
122                                       rconn_get_version(swconn), 0);
123     xid = queue_msg(buf);
124     state = S_GENEVE_TABLE_REQUESTED;
125 }
126
127 static void
128 recv_S_NEW(const struct ofp_header *oh OVS_UNUSED,
129            enum ofptype type OVS_UNUSED)
130 {
131     OVS_NOT_REACHED();
132 }
133 \f
134 /* S_GENEVE_TABLE_REQUESTED, when NXT_GENEVE_TABLE_REQUEST has been sent
135  * and we're waiting for a reply.
136  *
137  * If we receive an NXT_GENEVE_TABLE_REPLY:
138  *
139  *     - If it contains our tunnel metadata option, assign its field ID to
140  *       mff_ovn_geneve and transition to S_CLEAR_FLOWS.
141  *
142  *     - Otherwise, if there is an unused tunnel metadata field ID, send
143  *       NXT_GENEVE_TABLE_MOD and OFPT_BARRIER_REQUEST, and transition to
144  *       S_GENEVE_TABLE_MOD_SENT.
145  *
146  *     - Otherwise, log an error, disable Geneve, and transition to
147  *       S_CLEAR_FLOWS.
148  *
149  * If we receive an OFPT_ERROR:
150  *
151  *     - Log an error, disable Geneve, and transition to S_CLEAR_FLOWS. */
152
153 static void
154 run_S_GENEVE_TABLE_REQUESTED(void)
155 {
156 }
157
158 static void
159 recv_S_GENEVE_TABLE_REQUESTED(const struct ofp_header *oh, enum ofptype type)
160 {
161     if (oh->xid != xid) {
162         ofctrl_recv(oh, type);
163     } else if (type == OFPTYPE_NXT_GENEVE_TABLE_REPLY) {
164         struct ofputil_geneve_table_reply reply;
165         enum ofperr error = ofputil_decode_geneve_table_reply(oh, &reply);
166         if (error) {
167             VLOG_ERR("failed to decode Geneve table request (%s)",
168                      ofperr_to_string(error));
169             goto error;
170         }
171
172         const struct ofputil_geneve_map *map;
173         uint64_t md_free = UINT64_MAX;
174         BUILD_ASSERT(TUN_METADATA_NUM_OPTS == 64);
175
176         LIST_FOR_EACH (map, list_node, &reply.mappings) {
177             if (map->option_class == OVN_GENEVE_CLASS
178                 && map->option_type == OVN_GENEVE_TYPE
179                 && map->option_len == OVN_GENEVE_LEN) {
180                 if (map->index >= TUN_METADATA_NUM_OPTS) {
181                     VLOG_ERR("desired Geneve tunnel option 0x%"PRIx16","
182                              "%"PRIu8",%"PRIu8" already in use with "
183                              "unsupported index %"PRIu16,
184                              map->option_class, map->option_type,
185                              map->option_len, map->index);
186                     goto error;
187                 } else {
188                     mff_ovn_geneve = MFF_TUN_METADATA0 + map->index;
189                     state = S_CLEAR_FLOWS;
190                     return;
191                 }
192             }
193
194             if (map->index < TUN_METADATA_NUM_OPTS) {
195                 md_free &= ~(UINT64_C(1) << map->index);
196             }
197         }
198
199         VLOG_DBG("OVN Geneve option not found");
200         if (!md_free) {
201             VLOG_ERR("no Geneve options free for use by OVN");
202             goto error;
203         }
204
205         unsigned int index = rightmost_1bit_idx(md_free);
206         mff_ovn_geneve = MFF_TUN_METADATA0 + index;
207         struct ofputil_geneve_map gm;
208         gm.option_class = OVN_GENEVE_CLASS;
209         gm.option_type = OVN_GENEVE_TYPE;
210         gm.option_len = OVN_GENEVE_LEN;
211         gm.index = index;
212
213         struct ofputil_geneve_table_mod gtm;
214         gtm.command = NXGTMC_ADD;
215         list_init(&gtm.mappings);
216         list_push_back(&gtm.mappings, &gm.list_node);
217
218         xid = queue_msg(ofputil_encode_geneve_table_mod(OFP13_VERSION, &gtm));
219         xid2 = queue_msg(ofputil_encode_barrier_request(OFP13_VERSION));
220         state = S_GENEVE_TABLE_MOD_SENT;
221     } else if (type == OFPTYPE_ERROR) {
222         VLOG_ERR("switch refused to allocate Geneve option (%s)",
223                  ofperr_to_string(ofperr_decode_msg(oh, NULL)));
224         goto error;
225     } else {
226         char *s = ofp_to_string(oh, ntohs(oh->length), 1);
227         VLOG_ERR("unexpected reply to Geneve table request (%s)",
228                  s);
229         free(s);
230         goto error;
231     }
232     return;
233
234 error:
235     mff_ovn_geneve = 0;
236     state = S_CLEAR_FLOWS;
237 }
238 \f
239 /* S_GENEVE_TABLE_MOD_SENT, when NXT_GENEVE_TABLE_MOD and OFPT_BARRIER_REQUEST
240  * have been sent and we're waiting for a reply to one or the other.
241  *
242  * If we receive an OFPT_ERROR:
243  *
244  *     - If the error is NXGTMFC_ALREADY_MAPPED or NXGTMFC_DUP_ENTRY, we
245  *       raced with some other controller.  Transition to S_NEW.
246  *
247  *     - Otherwise, log an error, disable Geneve, and transition to
248  *       S_CLEAR_FLOWS.
249  *
250  * If we receive OFPT_BARRIER_REPLY:
251  *
252  *     - Set the tunnel metadata field ID to the one that we requested.
253  *       Transition to S_CLEAR_FLOWS.
254  */
255
256 static void
257 run_S_GENEVE_TABLE_MOD_SENT(void)
258 {
259 }
260
261 static void
262 recv_S_GENEVE_TABLE_MOD_SENT(const struct ofp_header *oh, enum ofptype type)
263 {
264     if (oh->xid != xid && oh->xid != xid2) {
265         ofctrl_recv(oh, type);
266     } else if (oh->xid == xid2 && type == OFPTYPE_BARRIER_REPLY) {
267         state = S_CLEAR_FLOWS;
268     } else if (oh->xid == xid && type == OFPTYPE_ERROR) {
269         enum ofperr error = ofperr_decode_msg(oh, NULL);
270         if (error == OFPERR_NXGTMFC_ALREADY_MAPPED ||
271             error == OFPERR_NXGTMFC_DUP_ENTRY) {
272             VLOG_INFO("raced with another controller adding "
273                       "Geneve option (%s); trying again",
274                       ofperr_to_string(error));
275             state = S_NEW;
276         } else {
277             VLOG_ERR("error adding Geneve option (%s)",
278                      ofperr_to_string(error));
279             goto error;
280         }
281     } else {
282         char *s = ofp_to_string(oh, ntohs(oh->length), 1);
283         VLOG_ERR("unexpected reply to Geneve option allocation request (%s)",
284                  s);
285         free(s);
286         goto error;
287     }
288     return;
289
290 error:
291     state = S_CLEAR_FLOWS;
292 }
293 \f
294 /* S_CLEAR_FLOWS, after we've established a Geneve metadata field ID and it's
295  * time to set up some flows.
296  *
297  * Sends an OFPT_TABLE_MOD to clear all flows, then transitions to
298  * S_UPDATE_FLOWS. */
299
300 static void
301 run_S_CLEAR_FLOWS(void)
302 {
303     /* Send a flow_mod to delete all flows. */
304     struct ofputil_flow_mod fm = {
305         .match = MATCH_CATCHALL_INITIALIZER,
306         .table_id = OFPTT_ALL,
307         .command = OFPFC_DELETE,
308     };
309     queue_flow_mod(&fm);
310     VLOG_DBG("clearing all flows");
311
312     /* Clear installed_flows, to match the state of the switch. */
313     ovn_flow_table_clear(&installed_flows);
314
315     state = S_UPDATE_FLOWS;
316 }
317
318 static void
319 recv_S_CLEAR_FLOWS(const struct ofp_header *oh, enum ofptype type)
320 {
321     ofctrl_recv(oh, type);
322 }
323 \f
324 /* S_UPDATE_FLOWS, for maintaining the flow table over time.
325  *
326  * Compare the installed flows to the ones we want.  Send OFPT_FLOW_MOD as
327  * necessary.
328  *
329  * This is a terminal state.  We only transition out of it if the connection
330  * drops. */
331
332 static void
333 run_S_UPDATE_FLOWS(void)
334 {
335     /* Nothing to do here.
336      *
337      * Being in this state enables ofctrl_put() to work, however. */
338 }
339
340 static void
341 recv_S_UPDATE_FLOWS(const struct ofp_header *oh, enum ofptype type)
342 {
343     ofctrl_recv(oh, type);
344 }
345 \f
346 /* Runs the OpenFlow state machine against 'br_int', which is local to the
347  * hypervisor on which we are running.  Attempts to negotiate a Geneve option
348  * field for class OVN_GENEVE_CLASS, type OVN_GENEVE_TYPE.  If successful,
349  * returns the MFF_* field ID for the option, otherwise returns 0. */
350 enum mf_field_id
351 ofctrl_run(const struct ovsrec_bridge *br_int)
352 {
353     if (br_int) {
354         char *target;
355         target = xasprintf("unix:%s/%s.mgmt", ovs_rundir(), br_int->name);
356         if (strcmp(target, rconn_get_target(swconn))) {
357             VLOG_INFO("%s: connecting to switch", target);
358             rconn_connect(swconn, target, target);
359         }
360         free(target);
361     } else {
362         rconn_disconnect(swconn);
363     }
364
365     rconn_run(swconn);
366
367     if (!rconn_is_connected(swconn)) {
368         return 0;
369     }
370     if (seqno != rconn_get_connection_seqno(swconn)) {
371         seqno = rconn_get_connection_seqno(swconn);
372         state = S_NEW;
373     }
374
375     enum ofctrl_state old_state;
376     do {
377         old_state = state;
378         switch (state) {
379 #define STATE(NAME) case NAME: run_##NAME(); break;
380             STATES
381 #undef STATE
382         default:
383             OVS_NOT_REACHED();
384         }
385     } while (state != old_state);
386
387     for (int i = 0; state == old_state && i < 50; i++) {
388         struct ofpbuf *msg = rconn_recv(swconn);
389         if (!msg) {
390             break;
391         }
392
393         const struct ofp_header *oh = msg->data;
394         enum ofptype type;
395         enum ofperr error;
396
397         error = ofptype_decode(&type, oh);
398         if (!error) {
399             switch (state) {
400 #define STATE(NAME) case NAME: recv_##NAME(oh, type); break;
401                 STATES
402 #undef STATE
403             default:
404                 OVS_NOT_REACHED();
405             }
406         } else {
407             char *s = ofp_to_string(oh, ntohs(oh->length), 1);
408             VLOG_WARN("could not decode OpenFlow message (%s): %s",
409                       ofperr_to_string(error), s);
410             free(s);
411         }
412
413         ofpbuf_delete(msg);
414     }
415
416     return (state == S_CLEAR_FLOWS || state == S_UPDATE_FLOWS
417             ? mff_ovn_geneve : 0);
418 }
419
420 void
421 ofctrl_wait(void)
422 {
423     rconn_run_wait(swconn);
424     rconn_recv_wait(swconn);
425 }
426
427 void
428 ofctrl_destroy(void)
429 {
430     rconn_destroy(swconn);
431     ovn_flow_table_destroy(&installed_flows);
432     rconn_packet_counter_destroy(tx_counter);
433 }
434 \f
435 static ovs_be32
436 queue_msg(struct ofpbuf *msg)
437 {
438     const struct ofp_header *oh = msg->data;
439     ovs_be32 xid = oh->xid;
440     rconn_send(swconn, msg, tx_counter);
441     return xid;
442 }
443
444 static void
445 ofctrl_recv(const struct ofp_header *oh, enum ofptype type)
446 {
447     if (type == OFPTYPE_ECHO_REQUEST) {
448         queue_msg(make_echo_reply(oh));
449     } else if (type != OFPTYPE_ECHO_REPLY &&
450                type != OFPTYPE_BARRIER_REPLY &&
451                type != OFPTYPE_PACKET_IN &&
452                type != OFPTYPE_PORT_STATUS &&
453                type != OFPTYPE_FLOW_REMOVED) {
454         if (VLOG_IS_DBG_ENABLED()) {
455             static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(30, 300);
456
457             char *s = ofp_to_string(oh, ntohs(oh->length), 2);
458             VLOG_DBG_RL(&rl, "OpenFlow packet ignored: %s", s);
459             free(s);
460         }
461     }
462 }
463 \f
464 /* Flow table interface to the rest of ovn-controller. */
465
466 /* Adds a flow to 'desired_flows' with the specified 'match' and 'actions' to
467  * the OpenFlow table numbered 'table_id' with the given 'priority'.  The
468  * caller retains ownership of 'match' and 'actions'.
469  *
470  * This just assembles the desired flow table in memory.  Nothing is actually
471  * sent to the switch until a later call to ofctrl_run().
472  *
473  * The caller should initialize its own hmap to hold the flows. */
474 void
475 ofctrl_add_flow(struct hmap *desired_flows,
476                 uint8_t table_id, uint16_t priority,
477                 const struct match *match, const struct ofpbuf *actions)
478 {
479     struct ovn_flow *f = xmalloc(sizeof *f);
480     f->table_id = table_id;
481     f->priority = priority;
482     f->match = *match;
483     f->ofpacts = xmemdup(actions->data, actions->size);
484     f->ofpacts_len = actions->size;
485     f->hmap_node.hash = ovn_flow_hash(f);
486
487     if (ovn_flow_lookup(desired_flows, f)) {
488         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
489         if (!VLOG_DROP_INFO(&rl)) {
490             char *s = ovn_flow_to_string(f);
491             VLOG_INFO("dropping duplicate flow: %s", s);
492             free(s);
493         }
494
495         ovn_flow_destroy(f);
496         return;
497     }
498
499     hmap_insert(desired_flows, &f->hmap_node, f->hmap_node.hash);
500 }
501 \f
502 /* ovn_flow. */
503
504 /* Returns a hash of the key in 'f'. */
505 static uint32_t
506 ovn_flow_hash(const struct ovn_flow *f)
507 {
508     return hash_2words((f->table_id << 16) | f->priority,
509                        match_hash(&f->match, 0));
510
511 }
512
513 /* Finds and returns an ovn_flow in 'flow_table' whose key is identical to
514  * 'target''s key, or NULL if there is none. */
515 static struct ovn_flow *
516 ovn_flow_lookup(struct hmap *flow_table, const struct ovn_flow *target)
517 {
518     struct ovn_flow *f;
519
520     HMAP_FOR_EACH_WITH_HASH (f, hmap_node, target->hmap_node.hash,
521                              flow_table) {
522         if (f->table_id == target->table_id
523             && f->priority == target->priority
524             && match_equal(&f->match, &target->match)) {
525             return f;
526         }
527     }
528     return NULL;
529 }
530
531 static char *
532 ovn_flow_to_string(const struct ovn_flow *f)
533 {
534     struct ds s = DS_EMPTY_INITIALIZER;
535     ds_put_format(&s, "table_id=%"PRIu8", ", f->table_id);
536     ds_put_format(&s, "priority=%"PRIu16", ", f->priority);
537     match_format(&f->match, &s, OFP_DEFAULT_PRIORITY);
538     ds_put_cstr(&s, ", actions=");
539     ofpacts_format(f->ofpacts, f->ofpacts_len, &s);
540     return ds_steal_cstr(&s);
541 }
542
543 static void
544 ovn_flow_log(const struct ovn_flow *f, const char *action)
545 {
546     if (VLOG_IS_DBG_ENABLED()) {
547         char *s = ovn_flow_to_string(f);
548         VLOG_DBG("%s flow: %s", action, s);
549         free(s);
550     }
551 }
552
553 static void
554 ovn_flow_destroy(struct ovn_flow *f)
555 {
556     if (f) {
557         free(f->ofpacts);
558         free(f);
559     }
560 }
561 \f
562 /* Flow tables of struct ovn_flow. */
563
564 static void
565 ovn_flow_table_clear(struct hmap *flow_table)
566 {
567     struct ovn_flow *f, *next;
568     HMAP_FOR_EACH_SAFE (f, next, hmap_node, flow_table) {
569         hmap_remove(flow_table, &f->hmap_node);
570         ovn_flow_destroy(f);
571     }
572 }
573
574 static void
575 ovn_flow_table_destroy(struct hmap *flow_table)
576 {
577     ovn_flow_table_clear(flow_table);
578     hmap_destroy(flow_table);
579 }
580 \f
581 /* Flow table update. */
582
583 static void
584 queue_flow_mod(struct ofputil_flow_mod *fm)
585 {
586     fm->buffer_id = UINT32_MAX;
587     fm->out_port = OFPP_ANY;
588     fm->out_group = OFPG_ANY;
589     queue_msg(ofputil_encode_flow_mod(fm, OFPUTIL_P_OF13_OXM));
590 }
591
592 /* Replaces the flow table on the switch, if possible, by the flows in
593  * 'flow_table', which should have been added with ofctrl_add_flow().
594  * Regardless of whether the flow table is updated, this deletes all of the
595  * flows from 'flow_table' and frees them.  (The hmap itself isn't
596  * destroyed.)
597  *
598  * This called be called be ofctrl_run() within the main loop. */
599 void
600 ofctrl_put(struct hmap *flow_table)
601 {
602     /* The flow table can be updated if the connection to the switch is up and
603      * in the correct state and not backlogged with existing flow_mods.  (Our
604      * criteria for being backlogged appear very conservative, but the socket
605      * between ovn-controller and OVS provides some buffering.)  Otherwise,
606      * discard the flows.  A solution to either of those problems will cause us
607      * to wake up and retry. */
608     if (state != S_UPDATE_FLOWS
609         || rconn_packet_counter_n_packets(tx_counter)) {
610         ovn_flow_table_clear(flow_table);
611         return;
612     }
613
614     /* Iterate through all of the installed flows.  If any of them are no
615      * longer desired, delete them; if any of them should have different
616      * actions, update them. */
617     struct ovn_flow *i, *next;
618     HMAP_FOR_EACH_SAFE (i, next, hmap_node, &installed_flows) {
619         struct ovn_flow *d = ovn_flow_lookup(flow_table, i);
620         if (!d) {
621             /* Installed flow is no longer desirable.  Delete it from the
622              * switch and from installed_flows. */
623             struct ofputil_flow_mod fm = {
624                 .match = i->match,
625                 .priority = i->priority,
626                 .table_id = i->table_id,
627                 .command = OFPFC_DELETE_STRICT,
628             };
629             queue_flow_mod(&fm);
630             ovn_flow_log(i, "removing");
631
632             hmap_remove(&installed_flows, &i->hmap_node);
633             ovn_flow_destroy(i);
634         } else {
635             if (!ofpacts_equal(i->ofpacts, i->ofpacts_len,
636                                d->ofpacts, d->ofpacts_len)) {
637                 /* Update actions in installed flow. */
638                 struct ofputil_flow_mod fm = {
639                     .match = i->match,
640                     .priority = i->priority,
641                     .table_id = i->table_id,
642                     .ofpacts = d->ofpacts,
643                     .ofpacts_len = d->ofpacts_len,
644                     .command = OFPFC_MODIFY_STRICT,
645                 };
646                 queue_flow_mod(&fm);
647                 ovn_flow_log(i, "updating");
648
649                 /* Replace 'i''s actions by 'd''s. */
650                 free(i->ofpacts);
651                 i->ofpacts = d->ofpacts;
652                 i->ofpacts_len = d->ofpacts_len;
653                 d->ofpacts = NULL;
654                 d->ofpacts_len = 0;
655             }
656
657             hmap_remove(flow_table, &d->hmap_node);
658             ovn_flow_destroy(d);
659         }
660     }
661
662     /* The previous loop removed from 'flow_table' all of the flows that are
663      * already installed.  Thus, any flows remaining in 'flow_table' need to
664      * be added to the flow table. */
665     struct ovn_flow *d;
666     HMAP_FOR_EACH_SAFE (d, next, hmap_node, flow_table) {
667         /* Send flow_mod to add flow. */
668         struct ofputil_flow_mod fm = {
669             .match = d->match,
670             .priority = d->priority,
671             .table_id = d->table_id,
672             .ofpacts = d->ofpacts,
673             .ofpacts_len = d->ofpacts_len,
674             .command = OFPFC_ADD,
675         };
676         queue_flow_mod(&fm);
677         ovn_flow_log(d, "adding");
678
679         /* Move 'd' from 'flow_table' to installed_flows. */
680         hmap_remove(flow_table, &d->hmap_node);
681         hmap_insert(&installed_flows, &d->hmap_node, d->hmap_node.hash);
682     }
683 }