"""Routines related to PyPI, indexes"""
from __future__ import absolute_import
from collections import namedtuple
from pip._vendor.six.moves.urllib import parse as urllib_parse
from pip._vendor.six.moves.urllib import request as urllib_request
from pip.compat import ipaddress
cached_property, splitext, normalize_path,
ARCHIVE_EXTENSIONS, SUPPORTED_EXTENSIONS,
from pip.utils.deprecation import RemovedInPip10Warning
from pip.utils.logging import indent_log
from pip.utils.packaging import check_requires_python
from pip.exceptions import (
DistributionNotFound, BestVersionAlreadyInstalled, InvalidWheelFilename,
from pip.download import HAS_TLS, is_url, path_to_url, url_to_path
from pip.wheel import Wheel, wheel_ext
from pip.pep425tags import get_supported
from pip._vendor import html5lib, requests, six
from pip._vendor.packaging.version import parse as parse_version
from pip._vendor.packaging.utils import canonicalize_name
from pip._vendor.packaging import specifiers
from pip._vendor.requests.exceptions import SSLError
from pip._vendor.distlib.compat import unescape
# Names exported by this module.
__all__ = ['FormatControl', 'fmt_ctl_handle_mutual_exclude', 'PackageFinder']
# Each secure-origin entry is a 3-tuple, in this order:
# protocol, hostname, port
# Taken from Chrome's list of secure origins (See: http://bit.ly/1qrySKC)
# NOTE(review): the assignment that opens this list (presumably
# SECURE_ORIGINS, which _validate_secure_origin iterates) and its other
# entries are not visible in this excerpt.
# Loopback network entry; _validate_secure_origin treats "*" in the
# protocol and port positions as "match anything".
("*", "127.0.0.0/8", "*"),
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class InstallationCandidate(object):
# A (project, version, location) record. Ordering and equality are
# delegated to the tuple stored in self._key.
# NOTE(review): several lines of this class are not visible in this
# excerpt (the project/location assignments, the headers of the repr and
# comparison methods, and the body of the isinstance guard in _compare).
def __init__(self, project, version, location):
# Store the version in parsed form, using the packaging version parser
# imported as parse_version.
self.version = parse_version(version)
# Tuple that every comparison below operates on.
self._key = (self.project, self.version, self.location)
# Debug representation showing project, version and location.
return "<InstallationCandidate({0!r}, {1!r}, {2!r})>".format(
self.project, self.version, self.location,
# The six returns below each forward one comparison operator to _compare.
# Presumably they belong to __lt__, __le__, __eq__, __ge__, __gt__ and
# __ne__ in that order -- the def lines are not visible here; confirm.
return self._compare(other, lambda s, o: s < o)
return self._compare(other, lambda s, o: s <= o)
return self._compare(other, lambda s, o: s == o)
return self._compare(other, lambda s, o: s >= o)
return self._compare(other, lambda s, o: s > o)
return self._compare(other, lambda s, o: s != o)
def _compare(self, other, method):
# Apply `method` (a two-argument predicate) to both candidates' _key.
# NOTE(review): what happens when `other` is not an InstallationCandidate
# is not visible here; presumably NotImplemented is returned -- confirm.
if not isinstance(other, InstallationCandidate):
return method(self._key, other._key)
class PackageFinder(object):
This is meant to match easy_install's technique for looking for
packages, by reading pages and looking for appropriate links.
def __init__(self, find_links, index_urls, allow_all_prereleases=False,
trusted_hosts=None, process_dependency_links=False,
session=None, format_control=None, platform=None,
versions=None, abi=None, implementation=None):
# Summary of the visible body:
#   * find_links entries are appended to self.find_links; the comments
#     below describe normalizing "~"-prefixed entries that exist on disk.
#   * index_urls is stored as given; dependency_links starts empty and
#     logged_links starts as an empty set.
#   * format_control falls back to FormatControl(set(), set()) when falsy.
#   * trusted_hosts (or an empty list when falsy) is iterated to build the
#     hosts exempt from the non-HTTPS warning.
#   * allow_all_prereleases and process_dependency_links are stored as-is.
#   * valid_tags is computed with get_supported().
#   * index_urls and find_links are scanned for the "https" scheme so the
#     "ssl module is not available" warning text can be emitted.
# NOTE(review): a number of lines of this method are not visible in this
# excerpt (e.g. the check that produces the "missing 1 required keyword
# argument" message, the session assignment, the arguments passed to
# get_supported, and the HAS_TLS guard) -- confirm against the full file.
"""Create a PackageFinder.
:param format_control: A FormatControl object or None. Used to control
the selection of source packages / binary packages when consulting
:param platform: A string or None. If None, searches for packages
that are supported by the current system. Otherwise, will find
packages that can be built on the platform passed in. These
packages will only be downloaded for distribution: they will
:param versions: A list of strings or None. This is passed directly
to pep425tags.py in the get_supported() method.
:param abi: A string or None. This is passed directly
to pep425tags.py in the get_supported() method.
:param implementation: A string or None. This is passed directly
to pep425tags.py in the get_supported() method.
"PackageFinder() missing 1 required keyword argument: "
# Build find_links. If an argument starts with ~, it may be
# a local file relative to a home directory. So try normalizing
# it and if it exists, use the normalized version.
# This is deliberately conservative - it might be fine just to
# blindly normalize anything starting with a ~...
new_link = normalize_path(link)
if os.path.exists(new_link):
self.find_links.append(link)
self.index_urls = index_urls
self.dependency_links = []
# These are boring links that have already been logged somehow:
self.logged_links = set()
self.format_control = format_control or FormatControl(set(), set())
# Domains that we won't emit warnings for when not using HTTPS
for host in (trusted_hosts if trusted_hosts else [])
# Do we want to allow _all_ pre-releases?
self.allow_all_prereleases = allow_all_prereleases
# Do we process dependency links?
self.process_dependency_links = process_dependency_links
# The Session we'll use to make requests
# The valid tags to check potential found wheel candidates against
self.valid_tags = get_supported(
# If we don't have TLS enabled, then WARN if anyplace we're looking
for link in itertools.chain(self.index_urls, self.find_links):
parsed = urllib_parse.urlparse(link)
if parsed.scheme == "https":
"pip is configured with locations that require "
"TLS/SSL, however the ssl module in Python is not "
def add_dependency_links(self, links):
# Append `links` to self.dependency_links. When dependency-link
# processing is enabled, a deprecation message is also involved (its
# text is visible below; the call that emits it is not).
# NOTE(review): indentation is lost in this excerpt, so whether the
# extend() call sits inside the `if` cannot be confirmed from here.
# # FIXME: this shouldn't be a global list; it should only
# # apply to requirements of the package that specifies the
# # dependency_links value
# # FIXME: also, we should track comes_from (i.e., use Link)
if self.process_dependency_links:
"Dependency Links processing has been deprecated and will be "
"removed in a future release.",
self.dependency_links.extend(links)
def _sort_locations(locations, expand_dir=False):
# NOTE(review): there is no `self` parameter although callers invoke this
# as self._sort_locations(...); presumably a @staticmethod decorator sits
# on a line not visible in this excerpt -- confirm.
# NOTE(review): the loop over `locations`, the list initialisations, the
# nested helper's def line and the final return are also not visible.
Sort locations into "files" (archives) and "urls", and return
a pair of lists (files,urls)
# puts the url for the given file path into the appropriate list
# The helper's visible test is whether the guessed MIME type is HTML.
if mimetypes.guess_type(url, strict=False)[0] == 'text/html':
# A location is handled as a filesystem entry when it exists on disk or
# uses the file: scheme.
is_local_path = os.path.exists(url)
is_file_url = url.startswith('file:')
if is_local_path or is_file_url:
# Resolve symlinks before listing directory contents.
path = os.path.realpath(path)
# Every directory entry is passed through the sort_path helper.
for item in os.listdir(path):
sort_path(os.path.join(path, item))
elif os.path.isfile(path):
"Url '%s' is ignored: it is neither a file "
# Only add url with clear scheme
"Url '%s' is ignored. It is either a non-existing "
"path or lacks a specific scheme.", url)
def _candidate_sort_key(self, candidate):
# Returns the tuple (candidate.version, pri) used as a sort key.
# NOTE(review): the docstring delimiters, the raise for unsupported
# wheels and the non-wheel branch that sets `pri` are not visible here.
Function used to generate link sort key for link tuples.
The greater the return value, the more preferred it is.
If not finding wheels, then sorted by version only.
If finding wheels, then the sort order is by version, then:
2. wheels ordered via Wheel.support_index_min(self.valid_tags)
Note: it was considered to embed this logic into the Link
comparison operators, but then different sdist links
with the same version, would have to be considered equal
# Number of tags this finder accepts. Its use is not visible in this
# excerpt; presumably it feeds the non-wheel priority -- confirm.
support_num = len(self.valid_tags)
if candidate.location.is_wheel:
# can raise InvalidWheelFilename
wheel = Wheel(candidate.location.filename)
if not wheel.supported(self.valid_tags):
"%s is not a supported wheel for this platform. It "
"can't be sorted." % wheel.filename
# Negated so that a smaller support index yields a greater (more
# preferred) key component.
pri = -(wheel.support_index_min(self.valid_tags))
return (candidate.version, pri)
def _validate_secure_origin(self, logger, location):
# Decide whether `location` may be searched: its (scheme, hostname, port)
# is compared against every entry of SECURE_ORIGINS plus this finder's
# own secure_origins. Per the comments below, a warning is logged and the
# location is ignored when no entry matches.
# NOTE(review): several control-flow lines (continue / try / except /
# return and the logging call itself) are not visible in this excerpt;
# the notes added here describe only the visible expressions.
# Determine if this url used a secure transport mechanism
parsed = urllib_parse.urlparse(str(location))
origin = (parsed.scheme, parsed.hostname, parsed.port)
# The protocol to use to see if the protocol matches.
# Don't count the repository type as part of the protocol: in
# cases such as "git+ssh", only use "ssh". (I.e., Only verify against
protocol = origin[0].rsplit('+', 1)[-1]
# Determine if our origin is a secure origin by looking through our
# hardcoded list of secure origins, as well as any additional ones
# configured on this PackageFinder instance.
for secure_origin in (SECURE_ORIGINS + self.secure_origins):
# Protocol mismatch, unless the entry's protocol is the "*" wildcard.
if protocol != secure_origin[0] and secure_origin[0] != "*":
# We need to do this decode dance to ensure that we have a
# unicode object, even on Python 2.x.
addr = ipaddress.ip_address(
isinstance(origin[1], six.text_type) or
else origin[1].decode("utf8")
network = ipaddress.ip_network(
if isinstance(secure_origin[1], six.text_type)
else secure_origin[1].decode("utf8")
# We don't have both a valid address or a valid network, so
# we'll check this origin against hostnames.
# The hostname comparison is case-insensitive; "*" matches any host.
origin[1].lower() != secure_origin[1].lower() and
secure_origin[1] != "*"):
# We have a valid address and network, so see if the address
# is contained within the network.
# Check to see if the port matches
# A port of "*" or None in the entry accepts any port.
if (origin[2] != secure_origin[2] and
secure_origin[2] != "*" and
secure_origin[2] is not None):
# If we've gotten here, then this origin matches the current
# secure origin and we should return True
# If we've gotten to this point, then the origin isn't secure and we
# will not accept it as a valid location to search. We will however
# log a warning that we are ignoring it.
"The repository located at %s is not a trusted or secure host and "
"is being ignored. If this repository is available via HTTPS it "
"is recommended to use HTTPS instead, otherwise you may silence "
"this warning and allow it anyways with '--trusted-host %s'.",
def _get_index_urls_locations(self, project_name):
# Returns a list with one location per entry of self.index_urls, built by
# the mkurl_pypi_url helper. The visible pieces show the project name
# being canonicalized and URL-quoted, and a check for a trailing "/".
# NOTE(review): the helper's def line, the call that joins url and name,
# the statement that appends the slash and the docstring's closing
# delimiter are not visible in this excerpt.
"""Returns the locations found via self.index_urls
Checks the url_name on the main (first in the list) index and
use this url_name to produce all locations
urllib_parse.quote(canonicalize_name(project_name)))
# For maximum compatibility with easy_install, ensure the path
# ends in a trailing slash. Although this isn't in the spec
# (and PyPI can handle it without the slash) some other index
# implementations might break if they relied on easy_install's
if not loc.endswith('/'):
return [mkurl_pypi_url(url) for url in self.index_urls]
def find_all_candidates(self, project_name):
# Visible flow:
#   1. Index locations, find_links (with expand_dir=True) and
#      dependency_links are each split by _sort_locations into file and
#      url locations.
#   2. File locations become Link objects; url locations become Link
#      objects that are kept only if _validate_secure_origin accepts them.
#   3. The url locations are logged at debug level.
#   4. A Search is built from the project name, its canonical name and the
#      formats from fmt_ctl_formats; _package_versions is then run over
#      find_links, the links of each fetched page, dependency_links and
#      the file locations.
#   5. file_versions is sorted in descending order and the result is
#      assembled starting with
#      file_versions + find_links_versions + page_versions.
# NOTE(review): many lines are not visible in this excerpt (variable
# assignments such as file_locations / url_locations / page_versions, the
# logging calls around the fragments below, the tail of the final
# concatenation and the docstring's closing delimiter).
"""Find all available InstallationCandidate for project_name
This checks index_urls, find_links and dependency_links.
All versions found are returned as an InstallationCandidate list.
See _link_package_versions for details on which files are accepted
index_locations = self._get_index_urls_locations(project_name)
index_file_loc, index_url_loc = self._sort_locations(index_locations)
fl_file_loc, fl_url_loc = self._sort_locations(
self.find_links, expand_dir=True)
dep_file_loc, dep_url_loc = self._sort_locations(self.dependency_links)
Link(url) for url in itertools.chain(
index_file_loc, fl_file_loc, dep_file_loc)
# We trust every url that the user has given us whether it was given
# via --index-url or --find-links
# We explicitly do not trust links that came from dependency_links
# We want to filter out any thing which does not have a secure origin.
link for link in itertools.chain(
(Link(url) for url in index_url_loc),
(Link(url) for url in fl_url_loc),
(Link(url) for url in dep_url_loc),
if self._validate_secure_origin(logger, link)
logger.debug('%d location(s) to search for versions of %s:',
len(url_locations), project_name)
for location in url_locations:
logger.debug('* %s', location)
canonical_name = canonicalize_name(project_name)
formats = fmt_ctl_formats(self.format_control, canonical_name)
search = Search(project_name, canonical_name, formats)
find_links_versions = self._package_versions(
# We trust every directly linked archive in find_links
(Link(url, '-f') for url in self.find_links),
for page in self._get_pages(url_locations, project_name):
logger.debug('Analyzing links from page %s', page.url)
self._package_versions(page.links, search)
dependency_versions = self._package_versions(
(Link(url) for url in self.dependency_links), search
'dependency_links found: %s',
version.location.url for version in dependency_versions
file_versions = self._package_versions(file_locations, search)
file_versions.sort(reverse=True)
url_to_path(candidate.location.url)
for candidate in file_versions
# This is an intentional priority ordering
file_versions + find_links_versions + page_versions +
def find_requirement(self, req, upgrade):
"""Try to find a Link matching req
Expects req, an InstallRequirement and upgrade, a boolean
Raises DistributionNotFound or BestVersionAlreadyInstalled otherwise
all_candidates = self.find_all_candidates(req.name)
# Filter out anything which doesn't match our specifier
compatible_versions = set(
# We turn the version object into a str here because otherwise
# when we're debundled but setuptools isn't, Python will see
# packaging.version.Version and
# pkg_resources._vendor.packaging.version.Version as different
# types. This way we'll use a str as a common data interchange
# format. If we stop using the pkg_resources provided specifier
# and start using our own, we can drop the cast to str().
[str(c.version) for c in all_candidates],
self.allow_all_prereleases
if self.allow_all_prereleases else None
applicable_candidates = [
# Again, converting to str to deal with debundling.
c for c in all_candidates if str(c.version) in compatible_versions
if applicable_candidates:
best_candidate = max(applicable_candidates,
key=self._candidate_sort_key)
# If we cannot find a non-yanked candidate,
# use the best one and print a warning about it.
# Otherwise, try to find another best candidate, ignoring
# all the yanked releases.
if getattr(best_candidate.location, "yanked", False):
c for c in applicable_candidates
if not getattr(c.location, "yanked", False)