incremental = config.pop('incremental', False)
handlers = config.get('handlers', EMPTY_DICT)
if name not in logging._handlers:
raise ValueError('No handler found with '
handler = logging._handlers[name]
handler_config = handlers[name]
level = handler_config.get('level', None)
handler.setLevel(logging._checkLevel(level))
raise ValueError('Unable to configure handler '
loggers = config.get('loggers', EMPTY_DICT)
self.configure_logger(name, loggers[name], True)
raise ValueError('Unable to configure logger '
root = config.get('root', None)
self.configure_root(root, True)
raise ValueError('Unable to configure root '
disable_existing = config.pop('disable_existing_loggers', True)
# Do formatters first - they don't refer to anything else
formatters = config.get('formatters', EMPTY_DICT)
formatters[name] = self.configure_formatter(
raise ValueError('Unable to configure '
'formatter %r: %s' % (name, e))
# Next, do filters - they don't refer to anything else, either
filters = config.get('filters', EMPTY_DICT)
filters[name] = self.configure_filter(filters[name])
raise ValueError('Unable to configure '
'filter %r: %s' % (name, e))
# Next, do handlers - they refer to formatters and filters
# As handlers can refer to other handlers, sort the keys
# to allow a deterministic order of configuration
handlers = config.get('handlers', EMPTY_DICT)
for name in sorted(handlers):
handler = self.configure_handler(handlers[name])
if 'target not configured yet' in str(e):
raise ValueError('Unable to configure handler '
# Now do any that were deferred
handler = self.configure_handler(handlers[name])
raise ValueError('Unable to configure handler '
# Next, do loggers - they refer to handlers and filters
#we don't want to lose the existing loggers,
#since other threads may have pointers to them.
#existing is set to contain all existing loggers,
#and as we go through the new configuration we
#remove any which are configured. At the end,
#what's left in existing is the set of loggers
#which were in the previous configuration but
#which are not in the new configuration.
existing = list(root.manager.loggerDict.keys())
#The list needs to be sorted so that we can
#avoid disabling child loggers of explicitly
#named loggers. With a sorted list it is easier
#to find the child loggers.
#We'll keep the list of existing loggers
#which are children of named loggers here...
#now set up the new ones...
loggers = config.get('loggers', EMPTY_DICT)
i = existing.index(name) + 1 # look after name
num_existing = len(existing)
if existing[i][:pflen] == prefixed:
child_loggers.append(existing[i])
self.configure_logger(name, loggers[name])
raise ValueError('Unable to configure logger '
#Disable any old loggers. There's no point deleting
#them as other threads may continue to hold references
#and by disabling them, you stop them doing any logging.
#However, don't disable children of named loggers, as that's
#probably not what was intended by the user.
# logger = root.manager.loggerDict[log]
# if log in child_loggers:
# logger.level = logging.NOTSET
# logger.propagate = True
_handle_existing_loggers(existing, child_loggers,
# And finally, do the root logger
root = config.get('root', None)
self.configure_root(root)
raise ValueError('Unable to configure root '
def configure_formatter(self, config):
"""Configure a formatter from a dictionary."""
factory = config['()'] # for use in exception handler
result = self.configure_custom(config)
if "'format'" not in str(te):
#Name of parameter changed from fmt to format.
#This is so that code can be used with older Python versions
config['fmt'] = config.pop('format')
result = self.configure_custom(config)
fmt = config.get('format', None)
dfmt = config.get('datefmt', None)
style = config.get('style', '%')
cname = config.get('class', None)
result = c(fmt, dfmt, style)
def configure_filter(self, config):
"""Configure a filter from a dictionary."""
result = self.configure_custom(config)
name = config.get('name', '')
result = logging.Filter(name)
def add_filters(self, filterer, filters):
"""Add filters to a filterer from a list of names."""
filterer.addFilter(self.config['filters'][f])
raise ValueError('Unable to add filter %r: %s' % (f, e))
def configure_handler(self, config):
"""Configure a handler from a dictionary."""
config_copy = dict(config) # for restoring in case of error
formatter = config.pop('formatter', None)
formatter = self.config['formatters'][formatter]
raise ValueError('Unable to set formatter '
'%r: %s' % (formatter, e))
level = config.pop('level', None)
filters = config.pop('filters', None)
cname = config.pop('class')
klass = self.resolve(cname)
#Special case for handler which refers to another handler
if issubclass(klass, logging.handlers.MemoryHandler) and\
th = self.config['handlers'][config['target']]
if not isinstance(th, logging.Handler):
config.update(config_copy) # restore for deferred cfg
raise TypeError('target not configured yet')
raise ValueError('Unable to set target handler '
'%r: %s' % (config['target'], e))
elif issubclass(klass, logging.handlers.SMTPHandler) and\
config['mailhost'] = self.as_tuple(config['mailhost'])
elif issubclass(klass, logging.handlers.SysLogHandler) and\
config['address'] = self.as_tuple(config['address'])
props = config.pop('.', None)
kwargs = dict([(k, config[k]) for k in config if valid_ident(k)])
result = factory(**kwargs)
if "'stream'" not in str(te):
#The argument name changed from strm to stream
#This is so that code can be used with older Python versions
kwargs['strm'] = kwargs.pop('stream')
result = factory(**kwargs)
result.setFormatter(formatter)
result.setLevel(logging._checkLevel(level))
self.add_filters(result, filters)
for name, value in props.items():
setattr(result, name, value)
def add_handlers(self, logger, handlers):
"""Add handlers to a logger from a list of names."""
logger.addHandler(self.config['handlers'][h])
raise ValueError('Unable to add handler %r: %s' % (h, e))
def common_logger_config(self, logger, config, incremental=False):
Perform configuration which is common to root and non-root loggers.
level = config.get('level', None)
logger.setLevel(logging._checkLevel(level))
#Remove any existing handlers
for h in logger.handlers[:]:
handlers = config.get('handlers', None)
self.add_handlers(logger, handlers)
filters = config.get('filters', None)
self.add_filters(logger, filters)
def configure_logger(self, name, config, incremental=False):
"""Configure a non-root logger from a dictionary."""
logger = logging.getLogger(name)
self.common_logger_config(logger, config, incremental)
propagate = config.get('propagate', None)
if propagate is not None:
logger.propagate = propagate
def configure_root(self, config, incremental=False):
"""Configure a root logger from a dictionary."""
root = logging.getLogger()
self.common_logger_config(root, config, incremental)
dictConfigClass = DictConfigurator
"""Configure logging using a dictionary."""
dictConfigClass(config).configure()
def listen(port=DEFAULT_LOGGING_CONFIG_PORT, verify=None):
Start up a socket server on the specified port, and listen for new
These will be sent as a file suitable for processing by fileConfig().
Returns a Thread object on which you can call start() to start the server,
and which you can join() when appropriate. To stop the server, call
Use the ``verify`` argument to verify any bytes received across the wire
from a client. If specified, it should be a callable which receives a
single argument - the bytes of configuration data received across the
network - and it should return either ``None``, to indicate that the
passed in bytes could not be verified and should be discarded, or a
byte string which is then passed to the configuration machinery as
normal. Note that you can return transformed bytes, e.g. by decrypting
if not thread: #pragma: no cover
raise NotImplementedError("listen() needs threading to work")
class ConfigStreamHandler(StreamRequestHandler):
Handler for a logging configuration request.
It expects a completely new logging configuration and uses fileConfig
Each request is expected to be a 4-byte length, packed using
struct.pack(">L", n), followed by the config file.
Uses fileConfig() to do the grunt work.
slen = struct.unpack(">L", chunk)[0]
chunk = self.connection.recv(slen)
chunk = chunk + conn.recv(slen - len(chunk))
if self.server.verify is not None:
chunk = self.server.verify(chunk)
if chunk is not None: # verified, can process
chunk = chunk.decode("utf-8")
assert isinstance(d, dict)
#Apply new configuration.
file = io.StringIO(chunk)
if e.errno != RESET_ERROR:
class ConfigSocketReceiver(ThreadingTCPServer):
A simple TCP socket-based logging config receiver.
def __init__(self, host='localhost', port=DEFAULT_LOGGING_CONFIG_PORT,
handler=None, ready=None, verify=None):
ThreadingTCPServer.__init__(self, (host, port), handler)
def serve_until_stopped(self):
rd, wr, ex = select.select([self.socket.fileno()],
class Server(threading.Thread):
def __init__(self, rcvr, hdlr, port, verify):
super(Server, self).__init__()
self.ready = threading.Event()
server = self.rcvr(port=self.port, handler=self.hdlr,
self.port = server.server_address[1]
server.serve_until_stopped()
return Server(ConfigSocketReceiver, ConfigStreamHandler, port, verify)
Stop the listening server which was created with a call to listen().