Use reserved documentation IPs and domains
[ganeti-github.git] / test / ganeti.utils_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the utils module"""
23
24 import distutils.version
25 import errno
26 import fcntl
27 import glob
28 import os
29 import os.path
30 import re
31 import shutil
32 import signal
33 import socket
34 import stat
35 import string
36 import tempfile
37 import time
38 import unittest
39 import warnings
40 import OpenSSL
41
42 import testutils
43 from ganeti import constants
44 from ganeti import compat
45 from ganeti import utils
46 from ganeti import errors
47 from ganeti.utils import RunCmd, RemoveFile, MatchNameComponent, FormatUnit, \
48 ParseUnit, ShellQuote, ShellQuoteArgs, ListVisibleFiles, FirstFree, \
49 TailFile, SafeEncode, FormatTime, UnescapeAndSplit, RunParts, PathJoin, \
50 ReadOneLineFile, SetEtcHostsEntry, RemoveEtcHostsEntry
51
52
class TestIsProcessAlive(unittest.TestCase):
  """Testing case for IsProcessAlive"""

  def testExists(self):
    """The process running the tests must be reported as alive"""
    mypid = os.getpid()
    self.assert_(utils.IsProcessAlive(mypid), "can't find myself running")

  def testNotExisting(self):
    """A child which exited and was reaped must be reported as dead"""
    # os.fork() signals failure by raising OSError and never returns a
    # negative value, hence there is no separate error branch here
    pid_non_existing = os.fork()
    if pid_non_existing == 0:
      # Child: terminate right away without running cleanup handlers
      os._exit(0)
    # Parent: reap the child so its PID no longer refers to a process
    os.waitpid(pid_non_existing, 0)
    self.assertFalse(utils.IsProcessAlive(pid_non_existing),
                     "nonexisting process detected")
69
70
class TestGetProcStatusPath(unittest.TestCase):
  """Tests for the _GetProcStatusPath helper"""

  def test(self):
    """The PID shows up as a path component and distinct PIDs differ"""
    path = utils._GetProcStatusPath(1234)
    self.assert_("/1234/" in path)

    first = utils._GetProcStatusPath(1)
    second = utils._GetProcStatusPath(2)
    self.assertNotEqual(first, second)
76
77
class TestIsProcessHandlingSignal(unittest.TestCase):
  """Tests for IsProcessHandlingSignal and its status-parsing helpers"""

  def setUp(self):
    # Fake status files are written into a private temporary directory
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def testParseSigsetT(self):
    """Hexadecimal signal masks are turned into sets of signal numbers"""
    self.assertEqual(len(utils._ParseSigsetT("0")), 0)
    self.assertEqual(utils._ParseSigsetT("1"), set([1]))
    self.assertEqual(utils._ParseSigsetT("1000a"), set([2, 4, 17]))
    self.assertEqual(utils._ParseSigsetT("810002"), set([2, 17, 24, ]))
    self.assertEqual(utils._ParseSigsetT("0000000180000202"),
                     set([2, 10, 32, 33]))
    self.assertEqual(utils._ParseSigsetT("0000000180000002"),
                     set([2, 32, 33]))
    self.assertEqual(utils._ParseSigsetT("0000000188000002"),
                     set([2, 28, 32, 33]))
    self.assertEqual(utils._ParseSigsetT("000000004b813efb"),
                     set([1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 17,
                          24, 25, 26, 28, 31]))
    self.assertEqual(utils._ParseSigsetT("ffffff"), set(range(1, 25)))

  def testGetProcStatusField(self):
    """Field values are returned without surrounding whitespace"""
    for field in ["SigCgt", "Name", "FDSize"]:
      for value in ["", "0", "cat", " 1234 KB"]:
        pstatus = "\n".join([
          "VmPeak: 999 kB",
          "%s: %s" % (field, value),
          "TracerPid: 0",
          ])
        result = utils._GetProcStatusField(pstatus, field)
        self.assertEqual(result, value.strip())

  def test(self):
    """Signal 10 is part of the SigCgt mask in the fake status file"""
    sp = PathJoin(self.tmpdir, "status")

    utils.WriteFile(sp, data="\n".join([
      "Name: bash",
      "State: S (sleeping)",
      "SleepAVG: 98%",
      "Pid: 22250",
      "PPid: 10858",
      "TracerPid: 0",
      "SigBlk: 0000000000010000",
      "SigIgn: 0000000000384004",
      "SigCgt: 000000004b813efb",
      "CapEff: 0000000000000000",
      ]))

    self.assert_(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))

  def testNoSigCgt(self):
    """A status file without a SigCgt line makes the check raise"""
    sp = PathJoin(self.tmpdir, "status")

    utils.WriteFile(sp, data="\n".join([
      "Name: bash",
      ]))

    self.assertRaises(RuntimeError, utils.IsProcessHandlingSignal,
                      1234, 10, status_path=sp)

  def testNoSuchFile(self):
    """A missing status file is reported as "signal not handled" """
    sp = PathJoin(self.tmpdir, "notexist")

    self.assertFalse(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))

  @staticmethod
  def _TestRealProcess():
    """Checks the result for the current process under several dispositions

    Invoked through utils.RunInSeparateProcess by testRealProcess. SIGUSR1 is
    switched between default, a Python handler, ignored and default again;
    only the Python handler is expected to be reported as "handled".

    @return: True if all checks passed (an exception is raised otherwise)

    """
    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
      raise Exception("SIGUSR1 is handled when it should not be")

    signal.signal(signal.SIGUSR1, lambda signum, frame: None)
    if not utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
      raise Exception("SIGUSR1 is not handled when it should be")

    # An ignored signal must not be reported as handled; the message now
    # matches the condition which is being checked
    signal.signal(signal.SIGUSR1, signal.SIG_IGN)
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
      raise Exception("SIGUSR1 is handled when it should not be")

    signal.signal(signal.SIGUSR1, signal.SIG_DFL)
    if utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1):
      raise Exception("SIGUSR1 is handled when it should not be")

    return True

  def testRealProcess(self):
    """Runs the checks against a real process"""
    self.assert_(utils.RunInSeparateProcess(self._TestRealProcess))
167
168
class TestPidFileFunctions(unittest.TestCase):
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""

  def setUp(self):
    self.dir = tempfile.mkdtemp()
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
    # Redirect PID files into the temporary directory. The original function
    # is remembered so tearDown can undo the monkey-patching instead of
    # leaving it in place for whatever runs after this test case.
    self._orig_dpn = utils.DaemonPidFileName
    utils.DaemonPidFileName = self.f_dpn

  def testPidFileFunctions(self):
    """Write, read and remove a PID file for the current process"""
    pid_file = self.f_dpn('test')
    utils.WritePidFile('test')
    self.failUnless(os.path.exists(pid_file),
                    "PID file should have been created")
    read_pid = utils.ReadPidFile(pid_file)
    self.failUnlessEqual(read_pid, os.getpid())
    self.failUnless(utils.IsProcessAlive(read_pid))
    # Writing the same PID file a second time must be refused
    self.failUnlessRaises(errors.GenericError, utils.WritePidFile, 'test')
    utils.RemovePidFile('test')
    self.failIf(os.path.exists(pid_file),
                "PID file should not exist anymore")
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
                         "ReadPidFile should return 0 for missing pid file")
    fh = open(pid_file, "w")
    try:
      fh.write("blah\n")
    finally:
      # Close the handle even if the write fails
      fh.close()
    self.failUnlessEqual(utils.ReadPidFile(pid_file), 0,
                         "ReadPidFile should return 0 for invalid pid file")
    utils.RemovePidFile('test')
    self.failIf(os.path.exists(pid_file),
                "PID file should not exist anymore")

  def testKill(self):
    """Kill a forked child which wrote its own PID file"""
    pid_file = self.f_dpn('child')
    r_fd, w_fd = os.pipe()
    try:
      new_pid = os.fork()
      if new_pid == 0: #child
        utils.WritePidFile('child')
        os.write(w_fd, 'a')
        signal.pause()
        os._exit(0)
      # else we are in the parent
      # wait until the child has written the pid file
      os.read(r_fd, 1)
    finally:
      # The pipe is only needed for the start-up handshake
      os.close(r_fd)
      os.close(w_fd)
    read_pid = utils.ReadPidFile(pid_file)
    self.failUnlessEqual(read_pid, new_pid)
    self.failUnless(utils.IsProcessAlive(new_pid))
    utils.KillProcess(new_pid, waitpid=True)
    self.failIf(utils.IsProcessAlive(new_pid))
    utils.RemovePidFile('child')
    self.failUnlessRaises(errors.ProgrammerError, utils.KillProcess, 0)

  def tearDown(self):
    # Undo the monkey-patching done in setUp before removing the directory
    utils.DaemonPidFileName = self._orig_dpn
    for name in os.listdir(self.dir):
      os.unlink(os.path.join(self.dir, name))
    os.rmdir(self.dir)
225
226
class TestRunCmd(testutils.GanetiTestCase):
  """Testing case for the RunCmd function"""

  def setUp(self):
    testutils.GanetiTestCase.setUp(self)
    # Marker text which commands echo back, and a file to capture output in
    self.magic = time.ctime() + " ganeti test"
    self.fname = self._CreateTempFile()

  def testOk(self):
    """Test successful exit code"""
    result = RunCmd("/bin/sh -c 'exit 0'")
    self.assertEqual(result.exit_code, 0)
    self.assertEqual(result.output, "")

  def testFail(self):
    """Test fail exit code"""
    result = RunCmd("/bin/sh -c 'exit 1'")
    self.assertEqual(result.exit_code, 1)
    self.assertEqual(result.output, "")

  def testStdout(self):
    """Test standard output"""
    cmd = 'echo -n "%s"' % self.magic
    result = RunCmd("/bin/sh -c '%s'" % cmd)
    self.assertEqual(result.stdout, self.magic)
    # With an output file nothing is captured in the result object
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
    self.assertEqual(result.output, "")
    self.assertFileContent(self.fname, self.magic)

  def testStderr(self):
    """Test standard error"""
    cmd = 'echo -n "%s"' % self.magic
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
    self.assertEqual(result.stderr, self.magic)
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
    self.assertEqual(result.output, "")
    self.assertFileContent(self.fname, self.magic)

  def testCombined(self):
    """Test combined output"""
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
    expected = "A" + self.magic + "B" + self.magic
    result = RunCmd("/bin/sh -c '%s'" % cmd)
    self.assertEqual(result.output, expected)
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
    self.assertEqual(result.output, "")
    self.assertFileContent(self.fname, expected)

  def testSignal(self):
    """Test signal"""
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
    self.assertEqual(result.signal, 15)
    self.assertEqual(result.output, "")

  def testListRun(self):
    """Test list runs"""
    result = RunCmd(["true"])
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 0)
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 1)
    result = RunCmd(["echo", "-n", self.magic])
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 0)
    self.assertEqual(result.stdout, self.magic)

  def testFileEmptyOutput(self):
    """Test file output"""
    result = RunCmd(["true"], output=self.fname)
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 0)
    self.assertFileContent(self.fname, "")

  def testLang(self):
    """Test locale environment"""
    # Remember only the variables changed below. They are restored item by
    # item: rebinding os.environ to a copy would neither undo the changes in
    # the real process environment nor keep os.environ in sync with it.
    saved = {}
    for name in ("LANG", "LC_ALL"):
      saved[name] = os.environ.get(name)
    try:
      os.environ["LANG"] = "en_US.UTF-8"
      os.environ["LC_ALL"] = "en_US.UTF-8"
      result = RunCmd(["locale"])
      for line in result.output.splitlines():
        key, value = line.split("=", 1)
        # Ignore these variables, they're overridden by LC_ALL
        if key == "LANG" or key == "LANGUAGE":
          continue
        self.failIf(value and value != "C" and value != '"C"',
            "Variable %s is set to the invalid value '%s'" % (key, value))
    finally:
      for (name, value) in saved.items():
        if value is None:
          # Variable did not exist before the test
          if name in os.environ:
            del os.environ[name]
        else:
          os.environ[name] = value

  def testDefaultCwd(self):
    """Test default working directory"""
    self.failUnlessEqual(RunCmd(["pwd"]).stdout.strip(), "/")

  def testCwd(self):
    """Test explicitly specified working directory"""
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
    self.failUnlessEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
    cwd = os.getcwd()
    self.failUnlessEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)

  def testResetEnv(self):
    """Test environment reset functionality"""
    self.failUnlessEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
    self.failUnlessEqual(RunCmd(["env"], reset_env=True,
                                env={"FOO": "bar",}).stdout.strip(), "FOO=bar")
334
335
class TestRunParts(unittest.TestCase):
  """Testing case for the RunParts function"""

  def setUp(self):
    self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")

  def tearDown(self):
    shutil.rmtree(self.rundir)

  def _Path(self, name):
    """Returns the path of a file inside the run directory"""
    return os.path.join(self.rundir, name)

  def _Create(self, path, data, executable):
    """Writes a file and optionally makes it readable and executable"""
    utils.WriteFile(path, data=data)
    if executable:
      os.chmod(path, stat.S_IREAD | stat.S_IEXEC)

  def _Run(self):
    """Runs all parts of the run directory with a clean environment"""
    return RunParts(self.rundir, reset_env=True)

  def testEmpty(self):
    """Test on an empty dir"""
    self.assertEqual(self._Run(), [])

  def testSkipWrongName(self):
    """Test that wrong files are skipped"""
    path = self._Path("00test.dot")
    self._Create(path, "", True)
    expected = [(os.path.basename(path), constants.RUNPARTS_SKIP, None)]
    self.assertEqual(self._Run(), expected)

  def testSkipNonExec(self):
    """Test that non executable files are skipped"""
    path = self._Path("00test")
    self._Create(path, "", False)
    expected = [(os.path.basename(path), constants.RUNPARTS_SKIP, None)]
    self.assertEqual(self._Run(), expected)

  def testError(self):
    """Test error on a broken executable"""
    path = self._Path("00test")
    self._Create(path, "", True)
    (name, state, message) = self._Run()[0]
    self.assertEqual(name, os.path.basename(path))
    self.assertEqual(state, constants.RUNPARTS_ERR)
    self.failUnless(message)

  def testSorted(self):
    """Test executions are sorted"""
    paths = [self._Path(name) for name in ("64test", "00test", "42test")]

    for path in paths:
      self._Create(path, "", False)

    outcome = self._Run()

    # Entries are expected in sorted order at the start of the result list
    for (idx, path) in enumerate(sorted(paths)):
      self.assertEqual(os.path.basename(path), outcome[idx][0])

  def testOk(self):
    """Test correct execution"""
    path = self._Path("00test")
    self._Create(path, "#!/bin/sh\n\necho -n ciao", True)
    (name, state, run) = self._Run()[0]
    self.assertEqual(name, os.path.basename(path))
    self.assertEqual(state, constants.RUNPARTS_RUN)
    self.assertEqual(run.stdout, "ciao")

  def testRunFail(self):
    """Test correct execution, with run failure"""
    path = self._Path("00test")
    self._Create(path, "#!/bin/sh\n\nexit 1", True)
    (name, state, run) = self._Run()[0]
    self.assertEqual(name, os.path.basename(path))
    self.assertEqual(state, constants.RUNPARTS_RUN)
    self.assertEqual(run.exit_code, 1)
    self.failUnless(run.failed)

  def testRunMix(self):
    """Test a directory mixing failing, skipped, broken and working files"""
    paths = [self._Path(name)
             for name in ("00test", "42test", "64test", "99test")]
    paths.sort()
    (failing, skipped, broken, working) = paths

    # 1st has errors in execution
    self._Create(failing, "#!/bin/sh\n\nexit 1", True)

    # 2nd is skipped
    self._Create(skipped, "", False)

    # 3rd cannot execute properly
    self._Create(broken, "", True)

    # 4th execs
    self._Create(working, "#!/bin/sh\n\necho -n ciao", True)

    outcome = self._Run()

    (name, state, run) = outcome[0]
    self.assertEqual(name, os.path.basename(failing))
    self.assertEqual(state, constants.RUNPARTS_RUN)
    self.assertEqual(run.exit_code, 1)
    self.failUnless(run.failed)

    (name, state, run) = outcome[1]
    self.assertEqual(name, os.path.basename(skipped))
    self.assertEqual(state, constants.RUNPARTS_SKIP)
    self.assertEqual(run, None)

    (name, state, run) = outcome[2]
    self.assertEqual(name, os.path.basename(broken))
    self.assertEqual(state, constants.RUNPARTS_ERR)
    self.failUnless(run)

    (name, state, run) = outcome[3]
    self.assertEqual(name, os.path.basename(working))
    self.assertEqual(state, constants.RUNPARTS_RUN)
    self.assertEqual(run.output, "ciao")
    self.assertEqual(run.exit_code, 0)
    self.failUnless(not run.failed)
460
461
class TestStartDaemon(testutils.GanetiTestCase):
  """Tests for StartDaemon"""

  def setUp(self):
    self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
    self.tmpfile = os.path.join(self.tmpdir, "test")

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def testShell(self):
    """Shell command redirecting its own output"""
    utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
    self._wait(self.tmpfile, 60.0, "Hello World")

  def testShellOutput(self):
    """Shell command with output file"""
    utils.StartDaemon("echo Hello World", output=self.tmpfile)
    self._wait(self.tmpfile, 60.0, "Hello World")

  def testNoShellNoOutput(self):
    """Argument list without output file"""
    utils.StartDaemon(["pwd"])

  def testNoShellNoOutputTouch(self):
    """Argument list whose only effect is creating a file"""
    testfile = os.path.join(self.tmpdir, "check")
    self.failIf(os.path.exists(testfile))
    utils.StartDaemon(["touch", testfile])
    self._wait(testfile, 60.0, "")

  def testNoShellOutput(self):
    """Argument list with output file; pwd is expected to print "/" """
    utils.StartDaemon(["pwd"], output=self.tmpfile)
    self._wait(self.tmpfile, 60.0, "/")

  def testNoShellOutputCwd(self):
    """Argument list with output file and working directory"""
    utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd())
    self._wait(self.tmpfile, 60.0, os.getcwd())

  def testShellEnv(self):
    """Environment variables are visible to shell commands"""
    utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
                      env={ "GNT_TEST_VAR": "Hello World", })
    self._wait(self.tmpfile, 60.0, "Hello World")

  def testNoShellEnv(self):
    """Environment variables are visible to directly executed programs"""
    utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile,
                      env={ "GNT_TEST_VAR": "Hello World", })
    self._wait(self.tmpfile, 60.0, "Hello World")

  def testOutputFd(self):
    """Output goes to an already opened file descriptor"""
    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
    try:
      utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
    finally:
      os.close(fd)
    self._wait(self.tmpfile, 60.0, os.getcwd())

  def testPid(self):
    """The returned PID is the one the shell reports as $$"""
    pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
    self._wait(self.tmpfile, 60.0, str(pid))

  def testPidFile(self):
    """The PID file is locked and contains the daemon's PID"""
    pidfile = os.path.join(self.tmpdir, "pid")

    pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
                            output=self.tmpfile)
    try:
      fd = os.open(pidfile, os.O_RDONLY)
      try:
        # Check file is locked
        self.assertRaises(errors.LockError, utils.LockFile, fd)

        pidtext = os.read(fd, 100)
      finally:
        os.close(fd)

      self.assertEqual(int(pidtext.strip()), pid)

      self.assert_(utils.IsProcessAlive(pid))
    finally:
      # No matter what happens, kill daemon
      utils.KillProcess(pid, timeout=5.0, waitpid=False)
      self.failIf(utils.IsProcessAlive(pid))

    self.assertEqual(utils.ReadFile(self.tmpfile), "")

  def _wait(self, path, timeout, expected):
    """Waits until a file exists and holds the expected content

    @param path: File to poll
    @param timeout: Maximum time to wait, in seconds
    @param expected: Expected file content, compared after stripping

    """
    # Due to the asynchronous nature of daemon processes, polling is necessary.
    # A timeout makes sure the test doesn't hang forever.
    def _CheckFile():
      if not (os.path.isfile(path) and
              utils.ReadFile(path).strip() == expected):
        raise utils.RetryAgain()

    try:
      utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
    except utils.RetryTimeout:
      self.fail("Apparently the daemon didn't run in %s seconds and/or"
                " didn't write the correct output" % timeout)

  def testError(self):
    """Invalid programs, paths and argument combinations are rejected"""
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
                      ["./does-NOT-EXIST/here/0123456789"])
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
                      ["./does-NOT-EXIST/here/0123456789"],
                      output=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
                      ["./does-NOT-EXIST/here/0123456789"],
                      cwd=os.path.join(self.tmpdir, "DIR/NOT/EXIST"))

    # Passing both an output file and an output descriptor is a caller bug
    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
    try:
      self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
                        ["./does-NOT-EXIST/here/0123456789"],
                        output=self.tmpfile, output_fd=fd)
    finally:
      os.close(fd)
