x509 function for creating signed certs
[ganeti-github.git] / test / py / ganeti.utils.x509_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Script for testing ganeti.utils.x509"""
32
33 import os
34 import tempfile
35 import unittest
36 import shutil
37 import OpenSSL
38 import distutils.version
39 import string
40
41 from ganeti import constants
42 from ganeti import utils
43 from ganeti import errors
44
45 import testutils
46
47
48 class TestParseAsn1Generalizedtime(unittest.TestCase):
49 def setUp(self):
50 self._Parse = utils.x509._ParseAsn1Generalizedtime
51
52 def test(self):
53 # UTC
54 self.assertEqual(self._Parse("19700101000000Z"), 0)
55 self.assertEqual(self._Parse("20100222174152Z"), 1266860512)
56 self.assertEqual(self._Parse("20380119031407Z"), (2**31) - 1)
57
58 # With offset
59 self.assertEqual(self._Parse("20100222174152+0000"), 1266860512)
60 self.assertEqual(self._Parse("20100223131652+0000"), 1266931012)
61 self.assertEqual(self._Parse("20100223051808-0800"), 1266931088)
62 self.assertEqual(self._Parse("20100224002135+1100"), 1266931295)
63 self.assertEqual(self._Parse("19700101000000-0100"), 3600)
64
65 # Leap seconds are not supported by datetime.datetime
66 self.assertRaises(ValueError, self._Parse, "19841231235960+0000")
67 self.assertRaises(ValueError, self._Parse, "19920630235960+0000")
68
69 # Errors
70 self.assertRaises(ValueError, self._Parse, "")
71 self.assertRaises(ValueError, self._Parse, "invalid")
72 self.assertRaises(ValueError, self._Parse, "20100222174152")
73 self.assertRaises(ValueError, self._Parse, "Mon Feb 22 17:47:02 UTC 2010")
74 self.assertRaises(ValueError, self._Parse, "2010-02-22 17:42:02")
75
76
77 class TestGetX509CertValidity(testutils.GanetiTestCase):
78 def setUp(self):
79 testutils.GanetiTestCase.setUp(self)
80
81 pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
82
83 # Test whether we have pyOpenSSL 0.7 or above
84 self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
85
86 if not self.pyopenssl0_7:
87 warnings.warn("This test requires pyOpenSSL 0.7 or above to"
88 " function correctly")
89
90 def _LoadCert(self, name):
91 return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
92 testutils.ReadTestData(name))
93
94 def test(self):
95 validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
96 if self.pyopenssl0_7:
97 self.assertEqual(validity, (1266919967, 1267524767))
98 else:
99 self.assertEqual(validity, (None, None))
100
101
102 class TestSignX509Certificate(unittest.TestCase):
103 KEY = "My private key!"
104 KEY_OTHER = "Another key"
105
106 def test(self):
107 # Generate certificate valid for 5 minutes
108 (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300, 1)
109
110 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
111 cert_pem)
112
113 # No signature at all
114 self.assertRaises(errors.GenericError,
115 utils.LoadSignedX509Certificate, cert_pem, self.KEY)
116
117 # Invalid input
118 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
119 "", self.KEY)
120 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
121 "X-Ganeti-Signature: \n", self.KEY)
122 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
123 "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
124 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
125 "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
126 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
127 "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
128
129 # Invalid salt
130 for salt in list("-_@$,:;/\\ \t\n"):
131 self.assertRaises(errors.GenericError, utils.SignX509Certificate,
132 cert_pem, self.KEY, "foo%sbar" % salt)
133
134 for salt in ["HelloWorld", "salt", string.letters, string.digits,
135 utils.GenerateSecret(numbytes=4),
136 utils.GenerateSecret(numbytes=16),
137 "{123:456}".encode("hex")]:
138 signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
139
140 self._Check(cert, salt, signed_pem)
141
142 self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
143 self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
144 self._Check(cert, salt, (signed_pem + "\n\na few more\n"
145 "lines----\n------ at\nthe end!"))
146
147 def _Check(self, cert, salt, pem):
148 (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
149 self.assertEqual(salt, salt2)
150 self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
151
152 # Other key
153 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
154 pem, self.KEY_OTHER)
155
156
157 class TestCertVerification(testutils.GanetiTestCase):
158 def setUp(self):
159 testutils.GanetiTestCase.setUp(self)
160
161 self.tmpdir = tempfile.mkdtemp()
162
163 def tearDown(self):
164 shutil.rmtree(self.tmpdir)
165
166 def testVerifyCertificate(self):
167 cert_pem = testutils.ReadTestData("cert1.pem")
168 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
169 cert_pem)
170
171 # Not checking return value as this certificate is expired
172 utils.VerifyX509Certificate(cert, 30, 7)
173
174 @staticmethod
175 def _GenCert(key, before, validity):
176 # Urgh... mostly copied from x509.py :(
177
178 # Create self-signed certificate
179 cert = OpenSSL.crypto.X509()
180 cert.set_serial_number(1)
181 if before != 0:
182 cert.gmtime_adj_notBefore(int(before))
183 cert.gmtime_adj_notAfter(validity)
184 cert.set_issuer(cert.get_subject())
185 cert.set_pubkey(key)
186 cert.sign(key, constants.X509_CERT_SIGN_DIGEST)
187 return cert
188
189 def testClockSkew(self):
190 SKEW = constants.NODE_MAX_CLOCK_SKEW
191 # Create private and public key
192 key = OpenSSL.crypto.PKey()
193 key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)
194
195 validity = 7 * 86400
196 # skew small enough, accepting cert; note that this is a timed
197 # test, and could fail if the machine is so loaded that the next
198 # few lines take more than NODE_MAX_CLOCK_SKEW / 2
199 for before in [-1, 0, SKEW / 4, SKEW / 2]:
200 cert = self._GenCert(key, before, validity)
201 result = utils.VerifyX509Certificate(cert, 1, 2)
202 self.assertEqual(result, (None, None))
203
204 # skew too great, not accepting certs
205 for before in [SKEW * 2, SKEW * 10]:
206 cert = self._GenCert(key, before, validity)
207 (status, msg) = utils.VerifyX509Certificate(cert, 1, 2)
208 self.assertEqual(status, utils.CERT_WARNING)
209 self.assertTrue(msg.startswith("Certificate not yet valid"))
210
211
212 class TestVerifyCertificateInner(unittest.TestCase):
213 def test(self):
214 vci = utils.x509._VerifyCertificateInner
215
216 # Valid
217 self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
218 (None, None))
219
220 # Not yet valid
221 (errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
222 self.assertEqual(errcode, utils.CERT_WARNING)
223
224 # Expiring soon
225 (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
226 self.assertEqual(errcode, utils.CERT_ERROR)
227
228 (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
229 self.assertEqual(errcode, utils.CERT_WARNING)
230
231 (errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7)
232 self.assertEqual(errcode, None)
233
234 # Expired
235 (errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
236 self.assertEqual(errcode, utils.CERT_ERROR)
237
238 (errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7)
239 self.assertEqual(errcode, utils.CERT_ERROR)
240
241 (errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7)
242 self.assertEqual(errcode, utils.CERT_ERROR)
243
244 (errcode, msg) = vci(True, None, None, 1266939600, 30, 7)
245 self.assertEqual(errcode, utils.CERT_ERROR)
246
247
248 class TestGenerateX509Certs(unittest.TestCase):
249 def setUp(self):
250 self.tmpdir = tempfile.mkdtemp()
251
252 def tearDown(self):
253 shutil.rmtree(self.tmpdir)
254
255 def _checkRsaPrivateKey(self, key):
256 lines = key.splitlines()
257 return (("-----BEGIN RSA PRIVATE KEY-----" in lines and
258 "-----END RSA PRIVATE KEY-----" in lines) or
259 ("-----BEGIN PRIVATE KEY-----" in lines and
260 "-----END PRIVATE KEY-----" in lines))
261
262 def _checkCertificate(self, cert):
263 lines = cert.splitlines()
264 return ("-----BEGIN CERTIFICATE-----" in lines and
265 "-----END CERTIFICATE-----" in lines)
266
267 def test(self):
268 for common_name in [None, ".", "Ganeti", "node1.example.com"]:
269 (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300,
270 1)
271 self._checkRsaPrivateKey(key_pem)
272 self._checkCertificate(cert_pem)
273
274 key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
275 key_pem)
276 self.assert_(key.bits() >= 1024)
277 self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
278 self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
279
280 x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
281 cert_pem)
282 self.failIf(x509.has_expired())
283 self.assertEqual(x509.get_issuer().CN, common_name)
284 self.assertEqual(x509.get_subject().CN, common_name)
285 self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
286
287 def testLegacy(self):
288 cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
289
290 utils.GenerateSelfSignedSslCert(cert1_filename, 1, validity=1)
291
292 cert1 = utils.ReadFile(cert1_filename)
293
294 self.assert_(self._checkRsaPrivateKey(cert1))
295 self.assert_(self._checkCertificate(cert1))
296
297 def _checkKeyMatchesCert(self, key, cert):
298 ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD)
299 ctx.use_privatekey(key)
300 ctx.use_certificate(cert)
301 try:
302 ctx.check_privatekey()
303 except OpenSSL.SSL.Error:
304 return False
305 else:
306 return True
307
308 def testSignedSslCertificate(self):
309 server_cert_filename = os.path.join(self.tmpdir, "server.pem")
310 utils.GenerateSelfSignedSslCert(server_cert_filename, 123456)
311
312 client_hostname = "myhost.example.com"
313 client_cert_filename = os.path.join(self.tmpdir, "client.pem")
314 utils.GenerateSignedSslCert(client_cert_filename, 666,
315 server_cert_filename, common_name=client_hostname)
316
317 client_cert_pem = utils.ReadFile(client_cert_filename)
318
319 self._checkRsaPrivateKey(client_cert_pem)
320 self._checkCertificate(client_cert_pem)
321
322 priv_key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
323 client_cert_pem)
324 client_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
325 client_cert_pem)
326
327 self.assertTrue(self._checkKeyMatchesCert(priv_key, client_cert))
328 self.assertEqual(client_cert.get_issuer().CN, "ganeti.example.com")
329 self.assertEqual(client_cert.get_subject().CN, client_hostname)
330
331
332 class TestCheckNodeCertificate(testutils.GanetiTestCase):
333 def setUp(self):
334 testutils.GanetiTestCase.setUp(self)
335 self.tmpdir = tempfile.mkdtemp()
336
337 def tearDown(self):
338 testutils.GanetiTestCase.tearDown(self)
339 shutil.rmtree(self.tmpdir)
340
341 def testMismatchingKey(self):
342 other_cert = testutils.TestDataFilename("cert1.pem")
343 node_cert = testutils.TestDataFilename("cert2.pem")
344
345 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
346 utils.ReadFile(other_cert))
347
348 try:
349 utils.CheckNodeCertificate(cert, _noded_cert_file=node_cert)
350 except errors.GenericError, err:
351 self.assertEqual(str(err),
352 "Given cluster certificate does not match local key")
353 else:
354 self.fail("Exception was not raised")
355
356 def testMatchingKey(self):
357 cert_filename = testutils.TestDataFilename("cert2.pem")
358
359 # Extract certificate
360 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
361 utils.ReadFile(cert_filename))
362 cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
363 cert)
364
365 utils.CheckNodeCertificate(cert, _noded_cert_file=cert_filename)
366
367 def testMissingFile(self):
368 cert_path = testutils.TestDataFilename("cert1.pem")
369 nodecert = utils.PathJoin(self.tmpdir, "does-not-exist")
370
371 utils.CheckNodeCertificate(NotImplemented, _noded_cert_file=nodecert)
372
373 self.assertFalse(os.path.exists(nodecert))
374
375 def testInvalidCertificate(self):
376 tmpfile = utils.PathJoin(self.tmpdir, "cert")
377 utils.WriteFile(tmpfile, data="not a certificate")
378
379 self.assertRaises(errors.X509CertError, utils.CheckNodeCertificate,
380 NotImplemented, _noded_cert_file=tmpfile)
381
382 def testNoPrivateKey(self):
383 cert = testutils.TestDataFilename("cert1.pem")
384 self.assertRaises(errors.X509CertError, utils.CheckNodeCertificate,
385 NotImplemented, _noded_cert_file=cert)
386
387 def testMismatchInNodeCert(self):
388 cert1_path = testutils.TestDataFilename("cert1.pem")
389 cert2_path = testutils.TestDataFilename("cert2.pem")
390 tmpfile = utils.PathJoin(self.tmpdir, "cert")
391
392 # Extract certificate
393 cert1 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
394 utils.ReadFile(cert1_path))
395 cert1_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
396 cert1)
397
398 # Extract mismatching key
399 key2 = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
400 utils.ReadFile(cert2_path))
401 key2_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM,
402 key2)
403
404 # Write to file
405 utils.WriteFile(tmpfile, data=cert1_pem + key2_pem)
406
407 try:
408 utils.CheckNodeCertificate(cert1, _noded_cert_file=tmpfile)
409 except errors.X509CertError, err:
410 self.assertEqual(err.args,
411 (tmpfile, "Certificate does not match with private key"))
412 else:
413 self.fail("Exception was not raised")
414
415
416 if __name__ == "__main__":
417 testutils.GanetiTestProgram()