From 4760279bd1bf8bec66d7c2539a5bca4ff6854203 Mon Sep 17 00:00:00 2001 From: Kydaix Date: Sat, 25 Oct 2025 13:45:22 +0000 Subject: [PATCH] Improve lint compliance --- pykeypull/cli.py | 10 +++++++++- pykeypull/extractor.py | 18 ++++++++++++++---- pykeypull/keybox.py | 22 +++++++++++++++------- tests/test_extractor.py | 37 ++++++++++++++++++++++++++++++++----- 4 files changed, 70 insertions(+), 17 deletions(-) diff --git a/pykeypull/cli.py b/pykeypull/cli.py index b24c66e..5795b56 100644 --- a/pykeypull/cli.py +++ b/pykeypull/cli.py @@ -10,7 +10,11 @@ def build_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser(description="Extract Android keystore and keybox files over ADB") + """Build the command-line argument parser for the KeyPull CLI.""" + + parser = argparse.ArgumentParser( + description="Extract Android keystore and keybox files over ADB", + ) parser.add_argument( "locations", nargs="*", @@ -30,6 +34,8 @@ def build_parser() -> argparse.ArgumentParser: def run(argv: Sequence[str] | None = None) -> int: + """Execute the extraction workflow and return the exit status.""" + parser = build_parser() args = parser.parse_args(argv) @@ -58,4 +64,6 @@ def run(argv: Sequence[str] | None = None) -> int: def main() -> None: + """Entrypoint used by ``python -m`` and console scripts.""" + raise SystemExit(run()) diff --git a/pykeypull/extractor.py b/pykeypull/extractor.py index 6246965..cd86cd3 100644 --- a/pykeypull/extractor.py +++ b/pykeypull/extractor.py @@ -37,7 +37,8 @@ def adb_stat(self) -> None: except FileNotFoundError as exc: raise ExtractionError("ADB executable not found in PATH") from exc except subprocess.CalledProcessError as exc: - raise ExtractionError(f"failed to start ADB server: {exc.stderr.decode().strip()}") from exc + error = exc.stderr.decode().strip() + raise ExtractionError(f"failed to start ADB server: {error}") from exc try: result = subprocess.run( @@ -48,7 +49,8 @@ def adb_stat(self) -> None: text=True, ) except subprocess.CalledProcessError as exc: - raise ExtractionError(f"failed to fetch connected devices: {exc.stderr.strip()}") from exc + error = exc.stderr.strip() + raise ExtractionError(f"failed to fetch connected devices: {error}") from exc connected: List[str] = [] for line in result.stdout.splitlines(): @@ -89,7 +91,8 @@ def obtain_root(self) -> None: ) except subprocess.CalledProcessError: raise ExtractionError( - "root access required but not available. Please root your device or enable root access in Developer Options." + "root access required but not available. Enable Developer Options " + "root access or root the device." ) from None print("Obtained root via SU") return @@ -100,6 +103,8 @@ def obtain_root(self) -> None: # ------------------------------------------------------------------ # File operations def ensure_output_directory(self) -> None: + """Create the output directory if it does not already exist.""" + self.output.mkdir(parents=True, exist_ok=True) def extract_from_location(self, location: str) -> None: @@ -115,10 +120,14 @@ def extract_from_location(self, location: str) -> None: # ------------------------------------------------------------------ # Internal helpers def _ensure_device(self) -> None: + """Verify that a device has been discovered before continuing.""" + if not self.device: raise ExtractionError("ADB device not initialised; call adb_stat() first") def _adb_pull(self, remote: str, local: Path) -> None: + """Pull a file from the device to the local filesystem.""" + self._ensure_device() local.parent.mkdir(parents=True, exist_ok=True) @@ -130,7 +139,8 @@ def _adb_pull(self, remote: str, local: Path) -> None: stderr=subprocess.PIPE, ) except subprocess.CalledProcessError as exc: - raise ExtractionError(f"ADB pull failed for {remote}: {exc.stderr.decode().strip()}") from exc + error = exc.stderr.decode().strip() + raise ExtractionError(f"ADB pull failed for {remote}: {error}") from exc def _pull_keybox(self, remote: str) -> None: destination = self.output / Path(remote).name diff --git a/pykeypull/keybox.py b/pykeypull/keybox.py index ab8a88a..d4ed40f 100644 --- a/pykeypull/keybox.py +++ b/pykeypull/keybox.py @@ -10,24 +10,32 @@ @dataclass class Certificate: + """Represent a single certificate element in the XML structure.""" + format: str data: str @dataclass class CertificateChain: + """Container for the certificates associated with a key.""" + number_of_certificates: int certificates: List[Certificate] = field(default_factory=list) @dataclass class PrivateKey: + """Hold the private key metadata referenced by a key entry.""" + format: str data: str @dataclass class Key: + """Full key entry combining algorithm, private key, and certificates.""" + algorithm: str private_key: PrivateKey certificate_chain: CertificateChain @@ -35,12 +43,16 @@ class Key: @dataclass class Keybox: + """Group of keys tied to a specific device identifier.""" + device_id: str keys: List[Key] = field(default_factory=list) @dataclass class Attestation: + """Root element representing the contents of a keybox file.""" + number_of_keyboxes: int keyboxes: List[Keybox] = field(default_factory=list) @@ -131,12 +143,8 @@ def validate(path: Path) -> Attestation: for index, keybox in enumerate(attestation.keyboxes, start=1): print(f"Keybox {index} - Device ID: {keybox.device_id}") for key_index, key in enumerate(keybox.keys, start=1): - print( - " Key {idx}: Algorithm={alg}, Certificates={count}".format( - idx=key_index, - alg=key.algorithm or "unknown", - count=key.certificate_chain.number_of_certificates, - ) - ) + certificate_count = key.certificate_chain.number_of_certificates + algorithm = key.algorithm or "unknown" + print(f" Key {key_index}: Algorithm={algorithm}, Certificates={certificate_count}") return attestation diff --git a/tests/test_extractor.py b/tests/test_extractor.py index deafcb9..14efbf4 100644 --- a/tests/test_extractor.py +++ b/tests/test_extractor.py @@ -1,17 +1,28 @@ +"""Unit tests covering the Extractor workflow.""" + import subprocess import unittest from unittest.mock import call, patch +# Accessing protected members is acceptable in unit tests. +# pylint: disable=protected-access + from pykeypull.extractor import Extractor, ExtractionError class ExtractorTests(unittest.TestCase): + """Behavioural tests for the :class:`Extractor` class.""" + def setUp(self) -> None: + """Create a fresh extractor instance for each test.""" + # Ensure output directory uses a temporary unique path for tests self.extractor = Extractor(output="test-output") self.addCleanup(self._cleanup_output) def _cleanup_output(self) -> None: + """Remove any output directory created during a test run.""" + if self.extractor.output.exists(): for child in self.extractor.output.iterdir(): if child.is_file(): @@ -19,6 +30,8 @@ def _cleanup_output(self) -> None: self.extractor.output.rmdir() def test_adb_stat_detects_device(self) -> None: + """ADB device discovery should record the first connected device.""" + with patch("pykeypull.extractor.subprocess.run") as mock_run: mock_run.side_effect = [ subprocess.CompletedProcess(args=[], returncode=0, stdout=b"", stderr=b""), @@ -52,11 +65,15 @@ def test_adb_stat_detects_device(self) -> None: ) def test_obtain_root_via_adb(self) -> None: + """Rooting via ADB should succeed when the command runs cleanly.""" + self.extractor.device = "ABC123" with patch("pykeypull.extractor.subprocess.run") as mock_run, patch( - "pykeypull.extractor.time.sleep" + "pykeypull.extractor.time.sleep", ) as mock_sleep: - mock_run.return_value = subprocess.CompletedProcess(args=[], returncode=0, stdout=b"", stderr=b"") + mock_run.return_value = subprocess.CompletedProcess( + args=[], returncode=0, stdout=b"", stderr=b"" + ) self.extractor.obtain_root() @@ -69,6 +86,8 @@ def test_obtain_root_via_adb(self) -> None: mock_sleep.assert_called_once_with(2) def test_obtain_root_falls_back_to_su(self) -> None: + """If root fails, the extractor should attempt an SU fallback.""" + self.extractor.device = "ABC123" with patch("pykeypull.extractor.subprocess.run") as mock_run: mock_run.side_effect = [ @@ -91,6 +110,8 @@ def test_obtain_root_falls_back_to_su(self) -> None: ) def test_obtain_root_raises_when_su_fails(self) -> None: + """The extractor should raise an error if SU access is unavailable.""" + self.extractor.device = "ABC123" with patch("pykeypull.extractor.subprocess.run") as mock_run: mock_run.side_effect = [ @@ -102,12 +123,16 @@ def test_obtain_root_raises_when_su_fails(self) -> None: self.extractor.obtain_root() def test_pull_directory_pulls_each_file(self) -> None: + """Directory extraction should download each discovered file.""" + self.extractor.device = "ABC123" directory_listing = "/data/file1\n/data/keybox.xml\n" - with patch("pykeypull.extractor.subprocess.run") as mock_run, patch.object( - self.extractor, "_adb_pull" - ) as mock_pull, patch("pykeypull.extractor.validate_keybox") as mock_validate: + with ( + patch("pykeypull.extractor.subprocess.run") as mock_run, + patch.object(self.extractor, "_adb_pull") as mock_pull, + patch("pykeypull.extractor.validate_keybox") as mock_validate, + ): mock_run.return_value = subprocess.CompletedProcess( args=[], returncode=0, stdout=directory_listing, stderr="" ) @@ -128,6 +153,8 @@ def test_pull_directory_pulls_each_file(self) -> None: mock_validate.assert_called_once_with(expected_destinations[1]) def test_extract_all_collects_successful_locations(self) -> None: + """Only successful extraction locations should be returned.""" + locations = ["one", "two", "three"] with patch.object(self.extractor, "extract_from_location") as mock_extract: