8594fb8a39df5c152f3dcbf421d5587264f836e6
[ganeti-github.git] / test / ganeti.utils.log_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.utils.log"""
23
24 import os
25 import unittest
26 import logging
27 import tempfile
28 import shutil
29
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import utils
33
34 import testutils
35
36
class TestLogHandler(unittest.TestCase):
  """Tests for the log handler classes found in utils.log."""

  def test(self):
    """Logs two messages through the handler and expects two lines on disk."""
    tmpfile = tempfile.NamedTemporaryFile()
    try:
      handler = utils.log._ReopenableLogHandler(tmpfile.name)
      handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))

      logger = logging.Logger("TestLogger")
      logger.addHandler(handler)
      self.assertEqual(len(logger.handlers), 1)

      logger.error("Test message ERROR")
      logger.info("Test message INFO")

      logger.removeHandler(handler)
      self.assertFalse(logger.handlers)
      handler.close()

      self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 2)
    finally:
      # Release the descriptor (and the file) right away instead of waiting
      # for garbage collection
      tmpfile.close()

  def testReopen(self):
    """Requests a reopen, renames the log file and checks both files."""
    tmpfile = tempfile.NamedTemporaryFile()
    tmpfile2 = tempfile.NamedTemporaryFile()
    try:
      handler = utils.log._ReopenableLogHandler(tmpfile.name)

      self.assertFalse(utils.ReadFile(tmpfile.name))
      self.assertFalse(utils.ReadFile(tmpfile2.name))

      logger = logging.Logger("TestLoggerReopen")
      logger.addHandler(handler)

      for _ in range(3):
        logger.error("Test message ERROR")
      handler.flush()
      self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 3)
      before_id = utils.GetFileID(tmpfile.name)

      handler.RequestReopen()
      self.assertTrue(handler._reopen)
      self.assertTrue(utils.VerifyFileID(utils.GetFileID(tmpfile.name),
                                         before_id))

      # Rename only after requesting reopen
      os.rename(tmpfile.name, tmpfile2.name)
      # Use a unittest assertion; a bare "assert" is stripped by "python -O"
      self.assertFalse(os.path.exists(tmpfile.name))

      # Write another message, should reopen
      for _ in range(4):
        logger.info("Test message INFO")
        self.assertFalse(utils.VerifyFileID(utils.GetFileID(tmpfile.name),
                                            before_id))

      logger.removeHandler(handler)
      self.assertFalse(logger.handlers)
      handler.close()

      self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 4)
      self.assertEqual(len(utils.ReadFile(tmpfile2.name).splitlines()), 3)
    finally:
      for fh in (tmpfile, tmpfile2):
        try:
          fh.close()
        except EnvironmentError:
          # Closing also unlinks the path; it can be missing if the test
          # failed between the rename and the reopen. Don't let that hide
          # the original failure.
          pass

  def testConsole(self):
    """Makes the handler fail and checks what is written to the console.

    Three consoles are tried: none at all, a temporary file (whose contents
    are verified) and a file object failing on write.

    """
    # Consoles are created lazily, one per iteration, so each of them can be
    # closed again even if an earlier iteration fails
    for (console_fn, check) in [(lambda: None, False),
                                (tempfile.NamedTemporaryFile, True),
                                (lambda: self._FailingFile(os.devnull),
                                 False)]:
      console = console_fn()
      # Stream which will fail when writing, provoking a write to the console
      stream = self._FailingFile(os.devnull)
      try:
        # Create a handler which will fail when handling errors
        cls = utils.log._LogErrorsToConsole(self._FailingHandler)

        handler = cls(console, stream)

        logger = logging.Logger("TestLogger")
        logger.addHandler(handler)
        self.assertEqual(len(logger.handlers), 1)

        # Provoke write
        logger.error("Test message ERROR")

        # Take everything apart
        logger.removeHandler(handler)
        self.assertFalse(logger.handlers)
        handler.close()

        if console and check:
          console.flush()

          # Check console output
          consout = utils.ReadFile(console.name)
          self.assertTrue("Cannot log message" in consout)
          self.assertTrue("Test message ERROR" in consout)
      finally:
        stream.close()
        if console is not None:
          console.close()

  # NOTE: relies on the "file" builtin, which only exists in Python 2
  class _FailingFile(file):
    """File object raising an exception on every write."""

    def write(self, _):
      raise Exception

  class _FailingHandler(logging.StreamHandler):
    """Stream handler whose error handling raises an exception."""

    def handleError(self, _):
      raise Exception
135
136
class TestSetupLogging(unittest.TestCase):
  """Tests for utils.SetupLogging writing to files in a temporary directory."""

  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _SetupFileLogging(self, logfile, logger):
    """Calls utils.SetupLogging with the options shared by all tests here.

    Console, syslog and stderr logging are disabled and C{logger} is passed
    as the root logger.

    @param logfile: Path of the log file
    @param logger: Logger instance passed as C{root_logger}
    @return: Whatever utils.SetupLogging returned

    """
    return utils.SetupLogging(logfile, "test",
                              console_logging=False,
                              syslog=constants.SYSLOG_NO,
                              stderr_logging=False,
                              multithreaded=False,
                              root_logger=logger)

  def testSimple(self):
    """Checks that messages sent to the custom logger reach the log file."""
    logfile = utils.PathJoin(self.tmpdir, "basic.log")
    logger = logging.Logger("TestLogger")
    self.assertTrue(callable(self._SetupFileLogging(logfile, logger)))
    self.assertEqual(utils.ReadFile(logfile), "")
    logger.error("This is a test")

    # Ensure SetupLogging used custom logger
    logging.error("This message should not show up in the test log file")

    self.assertTrue(utils.ReadFile(logfile).endswith("This is a test\n"))

  def testReopen(self):
    """Renames the log file and checks the reopen function recreates it."""
    logfile = utils.PathJoin(self.tmpdir, "reopen.log")
    logfile2 = utils.PathJoin(self.tmpdir, "reopen.log.OLD")
    logger = logging.Logger("TestLogger")
    reopen_fn = self._SetupFileLogging(logfile, logger)
    self.assertTrue(callable(reopen_fn))

    self.assertEqual(utils.ReadFile(logfile), "")
    logger.error("This is a test")
    self.assertTrue(utils.ReadFile(logfile).endswith("This is a test\n"))

    os.rename(logfile, logfile2)
    # unittest assertions instead of bare "assert", which "python -O" strips
    self.assertFalse(os.path.exists(logfile))

    # Notify logger to reopen on the next message
    reopen_fn()
    # Requesting the reopen alone must not create the file yet
    self.assertFalse(os.path.exists(logfile))

    # Provoke actual reopen
    logger.error("First message")

    self.assertTrue(utils.ReadFile(logfile).endswith("First message\n"))
    self.assertTrue(utils.ReadFile(logfile2).endswith("This is a test\n"))
189
190
# Run the tests through the project's test program when executed as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()