diff --git a/pdelfin/beakerpipeline.py b/pdelfin/beakerpipeline.py
index d0d8e18..bf3c562 100644
--- a/pdelfin/beakerpipeline.py
+++ b/pdelfin/beakerpipeline.py
@@ -19,8 +19,10 @@ from io import BytesIO
 from PIL import Image
 from pypdf import PdfReader
+from functools import partial
 from dataclasses import dataclass
 from typing import Optional
+from concurrent.futures import ProcessPoolExecutor
 
 from pdelfin.s3_utils import expand_s3_glob, get_s3_bytes, parse_s3_path, download_zstd_csv, upload_zstd_csv, download_directory
 from pdelfin.data.renderpdf import render_pdf_to_base64png
@@ -52,6 +54,8 @@ process_start_time = time.perf_counter()
 last_batch_time = process_start_time
 
+# Process pool for offloading CPU-bound work, like calculating anchor texts
+process_pool = ProcessPoolExecutor()
 
 @dataclass(frozen=True)
 class PageResult:
@@ -71,9 +75,11 @@ async def build_page_query(local_pdf_path: str, page: int, target_longest_image_
     image_base64 = asyncio.to_thread(render_pdf_to_base64png, local_pdf_path, page, target_longest_image_dim=target_longest_image_dim)
 
     # GET ANCHOR TEXT IS NOT THREAD SAFE!! Ahhhh..... don't try to do it
-    anchor_text = get_anchor_text(local_pdf_path, page, pdf_engine="pdfreport", target_length=target_anchor_text_len)
+    # and it's also CPU-bound, so it needs to run in a process pool
+    loop = asyncio.get_running_loop()
+    anchor_text = loop.run_in_executor(process_pool, partial(get_anchor_text, pdf_engine="pdfreport", target_length=target_anchor_text_len), local_pdf_path, page)
 
-    image_base64 = await image_base64
+    image_base64, anchor_text = await asyncio.gather(image_base64, anchor_text)
     if image_rotation != 0:
         image_bytes = base64.b64decode(image_base64)
         with Image.open(BytesIO(image_bytes)) as img:
@@ -244,15 +250,15 @@ async def process_page(args, session: aiohttp.ClientSession, pdf_s3_path: str, p
                 total_output_tokens=base_response_data["usage"].get("completion_tokens", 0)
             )
         except aiohttp.ClientError as e:
-            logger.warning(f"Client error on attempt {attempt} for page {page_num}: {e}")
+            logger.warning(f"Client error on attempt {attempt} for {pdf_s3_path}-{page_num}: {e}")
         except json.JSONDecodeError as e:
-            logger.warning(f"JSON decode error on attempt {attempt} for page {page_num}: {e}")
+            logger.warning(f"JSON decode error on attempt {attempt} for {pdf_s3_path}-{page_num}: {e}")
         except Exception as e:
-            logger.warning(f"Unexpected error on attempt {attempt} for page {page_num}: {e}")
+            logger.warning(f"Unexpected error on attempt {attempt} for {pdf_s3_path}-{page_num}: {e}")
 
     if attempt >= MAX_RETRIES:
-        logger.error(f"Failed to process page {page_num} after {MAX_RETRIES} attempts.")
-        raise
+        logger.error(f"Failed to process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts.")
+        raise ValueError(f"Could not process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts")
 
 async def process_pdf(args, pdf_s3_path: str):
@@ -268,7 +274,7 @@ async def process_pdf(args, pdf_s3_path: str):
     # List to hold the tasks for processing each page
     page_tasks = []
 
-    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3600), connector=aiohttp.TCPConnector(limit=50)) as session:
+    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3600), connector=aiohttp.TCPConnector(limit=10)) as session:
         for page_num in range(1, num_pages + 1):
             # Create a task for each page
             task = asyncio.create_task(process_page(args, session, pdf_s3_path, tf.name, page_num))
@@ -431,7 +437,7 @@ async def main():
     parser.add_argument('--workspace_profile', help='S3 configuration profile for accessing the workspace', default=None)
     parser.add_argument('--pdf_profile', help='S3 configuration profile for accessing the raw pdf documents', default=None)
     parser.add_argument('--group_size', type=int, default=20, help='Number of pdfs that will be part of each work item in the work queue.')
-    parser.add_argument('--workers', type=int, default=1, help='Number of workers to run at a time')
+    parser.add_argument('--workers', type=int, default=3, help='Number of workers to run at a time')
     parser.add_argument('--model', help='List of paths where you can find the model to convert this pdf. You can specify several different paths here, and the script will try to use the one which is fastest to access', default=["weka://oe-data-default/jakep/Qwen_Qwen2-VL-7B-Instruct-e4ecf8-01JAH8GMWHTJ376S2N7ETXRXH4/best_bf16/",
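
Review note: the heart of this change is the concurrency pattern in `build_page_query`: the page render runs in a worker thread while the non-thread-safe, CPU-bound anchor-text extraction runs in a process pool, and both are awaited together with `asyncio.gather`. Below is a minimal, self-contained sketch of that pattern under stated assumptions; `render_page` and `extract_anchor_text` are hypothetical stand-ins for `render_pdf_to_base64png` and `get_anchor_text`, and the `target_length` value is illustrative.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial

# Module-level pool, mirroring the diff's `process_pool` global.
# Work submitted to a process pool must be picklable, so the
# functions below are defined at module top level.
process_pool = ProcessPoolExecutor()


def extract_anchor_text(pdf_path: str, page: int, target_length: int = 4000) -> str:
    # Hypothetical stand-in for get_anchor_text: CPU-bound and not
    # thread safe, hence a process pool rather than a thread.
    return f"anchor text for {pdf_path} page {page} (max {target_length} chars)"


def render_page(pdf_path: str, page: int) -> str:
    # Hypothetical stand-in for render_pdf_to_base64png: safe to run
    # in a worker thread via asyncio.to_thread.
    return f"base64 image of {pdf_path} page {page}"


async def build_page_query(pdf_path: str, page: int) -> tuple[str, str]:
    loop = asyncio.get_running_loop()
    # Kick off both pieces of work before awaiting either, so the
    # render thread and the extractor process run concurrently.
    image_task = asyncio.to_thread(render_page, pdf_path, page)
    anchor_task = loop.run_in_executor(
        process_pool,
        partial(extract_anchor_text, target_length=4000),
        pdf_path,
        page,
    )
    image_base64, anchor_text = await asyncio.gather(image_task, anchor_task)
    return image_base64, anchor_text


if __name__ == "__main__":
    print(asyncio.run(build_page_query("example.pdf", 1)))
```

The `functools.partial` wrapper matches the diff's usage and is needed because `loop.run_in_executor` only forwards positional arguments to the callable, so keyword arguments like `target_length` have to be bound in advance.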