39a48befeed0f2f0649ca4c80da8df1ab13ab974
[ganeti-github.git] / test / ganeti.utils_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for unittesting the utils module"""
23
24 import distutils.version
25 import errno
26 import fcntl
27 import glob
28 import os
29 import os.path
30 import re
31 import shutil
32 import signal
33 import socket
34 import stat
35 import string
36 import tempfile
37 import time
38 import unittest
39 import warnings
40 import OpenSSL
41
42 import testutils
43 from ganeti import constants
44 from ganeti import compat
45 from ganeti import utils
46 from ganeti import errors
47 from ganeti.utils import RunCmd, RemoveFile, MatchNameComponent, FormatUnit, \
48 ParseUnit, ShellQuote, ShellQuoteArgs, ListVisibleFiles, FirstFree, \
49 TailFile, SafeEncode, FormatTime, UnescapeAndSplit, RunParts, PathJoin, \
50 ReadOneLineFile, SetEtcHostsEntry, RemoveEtcHostsEntry
51
52
class TestIsProcessAlive(unittest.TestCase):
  """Testing case for IsProcessAlive"""

  def testExists(self):
    """Test that the running test process is reported as alive"""
    self.assertTrue(utils.IsProcessAlive(os.getpid()),
                    "can't find myself running")

  def testNotExisting(self):
    """Test that an exited and reaped child is reported as not alive"""
    # os.fork() reports failure by raising OSError; it never returns a
    # negative value, hence only the child (0) and parent (> 0) cases exist
    child_pid = os.fork()
    if child_pid == 0:
      # Child: terminate right away, bypassing the test runner's cleanup
      os._exit(0)

    # Parent: reap the child so that its PID is no longer in use
    os.waitpid(child_pid, 0)
    self.assertFalse(utils.IsProcessAlive(child_pid),
                     "nonexisting process detected")
69
70
class TestGetProcStatusPath(unittest.TestCase):
  """Testing case for _GetProcStatusPath"""

  def test(self):
    """Test that the path contains the PID and differs between PIDs"""
    path = utils._GetProcStatusPath(1234)
    self.assertTrue("/1234/" in path)

    first = utils._GetProcStatusPath(1)
    second = utils._GetProcStatusPath(2)
    self.assertNotEqual(first, second)
76
77
class TestIsProcessHandlingSignal(unittest.TestCase):
  """Testing case for IsProcessHandlingSignal and its parsing helpers"""

  def setUp(self):
    """Create a scratch directory for fake process status files"""
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    """Remove the scratch directory"""
    shutil.rmtree(self.tmpdir)

  def testParseSigsetT(self):
    """Test decoding of hexadecimal signal masks into signal numbers"""
    self.assertEqual(len(utils._ParseSigsetT("0")), 0)
    self.assertEqual(utils._ParseSigsetT("1"), set([1]))
    self.assertEqual(utils._ParseSigsetT("1000a"), set([2, 4, 17]))
    self.assertEqual(utils._ParseSigsetT("810002"), set([2, 17, 24, ]))
    self.assertEqual(utils._ParseSigsetT("0000000180000202"),
                     set([2, 10, 32, 33]))
    self.assertEqual(utils._ParseSigsetT("0000000180000002"),
                     set([2, 32, 33]))
    self.assertEqual(utils._ParseSigsetT("0000000188000002"),
                     set([2, 28, 32, 33]))
    self.assertEqual(utils._ParseSigsetT("000000004b813efb"),
                     set([1, 2, 4, 5, 6, 7, 8, 10, 11, 12, 13, 14, 17,
                          24, 25, 26, 28, 31]))
    self.assertEqual(utils._ParseSigsetT("ffffff"), set(range(1, 25)))

  def testGetProcStatusField(self):
    """Test extraction of a single field from status file contents"""
    for field in ["SigCgt", "Name", "FDSize"]:
      for value in ["", "0", "cat", " 1234 KB"]:
        pstatus = "\n".join([
          "VmPeak: 999 kB",
          "%s: %s" % (field, value),
          "TracerPid: 0",
          ])
        result = utils._GetProcStatusField(pstatus, field)
        # The expected value is the written one minus surrounding whitespace
        self.assertEqual(result, value.strip())

  def test(self):
    """Test a fake status file whose SigCgt mask includes signal 10"""
    sp = PathJoin(self.tmpdir, "status")

    utils.WriteFile(sp, data="\n".join([
      "Name: bash",
      "State: S (sleeping)",
      "SleepAVG: 98%",
      "Pid: 22250",
      "PPid: 10858",
      "TracerPid: 0",
      "SigBlk: 0000000000010000",
      "SigIgn: 0000000000384004",
      "SigCgt: 000000004b813efb",
      "CapEff: 0000000000000000",
      ]))

    self.assertTrue(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))

  def testNoSigCgt(self):
    """Test that a status file without a SigCgt line raises RuntimeError"""
    sp = PathJoin(self.tmpdir, "status")

    utils.WriteFile(sp, data="\n".join([
      "Name: bash",
      ]))

    self.assertRaises(RuntimeError, utils.IsProcessHandlingSignal,
                      1234, 10, status_path=sp)

  def testNoSuchFile(self):
    """Test that a missing status file is reported as "not handling\""""
    sp = PathJoin(self.tmpdir, "notexist")

    self.assertFalse(utils.IsProcessHandlingSignal(1234, 10, status_path=sp))

  @staticmethod
  def _TestRealProcess():
    """Check SIGUSR1 detection against the current process

    Installs, in turn, the default disposition, a Python handler, the
    "ignore" disposition and the default disposition again; only the Python
    handler must be reported as handling the signal.

    @rtype: boolean
    @return: True if all checks passed
    @raise Exception: if any disposition is reported incorrectly

    """
    steps = [
      (signal.SIG_DFL, False),
      (lambda signum, frame: None, True),
      # An ignored signal is not a handled one
      (signal.SIG_IGN, False),
      (signal.SIG_DFL, False),
      ]

    for (handler, expected) in steps:
      signal.signal(signal.SIGUSR1, handler)
      handled = utils.IsProcessHandlingSignal(os.getpid(), signal.SIGUSR1)
      if handled and not expected:
        raise Exception("SIGUSR1 is handled when it should not be")
      if expected and not handled:
        raise Exception("SIGUSR1 is not handled when it should be")

    return True

  def testRealProcess(self):
    """Test signal detection on a real process

    The check runs in a separate process as it changes signal dispositions.

    """
    self.assertTrue(utils.RunInSeparateProcess(self._TestRealProcess))
167
168
class TestPidFileFunctions(unittest.TestCase):
  """Tests for WritePidFile, RemovePidFile and ReadPidFile"""

  def setUp(self):
    """Redirect PID files into a private temporary directory"""
    self.dir = tempfile.mkdtemp()
    self.f_dpn = lambda name: os.path.join(self.dir, "%s.pid" % name)
    # Remember the real function so that tearDown can undo the monkey-patch
    # and other tests don't end up using a directory that no longer exists
    self._orig_dpn = utils.DaemonPidFileName
    utils.DaemonPidFileName = self.f_dpn

  def testPidFileFunctions(self):
    """Test writing, reading and removing a PID file"""
    pid_file = self.f_dpn('test')
    utils.WritePidFile('test')
    self.assertTrue(os.path.exists(pid_file),
                    "PID file should have been created")
    read_pid = utils.ReadPidFile(pid_file)
    self.assertEqual(read_pid, os.getpid())
    self.assertTrue(utils.IsProcessAlive(read_pid))
    # Writing the same PID file a second time must be refused
    self.assertRaises(errors.GenericError, utils.WritePidFile, 'test')
    utils.RemovePidFile('test')
    self.assertFalse(os.path.exists(pid_file),
                     "PID file should not exist anymore")
    self.assertEqual(utils.ReadPidFile(pid_file), 0,
                     "ReadPidFile should return 0 for missing pid file")

    fh = open(pid_file, "w")
    try:
      fh.write("blah\n")
    finally:
      fh.close()
    self.assertEqual(utils.ReadPidFile(pid_file), 0,
                     "ReadPidFile should return 0 for invalid pid file")
    utils.RemovePidFile('test')
    self.assertFalse(os.path.exists(pid_file),
                     "PID file should not exist anymore")

  def testKill(self):
    """Test killing a forked child located through its PID file"""
    pid_file = self.f_dpn('child')
    (r_fd, w_fd) = os.pipe()
    new_pid = os.fork()
    if new_pid == 0:
      # Child: whatever happens here, never return into the test runner
      try:
        utils.WritePidFile('child')
        os.write(w_fd, 'a')
        signal.pause()
      finally:
        os._exit(0)

    # Parent: close our copy of the write end so that the read below sees
    # end-of-file, instead of blocking forever, should the child die early
    os.close(w_fd)
    try:
      # Wait until the child has written the pid file
      os.read(r_fd, 1)
    finally:
      os.close(r_fd)

    read_pid = utils.ReadPidFile(pid_file)
    self.assertEqual(read_pid, new_pid)
    self.assertTrue(utils.IsProcessAlive(new_pid))
    utils.KillProcess(new_pid, waitpid=True)
    self.assertFalse(utils.IsProcessAlive(new_pid))
    utils.RemovePidFile('child')
    self.assertRaises(errors.ProgrammerError, utils.KillProcess, 0)

  def tearDown(self):
    """Undo the monkey-patch and remove the temporary directory"""
    utils.DaemonPidFileName = self._orig_dpn
    for name in os.listdir(self.dir):
      os.unlink(os.path.join(self.dir, name))
    os.rmdir(self.dir)
225
226
class TestRunCmd(testutils.GanetiTestCase):
  """Testing case for the RunCmd function"""

  def setUp(self):
    """Prepare a unique marker string and a temporary output file"""
    testutils.GanetiTestCase.setUp(self)
    self.magic = time.ctime() + " ganeti test"
    self.fname = self._CreateTempFile()

  def testOk(self):
    """Test successful exit code"""
    result = RunCmd("/bin/sh -c 'exit 0'")
    self.assertEqual(result.exit_code, 0)
    self.assertEqual(result.output, "")

  def testFail(self):
    """Test fail exit code"""
    result = RunCmd("/bin/sh -c 'exit 1'")
    self.assertEqual(result.exit_code, 1)
    self.assertEqual(result.output, "")

  def testStdout(self):
    """Test standard output"""
    cmd = 'echo -n "%s"' % self.magic
    result = RunCmd("/bin/sh -c '%s'" % cmd)
    self.assertEqual(result.stdout, self.magic)
    # With an output file nothing is captured in the result
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
    self.assertEqual(result.output, "")
    self.assertFileContent(self.fname, self.magic)

  def testStderr(self):
    """Test standard error"""
    cmd = 'echo -n "%s"' % self.magic
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd)
    self.assertEqual(result.stderr, self.magic)
    result = RunCmd("/bin/sh -c '%s' 1>&2" % cmd, output=self.fname)
    self.assertEqual(result.output, "")
    self.assertFileContent(self.fname, self.magic)

  def testCombined(self):
    """Test combined output"""
    cmd = 'echo -n "A%s"; echo -n "B%s" 1>&2' % (self.magic, self.magic)
    expected = "A" + self.magic + "B" + self.magic
    result = RunCmd("/bin/sh -c '%s'" % cmd)
    self.assertEqual(result.output, expected)
    result = RunCmd("/bin/sh -c '%s'" % cmd, output=self.fname)
    self.assertEqual(result.output, "")
    self.assertFileContent(self.fname, expected)

  def testSignal(self):
    """Test signal"""
    result = RunCmd(["python", "-c", "import os; os.kill(os.getpid(), 15)"])
    self.assertEqual(result.signal, 15)
    self.assertEqual(result.output, "")

  def testListRun(self):
    """Test list runs"""
    result = RunCmd(["true"])
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 0)
    result = RunCmd(["/bin/sh", "-c", "exit 1"])
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 1)
    result = RunCmd(["echo", "-n", self.magic])
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 0)
    self.assertEqual(result.stdout, self.magic)

  def testFileEmptyOutput(self):
    """Test file output"""
    result = RunCmd(["true"], output=self.fname)
    self.assertEqual(result.signal, None)
    self.assertEqual(result.exit_code, 0)
    self.assertFileContent(self.fname, "")

  def testLang(self):
    """Test locale environment"""
    # Remember only the variables modified below. They are restored one by
    # one through os.environ itself; rebinding os.environ to a copy would
    # replace it with a plain dictionary and leave the exported values of
    # LANG and LC_ALL in place.
    changed = ["LANG", "LC_ALL"]
    saved = dict([(name, os.environ.get(name)) for name in changed])
    try:
      os.environ["LANG"] = "en_US.UTF-8"
      os.environ["LC_ALL"] = "en_US.UTF-8"
      result = RunCmd(["locale"])
      for line in result.output.splitlines():
        key, value = line.split("=", 1)
        # Ignore these variables, they're overridden by LC_ALL
        if key == "LANG" or key == "LANGUAGE":
          continue
        self.assertFalse(value and value != "C" and value != '"C"',
            "Variable %s is set to the invalid value '%s'" % (key, value))
    finally:
      for name in changed:
        if saved[name] is not None:
          os.environ[name] = saved[name]
        elif name in os.environ:
          del os.environ[name]

  def testDefaultCwd(self):
    """Test default working directory"""
    self.assertEqual(RunCmd(["pwd"]).stdout.strip(), "/")

  def testCwd(self):
    """Test custom working directory"""
    self.assertEqual(RunCmd(["pwd"], cwd="/").stdout.strip(), "/")
    self.assertEqual(RunCmd(["pwd"], cwd="/tmp").stdout.strip(), "/tmp")
    cwd = os.getcwd()
    self.assertEqual(RunCmd(["pwd"], cwd=cwd).stdout.strip(), cwd)

  def testResetEnv(self):
    """Test environment reset functionality"""
    self.assertEqual(RunCmd(["env"], reset_env=True).stdout.strip(), "")
    self.assertEqual(RunCmd(["env"], reset_env=True,
                            env={"FOO": "bar",}).stdout.strip(), "FOO=bar")
