Skip to content

Commit

Permalink
Better stats
Browse files Browse the repository at this point in the history
  • Loading branch information
jakep-allenai committed Nov 19, 2024
1 parent 3ef4609 commit 204a4a8
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions pdelfin/beakerpipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,17 +589,16 @@ def print_stats(args):
index_file_s3_path = os.path.join(args.workspace, "work_index_list.csv.zstd")
output_glob = os.path.join(args.workspace, "results", "*.jsonl")

work_queue_lines = download_zstd_csv(workspace_s3, index_file_s3_path)
done_work_items = expand_s3_glob(workspace_s3, output_glob)
work_queue = {
parts[0]: parts[1:]
for line in download_zstd_csv(workspace_s3, index_file_s3_path)
if (parts := line.strip().split(",")) and line.strip()
}

total_items = len([line for line in work_queue_lines if line.strip()])
total_items = len(work_queue)
completed_items = len(done_work_items)

print(f"\nWork Items Status:")
print(f"Total work items: {total_items:,}")
print(f"Completed items: {completed_items:,}")
print(f"Remaining items: {total_items - completed_items:,}")

def process_output_file(s3_path):
try:
data = get_s3_bytes(workspace_s3, s3_path)
Expand Down Expand Up @@ -632,10 +631,10 @@ def process_output_file(s3_path):
original_paths = set()

# First collect all original PDF paths
for line in work_queue_lines:
if line.strip():
paths = line.strip().split(',')
original_paths.update(paths[1:])
for done_work_item in done_work_items:
if match := re.search(r"output_(\w+).jsonl", done_work_item):
done_work_hash = match.group(1)
original_paths.update(work_queue[done_work_hash])

with ThreadPoolExecutor() as executor:
futures = {executor.submit(process_output_file, item): item for item in done_work_items}
Expand All @@ -649,6 +648,11 @@ def process_output_file(s3_path):
all_processed_paths.update(processed_paths)

skipped_paths = original_paths - all_processed_paths

print(f"\nWork Items Status:")
print(f"Total work items: {total_items:,}")
print(f"Completed items: {completed_items:,}")
print(f"Remaining items: {total_items - completed_items:,}")

print(f"\nResults:")
print(f"Total documents processed: {docs_total:,}")
Expand Down

0 comments on commit 204a4a8

Please sign in to comment.