Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion pykeypull/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@


def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Extract Android keystore and keybox files over ADB")
"""Build the command-line argument parser for the KeyPull CLI."""

parser = argparse.ArgumentParser(
description="Extract Android keystore and keybox files over ADB",
)
parser.add_argument(
"locations",
nargs="*",
Expand All @@ -30,6 +34,8 @@ def build_parser() -> argparse.ArgumentParser:


def run(argv: Sequence[str] | None = None) -> int:
"""Execute the extraction workflow and return the exit status."""

parser = build_parser()
args = parser.parse_args(argv)

Expand Down Expand Up @@ -58,4 +64,6 @@ def run(argv: Sequence[str] | None = None) -> int:


def main() -> None:
"""Entrypoint used by ``python -m`` and console scripts."""

raise SystemExit(run())
18 changes: 14 additions & 4 deletions pykeypull/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ def adb_stat(self) -> None:
except FileNotFoundError as exc:
raise ExtractionError("ADB executable not found in PATH") from exc
except subprocess.CalledProcessError as exc:
raise ExtractionError(f"failed to start ADB server: {exc.stderr.decode().strip()}") from exc
error = exc.stderr.decode().strip()
raise ExtractionError(f"failed to start ADB server: {error}") from exc

try:
result = subprocess.run(
Expand All @@ -48,7 +49,8 @@ def adb_stat(self) -> None:
text=True,
)
except subprocess.CalledProcessError as exc:
raise ExtractionError(f"failed to fetch connected devices: {exc.stderr.strip()}") from exc
error = exc.stderr.strip()
raise ExtractionError(f"failed to fetch connected devices: {error}") from exc

connected: List[str] = []
for line in result.stdout.splitlines():
Expand Down Expand Up @@ -89,7 +91,8 @@ def obtain_root(self) -> None:
)
except subprocess.CalledProcessError:
raise ExtractionError(
"root access required but not available. Please root your device or enable root access in Developer Options."
"root access required but not available. Enable Developer Options "
"root access or root the device."
) from None
print("Obtained root via SU")
return
Expand All @@ -100,6 +103,8 @@ def obtain_root(self) -> None:
# ------------------------------------------------------------------
# File operations
def ensure_output_directory(self) -> None:
"""Create the output directory if it does not already exist."""

self.output.mkdir(parents=True, exist_ok=True)

def extract_from_location(self, location: str) -> None:
Expand All @@ -115,10 +120,14 @@ def extract_from_location(self, location: str) -> None:
# ------------------------------------------------------------------
# Internal helpers
def _ensure_device(self) -> None:
"""Verify that a device has been discovered before continuing."""

if not self.device:
raise ExtractionError("ADB device not initialised; call adb_stat() first")

def _adb_pull(self, remote: str, local: Path) -> None:
"""Pull a file from the device to the local filesystem."""

self._ensure_device()

local.parent.mkdir(parents=True, exist_ok=True)
Expand All @@ -130,7 +139,8 @@ def _adb_pull(self, remote: str, local: Path) -> None:
stderr=subprocess.PIPE,
)
except subprocess.CalledProcessError as exc:
raise ExtractionError(f"ADB pull failed for {remote}: {exc.stderr.decode().strip()}") from exc
error = exc.stderr.decode().strip()
raise ExtractionError(f"ADB pull failed for {remote}: {error}") from exc

def _pull_keybox(self, remote: str) -> None:
destination = self.output / Path(remote).name
Expand Down
22 changes: 15 additions & 7 deletions pykeypull/keybox.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,49 @@

@dataclass
class Certificate:
"""Represent a single certificate element in the XML structure."""

format: str
data: str


@dataclass
class CertificateChain:
"""Container for the certificates associated with a key."""

number_of_certificates: int
certificates: List[Certificate] = field(default_factory=list)


@dataclass
class PrivateKey:
"""Hold the private key metadata referenced by a key entry."""

format: str
data: str


@dataclass
class Key:
"""Full key entry combining algorithm, private key, and certificates."""

algorithm: str
private_key: PrivateKey
certificate_chain: CertificateChain


@dataclass
class Keybox:
"""Group of keys tied to a specific device identifier."""

device_id: str
keys: List[Key] = field(default_factory=list)


@dataclass
class Attestation:
"""Root element representing the contents of a keybox file."""

number_of_keyboxes: int
keyboxes: List[Keybox] = field(default_factory=list)