577
578
579 class TestSetCloseOnExecFlag(unittest.TestCase):
580 """Tests for SetCloseOnExecFlag"""
581
582 def setUp(self):
583 self.tmpfile = tempfile.TemporaryFile()
584
585 def testEnable(self):
586 utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
587 self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
588 fcntl.FD_CLOEXEC)
589
590 def testDisable(self):
591 utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
592 self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
593 fcntl.FD_CLOEXEC)
594
595
596 class TestSetNonblockFlag(unittest.TestCase):
597 def setUp(self):
598 self.tmpfile = tempfile.TemporaryFile()
599
600 def testEnable(self):
601 utils.SetNonblockFlag(self.tmpfile.fileno(), True)
602 self.failUnless(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
603 os.O_NONBLOCK)
604
605 def testDisable(self):
606 utils.SetNonblockFlag(self.tmpfile.fileno(), False)
607 self.failIf(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
608 os.O_NONBLOCK)
609
610
611 class TestRemoveFile(unittest.TestCase):
612 """Test case for the RemoveFile function"""
613
614 def setUp(self):
615 """Create a temp dir and file for each case"""
616 self.tmpdir = tempfile.mkdtemp('', 'ganeti-unittest-')
617 fd, self.tmpfile = tempfile.mkstemp('', '', self.tmpdir)
618 os.close(fd)
619
620 def tearDown(self):
621 if os.path.exists(self.tmpfile):
622 os.unlink(self.tmpfile)
623 os.rmdir(self.tmpdir)
624
625 def testIgnoreDirs(self):
626 """Test that RemoveFile() ignores directories"""
627 self.assertEqual(None, RemoveFile(self.tmpdir))
628
629 def testIgnoreNotExisting(self):
630 """Test that RemoveFile() ignores non-existing files"""
631 RemoveFile(self.tmpfile)
632 RemoveFile(self.tmpfile)
633
634 def testRemoveFile(self):
635 """Test that RemoveFile does remove a file"""
636 RemoveFile(self.tmpfile)
637 if os.path.exists(self.tmpfile):
638 self.fail("File '%s' not removed" % self.tmpfile)
639
640 def testRemoveSymlink(self):
641 """Test that RemoveFile does remove symlinks"""
642 symlink = self.tmpdir + "/symlink"
643 os.symlink("no-such-file", symlink)
644 RemoveFile(symlink)
645 if os.path.exists(symlink):
646 self.fail("File '%s' not removed" % symlink)
647 os.symlink(self.tmpfile, symlink)
648 RemoveFile(symlink)
649 if os.path.exists(symlink):
650 self.fail("File '%s' not removed" % symlink)
651
652
class TestRename(unittest.TestCase):
  """Test case for RenameFile"""

  def setUp(self):
    """Create a temporary directory containing one empty file"""
    self.tmpdir = tempfile.mkdtemp()
    self.tmpfile = os.path.join(self.tmpdir, "test1")

    # Create the (empty) source file
    handle = open(self.tmpfile, "w")
    handle.close()

  def tearDown(self):
    """Remove temporary directory"""
    shutil.rmtree(self.tmpdir)

  def _Sub(self, relpath):
    """Returns a path below the temporary directory"""
    return os.path.join(self.tmpdir, relpath)

  def testSimpleRename1(self):
    """Simple rename 1"""
    target = self._Sub("xyz")
    utils.RenameFile(self.tmpfile, target)
    self.assert_(os.path.isfile(self._Sub("xyz")))

  def testSimpleRename2(self):
    """Simple rename 2"""
    target = self._Sub("xyz")
    utils.RenameFile(self.tmpfile, target, mkdir=True)
    self.assert_(os.path.isfile(self._Sub("xyz")))

  def testRenameMkdir(self):
    """Rename with mkdir"""
    # First into a directory which does not exist yet ...
    utils.RenameFile(self.tmpfile, self._Sub("test/xyz"), mkdir=True)
    self.assert_(os.path.isdir(self._Sub("test")))
    self.assert_(os.path.isfile(self._Sub("test/xyz")))

    # ... then several missing levels further down
    utils.RenameFile(self._Sub("test/xyz"), self._Sub("test/foo/bar/baz"),
                     mkdir=True)
    self.assert_(os.path.isdir(self._Sub("test")))
    self.assert_(os.path.isdir(self._Sub("test/foo/bar")))
    self.assert_(os.path.isfile(self._Sub("test/foo/bar/baz")))
