hv_xen: Test errors while writing config file
[ganeti-github.git] / test / py / ganeti.hypervisor.hv_xen_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2011, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.hypervisor.hv_lxc"""
23
24 import string # pylint: disable=W0402
25 import unittest
26 import tempfile
27 import shutil
28
29 from ganeti import constants
30 from ganeti import objects
31 from ganeti import hypervisor
32 from ganeti import utils
33 from ganeti import errors
34 from ganeti import compat
35
36 from ganeti.hypervisor import hv_xen
37
38 import testutils
39
40
41 class TestConsole(unittest.TestCase):
42 def test(self):
43 for cls in [hv_xen.XenPvmHypervisor, hv_xen.XenHvmHypervisor]:
44 instance = objects.Instance(name="xen.example.com",
45 primary_node="node24828")
46 cons = cls.GetInstanceConsole(instance, {}, {})
47 self.assertTrue(cons.Validate())
48 self.assertEqual(cons.kind, constants.CONS_SSH)
49 self.assertEqual(cons.host, instance.primary_node)
50 self.assertEqual(cons.command[-1], instance.name)
51
52
53 class TestCreateConfigCpus(unittest.TestCase):
54 def testEmpty(self):
55 for cpu_mask in [None, ""]:
56 self.assertEqual(hv_xen._CreateConfigCpus(cpu_mask),
57 "cpus = [ ]")
58
59 def testAll(self):
60 self.assertEqual(hv_xen._CreateConfigCpus(constants.CPU_PINNING_ALL),
61 None)
62
63 def testOne(self):
64 self.assertEqual(hv_xen._CreateConfigCpus("9"), "cpu = \"9\"")
65
66 def testMultiple(self):
67 self.assertEqual(hv_xen._CreateConfigCpus("0-2,4,5-5:3:all"),
68 ("cpus = [ \"0,1,2,4,5\", \"3\", \"%s\" ]" %
69 constants.CPU_PINNING_ALL_XEN))
70
71
72 class TestParseXmList(testutils.GanetiTestCase):
73 def test(self):
74 data = testutils.ReadTestData("xen-xm-list-4.0.1-dom0-only.txt")
75
76 # Exclude node
77 self.assertEqual(hv_xen._ParseXmList(data.splitlines(), False), [])
78
79 # Include node
80 result = hv_xen._ParseXmList(data.splitlines(), True)
81 self.assertEqual(len(result), 1)
82 self.assertEqual(len(result[0]), 6)
83
84 # Name
85 self.assertEqual(result[0][0], hv_xen._DOM0_NAME)
86
87 # ID
88 self.assertEqual(result[0][1], 0)
89
90 # Memory
91 self.assertEqual(result[0][2], 1023)
92
93 # VCPUs
94 self.assertEqual(result[0][3], 1)
95
96 # State
97 self.assertEqual(result[0][4], "r-----")
98
99 # Time
100 self.assertAlmostEqual(result[0][5], 121152.6)
101
102 def testWrongLineFormat(self):
103 tests = [
104 ["three fields only"],
105 ["name InvalidID 128 1 r----- 12345"],
106 ]
107
108 for lines in tests:
109 try:
110 hv_xen._ParseXmList(["Header would be here"] + lines, False)
111 except errors.HypervisorError, err:
112 self.assertTrue("Can't parse output of xm list" in str(err))
113 else:
114 self.fail("Exception was not raised")
115
116
117 class TestGetXmList(testutils.GanetiTestCase):
118 def _Fail(self):
119 return utils.RunResult(constants.EXIT_FAILURE, None,
120 "stdout", "stderr", None,
121 NotImplemented, NotImplemented)
122
123 def testTimeout(self):
124 fn = testutils.CallCounter(self._Fail)
125 try:
126 hv_xen._GetXmList(fn, False, _timeout=0.1)
127 except errors.HypervisorError, err:
128 self.assertTrue("timeout exceeded" in str(err))
129 else:
130 self.fail("Exception was not raised")
131
132 self.assertTrue(fn.Count() < 10,
133 msg="'xm list' was called too many times")
134
135 def _Success(self, stdout):
136 return utils.RunResult(constants.EXIT_SUCCESS, None, stdout, "", None,
137 NotImplemented, NotImplemented)
138
139 def testSuccess(self):
140 data = testutils.ReadTestData("xen-xm-list-4.0.1-four-instances.txt")
141
142 fn = testutils.CallCounter(compat.partial(self._Success, data))
143
144 result = hv_xen._GetXmList(fn, True, _timeout=0.1)
145
146 self.assertEqual(len(result), 4)
147
148 self.assertEqual(map(compat.fst, result), [
149 "Domain-0",
150 "server01.example.com",
151 "web3106215069.example.com",
152 "testinstance.example.com",
153 ])
154
155 self.assertEqual(fn.Count(), 1)
156
157
158 class TestParseNodeInfo(testutils.GanetiTestCase):
159 def testEmpty(self):
160 self.assertEqual(hv_xen._ParseNodeInfo(""), {})
161
162 def testUnknownInput(self):
163 data = "\n".join([
164 "foo bar",
165 "something else goes",
166 "here",
167 ])
168 self.assertEqual(hv_xen._ParseNodeInfo(data), {})
169
170 def testBasicInfo(self):
171 data = testutils.ReadTestData("xen-xm-info-4.0.1.txt")
172 result = hv_xen._ParseNodeInfo(data)
173 self.assertEqual(result, {
174 "cpu_nodes": 1,
175 "cpu_sockets": 2,
176 "cpu_total": 4,
177 "hv_version": (4, 0),
178 "memory_free": 8004,
179 "memory_total": 16378,
180 })
181
182
183 class TestMergeInstanceInfo(testutils.GanetiTestCase):
184 def testEmpty(self):
185 self.assertEqual(hv_xen._MergeInstanceInfo({}, lambda _: []), {})
186
187 def _FakeXmList(self, include_node):
188 self.assertTrue(include_node)
189 return [
190 (hv_xen._DOM0_NAME, NotImplemented, 4096, 7, NotImplemented,
191 NotImplemented),
192 ("inst1.example.com", NotImplemented, 2048, 4, NotImplemented,
193 NotImplemented),
194 ]
195
196 def testMissingNodeInfo(self):
197 result = hv_xen._MergeInstanceInfo({}, self._FakeXmList)
198 self.assertEqual(result, {
199 "memory_dom0": 4096,
200 "dom0_cpus": 7,
201 })
202
203 def testWithNodeInfo(self):
204 info = testutils.ReadTestData("xen-xm-info-4.0.1.txt")
205 result = hv_xen._GetNodeInfo(info, self._FakeXmList)
206 self.assertEqual(result, {
207 "cpu_nodes": 1,
208 "cpu_sockets": 2,
209 "cpu_total": 4,
210 "dom0_cpus": 7,
211 "hv_version": (4, 0),
212 "memory_dom0": 4096,
213 "memory_free": 8004,
214 "memory_hv": 2230,
215 "memory_total": 16378,
216 })
217
218
219 class TestGetConfigFileDiskData(unittest.TestCase):
220 def testLetterCount(self):
221 self.assertEqual(len(hv_xen._DISK_LETTERS), 26)
222
223 def testNoDisks(self):
224 self.assertEqual(hv_xen._GetConfigFileDiskData([], "hd"), [])
225
226 def testManyDisks(self):
227 for offset in [0, 1, 10]:
228 disks = [(objects.Disk(dev_type=constants.LD_LV), "/tmp/disk/%s" % idx)
229 for idx in range(len(hv_xen._DISK_LETTERS) + offset)]
230
231 if offset == 0:
232 result = hv_xen._GetConfigFileDiskData(disks, "hd")
233 self.assertEqual(result, [
234 "'phy:/tmp/disk/%s,hd%s,r'" % (idx, string.ascii_lowercase[idx])
235 for idx in range(len(hv_xen._DISK_LETTERS) + offset)
236 ])
237 else:
238 try:
239 hv_xen._GetConfigFileDiskData(disks, "hd")
240 except errors.HypervisorError, err:
241 self.assertEqual(str(err), "Too many disks")
242 else:
243 self.fail("Exception was not raised")
244
245 def testTwoLvDisksWithMode(self):
246 disks = [
247 (objects.Disk(dev_type=constants.LD_LV, mode=constants.DISK_RDWR),
248 "/tmp/diskFirst"),
249 (objects.Disk(dev_type=constants.LD_LV, mode=constants.DISK_RDONLY),
250 "/tmp/diskLast"),
251 ]
252
253 result = hv_xen._GetConfigFileDiskData(disks, "hd")
254 self.assertEqual(result, [
255 "'phy:/tmp/diskFirst,hda,w'",
256 "'phy:/tmp/diskLast,hdb,r'",
257 ])
258
259 def testFileDisks(self):
260 disks = [
261 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
262 physical_id=[constants.FD_LOOP]),
263 "/tmp/diskFirst"),
264 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDONLY,
265 physical_id=[constants.FD_BLKTAP]),
266 "/tmp/diskTwo"),
267 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
268 physical_id=[constants.FD_LOOP]),
269 "/tmp/diskThree"),
270 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
271 physical_id=[constants.FD_BLKTAP]),
272 "/tmp/diskLast"),
273 ]
274
275 result = hv_xen._GetConfigFileDiskData(disks, "sd")
276 self.assertEqual(result, [
277 "'file:/tmp/diskFirst,sda,w'",
278 "'tap:aio:/tmp/diskTwo,sdb,r'",
279 "'file:/tmp/diskThree,sdc,w'",
280 "'tap:aio:/tmp/diskLast,sdd,w'",
281 ])
282
283 def testInvalidFileDisk(self):
284 disks = [
285 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
286 physical_id=["#unknown#"]),
287 "/tmp/diskinvalid"),
288 ]
289
290 self.assertRaises(KeyError, hv_xen._GetConfigFileDiskData, disks, "sd")
291
292
293 class TestXenHypervisorUnknownCommand(unittest.TestCase):
294 def test(self):
295 cmd = "#unknown command#"
296 self.assertFalse(cmd in constants.KNOWN_XEN_COMMANDS)
297 hv = hv_xen.XenHypervisor(_cfgdir=NotImplemented,
298 _run_cmd_fn=NotImplemented,
299 _cmd=cmd)
300 self.assertRaises(errors.ProgrammerError, hv._RunXen, [])
301
302
303 class TestXenHypervisorWriteConfigFile(unittest.TestCase):
304 def setUp(self):
305 self.tmpdir = tempfile.mkdtemp()
306
307 def tearDown(self):
308 shutil.rmtree(self.tmpdir)
309
310 def testWriteError(self):
311 cfgdir = utils.PathJoin(self.tmpdir, "foobar")
312
313 hv = hv_xen.XenHypervisor(_cfgdir=cfgdir,
314 _run_cmd_fn=NotImplemented,
315 _cmd=NotImplemented)
316
317 self.assertFalse(os.path.exists(cfgdir))
318
319 try:
320 hv._WriteConfigFile("name", "data")
321 except errors.HypervisorError, err:
322 self.assertTrue(str(err).startswith("Cannot write Xen instance"))
323 else:
324 self.fail("Exception was not raised")
325
326
327 if __name__ == "__main__":
328 testutils.GanetiTestProgram()