334
335
class TestRunParts(unittest.TestCase):
  """Testing case for the RunParts function"""

  def setUp(self):
    """Create the directory whose contents RunParts is pointed at"""
    self.rundir = tempfile.mkdtemp(prefix="ganeti-test", suffix=".tmp")

  def tearDown(self):
    """Remove the run directory and everything in it"""
    shutil.rmtree(self.rundir)

  def _CreateFile(self, name, data="", executable=True):
    """Write a file into the run directory

    @param name: file name, relative to the run directory
    @param data: contents of the file
    @param executable: whether to make the file readable and executable
        for its owner
    @return: full path of the new file

    """
    path = os.path.join(self.rundir, name)
    utils.WriteFile(path, data=data)
    if executable:
      os.chmod(path, stat.S_IREAD | stat.S_IEXEC)
    return path

  def testEmpty(self):
    """Test on an empty dir"""
    self.assertEqual(RunParts(self.rundir, reset_env=True), [])

  def testSkipWrongName(self):
    """Test that wrong files are skipped"""
    path = self._CreateFile("00test.dot")
    expected = [(os.path.basename(path), constants.RUNPARTS_SKIP, None)]
    self.assertEqual(RunParts(self.rundir, reset_env=True), expected)

  def testSkipNonExec(self):
    """Test that non executable files are skipped"""
    path = self._CreateFile("00test", executable=False)
    expected = [(os.path.basename(path), constants.RUNPARTS_SKIP, None)]
    self.assertEqual(RunParts(self.rundir, reset_env=True), expected)

  def testError(self):
    """Test error on a broken executable"""
    path = self._CreateFile("00test")
    first = RunParts(self.rundir, reset_env=True)[0]
    (relname, status, error) = first
    self.assertEqual(relname, os.path.basename(path))
    self.assertEqual(status, constants.RUNPARTS_ERR)
    self.assertTrue(error)

  def testSorted(self):
    """Test executions are sorted"""
    # Deliberately created out of order
    paths = [self._CreateFile(name, executable=False)
             for name in ["64test", "00test", "42test"]]

    results = RunParts(self.rundir, reset_env=True)

    for (idx, path) in enumerate(sorted(paths)):
      self.assertEqual(os.path.basename(path), results[idx][0])

  def testOk(self):
    """Test correct execution"""
    path = self._CreateFile("00test", data="#!/bin/sh\n\necho -n ciao")
    first = RunParts(self.rundir, reset_env=True)[0]
    (relname, status, runresult) = first
    self.assertEqual(relname, os.path.basename(path))
    self.assertEqual(status, constants.RUNPARTS_RUN)
    self.assertEqual(runresult.stdout, "ciao")

  def testRunFail(self):
    """Test correct execution, with run failure"""
    path = self._CreateFile("00test", data="#!/bin/sh\n\nexit 1")
    first = RunParts(self.rundir, reset_env=True)[0]
    (relname, status, runresult) = first
    self.assertEqual(relname, os.path.basename(path))
    self.assertEqual(status, constants.RUNPARTS_RUN)
    self.assertEqual(runresult.exit_code, 1)
    self.assertTrue(runresult.failed)

  def testRunMix(self):
    """Test a directory mixing failing, skipped, broken and good files"""
    # 1st has errors in execution
    failing = self._CreateFile("00test", data="#!/bin/sh\n\nexit 1")
    # 2nd is skipped
    skipped = self._CreateFile("42test", executable=False)
    # 3rd cannot execute properly
    broken = self._CreateFile("64test")
    # 4th execs
    working = self._CreateFile("99test", data="#!/bin/sh\n\necho -n ciao")

    results = RunParts(self.rundir, reset_env=True)

    (relname, status, runresult) = results[0]
    self.assertEqual(relname, os.path.basename(failing))
    self.assertEqual(status, constants.RUNPARTS_RUN)
    self.assertEqual(runresult.exit_code, 1)
    self.assertTrue(runresult.failed)

    (relname, status, runresult) = results[1]
    self.assertEqual(relname, os.path.basename(skipped))
    self.assertEqual(status, constants.RUNPARTS_SKIP)
    self.assertEqual(runresult, None)

    (relname, status, runresult) = results[2]
    self.assertEqual(relname, os.path.basename(broken))
    self.assertEqual(status, constants.RUNPARTS_ERR)
    self.assertTrue(runresult)

    (relname, status, runresult) = results[3]
    self.assertEqual(relname, os.path.basename(working))
    self.assertEqual(status, constants.RUNPARTS_RUN)
    self.assertEqual(runresult.output, "ciao")
    self.assertEqual(runresult.exit_code, 0)
    self.assertTrue(not runresult.failed)
460
461
class TestStartDaemon(testutils.GanetiTestCase):
  """Testing case for the StartDaemon function"""

  def setUp(self):
    """Create a scratch directory and the path of the daemon's output file"""
    testutils.GanetiTestCase.setUp(self)
    self.tmpdir = tempfile.mkdtemp(prefix="ganeti-test")
    self.tmpfile = os.path.join(self.tmpdir, "test")

  def tearDown(self):
    """Remove the scratch directory"""
    testutils.GanetiTestCase.tearDown(self)
    shutil.rmtree(self.tmpdir)

  def testShell(self):
    """Test a shell command redirecting its own output"""
    utils.StartDaemon("echo Hello World > %s" % self.tmpfile)
    self._wait(self.tmpfile, 60.0, "Hello World")

  def testShellOutput(self):
    """Test a shell command with an output file"""
    utils.StartDaemon("echo Hello World", output=self.tmpfile)
    self._wait(self.tmpfile, 60.0, "Hello World")

  def testNoShellNoOutput(self):
    """Test an argument list without output file"""
    utils.StartDaemon(["pwd"])

  def testNoShellNoOutputTouch(self):
    """Test that an argument list without output file is really executed"""
    testfile = os.path.join(self.tmpdir, "check")
    self.assertFalse(os.path.exists(testfile))
    utils.StartDaemon(["touch", testfile])
    self._wait(testfile, 60.0, "")

  def testNoShellOutput(self):
    """Test default working directory of an argument list command"""
    utils.StartDaemon(["pwd"], output=self.tmpfile)
    self._wait(self.tmpfile, 60.0, "/")

  def testNoShellOutputCwd(self):
    """Test custom working directory of an argument list command"""
    utils.StartDaemon(["pwd"], output=self.tmpfile, cwd=os.getcwd())
    self._wait(self.tmpfile, 60.0, os.getcwd())

  def testShellEnv(self):
    """Test passing environment variables to a shell command"""
    utils.StartDaemon("echo \"$GNT_TEST_VAR\"", output=self.tmpfile,
                      env={ "GNT_TEST_VAR": "Hello World", })
    self._wait(self.tmpfile, 60.0, "Hello World")

  def testNoShellEnv(self):
    """Test passing environment variables to an argument list command"""
    utils.StartDaemon(["printenv", "GNT_TEST_VAR"], output=self.tmpfile,
                      env={ "GNT_TEST_VAR": "Hello World", })
    self._wait(self.tmpfile, 60.0, "Hello World")

  def testOutputFd(self):
    """Test writing the output to an already opened file descriptor"""
    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
    try:
      utils.StartDaemon(["pwd"], output_fd=fd, cwd=os.getcwd())
    finally:
      os.close(fd)
    self._wait(self.tmpfile, 60.0, os.getcwd())

  def testPid(self):
    """Test that the returned PID is the one of the started command"""
    pid = utils.StartDaemon("echo $$ > %s" % self.tmpfile)
    self._wait(self.tmpfile, 60.0, str(pid))

  def testPidFile(self):
    """Test that the PID file is locked and contains the daemon's PID"""
    pidfile = os.path.join(self.tmpdir, "pid")

    pid = utils.StartDaemon("while sleep 5; do :; done", pidfile=pidfile,
                            output=self.tmpfile)
    try:
      fd = os.open(pidfile, os.O_RDONLY)
      try:
        # Check file is locked
        self.assertRaises(errors.LockError, utils.LockFile, fd)

        pidtext = os.read(fd, 100)
      finally:
        os.close(fd)

      self.assertEqual(int(pidtext.strip()), pid)

      self.assertTrue(utils.IsProcessAlive(pid))
    finally:
      # No matter what happens, kill daemon
      utils.KillProcess(pid, timeout=5.0, waitpid=False)
      self.assertFalse(utils.IsProcessAlive(pid))

    self.assertEqual(utils.ReadFile(self.tmpfile), "")

  def _wait(self, path, timeout, expected):
    """Wait until a file exists and has the expected contents

    @param path: file to poll
    @param timeout: timeout passed to utils.Retry; also reported in the
        failure message as a number of seconds
    @param expected: expected file contents, compared after stripping
        surrounding whitespace from what was read

    """
    # Due to the asynchronous nature of daemon processes, polling is necessary.
    # A timeout makes sure the test doesn't hang forever.
    def _CheckFile():
      if not (os.path.isfile(path) and
              utils.ReadFile(path).strip() == expected):
        raise utils.RetryAgain()

    try:
      utils.Retry(_CheckFile, (0.01, 1.5, 1.0), timeout)
    except utils.RetryTimeout:
      self.fail("Apparently the daemon didn't run in %s seconds and/or"
                " didn't write the correct output" % timeout)

  def testError(self):
    """Test that invalid programs, paths and parameters are refused"""
    missing_dir = os.path.join(self.tmpdir, "DIR/NOT/EXIST")

    self.assertRaises(errors.OpExecError, utils.StartDaemon,
                      ["./does-NOT-EXIST/here/0123456789"])
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
                      ["./does-NOT-EXIST/here/0123456789"],
                      output=missing_dir)
    self.assertRaises(errors.OpExecError, utils.StartDaemon,
                      ["./does-NOT-EXIST/here/0123456789"],
                      cwd=missing_dir)

    # An output file and an output file descriptor are mutually exclusive
    fd = os.open(self.tmpfile, os.O_WRONLY | os.O_CREAT)
    try:
      self.assertRaises(errors.ProgrammerError, utils.StartDaemon,
                        ["./does-NOT-EXIST/here/0123456789"],
                        output=self.tmpfile, output_fd=fd)
    finally:
      os.close(fd)
