Skip to content

Commit

Permalink
Separate validation of RECORD (#147)
Browse files Browse the repository at this point in the history
  • Loading branch information
BlueGlassBlock authored Dec 9, 2022
1 parent 45334ce commit 49a3540
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 16 deletions.
94 changes: 87 additions & 7 deletions src/installer/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import stat
import zipfile
from contextlib import contextmanager
from typing import BinaryIO, Iterator, List, Tuple, cast
from typing import BinaryIO, ClassVar, Iterator, List, Tuple, Type, cast

from installer.records import parse_record_file
from installer.records import RecordEntry, parse_record_file
from installer.utils import parse_wheel_filename

WheelContentElement = Tuple[Tuple[str, str, str], BinaryIO, bool]
Expand All @@ -22,6 +22,8 @@ class WheelSource:
This is an abstract class, whose methods have to be implemented by subclasses.
"""

validation_error: ClassVar[Type[Exception]] = ValueError

def __init__(self, distribution: str, version: str) -> None:
"""Initialize a WheelSource object.
Expand Down Expand Up @@ -65,6 +67,14 @@ def read_dist_info(self, filename: str) -> str:
"""
raise NotImplementedError

def validate_record(self) -> None:
"""Validate ``RECORD`` of the wheel.
This method should be called before :py:func:`install <installer.install>`
if validation is required.
"""
raise NotImplementedError

def get_contents(self) -> Iterator[WheelContentElement]:
"""Sequential access to all contents of the wheel (including dist-info files).
Expand All @@ -91,6 +101,17 @@ def get_contents(self) -> Iterator[WheelContentElement]:
raise NotImplementedError


class _WheelFileValidationError(ValueError):
"""Raised when a wheel file fails validation."""

def __init__(self, issues: List[str]) -> None: # noqa: D107
super().__init__(repr(issues))
self.issues = issues

def __repr__(self) -> str:
return f"WheelFileValidationError(issues={self.issues!r})"


class WheelFile(WheelSource):
"""Implements `WheelSource`, for an existing file from the filesystem.
Expand All @@ -100,6 +121,8 @@ class WheelFile(WheelSource):
... installer.install(source, destination)
"""

validation_error = _WheelFileValidationError

def __init__(self, f: zipfile.ZipFile) -> None:
"""Initialize a WheelFile object.
Expand Down Expand Up @@ -138,6 +161,66 @@ def read_dist_info(self, filename: str) -> str:
path = posixpath.join(self.dist_info_dir, filename)
return self._zipfile.read(path).decode("utf-8")

def validate_record(self, validate_file: bool = True) -> None:
"""Validate ``RECORD`` of the wheel.
This method should be called before :py:func:`install <installer.install>`
if validation is required.
File names will always be validated against ``RECORD``.
If ``validate_file`` is true, every file in the archive will be validated
against corresponding entry in ``RECORD``.
:param validate_file: Whether to validate content integrity.
"""
try:
record_lines = self.read_dist_info("RECORD").splitlines()
records = parse_record_file(record_lines)
except Exception as exc:
raise _WheelFileValidationError(
[f"Unable to retrieve `RECORD` from {self._zipfile.filename}: {exc!r}"]
) from exc

record_mapping = {record[0]: record for record in records}
issues: List[str] = []

for item in self._zipfile.infolist():
if item.filename[-1:] == "/": # looks like a directory
continue

record_args = record_mapping.pop(item.filename, None)

if self.dist_info_dir == posixpath.commonprefix(
[self.dist_info_dir, item.filename]
) and item.filename.split("/")[-1] in ("RECORD.p7s", "RECORD.jws"):
# both are for digital signatures, and not mentioned in RECORD
continue

if record_args is None:
issues.append(
f"In {self._zipfile.filename}, {item.filename} is not mentioned in RECORD"
)
continue

if item.filename == f"{self.dist_info_dir}/RECORD":
# Skip record file
continue
if validate_file:
record = RecordEntry.from_elements(*record_args)
data = self._zipfile.read(item)
if record.hash_ is None or record.size is None:
# Report empty hash / size
issues.append(
f"In {self._zipfile.filename}, hash / size of {item.filename} is not included in RECORD"
)
elif not record.validate(data):
issues.append(
f"In {self._zipfile.filename}, hash / size of {item.filename} didn't match RECORD"
)

if issues:
raise _WheelFileValidationError(issues)

def get_contents(self) -> Iterator[WheelContentElement]:
"""Sequential access to all contents of the wheel (including dist-info files).
Expand All @@ -154,11 +237,8 @@ def get_contents(self) -> Iterator[WheelContentElement]:
if item.filename[-1:] == "/": # looks like a directory
continue

record = record_mapping.pop(item.filename, None)
assert record is not None, "In {}, {} is not mentioned in RECORD".format(
self._zipfile.filename,
item.filename,
) # should not happen for valid wheels
# Pop record with empty default, because validation is handled by `validate_record`
record = record_mapping.pop(item.filename, (item.filename, "", ""))

# Borrowed from:
# /~https://github.com/pypa/pip/blob/0f21fb92/src/pip/_internal/utils/unpacking.py#L96-L100
Expand Down
16 changes: 7 additions & 9 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,14 @@ def main():
Platform: UNKNOWN
Classifier: Intended Audience :: Developers
""",
# The RECORD file is indirectly validated by the WheelFile, since it only
# provides the items that are a part of the wheel.
"fancy-1.0.0.dist-info/RECORD": b"""\
fancy/__init__.py,,
fancy/__main__.py,,
fancy-1.0.0.data/data/fancy/data.py,,
fancy-1.0.0.dist-info/top_level.txt,,
fancy-1.0.0.dist-info/entry_points.txt,,
fancy-1.0.0.dist-info/WHEEL,,
fancy-1.0.0.dist-info/METADATA,,
fancy/__init__.py,sha256=qZ2qq7xVBAiUFQVv-QBHhdtCUF5p1NsWwSOiD7qdHN0,36
fancy/__main__.py,sha256=Wd4SyWJOIMsHf_5-0oN6aNFwen8ehJnRo-erk2_K-eY,61
fancy-1.0.0.data/data/fancy/data.py,sha256=nuFRUNQF5vP7FWE-v5ysyrrfpIaAvfzSiGOgfPpLOeI,17
fancy-1.0.0.dist-info/top_level.txt,sha256=SW-yrrF_c8KlserorMw54inhLjZ3_YIuLz7fYT4f8ao,6
fancy-1.0.0.dist-info/entry_points.txt,sha256=AxJl21_zgoNWjCfvSkC9u_rWSzGyCtCzhl84n979jCc,75
fancy-1.0.0.dist-info/WHEEL,sha256=1DrXMF1THfnBjsdS5sZn-e7BKcmUn7jnMbShGeZomgc,84
fancy-1.0.0.dist-info/METADATA,sha256=hRhZavK_Y6WqKurFFAABDnoVMjZFBH0NJRjwLOutnJI,236
fancy-1.0.0.dist-info/RECORD,,
""",
}
Expand Down
4 changes: 4 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ def dist_info_filenames(self):
def read_dist_info(self, filename):
return self.dist_info_files[filename]

def validate_record(self) -> None:
# Skip validation since the logic is different.
return

def get_contents(self):
# Sort for deterministic behaviour for Python versions that do not preserve
# insertion order for dictionaries.
Expand Down
139 changes: 139 additions & 0 deletions tests/test_sources.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import json
import posixpath
import zipfile
from base64 import urlsafe_b64encode
from hashlib import sha256

import pytest

Expand Down Expand Up @@ -30,6 +33,9 @@ def test_raises_not_implemented_error(self):
with pytest.raises(NotImplementedError):
source.get_contents()

with pytest.raises(NotImplementedError):
source.validate_record()


class TestWheelFile:
def test_rejects_not_okay_name(self, tmp_path):
Expand Down Expand Up @@ -92,3 +98,136 @@ def test_provides_correct_contents(self, fancy_wheel):

assert sorted(got_records) == sorted(expected_records)
assert got_files == files


def replace_file_in_zip(path: str, filename: str, content: "bytes | None") -> None:
"""Helper function for replacing a file in the zip.
Exists because ZipFile doesn't support remove.
"""
files = {}
# Copy everything except `filename`, and replace it with `content`.
with zipfile.ZipFile(path) as archive:
for file in archive.namelist():
if file == filename:
if content is None:
continue # Remove the file
files[file] = content
else:
files[file] = archive.read(file)
# Replace original archive
with zipfile.ZipFile(path, mode="w") as archive:
for name, content in files.items():
archive.writestr(name, content)


def test_rejects_no_record_on_validate(fancy_wheel):
# Remove RECORD
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=None,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error, match="Unable to retrieve `RECORD`"
):
source.validate_record()


def test_rejects_record_missing_file_on_validate(fancy_wheel):
with zipfile.ZipFile(fancy_wheel) as archive:
with archive.open("fancy-1.0.0.dist-info/RECORD") as f:
record_file_contents = f.read()

# Remove the first two entries from the RECORD file
new_record_file_contents = b"\n".join(record_file_contents.split(b"\n")[2:])
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=new_record_file_contents,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(WheelFile.validation_error, match="not mentioned in RECORD"):
source.validate_record()


def test_rejects_record_missing_hash(fancy_wheel):
with zipfile.ZipFile(fancy_wheel) as archive:
with archive.open("fancy-1.0.0.dist-info/RECORD") as f:
record_file_contents = f.read()

new_record_file_contents = b"\n".join(
line.split(b",")[0] + b",," # file name with empty size and hash
for line in record_file_contents.split(b"\n")
)
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=new_record_file_contents,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error,
match="hash / size of (.+) is not included in RECORD",
):
source.validate_record()


def test_accept_record_missing_hash_on_skip_validation(fancy_wheel):
with zipfile.ZipFile(fancy_wheel) as archive:
with archive.open("fancy-1.0.0.dist-info/RECORD") as f:
record_file_contents = f.read()

new_record_file_contents = b"\n".join(
line.split(b",")[0] + b",," # file name with empty size and hash
for line in record_file_contents.split(b"\n")
)
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=new_record_file_contents,
)
with WheelFile.open(fancy_wheel) as source:
source.validate_record(validate_file=False)


def test_accept_wheel_with_signed_file(fancy_wheel):
with zipfile.ZipFile(fancy_wheel) as archive:
with archive.open("fancy-1.0.0.dist-info/RECORD") as f:
record_file_contents = f.read()
hash_b64_nopad = (
urlsafe_b64encode(sha256(record_file_contents).digest())
.decode("utf-8")
.rstrip("=")
)
jws_content = json.dumps({"hash": f"sha256={hash_b64_nopad}"})
with zipfile.ZipFile(fancy_wheel, "a") as archive:
archive.writestr("fancy-1.0.0.dist-info/RECORD.jws", jws_content)
with WheelFile.open(fancy_wheel) as source:
source.validate_record()


def test_rejects_record_validation_failed(fancy_wheel):
with zipfile.ZipFile(fancy_wheel) as archive:
with archive.open("fancy-1.0.0.dist-info/RECORD") as f:
record_file_contents = f.read()

new_record_file_contents = b"\n".join(
line.split(b",")[0] # Original filename
+ b",sha256=pREiHcl39jRySUXMCOrwmSsnOay8FB7fOJP5mZQ3D3A,"
+ line.split(b",")[2] # Original size
for line in record_file_contents.split(b"\n")
if line # ignore trail endline
)
replace_file_in_zip(
fancy_wheel,
filename="fancy-1.0.0.dist-info/RECORD",
content=new_record_file_contents,
)
with WheelFile.open(fancy_wheel) as source:
with pytest.raises(
WheelFile.validation_error,
match="hash / size of (.+) didn't match RECORD",
):
source.validate_record()

0 comments on commit 49a3540

Please sign in to comment.