
Commit 6154095: Logging and perf stuff
jakep-allenai committed Nov 11, 2024
1 parent: ade3580
Showing 1 changed file with 15 additions and 9 deletions.
pdelfin/beakerpipeline.py: 24 changes (15 additions & 9 deletions)
```diff
@@ -19,8 +19,10 @@
 from io import BytesIO
 from PIL import Image
 from pypdf import PdfReader
+from functools import partial
 from dataclasses import dataclass
 from typing import Optional
+from concurrent.futures import ProcessPoolExecutor
 
 from pdelfin.s3_utils import expand_s3_glob, get_s3_bytes, parse_s3_path, download_zstd_csv, upload_zstd_csv, download_directory
 from pdelfin.data.renderpdf import render_pdf_to_base64png
```
```diff
@@ -52,6 +54,8 @@
 process_start_time = time.perf_counter()
 last_batch_time = process_start_time
 
+# Process pool for offloading cpu bound work, like calculating anchor texts
+process_pool = ProcessPoolExecutor()
 
 @dataclass(frozen=True)
 class PageResult:
```
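The new module-level pool exists because anchor-text extraction is CPU-bound: a thread pool would still serialize on the GIL, while separate processes do not. A minimal, self-contained sketch of the pattern (the `cpu_heavy` function is illustrative, not from this commit):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

# Created once at module level and shared by every coroutine in the process.
process_pool = ProcessPoolExecutor()

def cpu_heavy(n: int) -> int:
    # Stand-in for CPU-bound work such as calculating anchor texts.
    return sum(i * i for i in range(n))

async def main() -> None:
    loop = asyncio.get_running_loop()
    # The event loop awaits the pool's future without blocking other tasks.
    result = await loop.run_in_executor(process_pool, cpu_heavy, 5_000_000)
    print(result)

if __name__ == "__main__":  # guard needed so worker processes can re-import this module
    asyncio.run(main())
```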
```diff
@@ -71,9 +75,11 @@ async def build_page_query(local_pdf_path: str, page: int, target_longest_image_
     image_base64 = asyncio.to_thread(render_pdf_to_base64png, local_pdf_path, page, target_longest_image_dim=target_longest_image_dim)
 
     # GET ANCHOR TEXT IS NOT THREAD SAFE!! Ahhhh..... don't try to do it
-    anchor_text = get_anchor_text(local_pdf_path, page, pdf_engine="pdfreport", target_length=target_anchor_text_len)
+    # and it's also CPU bound, so it needs to run in a process pool
+    loop = asyncio.get_running_loop()
+    anchor_text = loop.run_in_executor(process_pool, partial(get_anchor_text, pdf_engine="pdfreport", target_length=target_anchor_text_len), local_pdf_path, page)
 
-    image_base64 = await image_base64
+    image_base64, anchor_text = await asyncio.gather(image_base64, anchor_text)
     if image_rotation != 0:
         image_bytes = base64.b64decode(image_base64)
         with Image.open(BytesIO(image_bytes)) as img:
```
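The rewritten build_page_query starts the page render on a thread and the anchor-text call in the process pool before awaiting either, then collects both with asyncio.gather so the two jobs overlap. A hedged sketch of that pattern, with placeholder render/extract functions standing in for the real ones:

```python
import asyncio
from functools import partial
from concurrent.futures import ProcessPoolExecutor

process_pool = ProcessPoolExecutor()

def render_page(path: str, page: int) -> str:
    return f"<image of {path} p{page}>"  # stand-in for the blocking render call

def extract_text(path: str, page: int, target_length: int = 4000) -> str:
    return f"<anchor text of {path} p{page}>"  # stand-in for CPU-bound extraction

async def build_page_query(path: str, page: int) -> tuple[str, str]:
    loop = asyncio.get_running_loop()
    # Start both jobs first; neither is awaited yet, so they run concurrently.
    image = asyncio.to_thread(render_page, path, page)
    # run_in_executor only forwards positional args, hence partial() for keywords.
    text = loop.run_in_executor(
        process_pool, partial(extract_text, target_length=6000), path, page
    )
    image_result, text_result = await asyncio.gather(image, text)
    return image_result, text_result

if __name__ == "__main__":
    print(asyncio.run(build_page_query("example.pdf", 1)))
```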
```diff
@@ -244,15 +250,15 @@ async def process_page(args, session: aiohttp.ClientSession, pdf_s3_path: str, p
                 total_output_tokens=base_response_data["usage"].get("completion_tokens", 0)
             )
         except aiohttp.ClientError as e:
-            logger.warning(f"Client error on attempt {attempt} for page {page_num}: {e}")
+            logger.warning(f"Client error on attempt {attempt} for {pdf_s3_path}-{page_num}:: {e}")
         except json.JSONDecodeError as e:
-            logger.warning(f"JSON decode error on attempt {attempt} for page {page_num}: {e}")
+            logger.warning(f"JSON decode error on attempt {attempt} for {pdf_s3_path}-{page_num}: {e}")
         except Exception as e:
-            logger.warning(f"Unexpected error on attempt {attempt} for page {page_num}: {e}")
+            logger.warning(f"Unexpected error on attempt {attempt} for {pdf_s3_path}-{page_num}:: {e}")
 
         if attempt >= MAX_RETRIES:
-            logger.error(f"Failed to process page {page_num} after {MAX_RETRIES} attempts.")
-            raise
+            logger.error(f"Failed to process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts.")
+            raise ValueError(f"Could not process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts")
```
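Two things change in this hunk: every log line now identifies the document as well as the page, and the bare raise becomes an explicit ValueError. The latter matters because the old raise sat after the except blocks, where Python has no active exception to re-raise and would fail with "No active exception to re-raise". A sketch of the resulting retry shape (the MAX_RETRIES value and fetch_page are assumptions, not from this diff):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)
MAX_RETRIES = 8  # assumed value; the real constant is defined outside this diff

async def fetch_page(pdf_s3_path: str, page_num: int) -> str:
    raise ConnectionError("transient failure")  # stand-in for the real request

async def process_page(pdf_s3_path: str, page_num: int) -> str:
    attempt = 1
    while attempt <= MAX_RETRIES:
        try:
            return await fetch_page(pdf_s3_path, page_num)
        except Exception as e:
            # Include the document path so failures from many PDFs can be
            # told apart in shared worker logs, not just the page number.
            logger.warning(f"Error on attempt {attempt} for {pdf_s3_path}-{page_num}: {e}")
            attempt += 1
    logger.error(f"Failed to process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts.")
    # An explicit exception: a bare `raise` here, outside any except block,
    # would itself fail rather than re-raise the last error.
    raise ValueError(f"Could not process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts")
```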
```diff
@@ -268,7 +274,7 @@ async def process_pdf(args, pdf_s3_path: str):
     # List to hold the tasks for processing each page
     page_tasks = []
 
-    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3600), connector=aiohttp.TCPConnector(limit=50)) as session:
+    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3600), connector=aiohttp.TCPConnector(limit=10)) as session:
         for page_num in range(1, num_pages + 1):
             # Create a task for each page
             task = asyncio.create_task(process_page(args, session, pdf_s3_path, tf.name, page_num))
```
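Dropping the connector limit from 50 to 10 caps how many TCP connections the shared session holds open at once; requests beyond the cap queue inside aiohttp instead of piling onto the server. A minimal sketch of how the limit behaves (the URL is a placeholder):

```python
import asyncio
import aiohttp

async def main() -> None:
    # limit=10 bounds simultaneous connections across ALL requests on the session.
    connector = aiohttp.TCPConnector(limit=10)
    timeout = aiohttp.ClientTimeout(total=3600)
    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:

        async def fetch(i: int) -> int:
            # Placeholder URL; the real target is the model inference endpoint.
            async with session.get("http://localhost:8080/health") as resp:
                return resp.status

        # 100 tasks are created, but at most 10 sockets are open at any moment.
        statuses = await asyncio.gather(*(fetch(i) for i in range(100)))
        print(statuses[:5])

asyncio.run(main())
```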
```diff
@@ -431,7 +437,7 @@ async def main():
     parser.add_argument('--workspace_profile', help='S3 configuration profile for accessing the workspace', default=None)
     parser.add_argument('--pdf_profile', help='S3 configuration profile for accessing the raw pdf documents', default=None)
     parser.add_argument('--group_size', type=int, default=20, help='Number of pdfs that will be part of each work item in the work queue.')
-    parser.add_argument('--workers', type=int, default=1, help='Number of workers to run at a time')
+    parser.add_argument('--workers', type=int, default=3, help='Number of workers to run at a time')
 
     parser.add_argument('--model', help='List of paths where you can find the model to convert this pdf. You can specify several different paths here, and the script will try to use the one which is fastest to access',
                         default=["weka://oe-data-default/jakep/Qwen_Qwen2-VL-7B-Instruct-e4ecf8-01JAH8GMWHTJ376S2N7ETXRXH4/best_bf16/",
```
