Skip to content

Commit

Permalink
Adding support for fallback pages
Browse files (browse the repository at this point in the history)
  • Loading branch information
jakep-allenai committed Nov 19, 2024
1 parent 204a4a8 commit b0acfa8
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 3 deletions.
25 changes: 23 additions & 2 deletions pdelfin/beakerpipeline.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -81,6 +81,7 @@ class PageResult:

input_tokens: int
output_tokens: int
is_fallback: bool


async def build_page_query(local_pdf_path: str, page: int, target_longest_image_dim: int, target_anchor_text_len: int, image_rotation: int=0) -> dict:
Expand Down Expand Up @@ -176,7 +177,8 @@ async def process_page(args, session: httpx.AsyncClient, worker_id: int, pdf_s3_
page_num,
page_response,
input_tokens=base_response_data["usage"].get("prompt_tokens", 0),
output_tokens=base_response_data["usage"].get("completion_tokens", 0)
output_tokens=base_response_data["usage"].get("completion_tokens", 0),
is_fallback=False,
)
except (httpx.TimeoutException, asyncio.TimeoutError) as e:
logger.warning(f"Client error on attempt {attempt} for {pdf_s3_path}-{page_num}: {e}")
Expand Down Expand Up @@ -204,7 +206,16 @@ async def process_page(args, session: httpx.AsyncClient, worker_id: int, pdf_s3_

logger.error(f"Failed to process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts.")
await tracker.track_work(worker_id, f"{pdf_s3_path}-{page_num}", "errored")
raise ValueError(f"Could not process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts")

return PageResult(
pdf_s3_path,
page_num,
PageResponse(natural_text=get_anchor_text(pdf_local_path, page_num, pdf_engine="pdftotext"),
is_rotation_valid=True, rotation_correction=0, is_table=False, is_diagram=False),
input_tokens=0,
output_tokens=0,
is_fallback=True
)


async def process_pdf(args, session: httpx.AsyncClient, worker_id: int, pdf_s3_path: str):
Expand Down Expand Up @@ -235,6 +246,15 @@ async def process_pdf(args, session: httpx.AsyncClient, worker_id: int, pdf_s3_p

# Collect the results from the entire task group, assuming no exceptions
page_results = [task.result() for task in page_tasks]

num_fallback_pages = sum(page_result.is_fallback for page_result in page_results)

if num_fallback_pages / num_pages > args.max_page_error_rate:
logger.error(f"Document {pdf_s3_path} has {num_fallback_pages} fallback pages out of {num_pages} exceeding max_page_error_rate of {args.max_page_error_rate}, discarding document.")
return None
elif num_fallback_pages > 0:
logger.warning(f"Document {pdf_s3_path} processed with {num_fallback_pages} fallback pages out of {num_pages}, proceeding to build Dolma document.")

return build_dolma_document(pdf_s3_path, page_results)
except Exception as e:
logger.exception(f"Exception in process_pdf for {pdf_s3_path}: {e}")
Expand Down Expand Up @@ -674,6 +694,7 @@ async def main():
parser.add_argument('--pdf_profile', help='S3 configuration profile for accessing the raw pdf documents', default=None)
parser.add_argument('--pages_per_group', type=int, default=500, help='Aiming for this many pdf pages per work item group')
parser.add_argument('--max_page_retries', type=int, default=8, help='Max number of times we will retry rendering a page')
parser.add_argument('--max_page_error_rate', type=float, default=0.004, help='Rate of allowable failed pages in a document, 1/250 by default')
parser.add_argument('--workers', type=int, default=8, help='Number of workers to run at a time')
parser.add_argument('--stats', action='store_true', help='Instead of running any job, reports some statistics about the current workspace')

Expand Down
2 changes: 1 addition & 1 deletion pdelfin/version.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -2,7 +2,7 @@
_MINOR = "1"
# On main and in a nightly release the patch should be one ahead of the last
# released build.
_PATCH = "34"
_PATCH = "35"
# This is mainly for nightly builds which have the suffix ".dev$DATE". See
# https://semver.org/#is-v123-a-semantic-version for the semantics.
_SUFFIX = ""
Expand Down

0 comments on commit b0acfa8

Please sign in to comment.