Reopen log files upon SIGHUP in daemons
lib/daemon.py
#
#

# Copyright (C) 2006, 2007, 2008, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module with helper classes and functions for daemons"""


import asyncore
import asynchat
import collections
import os
import signal
import logging
import sched
import time
import socket
import select
import sys

from ganeti import utils
from ganeti import constants
from ganeti import errors
from ganeti import netutils
from ganeti import ssconf
from ganeti import runtime
from ganeti import compat


class SchedulerBreakout(Exception):
  """Exception used to get out of the scheduler loop

  """


def AsyncoreDelayFunction(timeout):
  """Asyncore-compatible scheduler delay function.

  This is a delay function for sched that, rather than actually sleeping,
  executes asyncore events happening in the meantime.

  After an event has occurred, rather than returning, it raises a
  SchedulerBreakout exception, which will force the current scheduler.run()
  invocation to terminate, so that we can also check for signals. The main
  loop will then call scheduler.run() again, which will allow it to actually
  process any due events.

  This is needed because scheduler.run() doesn't support a count=... parameter
  the way the asyncore loop does, and the scheduler module documents raising
  exceptions from inside the delay function as an allowed usage model.

  """
  asyncore.loop(timeout=timeout, count=1, use_poll=True)
  raise SchedulerBreakout()


class AsyncoreScheduler(sched.scheduler):
  """Event scheduler integrated with asyncore

  """
  def __init__(self, timefunc):
    sched.scheduler.__init__(self, timefunc, AsyncoreDelayFunction)

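# Example (illustrative sketch; _MyTimerCb is hypothetical): schedule a
# callback 30 seconds from now, serving asyncore events while waiting.
# SchedulerBreakout is caught by the main loop, which then re-enters
# scheduler.run() (see Mainloop.Run below):
#
#   scheduler = AsyncoreScheduler(time.time)
#   scheduler.enter(30.0, 1, _MyTimerCb, ())
#   while True:
#     try:
#       scheduler.run()
#     except SchedulerBreakout:
#       pass  # a good place to check for pending signals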

class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher):
  """Base Ganeti Asyncore Dispatcher

  """
  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    """Most of the time we don't want to check for writability.

    """
    return False


class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
  """A stream server to use with asyncore.

  Each request is accepted, and then dispatched to a separate asyncore
  dispatcher to handle.

  """

  _REQUEST_QUEUE_SIZE = 5

  def __init__(self, family, address):
    """Constructor for AsyncStreamServer

    @type family: integer
    @param family: socket family (one of socket.AF_*)
    @type address: address family dependent
    @param address: address to bind the socket to

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self.family = family
    self.create_socket(self.family, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(address)
    self.listen(self._REQUEST_QUEUE_SIZE)

  # this method is overriding an asyncore.dispatcher method
  def handle_accept(self):
    """Accept a new client connection.

    Creates a new instance of the handler class, which will use asyncore to
    serve the client.

    """
    accept_result = utils.IgnoreSignals(self.accept)
    if accept_result is not None:
      connected_socket, client_address = accept_result
      if self.family == socket.AF_UNIX:
        # override the client address, as for unix sockets nothing meaningful
        # is passed in from accept anyway
        client_address = netutils.GetSocketCredentials(connected_socket)
      logging.info("Accepted connection from %s",
                   netutils.FormatAddress(client_address, family=self.family))
      self.handle_connection(connected_socket, client_address)

  def handle_connection(self, connected_socket, client_address):
    """Handle an already accepted connection.

    """
    raise NotImplementedError

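# Example (illustrative sketch; MyConnectionHandler is hypothetical):
#
#   class MyStreamServer(AsyncStreamServer):
#     def handle_connection(self, connected_socket, client_address):
#       # hand the accepted socket over to a per-connection dispatcher
#       MyConnectionHandler(connected_socket, client_address)
#
#   server = MyStreamServer(socket.AF_UNIX, "/var/run/mydaemon.sock")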

class AsyncTerminatedMessageStream(asynchat.async_chat):
  """A terminator-separated message stream asyncore module.

  Handles a stream connection receiving messages terminated by a defined
  separator. For each complete message handle_message is called.

  """
  def __init__(self, connected_socket, peer_address, terminator, family,
               unhandled_limit):
    """AsyncTerminatedMessageStream constructor.

    @type connected_socket: socket.socket
    @param connected_socket: connected stream socket to receive messages from
    @param peer_address: family-specific peer address
    @type terminator: string
    @param terminator: terminator separating messages in the stream
    @type family: integer
    @param family: socket family
    @type unhandled_limit: integer or None
    @param unhandled_limit: maximum unanswered messages

    """
    # python 2.4/2.5 uses conn=... while 2.6 has sock=...; we have to cheat by
    # using a positional argument rather than a keyword one.
    asynchat.async_chat.__init__(self, connected_socket)
    self.connected_socket = connected_socket
    # on python 2.4 there is no "family" attribute for the socket class
    # FIXME: when we move to python 2.5 or above remove the family parameter
    #self.family = self.connected_socket.family
    self.family = family
    self.peer_address = peer_address
    self.terminator = terminator
    self.unhandled_limit = unhandled_limit
    self.set_terminator(terminator)
    self.ibuffer = []
    self.receive_count = 0
    self.send_count = 0
    self.oqueue = collections.deque()
    self.iqueue = collections.deque()

  # this method is overriding an asynchat.async_chat method
  def collect_incoming_data(self, data):
    self.ibuffer.append(data)

  def _can_handle_message(self):
    # the explicit parentheses make the intended grouping clear ("or" binds
    # less tightly than "and")
    return (self.unhandled_limit is None or
            (self.receive_count < self.send_count + self.unhandled_limit and
             not self.iqueue))

  # this method is overriding an asynchat.async_chat method
  def found_terminator(self):
    message = "".join(self.ibuffer)
    self.ibuffer = []
    message_id = self.receive_count
    # We need to increase the receive_count after checking if the message can
    # be handled, but before calling handle_message
    can_handle = self._can_handle_message()
    self.receive_count += 1
    if can_handle:
      self.handle_message(message, message_id)
    else:
      self.iqueue.append((message, message_id))

  def handle_message(self, message, message_id):
    """Handle a terminated message.

    @type message: string
    @param message: message to handle
    @type message_id: integer
    @param message_id: stream's message sequence number

    """
    pass
    # TODO: make this method raise NotImplementedError
    # raise NotImplementedError

  def send_message(self, message):
    """Send a message to the remote peer. This function is thread-safe.

    @type message: string
    @param message: message to send, without the terminator

    @warning: If calling this function from a thread different than the one
    performing the main asyncore loop, remember that you have to wake that one
    up.

    """
    # If we just append the message we received to the output queue, this
    # function can be safely called by multiple threads at the same time, and
    # we don't need locking, since deques are thread safe. handle_write in the
    # asyncore thread will handle the next input message if there are any
    # enqueued.
    self.oqueue.append(message)

  # this method is overriding an asyncore.dispatcher method
  def readable(self):
    # read from the socket if we can handle the next requests
    return self._can_handle_message() and asynchat.async_chat.readable(self)

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # the output queue may have become non-empty just after writable() was
    # last checked; this only works if we know something else will wake the
    # select() up in that case
    return asynchat.async_chat.writable(self) or self.oqueue

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if self.oqueue:
      # if we have data in the output queue, then send_message was called.
      # this means we can process one more message from the input queue, if
      # there are any.
      data = self.oqueue.popleft()
      self.push(data + self.terminator)
      self.send_count += 1
      if self.iqueue:
        self.handle_message(*self.iqueue.popleft())
    self.initiate_send()

  def close_log(self):
    logging.info("Closing connection from %s",
                 netutils.FormatAddress(self.peer_address, family=self.family))
    self.close()

  # this method is overriding an asyncore.dispatcher method
  def handle_expt(self):
    self.close_log()

  # this method is overriding an asyncore.dispatcher method
  def handle_error(self):
    """Log an error in handling any request, and proceed.

    """
    logging.exception("Error while handling asyncore request")
    self.close_log()

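# Example (illustrative sketch; MyMessageStream and its echo behaviour are
# hypothetical). A subclass is usually created from
# AsyncStreamServer.handle_connection(), e.g. with "\3" as terminator and at
# most 10 unanswered messages:
#
#   class MyMessageStream(AsyncTerminatedMessageStream):
#     def handle_message(self, message, message_id):
#       # echo every message back to the peer
#       self.send_message(message)
#
#   MyMessageStream(connected_socket, client_address, "\3", family, 10)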

class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
  """An improved asyncore udp socket.

  """
  def __init__(self, family):
    """Constructor for AsyncUDPSocket

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    self._out_queue = []
    self._family = family
    self.create_socket(family, socket.SOCK_DGRAM)

  # this method is overriding an asyncore.dispatcher method
  def handle_connect(self):
    # Python thinks that the first udp message from a source qualifies as a
    # "connect" and further ones are part of the same connection. We beg to
    # differ and treat all messages equally.
    pass

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    recv_result = utils.IgnoreSignals(self.recvfrom,
                                      constants.MAX_UDP_DATA_SIZE)
    if recv_result is not None:
      payload, address = recv_result
      if self._family == socket.AF_INET6:
        # we ignore 'flow info' and 'scope id' as we don't need them
        ip, port, _, _ = address
      else:
        ip, port = address

      self.handle_datagram(payload, ip, port)

  def handle_datagram(self, payload, ip, port):
    """Handle an already read udp datagram

    """
    raise NotImplementedError

  # this method is overriding an asyncore.dispatcher method
  def writable(self):
    # We should check whether we can write to the socket only if we have
    # something scheduled to be written
    return bool(self._out_queue)

  # this method is overriding an asyncore.dispatcher method
  def handle_write(self):
    if not self._out_queue:
      logging.error("handle_write called with empty output queue")
      return
    (ip, port, payload) = self._out_queue[0]
    utils.IgnoreSignals(self.sendto, payload, 0, (ip, port))
    self._out_queue.pop(0)

  def enqueue_send(self, ip, port, payload):
    """Enqueue a datagram to be sent when possible

    """
    if len(payload) > constants.MAX_UDP_DATA_SIZE:
      raise errors.UdpDataSizeError('Packet too big: %s > %s' %
                                    (len(payload),
                                     constants.MAX_UDP_DATA_SIZE))
    self._out_queue.append((ip, port, payload))

  def process_next_packet(self, timeout=0):
    """Process the next datagram, waiting for it if necessary.

    @type timeout: float
    @param timeout: how long to wait for data
    @rtype: boolean
    @return: True if some data has been handled, False otherwise

    """
    result = utils.WaitForFdCondition(self, select.POLLIN, timeout)
    if result is not None and result & select.POLLIN:
      self.handle_read()
      return True
    else:
      return False

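# Example (illustrative sketch; MyUDPServer, the bind address and the "ack"
# payload are hypothetical):
#
#   class MyUDPServer(AsyncUDPSocket):
#     def handle_datagram(self, payload, ip, port):
#       # acknowledge every datagram to its sender
#       self.enqueue_send(ip, port, "ack")
#
#   server = MyUDPServer(socket.AF_INET)
#   server.bind(("0.0.0.0", 5000))  # bind() comes from asyncore.dispatcher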

class AsyncAwaker(GanetiBaseAsyncoreDispatcher):
  """A way to notify the asyncore loop that something is going on.

  If an asyncore daemon is multithreaded, then when a thread tries to push
  some data to a socket, the main loop handling asynchronous requests might
  be sleeping, waiting on a select(). To avoid this it can create an instance
  of the AsyncAwaker, which other threads can use to wake it up.

  """
  def __init__(self, signal_fn=None):
    """Constructor for AsyncAwaker

    @type signal_fn: function
    @param signal_fn: function to call when awakened

    """
    GanetiBaseAsyncoreDispatcher.__init__(self)
    assert signal_fn is None or callable(signal_fn)
    (self.in_socket, self.out_socket) = socket.socketpair(socket.AF_UNIX,
                                                          socket.SOCK_STREAM)
    self.in_socket.setblocking(0)
    self.in_socket.shutdown(socket.SHUT_WR)
    self.out_socket.shutdown(socket.SHUT_RD)
    self.set_socket(self.in_socket)
    self.need_signal = True
    self.signal_fn = signal_fn
    self.connected = True

  # this method is overriding an asyncore.dispatcher method
  def handle_read(self):
    utils.IgnoreSignals(self.recv, 4096)
    if self.signal_fn:
      self.signal_fn()
    self.need_signal = True

  # this method is overriding an asyncore.dispatcher method
  def close(self):
    asyncore.dispatcher.close(self)
    self.out_socket.close()

  def signal(self):
    """Signal the asyncore main loop.

    Any data we send here will be ignored, but it will cause the select() call
    to return.

    """
    # Yes, there is a race condition here. No, we don't care, at worst we're
    # sending more than one wakeup token, which doesn't harm at all.
    if self.need_signal:
      self.need_signal = False
      self.out_socket.send("\0")

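# Example (illustrative sketch; "stream" stands for a hypothetical
# AsyncTerminatedMessageStream instance shared with the asyncore thread):
#
#   awaker = AsyncAwaker()
#
#   # in a worker thread: enqueue output, then wake the main loop up so the
#   # new data is noticed even if it's sleeping in select()
#   stream.send_message("response")
#   awaker.signal()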

class Mainloop(object):
  """Generic mainloop for daemons

  @ivar scheduler: A sched.scheduler object, which can be used to register
    timed events

  """
  def __init__(self):
    """Constructs a new Mainloop instance.

    """
    self._signal_wait = []
    self.scheduler = AsyncoreScheduler(time.time)

  @utils.SignalHandled([signal.SIGCHLD])
  @utils.SignalHandled([signal.SIGTERM])
  @utils.SignalHandled([signal.SIGINT])
  def Run(self, signal_handlers=None):
    """Runs the mainloop.

    @type signal_handlers: dict
    @param signal_handlers: signal->L{utils.SignalHandler} passed by decorator

    """
    assert isinstance(signal_handlers, dict) and \
           len(signal_handlers) > 0, \
           "Broken SignalHandled decorator"
    running = True
    # Start actual main loop
    while running:
      if not self.scheduler.empty():
        try:
          self.scheduler.run()
        except SchedulerBreakout:
          pass
      else:
        asyncore.loop(count=1, use_poll=True)

      # Check whether a signal was raised
      for sig in signal_handlers:
        handler = signal_handlers[sig]
        if handler.called:
          self._CallSignalWaiters(sig)
          running = sig not in (signal.SIGTERM, signal.SIGINT)
          handler.Clear()

  def _CallSignalWaiters(self, signum):
    """Calls all signal waiters for a certain signal.

    @type signum: int
    @param signum: Signal number

    """
    for owner in self._signal_wait:
      owner.OnSignal(signum)

  def RegisterSignal(self, owner):
    """Registers a receiver for signal notifications

    The receiver must support an "OnSignal(self, signum)" function.

    @type owner: instance
    @param owner: Receiver

    """
    self._signal_wait.append(owner)

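# Example (illustrative sketch; _SignalReceiver is hypothetical):
#
#   class _SignalReceiver(object):
#     def OnSignal(self, signum):
#       if signum == signal.SIGCHLD:
#         pass  # reap finished child processes here
#
#   mainloop = Mainloop()
#   mainloop.RegisterSignal(_SignalReceiver())
#   mainloop.Run()  # returns after SIGTERM or SIGINT is received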

def _VerifyDaemonUser(daemon_name):
  """Verifies the process uid matches the configured uid.

  This method verifies that a daemon is started as the user it is
  intended to be run as.

  @param daemon_name: The name of the daemon to be started
  @return: A tuple of (success, current uid, expected uid)

  """
  getents = runtime.GetEnts()
  running_uid = os.getuid()
  daemon_uids = {
    constants.MASTERD: getents.masterd_uid,
    constants.RAPI: getents.rapi_uid,
    constants.NODED: getents.noded_uid,
    constants.CONFD: getents.confd_uid,
    }

  return (daemon_uids[daemon_name] == running_uid, running_uid,
          daemon_uids[daemon_name])


def _BeautifyError(err):
  """Try to format an error better.

  Since we're dealing with daemon startup errors, in many cases this will be
  due to a socket error or the like, so we try to format those cases better.

  @param err: an exception object
  @rtype: string
  @return: the formatted error description

  """
  try:
    if isinstance(err, socket.error):
      return "Socket-related error: %s (errno=%s)" % (err.args[1], err.args[0])
    elif isinstance(err, EnvironmentError):
      if err.filename is None:
        return "%s (errno=%s)" % (err.strerror, err.errno)
      else:
        return "%s (file %s) (errno=%s)" % (err.strerror, err.filename,
                                            err.errno)
    else:
      return str(err)
  except Exception: # pylint: disable-msg=W0703
    logging.exception("Error while handling existing error %s", err)
    return "%s" % str(err)


def _HandleSigHup(reopen_cb, signum, frame): # pylint: disable-msg=W0613
  """Handler for SIGHUP.

  @param reopen_cb: Callback function for reopening log files

  """
  assert callable(reopen_cb)
  logging.info("Reopening log files after receiving SIGHUP")
  reopen_cb()


def GenericMain(daemon_name, optionparser,
                check_fn, prepare_fn, exec_fn,
                multithreaded=False, console_logging=False,
                default_ssl_cert=None, default_ssl_key=None):
  """Shared main function for daemons.

  @type daemon_name: string
  @param daemon_name: daemon name
  @type optionparser: optparse.OptionParser
  @param optionparser: initialized optionparser with daemon-specific options
      (common -f -d options will be handled by this module)
  @type check_fn: function which accepts (options, args)
  @param check_fn: function that checks start conditions and exits if they're
      not met
  @type prepare_fn: function which accepts (options, args)
  @param prepare_fn: function that is run before forking, or None;
      its result will be passed as the third parameter to exec_fn, or
      if None was passed in, we will just pass None to exec_fn
  @type exec_fn: function which accepts (options, args, prepare_results)
  @param exec_fn: function that's executed with the daemon's pid file held,
      and runs the daemon itself.
  @type multithreaded: bool
  @param multithreaded: Whether the daemon uses threads
  @type console_logging: boolean
  @param console_logging: if True, the daemon will fall back to the system
      console if logging fails
  @type default_ssl_cert: string
  @param default_ssl_cert: Default SSL certificate path
  @type default_ssl_key: string
  @param default_ssl_key: Default SSL key path

  """
  optionparser.add_option("-f", "--foreground", dest="fork",
                          help="Don't detach from the current terminal",
                          default=True, action="store_false")
  optionparser.add_option("-d", "--debug", dest="debug",
                          help="Enable some debug messages",
                          default=False, action="store_true")
  optionparser.add_option("--syslog", dest="syslog",
                          help="Enable logging to syslog (except debug"
                          " messages); one of 'no', 'yes' or 'only' [%s]" %
                          constants.SYSLOG_USAGE,
                          default=constants.SYSLOG_USAGE,
                          choices=["no", "yes", "only"])

  if daemon_name in constants.DAEMONS_PORTS:
    default_bind_address = constants.IP4_ADDRESS_ANY
    family = ssconf.SimpleStore().GetPrimaryIPFamily()
    # family will default to AF_INET if there is no ssconf file (e.g. when
    # upgrading a cluster from 2.2 -> 2.3). This is intended, as Ganeti
    # clusters <= 2.2 cannot be AF_INET6
    if family == netutils.IP6Address.family:
      default_bind_address = constants.IP6_ADDRESS_ANY

    default_port = netutils.GetDaemonPort(daemon_name)

    # For networked daemons we allow choosing the port and bind address
    optionparser.add_option("-p", "--port", dest="port",
                            help="Network port (default: %s)" % default_port,
                            default=default_port, type="int")
    optionparser.add_option("-b", "--bind", dest="bind_address",
                            help=("Bind address (default: '%s')" %
                                  default_bind_address),
                            default=default_bind_address, metavar="ADDRESS")

  if default_ssl_key is not None and default_ssl_cert is not None:
    optionparser.add_option("--no-ssl", dest="ssl",
                            help="Do not secure HTTP protocol with SSL",
                            default=True, action="store_false")
    optionparser.add_option("-K", "--ssl-key", dest="ssl_key",
                            help=("SSL key path (default: %s)" %
                                  default_ssl_key),
                            default=default_ssl_key, type="string",
                            metavar="SSL_KEY_PATH")
    optionparser.add_option("-C", "--ssl-cert", dest="ssl_cert",
                            help=("SSL certificate path (default: %s)" %
                                  default_ssl_cert),
                            default=default_ssl_cert, type="string",
                            metavar="SSL_CERT_PATH")

  # Disable the use of fork(2) if the daemon uses threads
  if multithreaded:
    utils.DisableFork()

  options, args = optionparser.parse_args()

  if getattr(options, "ssl", False):
    ssl_paths = {
      "certificate": options.ssl_cert,
      "key": options.ssl_key,
      }

    for name, path in ssl_paths.iteritems():
      if not os.path.isfile(path):
        print >> sys.stderr, "SSL %s file '%s' was not found" % (name, path)
        sys.exit(constants.EXIT_FAILURE)

    # TODO: By instantiating http.HttpSslParams here we would only read the
    # files once and have a proper validation (isfile returns False on
    # directories) at the same time.

  result, running_uid, expected_uid = _VerifyDaemonUser(daemon_name)
  if not result:
    msg = ("%s started using wrong user ID (%d), expected %d" %
           (daemon_name, running_uid, expected_uid))
    print >> sys.stderr, msg
    sys.exit(constants.EXIT_FAILURE)

  if check_fn is not None:
    check_fn(options, args)

  if options.fork:
    utils.CloseFDs()
    wpipe = utils.Daemonize(logfile=constants.DAEMONS_LOGFILES[daemon_name])
  else:
    wpipe = None

  log_reopen_fn = \
    utils.SetupLogging(constants.DAEMONS_LOGFILES[daemon_name], daemon_name,
                       debug=options.debug,
                       stderr_logging=not options.fork,
                       multithreaded=multithreaded,
                       syslog=options.syslog,
                       console_logging=console_logging)

  # Reopen log file(s) on SIGHUP
  signal.signal(signal.SIGHUP, compat.partial(_HandleSigHup, log_reopen_fn))

  utils.WritePidFile(utils.DaemonPidFileName(daemon_name))
  try:
    try:
      if callable(prepare_fn):
        prep_results = prepare_fn(options, args)
      else:
        prep_results = None
      logging.info("%s daemon startup", daemon_name)
    except Exception, err:
      utils.WriteErrorToFD(wpipe, _BeautifyError(err))
      raise

    if wpipe is not None:
      # we're done with the preparation phase, we close the pipe to
      # let the parent know it's safe to exit
      os.close(wpipe)

    exec_fn(options, args, prep_results)
  finally:
    utils.RemoveFile(utils.DaemonPidFileName(daemon_name))
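

# Example (illustrative sketch; the check/exec functions and the option
# parser setup are hypothetical, and constants.NODED is used only as a
# sample daemon name):
#
#   import optparse
#
#   def CheckMyDaemon(options, args):
#     pass  # verify start conditions, sys.exit() if they aren't met
#
#   def ExecMyDaemon(options, args, prep_results):
#     mainloop = Mainloop()
#     mainloop.Run()
#
#   parser = optparse.OptionParser(description="My example daemon")
#   GenericMain(constants.NODED, parser, CheckMyDaemon, None, ExecMyDaemon)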