577
578
class TestSetCloseOnExecFlag(unittest.TestCase):
  """Tests for SetCloseOnExecFlag"""

  def setUp(self):
    """Open a temporary file providing a descriptor to work on"""
    self.tmpfile = tempfile.TemporaryFile()

  def tearDown(self):
    """Close the temporary file instead of leaving it to garbage collection"""
    self.tmpfile.close()

  def testEnable(self):
    """Test that FD_CLOEXEC is set after enabling the flag"""
    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), True)
    self.assertTrue(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
                    fcntl.FD_CLOEXEC)

  def testDisable(self):
    """Test that FD_CLOEXEC is not set after disabling the flag"""
    utils.SetCloseOnExecFlag(self.tmpfile.fileno(), False)
    self.assertFalse(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFD) &
                     fcntl.FD_CLOEXEC)
594
595
class TestSetNonblockFlag(unittest.TestCase):
  """Tests for SetNonblockFlag"""

  def setUp(self):
    """Open a temporary file providing a descriptor to work on"""
    self.tmpfile = tempfile.TemporaryFile()

  def tearDown(self):
    """Close the temporary file instead of leaving it to garbage collection"""
    self.tmpfile.close()

  def testEnable(self):
    """Test that O_NONBLOCK is set after enabling the flag"""
    utils.SetNonblockFlag(self.tmpfile.fileno(), True)
    self.assertTrue(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
                    os.O_NONBLOCK)

  def testDisable(self):
    """Test that O_NONBLOCK is not set after disabling the flag"""
    utils.SetNonblockFlag(self.tmpfile.fileno(), False)
    self.assertFalse(fcntl.fcntl(self.tmpfile.fileno(), fcntl.F_GETFL) &
                     os.O_NONBLOCK)
609
610
class TestRemoveFile(unittest.TestCase):
  """Test case for the RemoveFile function"""

  def setUp(self):
    """Create a temp dir and file for each case"""
    self.tmpdir = tempfile.mkdtemp(suffix='', prefix='ganeti-unittest-')
    (handle, self.tmpfile) = tempfile.mkstemp(suffix='', prefix='',
                                              dir=self.tmpdir)
    # Only the file itself is needed, not the open descriptor
    os.close(handle)

  def tearDown(self):
    """Delete the temp file, if still there, and the temp dir"""
    if os.path.exists(self.tmpfile):
      os.unlink(self.tmpfile)
    os.rmdir(self.tmpdir)

  def testIgnoreDirs(self):
    """Test that RemoveFile() ignores directories"""
    result = RemoveFile(self.tmpdir)
    self.assertEqual(None, result)

  def testIgnoreNotExisting(self):
    """Test that RemoveFile() ignores non-existing files"""
    # The second call operates on a file which is already gone
    for _ in range(2):
      RemoveFile(self.tmpfile)

  def testRemoveFile(self):
    """Test that RemoveFile does remove a file"""
    RemoveFile(self.tmpfile)
    self.assertFalse(os.path.exists(self.tmpfile),
                     "File '%s' not removed" % self.tmpfile)

  def testRemoveSymlink(self):
    """Test that RemoveFile does remove symlinks"""
    symlink = self.tmpdir + "/symlink"
    # First a dangling link, then one pointing to an existing file
    for target in ["no-such-file", self.tmpfile]:
      os.symlink(target, symlink)
      RemoveFile(symlink)
      self.assertFalse(os.path.exists(symlink),
                       "File '%s' not removed" % symlink)
651
652
class TestRename(unittest.TestCase):
  """Test case for RenameFile"""

  def setUp(self):
    """Create a temporary directory"""
    self.tmpdir = tempfile.mkdtemp()
    self.tmpfile = os.path.join(self.tmpdir, "test1")

    # Touch the file
    open(self.tmpfile, "w").close()

  def tearDown(self):
    """Remove temporary directory"""
    shutil.rmtree(self.tmpdir)

  def testSimpleRename1(self):
    """Simple rename 1"""
    target = os.path.join(self.tmpdir, "xyz")
    utils.RenameFile(self.tmpfile, target)
    self.assertTrue(os.path.isfile(os.path.join(self.tmpdir, "xyz")))

  def testSimpleRename2(self):
    """Simple rename 2"""
    target = os.path.join(self.tmpdir, "xyz")
    utils.RenameFile(self.tmpfile, target, mkdir=True)
    self.assertTrue(os.path.isfile(os.path.join(self.tmpdir, "xyz")))

  def testRenameMkdir(self):
    """Rename with mkdir"""
    subdir = os.path.join(self.tmpdir, "test")
    first = os.path.join(self.tmpdir, "test/xyz")
    deepdir = os.path.join(self.tmpdir, "test/foo/bar")
    second = os.path.join(self.tmpdir, "test/foo/bar/baz")

    # One missing directory level
    utils.RenameFile(self.tmpfile, first, mkdir=True)
    self.assertTrue(os.path.isdir(subdir))
    self.assertTrue(os.path.isfile(first))

    # Several missing directory levels
    utils.RenameFile(first, second, mkdir=True)
    self.assertTrue(os.path.isdir(subdir))
    self.assertTrue(os.path.isdir(deepdir))
    self.assertTrue(os.path.isfile(second))
692
693
class TestMatchNameComponent(unittest.TestCase):
  """Test case for the MatchNameComponent function"""

  def testEmptyList(self):
    """Test that there is no match against an empty list"""
    for key in ["", "test"]:
      self.assertEqual(MatchNameComponent(key, []), None)

  def testSingleMatch(self):
    """Test that a single match is performed correctly"""
    names = ["test1.example.com", "test2.example.com", "test3.example.com"]
    for key in ["test2", "test2.example", "test2.example.com"]:
      self.assertEqual(MatchNameComponent(key, names), names[1])

  def testMultipleMatches(self):
    """Test that a multiple match is returned as None"""
    names = ["test1.example.com", "test1.example.org", "test1.example.net"]
    for key in ["test1", "test1.example"]:
      self.assertEqual(MatchNameComponent(key, names), None)

  def testFullMatch(self):
    """Test that a full match is returned correctly"""
    short = "test1"
    full = "test1.example"
    names = [full, full + ".com"]
    self.assertEqual(MatchNameComponent(short, names), None)
    self.assertEqual(MatchNameComponent(full, names), full)

  def testCaseInsensitivePartialMatch(self):
    """Test partial matches with case_sensitive=False"""
    names = ["test1.example.com", "test2.example.net"]
    for key in ["test2", "Test2", "teSt2", "TeSt2"]:
      found = MatchNameComponent(key, names, case_sensitive=False)
      self.assertEqual(found, "test2.example.net")

  def testCaseInsensitiveFullMatch(self):
    """Test full matches with case_sensitive=False"""
    names = ["ts1.ex", "ts1.ex.org", "ts2.ex", "Ts2.ex"]
    cases = [
      # Of the two ts1 names, only a full match on the whole name (in any
      # case) selects one; the bare component stays ambiguous
      ("Ts1", None),
      ("Ts1.ex", "ts1.ex"),
      ("ts1.ex", "ts1.ex"),
      # The two ts2 names differ only in case, so only a key with exactly
      # matching case selects one of them
      ("ts2.ex", "ts2.ex"),
      ("Ts2.ex", "Ts2.ex"),
      ("TS2.ex", None),
      ]
    for (key, expected) in cases:
      found = MatchNameComponent(key, names, case_sensitive=False)
      self.assertEqual(found, expected)
752
753
class TestReadFile(testutils.GanetiTestCase):
  """Test case for the ReadFile function"""

  def testReadAll(self):
    """Test reading a whole file"""
    contents = utils.ReadFile(self._TestDataFilename("cert1.pem"))
    self.assertEqual(len(contents), 814)

    digest = compat.md5_hash()
    digest.update(contents)
    self.assertEqual(digest.hexdigest(), "a491efb3efe56a0535f924d5f8680fd4")

  def testReadSize(self):
    """Test reading with a size limit"""
    path = self._TestDataFilename("cert1.pem")
    contents = utils.ReadFile(path, size=100)
    self.assertEqual(len(contents), 100)

    digest = compat.md5_hash()
    digest.update(contents)
    self.assertEqual(digest.hexdigest(), "893772354e4e690b9efd073eed433ce7")

  def testError(self):
    """Test that an unreadable path raises EnvironmentError"""
    self.assertRaises(EnvironmentError, utils.ReadFile,
                      "/dev/null/does-not-exist")
776
777
class TestReadOneLineFile(testutils.GanetiTestCase):
  """Test case for the ReadOneLineFile function"""

  def setUp(self):
    """Run the generic test case setup"""
    testutils.GanetiTestCase.setUp(self)

  def testDefault(self):
    """Test reading a multi-line file with default parameters"""
    line = ReadOneLineFile(self._TestDataFilename("cert1.pem"))
    self.assertEqual(len(line), 27)
    self.assertEqual(line, "-----BEGIN CERTIFICATE-----")

  def testNotStrict(self):
    """Test reading a multi-line file in non-strict mode"""
    path = self._TestDataFilename("cert1.pem")
    line = ReadOneLineFile(path, strict=False)
    self.assertEqual(len(line), 27)
    self.assertEqual(line, "-----BEGIN CERTIFICATE-----")

  def testStrictFailure(self):
    """Test that strict mode refuses a multi-line file"""
    path = self._TestDataFilename("cert1.pem")
    self.assertRaises(errors.GenericError, ReadOneLineFile, path,
                      strict=True)

  def testLongLine(self):
    """Test a single, very long line without newline"""
    content = 1024 * "Hello World! "
    path = self._CreateTempFile()
    utils.WriteFile(path, data=content)
    strict_result = ReadOneLineFile(path, strict=True)
    lax_result = ReadOneLineFile(path, strict=False)
    self.assertEqual(content, strict_result)
    self.assertEqual(content, lax_result)

  def testNewline(self):
    """Test that a trailing newline is not part of the result"""
    path = self._CreateTempFile()
    line = "myline"
    for newline in ["", "\n", "\r\n"]:
      utils.WriteFile(path, data="%s%s" % (line, newline))
      lax_result = ReadOneLineFile(path, strict=False)
      self.assertEqual(line, lax_result)
      strict_result = ReadOneLineFile(path, strict=True)
      self.assertEqual(line, strict_result)

  def testWhitespaceAndMultipleLines(self):
    """Test trailing whitespace, with and without line separators"""
    path = self._CreateTempFile()
    for newline in ["", "\n", "\r\n"]:
      for space in [" ", "\t", "\t\t \t", "\t "]:
        content = 1024 * ("Foo bar baz %s%s" % (space, newline))
        utils.WriteFile(path, data=content)
        lax_result = ReadOneLineFile(path, strict=False)

        if not newline:
          # Without separators the whole content is one single line
          strict_result = ReadOneLineFile(path, strict=True)
          self.assertEqual(content, strict_result)
          self.assertEqual(content, lax_result)
          continue

        # With separators there are many lines: strict mode must refuse
        # the file, non-strict mode returns the first line including its
        # trailing whitespace
        self.assertTrue(set("\r\n") & set(content))
        self.assertRaises(errors.GenericError, ReadOneLineFile,
                          path, strict=True)
        first_line_len = len("Foo bar baz ") + len(space)
        self.assertEqual(len(lax_result), first_line_len)
        self.assertEqual(lax_result, content[:first_line_len])
        self.assertFalse(set("\r\n") & set(lax_result))

  def testEmptylines(self):
    """Test files starting with empty lines"""
    path = self._CreateTempFile()
    line = "myline"
    for newline in ["\n", "\r\n"]:
      for other in ["", "otherline"]:
        content = "%s%s%s%s%s%s" % (newline, newline, line, newline,
                                    other, newline)
        utils.WriteFile(path, data=content)
        self.assertTrue(set("\r\n") & set(content))
        lax_result = ReadOneLineFile(path, strict=False)
        self.assertEqual(line, lax_result)
        if other:
          # A second non-empty line is an error in strict mode
          self.assertRaises(errors.GenericError, ReadOneLineFile,
                            path, strict=True)
        else:
          strict_result = ReadOneLineFile(path, strict=True)
          self.assertEqual(line, strict_result)