692
693
class TestMatchNameComponent(unittest.TestCase):
  """Test case for the MatchNameComponent function"""

  def testEmptyList(self):
    """Test that there is no match against an empty list"""
    for key in ("", "test"):
      self.assertEqual(MatchNameComponent(key, []), None)

  def testSingleMatch(self):
    """Test that a single match is performed correctly"""
    names = ["test1.example.com", "test2.example.com", "test3.example.com"]
    wanted = names[1]
    for key in ("test2", "test2.example", "test2.example.com"):
      self.assertEqual(MatchNameComponent(key, names), wanted)

  def testMultipleMatches(self):
    """Test that a multiple match is returned as None"""
    names = ["test1.example.com", "test1.example.org", "test1.example.net"]
    for key in ("test1", "test1.example"):
      self.assertEqual(MatchNameComponent(key, names), None)

  def testFullMatch(self):
    """Test that a full match is returned correctly"""
    short = "test1"
    longer = "test1.example"
    names = [longer, longer + ".com"]
    self.assertEqual(MatchNameComponent(short, names), None)
    self.assertEqual(MatchNameComponent(longer, names), longer)

  def testCaseInsensitivePartialMatch(self):
    """Test for the case_insensitive keyword"""
    names = ["test1.example.com", "test2.example.net"]
    for key in ("test2", "Test2", "teSt2", "TeSt2"):
      found = MatchNameComponent(key, names, case_sensitive=False)
      self.assertEqual(found, "test2.example.net")

  def testCaseInsensitiveFullMatch(self):
    """Test full-name matches when case is ignored"""
    names = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
    checks = [
      # Both ts1 names start with "ts1", but a full-name match that ignores
      # case still selects a single entry
      ("Ts1", None),
      ("Ts1.ex", "ts1.ex"),
      ("ts1.ex", "ts1.ex"),
      # Between the two ts2 only case differs, so only case-match works
      ("ts2.ex", "ts2.ex"),
      ("Ts2.ex", "Ts2.ex"),
      ("TS2.ex", None),
      ]
    for (key, wanted) in checks:
      found = MatchNameComponent(key, names, case_sensitive=False)
      self.assertEqual(found, wanted)
752
753
class TestReadFile(testutils.GanetiTestCase):
  """Tests for ReadFile"""

  def _CheckDigest(self, data, expected):
    """Compares the MD5 hex digest of some data with an expected value"""
    digest = compat.md5_hash()
    digest.update(data)
    self.assertEqual(digest.hexdigest(), expected)

  def testReadAll(self):
    """Reading without size limit returns the whole file"""
    filename = self._TestDataFilename("cert1.pem")
    content = utils.ReadFile(filename)
    self.assertEqual(len(content), 814)
    self._CheckDigest(content, "a491efb3efe56a0535f924d5f8680fd4")

  def testReadSize(self):
    """Reading with a size limit returns only that many bytes"""
    filename = self._TestDataFilename("cert1.pem")
    content = utils.ReadFile(filename, size=100)
    self.assertEqual(len(content), 100)
    self._CheckDigest(content, "893772354e4e690b9efd073eed433ce7")

  def testError(self):
    """Reading a non-existing file raises an environment error"""
    missing = "/dev/null/does-not-exist"
    self.assertRaises(EnvironmentError, utils.ReadFile, missing)
776
777
class TestReadOneLineFile(testutils.GanetiTestCase):
  """Tests for ReadOneLineFile"""

  def setUp(self):
    testutils.GanetiTestCase.setUp(self)

  def testDefault(self):
    """Default mode returns the first line of a multi-line file"""
    line = ReadOneLineFile(self._TestDataFilename("cert1.pem"))
    self.assertEqual(len(line), 27)
    self.assertEqual(line, "-----BEGIN CERTIFICATE-----")

  def testNotStrict(self):
    """Non-strict mode returns the first line of a multi-line file"""
    filename = self._TestDataFilename("cert1.pem")
    line = ReadOneLineFile(filename, strict=False)
    self.assertEqual(len(line), 27)
    self.assertEqual(line, "-----BEGIN CERTIFICATE-----")

  def testStrictFailure(self):
    """Strict mode rejects a multi-line file"""
    filename = self._TestDataFilename("cert1.pem")
    self.assertRaises(errors.GenericError, ReadOneLineFile,
                      filename, strict=True)

  def testLongLine(self):
    """A very long single line is returned completely in both modes"""
    content = 1024 * "Hello World! "
    filename = self._CreateTempFile()
    utils.WriteFile(filename, data=content)
    strict_result = ReadOneLineFile(filename, strict=True)
    lax_result = ReadOneLineFile(filename, strict=False)
    self.assertEqual(content, strict_result)
    self.assertEqual(content, lax_result)

  def testNewline(self):
    """A trailing line ending, if any, is not part of the result"""
    filename = self._CreateTempFile()
    text = "myline"
    for ending in ["", "\n", "\r\n"]:
      utils.WriteFile(filename, data="%s%s" % (text, ending))
      lax_result = ReadOneLineFile(filename, strict=False)
      self.assertEqual(text, lax_result)
      strict_result = ReadOneLineFile(filename, strict=True)
      self.assertEqual(text, strict_result)

  def testWhitespaceAndMultipleLines(self):
    """Trailing whitespace is kept; several lines only work non-strict"""
    filename = self._CreateTempFile()
    newline_chars = set("\r\n")
    for ending in ["", "\n", "\r\n"]:
      for blanks in [" ", "\t", "\t\t \t", "\t "]:
        content = 1024 * ("Foo bar baz %s%s" % (blanks, ending))
        utils.WriteFile(filename, data=content)
        lax_result = ReadOneLineFile(filename, strict=False)

        if not ending:
          # Without line endings the whole content is one single line
          strict_result = ReadOneLineFile(filename, strict=True)
          self.assertEqual(content, strict_result)
          self.assertEqual(content, lax_result)
          continue

        # With line endings there are many lines: strict mode must fail and
        # non-strict mode must return exactly the first one
        self.assert_(newline_chars & set(content))
        self.assertRaises(errors.GenericError, ReadOneLineFile,
                          filename, strict=True)
        first_len = len("Foo bar baz ") + len(blanks)
        self.assertEqual(len(lax_result), first_len)
        self.assertEqual(lax_result, content[:first_len])
        self.assertFalse(newline_chars & set(lax_result))

  def testEmptylines(self):
    """Leading empty lines are skipped"""
    filename = self._CreateTempFile()
    text = "myline"
    for ending in ["\n", "\r\n"]:
      for extra in ["", "otherline"]:
        content = "".join([ending, ending, text, ending, extra, ending])
        utils.WriteFile(filename, data=content)
        self.assert_(set("\r\n") & set(content))
        lax_result = ReadOneLineFile(filename, strict=False)
        self.assertEqual(text, lax_result)
        if not extra:
          strict_result = ReadOneLineFile(filename, strict=True)
          self.assertEqual(text, strict_result)
        else:
          # A second non-empty line is not acceptable in strict mode
          self.assertRaises(errors.GenericError, ReadOneLineFile,
                            filename, strict=True)
853
854
class TestTimestampForFilename(unittest.TestCase):
  """Tests for TimestampForFilename"""

  def test(self):
    """Generated timestamps contain neither dots nor colons"""
    for char in (".", ":"):
      stamp = utils.TimestampForFilename()
      self.assert_(char not in stamp)
