START_ELEMENT = "START_ELEMENT"
END_ELEMENT = "END_ELEMENT"
START_DOCUMENT = "START_DOCUMENT"
END_DOCUMENT = "END_DOCUMENT"
PROCESSING_INSTRUCTION = "PROCESSING_INSTRUCTION"
IGNORABLE_WHITESPACE = "IGNORABLE_WHITESPACE"
CHARACTERS = "CHARACTERS"
class PullDOM(xml.sax.ContentHandler):
def __init__(self, documentFactory=None):
from xml.dom import XML_NAMESPACE
self.documentFactory = documentFactory
self.firstEvent = [None, None]
self.lastEvent = self.firstEvent
self.push = self.elementStack.append
self.pop = self.elementStack.pop
self._ns_contexts = [{XML_NAMESPACE:'xml'}] # contains uri -> prefix dicts
self._current_context = self._ns_contexts[-1]
result = self.elementStack[-1]
del self.elementStack[-1]
def setDocumentLocator(self, locator):
def startPrefixMapping(self, prefix, uri):
if not hasattr(self, '_xmlns_attrs'):
self._xmlns_attrs.append((prefix or 'xmlns', uri))
self._ns_contexts.append(self._current_context.copy())
self._current_context[uri] = prefix or None
def endPrefixMapping(self, prefix):
self._current_context = self._ns_contexts.pop()
def startElementNS(self, name, tagName , attrs):
# Retrieve xml namespace declaration attributes.
xmlns_uri = 'http://www.w3.org/2000/xmlns/'
xmlns_attrs = getattr(self, '_xmlns_attrs', None)
if xmlns_attrs is not None:
for aname, value in xmlns_attrs:
attrs._attrs[(xmlns_uri, aname)] = value
# When using namespaces, the reader may or may not
# provide us with the original name. If not, create
# *a* valid tagName from the current context.
prefix = self._current_context[uri]
tagName = prefix + ":" + localname
node = self.document.createElementNS(uri, tagName)
node = self.buildDocument(uri, tagName)
# When the tagname is not prefixed, it just appears as
node = self.document.createElement(localname)
node = self.buildDocument(None, localname)
for aname,value in attrs.items():
a_uri, a_localname = aname
if a_localname == 'xmlns':
qname = 'xmlns:' + a_localname
attr = self.document.createAttributeNS(a_uri, qname)
node.setAttributeNodeNS(attr)
prefix = self._current_context[a_uri]
qname = prefix + ":" + a_localname
attr = self.document.createAttributeNS(a_uri, qname)
node.setAttributeNodeNS(attr)
attr = self.document.createAttribute(a_localname)
node.setAttributeNode(attr)
self.lastEvent[1] = [(START_ELEMENT, node), None]
self.lastEvent = self.lastEvent[1]
def endElementNS(self, name, tagName):
self.lastEvent[1] = [(END_ELEMENT, self.pop()), None]
self.lastEvent = self.lastEvent[1]
def startElement(self, name, attrs):
node = self.document.createElement(name)
node = self.buildDocument(None, name)
for aname,value in attrs.items():
attr = self.document.createAttribute(aname)
node.setAttributeNode(attr)
self.lastEvent[1] = [(START_ELEMENT, node), None]
self.lastEvent = self.lastEvent[1]
def endElement(self, name):
self.lastEvent[1] = [(END_ELEMENT, self.pop()), None]
self.lastEvent = self.lastEvent[1]
node = self.document.createComment(s)
self.lastEvent[1] = [(COMMENT, node), None]
self.lastEvent = self.lastEvent[1]
event = [(COMMENT, s), None]
self.pending_events.append(event)
def processingInstruction(self, target, data):
node = self.document.createProcessingInstruction(target, data)
self.lastEvent[1] = [(PROCESSING_INSTRUCTION, node), None]
self.lastEvent = self.lastEvent[1]
event = [(PROCESSING_INSTRUCTION, target, data), None]
self.pending_events.append(event)
def ignorableWhitespace(self, chars):
node = self.document.createTextNode(chars)
self.lastEvent[1] = [(IGNORABLE_WHITESPACE, node), None]
self.lastEvent = self.lastEvent[1]
def characters(self, chars):
node = self.document.createTextNode(chars)
self.lastEvent[1] = [(CHARACTERS, node), None]
self.lastEvent = self.lastEvent[1]
if self.documentFactory is None:
self.documentFactory = xml.dom.minidom.Document.implementation
def buildDocument(self, uri, tagname):
# Can't do that in startDocument, since we need the tagname
# XXX: obtain DocumentType
node = self.documentFactory.createDocument(uri, tagname, None)
self.lastEvent[1] = [(START_DOCUMENT, node), None]
self.lastEvent = self.lastEvent[1]
# Put everything we have seen so far into the document
for e in self.pending_events:
if e[0][0] == PROCESSING_INSTRUCTION:
n = self.document.createProcessingInstruction(target, data)
e[0] = (PROCESSING_INSTRUCTION, n)
n = self.document.createComment(e[0][1])
raise AssertionError("Unknown pending event ",e[0][0])
self.pending_events = None
self.lastEvent[1] = [(END_DOCUMENT, self.document), None]
"clear(): Explicitly release parsing structures"
def warning(self, exception):
def error(self, exception):
def fatalError(self, exception):
def __init__(self, stream, parser, bufsize):
if not hasattr(self.parser, 'feed'):
self.getEvent = self._slurp
# This content handler relies on namespace support
self.parser.setFeature(xml.sax.handler.feature_namespaces, 1)
self.parser.setContentHandler(self.pulldom)
def __getitem__(self, pos):
def expandNode(self, node):
parents[-1].appendChild(cur_node)
if token == START_ELEMENT:
elif token == END_ELEMENT:
# use IncrementalParser interface, so we get the desired
if not self.pulldom.firstEvent[1]:
self.pulldom.lastEvent = self.pulldom.firstEvent
while not self.pulldom.firstEvent[1]:
buf = self.stream.read(self.bufsize)
rc = self.pulldom.firstEvent[1][0]
self.pulldom.firstEvent[1] = self.pulldom.firstEvent[1][1]
""" Fallback replacement for getEvent() using the
standard SAX2 interface, which means we slurp the
SAX events into memory (no performance gain, but
we are compatible to all SAX parsers).
self.parser.parse(self.stream)
self.getEvent = self._emit
""" Fallback replacement for getEvent() that emits
the events that _slurp() read previously.
rc = self.pulldom.firstEvent[1][0]
self.pulldom.firstEvent[1] = self.pulldom.firstEvent[1][1]
"""clear(): Explicitly release parsing objects"""
def startElementNS(self, name, tagName , attrs):
PullDOM.startElementNS(self, name, tagName, attrs)
curNode = self.elementStack[-1]
parentNode = self.elementStack[-2]
parentNode.appendChild(curNode)
def startElement(self, name, attrs):
PullDOM.startElement(self, name, attrs)
curNode = self.elementStack[-1]
parentNode = self.elementStack[-2]
parentNode.appendChild(curNode)
def processingInstruction(self, target, data):
PullDOM.processingInstruction(self, target, data)
node = self.lastEvent[0][1]
parentNode = self.elementStack[-1]
parentNode.appendChild(node)
def ignorableWhitespace(self, chars):
PullDOM.ignorableWhitespace(self, chars)
node = self.lastEvent[0][1]
parentNode = self.elementStack[-1]
parentNode.appendChild(node)
def characters(self, chars):
PullDOM.characters(self, chars)
node = self.lastEvent[0][1]
parentNode = self.elementStack[-1]
parentNode.appendChild(node)
default_bufsize = (2 ** 14) - 20
def parse(stream_or_string, parser=None, bufsize=None):
bufsize = default_bufsize
if isinstance(stream_or_string, str):
stream = open(stream_or_string, 'rb')
stream = stream_or_string
parser = xml.sax.make_parser()
return DOMEventStream(stream, parser, bufsize)
def parseString(string, parser=None):
parser = xml.sax.make_parser()
return DOMEventStream(buf, parser, bufsize)