#
#

# Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

31 """Module with helper classes and functions for daemons"""
32
33
34 import asyncore
35 import asynchat
36 import collections
37 import os
38 import signal
39 import logging
40 import sched
41 import time
42 import socket
43 import select
44 import sys
45
46 from ganeti import utils
47 from ganeti import constants
48 from ganeti import errors
49 from ganeti import netutils
50 from ganeti import ssconf
51 from ganeti import runtime
52 from ganeti import compat
53
54
55 class SchedulerBreakout(Exception):
56 """Exception used to get out of the scheduler loop
57
58 """
59
60
def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main
  loop will then call the scheduler run again, which will allow it to
  actually process any due events.

  This is needed because scheduler.run() doesn't support a count=... parameter
  the way the asyncore loop does, and the sched module documents raising
  exceptions from inside the delay function as an allowed usage model.

  """
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()

class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  """
  def __init__(self, timefunc):
    """Initializes this class.

    """
    sched.scheduler.__init__(self, timefunc, self._LimitedDelay)
    self._max_delay = None

  def run(self, max_delay=None):  # pylint: disable=W0221
    """Run any pending events.

    @type max_delay: None or number
    @param max_delay: Maximum delay (useful if caller has timeouts running)

    """
    assert self._max_delay is None

    # The delay function used by the scheduler can't be different on each run,
    # hence an instance variable must be used.
    if max_delay is None:
      self._max_delay = None
    else:
      self._max_delay = utils.RunningTimeout(max_delay, False)

    try:
      return sched.scheduler.run(self)
    finally:
      self._max_delay = None

  def _LimitedDelay(self, duration):
    """Custom delay function for C{sched.scheduler}.

    """
    if self._max_delay is None:
      timeout = duration
    else:
      timeout = min(duration, self._max_delay.Remaining())

    return AsyncoreDelayFunction(timeout)

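# Illustrative sketch, not part of this module: how a driver loop would
# typically combine AsyncoreScheduler with SchedulerBreakout (my_periodic_fn
# is a hypothetical callback), re-checking for signals whenever an asyncore
# event interrupts the scheduler:
#
#   scheduler = AsyncoreScheduler(time.time)
#   scheduler.enter(30.0, 1, my_periodic_fn, [])
#   while True:
#     try:
#       scheduler.run(max_delay=1.0)
#     except SchedulerBreakout:
#       pass  # an asyncore event was handled; check for signals, then retry
#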
class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti Asyncore Dispatcher

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Most of the time we don't want to check for writability.

    """
    return False

class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=...; we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    self.ibuffer = []
    self.receive_count = 0
    self.send_count = 0
    self.oqueue = collections.deque()
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    self.ibuffer.append(data)

  def _can_handle_message(self):
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit) and
            not self.iqueue)

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: move this method to raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # enqueued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may become full just after we called writable. This
    # only works if we know we'll have something else waking us up from
    # select(), in that case, anyway.
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()

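# Illustrative sketch, not part of this module: a minimal hypothetical
# subclass that answers every message; replying via send_message increases
# send_count, which is what frees up a slot within unhandled_limit for the
# next queued input message:
#
#   class EchoStream(AsyncTerminatedMessageStream):
#     def handle_message(self, message, message_id):
#       self.send_message("echo %d: %s" % (message_id, message))
#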
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self._out_queue = []
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
      if self._family == socket.AF_INET6:
        # we ignore 'flow info' and 'scope id' as we don't need them
        ip, port, _, _ = address
      else:
        ip, port = address

      self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError("Packet too big: %s > %s" %
                                    (len(payload),
                                     constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.handle_read()
      return True
    else:
      return False

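# Illustrative sketch, not part of this module: a hypothetical UDP responder;
# enqueue_send only queues the reply, the actual sendto happens in
# handle_write once the asyncore loop (or process_next_packet) finds the
# socket writable:
#
#   class PongUDP(AsyncUDPSocket):
#     def handle_datagram(self, payload, ip, port):
#       self.enqueue_send(ip, port, "pong")
#
#   responder = PongUDP(socket.AF_INET)
#   responder.bind(("0.0.0.0", 5000))  # hypothetical port
#   asyncore.loop(use_poll=True)
#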
class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded, when a thread tries to push some
  data to a socket the main loop handling asynchronous requests might be
  sleeping waiting on a select(). To avoid this it can create an instance of
  the AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awakened

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    assert signal_fn is None or callable(signal_fn)
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send(chr(0))

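# Illustrative sketch, not part of this module ("stream" and "reply" are
# hypothetical): how a worker thread wakes up the asyncore loop after queueing
# data from outside of it, as the send_message docstring above requires:
#
#   awaker = AsyncAwaker()
#   ...
#   # in the worker thread:
#   stream.send_message(reply)  # thread-safe: only appends to a deque
#   awaker.signal()             # makes select() in the main loop return
#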
class _ShutdownCheck(object):
  """Logic for L{Mainloop} shutdown.

  """
  def __init__(self, fn):
    """Initializes this class.

    @type fn: callable
    @param fn: Function returning C{None} if mainloop can be stopped or a
      duration in seconds after which the function should be called again
    @see: L{Mainloop.Run}

    """
    assert callable(fn)

    self._fn = fn
    self._defer = None

  def CanShutdown(self):
    """Checks whether mainloop can be stopped.

    @rtype: bool

    """
    if self._defer and self._defer.Remaining() > 0:
      # A deferred check has already been scheduled
      return False

    # Ask mainloop driver whether we can stop or should check again
    timeout = self._fn()

    if timeout is None:
      # Yes, can stop mainloop
      return True

    # Schedule another check in the future
    self._defer = utils.RunningTimeout(timeout, True)

    return False

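# Illustrative sketch, not part of this module: a hypothetical
# shutdown_wait_fn for L{Mainloop.Run}; it must be idempotent and return
# either None once it is safe to stop or a delay in seconds after which it
# wants to be asked again:
#
#   def _CheckJobsDone():
#     if queue.HasPendingJobs():  # "queue" is a hypothetical job queue
#       return 5.0  # not done yet, re-check in five seconds
#     return None   # safe to stop the mainloop now
#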
class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  _SHUTDOWN_TIMEOUT_PRIORITY = -(sys.maxint - 1)

  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

    # Resolve uid/gids used
    runtime.GetEnts()

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, shutdown_wait_fn=None, signal_handlers=None):
    """Runs the mainloop.

    @type shutdown_wait_fn: callable
    @param shutdown_wait_fn: Function to check whether loop can be terminated;
      B{important}: function must be idempotent and must return either None
      for shutting down or a timeout for another call
    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"

    # Counter for received signals
    shutdown_signals = 0

    # Logic to wait for shutdown
    shutdown_waiter = None

    # Start actual main loop
    while True:
      if shutdown_signals == 1 and shutdown_wait_fn is not None:
        if shutdown_waiter is None:
          shutdown_waiter = _ShutdownCheck(shutdown_wait_fn)

        # Let mainloop driver decide if we can already abort
        if shutdown_waiter.CanShutdown():
          break

        # Re-evaluate in a second
        timeout = 1.0

      elif shutdown_signals >= 1:
        # Abort loop if more than one signal has been sent or no callback has
        # been given
        break

      else:
        # Wait forever on I/O events
        timeout = None

      if self.scheduler.empty():
        asyncore.loop(count=1, timeout=timeout, use_poll=True)
      else:
        try:
          self.scheduler.run(max_delay=timeout)
        except SchedulerBreakout:
          pass

      # Check whether a signal was raised
      for (sig, handler) in signal_handlers.items():
        if handler.called:
          self._CallSignalWaiters(sig)
          if sig in (signal.SIGTERM, signal.SIGINT):
            logging.info("Received signal %s asking for shutdown", sig)
            shutdown_signals += 1
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support an "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)

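# Illustrative sketch, not part of this module: a hypothetical receiver
# registered via RegisterSignal; Mainloop.Run calls OnSignal for every
# handled signal, here used to reap exited child processes:
#
#   class ChildReaper(object):
#     def OnSignal(self, signum):
#       if signum == signal.SIGCHLD:
#         os.waitpid(-1, os.WNOHANG)  # reap one exited child, if any
#
#   mainloop = Mainloop()
#   mainloop.RegisterSignal(ChildReaper())
#   mainloop.Run()
#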
def _VerifyDaemonUser(daemon_name):
  """Verifies the process uid matches the configured uid.

  This method verifies that a daemon is started as the user it is
  intended to be run as.

  @param daemon_name: The name of the daemon to be started
  @return: A tuple with the first item indicating success or not,
    the second item the current uid, and the third the expected uid

  """
  getents = runtime.GetEnts()
  running_uid = os.getuid()
  daemon_uids = {
    constants.MASTERD: getents.masterd_uid,
    constants.RAPI: getents.rapi_uid,
    constants.NODED: getents.noded_uid,
    constants.CONFD: getents.confd_uid,
    }
  assert daemon_name in daemon_uids, "Invalid daemon %s" % daemon_name

  return (daemon_uids[daemon_name] == running_uid, running_uid,
          daemon_uids[daemon_name])

def _BeautifyError(err):
  """Try to format an error better.

  Since we're dealing with daemon startup errors, in many cases these will be
  socket errors and the like, so we try to format those cases better.

  @param err: an exception object
  @rtype: string
  @return: the formatted error description

  """
  try:
    if isinstance(err, socket.error):
      return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
    elif isinstance(err, EnvironmentError):
      if err.filename is None:
        return "%s (errno=%s)" % (err.strerror, err.errno)
      else:
        return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
                                            err.errno)
    else:
      return str(err)
  except Exception:  # pylint: disable=W0703
    logging.exception("Error while handling existing error %s", err)
    return "%s" % str(err)

def _HandleSigHup(reopen_fn, signum, frame):  # pylint: disable=W0613
  """Handler for SIGHUP.

  @param reopen_fn: List of callback functions for reopening log files

  """
  logging.info("Reopening log files after receiving SIGHUP")

  for fn in reopen_fn:
    if fn:
      fn()

def GenericMain(daemon_name, optionparser,
                check_fn, prepare_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None,
                warn_breach=False):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
    (common -f -d options will be handled by this module)
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
    not met
  @type prepare_fn: function which accepts (options, args)
  @param prepare_fn: function that is run before forking, or None;
    its result will be passed as the third parameter to exec_fn, or
    if None was passed in, we will just pass None to exec_fn
  @type exec_fn: function which accepts (options, args, prepare_results)
  @param exec_fn: function that's executed with the daemon's pid file held,
    and runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
    console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path
  @type warn_breach: bool
  @param warn_breach: issue a warning at daemon launch time, before
    daemonizing, about the possibility of breaking parameter privacy
    invariants through the otherwise helpful debug logging.

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  family = ssconf.SimpleStore().GetPrimaryIPFamily()
  # family will default to AF_INET if there is no ssconf file (e.g. when
  # upgrading a cluster from 2.2 -> 2.3). This is intended, as Ganeti clusters
  # <= 2.2 cannot be AF_INET6
  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = constants.IP4_ADDRESS_ANY
    if family == netutils.IP6Address.family:
      default_bind_address = constants.IP6_ADDRESS_ANY

    default_port = netutils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: '%s')" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")
    optionparser.add_option("-i", "--interface", dest="bind_interface",
                            help=("Bind interface"), metavar="INTERFACE")

  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  if multithreaded:
    utils.DisableFork()

  options, args = optionparser.parse_args()

  if getattr(options, "bind_interface", None) is not None:
    if options.bind_address != default_bind_address:
      msg = ("Can't specify both bind address (%s) and bind interface (%s)" %
             (options.bind_address, options.bind_interface))
      print >> sys.stderr, msg
      sys.exit(constants.EXIT_FAILURE)
    interface_ip_addresses = \
      netutils.GetInterfaceIpAddresses(options.bind_interface)
    if family == netutils.IP6Address.family:
      if_addresses = interface_ip_addresses[constants.IP6_VERSION]
    else:
      if_addresses = interface_ip_addresses[constants.IP4_VERSION]
    if len(if_addresses) < 1:
      msg = "Failed to find IP for interface %s" % options.bind_interface
      print >> sys.stderr, msg
      sys.exit(constants.EXIT_FAILURE)
    options.bind_address = if_addresses[0]

  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By initiating http.HttpSslParams here we would only read the files
    # once and have a proper validation (isfile returns False on directories)
    # at the same time.

  result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
  if not result:
    msg = ("%s started using wrong user ID (%d), expected %d" %
           (daemon_name, running_uid, expected_uid))
    print >> sys.stderr, msg
    sys.exit(constants.EXIT_FAILURE)

  if check_fn is not None:
    check_fn(options, args)

  log_filename = constants.DAEMONS_LOGFILES[daemon_name]

  # node-daemon logging in lib/http/server.py, _HandleServerRequestInner
  if options.debug and warn_breach:
    sys.stderr.write(constants.DEBUG_MODE_CONFIDENTIALITY_WARNING %
                     daemon_name)

  if options.fork:
    # Newer GnuTLS versions (>= 3.3.0) use a library constructor for
    # initialization and open /dev/urandom on library load time, way before we
    # fork(). Closing /dev/urandom causes subsequent ganeti.http.client
    # requests to fail and the process to receive a SIGABRT. As we cannot
    # reliably detect GnuTLS's socket, we work our way around this by keeping
    # all fds referring to /dev/urandom open.
    noclose_fds = []
    for fd in os.listdir("/proc/self/fd"):
      try:
        if os.readlink(os.path.join("/proc/self/fd", fd)) == "/dev/urandom":
          noclose_fds.append(int(fd))
      except EnvironmentError:
        # The fd might have disappeared (although it shouldn't as we're
        # running single-threaded).
        continue

    utils.CloseFDs(noclose_fds=noclose_fds)
    (wpipe, stdio_reopen_fn) = utils.Daemonize(logfile=log_filename)
  else:
    (wpipe, stdio_reopen_fn) = (None, None)

  log_reopen_fn = \
    utils.SetupLogging(log_filename, daemon_name,
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       syslog=options.syslog,
                       console_logging=console_logging)

  # Reopen log file(s) on SIGHUP
  signal.signal(signal.SIGHUP,
                compat.partial(_HandleSigHup,
                               [log_reopen_fn, stdio_reopen_fn]))

  try:
    utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
  except errors.PidFileLockError, err:
    print >> sys.stderr, "Error while locking PID file:\n%s" % err
    sys.exit(constants.EXIT_FAILURE)

  try:
    try:
      logging.info("%s daemon startup", daemon_name)
      if callable(prepare_fn):
        prep_results = prepare_fn(options, args)
      else:
        prep_results = None
    except Exception, err:
      utils.WriteErrorToFD(wpipe, _BeautifyError(err))
      raise

    if wpipe is not None:
      # we're done with the preparation phase, we close the pipe to
      # let the parent know it's safe to exit
      os.close(wpipe)

    exec_fn(options, args, prep_results)
  finally:
    utils.RemoveFile(utils.DaemonPidFileName(daemon_name))
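

# Illustrative sketch, not part of this module: a typical daemon entry point
# built on GenericMain; CheckNoded, PrepareNoded and ExecNoded are
# hypothetical stand-ins for the daemon-specific callbacks:
#
#   def Main():
#     parser = optparse.OptionParser(description="ganeti node daemon",
#                                    usage="%prog [-f] [-d] [-p port]")
#     GenericMain(constants.NODED, parser,
#                 CheckNoded, PrepareNoded, ExecNoded,
#                 console_logging=True)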