                    new = fixer.transform(node, results)
                    if new is not None:
                        node.replace(new)
                        node = new

    def processed_file(self, new_text, filename, old_text=None, write=False,
                       encoding=None):
        """
        Called when a file has been refactored and there may be changes.
        """
        self.files.append(filename)
        if old_text is None:
            old_text = self._read_python_source(filename)[0]
            if old_text is None:
                return
        equal = old_text == new_text
        self.print_output(old_text, new_text, filename, equal)
        if equal:
            self.log_debug("No changes to %s", filename)
            if not self.write_unchanged_files:
                return
        if write:
            self.write_file(new_text, filename, old_text, encoding)
        else:
            self.log_debug("Not writing changes to %s", filename)

    def write_file(self, new_text, filename, old_text, encoding=None):
        """Writes a string to a file.

        It first shows a unified diff between the old text and the new text, and
        then rewrites the file; the latter is only done if the write option is
        set.
        """
        try:
            f = _open_with_encoding(filename, "w", encoding=encoding)
        except os.error as err:
            self.log_error("Can't create %s: %s", filename, err)
            return
        try:
            f.write(_to_system_newlines(new_text))
        except os.error as err:
            self.log_error("Can't write %s: %s", filename, err)
        finally:
            f.close()
        self.log_debug("Wrote changes to %s", filename)
        self.wrote = True

    PS1 = ">>> "
    PS2 = "... "

    def refactor_docstring(self, input, filename):
        """Refactors a docstring, looking for doctests.

        This returns a modified version of the input string.  It looks
        for doctests, which start with a ">>>" prompt, and may be
        continued with "..." prompts, as long as the "..." is indented
        the same as the ">>>".

        (Unfortunately we can't use the doctest module's parser,
        since, like most parsers, it is not geared towards preserving
        the original source.)
        """
        result = []
        block = block_lineno = indent = None
        lineno = 0
        for line in input.splitlines(True):
            lineno += 1
            if line.lstrip().startswith(self.PS1):
                if block is not None:
                    result.extend(self.refactor_doctest(block, block_lineno,
                                                        indent, filename))
                block_lineno = lineno
                block = [line]
                i = line.find(self.PS1)
                indent = line[:i]
            elif (indent is not None and
                  (line.startswith(indent + self.PS2) or
                   line == indent + self.PS2.rstrip() + u"\n")):
                block.append(line)
            else:
                if block is not None:
                    result.extend(self.refactor_doctest(block, block_lineno,
                                                        indent, filename))
                block = indent = None
                result.append(line)
        if block is not None:
            result.extend(self.refactor_doctest(block, block_lineno,
                                                indent, filename))
        return u"".join(result)
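
    # Illustrative sketch, not taken from the module itself: with the standard
    # "print" fixer active, a docstring such as
    #
    #     Frobnicate the thing.
    #
    #     >>> print frobnicate(1)
    #     2
    #
    # comes back from refactor_docstring() with only the prompt line rewritten,
    # i.e. ">>> print(frobnicate(1))"; the surrounding prose and the expected
    # output line are passed through unchanged.
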
    def refactor_doctest(self, block, lineno, indent, filename):
        """Refactors one doctest.

        A doctest is given as a block of lines, the first of which starts
        with ">>>" (possibly indented), while the remaining lines start
        with "..." (identically indented).
        """
        try:
            tree = self.parse_block(block, lineno, indent)
        except Exception as err:
            if self.logger.isEnabledFor(logging.DEBUG):
                for line in block:
                    self.log_debug("Source: %s", line.rstrip(u"\n"))
            self.log_error("Can't parse docstring in %s line %s: %s: %s",
                           filename, lineno, err.__class__.__name__, err)
            return block
        if self.refactor_tree(tree, filename):
            new = unicode(tree).splitlines(True)
            # Undo the adjustment of the line numbers in wrap_toks() below.
            clipped, new = new[:lineno-1], new[lineno-1:]
            assert clipped == [u"\n"] * (lineno-1), clipped
            if not new[-1].endswith(u"\n"):
                new[-1] += u"\n"
            block = [indent + self.PS1 + new.pop(0)]
            if new:
                block += [indent + self.PS2 + line for line in new]
        return block

self.log_message("No files %s modified.", were)
self.log_message("Files that %s modified:", were)
self.log_message("Warnings/messages while refactoring:")
for message in self.fixer_log:
self.log_message(message)
if len(self.errors) == 1:
self.log_message("There was 1 error:")
self.log_message("There were %d errors:", len(self.errors))
for msg, args, kwds in self.errors:
self.log_message(msg, *args, **kwds)
    def parse_block(self, block, lineno, indent):
        """Parses a block into a tree.

        This is necessary to get correct line number / offset information
        in the parser diagnostics and embedded into the parse tree.
        """
        tree = self.driver.parse_tokens(self.wrap_toks(block, lineno, indent))
        tree.future_features = frozenset()
        return tree

    def wrap_toks(self, block, lineno, indent):
        """Wraps a tokenize stream to systematically modify start/end."""
        tokens = tokenize.generate_tokens(self.gen_lines(block, indent).next)
        for type, value, (line0, col0), (line1, col1), line_text in tokens:
            line0 += lineno - 1
            line1 += lineno - 1
            # Don't bother updating the columns; this is too complicated
            # since line_text would also have to be updated and it would
            # still break for tokens spanning lines.  Let the user guess
            # that the column numbers for doctests are relative to the
            # end of the prompt string (PS1 or PS2).
            yield type, value, (line0, col0), (line1, col1), line_text
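
    # For example (hypothetical input): a doctest whose ">>>" line sits at line
    # 5 of its docstring is tokenized as if it started at line 1, so adding
    # lineno - 1 (here 4) to the start/end rows makes parser diagnostics and
    # node positions agree with the docstring again; refactor_doctest() above
    # strips this offset back out of the refactored output.
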
    def gen_lines(self, block, indent):
        """Generates lines as expected by tokenize from a list of lines.

        This strips the first len(indent + self.PS1) characters off each line.
        """
        prefix1 = indent + self.PS1
        prefix2 = indent + self.PS2
        prefix = prefix1
        for line in block:
            if line.startswith(prefix):
                yield line[len(prefix):]
            elif line == prefix.rstrip() + u"\n":
                yield u"\n"
            else:
                raise AssertionError("line=%r, prefix=%r" % (line, prefix))
            prefix = prefix2
        while True:
            yield ""
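
    # Sketch of the expected behaviour, with a made-up block: for indent = ""
    # and block = [">>> if x:\n", "...     y = 2\n"], gen_lines() yields
    # "if x:\n" and "    y = 2\n", then pads with empty strings so tokenize
    # sees a normal end of input.

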
class MultiprocessingUnsupported(Exception):
    pass


class MultiprocessRefactoringTool(RefactoringTool):

    def __init__(self, *args, **kwargs):
        super(MultiprocessRefactoringTool, self).__init__(*args, **kwargs)
        self.queue = None
        self.output_lock = None

    def refactor(self, items, write=False, doctests_only=False,
                 num_processes=1):
        if num_processes == 1:
            return super(MultiprocessRefactoringTool, self).refactor(
                items, write, doctests_only)
        try:
            import multiprocessing
        except ImportError:
            raise MultiprocessingUnsupported
        if self.queue is not None:
            raise RuntimeError("already doing multiple processes")
        self.queue = multiprocessing.JoinableQueue()
        self.output_lock = multiprocessing.Lock()
        processes = [multiprocessing.Process(target=self._child)
                     for i in xrange(num_processes)]
        try:
            for p in processes:
                p.start()
            super(MultiprocessRefactoringTool, self).refactor(items, write,
                                                              doctests_only)
        finally:
            self.queue.join()
            for i in xrange(num_processes):
                self.queue.put(None)
            for p in processes:
                if p.is_alive():
                    p.join()
            self.queue = None

    def _child(self):
        task = self.queue.get()
        while task is not None:
            args, kwargs = task
            try:
                super(MultiprocessRefactoringTool, self).refactor_file(
                    *args, **kwargs)
            finally:
                self.queue.task_done()
            task = self.queue.get()
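
    # The parent process feeds (args, kwargs) work items into self.queue via
    # refactor_file() below; each worker loops in _child() until it pulls the
    # None sentinel that refactor() enqueues once the real work is done.
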
    def refactor_file(self, *args, **kwargs):
        if self.queue is not None:
            self.queue.put((args, kwargs))
        else:
            return super(MultiprocessRefactoringTool, self).refactor_file(
                *args, **kwargs)
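

# A minimal usage sketch (not part of this module); it assumes the public
# lib2to3 helpers and an example file path:
#
#   from lib2to3.refactor import (get_fixers_from_package,
#                                 MultiprocessRefactoringTool)
#
#   fixers = get_fixers_from_package("lib2to3.fixes")
#   rt = MultiprocessRefactoringTool(fixers)
#   rt.refactor(["example/module.py"], write=True, num_processes=4)
#   rt.summarize()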