99ecd762f7602a449d07b488a431f9c0ad8cc058
[ganeti-github.git] / test / py / ganeti.utils.x509_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Script for testing ganeti.utils.x509"""
32
33 import os
34 import tempfile
35 import unittest
36 import shutil
37 import OpenSSL
38 import distutils.version
39 import string
40
41 from ganeti import constants
42 from ganeti import utils
43 from ganeti import errors
44
45 import testutils
46
47
class TestParseAsn1Generalizedtime(unittest.TestCase):
  """Checks parsing of ASN.1 GENERALIZEDTIME strings into integer timestamps.

  The expected values are seconds relative to 1970-01-01 00:00:00 UTC.

  """
  def setUp(self):
    self._Parse = utils.x509._ParseAsn1Generalizedtime

  def test(self):
    parse = self._Parse

    accepted = [
      # UTC
      ("19700101000000Z", 0),
      ("20100222174152Z", 1266860512),
      ("20380119031407Z", (2**31) - 1),

      # With offset
      ("20100222174152+0000", 1266860512),
      ("20100223131652+0000", 1266931012),
      ("20100223051808-0800", 1266931088),
      ("20100224002135+1100", 1266931295),
      ("19700101000000-0100", 3600),
      ]

    for (value, timestamp) in accepted:
      self.assertEqual(parse(value), timestamp)

    rejected = [
      # Leap seconds are not supported by datetime.datetime
      "19841231235960+0000",
      "19920630235960+0000",

      # Errors
      "",
      "invalid",
      "20100222174152",
      "Mon Feb 22 17:47:02 UTC 2010",
      "2010-02-22 17:42:02",
      ]

    for value in rejected:
      self.assertRaises(ValueError, parse, value)
75
76
class TestGetX509CertValidity(testutils.GanetiTestCase):
  """Tests reading the validity period of an X509 certificate.

  The expected result depends on the installed pyOpenSSL version: with
  pyOpenSSL 0.7 or above the test expects real timestamps, otherwise it
  expects C{(None, None)}.

  """
  def setUp(self):
    # "warnings" is not imported at module level; import it here so that the
    # warning below is actually emitted instead of raising NameError
    import warnings

    testutils.GanetiTestCase.setUp(self)

    pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)

    # Test whether we have pyOpenSSL 0.7 or above
    self.pyopenssl0_7 = (pyopenssl_version >= "0.7")

    if not self.pyopenssl0_7:
      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
                    " function correctly")

  def _LoadCert(self, name):
    """Loads a PEM certificate from the test data file C{name}.

    """
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           testutils.ReadTestData(name))

  def test(self):
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
    if self.pyopenssl0_7:
      self.assertEqual(validity, (1266919967, 1267524767))
    else:
      self.assertEqual(validity, (None, None))
100
101
class TestSignX509Certificate(unittest.TestCase):
  """Round-trip tests for signing and loading signed X509 certificates.

  """
  KEY = "My private key!"
  KEY_OTHER = "Another key"

  def test(self):
    # Generate certificate valid for 5 minutes
    (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300, 1)

    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           cert_pem)

    unloadable = [
      # No signature at all
      cert_pem,

      # Invalid input
      "",
      "X-Ganeti-Signature: \n",
      "X-Ganeti-Sign: $1234$abcdef\n",
      "X-Ganeti-Signature: $1234567890$abcdef\n",
      "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem,
      ]

    for data in unloadable:
      self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
                        data, self.KEY)

    # Invalid salt
    for char in "-_@$,:;/\\ \t\n":
      self.assertRaises(errors.GenericError, utils.SignX509Certificate,
                        cert_pem, self.KEY, "foo%sbar" % char)

    usable_salts = [
      "HelloWorld",
      "salt",
      string.letters,
      string.digits,
      utils.GenerateSecret(numbytes=4),
      utils.GenerateSecret(numbytes=16),
      "{123:456}".encode("hex"),
      ]

    for salt in usable_salts:
      signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)

      # The signed certificate must load both on its own and when it is
      # surrounded by unrelated text
      wrapped = [
        signed_pem,
        "X-Another-Header: with a value\n" + signed_pem,
        (10 * "Hello World!\n") + signed_pem,
        (signed_pem + "\n\na few more\n"
         "lines----\n------ at\nthe end!"),
        ]

      for pem in wrapped:
        self._Check(cert, salt, pem)

  def _Check(self, cert, salt, pem):
    """Verifies that C{pem} loads with L{KEY} and is rejected with another key.

    """
    (loaded_cert, loaded_salt) = \
      utils.LoadSignedX509Certificate(pem, self.KEY)
    self.assertEqual(salt, loaded_salt)
    self.assertEqual(cert.digest("sha1"), loaded_cert.digest("sha1"))

    # Other key
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
                      pem, self.KEY_OTHER)
155
156
class TestCertVerification(testutils.GanetiTestCase):
  """Tests for L{utils.VerifyX509Certificate}.

  """
  def setUp(self):
    testutils.GanetiTestCase.setUp(self)

    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    # Chain to the base class (setUp does so as well) and make sure the
    # temporary directory is removed even if the base class cleanup fails
    try:
      testutils.GanetiTestCase.tearDown(self)
    finally:
      shutil.rmtree(self.tmpdir)

  def testVerifyCertificate(self):
    cert_pem = testutils.ReadTestData("cert1.pem")
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           cert_pem)

    # Not checking return value as this certificate is expired
    utils.VerifyX509Certificate(cert, 30, 7)

  @staticmethod
  def _GenCert(key, before, validity):
    """Creates a self-signed certificate for C{key}.

    @param key: Key used both as public key and for signing
    @param before: Offset in seconds passed to C{gmtime_adj_notBefore}; the
      call is skipped entirely when this is zero
    @param validity: Offset in seconds passed to C{gmtime_adj_notAfter}
    @return: The signed C{OpenSSL.crypto.X509} object

    """
    # Urgh... mostly copied from x509.py :(

    # Create self-signed certificate
    cert = OpenSSL.crypto.X509()
    cert.set_serial_number(1)
    if before != 0:
      cert.gmtime_adj_notBefore(int(before))
    cert.gmtime_adj_notAfter(validity)
    cert.set_issuer(cert.get_subject())
    cert.set_pubkey(key)
    cert.sign(key, constants.X509_CERT_SIGN_DIGEST)
    return cert

  def testClockSkew(self):
    SKEW = constants.NODE_MAX_CLOCK_SKEW
    # Create private and public key
    key = OpenSSL.crypto.PKey()
    key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)

    validity = 7 * 86400
    # skew small enough, accepting cert; note that this is a timed
    # test, and could fail if the machine is so loaded that the next
    # few lines take more than NODE_MAX_CLOCK_SKEW / 2
    for before in [-1, 0, SKEW / 4, SKEW / 2]:
      cert = self._GenCert(key, before, validity)
      result = utils.VerifyX509Certificate(cert, 1, 2)
      self.assertEqual(result, (None, None))

    # skew too great, not accepting certs
    for before in [SKEW * 2, SKEW * 10]:
      cert = self._GenCert(key, before, validity)
      (status, msg) = utils.VerifyX509Certificate(cert, 1, 2)
      self.assertEqual(status, utils.CERT_WARNING)
      self.assertTrue(msg.startswith("Certificate not yet valid"))
210
211
class TestVerifyCertificateInner(unittest.TestCase):
  """Exercises C{utils.x509._VerifyCertificateInner} with fixed timestamps.

  """
  def test(self):
    vci = utils.x509._VerifyCertificateInner

    # Valid
    outcome = vci(False, 1263916313, 1298476313, 1266940313, 30, 7)
    self.assertEqual(outcome, (None, None))

    # Each entry pairs the call arguments with the expected error code; the
    # returned message is not inspected
    scenarios = [
      # Not yet valid
      ((False, 1266507600, 1267544400, 1266075600, 30, 7),
       utils.CERT_WARNING),

      # Expiring soon
      ((False, 1266507600, 1267544400, 1266939600, 30, 7), utils.CERT_ERROR),
      ((False, 1266507600, 1267544400, 1266939600, 30, 1),
       utils.CERT_WARNING),
      ((False, 1266507600, None, 1266939600, 30, 7), None),

      # Expired
      ((True, 1266507600, 1267544400, 1266939600, 30, 7), utils.CERT_ERROR),
      ((True, None, 1267544400, 1266939600, 30, 7), utils.CERT_ERROR),
      ((True, 1266507600, None, 1266939600, 30, 7), utils.CERT_ERROR),
      ((True, None, None, 1266939600, 30, 7), utils.CERT_ERROR),
      ]

    for (args, expected) in scenarios:
      (errcode, msg) = vci(*args)
      self.assertEqual(errcode, expected)
246
247
class TestGenerateSelfSignedX509Cert(unittest.TestCase):
  """Tests generation of self-signed X509 certificates and their keys.

  """
  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _checkRsaPrivateKey(self, key):
    """Returns whether C{key} contains PEM private key begin/end markers.

    Both the "RSA PRIVATE KEY" and the plain "PRIVATE KEY" markers are
    accepted. The result must be asserted by the caller.

    """
    lines = key.splitlines()
    return (("-----BEGIN RSA PRIVATE KEY-----" in lines and
             "-----END RSA PRIVATE KEY-----" in lines) or
            ("-----BEGIN PRIVATE KEY-----" in lines and
             "-----END PRIVATE KEY-----" in lines))

  def _checkCertificate(self, cert):
    """Returns whether C{cert} contains PEM certificate begin/end markers.

    The result must be asserted by the caller.

    """
    lines = cert.splitlines()
    return ("-----BEGIN CERTIFICATE-----" in lines and
            "-----END CERTIFICATE-----" in lines)

  def test(self):
    for common_name in [None, ".", "Ganeti", "node1.example.com"]:
      (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300,
                                                             1)
      # The helpers only return a boolean, hence their result must be
      # asserted for the check to have any effect
      self.assertTrue(self._checkRsaPrivateKey(key_pem))
      self.assertTrue(self._checkCertificate(cert_pem))

      key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
                                           key_pem)
      self.assertTrue(key.bits() >= 1024)
      self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
      self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)

      x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                             cert_pem)
      self.assertFalse(x509.has_expired())
      self.assertEqual(x509.get_issuer().CN, common_name)
      self.assertEqual(x509.get_subject().CN, common_name)
      self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)

  def testLegacy(self):
    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")

    utils.GenerateSelfSignedSslCert(cert1_filename, 1, validity=1)

    # The generated file is expected to hold both the key and the certificate
    cert1 = utils.ReadFile(cert1_filename)

    self.assertTrue(self._checkRsaPrivateKey(cert1))
    self.assertTrue(self._checkCertificate(cert1))
295 self.assert_(self._checkCertificate(cert1))
296
297
class TestCheckNodeCertificate(testutils.GanetiTestCase):
  """Tests for L{utils.CheckNodeCertificate}.

  The node certificate file to check against is injected through the
  C{_noded_cert_file} keyword argument.

  """
  def setUp(self):
    testutils.GanetiTestCase.setUp(self)
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    # Remove the temporary directory even if the base class cleanup fails
    try:
      testutils.GanetiTestCase.tearDown(self)
    finally:
      shutil.rmtree(self.tmpdir)

  def testMismatchingKey(self):
    other_cert = testutils.TestDataFilename("cert1.pem")
    node_cert = testutils.TestDataFilename("cert2.pem")

    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(other_cert))

    try:
      utils.CheckNodeCertificate(cert, _noded_cert_file=node_cert)
    except errors.GenericError as err:
      self.assertEqual(str(err),
                       "Given cluster certificate does not match local key")
    else:
      self.fail("Exception was not raised")

  def testMatchingKey(self):
    cert_filename = testutils.TestDataFilename("cert2.pem")

    # Extract certificate
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(cert_filename))

    # Must not raise as certificate and node certificate file are the same
    utils.CheckNodeCertificate(cert, _noded_cert_file=cert_filename)

  def testMissingFile(self):
    nodecert = utils.PathJoin(self.tmpdir, "does-not-exist")

    # A missing node certificate file must not raise; NotImplemented is
    # passed as placeholder certificate (presumably never inspected in
    # this case)
    utils.CheckNodeCertificate(NotImplemented, _noded_cert_file=nodecert)

    # ... and the file must not have been created by the check
    self.assertFalse(os.path.exists(nodecert))

  def testInvalidCertificate(self):
    tmpfile = utils.PathJoin(self.tmpdir, "cert")
    utils.WriteFile(tmpfile, data="not a certificate")

    self.assertRaises(errors.X509CertError, utils.CheckNodeCertificate,
                      NotImplemented, _noded_cert_file=tmpfile)

  def testNoPrivateKey(self):
    # NOTE(review): going by the test name, cert1.pem is assumed to contain
    # no private key; confirm against the test data
    cert = testutils.TestDataFilename("cert1.pem")
    self.assertRaises(errors.X509CertError, utils.CheckNodeCertificate,
                      NotImplemented, _noded_cert_file=cert)

  def testMismatchInNodeCert(self):
    cert1_path = testutils.TestDataFilename("cert1.pem")
    cert2_path = testutils.TestDataFilename("cert2.pem")
    tmpfile = utils.PathJoin(self.tmpdir, "cert")

    # Extract certificate
    cert1 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                            utils.ReadFile(cert1_path))
    cert1_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                cert1)

    # Extract mismatching key
    key2 = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
                                          utils.ReadFile(cert2_path))
    key2_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM,
                                              key2)

    # Write to file
    utils.WriteFile(tmpfile, data=cert1_pem + key2_pem)

    try:
      utils.CheckNodeCertificate(cert1, _noded_cert_file=tmpfile)
    except errors.X509CertError as err:
      self.assertEqual(err.args,
                       (tmpfile, "Certificate does not match with private key"))
    else:
      self.fail("Exception was not raised")
380
381
# Only run the tests when this file is executed as a script
if __name__ == "__main__":
  testutils.GanetiTestProgram()