853
854
class TestTimestampForFilename(unittest.TestCase):
  """Test case for the TimestampForFilename function"""

  def test(self):
    """Test that the timestamp contains neither dots nor colons"""
    for char in [".", ":"]:
      self.assertTrue(char not in utils.TimestampForFilename())
859
860
class TestCreateBackup(testutils.GanetiTestCase):
  """Test case for the CreateBackup function"""

  def setUp(self):
    """Create a temporary directory holding the files to back up"""
    testutils.GanetiTestCase.setUp(self)

    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    """Remove the temporary directory"""
    testutils.GanetiTestCase.tearDown(self)

    shutil.rmtree(self.tmpdir)

  def testEmpty(self):
    """Test backups of an empty file and of a non-regular file"""
    filename = PathJoin(self.tmpdir, "config.data")
    pattern = "%s*" % filename
    utils.WriteFile(filename, data="")

    backup = utils.CreateBackup(filename)
    self.assertFileContent(backup, "")
    # The original file plus one backup
    self.assertEqual(len(glob.glob(pattern)), 2)

    # Every further call adds one more file
    for count in [3, 4]:
      utils.CreateBackup(filename)
      self.assertEqual(len(glob.glob(pattern)), count)

    # Something which is not a regular file must be refused
    fifoname = PathJoin(self.tmpdir, "fifo")
    os.mkfifo(fifoname)
    self.assertRaises(errors.ProgrammerError, utils.CreateBackup, fifoname)

  def testContent(self):
    """Test that backups have the contents of the original file"""
    # Backups accumulate as the same file name is used for all contents
    total = 0
    for chunk in ["", "X", "Hello World!\n" * 100, "Binary data\0\x01\x02\n"]:
      for factor in [1, 2, 10, 127]:
        content = chunk * factor

        filename = PathJoin(self.tmpdir, "test.data_")
        utils.WriteFile(filename, data=content)
        self.assertFileContent(filename, content)

        for _ in range(3):
          backup = utils.CreateBackup(filename)
          total += 1
          self.assertFileContent(backup, content)
          found = glob.glob("%s*" % filename)
          self.assertEqual(len(found), 1 + total)
902
903
class TestFormatUnit(unittest.TestCase):
  """Test case for the FormatUnit function"""

  def _Check(self, units, cases):
    """Compare FormatUnit results with a table of expected strings

    @param units: the units argument passed to FormatUnit
    @param cases: list of (value, expected string) tuples

    """
    for (value, expected) in cases:
      self.assertEqual(FormatUnit(value, units), expected)

  def testMiB(self):
    """Test values formatted in mebibytes"""
    self._Check('h', [
      (1, '1M'),
      (100, '100M'),
      (1023, '1023M'),
      ])

    self._Check('m', [
      (1, '1'),
      (100, '100'),
      (1023, '1023'),
      (1024, '1024'),
      (1536, '1536'),
      (17133, '17133'),
      (1024 * 1024 - 1, '1048575'),
      ])

  def testGiB(self):
    """Test values formatted in gibibytes"""
    self._Check('h', [
      (1024, '1.0G'),
      (1536, '1.5G'),
      (17133, '16.7G'),
      (1024 * 1024 - 1, '1024.0G'),
      ])

    self._Check('g', [
      (1024, '1.0'),
      (1536, '1.5'),
      (17133, '16.7'),
      (1024 * 1024 - 1, '1024.0'),
      (1024 * 1024, '1024.0'),
      (5120 * 1024, '5120.0'),
      (29829 * 1024, '29829.0'),
      ])

  def testTiB(self):
    """Test values formatted in tebibytes"""
    self._Check('h', [
      (1024 * 1024, '1.0T'),
      (5120 * 1024, '5.0T'),
      (29829 * 1024, '29.1T'),
      ])

    self._Check('t', [
      (1024 * 1024, '1.0'),
      (5120 * 1024, '5.0'),
      (29829 * 1024, '29.1'),
      ])
944
945
class TestParseUnit(unittest.TestCase):
  """Test case for the ParseUnit function"""

  # (suffix, factor applied to the parsed number) pairs
  SCALES = (('', 1),
            ('M', 1), ('G', 1024), ('T', 1024 * 1024),
            ('MB', 1), ('GB', 1024), ('TB', 1024 * 1024),
            ('MiB', 1), ('GiB', 1024), ('TiB', 1024 * 1024))

  def _CheckAll(self, cases):
    """Asserts the ParseUnit result for every (text, expected) pair."""
    for (text, expected) in cases:
      self.assertEqual(ParseUnit(text), expected)

  def testRounding(self):
    """Checks the results expected for plain integer input."""
    self._CheckAll([
      ('0', 0),
      ('1', 4),
      ('2', 4),
      ('3', 4),
      ])

    self._CheckAll([
      ('124', 124),
      ('125', 128),
      ('126', 128),
      ('127', 128),
      ('128', 128),
      ('129', 132),
      ('130', 132),
      ])

  def testFloating(self):
    """Checks the results expected for fractional input."""
    self._CheckAll([
      ('0', 0),
      ('0.5', 4),
      ('1.75', 4),
      ('1.99', 4),
      ('2.00', 4),
      ('2.01', 4),
      ('3.99', 4),
      ('4.00', 4),
      ('4.01', 8),
      ('1.5G', 1536),
      ('1.8G', 1844),
      ('8.28T', 8682212),
      ])

  def testSuffixes(self):
    """Checks all suffixes with several separators and letter cases."""
    transforms = (lambda x: x, str.lower, str.upper)

    # NOTE(review): the second and third separator are identical as seen
    # here; one of them was possibly meant to be a run of several spaces -
    # confirm against the repository copy of this file
    for sep in ('', ' ', ' ', "\t", "\t "):
      for suffix, scale in TestParseUnit.SCALES:
        for transform in transforms:
          text = '1024' + sep + transform(suffix)
          self.assertEqual(ParseUnit(text), 1024 * scale)

  def testInvalidInput(self):
    """Checks that malformed input raises UnitParseError."""
    invalid = ['1' + sep + suffix
               for sep in ('-', '_', ',', 'a')
               for (suffix, _) in TestParseUnit.SCALES]
    invalid.extend(['1,3' + suffix for (suffix, _) in TestParseUnit.SCALES])

    for text in invalid:
      self.assertRaises(errors.UnitParseError, ParseUnit, text)
996
997
class TestSshKeys(testutils.GanetiTestCase):
  """Test case for the AddAuthorizedKey and RemoveAuthorizedKey functions"""

  KEY_A = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a'
  KEY_B = ('command="/usr/bin/fooserver -t --verbose",from="1.2.3.4" '
           'ssh-dss AAAAB3NzaC1w520smc01ms0jfJs22 root@key-b')

  def setUp(self):
    """Creates a temporary key file holding KEY_A and KEY_B."""
    testutils.GanetiTestCase.setUp(self)
    self.tmpname = self._CreateTempFile()
    fd = open(self.tmpname, 'w')
    try:
      fd.write("%s\n%s\n" % (TestSshKeys.KEY_A, TestSshKeys.KEY_B))
    finally:
      fd.close()

  def _AssertKeys(self, *keys):
    """Asserts that the key file consists of exactly the given lines.

    @param keys: expected lines, in order and without trailing newline

    """
    expected = "".join(["%s\n" % key for key in keys])
    self.assertFileContent(self.tmpname, expected)

  def testAddingNewKey(self):
    """A key not yet in the file is expected to be appended."""
    new_key = 'ssh-dss AAAAB3NzaC1kc3MAAACB root@test'
    utils.AddAuthorizedKey(self.tmpname, new_key)

    self._AssertKeys(TestSshKeys.KEY_A, TestSshKeys.KEY_B, new_key)

  def testAddingAlmostButNotCompletelyTheSameKey(self):
    """A key differing from KEY_A only in its comment is still appended."""
    similar_key = 'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@test'
    utils.AddAuthorizedKey(self.tmpname, similar_key)

    self._AssertKeys(TestSshKeys.KEY_A, TestSshKeys.KEY_B, similar_key)

  def testAddingExistingKeyWithSomeMoreSpaces(self):
    """Adding a key which is already present must leave the file alone."""
    # NOTE(review): going by the test name this literal was presumably meant
    # to contain additional whitespace; as seen here it equals KEY_A -
    # confirm against the repository copy of this file
    utils.AddAuthorizedKey(self.tmpname,
                           'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')

    self._AssertKeys(TestSshKeys.KEY_A, TestSshKeys.KEY_B)

  def testRemovingExistingKeyWithSomeMoreSpaces(self):
    """Removing a key which is present must drop only that key."""
    # NOTE(review): same remark as in testAddingExistingKeyWithSomeMoreSpaces
    # regarding whitespace inside the literal
    utils.RemoveAuthorizedKey(self.tmpname,
                              'ssh-dss AAAAB3NzaC1w5256closdj32mZaQU root@key-a')

    self._AssertKeys(TestSshKeys.KEY_B)

  def testRemovingNonExistingKey(self):
    """Removing a key which isn't in the file must leave the file alone."""
    utils.RemoveAuthorizedKey(self.tmpname,
                              'ssh-dss AAAAB3Nsdfj230xxjxJjsjwjsjdjU root@test')

    self._AssertKeys(TestSshKeys.KEY_A, TestSshKeys.KEY_B)
1060
1061
1062 class TestEtcHosts(testutils.GanetiTestCase):
1063 """Test functions modifying /etc/hosts"""
1064
1065 def setUp(self):
1066 testutils.GanetiTestCase.setUp(self)
1067 self.tmpname = self._CreateTempFile()
1068 handle = open(self.tmpname, 'w')
1069 try:
1070 handle.write('# This is a test file for /etc/hosts\n')
1071 handle.write('127.0.0.1\tlocalhost\n')
1072 handle.write('192.168.1.1 router gw\n')
1073 finally:
1074 handle.close()
1075
1076 def testSettingNewIp(self):
1077 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost.domain.tld', ['myhost'])
1078
1079 self.assertFileContent(self.tmpname,
1080 "# This is a test file for /etc/hosts\n"
1081 "127.0.0.1\tlocalhost\n"
1082 "192.168.1.1 router gw\n"
1083 "1.2.3.4\tmyhost.domain.tld myhost\n")
1084 self.assertFileMode(self.tmpname, 0644)
1085
1086 def testSettingExistingIp(self):
1087 SetEtcHostsEntry(self.tmpname, '192.168.1.1', 'myhost.domain.tld',
1088 ['myhost'])
1089
1090 self.assertFileContent(self.tmpname,
1091 "# This is a test file for /etc/hosts\n"
1092 "127.0.0.1\tlocalhost\n"
1093 "192.168.1.1\tmyhost.domain.tld myhost\n")
1094 self.assertFileMode(self.tmpname, 0644)
1095
1096 def testSettingDuplicateName(self):
1097 SetEtcHostsEntry(self.tmpname, '1.2.3.4', 'myhost', ['myhost'])
1098
1099 self.assertFileContent(self.tmpname,
1100 "# This is a test file for /etc/hosts\n"
1101 "127.0.0.1\tlocalhost\n"
1102 "192.168.1.1 router gw\n"
1103 "1.2.3.4\tmyhost\n")
1104 self.assertFileMode(self.tmpname, 0644)
1105
1106 def testRemovingExistingHost(self):
1107 RemoveEtcHostsEntry(self.tmpname, 'router')
1108
1109 self.assertFileContent(self.tmpname,
1110 "# This is a test file for /etc/hosts\n"
1111 "127.0.0.1\tlocalhost\n"
1112 "192.168.1.1 gw\n")
1113 self.assertFileMode(self.tmpname, 0644)
1114
1115 def testRemovingSingleExistingHost(self):
1116 RemoveEtcHostsEntry(self.tmpname, 'localhost')
1117
1118 self.assertFileContent(self.tmpname,
1119 "# This is a test file for /etc/hosts\n"
1120 "192.168.1.1 router gw\n")
1121 self.assertFileMode(self.tmpname, 0644)
1122
1123 def testRemovingNonExistingHost(self):
1124 RemoveEtcHostsEntry(self.tmpname, 'myhost')
1125
1126 self.assertFileContent(self.tmpname,
1127 "# This is a test file for /etc/hosts\n"
1128 "127.0.0.1\tlocalhost\n"
1129 "192.168.1.1 router gw\n")
1130 self.assertFileMode(self.tmpname, 0644)
1131
1132 def testRemovingAlias(self):
1133 RemoveEtcHostsEntry(self.tmpname, 'gw')
1134
1135 self.assertFileContent(self.tmpname,
1136 "# This is a test file for /etc/hosts\n"
1137 "127.0.0.1\tlocalhost\n"
1138 "192.168.1.1 router\n")
1139 self.assertFileMode(self.tmpname, 0644)
1140
1141
class TestGetMounts(unittest.TestCase):
  """Test case for GetMounts()."""

  TESTDATA = (
    "rootfs / rootfs rw 0 0\n"
    "none /sys sysfs rw,nosuid,nodev,noexec,relatime 0 0\n"
    "none /proc proc rw,nosuid,nodev,noexec,relatime 0 0\n")

  def setUp(self):
    """Writes TESTDATA into a named temporary file."""
    self.tmpfile = tempfile.NamedTemporaryFile()
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)

  def testGetMounts(self):
    """Checks the tuples returned by GetMounts for the TESTDATA file."""
    common_opts = "rw,nosuid,nodev,noexec,relatime"
    expected = [
      ("rootfs", "/", "rootfs", "rw"),
      ("none", "/sys", "sysfs", common_opts),
      ("none", "/proc", "proc", common_opts),
      ]

    mounts = utils.GetMounts(filename=self.tmpfile.name)
    self.assertEqual(mounts, expected)
