|
| 1 | +from fastapi import FastAPI, UploadFile, File |
| 2 | +from fastapi.responses import ORJSONResponse |
| 3 | +import time |
| 4 | +import io |
| 5 | +from pypdf import PdfReader |
| 6 | +from loguru import logger |
| 7 | + |
| 8 | +app = FastAPI() |
| 9 | + |
# Words whose presence is used as a cheap signal that extracted text is readable.
_COMMON_WORDS = ("the", "and", "is")


def _reject(response, message):
    """Log *message*, attach it to *response*, and return it as an HTTP 400."""
    logger.error(message)
    response["message"] = message
    return ORJSONResponse(content=response, status_code=400)


@app.post('/validate-pdf', response_class=ORJSONResponse, status_code=201)
async def check_pdf(
    file: UploadFile = File(...),
    include_text: bool = False,
    check_text: bool = False,
    include_page_errors: bool = False,
):
    """Validate an uploaded PDF and report what was found.

    The upload is rejected with HTTP 400 (and a ``message`` key) when it is
    not declared as ``application/pdf``, cannot be parsed, is encrypted, has
    no pages, has no metadata, fails the optional text check, exposes no
    fonts, or declares an AcroForm. Otherwise HTTP 201 is returned.

    Args:
        file: The uploaded document.
        include_text: Add the extracted text to the response under ``text``.
        check_text: Extract the text and require it to contain at least one
            common English word.
        include_page_errors: With ``check_text``, also return the per-page
            extraction errors under ``errors``.

    Returns:
        An ``ORJSONResponse`` carrying the collected details.
    """
    # NOTE(review): text is only extracted when ``check_text`` is set, so
    # ``include_text`` on its own returns an empty string - confirm intended.
    from pypdf.errors import PdfReadError

    started = time.perf_counter()
    response = {
        "file_name": file.filename,
        "content_type": file.content_type,
        "file_size": file.size,
    }

    if file.content_type != "application/pdf":
        return _reject(response, f"File is not a PDF, but type {file.content_type}")

    pdf_content = await file.read()
    try:
        reader = PdfReader(io.BytesIO(pdf_content))
    except PdfReadError as ex:
        # A corrupt or non-PDF payload is a client error, not a server fault.
        return _reject(response, f"The file could not be parsed as a PDF: {ex}")

    # Checked before touching pages or metadata: those cannot be read
    # reliably from a document that has not been decrypted.
    if reader.is_encrypted:
        return _reject(response, "The PDF is encrypted and not allowed")

    page_count = len(reader.pages)
    if page_count == 0:
        return _reject(response, "The PDF is empty")
    response["page_count"] = page_count

    meta = reader.metadata
    if meta is None:
        return _reject(response, "The PDF does not contain meta data")
    # NUL characters are stripped from the metadata values before returning them.
    response["meta"] = {k: str(v).replace("\x00", "") for k, v in meta.items()}

    text = ""
    if check_text:
        results = get_pdf_content(pdf_content=pdf_content)
        text = results["text"]
        if not text.strip():
            return _reject(response, "The PDF does not contain readable text")

        # Compare whole, lower-cased words so that "is" inside "this" does not
        # count and a capitalised "The" does.
        tokens = {token.strip(".,;:!?\"'()[]") for token in text.lower().split()}
        words_found = [word for word in _COMMON_WORDS if word in tokens]
        if not words_found:
            return _reject(
                response,
                "The PDF does not contain readable text, like the word 'the'",
            )

        response["characters"] = len(text)
        response["words_found"] = words_found
        if include_page_errors:
            response["errors"] = results["errors"]

    embedded_fonts = []
    for page in tqdm(reader.pages, desc="Finding Fonts"):
        # NOTE(review): confirm that ``get_fonts()`` and dict-like font
        # entries match the installed pypdf API, and that a "BaseFont" entry
        # really implies the font is embedded.
        for font in page.get_fonts():
            font_name = font.get("BaseFont", "").replace("/", "").replace("+", "")
            if font_name not in embedded_fonts:
                embedded_fonts.append(font_name)

    if not embedded_fonts:
        return _reject(response, "The PDF does not have embedded fonts")
    response["fonts"] = embedded_fonts

    # Interactive forms are declared in the document catalog (the trailer's
    # /Root entry), not in the trailer dictionary itself.
    if "/AcroForm" in reader.trailer["/Root"]:
        return _reject(response, "The PDF contains form fields")

    if include_text:
        response["text"] = text

    logger.debug(f"PDF check response: {response}")
    response["processing_time_seconds"] = f"{time.perf_counter() - started:.2f}"
    return ORJSONResponse(content=response, status_code=201)
