netdev-dpdk: fix mbuf leaks
[cascardo/ovs.git] / utilities / ovs-vlan-test.in
1 #! @PYTHON@
2 #
3 # Copyright (c) 2010 Nicira, Inc.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at:
8 #
9 #     http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 import BaseHTTPServer
18 import getopt
19 import httplib
20 import os
21 import threading
22 import time
23 import signal #Causes keyboard interrupts to go to the main thread.
24 import socket
25 import sys
26
27 print_safe_lock = threading.Lock()
28 def print_safe(s):
29     print_safe_lock.acquire()
30     print(s)
31     print_safe_lock.release()
32
33 def start_thread(target, args):
34     t = threading.Thread(target=target, args=args)
35     t.setDaemon(True)
36     t.start()
37     return t
38
39 #Caller is responsible for catching socket.error exceptions.
40 def send_packet(key, length, dest_ip, dest_port):
41
42     length -= 20 + 8  #IP and UDP headers.
43
44     packet = str(key)
45     packet += chr(0) * (length - len(packet))
46
47     sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
48     sock.sendto(packet, (dest_ip, dest_port))
49     sock.close()
50 \f
51 #UDP Receiver
52 class UDPReceiver:
53     def __init__(self, vlan_ip, vlan_port):
54         self.vlan_ip        = vlan_ip
55         self.vlan_port      = vlan_port
56         self.recv_callbacks = {}
57         self.udp_run        = False
58
59     def recv_packet(self, key, success_callback, timeout_callback):
60
61         event = threading.Event()
62
63         def timeout_cb():
64             timeout_callback()
65             event.set()
66
67         timer = threading.Timer(30, timeout_cb)
68         timer.daemon = True
69
70         def success_cb():
71             timer.cancel()
72             success_callback()
73             event.set()
74
75         # Start the timer first to avoid a timer.cancel() race condition.
76         timer.start()
77         self.recv_callbacks[key] = success_cb
78         return event
79
80     def udp_receiver(self):
81
82         sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
83         sock.settimeout(1)
84
85         try:
86             sock.bind((self.vlan_ip, self.vlan_port))
87         except socket.error, e:
88             print_safe('Failed to bind to %s:%d with error: %s'
89                     % (self.vlan_ip, self.vlan_port, e))
90             os._exit(1) #sys.exit only exits the current thread.
91
92         while self.udp_run:
93
94             try:
95                 data, _ = sock.recvfrom(4096)
96             except socket.timeout:
97                 continue
98             except socket.error, e:
99                 print_safe('Failed to receive from %s:%d with error: %s'
100                     % (self.vlan_ip, self.vlan_port, e))
101                 os._exit(1)
102
103             data_str = data.split(chr(0))[0]
104
105             if not data_str.isdigit():
106                 continue
107
108             key = int(data_str)
109
110             if key in self.recv_callbacks:
111                 self.recv_callbacks[key]()
112                 del self.recv_callbacks[key]
113
114     def start(self):
115         self.udp_run = True
116         start_thread(self.udp_receiver, ())
117
118     def stop(self):
119         self.udp_run = False
120 \f
121 #Server
122 vlan_server = None
123 class VlanServer:
124
125     def __init__(self, server_ip, server_port, vlan_ip, vlan_port):
126         global vlan_server
127
128         vlan_server = self
129
130         self.server_ip   = server_ip
131         self.server_port = server_port
132
133         self.recv_response = '%s:%d:' % (vlan_ip, vlan_port)
134
135         self.result      = {}
136         self.result_lock = threading.Lock()
137
138         self._test_id      = 0
139         self._test_id_lock = threading.Lock()
140
141         self.udp_recv = UDPReceiver(vlan_ip, vlan_port)
142
143     def get_test_id(self):
144         self._test_id_lock.acquire()
145
146         self._test_id += 1
147         ret = self._test_id
148
149         self._test_id_lock.release()
150         return ret
151
152     def set_result(self, key, value):
153
154         self.result_lock.acquire()
155
156         if key not in self.result:
157             self.result[key] = value
158
159         self.result_lock.release()
160
161     def recv(self, test_id):
162         self.udp_recv.recv_packet(test_id,
163                 lambda : self.set_result(test_id, 'Success'),
164                 lambda : self.set_result(test_id, 'Timeout'))
165
166         return self.recv_response + str(test_id)
167
168     def send(self, test_id, data):
169         try:
170             ip, port, size = data.split(':')
171             port = int(port)
172             size = int(size)
173         except ValueError:
174             self.set_result(test_id,
175                     'Server failed to parse send request: %s' % data)
176             return
177
178         def send_thread():
179             send_time = 10
180             for _ in range(send_time * 2):
181                 try:
182                     send_packet(test_id, size, ip, port)
183                 except socket.error, e:
184                     self.set_result(test_id, 'Failure: ' + str(e))
185                     return
186                 time.sleep(.5)
187
188             self.set_result(test_id, 'Success')
189
190         start_thread(send_thread, ())
191
192         return str(test_id)
193
194     def run(self):
195         self.udp_recv.start()
196         try:
197             BaseHTTPServer.HTTPServer((self.server_ip, self.server_port),
198                     VlanServerHandler).serve_forever()
199         except socket.error, e:
200             print_safe('Failed to start control server: %s' % e)
201             self.udp_recv.stop()
202
203         return 1
204
205 class VlanServerHandler(BaseHTTPServer.BaseHTTPRequestHandler):
206     def do_GET(self):
207
208         #Guarantee three arguments.
209         path = (self.path.lower().lstrip('/') + '//').split('/')
210
211         resp = 404
212         body = None
213
214         if path[0] == 'start':
215             test_id = vlan_server.get_test_id()
216
217             if path[1] == 'recv':
218                 resp = 200
219                 body = vlan_server.recv(test_id)
220             elif path[1] == 'send':
221                 resp = 200
222                 body = vlan_server.send(test_id, path[2])
223         elif (path[0] == 'result'
224                 and path[1].isdigit()
225                 and int(path[1]) in vlan_server.result):
226             resp = 200
227             body = vlan_server.result[int(path[1])]
228         elif path[0] == 'ping':
229             resp = 200
230             body = 'pong'
231
232         self.send_response(resp)
233         self.end_headers()
234
235         if body:
236             self.wfile.write(body)
237 \f
238 #Client
239 class VlanClient:
240
241     def __init__(self, server_ip, server_port, vlan_ip, vlan_port):
242         self.server_ip_port = '%s:%d' % (server_ip, server_port)
243         self.vlan_ip_port   = "%s:%d" % (vlan_ip, vlan_port)
244         self.udp_recv       = UDPReceiver(vlan_ip, vlan_port)
245
246     def request(self, resource):
247         conn = httplib.HTTPConnection(self.server_ip_port)
248         conn.request('GET', resource)
249         return conn
250
251     def send(self, size):
252
253         def error_msg(e):
254             print_safe('Send size %d unsuccessful: %s' % (size, e))
255
256         try:
257             conn = self.request('/start/recv')
258             data = conn.getresponse().read()
259         except (socket.error, httplib.HTTPException), e:
260             error_msg(e)
261             return False
262
263         try:
264             ip, port, test_id = data.split(':')
265             port    = int(port)
266             test_id = int(test_id)
267         except ValueError:
268             error_msg("Received invalid response from control server (%s)" %
269                     data)
270             return False
271
272         send_time = 5
273
274         for _ in range(send_time * 4):
275
276             try:
277                 send_packet(test_id, size, ip, port)
278                 resp = self.request('/result/%d' % test_id).getresponse()
279                 data = resp.read()
280             except (socket.error, httplib.HTTPException), e:
281                 error_msg(e)
282                 return False
283
284             if resp.status == 200 and data == 'Success':
285                 print_safe('Send size %d successful' % size)
286                 return True
287             elif resp.status == 200:
288                 error_msg(data)
289                 return False
290
291             time.sleep(.25)
292
293         error_msg('Timeout')
294         return False
295
296     def recv(self, size):
297
298         def error_msg(e):
299             print_safe('Receive size %d unsuccessful: %s' % (size, e))
300
301         resource = '/start/send/%s:%d' % (self.vlan_ip_port, size)
302         try:
303             conn    = self.request(resource)
304             test_id = conn.getresponse().read()
305         except (socket.error, httplib.HTTPException), e:
306             error_msg(e)
307             return False
308
309         if not test_id.isdigit():
310             error_msg('Invalid response %s' % test_id)
311             return False
312
313         success = [False] #Primitive datatypes can't be set from closures.
314
315         def success_cb():
316             success[0] = True
317
318         def failure_cb():
319             success[0] = False
320
321         self.udp_recv.recv_packet(int(test_id), success_cb, failure_cb).wait()
322
323         if success[0]:
324             print_safe('Receive size %d successful' % size)
325         else:
326             error_msg('Timeout')
327
328         return success[0]
329
330     def server_up(self):
331
332         def error_msg(e):
333             print_safe('Failed control server connectivity test: %s' % e)
334
335         try:
336             resp = self.request('/ping').getresponse()
337             data = resp.read()
338         except (socket.error, httplib.HTTPException), e:
339             error_msg(e)
340             return False
341
342         if resp.status != 200:
343             error_msg('Invalid status %d' % resp.status)
344         elif data != 'pong':
345             error_msg('Invalid response %s' % data)
346
347         return True
348
349     def run(self):
350
351         if not self.server_up():
352             return 1
353
354         self.udp_recv.start()
355
356         success = True
357         for size in [50, 500, 1000, 1500]:
358             success = self.send(size) and success
359             success = self.recv(size) and success
360
361         self.udp_recv.stop()
362
363         if success:
364             print_safe('OK')
365             return 0
366         else:
367             print_safe('FAILED')
368             return 1
369 \f
370 def usage():
371     print_safe("""\
372 %(argv0)s: Test vlan connectivity
373 usage: %(argv0)s server vlan
374
375 The following options are also available:
376   -s, --server                run in server mode
377   -h, --help                  display this help message
378   -V, --version               display version information\
379 """ % {'argv0': sys.argv[0]})
380
381 def main():
382
383     try:
384         options, args = getopt.gnu_getopt(sys.argv[1:], 'hVs',
385                                           ['help', 'version', 'server'])
386     except getopt.GetoptError, geo:
387         print_safe('%s: %s\n' % (sys.argv[0], geo.msg))
388         return 1
389
390     server = False
391     for key, _ in options:
392         if key in ['-h', '--help']:
393             usage()
394             return 0
395         elif key in ['-V', '--version']:
396             print_safe('ovs-vlan-test (Open vSwitch) @VERSION@')
397             return 0
398         elif key in ['-s', '--server']:
399             server = True
400         else:
401             print_safe('Unexpected option %s. (use --help for help)' % key)
402             return 1
403
404     if len(args) != 2:
405         print_safe('Expecting two arguments. (use --help for help)')
406         return 1
407
408     try:
409         server_ip, server_port = args[0].split(':')
410         server_port = int(server_port)
411     except ValueError:
412         server_ip = args[0]
413         server_port = 80
414
415     try:
416         vlan_ip, vlan_port = args[1].split(':')
417         vlan_port = int(vlan_port)
418     except ValueError:
419         vlan_ip   = args[1]
420         vlan_port = 15213
421
422     if server:
423         return VlanServer(server_ip, server_port, vlan_ip, vlan_port).run()
424     else:
425         return VlanClient(server_ip, server_port, vlan_ip, vlan_port).run()
426
427 if __name__ == '__main__':
428     main_ret = main()
429
430     # Python can throw exceptions if threads are running at exit.
431     for th in threading.enumerate():
432         if th != threading.currentThread():
433             th.join()
434
435     sys.exit(main_ret)