859
860
class TestCreateBackup(testutils.GanetiTestCase):
  """Tests for CreateBackup"""

  def setUp(self):
    testutils.GanetiTestCase.setUp(self)

    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    testutils.GanetiTestCase.tearDown(self)

    shutil.rmtree(self.tmpdir)

  def _CountFiles(self, prefix):
    """Returns the number of files whose path starts with a prefix"""
    return len(glob.glob("%s*" % prefix))

  def testEmpty(self):
    """Backups of an empty file; non-regular files are refused"""
    source = PathJoin(self.tmpdir, "config.data")
    utils.WriteFile(source, data="")
    backup = utils.CreateBackup(source)
    self.assertFileContent(backup, "")
    self.assertEqual(self._CountFiles(source), 2)
    # Every further call is expected to add exactly one more file
    for total in (3, 4):
      utils.CreateBackup(source)
      self.assertEqual(self._CountFiles(source), total)

    fifo = PathJoin(self.tmpdir, "fifo")
    os.mkfifo(fifo)
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifo)

  def testContent(self):
    """Backups carry the content of the file at the time of the call"""
    samples = ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]
    # All iterations use the same file name, hence backups accumulate
    backups = 0
    for sample in samples:
      for factor in [1, 2, 10, 127]:
        content = sample * factor

        source = PathJoin(self.tmpdir, "test.data_")
        utils.WriteFile(source, data=content)
        self.assertFileContent(source, content)

        for _ in range(3):
          backup = utils.CreateBackup(source)
          backups += 1
          self.assertFileContent(backup, content)
          self.assertEqual(self._CountFiles(source), 1 + backups)
902
903
class TestFormatUnit(unittest.TestCase):
  """Test case for the FormatUnit function"""

  def _Check(self, cases):
    """Verifies a list of (value, units, expected text) tuples"""
    for (value, units, text) in cases:
      self.assertEqual(FormatUnit(value, units), text)

  def testMiB(self):
    """Values below one GiB and explicit MiB formatting"""
    self._Check([
      (1, 'h', '1M'),
      (100, 'h', '100M'),
      (1023, 'h', '1023M'),

      (1, 'm', '1'),
      (100, 'm', '100'),
      (1023, 'm', '1023'),

      (1024, 'm', '1024'),
      (1536, 'm', '1536'),
      (17133, 'm', '17133'),
      (1024 * 1024 - 1, 'm', '1048575'),
      ])

  def testGiB(self):
    """Values between one GiB and one TiB and explicit GiB formatting"""
    self._Check([
      (1024, 'h', '1.0G'),
      (1536, 'h', '1.5G'),
      (17133, 'h', '16.7G'),
      (1024 * 1024 - 1, 'h', '1024.0G'),

      (1024, 'g', '1.0'),
      (1536, 'g', '1.5'),
      (17133, 'g', '16.7'),
      (1024 * 1024 - 1, 'g', '1024.0'),

      (1024 * 1024, 'g', '1024.0'),
      (5120 * 1024, 'g', '5120.0'),
      (29829 * 1024, 'g', '29829.0'),
      ])

  def testTiB(self):
    """Values of one TiB and more and explicit TiB formatting"""
    self._Check([
      (1024 * 1024, 'h', '1.0T'),
      (5120 * 1024, 'h', '5.0T'),
      (29829 * 1024, 'h', '29.1T'),

      (1024 * 1024, 't', '1.0'),
      (5120 * 1024, 't', '5.0'),
      (29829 * 1024, 't', '29.1'),
      ])
944
945
class TestParseUnit(unittest.TestCase):
  """Test case for the ParseUnit function"""

  # (suffix, factor the parsed number is expected to be multiplied by)
  SCALES = (('', 1),
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))

  def _CheckAll(self, cases):
    """Asserts the ParseUnit result for each (text, expected) pair."""
    for (text, expected) in cases:
      self.assertEqual(ParseUnit(text), expected)

  def testRounding(self):
    """Expected values are the input rounded up to a multiple of four."""
    self._CheckAll([
      ('0', 0),
      ('1', 4),
      ('2', 4),
      ('3', 4),
      ])

    self._CheckAll([
      ('124', 124),
      ('125', 128),
      ('126', 128),
      ('127', 128),
      ('128', 128),
      ('129', 132),
      ('130', 132),
      ])

  def testFloating(self):
    """Fractional input, with and without a unit suffix."""
    self._CheckAll([
      ('0', 0),
      ('0.5', 4),
      ('1.75', 4),
      ('1.99', 4),
      ('2.00', 4),
      ('2.01', 4),
      ('3.99', 4),
      ('4.00', 4),
      ('4.01', 8),
      ('1.5G', 1536),
      ('1.8G', 1844),
      ('8.28T', 8682212),
      ])

  def testSuffixes(self):
    """Each suffix is tried in several spellings after each separator."""
    # The single-space separator used to be listed twice, which only repeated
    # the same check; a run of several spaces is exercised instead.
    separators = ('', ' ', '   ', "\t", "\t ")
    # Suffix as listed in SCALES, all lower-case and all upper-case
    spellings = (lambda x: x, str.lower, str.upper)

    for sep in separators:
      for (suffix, scale) in TestParseUnit.SCALES:
        for func in spellings:
          self.assertEqual(ParseUnit('1024' + sep + func(suffix)),
                           1024 * scale)

  def testInvalidInput(self):
    """Unsupported separators and decimal commas raise UnitParseError."""
    for sep in ('-', '_', ',', 'a'):
      for (suffix, _) in TestParseUnit.SCALES:
        self.assertRaises(errors.UnitParseError, ParseUnit,
                          '1' + sep + suffix)

    for (suffix, _) in TestParseUnit.SCALES:
      self.assertRaises(errors.UnitParseError, ParseUnit, '1,3' + suffix)
996
997
class TestSshKeys(testutils.GanetiTestCase):
  """Test case for the AddAuthorizedKey and RemoveAuthorizedKey functions"""

  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="198.51.100.4" '
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')

  # KEY_A with additional whitespace between its fields. The
  # "...WithSomeMoreSpaces" tests used to pass the key with single spaces,
  # which did not exercise what their names promise.
  KEY_A_SPACED = 'ssh-dss  AAAAB3NzaC1w5256closdj32mZaQU   root@key-a'

  def setUp(self):
    """Creates a temporary key file holding KEY_A and KEY_B."""
    testutils.GanetiTestCase.setUp(self)
    self.tmpname = self._CreateTempFile()
    handle = open(self.tmpname, 'w')
    try:
      handle.write("%s\n" % TestSshKeys.KEY_A)
      handle.write("%s\n" % TestSshKeys.KEY_B)
    finally:
      handle.close()

  def _AssertKeys(self, keys):
    """Asserts that the key file holds exactly keys, one per line."""
    expected = "".join(["%s\n" % key for key in keys])
    self.assertFileContent(self.tmpname, expected)

  def testAddingNewKey(self):
    new_key = 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test'
    utils.AddAuthorizedKey(self.tmpname, new_key)

    self._AssertKeys([self.KEY_A, self.KEY_B, new_key])

  def testAddingAlmostButNotCompletelyTheSameKey(self):
    # Same key material as KEY_A, but a different trailing comment
    similar_key = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test'
    utils.AddAuthorizedKey(self.tmpname, similar_key)

    self._AssertKeys([self.KEY_A, self.KEY_B, similar_key])

  def testAddingExistingKeyWithSomeMoreSpaces(self):
    utils.AddAuthorizedKey(self.tmpname, self.KEY_A_SPACED)

    # The file is expected to stay as it was
    self._AssertKeys([self.KEY_A, self.KEY_B])

  def testRemovingExistingKeyWithSomeMoreSpaces(self):
    utils.RemoveAuthorizedKey(self.tmpname, self.KEY_A_SPACED)

    self._AssertKeys([self.KEY_B])

  def testRemovingNonExistingKey(self):
    utils.RemoveAuthorizedKey(self.tmpname,
                              'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')

    self._AssertKeys([self.KEY_A, self.KEY_B])
1060
1061
class TestEtcHosts(testutils.GanetiTestCase):
  """Test functions modifying /etc/hosts"""

  # Lines the temporary hosts file holds before each test
  _HEADER = "# This is a test file for /etc/hosts\n"
  _LOCALHOST = "127.0.0.1\tlocalhost\n"
  _ROUTER = "192.0.2.1 router gw\n"

  # Octal 644 (rw-r--r--), the mode asserted after every operation
  _MODE = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH

  def setUp(self):
    testutils.GanetiTestCase.setUp(self)
    self.tmpname = self._CreateTempFile()
    handle = open(self.tmpname, 'w')
    try:
      for line in (self._HEADER, self._LOCALHOST, self._ROUTER):
        handle.write(line)
    finally:
      handle.close()

  def _AssertHosts(self, *lines):
    """Asserts the file consists of the given lines and has mode _MODE."""
    self.assertFileContent(self.tmpname, "".join(lines))
    self.assertFileMode(self.tmpname, self._MODE)

  def testSettingNewIp(self):
    SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost.example.com',
                     ['myhost'])

    self._AssertHosts(self._HEADER, self._LOCALHOST, self._ROUTER,
                      "198.51.100.4\tmyhost.example.com myhost\n")

  def testSettingExistingIp(self):
    SetEtcHostsEntry(self.tmpname, '192.0.2.1', 'myhost.example.com',
                     ['myhost'])

    # The former "router gw" line for this address is expected to be gone
    self._AssertHosts(self._HEADER, self._LOCALHOST,
                      "192.0.2.1\tmyhost.example.com myhost\n")

  def testSettingDuplicateName(self):
    SetEtcHostsEntry(self.tmpname, '198.51.100.4', 'myhost', ['myhost'])

    # The alias equal to the hostname is expected only once
    self._AssertHosts(self._HEADER, self._LOCALHOST, self._ROUTER,
                      "198.51.100.4\tmyhost\n")

  def testRemovingExistingHost(self):
    RemoveEtcHostsEntry(self.tmpname, 'router')

    self._AssertHosts(self._HEADER, self._LOCALHOST, "192.0.2.1 gw\n")

  def testRemovingSingleExistingHost(self):
    RemoveEtcHostsEntry(self.tmpname, 'localhost')

    self._AssertHosts(self._HEADER, self._ROUTER)

  def testRemovingNonExistingHost(self):
    RemoveEtcHostsEntry(self.tmpname, 'myhost')

    self._AssertHosts(self._HEADER, self._LOCALHOST, self._ROUTER)

  def testRemovingAlias(self):
    RemoveEtcHostsEntry(self.tmpname, 'gw')

    self._AssertHosts(self._HEADER, self._LOCALHOST, "192.0.2.1 router\n")