1161
1162
class TestShellQuoting(unittest.TestCase):
  """Test case for shell quoting functions"""

  def testShellQuote(self):
    """Checks ShellQuote on single strings."""
    cases = [
      ('abc', "abc"),
      ('ab"c', "'ab\"c'"),
      ("a'bc", "'a'\\''bc'"),
      ("a b c", "'a b c'"),
      ("a b\\ c", "'a b\\ c'"),
      ]

    for (text, quoted) in cases:
      self.assertEqual(ShellQuote(text), quoted)

  def testShellQuoteArgs(self):
    """Checks ShellQuoteArgs on argument lists."""
    cases = [
      (['a', 'b', 'c'], "a b c"),
      (['a', 'b"', 'c'], "a 'b\"' c"),
      (['a', 'b\'', 'c'], "a 'b'\\\''' c"),
      ]

    for (args, quoted) in cases:
      self.assertEqual(ShellQuoteArgs(args), quoted)
1177
1178
class TestListVisibleFiles(unittest.TestCase):
  """Test case for ListVisibleFiles"""

  def setUp(self):
    """Creates the directory which gets listed by the tests."""
    self.path = tempfile.mkdtemp()

  def tearDown(self):
    """Removes the test directory including its content."""
    shutil.rmtree(self.path)

  def _CreateFiles(self, files):
    """Creates one file per given name inside the test directory."""
    for path in [os.path.join(self.path, name) for name in files]:
      utils.WriteFile(path, data="test")

  def _test(self, files, expected):
    """Creates C{files} and compares the listing with C{expected}.

    The comparison ignores the order of the returned names.

    """
    self._CreateFiles(files)
    visible = set(ListVisibleFiles(self.path))
    self.assertEqual(visible, set(expected))

  def testAllVisible(self):
    names = ["a", "b", "c"]
    self._test(names, names)

  def testNoneVisible(self):
    self._test([".a", ".b", ".c"], [])

  def testSomeVisible(self):
    self._test(["a", "b", ".c"], ["a", "b"])

  def testNonAbsolutePath(self):
    """A relative path must be refused with ProgrammerError."""
    self.assertRaises(errors.ProgrammerError, ListVisibleFiles, "abc")

  def testNonNormalizedPath(self):
    """A path containing ".." must be refused with ProgrammerError."""
    self.assertRaises(errors.ProgrammerError, ListVisibleFiles,
                      "/bin/../tmp")
1218
1219
class TestNewUUID(unittest.TestCase):
  """Test case for NewUUID"""

  # Five dash-separated groups of lowercase hexadecimal digits (8-4-4-4-12)
  _re_uuid = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
                        '[a-f0-9]{4}-[a-f0-9]{12}$')

  def runTest(self):
    """Checks that a generated UUID matches the expected pattern."""
    uuid = utils.NewUUID()
    self.assertTrue(self._re_uuid.match(uuid))
1228
1229
class TestUniqueSequence(unittest.TestCase):
  """Test case for UniqueSequence"""

  def _test(self, seq, expected):
    """Asserts that UniqueSequence turns C{seq} into C{expected}."""
    self.assertEqual(utils.UniqueSequence(seq), expected)

  def runTest(self):
    """Runs all cases; each case is an (input, expected result) pair."""
    ordered = [
      ([1, 2, 3], [1, 2, 3]),
      ([1, 1, 2, 2, 3, 3], [1, 2, 3]),
      ([1, 2, 2, 3], [1, 2, 3]),
      ([1, 2, 3, 3], [1, 2, 3]),
      ]
    unordered = [
      ([1, 2, 3, 1, 2, 3], [1, 2, 3]),
      ([1, 1, 2, 3, 3, 1, 2], [1, 2, 3]),
      ]
    strings = [
      (["a", "a"], ["a"]),
      (["a", "b"], ["a", "b"]),
      (["a", "b", "a"], ["a", "b"]),
      ]

    for (seq, expected) in ordered + unordered + strings:
      self._test(seq, expected)
1251
1252
class TestFirstFree(unittest.TestCase):
  """Test case for the FirstFree function"""

  def test(self):
    """Test FirstFree"""
    # Default base
    for (seq, expected) in [([0, 1, 3], 2), ([], None), ([3, 4, 6], 0)]:
      self.assertEqual(FirstFree(seq), expected)

    # Explicit base
    self.assertEqual(FirstFree([3, 4, 6], base=3), 5)

    # An element below the base is expected to trigger an assertion
    self.assertRaises(AssertionError, FirstFree, [0, 3, 4, 6], base=3)
1263
1264
class TestTailFile(testutils.GanetiTestCase):
  """Test case for the TailFile function"""

  def _WriteTempFile(self, content):
    """Creates a temporary file holding C{content}.

    The handle is closed in a C{finally} clause, so a failing write doesn't
    leave it open.

    @param content: data written to the file
    @return: name of the file, as returned by C{_CreateTempFile}

    """
    fname = self._CreateTempFile()
    fd = open(fname, "w")
    try:
      fd.write(content)
    finally:
      fd.close()
    return fname

  def testEmpty(self):
    """An empty file must yield an empty list, whatever the line count."""
    fname = self._CreateTempFile()
    self.failUnlessEqual(TailFile(fname), [])
    self.failUnlessEqual(TailFile(fname, lines=25), [])

  def testAllLines(self):
    """Requests exactly as many lines as the file contains."""
    data = ["test %d" % i for i in range(30)]
    for i in range(30):
      content = "\n".join(data[:i])
      if i > 0:
        # Only files with at least one line get a trailing newline
        content += "\n"
      fname = self._WriteTempFile(content)
      self.failUnlessEqual(TailFile(fname, lines=i), data[:i])

  def testPartialLines(self):
    """Requests fewer lines than the file contains."""
    data = ["test %d" % i for i in range(30)]
    fname = self._WriteTempFile("\n".join(data) + "\n")
    for i in range(1, 30):
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])

  def testBigFile(self):
    """Requests the last lines of a file starting with a 1 MiB long line."""
    data = ["test %d" % i for i in range(30)]
    fname = self._WriteTempFile("X" * 1048576 + "\n" +
                                "\n".join(data) + "\n")
    for i in range(1, 30):
      self.failUnlessEqual(TailFile(fname, lines=i), data[-i:])
1305
1306
class _BaseFileLockTest:
  """Test case for the FileLock class

  This class only holds the test methods. They use C{self.lock} and
  C{self.tmpfile}, which must be provided by the class combining these tests
  with a C{TestCase}.

  """

  def testSharedNonblocking(self):
    self.lock.Shared(blocking=False)
    self.lock.Close()

  def testExclusiveNonblocking(self):
    self.lock.Exclusive(blocking=False)
    self.lock.Close()

  def testUnlockNonblocking(self):
    self.lock.Unlock(blocking=False)
    self.lock.Close()

  def testSharedBlocking(self):
    self.lock.Shared(blocking=True)
    self.lock.Close()

  def testExclusiveBlocking(self):
    self.lock.Exclusive(blocking=True)
    self.lock.Close()

  def testUnlockBlocking(self):
    self.lock.Unlock(blocking=True)
    self.lock.Close()

  def testSharedExclusiveUnlock(self):
    """Goes from shared to exclusive to unlocked without blocking."""
    self.lock.Shared(blocking=False)
    self.lock.Exclusive(blocking=False)
    self.lock.Unlock(blocking=False)
    self.lock.Close()

  def testExclusiveSharedUnlock(self):
    """Goes from exclusive to shared to unlocked without blocking."""
    self.lock.Exclusive(blocking=False)
    self.lock.Shared(blocking=False)
    self.lock.Unlock(blocking=False)
    self.lock.Close()

  def testSimpleTimeout(self):
    """Passes a timeout to operations which need no waiting."""
    # These will succeed on the first attempt, hence a short timeout
    self.lock.Shared(blocking=True, timeout=10.0)
    self.lock.Exclusive(blocking=False, timeout=10.0)
    self.lock.Unlock(blocking=True, timeout=10.0)
    self.lock.Close()

  @staticmethod
  def _TryLockInner(filename, shared, blocking):
    """Opens C{filename} as a new lock and tries to acquire it.

    @param filename: path of the lock file
    @param shared: whether to request a shared instead of an exclusive lock
    @param blocking: passed on to the lock function
    @return: False if acquiring raised LockError, True otherwise

    """
    lock = utils.FileLock.Open(filename)

    if shared:
      acquire = lock.Shared
    else:
      acquire = lock.Exclusive

    try:
      # The timeout doesn't really matter as the parent process waits for us to
      # finish anyway.
      acquire(blocking=blocking, timeout=0.01)
    except errors.LockError:
      return False

    return True

  def _TryLock(self, *args):
    """Runs L{_TryLockInner} on the test file in a separate process."""
    return utils.RunInSeparateProcess(self._TryLockInner, self.tmpfile.name,
                                      *args)

  def testTimeout(self):
    """Checks what a second process can lock while this one holds a lock."""
    for blocking in [True, False]:
      # While held exclusively, no other lock may be granted
      self.lock.Exclusive(blocking=True)
      self.assertFalse(self._TryLock(False, blocking))
      self.assertFalse(self._TryLock(True, blocking))

      # While held shared, only another shared lock may be granted
      self.lock.Shared(blocking=True)
      self.assertTrue(self._TryLock(True, blocking))
      self.assertFalse(self._TryLock(False, blocking))

  def testCloseShared(self):
    """Requesting a shared lock after closing must trip an assertion."""
    self.lock.Close()
    self.assertRaises(AssertionError, self.lock.Shared, blocking=False)

  def testCloseExclusive(self):
    """Requesting an exclusive lock after closing must trip an assertion."""
    self.lock.Close()
    self.assertRaises(AssertionError, self.lock.Exclusive, blocking=False)

  def testCloseUnlock(self):
    """Unlocking after closing must trip an assertion."""
    self.lock.Close()
    self.assertRaises(AssertionError, self.lock.Unlock, blocking=False)
