Add BaseChunkedParser for automatic large PDF splitting and reassembly #909
Large documents (70+ pages) that exceed Cloud Run timeout and response size limits now automatically split into page-range chunks for independent parsing and reassembly.

New `BaseChunkedParser` abstract class extends `BaseParser` with:

- Automatic PDF splitting via pypdf with configurable thresholds
- Optional concurrent chunk dispatch via `ThreadPoolExecutor`
- Per-chunk retry with exponential back-off
- Correct reassembly of PAWLS page indices, annotation references, and parent-child relationships across chunk boundaries
- Post-reassembly hook for document-wide processing (image extraction)

`DoclingParser` now extends `BaseChunkedParser`. Documents below the chunking threshold (default: 75 pages) are processed unchanged. Configurable via `PipelineSettings` (`DOCLING_MAX_PAGES_PER_CHUNK`, `DOCLING_MIN_PAGES_FOR_CHUNKING`, `DOCLING_MAX_CONCURRENT_CHUNKS`).

https://claude.ai/code/session_016xZEFaSfqGzQuFa2xrCpT4
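The splitting arithmetic described above (page-range chunks under a max-pages threshold, with small documents passed through whole) can be sketched as follows. `calculate_page_chunks` is a name the PR introduces, but this body is an illustrative assumption, not the PR's actual implementation:

```python
def calculate_page_chunks(
    total_pages: int,
    max_pages_per_chunk: int = 75,
    min_pages_for_chunking: int = 75,
) -> list[tuple[int, int]]:
    """Compute half-open (start, end) page ranges for chunked parsing.

    Documents below the chunking threshold come back as a single chunk,
    so small files carry no splitting overhead.
    """
    if max_pages_per_chunk <= 0:
        raise ValueError("max_pages_per_chunk must be > 0")
    if total_pages < min_pages_for_chunking:
        return [(0, total_pages)]
    chunks: list[tuple[int, int]] = []
    start = 0
    while start < total_pages:
        end = min(start + max_pages_per_chunk, total_pages)
        chunks.append((start, end))
        start = end
    return chunks
```

For example, a 160-page document with a 75-page chunk size splits into `[(0, 75), (75, 150), (150, 160)]`, while a 10-page document stays a single `(0, 10)` chunk.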
Moves the identical `_make_pdf` helper from `test_pdf_splitting.py` and `test_chunked_parser.py` into a shared `opencontractserver/tests/helpers.py` module.
Previously, when one chunk raised an exception inside the `ThreadPoolExecutor` context manager, `executor.__exit__` called `shutdown(wait=True)`, blocking until all remaining futures completed. Now the executor is managed manually with `shutdown(wait=False, cancel_futures=True)` in a `finally` block, so failures surface immediately.
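The fail-fast pattern described here can be illustrated with a standalone sketch (the task functions and helper name are made up for the example; only the shutdown semantics mirror the fix):

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_chunks_fail_fast(tasks):
    """Run tasks on a manually managed executor; surface the first failure
    immediately instead of waiting for the remaining futures to finish."""
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        futures = {executor.submit(fn): idx for idx, fn in enumerate(tasks)}
        results = [None] * len(tasks)
        for future in as_completed(futures):
            results[futures[future]] = future.result()  # re-raises on failure
        return results
    finally:
        # wait=False returns immediately; cancel_futures=True drops queued
        # (not-yet-started) work. Threads already running finish on their own.
        executor.shutdown(wait=False, cancel_futures=True)


def boom():
    raise RuntimeError("chunk failed")


def slow():
    time.sleep(2)
    return "done"
```

With `max_workers=1`, `boom` runs first and its `RuntimeError` propagates without the caller waiting on `slow`; the error surfaces in well under the 2-second sleep.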
The concurrent test previously only checked `page_count` but not that PAWLS pages were in correct global order. Since `as_completed` returns futures in completion order, this assertion exercises the ordering logic in `results_by_index`.
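Because `as_completed` yields futures in completion order, reassembly must re-key results by chunk index before merging. A sketch of that pattern (task body and names are illustrative, not the project's code):

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def parse_chunk(idx: int) -> str:
    # Later chunks finish first, forcing out-of-order completion.
    time.sleep(0.05 * (3 - idx))
    return f"chunk-{idx}"


with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(parse_chunk, idx): idx for idx in range(3)}
    results_by_index: dict[int, str] = {}
    for future in as_completed(futures):
        # Key by submission index, not completion order.
        results_by_index[futures[future]] = future.result()

# Reassemble in global page order regardless of which chunk finished first.
ordered = [results_by_index[idx] for idx in range(3)]
```

Even though chunk 2 completes first here, `ordered` comes out in submission order, which is the property the added assertion exercises.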
Each chunk is parsed independently, so parent-child annotation references spanning chunk boundaries become orphaned after reassembly. This adds a warning during reassembly when orphaned `parent_id`s are detected and documents the limitation in the class docstring.
Previously all chunk PDFs were pre-split into memory before any parsing began. Now the sequential path splits each chunk on demand, so only one chunk PDF is in memory at a time. The concurrent path still pre-splits since all futures are submitted upfront.
Avoids re-parsing the full PDF for every chunk when splitting concurrently. The concurrent dispatch path now creates a single shared `PdfReader` and passes it to all split calls.
**Code Review: BaseChunkedParser for Large PDF Splitting**

Overall this is a well-structured PR. The design is clean, the documentation is thorough, and the test coverage is solid. A few items worth addressing before merging:

**Bugs / Correctness**

1. PAWLS page mutation via mutable default sentinel

```python
page_info = page_data.get("page", {})
page_info["index"] = page_info.get("index", 0) + offset
```

If `"page"` is missing, `get` returns a fresh dict and the mutation is silently discarded. Guard explicitly instead:

```python
if "page" in page_data:
    page_data["page"]["index"] = page_data["page"].get("index", 0) + offset
```

2. Single-chunk shortcut returns the original dict without copying

```python
if len(chunk_results) == 1 and page_offsets[0] == 0:
    return chunk_results[0]
```

The caller receives the chunk's own dict rather than a copy, and its annotation IDs are never prefixed, so single-chunk and multi-chunk results are shaped differently.

3. Duplicate `PdfReader` construction

```python
reader = PdfReader(io.BytesIO(pdf_bytes))
reader = PdfReader(io.BytesIO(pdf_bytes))  # dead assignment
```

The first assignment is immediately overwritten. One line should be removed.

**Performance / Resource Management**

4. Concurrent mode pre-splits all chunks into memory simultaneously

```python
chunk_data: list[tuple[int, bytes, int]] = []
for idx, (start, end) in enumerate(chunks):
    chunk_bytes = split_pdf_by_page_range(...)
    chunk_data.append((idx, chunk_bytes, start))
```

For a 500-page document split into 10 × 50-page chunks, this holds all 10 chunk PDFs in memory at once (potentially several hundred MB if each chunk is ~20 MB). This trade-off is intentional for concurrent dispatch, but it's worth documenting explicitly in the class docstring alongside the existing "Limitation" note, so operators can make an informed decision when tuning the chunking settings.

5. Cancelled concurrent threads can still block until their network timeout

**Code Quality**

6. Function-local `import time` in the test

```python
import time
time.sleep(5)
```

Top-level imports are preferred; the outer test file already imports it at module level.

7. Timing-sensitive assertion in `test_concurrent_failure_cancels_remaining`

```python
self.assertLess(elapsed, 3.0, "Failure should not wait for remaining chunks")
```

A 3-second wall-clock assertion is fragile on slow/loaded CI runners. The slow chunk sleeps for 5 seconds, so the guard is testing that the cancelled-but-not-yet-started futures don't run. That's the right thing to test, but the assertion margin is tight. Consider raising the threshold (e.g. 4.5s) or restructuring the test to mock the delay instead.

8.

**Minor / Nits**

9. The flow for non-integer page keys is:

```python
except (ValueError, TypeError):
    new_key = page_key
new_json[new_key] = page_data  # added here
continue  # skip tokensJsons mutation and the normal append below
```

This is functionally correct, but the assignment and early `continue` make the control flow harder to follow than it needs to be.

10. Log message alignment with actual threshold semantics: the log message in the chunking decision path should state the same threshold comparison the code performs.

**What's working well**

Items 1 and 8 should be confirmed before merge; the rest can be addressed at your discretion.
Tests were failing because `DoclingParser` now extends `BaseChunkedParser`, which introduced a new code path: read PDF from storage -> count pages via pypdf -> decide chunking. Tests needed three fixes:

1. Mock `get_pdf_page_count` (returns 1) so invalid mock PDF bytes don't cause `ValueError` in pypdf's `PdfReader`
2. Fix the `default_storage` mock path in `test_pipeline_hardening.py` from `docling_parser_rest` to `chunked_parser` (where storage is now used)
3. Set `chunk_retry_limit=0` to disable per-chunk retries in unit tests (prevents duplicate `requests.post` calls and 5s backoff delays)
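The first fix can be sketched in isolation with `unittest.mock`. The class and method bodies here are placeholders standing in for the parser's page-count path, not the project's real test code:

```python
from unittest import mock


class TinyParser:
    """Stand-in for a chunked parser that counts pages before deciding."""

    min_pages_for_chunking = 75

    def get_pdf_page_count(self, pdf_bytes: bytes) -> int:
        # Mimics pypdf rejecting bytes that are not a real PDF.
        raise ValueError("mock bytes are not a real PDF")

    def parse(self, pdf_bytes: bytes) -> str:
        pages = self.get_pdf_page_count(pdf_bytes)
        return "chunked" if pages >= self.min_pages_for_chunking else "single"


parser = TinyParser()
# Patch the page counter so invalid mock PDF bytes take the single-chunk path.
with mock.patch.object(TinyParser, "get_pdf_page_count", return_value=1):
    outcome = parser.parse(b"%PDF-not-really")
```

Without the patch, `parse` raises `ValueError` on fake bytes; with it, the single-document path runs exactly as the fixed tests expect.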
**PR Review: Add BaseChunkedParser for automatic large PDF splitting and reassembly**

Overall this is well-designed and clearly thought through. The layered architecture (utility functions → abstract base → concrete parser), the post-reassembly hook, and the per-chunk retry logic are all solid. Test coverage is good. I have a mix of bugs, design concerns, and minor nits below.

**Bugs**

1. Duplicate line in test (`test_accepts_prebuilt_reader`)

```python
def test_accepts_prebuilt_reader(self):
    pdf_bytes = make_test_pdf(10)
    reader = PdfReader(io.BytesIO(pdf_bytes))
    reader = PdfReader(io.BytesIO(pdf_bytes))  # ← duplicate, dead assignment
    chunk = split_pdf_by_page_range(pdf_bytes, 2, 5, reader=reader)
```

Harmless but should be removed.

2. Infinite loop if `max_pages_per_chunk <= 0`

```python
while start < total_pages:
    end = min(start + max_pages_per_chunk, total_pages)  # end == start if max == 0
    chunks.append((start, end))
    start = end  # never advances
```

If a misconfigured or malicious setting supplies a non-positive value, the loop never terminates. Validate up front:

```python
if max_pages_per_chunk <= 0:
    raise ValueError(f"max_pages_per_chunk must be > 0, got {max_pages_per_chunk}")
```

Similarly, the other chunking settings (`min_pages_for_chunking`, `max_concurrent_chunks`) deserve validation.

**Design Concerns**

3. Single-chunk fast-path in `_reassemble_chunk_results`

```python
if len(chunk_results) == 1 and page_offsets[0] == 0:
    return chunk_results[0]  # IDs are NOT prefixed (e.g. "ann-1")
```

In the production flow, single-chunk documents never reach this function, so the branch is effectively dead. When it is exercised (tests calling the function directly), it creates a subtle inconsistency: a single chunk at offset 0 returns unprefixed IDs (e.g. `ann-1`), while other paths return prefixed ones. Recommendation: either remove the short-circuit (always prefix, even for single chunk), or document explicitly that this is a test-only path and add a comment to prevent future surprise.

4. Cross-chunk parent-child is a significant data quality limitation — consider filing a follow-up

The orphaned-parent warning is appreciated, but for structure-heavy contracts this will be frequent. Consider whether the chunk boundary could be shifted to avoid splitting in the middle of a detected section: e.g., after a first pass that reads only page structure metadata, snap chunk boundaries to section headers. Even a heuristic (never split on a page that starts a new heading) would reduce orphan rate significantly. This could be a follow-up issue rather than blocking this PR, but worth filing.

5. Unbounded exponential back-off between chunk retries

```python
backoff = 5 * (2 ** (attempt - 1))
time.sleep(backoff)
```

With the default `chunk_retry_limit` the delays stay modest, but there is no upper bound as the limit grows; consider capping the back-off.

**Potential Flakiness**

6. Timing assertion in the concurrent-failure test

```python
elapsed = time.monotonic() - start
self.assertLess(elapsed, 3.0, "Failure should not wait for remaining chunks")
```

The sleeping chunk uses a 5-second delay, so a loaded CI runner can push `elapsed` past the 3-second bound.

**Minor / Style**

7. Missing boundary test for the exact `min_pages_for_chunking` threshold

8. `type: ignore` on the dispatch return

```python
return results  # type: ignore[return-value]  # None slots would have raised above
```

The reasoning is correct, but returning a `cast` documents the invariant without suppressing the type checker:

```python
from typing import cast
return cast(list[OpenContractDocExport], results)
```

9. Memory note for concurrent dispatch

The concurrent path holds all chunk bytes in memory simultaneously before the thread pool starts. For a 500-page document with 10 chunks this is significant. A comment noting the trade-off (vs lazy sequential splitting) would help future maintainers understand why the two paths exist.

10. Orphaned-parent log level

Consider whether warning is the right level for an expected, documented limitation.

**What's Done Well**
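The infinite-loop guard suggested in the review above can be exercised in a self-contained sketch (the `_validated` suffix is just to mark this as an illustration, not the PR's function):

```python
def calculate_page_chunks_validated(
    total_pages: int, max_pages_per_chunk: int
) -> list[tuple[int, int]]:
    """Chunk computation with the up-front guard; without it,
    max_pages_per_chunk == 0 makes `start = end` loop forever."""
    if max_pages_per_chunk <= 0:
        raise ValueError(
            f"max_pages_per_chunk must be > 0, got {max_pages_per_chunk}"
        )
    chunks: list[tuple[int, int]] = []
    start = 0
    while start < total_pages:
        end = min(start + max_pages_per_chunk, total_pages)
        chunks.append((start, end))
        start = end
    return chunks
```

A zero or negative setting now fails loudly at configuration time instead of hanging the worker.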
Codecov Report: ❌ Patch coverage is
…ocessing-nYJRg # Conflicts: # CHANGELOG.md
**Code Review: BaseChunkedParser for Large PDF Processing**

Overall this is a well-structured addition. The abstraction is clean, the reassembly logic is isolated into a pure function (easy to test), and the public API of existing parsers is untouched. The test coverage is solid. Below are specific findings ranging from bugs to suggestions.

**Bugs / Issues**

1. Duplicate `PdfReader` construction in test (`test_pdf_splitting.py:94-95`)

```python
reader = PdfReader(io.BytesIO(pdf_bytes))
reader = PdfReader(io.BytesIO(pdf_bytes))  # redundant - first reader is discarded
```

The first reader is immediately discarded. Harmless but noisy - remove one line.

2. Orphaned cross-chunk `parent_id` references left in output data

The reassembly code detects and warns about orphaned `parent_id` references but leaves the broken references in the returned data:

```python
# chunked_parser.py ~line 462
if orphaned_count > 0:
    logger.warning(...)
# no cleanup; consumers downstream will find dangling parent_ids
```

Downstream code that trusts `parent_id` to point to a valid annotation will encounter silent data corruption - a warning is not enough. Consider setting orphaned `parent_id` values to `None` during reassembly, which is the safe sentinel the rest of the codebase uses for root-level annotations. If you want to preserve the original reference for debugging, store it in a separate field.

3. `cancel_futures=True` does not interrupt running threads

In `_dispatch_concurrent`:

```python
finally:
    executor.shutdown(wait=False, cancel_futures=True)
```

`cancel_futures=True` only cancels queued futures that have not started executing yet. Python threads that are already running (e.g., mid-HTTP-request or sleeping through retry back-off) will NOT be interrupted - they continue in the background until they complete naturally. On failure, you may have several lingering HTTP requests to the Docling service consuming connections. The `test_concurrent_failure_cancels_remaining` test passes (< 3s) only because `wait=False` causes the main thread not to block - the background threads are still running their 5s sleep after the test returns. This is an acceptable trade-off, but the docstring for `_dispatch_concurrent` should document that background threads may outlive the raised exception.

**Design / Architecture Concerns**

4. Memory spike in concurrent mode - all chunk bytes held simultaneously

```python
# chunked_parser.py ~line 207
chunk_data: list[tuple[int, bytes, int]] = []
for idx, (start, end) in enumerate(chunks):
    chunk_bytes = split_pdf_by_page_range(...)
    chunk_data.append((idx, chunk_bytes, start))
# All chunk bytes are in memory before any chunk is submitted
chunk_results = self._dispatch_concurrent(..., chunk_data=chunk_data, ...)
```

For concurrent dispatch, all chunk PDFs are pre-split and held in memory before any chunk is submitted. For a 500-page high-resolution PDF split into 10 chunks, this holds 10x the per-chunk PDF size simultaneously (plus the original `pdf_bytes`). Sequential dispatch correctly avoids this by splitting lazily. A better approach would be to pre-compute only the page ranges and let each worker thread call `split_pdf_by_page_range` with the shared `PdfReader` (which is already being created), producing bytes only when needed.

5. `_parse_single_chunk_impl` thread-safety requirement not documented

When `max_concurrent_chunks > 1`, `_parse_single_chunk_impl` is called concurrently from multiple threads. The abstract method's docstring doesn't mention this. Subclass authors could unknowingly use shared mutable state and introduce race conditions. Add a note: "When `max_concurrent_chunks > 1`, this method is called concurrently from multiple threads. Implementations must be thread-safe."

6. `_reassemble_chunk_results` short-circuit skips ID prefixing inconsistently

```python
if len(chunk_results) == 1 and page_offsets[0] == 0:
    return chunk_results[0]  # no c0_ prefix applied
```

When this early return fires, annotation IDs are not prefixed, but when a single chunk goes through the normal loop path (offset > 0), they get the `c0_` prefix. This is currently only reachable in tests (production single-chunk paths bypass `_reassemble_chunk_results` entirely), but it is an unexpected inconsistency for future callers of the pure function.

**Minor / Nits**

7. Content joining separator may lose paragraph boundaries

```python
"content": "\n".join(combined_content_parts),
```

If each chunk's parser returns content with a trailing newline, this produces double newlines at boundaries. If content ends without a trailing newline, two chunks' text is joined with only a single newline - potentially breaking downstream consumers that rely on double-newline paragraph separation. Confirm this matches expectations, or use `"\n\n"` to be safe.

8. `executor` not used as a context manager

```python
executor = ThreadPoolExecutor(max_workers=max_workers)
try:
    ...
finally:
    executor.shutdown(wait=False, cancel_futures=True)
```

Prefer `with ThreadPoolExecutor(...) as executor:` for idiomatic Python. Since you are explicitly controlling shutdown behavior (`wait=False`), the manual approach works - but a context manager makes the cleanup intent clearer.

**Test Coverage Gaps**

**Summary**

The cross-chunk `parent_id` orphaning issue (#2) is the one I would prioritize - silent data corruption risk is real even if it is a documented limitation. Zeroing out orphaned references instead of logging and moving on would make the behavior safe and predictable.
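The remediation suggested in item 2 above (null out dangling `parent_id`s, optionally preserving the original value in a separate field) could look like this sketch; the annotation dict shape and the `orphaned_parent_id` field name are assumptions for illustration:

```python
def null_orphaned_parents(annotations: list[dict]) -> int:
    """Replace parent_id references that point at no annotation in the
    reassembled output with None, keeping the old value for debugging.

    Returns the number of references that were cleaned up."""
    known_ids = {ann["id"] for ann in annotations}
    orphaned = 0
    for ann in annotations:
        parent = ann.get("parent_id")
        if parent is not None and parent not in known_ids:
            ann["orphaned_parent_id"] = parent  # debugging breadcrumb (assumed field)
            ann["parent_id"] = None  # safe root-level sentinel
            orphaned += 1
    return orphaned
```

Running this at the end of reassembly means downstream consumers only ever see `parent_id` values that resolve, while the breadcrumb field keeps the broken reference inspectable.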
Address all 10 refinement items cataloged in issue #914:

- Remove duplicate `PdfReader` assignment in `test_pdf_splitting.py`
- Add input validation for `max_pages_per_chunk`, `min_pages_for_chunking`, and `max_concurrent_chunks` to prevent infinite loops
- Remove dead single-chunk fast-path in `_reassemble_chunk_results` that caused ID prefixing inconsistency between single and multi-chunk results
- Fix flaky timing assertion in `test_concurrent_failure_cancels_remaining` by reducing sleep duration instead of relying on wall-clock bounds
- Add boundary test for exact `min_pages_for_chunking` threshold (75 pages)
- Replace `type: ignore` with `cast()` in `_dispatch_concurrent` return
- Add comment documenting memory trade-off in concurrent chunk dispatch
- Downgrade orphaned parent-child log from warning to debug level
- Add `MAX_CHUNK_RETRY_BACKOFF_SECONDS` (30s) cap on exponential backoff
- Document cross-chunk parent-child limitation with follow-up suggestion

Closes #914

https://claude.ai/code/session_01Y6hT1BWp6TaZFS9tacGNYz
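The capped back-off item reduces to a one-line formula, combining the 5-second doubling base quoted in review with the new 30-second ceiling. This is a sketch consistent with those two facts, not the PR's exact code:

```python
MAX_CHUNK_RETRY_BACKOFF_SECONDS = 30


def chunk_retry_backoff(attempt: int) -> int:
    """Exponential back-off for chunk retries: 5s, 10s, 20s, then capped
    at MAX_CHUNK_RETRY_BACKOFF_SECONDS for every later attempt."""
    return min(5 * (2 ** (attempt - 1)), MAX_CHUNK_RETRY_BACKOFF_SECONDS)
```

The first five attempts therefore wait 5, 10, 20, 30, 30 seconds, so a high retry limit can no longer produce multi-minute sleeps.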
Summary
Introduces a new `BaseChunkedParser` abstract base class that automatically handles splitting large PDF documents into smaller page-range chunks, parsing each chunk independently (optionally in parallel), and reassembling the results into a single coherent document export. This enables parsers to transparently handle documents of any size without modifying their core parsing logic.

**Key Changes**
**New Components**

- `BaseChunkedParser` (`opencontractserver/pipeline/base/chunked_parser.py`)
  - Extends `BaseParser` with automatic chunking support
  - Subclasses implement `_parse_single_chunk_impl()` instead of `_parse_document_impl()`
  - `_post_reassemble_hook()` for document-wide post-processing on the full PDF
  - Tunables: `max_pages_per_chunk`, `min_pages_for_chunking`, `max_concurrent_chunks`, `chunk_retry_limit`
- PDF Splitting Utilities (`opencontractserver/utils/pdf_splitting.py`)
  - `get_pdf_page_count()`: Extract page count from PDF bytes
  - `split_pdf_by_page_range()`: Extract a contiguous page range as new PDF bytes
  - `calculate_page_chunks()`: Compute chunk boundaries based on page count and thresholds
- Reassembly Logic (`opencontractserver/pipeline/base/chunked_parser.py`)
  - `_reassemble_chunk_results()`: Merge per-chunk results with correct global page offsets
  - `_offset_annotation()`: Adjust annotation page indices and IDs
  - `_offset_relationship()`: Adjust relationship annotation references
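An illustrative version of the per-annotation offset step: the `c{idx}_` ID prefix appears in the code reviews above, but the dict shape and this function body are assumptions, not the PR's `_offset_annotation`:

```python
def offset_annotation(ann: dict, chunk_index: int, page_offset: int) -> dict:
    """Shift an annotation's page into global coordinates and prefix its IDs
    so they stay unique after per-chunk results are merged."""
    prefix = f"c{chunk_index}_"
    shifted = dict(ann)  # shallow copy; do not mutate the chunk result
    shifted["id"] = prefix + str(ann["id"])
    if ann.get("parent_id") is not None:
        # Parent references stay within a chunk, so the same prefix applies.
        shifted["parent_id"] = prefix + str(ann["parent_id"])
    shifted["page"] = ann.get("page", 0) + page_offset
    return shifted
```

For chunk 2 starting at global page 150, a local annotation on page 3 lands on global page 153 with IDs `c2_`-prefixed, which is what lets relationship references survive the merge.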
**Modified Components**

- `DoclingParser` (`opencontractserver/pipeline/parsers/docling_parser_rest.py`)
  - Now extends `BaseChunkedParser` instead of `BaseParser`
  - Implements `_parse_single_chunk_impl()` to send individual chunks to the Docling microservice
  - Implements `_post_reassemble_hook()` for image extraction on the full PDF after reassembly
  - Exposes `max_pages_per_chunk`, `min_pages_for_chunking`, `max_concurrent_chunks`
- Constants (`opencontractserver/constants/document_processing.py`)
  - `DEFAULT_MAX_PAGES_PER_CHUNK`, `DEFAULT_MIN_PAGES_FOR_CHUNKING`, `DEFAULT_MAX_CONCURRENT_CHUNKS`, `DEFAULT_CHUNK_RETRY_LIMIT`

**Tests**

- `test_chunked_parser.py`: Comprehensive tests for reassembly logic, offset helpers, and integration with `BaseChunkedParser`
- `test_pdf_splitting.py`: Tests for PDF page counting, splitting, and chunk calculation
- `test_doc_parser_docling_rest.py`: Updated mocks to reflect chunked parser architecture

**Implementation Details**
- Documents below the `min_pages_for_chunking` threshold are processed as a single chunk with no splitting overhead
- A `ThreadPoolExecutor` is used for parallel chunk processing when `max_concurrent_chunks > 1`

**Backward Compatibility**
The public API (`process_document`, `parse_document`, `save_parsed_data`) remains unchanged. Existing `BaseParser` subclasses continue to work without modification.

https://claude.ai/code/session_016xZEFaSfqGzQuFa2xrCpT4