1141
1142
class TestGetMounts(unittest.TestCase):
  """Test case for GetMounts()."""

  # Three mount entries, one per line, fed to GetMounts via a file
  TESTDATA = (
    "rootfs / rootfs rw 0 0\n"
    "none /sys sysfs rw,nosuid,nodev,noexec,relatime 0 0\n"
    "none /proc proc rw,nosuid,nodev,noexec,relatime 0 0\n")

  def setUp(self):
    # The file object is kept on the instance; a NamedTemporaryFile removes
    # its file once it gets closed
    self.tmpfile = tempfile.NamedTemporaryFile()
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)

  def testGetMounts(self):
    """The first four fields of each line are returned as a tuple."""
    sys_options = "rw,nosuid,nodev,noexec,relatime"
    expected = [
      ("rootfs", "/", "rootfs", "rw"),
      ("none", "/sys", "sysfs", sys_options),
      ("none", "/proc", "proc", sys_options),
      ]

    mounts = utils.GetMounts(filename=self.tmpfile.name)
    self.assertEqual(mounts, expected)
1162
1163
class TestShellQuoting(unittest.TestCase):
  """Test case for shell quoting functions"""

  def testShellQuote(self):
    """Single values: (raw value, expected quoted form)."""
    cases = [
      ('abc', "abc"),
      ('ab"c', "'ab\"c'"),
      ("a'bc", "'a'\\''bc'"),
      ("a b c", "'a b c'"),
      ("a b\\ c", "'a b\\ c'"),
      ]

    for (raw, quoted) in cases:
      self.assertEqual(ShellQuote(raw), quoted)

  def testShellQuoteArgs(self):
    """Argument lists: (arguments, expected command line)."""
    cases = [
      (['a', 'b', 'c'], "a b c"),
      (['a', 'b"', 'c'], "a 'b\"' c"),
      (['a', 'b\'', 'c'], "a 'b'\\\''' c"),
      ]

    for (args, cmdline) in cases:
      self.assertEqual(ShellQuoteArgs(args), cmdline)
1178
1179
class TestListVisibleFiles(unittest.TestCase):
  """Test case for ListVisibleFiles"""

  def setUp(self):
    self.path = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.path)

  def _CreateFiles(self, files):
    """Creates a file for every given name inside the test directory."""
    for name in files:
      utils.WriteFile(os.path.join(self.path, name), data="test")

  def _test(self, files, expected):
    """Creates files and compares the listing with expected.

    The comparison ignores the order of the names.

    """
    self._CreateFiles(files)
    self.assertEqual(set(ListVisibleFiles(self.path)), set(expected))

  def testAllVisible(self):
    names = ["a", "b", "c"]
    self._test(names, names)

  def testNoneVisible(self):
    self._test([".a", ".b", ".c"], [])

  def testSomeVisible(self):
    self._test(["a", "b", ".c"], ["a", "b"])

  def testNonAbsolutePath(self):
    self.assertRaises(errors.ProgrammerError, ListVisibleFiles, "abc")

  def testNonNormalizedPath(self):
    self.assertRaises(errors.ProgrammerError, ListVisibleFiles,
                      "/bin/../tmp")
1219
1220
class TestNewUUID(unittest.TestCase):
  """Test case for NewUUID"""

  # Five dash-separated groups of lower-case hex digits (8-4-4-4-12)
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
                        '[a-f0-9]{4}-[a-f0-9]{12}$')

  def runTest(self):
    """A generated UUID matches the expected textual layout."""
    generated = utils.NewUUID()
    self.assertTrue(self._re_uuid.match(generated))
1229
1230
class TestUniqueSequence(unittest.TestCase):
  """Test case for UniqueSequence"""

  def _test(self, seq, expected):
    """Asserts that UniqueSequence turns seq into expected."""
    self.assertEqual(utils.UniqueSequence(seq), expected)

  def runTest(self):
    """Duplicates are dropped, first occurrences keep their order."""
    one_to_three = [1, 2, 3]

    ordered = [
      [1, 2, 3],
      [1, 1, 2, 2, 3, 3],
      [1, 2, 2, 3],
      [1, 2, 3, 3],
      ]
    for seq in ordered:
      self._test(seq, one_to_three)

    unordered = [
      [1, 2, 3, 1, 2, 3],
      [1, 1, 2, 3, 3, 1, 2],
      ]
    for seq in unordered:
      self._test(seq, one_to_three)

    strings = [
      (["a", "a"], ["a"]),
      (["a", "b"], ["a", "b"]),
      (["a", "b", "a"], ["a", "b"]),
      ]
    for (seq, expected) in strings:
      self._test(seq, expected)
1252
1253
class TestFirstFree(unittest.TestCase):
  """Test case for the FirstFree function"""

  def test(self):
    """Test FirstFree"""
    # (positional arguments, keyword arguments, expected result)
    cases = [
      (([0, 1, 3], ), {}, 2),
      (([], ), {}, None),
      (([3, 4, 6], ), {}, 0),
      (([3, 4, 6], ), {"base": 3}, 5),
      ]

    for (args, kwargs, expected) in cases:
      self.assertEqual(FirstFree(*args, **kwargs), expected)

    # An element below the base trips an assertion
    self.assertRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
1264
1265
class TestTailFile(testutils.GanetiTestCase):
  """Test case for the TailFile function"""

  def _WriteTempFile(self, chunks):
    """Creates a temporary file and writes all chunks to it, in order.

    The file handle is closed even if a write fails, so a broken test does
    not leave descriptors open.

    Returns the name of the file.

    """
    fname = self._CreateTempFile()
    fd = open(fname, "w")
    try:
      for chunk in chunks:
        fd.write(chunk)
    finally:
      fd.close()
    return fname

  def testEmpty(self):
    fname = self._CreateTempFile()
    self.assertEqual(TailFile(fname), [])
    self.assertEqual(TailFile(fname, lines=25), [])

  def testAllLines(self):
    """Requests exactly as many lines as the file holds."""
    data = ["test %d" % i for i in range(30)]
    for i in range(30):
      chunks = ["\n".join(data[:i])]
      if i > 0:
        # A file holding no lines must stay empty
        chunks.append("\n")
      fname = self._WriteTempFile(chunks)
      self.assertEqual(TailFile(fname, lines=i), data[:i])

  def testPartialLines(self):
    """Requests fewer lines than the file holds."""
    data = ["test %d" % i for i in range(30)]
    fname = self._WriteTempFile(["\n".join(data), "\n"])
    for i in range(1, 30):
      self.assertEqual(TailFile(fname, lines=i), data[-i:])

  def testBigFile(self):
    """The wanted lines follow a single line of 1 MiB."""
    data = ["test %d" % i for i in range(30)]
    fname = self._WriteTempFile(["X" * 1048576, "\n", "\n".join(data), "\n"])
    for i in range(1, 30):
      self.assertEqual(TailFile(fname, lines=i), data[-i:])
1306
1307
class _BaseFileLockTest:
  """Test case for the FileLock class

  The tests read self.lock and self.tmpfile; both have to be provided by
  the class these tests are combined with.

  """

  def testSharedNonblocking(self):
    self.lock.Shared(blocking=False)
    self.lock.Close()

  def testExclusiveNonblocking(self):
    self.lock.Exclusive(blocking=False)
    self.lock.Close()

  def testUnlockNonblocking(self):
    self.lock.Unlock(blocking=False)
    self.lock.Close()

  def testSharedBlocking(self):
    self.lock.Shared(blocking=True)
    self.lock.Close()

  def testExclusiveBlocking(self):
    self.lock.Exclusive(blocking=True)
    self.lock.Close()

  def testUnlockBlocking(self):
    self.lock.Unlock(blocking=True)
    self.lock.Close()

  def testSharedExclusiveUnlock(self):
    self.lock.Shared(blocking=False)
    self.lock.Exclusive(blocking=False)
    self.lock.Unlock(blocking=False)
    self.lock.Close()

  def testExclusiveSharedUnlock(self):
    self.lock.Exclusive(blocking=False)
    self.lock.Shared(blocking=False)
    self.lock.Unlock(blocking=False)
    self.lock.Close()

  def testSimpleTimeout(self):
    # These will succeed on the first attempt, so the timeout is not expected
    # to be reached
    self.lock.Shared(blocking=True, timeout=10.0)
    self.lock.Exclusive(blocking=False, timeout=10.0)
    self.lock.Unlock(blocking=True, timeout=10.0)
    self.lock.Close()

  @staticmethod
  def _TryLockInner(filename, shared, blocking):
    """Tries to lock filename through a newly opened FileLock.

    A shared lock is requested if shared is true, an exclusive one
    otherwise.

    Returns False if locking raised LockError, True otherwise.

    """
    lock = utils.FileLock.Open(filename)

    if shared:
      fn = lock.Shared
    else:
      fn = lock.Exclusive

    try:
      # The timeout doesn't really matter as the parent process waits for us to
      # finish anyway.
      fn(blocking=blocking, timeout=0.01)
    except errors.LockError:
      return False

    return True

  def _TryLock(self, *args):
    """Runs _TryLockInner on the temporary file in a separate process."""
    return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
                                      *args)

  def testTimeout(self):
    for blocking in [True, False]:
      # While held exclusively, no other process is expected to get any lock
      self.lock.Exclusive(blocking=True)
      self.assertFalse(self._TryLock(False, blocking))
      self.assertFalse(self._TryLock(True, blocking))

      # While held shared, only further shared locks are expected to succeed
      self.lock.Shared(blocking=True)
      self.assertTrue(self._TryLock(True, blocking))
      self.assertFalse(self._TryLock(False, blocking))

  def testCloseShared(self):
    self.lock.Close()
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)

  def testCloseExclusive(self):
    self.lock.Close()
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)

  def testCloseUnlock(self):
    self.lock.Close()
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1397
1398
class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
  """Runs the FileLock tests on a lock opened by file name"""

  TESTDATA = "Hello World\n" * 10

  def setUp(self):
    testutils.GanetiTestCase.setUp(self)

    self.tmpfile = tempfile.NamedTemporaryFile()
    path = self.tmpfile.name

    utils.WriteFile(path, data=self.TESTDATA)
    self.lock = utils.FileLock.Open(path)

    # Ensure "Open" didn't truncate file
    self.assertFileContent(path, self.TESTDATA)

  def tearDown(self):
    # None of the lock operations may have touched the content
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)

    testutils.GanetiTestCase.tearDown(self)
1416
1417
class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
  """Runs the FileLock tests on a lock built from an open file object"""

  def setUp(self):
    self.tmpfile = tempfile.NamedTemporaryFile()
    path = self.tmpfile.name

    # The lock gets its own handle on the temporary file
    handle = open(path, "w")
    self.lock = utils.FileLock(handle, path)