1396
1397
class TestFileLockWithFilename(testutils.GanetiTestCase, _BaseFileLockTest):
  """Runs the file lock tests on a lock opened through its filename"""

  TESTDATA = "Hello World\n" * 10

  def _CheckContent(self):
    """Asserts that the lock file still holds exactly TESTDATA."""
    self.assertFileContent(self.tmpfile.name, self.TESTDATA)

  def setUp(self):
    """Writes TESTDATA to a temporary file and opens it as a lock."""
    testutils.GanetiTestCase.setUp(self)

    self.tmpfile = tempfile.NamedTemporaryFile()
    utils.WriteFile(self.tmpfile.name, data=self.TESTDATA)
    self.lock = utils.FileLock.Open(self.tmpfile.name)

    # Ensure "Open" didn't truncate file
    self._CheckContent()

  def tearDown(self):
    """Verifies the file content once more before the base teardown."""
    self._CheckContent()

    testutils.GanetiTestCase.tearDown(self)
1415
1416
class TestFileLockWithFileObject(unittest.TestCase, _BaseFileLockTest):
  """Runs the file lock tests on a lock created from a file object"""

  def setUp(self):
    """Creates a temporary file and wraps a new handle to it in a lock."""
    self.tmpfile = tempfile.NamedTemporaryFile()
    lock_fd = open(self.tmpfile.name, "w")
    self.lock = utils.FileLock(lock_fd, self.tmpfile.name)
1421
1422
class TestTimeFunctions(unittest.TestCase):
  """Test case for time functions"""

  def runTest(self):
    """Checks SplitTime and MergeTime with valid and invalid input."""
    # (float timestamp, expected (seconds, microseconds) tuple)
    split_cases = [
      (1, (1, 0)),
      (1.5, (1, 500000)),
      (1218448917.4809151, (1218448917, 480915)),
      (123.48012, (123, 480120)),
      (123.9996, (123, 999600)),
      (123.9995, (123, 999500)),
      (123.9994, (123, 999400)),
      (123.999999999, (123, 999999)),
      ]
    for (timestamp, expected) in split_cases:
      self.assertEqual(utils.SplitTime(timestamp), expected)

    self.assertRaises(AssertionError, utils.SplitTime, -1)

    # ((seconds, microseconds) tuple, expected float timestamp)
    merge_cases = [
      ((1, 0), 1.0),
      ((1, 500000), 1.5),
      ((1218448917, 500000), 1218448917.5),
      ]
    for (parts, expected) in merge_cases:
      self.assertEqual(utils.MergeTime(parts), expected)

    # These results are only compared after rounding to three decimals
    rounded_cases = [
      ((1218448917, 481000), 1218448917.481),
      ((1, 801000), 1.801),
      ]
    for (parts, expected) in rounded_cases:
      self.assertEqual(round(utils.MergeTime(parts), 3), expected)

    # Microseconds outside of 0..999999 and negative seconds
    invalid_parts = [
      (0, -1),
      (0, 1000000),
      (0, 9999999),
      (-1, 0),
      (-9999, 0),
      ]
    for parts in invalid_parts:
      self.assertRaises(AssertionError, utils.MergeTime, parts)
1451
1452
class FieldSetTestCase(unittest.TestCase):
  """Test case for FieldSets"""

  def testSimpleMatch(self):
    """Checks matching against plain field names."""
    fields = utils.FieldSet("a", "b", "c", "def")

    self.assertTrue(fields.Matches("a"))
    self.assertFalse(fields.Matches("d"), "Substring matched")
    self.assertFalse(fields.Matches("defghi"), "Prefix string matched")

    self.assertFalse(fields.NonMatching(["b", "c"]))
    self.assertFalse(fields.NonMatching(["a", "b", "c", "def"]))
    self.assertTrue(fields.NonMatching(["a", "d"]))

  def testRegexMatch(self):
    """Checks matching against a field given as regular expression."""
    fields = utils.FieldSet("a", "b([0-9]+)", "c")

    for name in ("b1", "b99"):
      self.assertTrue(fields.Matches(name))
    self.assertFalse(fields.Matches("b/1"))

    self.assertFalse(fields.NonMatching(["b12", "c"]))
    self.assertTrue(fields.NonMatching(["a", "1"]))
1472
class TestForceDictType(unittest.TestCase):
  """Test case for ForceDictType"""

  def setUp(self):
    """Defines the key types used by all tests."""
    self.key_types = {
      'a': constants.VTYPE_INT,
      'b': constants.VTYPE_BOOL,
      'c': constants.VTYPE_STRING,
      'd': constants.VTYPE_SIZE,
      }

  def _fdt(self, target, allowed_values=None):
    """Runs ForceDictType on C{target} and returns the very same object.

    @param target: dictionary handed to ForceDictType
    @param allowed_values: only passed on if it isn't None

    """
    kwargs = {}
    if allowed_values is not None:
      kwargs["allowed_values"] = allowed_values

    utils.ForceDictType(target, self.key_types, **kwargs)

    return target

  def testSimpleDict(self):
    """Checks conversions which must succeed."""
    # (input dictionary, expected dictionary after enforcing types)
    cases = [
      ({}, {}),
      ({'a': 1}, {'a': 1}),
      ({'a': '1'}, {'a': 1}),
      ({'a': 1, 'b': 1}, {'a':1, 'b': True}),
      ({'b': 1, 'c': 'foo'}, {'b': True, 'c': 'foo'}),
      ({'b': 1, 'c': False}, {'b': True, 'c': ''}),
      ({'b': 'false'}, {'b': False}),
      ({'b': 'False'}, {'b': False}),
      ({'b': 'true'}, {'b': True}),
      ({'b': 'True'}, {'b': True}),
      ({'d': '4'}, {'d': 4}),
      ({'d': '4M'}, {'d': 4}),
      ]

    for (source, expected) in cases:
      self.assertEqual(self._fdt(source), expected)

  def testErrors(self):
    """Checks that values of the wrong type raise TypeEnforcementError."""
    invalid = [
      {'a': 'astring'},
      {'c': True},
      {'d': 'astring'},
      {'d': '4 L'},
      ]

    for source in invalid:
      self.assertRaises(errors.TypeEnforcementError, self._fdt, source)
1511
1512
class TestIsNormAbsPath(unittest.TestCase):
  """Testing case for IsNormAbsPath"""

  def _pathTestHelper(self, path, result):
    """Asserts that IsNormAbsPath(path) has the truth value of C{result}."""
    actual = utils.IsNormAbsPath(path)

    if not result:
      self.assertFalse(actual,
          "Path %s should not result absolute and normalized" % path)
      return

    self.assertTrue(actual,
        "Path %s should result absolute and normalized" % path)

  def testBase(self):
    """Checks some accepted and some refused paths."""
    cases = [
      ('/etc', True),
      ('/srv', True),
      ('etc', False),
      ('/etc/../root', False),
      ('/etc/', False),
      ]

    for (path, result) in cases:
      self._pathTestHelper(path, result)
1530
1531
class TestSafeEncode(unittest.TestCase):
  """Test case for SafeEncode"""

  def testAscii(self):
    """Digits, letters and punctuation must pass through unchanged."""
    for text in [string.digits, string.letters, string.punctuation]:
      self.assertEqual(text, SafeEncode(text))

  def testDoubleEncode(self):
    """Encoding an already encoded byte must not change it again."""
    for code in range(255):
      encoded = SafeEncode(chr(code))
      self.assertEqual(encoded, SafeEncode(encoded))

  def testUnicode(self):
    """Encoding an already encoded unicode character must not change it."""
    # 1024 is high enough to catch non-direct ASCII mappings
    for code in range(1024):
      encoded = SafeEncode(unichr(code))
      self.assertEqual(encoded, SafeEncode(encoded))
1549
1550
class TestFormatTime(unittest.TestCase):
  """Testing case for FormatTime"""

  def testNone(self):
    self.assertEqual(FormatTime(None), "N/A")

  def testInvalid(self):
    self.assertEqual(FormatTime(()), "N/A")

  def testNow(self):
    """Only checks that both calls return without raising."""
    now = time.time()
    # tests that we accept time.time input
    FormatTime(now)
    # tests that we accept int input
    FormatTime(int(time.time()))
1565
1566
class RunInSeparateProcess(unittest.TestCase):
  """Test case for utils.RunInSeparateProcess"""

  def test(self):
    """The boolean returned by the function must be handed back."""
    for expected in [True, False]:
      result = utils.RunInSeparateProcess(lambda: expected)
      self.assertEqual(expected, result)

  def testArgs(self):
    """Additional arguments must be passed on to the function."""
    for arg in [0, 1, 999, "Hello World", (1, 2, 3)]:
      def _compare(first, second):
        return first == "Foo" and second == arg

      self.assertTrue(utils.RunInSeparateProcess(_compare, "Foo", arg))

  def testPid(self):
    """The function must see a PID different from the one of this process."""
    parent_pid = os.getpid()

    same_pid = utils.RunInSeparateProcess(lambda: os.getpid() == parent_pid)
    self.assertFalse(same_pid)

  def testSignal(self):
    """A function sending SIGTERM to itself must lead to GenericError."""
    def _terminate():
      os.kill(os.getpid(), signal.SIGTERM)

    self.assertRaises(errors.GenericError,
                      utils.RunInSeparateProcess, _terminate)

  def testException(self):
    """A function raising an exception must lead to GenericError."""
    def _fail():
      raise errors.GenericError("This is a test")

    self.assertRaises(errors.GenericError,
                      utils.RunInSeparateProcess, _fail)
1603
1604
class TestFingerprintFile(unittest.TestCase):
  """Test case for utils._FingerprintFile"""

  def setUp(self):
    """Creates the (empty) temporary file to be fingerprinted."""
    self.tmpfile = tempfile.NamedTemporaryFile()

  def test(self):
    """Fingerprints the file while empty and again after writing to it."""
    fname = self.tmpfile.name

    # da39a3ee... is the SHA-1 digest of empty input
    empty_digest = utils._FingerprintFile(fname)
    self.assertEqual(empty_digest,
                     "da39a3ee5e6b4b0d3255bfef95601890afd80709")

    utils.WriteFile(fname, data="Hello World\n")
    content_digest = utils._FingerprintFile(fname)
    self.assertEqual(content_digest,
                     "648a6a6ffffdaa0badb23b8baf90b6168dd16b3a")
1616
1617
class TestUnescapeAndSplit(unittest.TestCase):
  """Testing case for UnescapeAndSplit"""

  def setUp(self):
    # testing more that one separator for regexp safety
    self._seps = [",", "+", "."]

  def _Check(self, sep, parts, expected):
    """Joins C{parts} with C{sep} and checks the result of splitting.

    @param sep: separator used for joining and passed to UnescapeAndSplit
    @param parts: elements joined into the input string
    @param expected: list expected back from UnescapeAndSplit

    """
    text = sep.join(parts)
    self.assertEqual(UnescapeAndSplit(text, sep=sep), expected)

  def testSimple(self):
    """Without escapes the split must return the joined elements."""
    parts = ["a", "b", "c", "d"]
    for sep in self._seps:
      self._Check(sep, parts, parts)

  def testEscape(self):
    """A separator preceded by one backslash must not split."""
    for sep in self._seps:
      self._Check(sep,
                  ["a", "b\\" + sep + "c", "d"],
                  ["a", "b" + sep + "c", "d"])

  def testDoubleEscape(self):
    """Two backslashes before a separator give one backslash and a split."""
    for sep in self._seps:
      self._Check(sep,
                  ["a", "b\\\\", "c", "d"],
                  ["a", "b\\", "c", "d"])

  def testThreeEscape(self):
    """Three backslashes give one backslash plus an unsplit separator."""
    for sep in self._seps:
      self._Check(sep,
                  ["a", "b\\\\\\" + sep + "c", "d"],
                  ["a", "b\\" + sep + "c", "d"])
