Move python test files to test/py
[ganeti-github.git] / test / py / ganeti.utils.log_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.utils.log"""
23
24 import os
25 import unittest
26 import logging
27 import tempfile
28 import shutil
29 import threading
30 from cStringIO import StringIO
31
32 from ganeti import constants
33 from ganeti import errors
34 from ganeti import compat
35 from ganeti import utils
36
37 import testutils
38
39
class TestLogHandler(unittest.TestCase):
  """Tests for the log handler helpers found in utils.log."""

  def testNormal(self):
    """Log two messages to a file and expect exactly two lines in it."""
    tmpfile = tempfile.NamedTemporaryFile()

    handler = utils.log._ReopenableLogHandler(tmpfile.name)
    handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))

    logger = logging.Logger("TestLogger")
    logger.addHandler(handler)
    self.assertEqual(len(logger.handlers), 1)

    logger.error("Test message ERROR")
    logger.info("Test message INFO")

    logger.removeHandler(handler)
    self.assertFalse(logger.handlers)
    handler.close()

    self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 2)

  def testReopen(self):
    """Request a reopen, rename the log file and check a new one is written.

    Three messages are logged before the rename and four afterwards; the
    renamed file must keep the first three and the file at the original
    path must contain the last four.

    """
    tmpfile = tempfile.NamedTemporaryFile()
    tmpfile2 = tempfile.NamedTemporaryFile()

    handler = utils.log._ReopenableLogHandler(tmpfile.name)

    self.assertFalse(utils.ReadFile(tmpfile.name))
    self.assertFalse(utils.ReadFile(tmpfile2.name))

    logger = logging.Logger("TestLoggerReopen")
    logger.addHandler(handler)

    for _ in range(3):
      logger.error("Test message ERROR")
    handler.flush()
    self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 3)
    before_id = utils.GetFileID(tmpfile.name)

    handler.RequestReopen()
    self.assertTrue(handler._reopen)
    # Requesting a reopen alone must not touch the file
    self.assertTrue(utils.VerifyFileID(utils.GetFileID(tmpfile.name),
                                       before_id))

    # Rename only after requesting reopen
    os.rename(tmpfile.name, tmpfile2.name)
    # Use a unittest assertion instead of a bare "assert" so the check is
    # still executed when Python runs with -O
    self.assertFalse(os.path.exists(tmpfile.name))

    # Write another message, should reopen
    for _ in range(4):
      logger.info("Test message INFO")

    # Flag must be reset
    self.assertFalse(handler._reopen)

    self.assertFalse(utils.VerifyFileID(utils.GetFileID(tmpfile.name),
                                        before_id))

    logger.removeHandler(handler)
    self.assertFalse(logger.handlers)
    handler.close()

    self.assertEqual(len(utils.ReadFile(tmpfile.name).splitlines()), 4)
    self.assertEqual(len(utils.ReadFile(tmpfile2.name).splitlines()), 3)

  def testConsole(self):
    """Provoke a logging failure with no, a working and a failing console.

    Only for the working console (a temporary file) is the output checked;
    it must mention both "Cannot log message" and the logged text.

    """
    for (console, check) in [(None, False),
                             (tempfile.NamedTemporaryFile(), True),
                             (self._FailingFile(os.devnull), False)]:
      # Create a handler which will fail when handling errors
      cls = utils.log._LogErrorsToConsole(self._FailingHandler)

      # Stream which will fail when writing, provoking a write to the console
      stream = self._FailingFile(os.devnull)
      try:
        handler = cls(console, stream)

        logger = logging.Logger("TestLogger")
        logger.addHandler(handler)
        self.assertEqual(len(logger.handlers), 1)

        # Provoke write
        logger.error("Test message ERROR")

        # Take everything apart
        logger.removeHandler(handler)
        self.assertFalse(logger.handlers)
        handler.close()

        if console and check:
          console.flush()

          # Check console output
          consout = utils.ReadFile(console.name)
          self.assertTrue("Cannot log message" in consout)
          self.assertTrue("Test message ERROR" in consout)
      finally:
        # Closing the handler does not close the files opened by this test,
        # hence release them explicitly
        stream.close()
        if console is not None:
          console.close()

  class _FailingFile(file):
    """File object whose write method always raises."""

    def write(self, _):
      raise Exception

  class _FailingHandler(logging.StreamHandler):
    """Stream handler whose error handling always raises."""

    def handleError(self, _):
      raise Exception
142
143
class TestSetupLogging(unittest.TestCase):
  """Tests for utils.SetupLogging using a private logger and log directory."""

  def setUp(self):
    """Create a temporary directory holding the log files of one test."""
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    """Remove the temporary directory including all log files."""
    shutil.rmtree(self.tmpdir)

  def testSimple(self):
    """Set up file logging and check only the custom logger writes to it."""
    logfile = utils.PathJoin(self.tmpdir, "basic.log")
    logger = logging.Logger("TestLogger")
    self.assertTrue(callable(utils.SetupLogging(logfile, "test",
                                                console_logging=False,
                                                syslog=constants.SYSLOG_NO,
                                                stderr_logging=False,
                                                multithreaded=False,
                                                root_logger=logger)))
    self.assertEqual(utils.ReadFile(logfile), "")
    logger.error("This is a test")

    # Ensure SetupLogging used custom logger
    logging.error("This message should not show up in the test log file")

    self.assertTrue(utils.ReadFile(logfile).endswith("This is a test\n"))

  def testReopen(self):
    """Rename the log file and check the returned function triggers a reopen.

    The file is only recreated by the first message logged after calling
    the function returned by SetupLogging, not by the call itself.

    """
    logfile = utils.PathJoin(self.tmpdir, "reopen.log")
    logfile2 = utils.PathJoin(self.tmpdir, "reopen.log.OLD")
    logger = logging.Logger("TestLogger")
    reopen_fn = utils.SetupLogging(logfile, "test",
                                   console_logging=False,
                                   syslog=constants.SYSLOG_NO,
                                   stderr_logging=False,
                                   multithreaded=False,
                                   root_logger=logger)
    self.assertTrue(callable(reopen_fn))

    self.assertEqual(utils.ReadFile(logfile), "")
    logger.error("This is a test")
    self.assertTrue(utils.ReadFile(logfile).endswith("This is a test\n"))

    os.rename(logfile, logfile2)
    # unittest assertions are used instead of bare "assert" statements so the
    # checks are not stripped when Python runs with -O
    self.assertFalse(os.path.exists(logfile))

    # Notify logger to reopen on the next message
    reopen_fn()
    self.assertFalse(os.path.exists(logfile))

    # Provoke actual reopen
    logger.error("First message")

    self.assertTrue(utils.ReadFile(logfile).endswith("First message\n"))
    self.assertTrue(utils.ReadFile(logfile2).endswith("This is a test\n"))
196
197
class TestSetupToolLogging(unittest.TestCase):
  """Tests for utils.SetupToolLogging writing to an in-memory stream."""

  def test(self):
    """Check which messages are emitted for each debug/verbose combination."""
    (name_error, name_warning, name_info, name_debug) = \
      [logging.getLevelName(level)
       for level in (logging.ERROR, logging.WARNING,
                     logging.INFO, logging.DEBUG)]

    # Messages prefixed with their level name, as expected whenever debug or
    # verbose output is enabled
    with_level = [
      "%s level=error" % name_error,
      "%s level=warning" % name_warning,
      "%s level=info" % name_info,
      "%s level=debug" % name_debug,
      ]

    combinations = [
      (False, False),
      (False, True),
      (True, False),
      (True, True),
      ]

    for (debug, verbose) in combinations:
      stream = StringIO()
      root = logging.Logger("TestLogger")

      utils.SetupToolLogging(debug, verbose, _root_logger=root, _stream=stream)

      # Emit one message per level, from most to least severe
      for (log_fn, text) in [(root.error, "level=error"),
                             (root.warning, "level=warning"),
                             (root.info, "level=info"),
                             (root.debug, "level=debug")]:
        log_fn(text)

      output = stream.getvalue().splitlines()

      colon_counts = [line.count(":") for line in output]
      self.assertTrue(compat.all(count == 3 for count in colon_counts))

      messages = [line.split(":", 3)[-1].strip() for line in output]

      if debug:
        expected = with_level
      elif verbose:
        # Everything except the debug message
        expected = with_level[:3]
      else:
        expected = [
          "level=error",
          "level=warning",
          ]

      self.assertEqual(messages, expected)

  def testThreadName(self):
    """Check the thread name is part of the output only when requested."""
    current_name = threading.currentThread().getName()

    for with_threadname in [False, True]:
      stream = StringIO()
      root = logging.Logger("TestLogger")

      utils.SetupToolLogging(True, True, threadname=with_threadname,
                             _root_logger=root, _stream=stream)

      root.debug("test134042376")

      output = stream.getvalue().splitlines()
      self.assertEqual(len(output), 1)
      line = output[0]

      if not with_threadname:
        self.assertTrue(current_name not in line)
      else:
        self.assertTrue((" %s " % current_name) in line)
261
262
# Hand over to the Ganeti test program when executed as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()