# Copyright 2006 Google, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""Refactoring framework.
Used as a main program, this can refactor any number of files and/or
recursively descend down directories. Imported as a module, this
provides infrastructure to write your own refactoring tool.
__author__ = "Guido van Rossum <guido@python.org>"
from itertools import chain
from .pgen2 import driver, tokenize, token
from .fixer_util import find_root
from . import pytree, pygram
from . import btm_matcher as bm
def get_all_fix_names(fixer_pkg, remove_prefix=True):
"""Return a sorted list of all available fix names in the given package."""
pkg = __import__(fixer_pkg, [], [], ["*"])
fixer_dir = os.path.dirname(pkg.__file__)
for name in sorted(os.listdir(fixer_dir)):
if name.startswith("fix_") and name.endswith(".py"):
fix_names.append(name[:-3])
class _EveryNode(Exception):
def _get_head_types(pat):
""" Accepts a pytree Pattern Node and returns a set
of the pattern types which will match first. """
if isinstance(pat, (pytree.NodePattern, pytree.LeafPattern)):
# NodePatters must either have no type and no content
# or a type and content -- so they don't get any farther
if isinstance(pat, pytree.NegatedPattern):
return _get_head_types(pat.content)
raise _EveryNode # Negated Patterns don't have a type
if isinstance(pat, pytree.WildcardPattern):
# Recurse on each node in content
r.update(_get_head_types(x))
raise Exception("Oh no! I don't understand pattern %s" %(pat))
def _get_headnode_dict(fixer_list):
""" Accepts a list of fixers and returns a dictionary
of head node type --> fixer list. """
head_nodes = collections.defaultdict(list)
heads = _get_head_types(fixer.pattern)
head_nodes[node_type].append(fixer)
if fixer._accept_type is not None:
head_nodes[fixer._accept_type].append(fixer)
for node_type in chain(pygram.python_grammar.symbol2number.values(),
pygram.python_grammar.tokens):
head_nodes[node_type].extend(every)
def get_fixers_from_package(pkg_name):
Return the fully qualified names for fixers in the package pkg_name.
return [pkg_name + "." + fix_name
for fix_name in get_all_fix_names(pkg_name, False)]
if sys.version_info < (3, 0):
_open_with_encoding = codecs.open
# codecs.open doesn't translate newlines sadly.
def _from_system_newlines(input):
return input.replace("\r\n", "\n")
def _to_system_newlines(input):
return input.replace("\n", os.linesep)
_open_with_encoding = open
_from_system_newlines = _identity
_to_system_newlines = _identity
def _detect_future_features(source):
gen = tokenize.generate_tokens(io.StringIO(source).readline)
ignore = frozenset({token.NEWLINE, tokenize.NL, token.COMMENT})
elif tp == token.NAME and value == "from":
if tp != token.NAME or value != "__future__":
if tp != token.NAME or value != "import":
if tp == token.OP and value == "(":
if tp != token.OP or value != ",":
return frozenset(features)
class FixerError(Exception):
"""A fixer could not be loaded."""
class RefactoringTool(object):
_default_options = {"print_function" : False,
"write_unchanged_files" : False}
CLASS_PREFIX = "Fix" # The prefix for fixer classes
FILE_PREFIX = "fix_" # The prefix for modules with a fixer within
def __init__(self, fixer_names, options=None, explicit=None):
fixer_names: a list of fixers to import
options: a dict with configuration.
explicit: a list of fixers to run even if they are explicit.
self.fixers = fixer_names
self.explicit = explicit or []
self.options = self._default_options.copy()
self.options.update(options)
if self.options["print_function"]:
self.grammar = pygram.python_grammar_no_print_statement
self.grammar = pygram.python_grammar
# When this is True, the refactor*() methods will call write_file() for
# files processed even if they were not changed during refactoring. If
# and only if the refactor method's write parameter was True.
self.write_unchanged_files = self.options.get("write_unchanged_files")
self.logger = logging.getLogger("RefactoringTool")
self.driver = driver.Driver(self.grammar,
self.pre_order, self.post_order = self.get_fixers()
self.files = [] # List of files that were or should be modified
self.BM = bm.BottomMatcher()
self.bmi_pre_order = [] # Bottom Matcher incompatible fixers
for fixer in chain(self.post_order, self.pre_order):
# remove fixers that will be handled by the bottom-up
elif fixer in self.pre_order:
self.bmi_pre_order.append(fixer)
elif fixer in self.post_order:
self.bmi_post_order.append(fixer)
self.bmi_pre_order_heads = _get_headnode_dict(self.bmi_pre_order)
self.bmi_post_order_heads = _get_headnode_dict(self.bmi_post_order)
"""Inspects the options to load the requested patterns and handlers.
(pre_order, post_order), where pre_order is the list of fixers that
want a pre-order AST traversal, and post_order is the list that want
for fix_mod_path in self.fixers:
mod = __import__(fix_mod_path, {}, {}, ["*"])
fix_name = fix_mod_path.rsplit(".", 1)[-1]
if fix_name.startswith(self.FILE_PREFIX):
fix_name = fix_name[len(self.FILE_PREFIX):]
parts = fix_name.split("_")
class_name = self.CLASS_PREFIX + "".join([p.title() for p in parts])
fix_class = getattr(mod, class_name)
raise FixerError("Can't find %s.%s" % (fix_name, class_name))
fixer = fix_class(self.options, self.fixer_log)
if fixer.explicit and self.explicit is not True and \
fix_mod_path not in self.explicit:
self.log_message("Skipping optional fixer: %s", fix_name)
self.log_debug("Adding transformation: %s", fix_name)
pre_order_fixers.append(fixer)
elif fixer.order == "post":
post_order_fixers.append(fixer)
raise FixerError("Illegal fixer order: %r" % fixer.order)
key_func = operator.attrgetter("run_order")
pre_order_fixers.sort(key=key_func)
post_order_fixers.sort(key=key_func)
return (pre_order_fixers, post_order_fixers)
def log_error(self, msg, *args, **kwds):
"""Called when an error occurs."""
def log_message(self, msg, *args):
"""Hook to log a message."""
def log_debug(self, msg, *args):
def print_output(self, old_text, new_text, filename, equal):
"""Called with the old version, new version, and filename of a
def refactor(self, items, write=False, doctests_only=False):
"""Refactor a list of files and directories."""
for dir_or_file in items:
if os.path.isdir(dir_or_file):
self.refactor_dir(dir_or_file, write, doctests_only)
self.refactor_file(dir_or_file, write, doctests_only)
def refactor_dir(self, dir_name, write=False, doctests_only=False):
"""Descends down a directory and refactor every Python file found.
Python files are assumed to have a .py extension.
Files and subdirectories starting with '.' are skipped.
py_ext = os.extsep + "py"
for dirpath, dirnames, filenames in os.walk(dir_name):
self.log_debug("Descending into %s", dirpath)
if (not name.startswith(".") and
os.path.splitext(name)[1] == py_ext):
fullname = os.path.join(dirpath, name)
self.refactor_file(fullname, write, doctests_only)
# Modify dirnames in-place to remove subdirs with leading dots
dirnames[:] = [dn for dn in dirnames if not dn.startswith(".")]
def _read_python_source(self, filename):
Do our best to decode a Python source file correctly.
self.log_error("Can't open %s: %s", filename, err)
encoding = tokenize.detect_encoding(f.readline)[0]
with _open_with_encoding(filename, "r", encoding=encoding) as f:
return _from_system_newlines(f.read()), encoding
def refactor_file(self, filename, write=False, doctests_only=False):
input, encoding = self._read_python_source(filename)
# Reading the file failed.
input += "\n" # Silence certain parse errors
self.log_debug("Refactoring doctests in %s", filename)
output = self.refactor_docstring(input, filename)
if self.write_unchanged_files or output != input:
self.processed_file(output, filename, input, write, encoding)
self.log_debug("No doctest changes in %s", filename)
tree = self.refactor_string(input, filename)
if self.write_unchanged_files or (tree and tree.was_changed):
# The [:-1] is to take off the \n we added earlier
self.processed_file(str(tree)[:-1], filename,
write=write, encoding=encoding)
self.log_debug("No changes in %s", filename)
def refactor_string(self, data, name):
"""Refactor a given input string.
data: a string holding the code to be refactored.
name: a human-readable name for use in error/log messages.
An AST corresponding to the refactored input stream; None if
there were errors during the parse.
features = _detect_future_features(data)
if "print_function" in features:
self.driver.grammar = pygram.python_grammar_no_print_statement
tree = self.driver.parse_string(data)
self.log_error("Can't parse %s: %s: %s",
name, err.__class__.__name__, err)
self.driver.grammar = self.grammar
tree.future_features = features
self.log_debug("Refactoring %s", name)
self.refactor_tree(tree, name)
def refactor_stdin(self, doctests_only=False):
self.log_debug("Refactoring doctests in stdin")
output = self.refactor_docstring(input, "<stdin>")
if self.write_unchanged_files or output != input:
self.processed_file(output, "<stdin>", input)
self.log_debug("No doctest changes in stdin")
tree = self.refactor_string(input, "<stdin>")
if self.write_unchanged_files or (tree and tree.was_changed):
self.processed_file(str(tree), "<stdin>", input)
self.log_debug("No changes in stdin")
def refactor_tree(self, tree, name):
"""Refactors a parse tree (modifying the tree in place).
For compatible patterns the bottom matcher module is
used. Otherwise the tree is traversed node-to-node for
tree: a pytree.Node instance representing the root of the tree
name: a human-readable name for this tree.
True if the tree was modified, False otherwise.
for fixer in chain(self.pre_order, self.post_order):
fixer.start_tree(tree, name)
#use traditional matching for the incompatible fixers
self.traverse_by(self.bmi_pre_order_heads, tree.pre_order())
self.traverse_by(self.bmi_post_order_heads, tree.post_order())
# obtain a set of candidate nodes
match_set = self.BM.run(tree.leaves())
while any(match_set.values()):
for fixer in self.BM.fixers:
if fixer in match_set and match_set[fixer]:
#sort by depth; apply fixers from bottom(of the AST) to top
match_set[fixer].sort(key=pytree.Base.depth, reverse=True)
if fixer.keep_line_order:
#some fixers(eg fix_imports) must be applied
#with the original file's line order
match_set[fixer].sort(key=pytree.Base.get_lineno)
for node in list(match_set[fixer]):
if node in match_set[fixer]:
match_set[fixer].remove(node)
# this node has been cut off from a
# previous transformation ; skip
if node.fixers_applied and fixer in node.fixers_applied:
# do not apply the same fixer again
results = fixer.match(node)
new = fixer.transform(node, results)
#new.fixers_applied.append(fixer)
for node in new.post_order():
# do not apply the fixer again to
if not node.fixers_applied:
node.fixers_applied.append(fixer)
# update the original match set for
new_matches = self.BM.run(new.leaves())
match_set[fxr].extend(new_matches[fxr])
for fixer in chain(self.pre_order, self.post_order):
fixer.finish_tree(tree, name)
def traverse_by(self, fixers, traversal):
"""Traverse an AST, applying a set of fixers to each node.
This is a helper method for refactor_tree().
fixers: a list of fixer instances.
traversal: a generator that yields AST nodes.
for fixer in fixers[node.type]:
results = fixer.match(node)
new = fixer.transform(node, results)