diff --git a/pdelfin/birrpipeline.py b/pdelfin/birrpipeline.py index 96d6659..d5c965e 100644 --- a/pdelfin/birrpipeline.py +++ b/pdelfin/birrpipeline.py @@ -25,7 +25,7 @@ from pdelfin.data.renderpdf import render_pdf_to_base64png from pdelfin.prompts import build_finetuning_prompt from pdelfin.prompts.anchor import get_anchor_text -from pdelfin.s3_utils import parse_custom_id, expand_s3_glob, get_s3_bytes, put_s3_bytes +from pdelfin.s3_utils import parse_custom_id, expand_s3_glob, get_s3_bytes, parse_s3_path # Global s3 client for the whole script, feel free to adjust params if you need it @@ -310,10 +310,7 @@ def _write_batch_to_file(self, temp_file_path: str, batch_objects: List[Any]): output_path = self._get_output_path(hash_str) if self.is_s3: - # Use s3 upload_file - parsed = urlparse(output_path) - bucket = parsed.netloc - key = parsed.path.lstrip("/") + bucket, key = parse_s3_path(output_path) # Use the s3 client directly try: @@ -328,7 +325,6 @@ def _write_batch_to_file(self, temp_file_path: str, batch_objects: List[Any]): if self.after_flush: self.after_flush(batch_objects) - # Delete the temporary file os.remove(temp_file_path) def _compute_hash(self, temp_file_path: str) -> str: @@ -687,12 +683,7 @@ def get_current_round(s3_workspace: str) -> int: for _ in range(min(max_pending, total_pdfs)): pdf = next(pdf_iter) future = executor.submit( - build_pdf_queries, - args.workspace, - pdf, - current_round, - args.target_longest_image_dim, - args.target_anchor_text_len, + build_pdf_queries, args.workspace, pdf, current_round, args.target_longest_image_dim,args.target_anchor_text_len, ) pending_futures[future] = pdf @@ -721,15 +712,10 @@ def get_current_round(s3_workspace: str) -> int: # Submit a new future if there are more PDFs try: pdf = next(pdf_iter) - new_future = executor.submit( - build_pdf_queries, - args.workspace, - pdf, - current_round, - args.target_longest_image_dim, - args.target_anchor_text_len, + future = executor.submit( + build_pdf_queries, args.workspace, pdf, current_round, args.target_longest_image_dim,args.target_anchor_text_len, ) - pending_futures[new_future] = pdf + pending_futures[future] = pdf except StopIteration: pass # No more PDFs to process