1647
1648
class TestGenerateSelfSignedX509Cert(unittest.TestCase):
  """Test case for the self-signed certificate generation functions"""

  def setUp(self):
    """Creates the directory used for generated certificate files."""
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    """Removes the temporary directory including its content."""
    shutil.rmtree(self.tmpdir)

  def _checkRsaPrivateKey(self, key):
    """Tells whether C{key} contains the RSA private key PEM markers.

    @param key: text to inspect
    @return: True if both the begin and the end marker are lines of C{key}

    """
    lines = key.splitlines()
    return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
            "-----END RSA PRIVATE KEY-----" in lines)

  def _checkCertificate(self, cert):
    """Tells whether C{cert} contains the certificate PEM markers.

    @param cert: text to inspect
    @return: True if both the begin and the end marker are lines of C{cert}

    """
    lines = cert.splitlines()
    return ("-----BEGIN CERTIFICATE-----" in lines and
            "-----END CERTIFICATE-----" in lines)

  def test(self):
    """Generates certificates for several common names and inspects them."""
    for common_name in [None, ".", "Ganeti", "node1.example.com"]:
      (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)

      # The helpers merely return a boolean, hence their result has to be
      # asserted (as testLegacy does); a bare call would verify nothing
      self.assert_(self._checkRsaPrivateKey(key_pem))
      self.assert_(self._checkCertificate(cert_pem))

      key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
                                           key_pem)
      self.assert_(key.bits() >= 1024)
      self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
      self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)

      x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                             cert_pem)
      self.failIf(x509.has_expired())
      # Issuer and subject must both carry the requested common name
      self.assertEqual(x509.get_issuer().CN, common_name)
      self.assertEqual(x509.get_subject().CN, common_name)
      self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)

  def testLegacy(self):
    """Checks the file written by GenerateSelfSignedSslCert.

    The single file is expected to contain both the private key and the
    certificate.

    """
    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")

    utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)

    cert1 = utils.ReadFile(cert1_filename)

    self.assert_(self._checkRsaPrivateKey(cert1))
    self.assert_(self._checkCertificate(cert1))
1694
1695
class TestPathJoin(unittest.TestCase):
  """Testing case for PathJoin"""

  def testBasicItems(self):
    """Plain components must simply be joined with slashes."""
    parts = ["/a", "b", "c"]
    self.assertEqual(PathJoin(*parts), "/".join(parts))

  def testNonAbsPrefix(self):
    """A relative first component must raise ValueError."""
    self.assertRaises(ValueError, PathJoin, "a", "b")

  def testBackTrack(self):
    """A component containing ".." must raise ValueError."""
    self.assertRaises(ValueError, PathJoin, "/a", "b/../c")

  def testMultiAbs(self):
    """An absolute component after the first one must raise ValueError."""
    self.assertRaises(ValueError, PathJoin, "/a", "/b")
1711
1712
class TestValidateServiceName(unittest.TestCase):
  """Test case for utils.ValidateServiceName"""

  def testValid(self):
    """Valid names and port numbers must be returned unchanged."""
    ports = [0, 1, 2, 3, 1024, 65000, 65534, 65535]
    names = [
      "ganeti",
      "gnt-masterd",
      "HELLO_WORLD_SVC",
      "hello.world.1",
      ]
    port_strings = ["0", "80", "1111", "65535"]

    for value in ports + names + port_strings:
      self.assertEqual(utils.ValidateServiceName(value), value)

  def testInvalid(self):
    """Invalid names and port numbers must raise OpPrereqError."""
    ports = [-15756, -1, 65536, 133428083]
    names = ["", "Hello World!", "!", "'", "\"", "\t", "\n", "`"]
    port_strings = ["-8546", "-1", "65536"]
    overlong = [129 * "A"]

    for value in ports + names + port_strings + overlong:
      self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName,
                        value)
1737
1738
class TestParseAsn1Generalizedtime(unittest.TestCase):
  """Test case for utils._ParseAsn1Generalizedtime"""

  def test(self):
    """Checks parsing of valid values and rejection of invalid ones."""
    # (input, expected result) pairs
    valid = [
      # UTC
      ("19700101000000Z", 0),
      ("20100222174152Z", 1266860512),
      ("20380119031407Z", (2**31) - 1),

      # With offset
      ("20100222174152+0000", 1266860512),
      ("20100223131652+0000", 1266931012),
      ("20100223051808-0800", 1266931088),
      ("20100224002135+1100", 1266931295),
      ("19700101000000-0100", 3600),
      ]

    for (text, expected) in valid:
      self.assertEqual(utils._ParseAsn1Generalizedtime(text), expected)

    invalid = [
      # Leap seconds are not supported by datetime.datetime
      "19841231235960+0000",
      "19920630235960+0000",

      # Errors
      "",
      "invalid",
      "20100222174152",
      "Mon Feb 22 17:47:02 UTC 2010",
      "2010-02-22 17:42:02",
      ]

    for text in invalid:
      self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, text)
1775
1776
class TestGetX509CertValidity(testutils.GanetiTestCase):
  """Test case for utils.GetX509CertValidity"""

  def setUp(self):
    """Detects whether the installed pyOpenSSL is version 0.7 or newer.

    A warning is issued if it's older.

    """
    testutils.GanetiTestCase.setUp(self)

    installed = distutils.version.LooseVersion(OpenSSL.__version__)

    # Test whether we have pyOpenSSL 0.7 or above
    self.pyopenssl0_7 = (installed >= "0.7")

    if not self.pyopenssl0_7:
      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
                    " function correctly")

  def _LoadCert(self, name):
    """Loads the PEM certificate stored in test data file C{name}."""
    pem = self._ReadTestData(name)
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)

  def test(self):
    """Checks the validity reported for the "cert1.pem" test certificate."""
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))

    if self.pyopenssl0_7:
      expected = (1266919967, 1267524767)
    else:
      # With older pyOpenSSL versions no validity is expected
      expected = (None, None)

    self.assertEqual(validity, expected)
1800
1801
class TestSignX509Certificate(unittest.TestCase):
  """Test case for SignX509Certificate and LoadSignedX509Certificate"""

  KEY = "My private key!"
  KEY_OTHER = "Another key"

  def test(self):
    """Signs a fresh certificate and checks loading of signed data."""
    # Generate certificate valid for 5 minutes
    (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)

    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           cert_pem)

    unloadable = [
      # No signature at all
      cert_pem,

      # Invalid input
      "",
      "X-Ganeti-Signature: \n",
      "X-Ganeti-Sign: $1234$abcdef\n",
      "X-Ganeti-Signature: $1234567890$abcdef\n",
      "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem,
      ]
    for text in unloadable:
      self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
                        text, self.KEY)

    # Invalid salt
    for char in "-_@$,:;/\\ \t\n":
      self.assertRaises(errors.GenericError, utils.SignX509Certificate,
                        cert_pem, self.KEY, "foo%sbar" % char)

    valid_salts = [
      "HelloWorld",
      "salt",
      string.letters,
      string.digits,
      utils.GenerateSecret(numbytes=4),
      utils.GenerateSecret(numbytes=16),
      "{123:456}".encode("hex"),
      ]
    for salt in valid_salts:
      signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)

      self._Check(cert, salt, signed_pem)

      # Text before or after the signed certificate must not matter
      self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
      self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
      self._Check(cert, salt, (signed_pem + "\n\na few more\n"
                               "lines----\n------ at\nthe end!"))

  def _Check(self, cert, salt, pem):
    """Loads C{pem} and compares the result with what was signed.

    @param cert: certificate object which was signed
    @param salt: salt used for signing
    @param pem: text containing the signed certificate

    """
    (loaded_cert, loaded_salt) = utils.LoadSignedX509Certificate(pem, self.KEY)
    self.assertEqual(salt, loaded_salt)
    self.assertEqual(cert.digest("sha1"), loaded_cert.digest("sha1"))

    # Other key
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
                      pem, self.KEY_OTHER)
1855
1856
class TestMakedirs(unittest.TestCase):
  """Tests for C{utils.Makedirs}"""

  def setUp(self):
    # Every test works below its own scratch directory
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _MakeAndVerify(self, target):
    """Runs C{utils.Makedirs} and checks that the directory exists.

    @param target: Directory to create

    """
    utils.Makedirs(target)
    self.assertTrue(os.path.isdir(target))

  def testNonExisting(self):
    self._MakeAndVerify(PathJoin(self.tmpdir, "foo"))

  def testExisting(self):
    target = PathJoin(self.tmpdir, "foo")
    # Directory is already there before Makedirs is called
    os.mkdir(target)
    self._MakeAndVerify(target)

  def testRecursiveNonExisting(self):
    self._MakeAndVerify(PathJoin(self.tmpdir, "foo/bar/baz"))

  def testRecursiveExisting(self):
    target = PathJoin(self.tmpdir, "B/moo/xyz")
    self.assertFalse(os.path.exists(target))
    # Only the first path component exists beforehand
    os.mkdir(PathJoin(self.tmpdir, "B"))
    self._MakeAndVerify(target)
1886
1887
1888 class TestRetry(testutils.GanetiTestCase):
1889 def setUp(self):
1890 testutils.GanetiTestCase.setUp(self)
1891 self.retries = 0
1892
1893 @staticmethod
1894 def _RaiseRetryAgain():
1895 raise utils.RetryAgain()
1896
1897 @staticmethod
1898 def _RaiseRetryAgainWithArg(args):
1899 raise utils.RetryAgain(*args)
1900
1901 def _WrongNestedLoop(self):
1902 return utils.Retry(self._RaiseRetryAgain, 0.01, 0.02)
1903
1904 def _RetryAndSucceed(self, retries):
1905 if self.retries < retries:
1906 self.retries += 1
1907 raise utils.RetryAgain()
1908 else:
1909 return True
1910
1911 def testRaiseTimeout(self):
1912 self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
1913 self._RaiseRetryAgain, 0.01, 0.02)
1914 self.failUnlessRaises(utils.RetryTimeout, utils.Retry,
1915 self._RetryAndSucceed, 0.01, 0, args=[1])
1916 self.failUnlessEqual(self.retries, 1)
1917
1918 def testComplete(self):
1919 self.failUnlessEqual(utils.Retry(lambda: True, 0, 1), True)
1920 self.failUnlessEqual(utils.Retry(self._RetryAndSucceed, 0, 1, args=[2]),
1921 True)
1922 self.failUnlessEqual(self.retries, 2)
1923
1924 def testNestedLoop(self):
1925 try:
1926 self.failUnlessRaises(errors.ProgrammerError, utils.Retry,
1927 self._WrongNestedLoop, 0, 1)
1928 except utils.RetryTimeout:
1929 self.fail("Didn't detect inner loop's exception")
1930
1931 def testTimeoutArgument(self):
1932 retry_arg="my_important_debugging_message"
1933 try:
1934 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02, args=[[retry_arg]])
1935 except utils.RetryTimeout, err:
1936 self.failUnlessEqual(err.args, (retry_arg, ))
1937 else:
1938 self.fail("Expected timeout didn't happen")
1939
1940 def testRaiseInnerWithExc(self):
1941 retry_arg="my_important_debugging_message"
1942 try:
1943 try:
1944 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
1945 args=[[errors.GenericError(retry_arg, retry_arg)]])
1946 except utils.RetryTimeout, err:
1947 err.RaiseInner()
1948 else:
1949 self.fail("Expected timeout didn't happen")
1950 except errors.GenericError, err:
1951 self.failUnlessEqual(err.args, (retry_arg, retry_arg))
1952 else:
1953 self.fail("Expected GenericError didn't happen")
1954
1955 def testRaiseInnerWithMsg(self):
1956 retry_arg="my_important_debugging_message"
1957 try:
1958 try:
1959 utils.Retry(self._RaiseRetryAgainWithArg, 0.01, 0.02,
1960 args=[[retry_arg, retry_arg]])
1961 except utils.RetryTimeout, err:
1962 err.RaiseInner()
1963 else:
1964 self.fail("Expected timeout didn't happen")
1965 except utils.RetryTimeout, err:
1966 self.failUnlessEqual(err.args, (retry_arg, retry_arg))
1967 else:
1968 self.fail("Expected RetryTimeout didn't happen")
1969
1970
class TestLineSplitter(unittest.TestCase):
  """Tests for C{utils.LineSplitter}"""

  def test(self):
    received = []
    splitter = utils.LineSplitter(received.append)

    splitter.write("Hello World\n")
    # Writing alone must not invoke the callback
    self.assertEqual(received, [])

    for chunk in ["Foo\n Bar\r\n ", "Baz", "Moo"]:
      splitter.write(chunk)
    self.assertEqual(received, [])

    # Flushing hands over the lines written so far, except for the last,
    # still unterminated one
    splitter.flush()
    flushed = ["Hello World", "Foo", " Bar"]
    self.assertEqual(received, flushed)

    # Closing also delivers the unterminated remainder
    splitter.close()
    self.assertEqual(received, flushed + [" BazMoo"])

  def _testExtra(self, line, all_lines, p1, p2):
    """Line callback verifying the extra arguments before storing C{line}.

    """
    self.assertEqual(p1, 999)
    self.assertEqual(p2, "extra")
    all_lines.append(line)

  def testExtraArgsNoFlush(self):
    received = []
    splitter = utils.LineSplitter(self._testExtra, received, 999, "extra")

    for chunk in ["\n\nHello World\n", "Foo\n Bar\r\n ", "", "Baz",
                  "Moo\n\nx\n"]:
      splitter.write(chunk)

    # No flush was requested, hence nothing has been delivered yet
    self.assertEqual(received, [])

    splitter.close()
    expected = ["", "", "Hello World", "Foo", " Bar", " BazMoo", "", "x"]
    self.assertEqual(received, expected)