1422
1423
class TestTimeFunctions(unittest.TestCase):
  """Test case for time functions"""

  def runTest(self):
    # SplitTime: float -> (seconds, microseconds)
    split_cases = [
      (1, (1, 0)),
      (1.5, (1, 500000)),
      (1218448917.4809151, (1218448917, 480915)),
      (123.48012, (123, 480120)),
      (123.9996, (123, 999600)),
      (123.9995, (123, 999500)),
      (123.9994, (123, 999400)),
      (123.999999999, (123, 999999)),
      ]
    for (value, expected) in split_cases:
      self.assertEqual(utils.SplitTime(value), expected)

    self.assertRaises(AssertionError, utils.SplitTime, -1)

    # MergeTime: (seconds, microseconds) -> float
    merge_cases = [
      ((1, 0), 1.0),
      ((1, 500000), 1.5),
      ((1218448917, 500000), 1218448917.5),
      ]
    for (parts, expected) in merge_cases:
      self.assertEqual(utils.MergeTime(parts), expected)

    # These results are compared after rounding to three decimals
    rounded_cases = [
      ((1218448917, 481000), 1218448917.481),
      ((1, 801000), 1.801),
      ]
    for (parts, expected) in rounded_cases:
      self.assertEqual(round(utils.MergeTime(parts), 3), expected)

    # Negative parts and microseconds of a million or more are refused
    invalid_parts = [
      (0, -1),
      (0, 1000000),
      (0, 9999999),
      (-1, 0),
      (-9999, 0),
      ]
    for parts in invalid_parts:
      self.assertRaises(AssertionError, utils.MergeTime, parts)
1452
1453
class FieldSetTestCase(unittest.TestCase):
  """Test case for FieldSets"""

  def testSimpleMatch(self):
    """Plain names match as a whole only."""
    fields = utils.FieldSet("a", "b", "c", "def")

    self.assertTrue(fields.Matches("a"))
    self.assertFalse(fields.Matches("d"), "Substring matched")
    self.assertFalse(fields.Matches("defghi"), "Prefix string matched")

    # NonMatching is falsy as long as every given name matches
    for names in (["b", "c"], ["a", "b", "c", "def"]):
      self.assertFalse(fields.NonMatching(names))
    self.assertTrue(fields.NonMatching(["a", "d"]))

  def testRegexMatch(self):
    """Entries may be regular expressions."""
    fields = utils.FieldSet("a", "b([0-9]+)", "c")

    for name in ("b1", "b99"):
      self.assertTrue(fields.Matches(name))
    self.assertFalse(fields.Matches("b/1"))

    self.assertFalse(fields.NonMatching(["b12", "c"]))
    self.assertTrue(fields.NonMatching(["a", "1"]))
1473
class TestForceDictType(unittest.TestCase):
  """Test case for ForceDictType"""

  def setUp(self):
    # Type every key is forced to
    self.key_types = {
      'a': constants.VTYPE_INT,
      'b': constants.VTYPE_BOOL,
      'c': constants.VTYPE_STRING,
      'd': constants.VTYPE_SIZE,
      }

  def _fdt(self, data, allowed_values=None):
    """Runs ForceDictType on data and returns that same dictionary.

    allowed_values is only handed on if it is not None.

    """
    kwargs = {}
    if allowed_values is not None:
      kwargs["allowed_values"] = allowed_values

    utils.ForceDictType(data, self.key_types, **kwargs)

    return data

  def testSimpleDict(self):
    # (dictionary passed in, expected dictionary afterwards)
    cases = [
      ({}, {}),
      ({'a': 1}, {'a': 1}),
      ({'a': '1'}, {'a': 1}),
      ({'a': 1, 'b': 1}, {'a':1, 'b': True}),
      ({'b': 1, 'c': 'foo'}, {'b': True, 'c': 'foo'}),
      ({'b': 1, 'c': False}, {'b': True, 'c': ''}),
      ({'b': 'false'}, {'b': False}),
      ({'b': 'False'}, {'b': False}),
      ({'b': 'true'}, {'b': True}),
      ({'b': 'True'}, {'b': True}),
      ({'d': '4'}, {'d': 4}),
      ({'d': '4M'}, {'d': 4}),
      ]

    for (data, expected) in cases:
      self.assertEqual(self._fdt(data), expected)

  def testErrors(self):
    invalid = [
      {'a': 'astring'},
      {'c': True},
      {'d': 'astring'},
      {'d': '4 L'},
      ]

    for data in invalid:
      self.assertRaises(errors.TypeEnforcementError, self._fdt, data)
1512
1513
class TestIsNormAbsPath(unittest.TestCase):
  """Testing case for IsNormAbsPath"""

  def _pathTestHelper(self, path, result):
    """Asserts that IsNormAbsPath(path) is true or false, as per result."""
    is_norm_abs = utils.IsNormAbsPath(path)

    if not result:
      self.assertFalse(is_norm_abs,
          "Path %s should not result absolute and normalized" % path)
      return

    self.assertTrue(is_norm_abs,
        "Path %s should result absolute and normalized" % path)

  def testBase(self):
    cases = [
      ('/etc', True),
      ('/srv', True),
      ('etc', False),
      ('/etc/../root', False),
      ('/etc/', False),
      ]

    for (path, result) in cases:
      self._pathTestHelper(path, result)
1531
1532
class TestSafeEncode(unittest.TestCase):
  """Test case for SafeEncode"""

  def _AssertStable(self, text):
    """Asserts that encoding an already encoded text doesn't change it."""
    encoded = SafeEncode(text)
    self.assertEqual(encoded, SafeEncode(encoded))

  def testAscii(self):
    """Digits, letters and punctuation come back unchanged."""
    for txt in [string.digits, string.letters, string.punctuation]:
      self.assertEqual(txt, SafeEncode(txt))

  def testDoubleEncode(self):
    # NOTE(review): range(255) ends at chr(254); confirm whether chr(255) was
    # meant to be covered as well
    for code in range(255):
      self._AssertStable(chr(code))

  def testUnicode(self):
    # 1024 is high enough to catch non-direct ASCII mappings
    for code in range(1024):
      self._AssertStable(unichr(code))
1550
1551
class TestFormatTime(unittest.TestCase):
  """Testing case for FormatTime"""

  def testNone(self):
    self.assertEqual(FormatTime(None), "N/A")

  def testInvalid(self):
    self.assertEqual(FormatTime(()), "N/A")

  def testNow(self):
    """Only checks that both kinds of input are accepted."""
    # tests that we accept time.time input
    FormatTime(time.time())

    # tests that we accept int input
    whole_seconds = int(time.time())
    FormatTime(whole_seconds)
1566
1567
class RunInSeparateProcess(unittest.TestCase):
  """Test case for RunInSeparateProcess"""

  def test(self):
    """The function's return value is handed back."""
    for exp in [True, False]:
      # Invoked within this iteration, so the current value is seen
      result = utils.RunInSeparateProcess(lambda: exp)
      self.assertEqual(exp, result)

  def testArgs(self):
    """Additional arguments are passed on to the function."""
    for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
      matches = lambda carg1, carg2: carg1 == "Foo" and carg2 == arg

      self.assertTrue(utils.RunInSeparateProcess(matches, "Foo", arg))

  def testPid(self):
    """The function doesn't run in the calling process."""
    parent_pid = os.getpid()

    same_process = utils.RunInSeparateProcess(
      lambda: os.getpid() == parent_pid)

    self.assertFalse(same_process)

  def testSignal(self):
    """A function sending SIGTERM to its own process leads to an error."""
    kill_self = lambda: os.kill(os.getpid(), signal.SIGTERM)

    self.assertRaises(errors.GenericError,
                      utils.RunInSeparateProcess, kill_self)

  def testException(self):
    """An exception raised by the function leads to an error."""
    def _fail():
      raise errors.GenericError("This is a test")

    self.assertRaises(errors.GenericError,
                      utils.RunInSeparateProcess, _fail)
1604
1605
class TestFingerprintFile(unittest.TestCase):
  """Test case for _FingerprintFile"""

  def setUp(self):
    self.tmpfile = tempfile.NamedTemporaryFile()

  def test(self):
    path = self.tmpfile.name

    # Nothing written yet; this is the SHA1 digest of no data
    empty_digest = "da39a3ee5e6b4b0d3255bfef95601890afd80709"
    self.assertEqual(utils._FingerprintFile(path), empty_digest)

    # The fingerprint has to follow the file's content
    utils.WriteFile(path, data="Hello World\n")
    hello_digest = "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a"
    self.assertEqual(utils._FingerprintFile(path), hello_digest)
1617
1618
class TestUnescapeAndSplit(unittest.TestCase):
  """Testing case for UnescapeAndSplit"""

  def setUp(self):
    # testing more than one separator for regexp safety
    self._seps = [",", "+", "."]

  def _Check(self, sep, fields, expected):
    """Joins fields with sep and asserts the split result is expected."""
    joined = sep.join(fields)
    self.assertEqual(UnescapeAndSplit(joined, sep=sep), expected)

  def testSimple(self):
    """Without backslashes the fields come back unchanged."""
    plain = ["a", "b", "c", "d"]
    for sep in self._seps:
      self._Check(sep, plain, plain)

  def testEscape(self):
    """A separator preceded by a backslash doesn't split."""
    for sep in self._seps:
      escaped = ["a", "b\\" + sep + "c", "d"]
      wanted = ["a", "b" + sep + "c", "d"]
      self._Check(sep, escaped, wanted)

  def testDoubleEscape(self):
    """Two backslashes give one backslash, the separator still splits."""
    for sep in self._seps:
      escaped = ["a", "b\\\\", "c", "d"]
      wanted = ["a", "b\\", "c", "d"]
      self._Check(sep, escaped, wanted)

  def testThreeEscape(self):
    """Three backslashes give a backslash and an unsplit separator."""
    for sep in self._seps:
      escaped = ["a", "b\\\\\\" + sep + "c", "d"]
      wanted = ["a", "b\\" + sep + "c", "d"]
      self._Check(sep, escaped, wanted)
1648
1649
class TestGenerateSelfSignedX509Cert(unittest.TestCase):
  """Test case for the self-signed certificate generators"""

  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _checkRsaPrivateKey(self, key):
    """Returns whether key has lines delimiting an RSA private key in PEM."""
    lines = key.splitlines()
    return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
            "-----END RSA PRIVATE KEY-----" in lines)

  def _checkCertificate(self, cert):
    """Returns whether cert has lines delimiting a certificate in PEM."""
    lines = cert.splitlines()
    return ("-----BEGIN CERTIFICATE-----" in lines and
            "-----END CERTIFICATE-----" in lines)

  def test(self):
    """Generates certificates for several common names and inspects them."""
    for common_name in [None, ".", "Ganeti", "node1.example.com"]:
      (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)

      # The helpers only return a boolean; without asserting on it these
      # checks could never fail
      self.assertTrue(self._checkRsaPrivateKey(key_pem))
      self.assertTrue(self._checkCertificate(cert_pem))

      key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
                                           key_pem)
      self.assertTrue(key.bits() >= 1024)
      self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
      self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)

      x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                             cert_pem)
      self.assertFalse(x509.has_expired())
      # Self-signed, hence issuer and subject carry the same name
      self.assertEqual(x509.get_issuer().CN, common_name)
      self.assertEqual(x509.get_subject().CN, common_name)
      self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)

  def testLegacy(self):
    """The file-based generator writes key and certificate to one file."""
    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")

    utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)

    cert1 = utils.ReadFile(cert1_filename)

    self.assertTrue(self._checkRsaPrivateKey(cert1))
    self.assertTrue(self._checkCertificate(cert1))
