4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 """Script for testing ganeti.utils.x509"""
38 import distutils
.version
41 from ganeti
import constants
42 from ganeti
import utils
43 from ganeti
import errors
48 class TestParseAsn1Generalizedtime(unittest
.TestCase
):
50 self
._Parse
= utils
.x509
._ParseAsn1Generalizedtime
54 self
.assertEqual(self
._Parse("19700101000000Z"), 0)
55 self
.assertEqual(self
._Parse("20100222174152Z"), 1266860512)
56 self
.assertEqual(self
._Parse("20380119031407Z"), (2**31) - 1)
59 self
.assertEqual(self
._Parse("20100222174152+0000"), 1266860512)
60 self
.assertEqual(self
._Parse("20100223131652+0000"), 1266931012)
61 self
.assertEqual(self
._Parse("20100223051808-0800"), 1266931088)
62 self
.assertEqual(self
._Parse("20100224002135+1100"), 1266931295)
63 self
.assertEqual(self
._Parse("19700101000000-0100"), 3600)
65 # Leap seconds are not supported by datetime.datetime
66 self
.assertRaises(ValueError, self
._Parse
, "19841231235960+0000")
67 self
.assertRaises(ValueError, self
._Parse
, "19920630235960+0000")
70 self
.assertRaises(ValueError, self
._Parse
, "")
71 self
.assertRaises(ValueError, self
._Parse
, "invalid")
72 self
.assertRaises(ValueError, self
._Parse
, "20100222174152")
73 self
.assertRaises(ValueError, self
._Parse
, "Mon Feb 22 17:47:02 UTC 2010")
74 self
.assertRaises(ValueError, self
._Parse
, "2010-02-22 17:42:02")
77 class TestGetX509CertValidity(testutils
.GanetiTestCase
):
79 testutils
.GanetiTestCase
.setUp(self
)
81 pyopenssl_version
= distutils
.version
.LooseVersion(OpenSSL
.__version__
)
83 # Test whether we have pyOpenSSL 0.7 or above
84 self
.pyopenssl0_7
= (pyopenssl_version
>= "0.7")
86 if not self
.pyopenssl0_7
:
87 warnings
.warn("This test requires pyOpenSSL 0.7 or above to"
88 " function correctly")
90 def _LoadCert(self
, name
):
91 return OpenSSL
.crypto
.load_certificate(OpenSSL
.crypto
.FILETYPE_PEM
,
92 testutils
.ReadTestData(name
))
95 validity
= utils
.GetX509CertValidity(self
._LoadCert("cert1.pem"))
97 self
.assertEqual(validity
, (1266919967, 1267524767))
99 self
.assertEqual(validity
, (None, None))
102 class TestSignX509Certificate(unittest
.TestCase
):
103 KEY
= "My private key!"
104 KEY_OTHER
= "Another key"
107 # Generate certificate valid for 5 minutes
108 (_
, cert_pem
) = utils
.GenerateSelfSignedX509Cert(None, 300, 1)
110 cert
= OpenSSL
.crypto
.load_certificate(OpenSSL
.crypto
.FILETYPE_PEM
,
113 # No signature at all
114 self
.assertRaises(errors
.GenericError
,
115 utils
.LoadSignedX509Certificate
, cert_pem
, self
.KEY
)
118 self
.assertRaises(errors
.GenericError
, utils
.LoadSignedX509Certificate
,
120 self
.assertRaises(errors
.GenericError
, utils
.LoadSignedX509Certificate
,
121 "X-Ganeti-Signature: \n", self
.KEY
)
122 self
.assertRaises(errors
.GenericError
, utils
.LoadSignedX509Certificate
,
123 "X-Ganeti-Sign: $1234$abcdef\n", self
.KEY
)
124 self
.assertRaises(errors
.GenericError
, utils
.LoadSignedX509Certificate
,
125 "X-Ganeti-Signature: $1234567890$abcdef\n", self
.KEY
)
126 self
.assertRaises(errors
.GenericError
, utils
.LoadSignedX509Certificate
,
127 "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem
, self
.KEY
)
130 for salt
in list("-_@$,:;/\\ \t\n"):
131 self
.assertRaises(errors
.GenericError
, utils
.SignX509Certificate
,
132 cert_pem
, self
.KEY
, "foo%sbar" % salt
)
134 for salt
in ["HelloWorld", "salt", string
.letters
, string
.digits
,
135 utils
.GenerateSecret(numbytes
=4),
136 utils
.GenerateSecret(numbytes
=16),
137 "{123:456}".encode("hex")]:
138 signed_pem
= utils
.SignX509Certificate(cert
, self
.KEY
, salt
)
140 self
._Check(cert
, salt
, signed_pem
)
142 self
._Check(cert
, salt
, "X-Another-Header: with a value\n" + signed_pem
)
143 self
._Check(cert
, salt
, (10 * "Hello World!\n") + signed_pem
)
144 self
._Check(cert
, salt
, (signed_pem
+ "\n\na few more\n"
145 "lines----\n------ at\nthe end!"))
147 def _Check(self
, cert
, salt
, pem
):
148 (cert2
, salt2
) = utils
.LoadSignedX509Certificate(pem
, self
.KEY
)
149 self
.assertEqual(salt
, salt2
)
150 self
.assertEqual(cert
.digest("sha1"), cert2
.digest("sha1"))
153 self
.assertRaises(errors
.GenericError
, utils
.LoadSignedX509Certificate
,
157 class TestCertVerification(testutils
.GanetiTestCase
):
159 testutils
.GanetiTestCase
.setUp(self
)
161 self
.tmpdir
= tempfile
.mkdtemp()
164 shutil
.rmtree(self
.tmpdir
)
166 def testVerifyCertificate(self
):
167 cert_pem
= testutils
.ReadTestData("cert1.pem")
168 cert
= OpenSSL
.crypto
.load_certificate(OpenSSL
.crypto
.FILETYPE_PEM
,
171 # Not checking return value as this certificate is expired
172 utils
.VerifyX509Certificate(cert
, 30, 7)
175 def _GenCert(key
, before
, validity
):
176 # Urgh... mostly copied from x509.py :(
178 # Create self-signed certificate
179 cert
= OpenSSL
.crypto
.X509()
180 cert
.set_serial_number(1)
182 cert
.gmtime_adj_notBefore(int(before
))
183 cert
.gmtime_adj_notAfter(validity
)
184 cert
.set_issuer(cert
.get_subject())
186 cert
.sign(key
, constants
.X509_CERT_SIGN_DIGEST
)
189 def testClockSkew(self
):
190 SKEW
= constants
.NODE_MAX_CLOCK_SKEW
191 # Create private and public key
192 key
= OpenSSL
.crypto
.PKey()
193 key
.generate_key(OpenSSL
.crypto
.TYPE_RSA
, constants
.RSA_KEY_BITS
)
196 # skew small enough, accepting cert; note that this is a timed
197 # test, and could fail if the machine is so loaded that the next
198 # few lines take more than NODE_MAX_CLOCK_SKEW / 2
199 for before
in [-1, 0, SKEW
/ 4, SKEW
/ 2]:
200 cert
= self
._GenCert(key
, before
, validity
)
201 result
= utils
.VerifyX509Certificate(cert
, 1, 2)
202 self
.assertEqual(result
, (None, None))
204 # skew too great, not accepting certs
205 for before
in [SKEW
* 2, SKEW
* 10]:
206 cert
= self
._GenCert(key
, before
, validity
)
207 (status
, msg
) = utils
.VerifyX509Certificate(cert
, 1, 2)
208 self
.assertEqual(status
, utils
.CERT_WARNING
)
209 self
.assertTrue(msg
.startswith("Certificate not yet valid"))
212 class TestVerifyCertificateInner(unittest
.TestCase
):
214 vci
= utils
.x509
._VerifyCertificateInner
217 self
.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
221 (errcode
, msg
) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
222 self
.assertEqual(errcode
, utils
.CERT_WARNING
)
225 (errcode
, msg
) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
226 self
.assertEqual(errcode
, utils
.CERT_ERROR
)
228 (errcode
, msg
) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
229 self
.assertEqual(errcode
, utils
.CERT_WARNING
)
231 (errcode
, msg
) = vci(False, 1266507600, None, 1266939600, 30, 7)
232 self
.assertEqual(errcode
, None)
235 (errcode
, msg
) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
236 self
.assertEqual(errcode
, utils
.CERT_ERROR
)
238 (errcode
, msg
) = vci(True, None, 1267544400, 1266939600, 30, 7)
239 self
.assertEqual(errcode
, utils
.CERT_ERROR
)
241 (errcode
, msg
) = vci(True, 1266507600, None, 1266939600, 30, 7)
242 self
.assertEqual(errcode
, utils
.CERT_ERROR
)
244 (errcode
, msg
) = vci(True, None, None, 1266939600, 30, 7)
245 self
.assertEqual(errcode
, utils
.CERT_ERROR
)
248 class TestGenerateX509Certs(unittest
.TestCase
):
250 self
.tmpdir
= tempfile
.mkdtemp()
253 shutil
.rmtree(self
.tmpdir
)
255 def _checkRsaPrivateKey(self
, key
):
256 lines
= key
.splitlines()
257 return (("-----BEGIN RSA PRIVATE KEY-----" in lines
and
258 "-----END RSA PRIVATE KEY-----" in lines
) or
259 ("-----BEGIN PRIVATE KEY-----" in lines
and
260 "-----END PRIVATE KEY-----" in lines
))
262 def _checkCertificate(self
, cert
):
263 lines
= cert
.splitlines()
264 return ("-----BEGIN CERTIFICATE-----" in lines
and
265 "-----END CERTIFICATE-----" in lines
)
268 for common_name
in [None, ".", "Ganeti", "node1.example.com"]:
269 (key_pem
, cert_pem
) = utils
.GenerateSelfSignedX509Cert(common_name
, 300,
271 self
._checkRsaPrivateKey(key_pem
)
272 self
._checkCertificate(cert_pem
)
274 key
= OpenSSL
.crypto
.load_privatekey(OpenSSL
.crypto
.FILETYPE_PEM
,
276 self
.assert_(key
.bits() >= 1024)
277 self
.assertEqual(key
.bits(), constants
.RSA_KEY_BITS
)
278 self
.assertEqual(key
.type(), OpenSSL
.crypto
.TYPE_RSA
)
280 x509
= OpenSSL
.crypto
.load_certificate(OpenSSL
.crypto
.FILETYPE_PEM
,
282 self
.failIf(x509
.has_expired())
283 self
.assertEqual(x509
.get_issuer().CN
, common_name
)
284 self
.assertEqual(x509
.get_subject().CN
, common_name
)
285 self
.assertEqual(x509
.get_pubkey().bits(), constants
.RSA_KEY_BITS
)
287 def testLegacy(self
):
288 cert1_filename
= os
.path
.join(self
.tmpdir
, "cert1.pem")
290 utils
.GenerateSelfSignedSslCert(cert1_filename
, 1, validity
=1)
292 cert1
= utils
.ReadFile(cert1_filename
)
294 self
.assert_(self
._checkRsaPrivateKey(cert1
))
295 self
.assert_(self
._checkCertificate(cert1
))
297 def _checkKeyMatchesCert(self
, key
, cert
):
298 ctx
= OpenSSL
.SSL
.Context(OpenSSL
.SSL
.TLSv1_METHOD
)
299 ctx
.use_privatekey(key
)
300 ctx
.use_certificate(cert
)
302 ctx
.check_privatekey()
303 except OpenSSL
.SSL
.Error
:
308 def testSignedSslCertificate(self
):
309 server_cert_filename
= os
.path
.join(self
.tmpdir
, "server.pem")
310 utils
.GenerateSelfSignedSslCert(server_cert_filename
, 123456)
312 client_hostname
= "myhost.example.com"
313 client_cert_filename
= os
.path
.join(self
.tmpdir
, "client.pem")
314 utils
.GenerateSignedSslCert(client_cert_filename
, 666,
315 server_cert_filename
, common_name
=client_hostname
)
317 client_cert_pem
= utils
.ReadFile(client_cert_filename
)
319 self
._checkRsaPrivateKey(client_cert_pem
)
320 self
._checkCertificate(client_cert_pem
)
322 priv_key
= OpenSSL
.crypto
.load_privatekey(OpenSSL
.crypto
.FILETYPE_PEM
,
324 client_cert
= OpenSSL
.crypto
.load_certificate(OpenSSL
.crypto
.FILETYPE_PEM
,
327 self
.assertTrue(self
._checkKeyMatchesCert(priv_key
, client_cert
))
328 self
.assertEqual(client_cert
.get_issuer().CN
, "ganeti.example.com")
329 self
.assertEqual(client_cert
.get_subject().CN
, client_hostname
)
332 class TestCheckNodeCertificate(testutils
.GanetiTestCase
):
334 testutils
.GanetiTestCase
.setUp(self
)
335 self
.tmpdir
= tempfile
.mkdtemp()
338 testutils
.GanetiTestCase
.tearDown(self
)
339 shutil
.rmtree(self
.tmpdir
)
341 def testMismatchingKey(self
):
342 other_cert
= testutils
.TestDataFilename("cert1.pem")
343 node_cert
= testutils
.TestDataFilename("cert2.pem")
345 cert
= OpenSSL
.crypto
.load_certificate(OpenSSL
.crypto
.FILETYPE_PEM
,
346 utils
.ReadFile(other_cert
))
349 utils
.CheckNodeCertificate(cert
, _noded_cert_file
=node_cert
)
350 except errors
.GenericError
, err
:
351 self
.assertEqual(str(err
),
352 "Given cluster certificate does not match local key")
354 self
.fail("Exception was not raised")
356 def testMatchingKey(self
):
357 cert_filename
= testutils
.TestDataFilename("cert2.pem")
359 # Extract certificate
360 cert
= OpenSSL
.crypto
.load_certificate(OpenSSL
.crypto
.FILETYPE_PEM
,
361 utils
.ReadFile(cert_filename
))
362 cert_pem
= OpenSSL
.crypto
.dump_certificate(OpenSSL
.crypto
.FILETYPE_PEM
,
365 utils
.CheckNodeCertificate(cert
, _noded_cert_file
=cert_filename
)
367 def testMissingFile(self
):
368 cert_path
= testutils
.TestDataFilename("cert1.pem")
369 nodecert
= utils
.PathJoin(self
.tmpdir
, "does-not-exist")
371 utils
.CheckNodeCertificate(NotImplemented, _noded_cert_file
=nodecert
)
373 self
.assertFalse(os
.path
.exists(nodecert
))
375 def testInvalidCertificate(self
):
376 tmpfile
= utils
.PathJoin(self
.tmpdir
, "cert")
377 utils
.WriteFile(tmpfile
, data
="not a certificate")
379 self
.assertRaises(errors
.X509CertError
, utils
.CheckNodeCertificate
,
380 NotImplemented, _noded_cert_file
=tmpfile
)
382 def testNoPrivateKey(self
):
383 cert
= testutils
.TestDataFilename("cert1.pem")
384 self
.assertRaises(errors
.X509CertError
, utils
.CheckNodeCertificate
,
385 NotImplemented, _noded_cert_file
=cert
)
387 def testMismatchInNodeCert(self
):
388 cert1_path
= testutils
.TestDataFilename("cert1.pem")
389 cert2_path
= testutils
.TestDataFilename("cert2.pem")
390 tmpfile
= utils
.PathJoin(self
.tmpdir
, "cert")
392 # Extract certificate
393 cert1
= OpenSSL
.crypto
.load_certificate(OpenSSL
.crypto
.FILETYPE_PEM
,
394 utils
.ReadFile(cert1_path
))
395 cert1_pem
= OpenSSL
.crypto
.dump_certificate(OpenSSL
.crypto
.FILETYPE_PEM
,
398 # Extract mismatching key
399 key2
= OpenSSL
.crypto
.load_privatekey(OpenSSL
.crypto
.FILETYPE_PEM
,
400 utils
.ReadFile(cert2_path
))
401 key2_pem
= OpenSSL
.crypto
.dump_privatekey(OpenSSL
.crypto
.FILETYPE_PEM
,
405 utils
.WriteFile(tmpfile
, data
=cert1_pem
+ key2_pem
)
408 utils
.CheckNodeCertificate(cert1
, _noded_cert_file
=tmpfile
)
409 except errors
.X509CertError
, err
:
410 self
.assertEqual(err
.args
,
411 (tmpfile
, "Certificate does not match with private key"))
413 self
.fail("Exception was not raised")
416 if __name__
== "__main__":
417 testutils
.GanetiTestProgram()