Expand Down Expand Up @@ -131,12 +143,8 @@ def validate(path: Path) -> Attestation:
for index, keybox in enumerate(attestation.keyboxes, start=1):
print(f"Keybox {index} - Device ID: {keybox.device_id}")
for key_index, key in enumerate(keybox.keys, start=1):
print(
" Key {idx}: Algorithm={alg}, Certificates={count}".format(
idx=key_index,
alg=key.algorithm or "unknown",
count=key.certificate_chain.number_of_certificates,
)
)
certificate_count = key.certificate_chain.number_of_certificates
algorithm = key.algorithm or "unknown"
print(f" Key {key_index}: Algorithm={algorithm}, Certificates={certificate_count}")

return attestation
37 changes: 32 additions & 5 deletions tests/test_extractor.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,37 @@
"""Unit tests covering the Extractor workflow."""

import subprocess
import unittest
from unittest.mock import call, patch

# Accessing protected members is acceptable in unit tests.
# pylint: disable=protected-access

from pykeypull.extractor import Extractor, ExtractionError


class ExtractorTests(unittest.TestCase):
"""Behavioural tests for the :class:`Extractor` class."""

def setUp(self) -> None:
"""Create a fresh extractor instance for each test."""

# Ensure output directory uses a temporary unique path for tests
self.extractor = Extractor(output="test-output")
self.addCleanup(self._cleanup_output)

def _cleanup_output(self) -> None:
"""Remove any output directory created during a test run."""

if self.extractor.output.exists():
for child in self.extractor.output.iterdir():
if child.is_file():
child.unlink()
self.extractor.output.rmdir()

def test_adb_stat_detects_device(self) -> None:
"""ADB device discovery should record the first connected device."""

with patch("pykeypull.extractor.subprocess.run") as mock_run:
mock_run.side_effect = [
subprocess.CompletedProcess(args=[], returncode=0, stdout=b"", stderr=b""),
Expand Down Expand Up @@ -52,11 +65,15 @@ def test_adb_stat_detects_device(self) -> None:
)

def test_obtain_root_via_adb(self) -> None:
"""Rooting via ADB should succeed when the command runs cleanly."""

self.extractor.device = "ABC123"
with patch("pykeypull.extractor.subprocess.run") as mock_run, patch(
"pykeypull.extractor.time.sleep"
"pykeypull.extractor.time.sleep",
) as mock_sleep:
mock_run.return_value = subprocess.CompletedProcess(args=[], returncode=0, stdout=b"", stderr=b"")
mock_run.return_value = subprocess.CompletedProcess(
args=[], returncode=0, stdout=b"", stderr=b""
)

self.extractor.obtain_root()

Expand All @@ -69,6 +86,8 @@ def test_obtain_root_via_adb(self) -> None:
mock_sleep.assert_called_once_with(2)

def test_obtain_root_falls_back_to_su(self) -> None:
"""If root fails, the extractor should attempt an SU fallback."""

self.extractor.device = "ABC123"
with patch("pykeypull.extractor.subprocess.run") as mock_run:
mock_run.side_effect = [
Expand All @@ -91,6 +110,8 @@ def test_obtain_root_falls_back_to_su(self) -> None:
)

def test_obtain_root_raises_when_su_fails(self) -> None:
"""The extractor should raise an error if SU access is unavailable."""

self.extractor.device = "ABC123"
with patch("pykeypull.extractor.subprocess.run") as mock_run:
mock_run.side_effect = [
Expand All @@ -102,12 +123,16 @@ def test_obtain_root_raises_when_su_fails(self) -> None:
self.extractor.obtain_root()

def test_pull_directory_pulls_each_file(self) -> None:
"""Directory extraction should download each discovered file."""

self.extractor.device = "ABC123"
directory_listing = "/data/file1\n/data/keybox.xml\n"

with patch("pykeypull.extractor.subprocess.run") as mock_run, patch.object(
self.extractor, "_adb_pull"
) as mock_pull, patch("pykeypull.extractor.validate_keybox") as mock_validate:
with (
patch("pykeypull.extractor.subprocess.run") as mock_run,
patch.object(self.extractor, "_adb_pull") as mock_pull,
patch("pykeypull.extractor.validate_keybox") as mock_validate,
):
mock_run.return_value = subprocess.CompletedProcess(
args=[], returncode=0, stdout=directory_listing, stderr=""
)
Expand All @@ -128,6 +153,8 @@ def test_pull_directory_pulls_each_file(self) -> None:
mock_validate.assert_called_once_with(expected_destinations[1])

def test_extract_all_collects_successful_locations(self) -> None:
"""Only successful extraction locations should be returned."""

locations = ["one", "two", "three"]

with patch.object(self.extractor, "extract_from_location") as mock_extract:
Expand Down
Loading