1695
1696
class TestPathJoin(unittest.TestCase):
  """Testing case for PathJoin"""

  def _AssertRejected(self, *parts):
    """Asserts that joining the given parts raises ValueError."""
    self.assertRaises(ValueError, PathJoin, *parts)

  def testBasicItems(self):
    parts = ["/a", "b", "c"]
    self.assertEqual(PathJoin(*parts), "/".join(parts))

  def testNonAbsPrefix(self):
    self._AssertRejected("a", "b")

  def testBackTrack(self):
    self._AssertRejected("/a", "b/../c")

  def testMultiAbs(self):
    self._AssertRejected("/a", "/b")
1712
1713
class TestValidateServiceName(unittest.TestCase):
  """Test case for ValidateServiceName"""

  def testValid(self):
    """Accepted values are returned as they were passed in."""
    port_numbers = [0, 1, 2, 3, 1024, 65000, 65534, 65535]
    service_names = [
      "ganeti",
      "gnt-masterd",
      "HELLO_WORLD_SVC",
      "hello.world.1",
      ]
    numeric_strings = ["0", "80", "1111", "65535"]

    for name in port_numbers + service_names + numeric_strings:
      self.assertEqual(utils.ValidateServiceName(name), name)

  def testInvalid(self):
    """Rejected values raise OpPrereqError."""
    bad_numbers = [-15756, -1, 65536, 133428083]
    bad_names = ["", "Hello World!", "!", "'", "\"", "\t", "\n", "`"]
    bad_numeric_strings = ["-8546", "-1", "65536"]
    long_name = [(129 * "A")]

    rejected = bad_numbers + bad_names + bad_numeric_strings + long_name
    for name in rejected:
      self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
1738
1739
class TestParseAsn1Generalizedtime(unittest.TestCase):
  """Test case for _ParseAsn1Generalizedtime"""

  def test(self):
    parse = utils._ParseAsn1Generalizedtime

    # (input, expected result)
    valid = [
      # UTC
      ("19700101000000Z", 0),
      ("20100222174152Z", 1266860512),
      ("20380119031407Z", (2**31) - 1),

      # With offset
      ("20100222174152+0000", 1266860512),
      ("20100223131652+0000", 1266931012),
      ("20100223051808-0800", 1266931088),
      ("20100224002135+1100", 1266931295),
      ("19700101000000-0100", 3600),
      ]
    for (text, expected) in valid:
      self.assertEqual(parse(text), expected)

    invalid = [
      # Leap seconds are not supported by datetime.datetime
      "19841231235960+0000",
      "19920630235960+0000",

      # Errors
      "",
      "invalid",
      "20100222174152",
      "Mon Feb 22 17:47:02 UTC 2010",
      "2010-02-22 17:42:02",
      ]
    for text in invalid:
      self.assertRaises(ValueError, parse, text)
1776
1777
class TestGetX509CertValidity(testutils.GanetiTestCase):
  """Test case for GetX509CertValidity"""

  def setUp(self):
    testutils.GanetiTestCase.setUp(self)

    installed = distutils.version.LooseVersion(OpenSSL.__version__)

    # Test whether we have pyOpenSSL 0.7 or above
    self.pyopenssl0_7 = (installed >= "0.7")

    if self.pyopenssl0_7:
      return

    warnings.warn("This test requires pyOpenSSL 0.7 or above to"
                  " function correctly")

  def _LoadCert(self, name):
    """Loads the PEM certificate stored in the named test data file."""
    pem = self._ReadTestData(name)
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)

  def test(self):
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))

    # With an older pyOpenSSL no validity is expected to be reported
    if self.pyopenssl0_7:
      expected = (1266919967, 1267524767)
    else:
      expected = (None, None)

    self.assertEqual(validity, expected)
1801
1802
class TestSignX509Certificate(unittest.TestCase):
  """Test case for SignX509Certificate and LoadSignedX509Certificate"""

  KEY = "My private key!"
  KEY_OTHER = "Another key"

  def test(self):
    # Generate certificate valid for 5 minutes
    (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)

    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           cert_pem)

    # No signature at all
    self.assertRaises(errors.GenericError,
                      utils.LoadSignedX509Certificate, cert_pem, self.KEY)

    # Invalid input
    invalid_input = [
      "",
      "X-Ganeti-Signature: \n",
      "X-Ganeti-Sign: $1234$abcdef\n",
      "X-Ganeti-Signature: $1234567890$abcdef\n",
      "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem,
      ]
    for data in invalid_input:
      self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
                        data, self.KEY)

    # Invalid salt; the certificate object is passed, as for the valid salts
    # below, so that only the salt can be the reason for the error
    for salt in list("-_@$,:;/\\ \t\n"):
      self.assertRaises(errors.GenericError, utils.SignX509Certificate,
                        cert, self.KEY, "foo%sbar" % salt)

    for salt in ["HelloWorld", "salt", string.letters, string.digits,
                 utils.GenerateSecret(numbytes=4),
                 utils.GenerateSecret(numbytes=16),
                 "{123:456}".encode("hex")]:
      signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)

      self._Check(cert, salt, signed_pem)

      # Text before or after the signed certificate must not matter
      self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
      self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
      self._Check(cert, salt, (signed_pem + "\n\na few more\n"
                               "lines----\n------ at\nthe end!"))

  def _Check(self, cert, salt, pem):
    """Loads pem with KEY and compares the result with cert and salt.

    Loading the same data with KEY_OTHER has to fail.

    """
    (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
    self.assertEqual(salt, salt2)
    self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))

    # Other key
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
                      pem, self.KEY_OTHER)
1856
1857
class TestMakedirs(unittest.TestCase):
  """Tests for utils.Makedirs"""

  def setUp(self):
    # Every test works inside its own scratch directory
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _MakeAndVerify(self, path):
    # Shared final step: create "path" and make sure it is a directory
    utils.Makedirs(path)
    self.assert_(os.path.isdir(path))

  def testNonExisting(self):
    self._MakeAndVerify(PathJoin(self.tmpdir, "foo"))

  def testExisting(self):
    # The target directory is already there before Makedirs is called
    target = PathJoin(self.tmpdir, "foo")
    os.mkdir(target)
    self._MakeAndVerify(target)

  def testRecursiveNonExisting(self):
    self._MakeAndVerify(PathJoin(self.tmpdir, "foo/bar/baz"))

  def testRecursiveExisting(self):
    # Only the first path component exists beforehand
    target = PathJoin(self.tmpdir, "B/moo/xyz")
    self.assertFalse(os.path.exists(target))
    os.mkdir(PathJoin(self.tmpdir, "B"))
    self._MakeAndVerify(target)
1887
1888
1889 class TestRetry(testutils.GanetiTestCase):
1890 def setUp(self):
1891 testutils.GanetiTestCase.setUp(self)
1892 self.retries = 0
1893
1894 @staticmethod
1895 def _RaiseRetryAgain():
1896 raise utils.RetryAgain()
1897
1898 @staticmethod
1899 def _RaiseRetryAgainWithArg(args):
1900 raise utils.RetryAgain(*args)
1901
1902 def _WrongNestedLoop(self):
1903 return utils.Retry(self._RaiseRetryAgain, 0.01, 0.02)
1904
1905 def _RetryAndSucceed(self, retries):
1906 if self.retries < retries:
1907 self.retries += 1
1908 raise utils.RetryAgain()
1909 else:
1910 return True
1911
1912 def testRaiseTimeout(self):
1913 self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
1914 self._RaiseRetryAgain, 0.01, 0.02)
1915 self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
1916 self._RetryAndSucceed, 0.01, 0, args=[1])
1917 self.failUnlessEqual(self.retries, 1)
1918
1919 def testComplete(self):
1920 self.failUnlessEqual(utils.Retry(lambda: True, 0, 1), True)
1921 self.failUnlessEqual(utils.Retry(self._RetryAndSucceed, 0, 1, args=[2]),
1922 True)
1923 self.failUnlessEqual(self.retries, 2)
1924
1925 def testNestedLoop(self):
1926 try:
1927 self.failUnlessRaises(errors.ProgrammerError, utils.Retry,
1928 self._WrongNestedLoop, 0, 1)
1929 except utils.RetryTimeout:
1930 self.fail("Didn't detect inner loop's exception")
1931
1932 def testTimeoutArgument(self):
1933 retry_arg="my_important_debugging_message"
1934 try:
1935 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02, args=[[retry_arg]])
1936 except utils.RetryTimeout, err:
1937 self.failUnlessEqual(err.args, (retry_arg, ))
1938 else:
1939 self.fail("Expected timeout didn't happen")
1940
1941 def testRaiseInnerWithExc(self):
1942 retry_arg="my_important_debugging_message"
1943 try:
1944 try:
1945 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
1946 args=[[errors.GenericError(retry_arg, retry_arg)]])
1947 except utils.RetryTimeout, err:
1948 err.RaiseInner()
1949 else:
1950 self.fail("Expected timeout didn't happen")
1951 except errors.GenericError, err:
1952 self.failUnlessEqual(err.args, (retry_arg, retry_arg))
1953 else:
1954 self.fail("Expected GenericError didn't happen")
1955
1956 def testRaiseInnerWithMsg(self):
1957 retry_arg="my_important_debugging_message"
1958 try:
1959 try:
1960 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
1961 args=[[retry_arg, retry_arg]])
1962 except utils.RetryTimeout, err:
1963 err.RaiseInner()
1964 else:
1965 self.fail("Expected timeout didn't happen")
1966 except utils.RetryTimeout, err:
1967 self.failUnlessEqual(err.args, (retry_arg, retry_arg))
1968 else:
1969 self.fail("Expected RetryTimeout didn't happen")
1970
1971
class TestLineSplitter(unittest.TestCase):
  """Tests for utils.LineSplitter"""

  def test(self):
    received = []
    splitter = utils.LineSplitter(received.append)

    # Nothing is passed to the callback while data is only being written
    splitter.write("Hello World\n")
    self.assertEqual(received, [])
    for chunk in ["Foo\n Bar\r\n ", "Baz", "Moo"]:
      splitter.write(chunk)
    self.assertEqual(received, [])

    # Flushing hands over the complete lines
    splitter.flush()
    self.assertEqual(received, ["Hello World", "Foo", " Bar"])

    # Closing also hands over the unterminated remainder
    splitter.close()
    self.assertEqual(received, ["Hello World", "Foo", " Bar", " BazMoo"])

  def _testExtra(self, line, all_lines, p1, p2):
    # Line callback which also verifies the extra arguments it was given
    self.assertEqual(p1, 999)
    self.assertEqual(p2, "extra")
    all_lines.append(line)

  def testExtraArgsNoFlush(self):
    received = []
    splitter = utils.LineSplitter(self._testExtra, received, 999, "extra")

    for chunk in ["\n\nHello World\n", "Foo\n Bar\r\n ", "", "Baz",
                  "Moo\n\nx\n"]:
      splitter.write(chunk)
    self.assertEqual(received, [])

    # No explicit flush; close alone must deliver everything
    splitter.close()
    self.assertEqual(received, ["", "", "Hello World", "Foo", " Bar",
                                " BazMoo", "", "x"])
