Establish base for testing Xen hypervisor abstraction
[ganeti-github.git] / test / py / ganeti.hypervisor.hv_xen_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2011, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.hypervisor.hv_xen"""
23
import os
import string # pylint: disable=W0402
import unittest
import tempfile
import shutil
import random

from ganeti import constants
from ganeti import objects
from ganeti import hypervisor
from ganeti import utils
from ganeti import errors
from ganeti import compat

from ganeti.hypervisor import hv_xen

import testutils
40
41
# Reverse lookup table: hypervisor class -> hypervisor name, obtained by
# inverting the name -> class mapping kept by the hypervisor package
HVCLASS_TO_HVNAME = utils.InvertDict(hypervisor._HYPERVISOR_MAP)
44
45
class TestConsole(unittest.TestCase):
  """Checks the console object returned by the Xen hypervisor classes."""

  def test(self):
    """PVM and HVM must both describe a valid SSH console.

    The console has to point at the instance's primary node and the last
    element of its command has to be the instance name.

    """
    hv_classes = (hv_xen.XenPvmHypervisor, hv_xen.XenHvmHypervisor)

    for hv_class in hv_classes:
      inst = objects.Instance(name="xen.example.com",
                              primary_node="node24828")
      console = hv_class.GetInstanceConsole(inst, {}, {})

      self.assertTrue(console.Validate())
      self.assertEqual(console.kind, constants.CONS_SSH)
      self.assertEqual(console.host, inst.primary_node)
      self.assertEqual(console.command[-1], inst.name)
56
57
class TestCreateConfigCpus(unittest.TestCase):
  """Tests for C{hv_xen._CreateConfigCpus}."""

  def _Check(self, cpu_mask, expected):
    """Asserts that a CPU mask is rendered as the expected config line.

    @param cpu_mask: Value passed to C{hv_xen._CreateConfigCpus}
    @param expected: Value the function must return

    """
    self.assertEqual(hv_xen._CreateConfigCpus(cpu_mask), expected)

  def testEmpty(self):
    """A missing or empty mask yields an empty C{cpus} list."""
    for empty_mask in (None, ""):
      self._Check(empty_mask, "cpus = [ ]")

  def testAll(self):
    """Pinning to all CPUs produces no configuration line at all."""
    self._Check(constants.CPU_PINNING_ALL, None)

  def testOne(self):
    """A single CPU number is written as a C{cpu} entry."""
    self._Check("9", "cpu = \"9\"")

  def testMultiple(self):
    """Colon-separated per-VCPU masks are expanded into a C{cpus} list."""
    expected = ("cpus = [ \"0,1,2,4,5\", \"3\", \"%s\" ]" %
                constants.CPU_PINNING_ALL_XEN)
    self._Check("0-2,4,5-5:3:all", expected)
75
76
77 class TestParseXmList(testutils.GanetiTestCase):
78 def test(self):
79 data = testutils.ReadTestData("xen-xm-list-4.0.1-dom0-only.txt")
80
81 # Exclude node
82 self.assertEqual(hv_xen._ParseXmList(data.splitlines(), False), [])
83
84 # Include node
85 result = hv_xen._ParseXmList(data.splitlines(), True)
86 self.assertEqual(len(result), 1)
87 self.assertEqual(len(result[0]), 6)
88
89 # Name
90 self.assertEqual(result[0][0], hv_xen._DOM0_NAME)
91
92 # ID
93 self.assertEqual(result[0][1], 0)
94
95 # Memory
96 self.assertEqual(result[0][2], 1023)
97
98 # VCPUs
99 self.assertEqual(result[0][3], 1)
100
101 # State
102 self.assertEqual(result[0][4], "r-----")
103
104 # Time
105 self.assertAlmostEqual(result[0][5], 121152.6)
106
107 def testWrongLineFormat(self):
108 tests = [
109 ["three fields only"],
110 ["name InvalidID 128 1 r----- 12345"],
111 ]
112
113 for lines in tests:
114 try:
115 hv_xen._ParseXmList(["Header would be here"] + lines, False)
116 except errors.HypervisorError, err:
117 self.assertTrue("Can't parse output of xm list" in str(err))
118 else:
119 self.fail("Exception was not raised")
120
121
122 class TestGetXmList(testutils.GanetiTestCase):
123 def _Fail(self):
124 return utils.RunResult(constants.EXIT_FAILURE, None,
125 "stdout", "stderr", None,
126 NotImplemented, NotImplemented)
127
128 def testTimeout(self):
129 fn = testutils.CallCounter(self._Fail)
130 try:
131 hv_xen._GetXmList(fn, False, _timeout=0.1)
132 except errors.HypervisorError, err:
133 self.assertTrue("timeout exceeded" in str(err))
134 else:
135 self.fail("Exception was not raised")
136
137 self.assertTrue(fn.Count() < 10,
138 msg="'xm list' was called too many times")
139
140 def _Success(self, stdout):
141 return utils.RunResult(constants.EXIT_SUCCESS, None, stdout, "", None,
142 NotImplemented, NotImplemented)
143
144 def testSuccess(self):
145 data = testutils.ReadTestData("xen-xm-list-4.0.1-four-instances.txt")
146
147 fn = testutils.CallCounter(compat.partial(self._Success, data))
148
149 result = hv_xen._GetXmList(fn, True, _timeout=0.1)
150
151 self.assertEqual(len(result), 4)
152
153 self.assertEqual(map(compat.fst, result), [
154 "Domain-0",
155 "server01.example.com",
156 "web3106215069.example.com",
157 "testinstance.example.com",
158 ])
159
160 self.assertEqual(fn.Count(), 1)
161
162
class TestParseNodeInfo(testutils.GanetiTestCase):
  """Tests for C{hv_xen._ParseNodeInfo}."""

  def testEmpty(self):
    """Empty input gives an empty result."""
    self.assertEqual(hv_xen._ParseNodeInfo(""), {})

  def testUnknownInput(self):
    """Lines that are not recognized are ignored."""
    unknown_lines = [
      "foo bar",
      "something else goes",
      "here",
      ]
    text = "\n".join(unknown_lines)
    self.assertEqual(hv_xen._ParseNodeInfo(text), {})

  def testBasicInfo(self):
    """Recorded Xen 4.0.1 output is parsed into the expected values."""
    expected = {
      "cpu_nodes": 1,
      "cpu_sockets": 2,
      "cpu_total": 4,
      "hv_version": (4, 0),
      "memory_free": 8004,
      "memory_total": 16378,
      }
    xm_info = testutils.ReadTestData("xen-xm-info-4.0.1.txt")
    self.assertEqual(hv_xen._ParseNodeInfo(xm_info), expected)
186
187
class TestMergeInstanceInfo(testutils.GanetiTestCase):
  """Tests for C{hv_xen._MergeInstanceInfo} and C{hv_xen._GetNodeInfo}."""

  def _FakeXmList(self, include_node):
    """Stands in for the domain listing function.

    @param include_node: Must be true; the test fails otherwise
    @return: Two six-field entries, one for the privileged domain and one
      for an instance; only the name, third and fourth fields are filled in

    """
    self.assertTrue(include_node)

    dom0 = (hv_xen._DOM0_NAME, NotImplemented, 4096, 7, NotImplemented,
            NotImplemented)
    inst1 = ("inst1.example.com", NotImplemented, 2048, 4, NotImplemented,
             NotImplemented)

    return [dom0, inst1]

  def testEmpty(self):
    """No node information and no domains give an empty result."""
    merged = hv_xen._MergeInstanceInfo({}, lambda _: [])
    self.assertEqual(merged, {})

  def testMissingNodeInfo(self):
    """Without node information only the dom0 values are reported."""
    expected = {
      "memory_dom0": 4096,
      "dom0_cpus": 7,
      }
    merged = hv_xen._MergeInstanceInfo({}, self._FakeXmList)
    self.assertEqual(merged, expected)

  def testWithNodeInfo(self):
    """Recorded node information is combined with the domain listing."""
    expected = {
      "cpu_nodes": 1,
      "cpu_sockets": 2,
      "cpu_total": 4,
      "dom0_cpus": 7,
      "hv_version": (4, 0),
      "memory_dom0": 4096,
      "memory_free": 8004,
      "memory_hv": 2230,
      "memory_total": 16378,
      }
    xm_info = testutils.ReadTestData("xen-xm-info-4.0.1.txt")
    self.assertEqual(hv_xen._GetNodeInfo(xm_info, self._FakeXmList),
                     expected)
222
223
224 class TestGetConfigFileDiskData(unittest.TestCase):
225 def testLetterCount(self):
226 self.assertEqual(len(hv_xen._DISK_LETTERS), 26)
227
228 def testNoDisks(self):
229 self.assertEqual(hv_xen._GetConfigFileDiskData([], "hd"), [])
230
231 def testManyDisks(self):
232 for offset in [0, 1, 10]:
233 disks = [(objects.Disk(dev_type=constants.LD_LV), "/tmp/disk/%s" % idx)
234 for idx in range(len(hv_xen._DISK_LETTERS) + offset)]
235
236 if offset == 0:
237 result = hv_xen._GetConfigFileDiskData(disks, "hd")
238 self.assertEqual(result, [
239 "'phy:/tmp/disk/%s,hd%s,r'" % (idx, string.ascii_lowercase[idx])
240 for idx in range(len(hv_xen._DISK_LETTERS) + offset)
241 ])
242 else:
243 try:
244 hv_xen._GetConfigFileDiskData(disks, "hd")
245 except errors.HypervisorError, err:
246 self.assertEqual(str(err), "Too many disks")
247 else:
248 self.fail("Exception was not raised")
249
250 def testTwoLvDisksWithMode(self):
251 disks = [
252 (objects.Disk(dev_type=constants.LD_LV, mode=constants.DISK_RDWR),
253 "/tmp/diskFirst"),
254 (objects.Disk(dev_type=constants.LD_LV, mode=constants.DISK_RDONLY),
255 "/tmp/diskLast"),
256 ]
257
258 result = hv_xen._GetConfigFileDiskData(disks, "hd")
259 self.assertEqual(result, [
260 "'phy:/tmp/diskFirst,hda,w'",
261 "'phy:/tmp/diskLast,hdb,r'",
262 ])
263
264 def testFileDisks(self):
265 disks = [
266 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
267 physical_id=[constants.FD_LOOP]),
268 "/tmp/diskFirst"),
269 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDONLY,
270 physical_id=[constants.FD_BLKTAP]),
271 "/tmp/diskTwo"),
272 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
273 physical_id=[constants.FD_LOOP]),
274 "/tmp/diskThree"),
275 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
276 physical_id=[constants.FD_BLKTAP]),
277 "/tmp/diskLast"),
278 ]
279
280 result = hv_xen._GetConfigFileDiskData(disks, "sd")
281 self.assertEqual(result, [
282 "'file:/tmp/diskFirst,sda,w'",
283 "'tap:aio:/tmp/diskTwo,sdb,r'",
284 "'file:/tmp/diskThree,sdc,w'",
285 "'tap:aio:/tmp/diskLast,sdd,w'",
286 ])
287
288 def testInvalidFileDisk(self):
289 disks = [
290 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
291 physical_id=["#unknown#"]),
292 "/tmp/diskinvalid"),
293 ]
294
295 self.assertRaises(KeyError, hv_xen._GetConfigFileDiskData, disks, "sd")
296
297
class TestXenHypervisorUnknownCommand(unittest.TestCase):
  """Checks how the hypervisor reacts to an unsupported Xen command."""

  def test(self):
    """Running Xen with an unknown command is a programming error."""
    unknown_cmd = "#unknown command#"

    # Make sure the command really is not one of the supported ones
    self.assertTrue(unknown_cmd not in constants.KNOWN_XEN_COMMANDS)

    xen_hv = hv_xen.XenHypervisor(_cfgdir=NotImplemented,
                                  _run_cmd_fn=NotImplemented,
                                  _cmd=unknown_cmd)

    self.assertRaises(errors.ProgrammerError, xen_hv._RunXen, [])
306
307
308 class TestXenHypervisorWriteConfigFile(unittest.TestCase):
309 def setUp(self):
310 self.tmpdir = tempfile.mkdtemp()
311
312 def tearDown(self):
313 shutil.rmtree(self.tmpdir)
314
315 def testWriteError(self):
316 cfgdir = utils.PathJoin(self.tmpdir, "foobar")
317
318 hv = hv_xen.XenHypervisor(_cfgdir=cfgdir,
319 _run_cmd_fn=NotImplemented,
320 _cmd=NotImplemented)
321
322 self.assertFalse(os.path.exists(cfgdir))
323
324 try:
325 hv._WriteConfigFile("name", "data")
326 except errors.HypervisorError, err:
327 self.assertTrue(str(err).startswith("Cannot write Xen instance"))
328 else:
329 self.fail("Exception was not raised")
330
331
class _TestXenHypervisor(object):
  """Mix-in with shared fixtures for hypervisor test classes.

  It is combined with C{unittest.TestCase} by L{_MakeTestClass}, which also
  fills in the class attributes below.

  """
  # Hypervisor class under test
  TARGET = NotImplemented
  # Command passed to the hypervisor as C{_cmd}
  CMD = NotImplemented
  # Name looked up for the hypervisor class
  HVNAME = NotImplemented

  def setUp(self):
    """Creates a scratch directory containing a file named C{vncpw}.

    The file holds ten randomly chosen ASCII letters, which are also kept
    in C{self.vncpw}.

    """
    super(_TestXenHypervisor, self).setUp()

    self.tmpdir = tempfile.mkdtemp()

    letters = random.sample(string.ascii_letters, 10)
    self.vncpw = "".join(letters)

    self.vncpw_path = utils.PathJoin(self.tmpdir, "vncpw")
    utils.WriteFile(self.vncpw_path, data=self.vncpw)

  def tearDown(self):
    """Removes the scratch directory."""
    super(_TestXenHypervisor, self).tearDown()

    shutil.rmtree(self.tmpdir)

  def _GetHv(self, run_cmd=NotImplemented):
    """Instantiates the hypervisor class under test.

    @param run_cmd: Passed to the hypervisor as C{_run_cmd_fn}
    @return: Instance of C{TARGET} using the scratch directory as its
      configuration directory

    """
    hv_class = self.TARGET
    return hv_class(_cfgdir=self.tmpdir, _run_cmd_fn=run_cmd, _cmd=self.CMD)

  def _SuccessCommand(self, stdout, cmd):
    """Returns a successful command result after checking the command.

    @param stdout: Text to report as the command's standard output
    @param cmd: Command line; its first element must equal C{CMD}

    """
    self.assertEqual(cmd[0], self.CMD)

    return utils.RunResult(constants.EXIT_SUCCESS, None, stdout, "", None,
                           NotImplemented, NotImplemented)

  def _FailingCommand(self, cmd):
    """Returns a failed command result after checking the command.

    @param cmd: Command line; its first element must equal C{CMD}

    """
    self.assertEqual(cmd[0], self.CMD)

    return utils.RunResult(constants.EXIT_FAILURE, None,
                           "", "This command failed", None,
                           NotImplemented, NotImplemented)
367
368
def _MakeTestClass(cls, cmd):
  """Builds a test class for one hypervisor class and one command.

  The generated class is equivalent to the following pseudo code:

    class Test{cls.__name__}Cmd{cmd.title()}(_TestXenHypervisor,
                                             unittest.TestCase):
      TARGET = {cls}
      CMD = {cmd}
      HVNAME = {name found for cls in HVCLASS_TO_HVNAME}

  @type cls: class
  @param cls: Hypervisor class to be tested
  @type cmd: string
  @param cmd: Hypervisor command
  @rtype: tuple
  @return: Class name and class object (not instance)

  """
  class_name = "Test%sCmd%s" % (cls.__name__, cmd.title())

  attrs = {
    "TARGET": cls,
    "CMD": cmd,
    "HVNAME": HVCLASS_TO_HVNAME[cls],
    }

  testcls = type(class_name, (_TestXenHypervisor, unittest.TestCase), attrs)

  return (class_name, testcls)
392
393
def _RegisterTestClasses(namespace):
  """Generates a test class for every hypervisor class and Xen command.

  The work is done inside a function so that no loop variables remain in
  the module namespace. A leftover module-level reference to the last
  generated class would make test loaders which walk the module's
  attributes find that class a second time.

  @type namespace: dict
  @param namespace: Namespace receiving the generated classes, keyed by
    class name

  """
  for hv_class in [hv_xen.XenPvmHypervisor, hv_xen.XenHvmHypervisor]:
    for xen_cmd in constants.KNOWN_XEN_COMMANDS:
      (name, testcls) = _MakeTestClass(hv_class, xen_cmd)

      # Never silently replace an existing name with a generated class
      assert name not in namespace, \
        "Test class name '%s' is already in use" % name

      namespace[name] = testcls


# Create test classes programmatically instead of manually to reduce the risk
# of forgetting some combinations
_RegisterTestClasses(globals())


if __name__ == "__main__":
  testutils.GanetiTestProgram()