2003
2004
class TestReadLockedPidFile(unittest.TestCase):
  """Tests for C{utils.ReadLockedPidFile}"""

  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _CreatePidFile(self):
    """Writes a PID file containing "123" and returns its path.

    """
    pidfile = PathJoin(self.tmpdir, "pid")
    utils.WriteFile(pidfile, data="123")
    return pidfile

  def testNonExistent(self):
    missing = PathJoin(self.tmpdir, "nonexist")
    self.assertTrue(utils.ReadLockedPidFile(missing) is None)

  def testUnlocked(self):
    pidfile = self._CreatePidFile()
    self.assertTrue(utils.ReadLockedPidFile(pidfile) is None)

  def testLocked(self):
    pidfile = self._CreatePidFile()

    lock = utils.FileLock.Open(pidfile)
    try:
      lock.Exclusive(blocking=True)

      # The PID is only reported while the lock is held
      self.assertEqual(utils.ReadLockedPidFile(pidfile), 123)
    finally:
      lock.Close()

    self.assertTrue(utils.ReadLockedPidFile(pidfile) is None)

  def testError(self):
    pidfile = PathJoin(self.tmpdir, "foobar", "pid")
    # "foobar" is a regular file, not a directory
    utils.WriteFile(PathJoin(self.tmpdir, "foobar"), data="")
    # open(2) should return ENOTDIR
    self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, pidfile)
2040
2041
class TestCertVerification(testutils.GanetiTestCase):
  """Tests for C{utils.VerifyX509Certificate}"""

  def setUp(self):
    testutils.GanetiTestCase.setUp(self)

    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    # setUp chains to the base class, so tearDown has to do the same;
    # try/finally makes sure the base class cleanup runs even if removing
    # the temporary directory fails
    try:
      shutil.rmtree(self.tmpdir)
    finally:
      testutils.GanetiTestCase.tearDown(self)

  def testVerifyCertificate(self):
    """Makes sure verifying an expired certificate doesn't raise.

    """
    cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           cert_pem)

    # Not checking return value as this certificate is expired
    utils.VerifyX509Certificate(cert, 30, 7)
2058
2059
class TestVerifyCertificateInner(unittest.TestCase):
  """Tests for C{utils._VerifyCertificateInner}"""

  def test(self):
    check = utils._VerifyCertificateInner

    # Valid
    self.assertEqual(check(False, 1263916313, 1298476313, 1266940313, 30, 7),
                     (None, None))

    # Each entry holds the call arguments and the expected error code; the
    # message returned alongside the code is not verified
    scenarios = [
      # Not yet valid
      ((False, 1266507600, 1267544400, 1266075600, 30, 7),
       utils.CERT_WARNING),

      # Expiring soon
      ((False, 1266507600, 1267544400, 1266939600, 30, 7),
       utils.CERT_ERROR),
      ((False, 1266507600, 1267544400, 1266939600, 30, 1),
       utils.CERT_WARNING),
      ((False, 1266507600, None, 1266939600, 30, 7),
       None),

      # Expired
      ((True, 1266507600, 1267544400, 1266939600, 30, 7),
       utils.CERT_ERROR),
      ((True, None, 1267544400, 1266939600, 30, 7),
       utils.CERT_ERROR),
      ((True, 1266507600, None, 1266939600, 30, 7),
       utils.CERT_ERROR),
      ((True, None, None, 1266939600, 30, 7),
       utils.CERT_ERROR),
      ]

    for (args, expected_code) in scenarios:
      (errcode, _) = check(*args)
      self.assertEqual(errcode, expected_code)
2094
2095
class TestHmacFunctions(unittest.TestCase):
  """Tests for C{utils.Sha1Hmac} and C{utils.VerifySha1Hmac}"""

  # Digests can be checked with "openssl sha1 -hmac $key"
  def testSha1Hmac(self):
    longtext = 1500 * "The quick brown fox jumps over the lazy dog\n"

    for (key, text, expected) in [
      ("", "",
       "fbdb1d1b18aa6c08324b7d64b71fb76370690e1d"),
      ("3YzMxZWE", "Hello World",
       "ef4f3bda82212ecb2f7ce868888a19092481f1fd"),
      ("TguMTA2K", "",
       "f904c2476527c6d3e6609ab683c66fa0652cb1dc"),
      ("3YzMxZWE", longtext,
       "35901b9a3001a7cdcf8e0e9d7c2e79df2223af54"),
      ]:
      self.assertEqual(utils.Sha1Hmac(key, text), expected)

  def testSha1HmacSalt(self):
    for (key, text, salt, expected) in [
      ("TguMTA2K", "", "abc0",
       "4999bf342470eadb11dfcd24ca5680cf9fd7cdce"),
      ("TguMTA2K", "", "abc9",
       "17a4adc34d69c0d367d4ffbef96fd41d4df7a6e8"),
      ("3YzMxZWE", "Hello World", "xyz0",
       "7f264f8114c9066afc9bb7636e1786d996d3cc0d"),
      ]:
      self.assertEqual(utils.Sha1Hmac(key, text, salt=salt), expected)

  def testVerifySha1Hmac(self):
    for (key, text, digest) in [
      ("", "",
       "fbdb1d1b18aa6c08324b7d64b71fb76370690e1d"),
      ("TguMTA2K", "",
       "f904c2476527c6d3e6609ab683c66fa0652cb1dc"),
      ]:
      self.assertTrue(utils.VerifySha1Hmac(key, text, digest))

    # The same digest has to be accepted in any capitalization
    digest = "ef4f3bda82212ecb2f7ce868888a19092481f1fd"
    for variant in [digest, digest.lower(), digest.upper(), digest.title()]:
      self.assertTrue(utils.VerifySha1Hmac("3YzMxZWE", "Hello World",
                                           variant))

  def testVerifySha1HmacSalt(self):
    for (key, text, digest, salt) in [
      ("TguMTA2K", "",
       "17a4adc34d69c0d367d4ffbef96fd41d4df7a6e8", "abc9"),
      ("3YzMxZWE", "Hello World",
       "7f264f8114c9066afc9bb7636e1786d996d3cc0d", "xyz0"),
      ]:
      self.assertTrue(utils.VerifySha1Hmac(key, text, digest, salt=salt))
2143
2144
class TestIgnoreSignals(unittest.TestCase):
  """Test the IgnoreSignals decorator"""

  @staticmethod
  def _Raise(exception):
    """Raises the exception passed in.

    """
    raise exception

  @staticmethod
  def _Return(rval):
    """Returns the value passed in.

    """
    return rval

  def testIgnoreSignals(self):
    # Pairs of (exception class, instance), once with EINTR and once with
    # an unrelated error code
    interrupted = [
      (socket.error, socket.error(errno.EINTR, "Message")),
      (EnvironmentError, EnvironmentError(errno.EINTR, "Message")),
      ]
    invalid = [
      (socket.error, socket.error(errno.EINVAL, "Message")),
      (EnvironmentError, EnvironmentError(errno.EINVAL, "Message")),
      ]

    # Make sure the helper itself raises whatever it is given
    for (exc_class, exc) in [interrupted[0], invalid[0],
                             interrupted[1], invalid[1]]:
      self.assertRaises(exc_class, self._Raise, exc)

    # EINTR is swallowed and None is returned instead
    for (_, exc) in interrupted:
      self.assertEqual(utils.IgnoreSignals(self._Raise, exc), None)

    # Any other error is passed on to the caller
    for (exc_class, exc) in invalid:
      self.assertRaises(exc_class, utils.IgnoreSignals, self._Raise, exc)

    # Return values are handed through unchanged
    for value in [True, 33]:
      self.assertEqual(utils.IgnoreSignals(self._Return, value), value)
2177
2178
2179 class TestEnsureDirs(unittest.TestCase):
2180 """Tests for EnsureDirs"""
2181
2182 def setUp(self):
2183 self.dir = tempfile.mkdtemp()
2184 self.old_umask = os.umask(0777)
2185
2186 def testEnsureDirs(self):
2187 utils.EnsureDirs([
2188 (PathJoin(self.dir, "foo"), 0777),
2189 (PathJoin(self.dir, "bar"), 0000),
2190 ])
2191 self.assertEquals(os.stat(PathJoin(self.dir, "foo"))[0] & 0777, 0777)
2192 self.assertEquals(os.stat(PathJoin(self.dir, "bar"))[0] & 0777, 0000)
2193
2194 def tearDown(self):
2195 os.rmdir(PathJoin(self.dir, "foo"))
2196 os.rmdir(PathJoin(self.dir, "bar"))
2197 os.rmdir(self.dir)
2198 os.umask(self.old_umask)
2199
2200
class TestFormatSeconds(unittest.TestCase):
  """Tests for C{utils.FormatSeconds}"""

  def test(self):
    for (secs, expected) in [
      (1, "1s"),
      (3600, "1h 0m 0s"),
      (3599, "59m 59s"),
      (7200, "2h 0m 0s"),
      (7201, "2h 0m 1s"),
      (7281, "2h 1m 21s"),
      (29119, "8h 5m 19s"),
      (19431228, "224d 21h 33m 48s"),
      # Negative values are not split into units
      (-1, "-1s"),
      (-282, "-282s"),
      (-29119, "-29119s"),
      ]:
      self.assertEqual(utils.FormatSeconds(secs), expected)

  def testFloat(self):
    # Fractions are rounded to full seconds
    for (secs, expected) in [
      (1.3, "1s"),
      (1.9, "2s"),
      (3912.12311, "1h 5m 12s"),
      (3912.8, "1h 5m 13s"),
      ]:
      self.assertEqual(utils.FormatSeconds(secs), expected)
2220
2221
class RunIgnoreProcessNotFound(unittest.TestCase):
  """Tests for C{utils.IgnoreProcessNotFound}"""

  @staticmethod
  def _WritePid(fd):
    """Writes the PID of the calling process to C{fd} and closes it.

    @param fd: File descriptor open for writing
    @return: Always C{True}

    """
    os.write(fd, str(os.getpid()))
    os.close(fd)
    return True

  def test(self):
    """Signals a process which has already exited.

    """
    (pid_read_fd, pid_write_fd) = os.pipe()

    # The nested try/finally blocks make sure neither end of the pipe is
    # leaked when the assertion or the conversion of the PID fails
    try:
      try:
        # Start short-lived process which writes its PID to pipe
        self.assert_(utils.RunInSeparateProcess(self._WritePid,
                                                pid_write_fd))
      finally:
        os.close(pid_write_fd)

      # Read PID from pipe
      pid = int(os.read(pid_read_fd, 1024))
    finally:
      os.close(pid_read_fd)

    # Try to send signal to process which exited recently
    self.assertFalse(utils.IgnoreProcessNotFound(os.kill, pid, 0))
2242
2243
# Entry point when this file is executed as a script: control is handed to
# the test program provided by the local testutils module
if __name__ == '__main__':
  testutils.GanetiTestProgram()