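"""Sample random pages from an olmOCR workspace and annotate them for PII using ChatGPT.

Example invocation (the script filename and bucket names here are illustrative):

    python autoscan.py s3://my-bucket/my-workspace --pages_per_run 30 --output_dir dolma_samples

Requires an OpenAI API key via --openai_api_key or the OPENAI_API_KEY environment variable.
"""
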
import argparse
import json
import os
import random
import tempfile
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional

import boto3
import pydantic
from openai import OpenAI
from tqdm import tqdm

from olmocr.data.renderpdf import render_pdf_to_base64png
from olmocr.s3_utils import get_s3_bytes, parse_s3_path
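

# ISO 639-1 codes for a set of widely spoken languages, mapped to their English
# names, plus a catch-all "other". The enum is referenced by the structured-output
# schema below, so the model must pick exactly one of these codes per page.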
LanguageCode = Enum(
    "LanguageCode",
    {
        "en": "English",
        "zh": "Chinese",
        "hi": "Hindi",
        "es": "Spanish",
        "fr": "French",
        "ar": "Arabic",
        "bn": "Bengali",
        "ru": "Russian",
        "pt": "Portuguese",
        "ur": "Urdu",
        "id": "Indonesian",
        "de": "German",
        "ja": "Japanese",
        "sw": "Swahili",
        "mr": "Marathi",
        "te": "Telugu",
        "tr": "Turkish",
        "vi": "Vietnamese",
        "ta": "Tamil",
        "ko": "Korean",
        "other": "Other",
    },
)
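

# This pydantic model doubles as the response_format schema for the OpenAI
# structured-output call in chatgpt_analyze_page(), so its field names mirror
# the PII taxonomy spelled out in the prompt.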
class PIIAnnotation(pydantic.BaseModel):
    """Structured model for PII annotations returned by ChatGPT"""

    document_description: str
    language_code: LanguageCode
    cannot_read: bool
    inappropriate_content: bool
    is_public_document: bool

    # PII identifiers
    contains_names: bool
    contains_email_addresses: bool
    contains_phone_numbers: bool

    # PII that must co-occur with identifiers
    contains_addresses: bool
    contains_biographical_info: bool  # DOB, gender, etc.
    contains_location_info: bool
    contains_employment_info: bool
    contains_education_info: bool
    contains_medical_info: bool

    # Always-sensitive PII
    contains_government_ids: bool
    contains_financial_info: bool
    contains_biometric_data: bool
    contains_login_info: bool

    other_pii: str

    @property
    def has_pii(self) -> bool:
        """Check if the document contains any PII"""
        pii_fields = [
            self.contains_names,
            self.contains_email_addresses,
            self.contains_phone_numbers,
            self.contains_addresses,
            self.contains_biographical_info,
            self.contains_location_info,
            self.contains_employment_info,
            self.contains_education_info,
            self.contains_medical_info,
            self.contains_government_ids,
            self.contains_financial_info,
            self.contains_biometric_data,
            self.contains_login_info,
        ]
        return any(pii_fields) or bool(self.other_pii.strip())

    def get_pii_types(self) -> List[str]:
        """Get a list of all PII types found in the document"""
        flag_to_label = [
            (self.contains_names, "names"),
            (self.contains_email_addresses, "email"),
            (self.contains_phone_numbers, "phone"),
            (self.contains_addresses, "addresses"),
            (self.contains_biographical_info, "biographical"),
            (self.contains_location_info, "location"),
            (self.contains_employment_info, "employment"),
            (self.contains_education_info, "education"),
            (self.contains_medical_info, "medical"),
            (self.contains_government_ids, "government-id"),
            (self.contains_financial_info, "financial"),
            (self.contains_biometric_data, "biometric"),
            (self.contains_login_info, "login-info"),
        ]
        pii_types = [label for flag, label in flag_to_label if flag]
        if self.other_pii.strip():
            pii_types.append("other")
        return pii_types


def parse_args():
    parser = argparse.ArgumentParser(description="Automatically scan olmOCR workspace results for PII using ChatGPT")
    parser.add_argument("workspace", help="olmOCR workspace path (s3://bucket/workspace)")
    parser.add_argument("--pages_per_run", type=int, default=30, help="Number of pages to sample per run")
    parser.add_argument("--pdf_profile", help="AWS profile for accessing PDFs")
    parser.add_argument("--output_dir", default="dolma_samples", help="Directory to save output files")
    parser.add_argument("--max_workers", type=int, default=4, help="Maximum number of worker threads")
    parser.add_argument("--openai_api_key", help="OpenAI API key (or set OPENAI_API_KEY env var)")
    parser.add_argument("--openai_model", default="gpt-4.1", help="OpenAI model to use")
    return parser.parse_args()


def list_result_files(s3_client, workspace_path):
    """List all JSON result files in the workspace results directory."""
    bucket, prefix = parse_s3_path(workspace_path)
    results_prefix = os.path.join(prefix, "results").rstrip("/") + "/"
    all_files = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=results_prefix):
        if "Contents" in page:
            all_files.extend([f"s3://{bucket}/{obj['Key']}" for obj in page["Contents"] if obj["Key"].endswith(".jsonl") or obj["Key"].endswith(".json")])
            # if len(all_files) > 1000:
            #     break
    return all_files
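

# Sampling is rejection-based: pick a random result file, then a random Dolma
# document line within it, then a random page span of that document. Failed
# picks are simply retried, up to 3x the requested page count.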
def get_random_pages(s3_client, result_files, count=30):
    """Get random pages from the result files."""
    random_pages = []

    # Try to collect the requested number of pages
    attempts = 0
    max_attempts = count * 3  # Allow extra attempts to handle potential failures
    while len(random_pages) < count and attempts < max_attempts:
        attempts += 1

        # Pick a random result file
        if not result_files:
            print("No result files found!")
            break
        result_file = random.choice(result_files)

        try:
            # Get the content of the file
            content = get_s3_bytes(s3_client, result_file)
            lines = content.decode("utf-8").strip().split("\n")
            if not lines:
                continue

            # Pick a random line (which contains a complete document)
            line = random.choice(lines)
            doc = json.loads(line)

            # A Dolma document has "text", "metadata", and "attributes" fields
            if "text" not in doc or "metadata" not in doc or "attributes" not in doc:
                print(f"Document in {result_file} is not a valid Dolma document")
                continue

            # Get the original PDF path from metadata
            pdf_path = doc["metadata"].get("Source-File")
            if not pdf_path:
                continue

            # Get page spans from attributes
            page_spans = doc["attributes"].get("pdf_page_numbers", [])
            if not page_spans:
                continue

            # Pick a random page span; spans are [start_pos, end_pos, page_num]
            page_span = random.choice(page_spans)
            if len(page_span) >= 3:
                page_num = page_span[2]

                # Extract the text for this page
                start_pos, end_pos = page_span[0], page_span[1]
                page_text = doc["text"][start_pos:end_pos].strip()

                # Include the text snippet with the page info
                random_pages.append((pdf_path, page_num, page_text, result_file))

            if len(random_pages) >= count:
                break
        except Exception as e:
            print(f"Error processing {result_file}: {e}")
            continue

    print(f"Found {len(random_pages)} random pages from Dolma documents")
    return random_pages
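

# The analysis call relies on OpenAI structured outputs: passing the
# PIIAnnotation class as response_format makes the SDK constrain and parse the
# model's reply directly into that pydantic model.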
def chatgpt_analyze_page(pdf_path: str, page_num: int, pdf_s3_client, openai_api_key: str, openai_model: str) -> Optional[PIIAnnotation]:
    """Analyze a page using the ChatGPT vision model with structured outputs."""
    try:
        # Download the PDF to a temp file
        bucket, key = parse_s3_path(pdf_path)
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file:
            pdf_data = pdf_s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
            temp_file.write(pdf_data)
            temp_file_path = temp_file.name

        try:
            # Render the requested PDF page to a base64-encoded PNG
            base64_image = render_pdf_to_base64png(temp_file_path, page_num, target_longest_image_dim=2048)
        finally:
            # Clean up the temp file even if rendering fails
            os.unlink(temp_file_path)

        # Create OpenAI client
        client = OpenAI(api_key=openai_api_key)

        # Prepare the user message with all instructions
        user_message = """
        You are a document analyzer that identifies Personally Identifiable Information (PII) in documents.
        Your task is to analyze the provided document image and determine:

        1. Whether the document is intended for public release or dissemination (e.g., research paper, public report, etc.)
        2. Whether the document contains any PII

        For PII identification, follow these specific guidelines:

        IDENTIFIERS FOR PII:
        The following are considered identifiers that can make information PII:
        - Names (full names, first names, last names, nicknames)
        - Email addresses
        - Phone numbers

        PII THAT MUST CO-OCCUR WITH AN IDENTIFIER:
        The following types of information should ONLY be marked as PII if they occur ALONGSIDE an identifier (commonly, a person's name):
        - Addresses (street address, postal code, etc.)
        - Biographical information (date of birth, place of birth, gender, sexual orientation, race, ethnicity, citizenship/immigration status, religion)
        - Location information (geolocations, specific coordinates)
        - Employment information (job titles, workplace names, employment history)
        - Education information (school names, degrees, transcripts)
        - Medical information (health records, diagnoses, genetic or neural data)

        PII THAT OCCURS EVEN WITHOUT AN IDENTIFIER:
        The following should ALWAYS be marked as PII even if they do not occur alongside an identifier:
        - Government IDs (Social Security Numbers, passport numbers, driver's license numbers, tax IDs)
        - Financial information (credit card numbers, bank account/routing numbers)
        - Biometric data (fingerprints, retina scans, facial recognition data, voice signatures)
        - Login information (ONLY mark as PII when a username, password, and login location are present together)

        If the document is a form, only consider fields that are filled out with specific values as potential PII.
        If this page does not itself contain PII but references documents (such as curriculum vitae or personal statements) that typically contain PII, do not mark it as PII.
        Only consider actual occurrences of PII within the document shown.
        """

        # Use the chat completions API with the structured-output schema.
        # render_pdf_to_base64png produces a PNG, so declare the matching MIME type.
        completion = client.beta.chat.completions.parse(
            model=openai_model,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": user_message},
                        {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}},
                    ],
                }
            ],
            response_format=PIIAnnotation,
            max_tokens=1000,
        )

        return completion.choices[0].message.parsed
    except Exception as e:
        print(f"Error analyzing page {pdf_path} (page {page_num}): {e}")
        return None
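

# Presigned URLs are generated only to make the final report clickable for a
# human reviewer; the analysis itself downloads PDF bytes directly via boto3.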
def create_presigned_url(s3_client, pdf_path, expiration=3600 * 24 * 7):
    """Create a presigned URL for the given S3 path."""
    try:
        bucket, key = parse_s3_path(pdf_path)
        url = s3_client.generate_presigned_url("get_object", Params={"Bucket": bucket, "Key": key}, ExpiresIn=expiration)
        return url
    except Exception as e:
        print(f"Error creating presigned URL for {pdf_path}: {e}")
        return None


def process_pages(random_pages, pdf_s3_client, openai_api_key, openai_model, max_workers):
    """Process multiple pages in parallel using ThreadPoolExecutor."""
    results = []

    # First generate presigned URLs for all PDFs
    print("Generating presigned URLs for PDFs...")
    presigned_urls = {}
    for pdf_path, _, _, _ in random_pages:
        if pdf_path not in presigned_urls and pdf_path.startswith("s3://"):
            url = create_presigned_url(pdf_s3_client, pdf_path)
            if url:
                presigned_urls[pdf_path] = url

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        futures = {}
        for pdf_path, page_num, page_text, result_file in tqdm(random_pages, desc="Submitting pages for analysis"):
            future = executor.submit(chatgpt_analyze_page, pdf_path, page_num, pdf_s3_client, openai_api_key, openai_model)
            futures[future] = (pdf_path, page_num, page_text, result_file)

        # Process results as they complete
        for future in tqdm(futures, desc="Processing results"):
            pdf_path, page_num, page_text, result_file = futures[future]
            try:
                annotation = future.result()
                if annotation:
                    # Attach the presigned URL with a page-number fragment, if available
                    presigned_url = None
                    if pdf_path in presigned_urls:
                        presigned_url = f"{presigned_urls[pdf_path]}#page={page_num}"
                    results.append((pdf_path, page_num, page_text, result_file, annotation, presigned_url))
                else:
                    print(f"Failed to get annotation for {pdf_path} (page {page_num})")
            except Exception as e:
                print(f"Error processing {pdf_path} (page {page_num}): {e}")

    return results
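

# Pages are bucketed in priority order: unreadable or non-English pages first,
# then inappropriate content, then public vs. private documents. Only the last
# two buckets carry PII details.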
def categorize_results(all_results):
    """Categorize results for reporting."""
    categories = {
        "public_document": [],
        "private_document": [],
        "cannot_read": [],
        "report_content": [],
        "no_annotation": [],
    }

    for pdf_path, page_num, page_text, result_file, annotation, presigned_url in all_results:
        if annotation.cannot_read or annotation.language_code != LanguageCode.en:
            categories["cannot_read"].append({"pdf_path": pdf_path, "pdf_page": page_num, "result_file": result_file, "presigned_url": presigned_url})
        elif annotation.inappropriate_content:
            categories["report_content"].append({"pdf_path": pdf_path, "pdf_page": page_num, "result_file": result_file, "presigned_url": presigned_url})
        elif annotation.is_public_document:
            categories["public_document"].append(
                {
                    "pdf_path": pdf_path,
                    "pdf_page": page_num,
                    "result_file": result_file,
                    "pii_types": annotation.get_pii_types(),
                    "has_pii": annotation.has_pii,
                    "description": annotation.other_pii,
                    "presigned_url": presigned_url,
                }
            )
        else:
            # Private document
            categories["private_document"].append(
                {
                    "pdf_path": pdf_path,
                    "pdf_page": page_num,
                    "result_file": result_file,
                    "pii_types": annotation.get_pii_types(),
                    "has_pii": annotation.has_pii,
                    "description": annotation.other_pii,
                    "presigned_url": presigned_url,
                }
            )

    return categories
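

# Report percentages use max(1, n) denominators for subcategory rates so empty
# categories do not divide by zero; an early return below handles the case of a
# run with no annotated pages at all.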
def print_annotation_report(annotation_results: Dict[str, List[Dict[str, Any]]]):
    """Print a summary report of annotations."""
    total_pages = sum(len(items) for items in annotation_results.values())

    print("\n" + "=" * 80)
    print(f"ANNOTATION REPORT - Total Pages: {total_pages}")
    print("=" * 80)

    if total_pages == 0:
        print("\nNo annotated pages to report.")
        return

    # Count pages with and without PII in public documents
    public_with_pii = [page for page in annotation_results["public_document"] if page.get("has_pii", False)]
    public_without_pii = [page for page in annotation_results["public_document"] if not page.get("has_pii", False)]

    # Count pages with and without PII in private documents
    private_with_pii = [page for page in annotation_results["private_document"] if page.get("has_pii", False)]
    private_without_pii = [page for page in annotation_results["private_document"] if not page.get("has_pii", False)]

    # Print summary statistics
    print("\nSummary:")
    print(
        f"  Public documents (total): {len(annotation_results['public_document'])} ({len(annotation_results['public_document'])/total_pages*100:.1f}% of all pages)"
    )
    print(f"    - With PII: {len(public_with_pii)} ({len(public_with_pii)/max(1, len(annotation_results['public_document']))*100:.1f}% of public docs)")
    print(
        f"    - Without PII: {len(public_without_pii)} ({len(public_without_pii)/max(1, len(annotation_results['public_document']))*100:.1f}% of public docs)"
    )
    print(
        f"  Private documents (total): {len(annotation_results['private_document'])} ({len(annotation_results['private_document'])/total_pages*100:.1f}% of all pages)"
    )
    print(f"    - With PII: {len(private_with_pii)} ({len(private_with_pii)/max(1, len(annotation_results['private_document']))*100:.1f}% of private docs)")
    print(
        f"    - Without PII: {len(private_without_pii)} ({len(private_without_pii)/max(1, len(annotation_results['private_document']))*100:.1f}% of private docs)"
    )
    print(f"  Unreadable pages: {len(annotation_results['cannot_read'])} ({len(annotation_results['cannot_read'])/total_pages*100:.1f}%)")
    print(f"  Pages with reported content: {len(annotation_results['report_content'])} ({len(annotation_results['report_content'])/total_pages*100:.1f}%)")
    print(f"  Pages without annotation: {len(annotation_results['no_annotation'])} ({len(annotation_results['no_annotation'])/total_pages*100:.1f}%)")

    # Analyze PII types in private documents
    if private_with_pii:
        # Categorize the PII types for clearer reporting
        pii_categories = {
            "Identifiers": ["names", "email", "phone"],
            "PII requiring identifiers": ["addresses", "biographical", "location", "employment", "education", "medical"],
            "Always sensitive PII": ["government-id", "financial", "biometric", "login-info"],
        }

        # Dictionary to track all PII counts
        pii_counts_private = {}
        for page in private_with_pii:
            for pii_type in page.get("pii_types", []):
                pii_counts_private[pii_type] = pii_counts_private.get(pii_type, 0) + 1

        # Print categorized PII counts
        print("\nPII Types in Private Documents:")
        for category, pii_types in pii_categories.items():
            print(f"\n  {category}:")
            for pii_type in pii_types:
                count = pii_counts_private.get(pii_type, 0)
                if count > 0:
                    print(f"    - {pii_type}: {count} ({count/len(private_with_pii)*100:.1f}%)")

        # Print any PII types not covered by the categories above (e.g., "other")
        other_pii = [pii_type for pii_type in pii_counts_private.keys() if not any(pii_type in types for types in pii_categories.values())]
        if other_pii:
            print("\n  Other PII types:")
            for pii_type in other_pii:
                count = pii_counts_private.get(pii_type, 0)
                print(f"    - {pii_type}: {count} ({count/len(private_with_pii)*100:.1f}%)")

    # Print detailed report for private documents with PII
    if private_with_pii:
        print("\nDetailed Report - Private Documents with PII:")
        print("-" * 80)
        for i, item in enumerate(private_with_pii, 1):
            pdf_path = item["pdf_path"]
            pdf_page = item["pdf_page"]
            presigned_url = item.get("presigned_url")
            print(f"{i}. PDF: {pdf_path}")
            print(f"   Page: {pdf_page}")
            if presigned_url:
                print(f"   Presigned URL: {presigned_url}")
            print(f"   PII Types: {', '.join(item['pii_types'])}")
            if item.get("description"):
                print(f"   Description: {item['description']}")
            print("-" * 80)

    # Print links to unreadable pages
    # if annotation_results["cannot_read"]:
    #     print("\nUnreadable Pages:")
    #     print("-" * 80)
    #     for i, item in enumerate(annotation_results["cannot_read"], 1):
    #         pdf_path = item["pdf_path"]
    #         pdf_page = item["pdf_page"]
    #         presigned_url = item.get("presigned_url")
    #         print(f"{i}. PDF: {pdf_path}")
    #         print(f"   Page: {pdf_page}")
    #         if presigned_url:
    #             print(f"   Presigned URL: {presigned_url}")
    #         print("-" * 80)

    # Print links to inappropriate content
    if annotation_results["report_content"]:
        print("\nReported Content:")
        print("-" * 80)
        for i, item in enumerate(annotation_results["report_content"], 1):
            pdf_path = item["pdf_path"]
            pdf_page = item["pdf_page"]
            presigned_url = item.get("presigned_url")
            print(f"{i}. PDF: {pdf_path}")
            print(f"   Page: {pdf_page}")
            if presigned_url:
                print(f"   Presigned URL: {presigned_url}")
            print("-" * 80)

    print("\nReport complete.")


def save_results(results, output_dir):
    """Save the results to a JSON file."""
    output_path = Path(output_dir) / "autoscan_results.json"

    # Convert results to a serializable format
    serializable_results = []
    for pdf_path, page_num, page_text, result_file, annotation, presigned_url in results:
        serializable_results.append(
            {
                "pdf_path": pdf_path,
                "page_num": page_num,
                "page_text": page_text,
                "result_file": result_file,
                "annotation": annotation.model_dump(),  # pydantic v2; use .dict() on v1
                "presigned_url": presigned_url,
            }
        )

    with open(output_path, "w") as f:
        # Enum members (e.g., language_code) serialize to their string values
        json.dump(serializable_results, f, indent=2, default=lambda o: o.value if isinstance(o, Enum) else o)

    print(f"Results saved to {output_path}")


def main():
    args = parse_args()

    # Get the OpenAI API key from args or environment
    openai_api_key = args.openai_api_key or os.environ.get("OPENAI_API_KEY")
    if not openai_api_key:
        raise ValueError("OpenAI API key must be provided via --openai_api_key or the OPENAI_API_KEY environment variable")

    # Set up the S3 client for workspace results
    s3_client = boto3.client("s3")

    # Set up the PDF S3 client with a separate profile if specified
    if args.pdf_profile:
        pdf_session = boto3.Session(profile_name=args.pdf_profile)
        pdf_s3_client = pdf_session.client("s3")
    else:
        pdf_s3_client = s3_client

    # Create the output directory
    output_dir = Path(args.output_dir)
    output_dir.mkdir(exist_ok=True, parents=True)

    # List all result files
    print(f"Listing result files in {args.workspace}/results...")
    result_files = list_result_files(s3_client, args.workspace)
    print(f"Found {len(result_files)} result files")

    # Sample random pages
    random_pages = get_random_pages(s3_client, result_files, args.pages_per_run)

    # Process pages with ChatGPT
    print(f"Processing {len(random_pages)} pages with ChatGPT...")
    all_results = process_pages(random_pages, pdf_s3_client, openai_api_key, args.openai_model, args.max_workers)

    # Save results
    save_results(all_results, args.output_dir)

    # Categorize and report results
    categorized_results = categorize_results(all_results)
    print_annotation_report(categorized_results)


if __name__ == "__main__":
    main()