| 113 | + |
| 114 | + |
| 115 | +# Helper functions for extracting text and page data from a PDF file |
| 116 | + |
| 117 | + |
| 118 | +# coding: utf-8 |
| 119 | +import io |
| 120 | +import re |
| 121 | +from functools import lru_cache |
| 122 | + |
| 123 | +from loguru import logger # Import the Loguru logger |
| 124 | +from pypdf import PdfReader, PaperSize |
| 125 | +from tqdm import tqdm |
| 126 | +from unsync import unsync |
| 127 | + |
@unsync
def extract_pdf_text(pdf_content, page_number: int):
    """Extract the text and media box of one page of a PDF.

    The function is wrapped by ``@unsync``; callers in this file collect the
    outcome by calling ``.result()`` on the returned object.

    Args:
        pdf_content: Raw PDF payload, passed to ``get_reader``.
        page_number: Index into ``reader.pages``.

    Returns:
        A dict with the keys ``text``, ``page_num``, ``margin`` (the page's
        ``mediabox``) and ``error``. On failure ``text`` is empty, ``margin``
        is ``None`` and ``error`` holds the caught exception.
    """
    try:
        # Look the page up once instead of re-indexing for every attribute.
        page = get_reader(pdf_content).pages[page_number]
        text = page.extract_text()
        box = page.mediabox

        # Diagnostics go through the logger rather than stdout.
        logger.debug(
            f"page {page_number} mediabox: left {box.left}, right {box.right}, "
            f"top {box.top}, bottom {box.bottom}"
        )

        return {"text": text, "page_num": page_number, "margin": box, "error": None}
    except Exception as ex:
        # Deliberately best-effort: a failing page is reported to the caller
        # through the "error" key instead of aborting the whole document.
        logger.error(ex)
        return {"text": "", "page_num": page_number, "margin": None, "error": ex}
| 149 | + |
@lru_cache(maxsize=300)
def get_reader(pdf_content):
    """Return a ``PdfReader`` over *pdf_content*, memoised per payload.

    Up to 300 distinct payloads are kept by the cache, so *pdf_content* must
    be hashable.
    """
    # NOTE(review): each cache entry keeps the payload and its reader alive;
    # confirm that 300 entries is an acceptable memory ceiling.
    return PdfReader(io.BytesIO(pdf_content))
| 154 | + |
# AAA-GG-SSSS: area is not 000, 666 or 9xx; group is not 00; serial is not 0000.
_SSN_PATTERN = re.compile(r"^(?!000|666)[0-8]\d{2}-(?!00)\d{2}-(?!0000)\d{4}$")


def is_valid_ssn(ssn):
    """Return True when *ssn* as a whole is a well-formed ``AAA-GG-SSSS`` number.

    The string must consist of the number only (a single trailing newline is
    tolerated by ``$``); this does not search for a number inside longer text.
    """
    return _SSN_PATTERN.match(ssn) is not None
| 158 | + |
# Same number rules as ``is_valid_ssn``, but unanchored so that a number can
# be found inside surrounding text; the digit guards stop it from matching a
# slice of a longer digit run.
_SSN_IN_TEXT = re.compile(
    r"(?<!\d)(?!000|666)[0-8]\d{2}-(?!00)\d{2}-(?!0000)\d{4}(?!\d)"
)


def get_pdf_content(pdf_content):
    """Extract the text of every page of a PDF and summarise the outcome.

    One ``extract_pdf_text`` task is started per page and all results are
    gathered before returning.

    Args:
        pdf_content: Raw PDF payload; forwarded to ``get_reader`` and
            ``extract_pdf_text``.

    Returns:
        A dict with:
            ``text``: page texts joined with newlines, in page order.
            ``margins``: the ``margin`` value of each page result.
            ``errors``: one message string per page that failed.
            ``PII``: True when the text contains an SSN-shaped number.
    """
    # Going through the cached reader avoids a second parse and fills the
    # cache before the per-page tasks ask for the same payload.
    page_count = len(get_reader(pdf_content).pages)

    tasks = [
        extract_pdf_text(pdf_content=pdf_content, page_number=page_number)
        for page_number in tqdm(range(page_count), desc="PDF Text Processing")
    ]
    results = [task.result() for task in tqdm(tasks, desc="PDF Text Results")]
    results.sort(key=lambda x: x["page_num"])

    combined_text = "\n".join(result["text"] for result in results)
    # The whole-string validator can never match a full document, so search
    # within the text instead.
    has_ssn = _SSN_IN_TEXT.search(combined_text) is not None
    margins = [result["margin"] for result in results]
    # Plain strings only: one entry per failed page, with no raw exception
    # objects and no duplicates.
    error_list = [
        f"Error on page {result['page_num']} of {result['error']}"
        for result in results
        if result["error"] is not None
    ]

    return {"text": combined_text, "margins": margins, "errors": error_list, "PII": has_ssn}
0 commit comments