hv_xen: Factorize and test disk configuration
[ganeti-github.git] / test / py / ganeti.hypervisor.hv_xen_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2011, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.hypervisor.hv_xen"""
23
24 import string # pylint: disable=W0402
25 import unittest
26
27 from ganeti import constants
28 from ganeti import objects
29 from ganeti import hypervisor
30 from ganeti import utils
31 from ganeti import errors
32 from ganeti import compat
33
34 from ganeti.hypervisor import hv_xen
35
36 import testutils
37
38
class TestConsole(unittest.TestCase):
  """Checks the console object returned by both Xen hypervisor classes.

  """
  def test(self):
    """Console must validate, be of SSH kind and point at the instance.

    The host has to be the instance's primary node and the last element of
    the command has to be the instance name.

    """
    hv_classes = (hv_xen.XenPvmHypervisor, hv_xen.XenHvmHypervisor)

    for hv_class in hv_classes:
      # A fresh instance object is built for every hypervisor class
      inst = objects.Instance(name="xen.example.com",
                              primary_node="node24828")
      console = hv_class.GetInstanceConsole(inst, {}, {})

      self.assertTrue(console.Validate())
      self.assertEqual(console.kind, constants.CONS_SSH)
      self.assertEqual(console.host, inst.primary_node)
      self.assertEqual(console.command[-1], inst.name)
49
50
class TestCreateConfigCpus(unittest.TestCase):
  """Tests for L{hv_xen._CreateConfigCpus}.

  """
  def testEmpty(self):
    """C{None} and the empty string both give an empty "cpus" list.

    """
    # NOTE(review): the listing this was taken from seems to collapse runs of
    # whitespace; going by the "[ <items> ]" shape used in testMultiple the
    # real expectation may contain two spaces between the brackets -- confirm
    # against the repository copy before relying on this literal.
    expected = "cpus = [ ]"

    for cpu_mask in [None, ""]:
      actual = hv_xen._CreateConfigCpus(cpu_mask)
      self.assertEqual(actual, expected)

  def testAll(self):
    """The "all CPUs" pinning constant produces no configuration at all.

    """
    actual = hv_xen._CreateConfigCpus(constants.CPU_PINNING_ALL)
    self.assertEqual(actual, None)

  def testOne(self):
    """A single CPU number gives a singular "cpu" setting.

    """
    actual = hv_xen._CreateConfigCpus("9")
    self.assertEqual(actual, "cpu = \"9\"")

  def testMultiple(self):
    """A colon-separated mask gives one "cpus" list entry per part.

    """
    actual = hv_xen._CreateConfigCpus("0-2,4,5-5:3:all")
    expected = ("cpus = [ \"0,1,2,4,5\", \"3\", \"%s\" ]" %
                constants.CPU_PINNING_ALL_XEN)
    self.assertEqual(actual, expected)
68
69
70 class TestParseXmList(testutils.GanetiTestCase):
71 def test(self):
72 data = testutils.ReadTestData("xen-xm-list-4.0.1-dom0-only.txt")
73
74 # Exclude node
75 self.assertEqual(hv_xen._ParseXmList(data.splitlines(), False), [])
76
77 # Include node
78 result = hv_xen._ParseXmList(data.splitlines(), True)
79 self.assertEqual(len(result), 1)
80 self.assertEqual(len(result[0]), 6)
81
82 # Name
83 self.assertEqual(result[0][0], hv_xen._DOM0_NAME)
84
85 # ID
86 self.assertEqual(result[0][1], 0)
87
88 # Memory
89 self.assertEqual(result[0][2], 1023)
90
91 # VCPUs
92 self.assertEqual(result[0][3], 1)
93
94 # State
95 self.assertEqual(result[0][4], "r-----")
96
97 # Time
98 self.assertAlmostEqual(result[0][5], 121152.6)
99
100 def testWrongLineFormat(self):
101 tests = [
102 ["three fields only"],
103 ["name InvalidID 128 1 r----- 12345"],
104 ]
105
106 for lines in tests:
107 try:
108 hv_xen._ParseXmList(["Header would be here"] + lines, False)
109 except errors.HypervisorError, err:
110 self.assertTrue("Can't parse output of xm list" in str(err))
111 else:
112 self.fail("Exception was not raised")
113
114
115 class TestGetXmList(testutils.GanetiTestCase):
116 def _Fail(self):
117 return utils.RunResult(constants.EXIT_FAILURE, None,
118 "stdout", "stderr", None,
119 NotImplemented, NotImplemented)
120
121 def testTimeout(self):
122 fn = testutils.CallCounter(self._Fail)
123 try:
124 hv_xen._GetXmList(fn, False, _timeout=0.1)
125 except errors.HypervisorError, err:
126 self.assertTrue("timeout exceeded" in str(err))
127 else:
128 self.fail("Exception was not raised")
129
130 self.assertTrue(fn.Count() < 10,
131 msg="'xm list' was called too many times")
132
133 def _Success(self, stdout):
134 return utils.RunResult(constants.EXIT_SUCCESS, None, stdout, "", None,
135 NotImplemented, NotImplemented)
136
137 def testSuccess(self):
138 data = testutils.ReadTestData("xen-xm-list-4.0.1-four-instances.txt")
139
140 fn = testutils.CallCounter(compat.partial(self._Success, data))
141
142 result = hv_xen._GetXmList(fn, True, _timeout=0.1)
143
144 self.assertEqual(len(result), 4)
145
146 self.assertEqual(map(compat.fst, result), [
147 "Domain-0",
148 "server01.example.com",
149 "web3106215069.example.com",
150 "testinstance.example.com",
151 ])
152
153 self.assertEqual(fn.Count(), 1)
154
155
class TestParseNodeInfo(testutils.GanetiTestCase):
  """Tests for L{hv_xen._ParseNodeInfo}.

  """
  def testEmpty(self):
    """Empty input gives an empty dictionary.

    """
    parsed = hv_xen._ParseNodeInfo("")
    self.assertEqual(parsed, {})

  def testUnknownInput(self):
    """Lines which are not recognized do not show up in the result.

    """
    unknown_lines = [
      "foo bar",
      "something else goes",
      "here",
      ]
    parsed = hv_xen._ParseNodeInfo("\n".join(unknown_lines))
    self.assertEqual(parsed, {})

  def testBasicInfo(self):
    """Parses recorded node information and checks every resulting key.

    """
    expected = {
      "cpu_nodes": 1,
      "cpu_sockets": 2,
      "cpu_total": 4,
      "hv_version": (4, 0),
      "memory_free": 8004,
      "memory_total": 16378,
      }

    info = testutils.ReadTestData("xen-xm-info-4.0.1.txt")
    self.assertEqual(hv_xen._ParseNodeInfo(info), expected)
179
180
class TestMergeInstanceInfo(testutils.GanetiTestCase):
  """Tests for L{hv_xen._MergeInstanceInfo} and L{hv_xen._GetNodeInfo}.

  """
  def testEmpty(self):
    """No node information and no domains give an empty result.

    """
    def _NoDomains(_):
      return []

    self.assertEqual(hv_xen._MergeInstanceInfo({}, _NoDomains), {})

  def _FakeXmList(self, include_node):
    """Stands in for the domain listing function.

    Fields the tests do not depend on are filled with C{NotImplemented}; the
    two numeric fields of the dom0 row are what the tests expect back as
    "memory_dom0" and "dom0_cpus".

    @param include_node: Must be true, otherwise the test fails

    """
    self.assertTrue(include_node)

    dom0_row = (hv_xen._DOM0_NAME, NotImplemented, 4096, 7, NotImplemented,
                NotImplemented)
    instance_row = ("inst1.example.com", NotImplemented, 2048, 4,
                    NotImplemented, NotImplemented)

    return [dom0_row, instance_row]

  def testMissingNodeInfo(self):
    """With empty node information only the dom0 values are reported.

    """
    expected = {
      "memory_dom0": 4096,
      "dom0_cpus": 7,
      }
    merged = hv_xen._MergeInstanceInfo({}, self._FakeXmList)
    self.assertEqual(merged, expected)

  def testWithNodeInfo(self):
    """Recorded node information is combined with the fake domain listing.

    """
    expected = {
      "cpu_nodes": 1,
      "cpu_sockets": 2,
      "cpu_total": 4,
      "dom0_cpus": 7,
      "hv_version": (4, 0),
      "memory_dom0": 4096,
      "memory_free": 8004,
      # 2230 equals 16378 - 8004 - 4096 - 2048; presumably total minus free
      # minus the memory of all listed domains -- verify against hv_xen
      "memory_hv": 2230,
      "memory_total": 16378,
      }

    info = testutils.ReadTestData("xen-xm-info-4.0.1.txt")
    self.assertEqual(hv_xen._GetNodeInfo(info, self._FakeXmList), expected)
215
216
217 class TestGetConfigFileDiskData(unittest.TestCase):
218 def testLetterCount(self):
219 self.assertEqual(len(hv_xen._DISK_LETTERS), 26)
220
221 def testNoDisks(self):
222 self.assertEqual(hv_xen._GetConfigFileDiskData([], "hd"), [])
223
224 def testManyDisks(self):
225 for offset in [0, 1, 10]:
226 disks = [(objects.Disk(dev_type=constants.LD_LV), "/tmp/disk/%s" % idx)
227 for idx in range(len(hv_xen._DISK_LETTERS) + offset)]
228
229 if offset == 0:
230 result = hv_xen._GetConfigFileDiskData(disks, "hd")
231 self.assertEqual(result, [
232 "'phy:/tmp/disk/%s,hd%s,r'" % (idx, string.ascii_lowercase[idx])
233 for idx in range(len(hv_xen._DISK_LETTERS) + offset)
234 ])
235 else:
236 try:
237 hv_xen._GetConfigFileDiskData(disks, "hd")
238 except errors.HypervisorError, err:
239 self.assertEqual(str(err), "Too many disks")
240 else:
241 self.fail("Exception was not raised")
242
243 def testTwoLvDisksWithMode(self):
244 disks = [
245 (objects.Disk(dev_type=constants.LD_LV, mode=constants.DISK_RDWR),
246 "/tmp/diskFirst"),
247 (objects.Disk(dev_type=constants.LD_LV, mode=constants.DISK_RDONLY),
248 "/tmp/diskLast"),
249 ]
250
251 result = hv_xen._GetConfigFileDiskData(disks, "hd")
252 self.assertEqual(result, [
253 "'phy:/tmp/diskFirst,hda,w'",
254 "'phy:/tmp/diskLast,hdb,r'",
255 ])
256
257 def testFileDisks(self):
258 disks = [
259 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
260 physical_id=[constants.FD_LOOP]),
261 "/tmp/diskFirst"),
262 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDONLY,
263 physical_id=[constants.FD_BLKTAP]),
264 "/tmp/diskTwo"),
265 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
266 physical_id=[constants.FD_LOOP]),
267 "/tmp/diskThree"),
268 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
269 physical_id=[constants.FD_BLKTAP]),
270 "/tmp/diskLast"),
271 ]
272
273 result = hv_xen._GetConfigFileDiskData(disks, "sd")
274 self.assertEqual(result, [
275 "'file:/tmp/diskFirst,sda,w'",
276 "'tap:aio:/tmp/diskTwo,sdb,r'",
277 "'file:/tmp/diskThree,sdc,w'",
278 "'tap:aio:/tmp/diskLast,sdd,w'",
279 ])
280
281 def testInvalidFileDisk(self):
282 disks = [
283 (objects.Disk(dev_type=constants.LD_FILE, mode=constants.DISK_RDWR,
284 physical_id=["#unknown#"]),
285 "/tmp/diskinvalid"),
286 ]
287
288 self.assertRaises(KeyError, hv_xen._GetConfigFileDiskData, disks, "sd")
289
290
# Hand over to the test program from testutils when executed as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()