2004
2005
class TestReadLockedPidFile(unittest.TestCase):
  """Tests for utils.ReadLockedPidFile"""

  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def testNonExistent(self):
    missing = PathJoin(self.tmpdir, "nonexist")
    self.assert_(utils.ReadLockedPidFile(missing) is None)

  def testUnlocked(self):
    # A PID file nobody holds a lock on yields None
    pidfile = PathJoin(self.tmpdir, "pid")
    utils.WriteFile(pidfile, data="123")
    self.assert_(utils.ReadLockedPidFile(pidfile) is None)

  def testLocked(self):
    pidfile = PathJoin(self.tmpdir, "pid")
    utils.WriteFile(pidfile, data="123")

    lock = utils.FileLock.Open(pidfile)
    try:
      lock.Exclusive(blocking=True)

      # While the lock is held the PID stored in the file is returned
      self.assertEqual(utils.ReadLockedPidFile(pidfile), 123)
    finally:
      lock.Close()

    # After closing the lock the file counts as unlocked again
    self.assert_(utils.ReadLockedPidFile(pidfile) is None)

  def testError(self):
    # "foobar" is created as a regular file, yet used as a directory below
    blocker = PathJoin(self.tmpdir, "foobar")
    utils.WriteFile(blocker, data="")
    pidfile = PathJoin(self.tmpdir, "foobar", "pid")
    # open(2) should return ENOTDIR
    self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, pidfile)
2041
2042
class TestCertVerification(testutils.GanetiTestCase):
  """Smoke test for utils.VerifyX509Certificate"""

  def setUp(self):
    testutils.GanetiTestCase.setUp(self)

    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    # setUp chains to the base class, so tearDown has to do the same;
    # otherwise whatever cleanup GanetiTestCase performs is skipped. The
    # "finally" makes sure this happens even if removing the directory fails.
    try:
      shutil.rmtree(self.tmpdir)
    finally:
      testutils.GanetiTestCase.tearDown(self)

  def testVerifyCertificate(self):
    cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           cert_pem)

    # Not checking return value as this certificate is expired
    utils.VerifyX509Certificate(cert, 30, 7)
2059
2060
class TestVerifyCertificateInner(unittest.TestCase):
  """Tests for utils._VerifyCertificateInner"""

  @staticmethod
  def _ErrCode(*args):
    # Runs the function under test and returns only the error code; the
    # accompanying message is not checked by these tests
    (errcode, _) = utils._VerifyCertificateInner(*args)
    return errcode

  def test(self):
    # Valid
    result = utils._VerifyCertificateInner(False, 1263916313, 1298476313,
                                           1266940313, 30, 7)
    self.assertEqual(result, (None, None))

    # Not yet valid
    self.assertEqual(self._ErrCode(False, 1266507600, 1267544400,
                                   1266075600, 30, 7),
                     utils.CERT_WARNING)

    # Expiring soon
    self.assertEqual(self._ErrCode(False, 1266507600, 1267544400,
                                   1266939600, 30, 7),
                     utils.CERT_ERROR)

    self.assertEqual(self._ErrCode(False, 1266507600, 1267544400,
                                   1266939600, 30, 1),
                     utils.CERT_WARNING)

    self.assertEqual(self._ErrCode(False, 1266507600, None,
                                   1266939600, 30, 7),
                     None)

    # Expired
    self.assertEqual(self._ErrCode(True, 1266507600, 1267544400,
                                   1266939600, 30, 7),
                     utils.CERT_ERROR)

    self.assertEqual(self._ErrCode(True, None, 1267544400,
                                   1266939600, 30, 7),
                     utils.CERT_ERROR)

    self.assertEqual(self._ErrCode(True, 1266507600, None,
                                   1266939600, 30, 7),
                     utils.CERT_ERROR)

    self.assertEqual(self._ErrCode(True, None, None,
                                   1266939600, 30, 7),
                     utils.CERT_ERROR)
2095
2096
class TestHmacFunctions(unittest.TestCase):
  """Tests for utils.Sha1Hmac and utils.VerifySha1Hmac"""

  # Digests can be checked with "openssl sha1 -hmac $key"
  def testSha1Hmac(self):
    longtext = 1500 * "The quick brown fox jumps over the lazy dog\n"

    for (key, text, expected) in [
      ("", "", "fbdb1d1b18aa6c08324b7d64b71fb76370690e1d"),
      ("3YzMxZWE", "Hello World", "ef4f3bda82212ecb2f7ce868888a19092481f1fd"),
      ("TguMTA2K", "", "f904c2476527c6d3e6609ab683c66fa0652cb1dc"),
      ("3YzMxZWE", longtext, "35901b9a3001a7cdcf8e0e9d7c2e79df2223af54"),
      ]:
      self.assertEqual(utils.Sha1Hmac(key, text), expected)

  def testSha1HmacSalt(self):
    for (key, text, salt, expected) in [
      ("TguMTA2K", "", "abc0", "4999bf342470eadb11dfcd24ca5680cf9fd7cdce"),
      ("TguMTA2K", "", "abc9", "17a4adc34d69c0d367d4ffbef96fd41d4df7a6e8"),
      ("3YzMxZWE", "Hello World", "xyz0",
       "7f264f8114c9066afc9bb7636e1786d996d3cc0d"),
      ]:
      self.assertEqual(utils.Sha1Hmac(key, text, salt=salt), expected)

  def testVerifySha1Hmac(self):
    verify = utils.VerifySha1Hmac

    self.assert_(verify("", "", "fbdb1d1b18aa6c08324b7d64b71fb76370690e1d"))
    self.assert_(verify("TguMTA2K", "",
                        "f904c2476527c6d3e6609ab683c66fa0652cb1dc"))

    # The digest must be accepted whatever the case of its hex digits
    digest = "ef4f3bda82212ecb2f7ce868888a19092481f1fd"
    for variant in [digest, digest.lower(), digest.upper(), digest.title()]:
      self.assert_(verify("3YzMxZWE", "Hello World", variant))

  def testVerifySha1HmacSalt(self):
    verify = utils.VerifySha1Hmac

    self.assert_(verify("TguMTA2K", "",
                        "17a4adc34d69c0d367d4ffbef96fd41d4df7a6e8",
                        salt="abc9"))
    self.assert_(verify("3YzMxZWE", "Hello World",
                        "7f264f8114c9066afc9bb7636e1786d996d3cc0d",
                        salt="xyz0"))
2144
2145
class TestIgnoreSignals(unittest.TestCase):
  """Tests for utils.IgnoreSignals"""

  @staticmethod
  def _Raise(exception):
    # Callable which raises whatever it is given
    raise exception

  @staticmethod
  def _Return(rval):
    # Callable which hands its argument straight back
    return rval

  def testIgnoreSignals(self):
    intr_socket = socket.error(errno.EINTR, "Message")
    inval_socket = socket.error(errno.EINVAL, "Message")

    intr_env = EnvironmentError(errno.EINTR, "Message")
    inval_env = EnvironmentError(errno.EINVAL, "Message")

    # Sanity check: without the wrapper every one of them is raised
    for (exc_class, exc) in [
      (socket.error, intr_socket),
      (socket.error, inval_socket),
      (EnvironmentError, intr_env),
      (EnvironmentError, inval_env),
      ]:
      self.assertRaises(exc_class, self._Raise, exc)

    # EINTR is swallowed by the wrapper, which then returns None ...
    self.assertEqual(utils.IgnoreSignals(self._Raise, intr_socket), None)
    self.assertEqual(utils.IgnoreSignals(self._Raise, intr_env), None)

    # ... while other error codes still get through
    self.assertRaises(socket.error, utils.IgnoreSignals, self._Raise,
                      inval_socket)
    self.assertRaises(EnvironmentError, utils.IgnoreSignals, self._Raise,
                      inval_env)

    # Return values of the wrapped callable are passed on unchanged
    for value in [True, 33]:
      self.assertEqual(utils.IgnoreSignals(self._Return, value), value)
2178
2179
2180 class TestEnsureDirs(unittest.TestCase):
2181 """Tests for EnsureDirs"""
2182
2183 def setUp(self):
2184 self.dir = tempfile.mkdtemp()
2185 self.old_umask = os.umask(0777)
2186
2187 def testEnsureDirs(self):
2188 utils.EnsureDirs([
2189 (PathJoin(self.dir, "foo"), 0777),
2190 (PathJoin(self.dir, "bar"), 0000),
2191 ])
2192 self.assertEquals(os.stat(PathJoin(self.dir, "foo"))[0] & 0777, 0777)
2193 self.assertEquals(os.stat(PathJoin(self.dir, "bar"))[0] & 0777, 0000)
2194
2195 def tearDown(self):
2196 os.rmdir(PathJoin(self.dir, "foo"))
2197 os.rmdir(PathJoin(self.dir, "bar"))
2198 os.rmdir(self.dir)
2199 os.umask(self.old_umask)
2200
2201
class TestFormatSeconds(unittest.TestCase):
  """Tests for utils.FormatSeconds"""

  def _CheckAll(self, cases):
    # "cases" is a list of (seconds, expected text) pairs
    for (secs, expected) in cases:
      self.assertEqual(utils.FormatSeconds(secs), expected)

  def test(self):
    self._CheckAll([
      (1, "1s"),
      (3600, "1h 0m 0s"),
      (3599, "59m 59s"),
      (7200, "2h 0m 0s"),
      (7201, "2h 0m 1s"),
      (7281, "2h 1m 21s"),
      (29119, "8h 5m 19s"),
      (19431228, "224d 21h 33m 48s"),
      # Negative values are expected as plain seconds
      (-1, "-1s"),
      (-282, "-282s"),
      (-29119, "-29119s"),
      ])

  def testFloat(self):
    # Fractional values are expected rounded to whole seconds
    self._CheckAll([
      (1.3, "1s"),
      (1.9, "2s"),
      (3912.12311, "1h 5m 12s"),
      (3912.8, "1h 5m 13s"),
      ])
2221
2222
class RunIgnoreProcessNotFound(unittest.TestCase):
  """Test for utils.IgnoreProcessNotFound using a process which has exited"""

  @staticmethod
  def _WritePid(fd):
    # Executed in the child: report our own PID through the pipe
    os.write(fd, str(os.getpid()))
    os.close(fd)
    return True

  def test(self):
    (pid_read_fd, pid_write_fd) = os.pipe()

    # The nested try/finally blocks make sure both ends of the pipe are
    # closed even if the assertion or the PID parsing fails
    try:
      try:
        # Start short-lived process which writes its PID to pipe
        self.assert_(utils.RunInSeparateProcess(self._WritePid,
                                                pid_write_fd))
      finally:
        os.close(pid_write_fd)

      # Read PID from pipe
      pid = int(os.read(pid_read_fd, 1024))
    finally:
      os.close(pid_read_fd)

    # Try to send signal to process which exited recently
    self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
2243
2244
# Script entry point: start testutils.GanetiTestProgram when run directly
if __name__ == "__main__":
  testutils.GanetiTestProgram()