diff --git a/pypdf/_writer.py b/pypdf/_writer.py index 069a36deb..09f17591d 100644 --- a/pypdf/_writer.py +++ b/pypdf/_writer.py @@ -103,11 +103,13 @@ NumberObject, PdfObject, RectangleObject, + ReferenceLink, StreamObject, TextStringObject, TreeObject, ViewerPreferences, create_string_object, + extract_links, hex_to_rgb, is_null_or_none, ) @@ -209,6 +211,11 @@ def __init__( """The PDF file identifier, defined by the ID in the PDF file's trailer dictionary.""" + self._unresolved_links: list[tuple[ReferenceLink, ReferenceLink]] = [] + "Tracks links in pages added to the writer for resolving later." + self._merged_in_pages: Dict[Optional[IndirectObject], Optional[IndirectObject]] = {} + "Tracks pages added to the writer and what page they turned into." + if self.incremental: if isinstance(fileobj, (str, Path)): with open(fileobj, "rb") as f: @@ -482,12 +489,14 @@ def _add_page( ] except Exception: pass + page = cast( "PageObject", page_org.clone(self, False, excluded_keys).get_object() ) if page_org.pdf is not None: other = page_org.pdf.pdf_header self.pdf_header = _get_max_pdf_version_header(self.pdf_header, other) + node, idx = self._get_page_in_node(index) page[NameObject(PA.PARENT)] = node.indirect_reference @@ -505,6 +514,15 @@ def _add_page( recurse += 1 if recurse > 1000: raise PyPdfError("Too many recursive calls!") + + if page_org.pdf is not None: + # the page may contain links to other pages, and those other + # pages may or may not already be added. we store the + # information we need, so that we can resolve the references + # later. 
+ self._unresolved_links.extend(extract_links(page, page_org)) + self._merged_in_pages[page_org.indirect_reference] = page.indirect_reference + return page def set_need_appearances_writer(self, state: bool = True) -> None: @@ -1350,6 +1368,19 @@ def encrypt( self._add_object(entry) self._encrypt_entry = entry + def _resolve_links(self) -> None: + """Patch up links that were added to the document earlier, to + make sure they still point to the same pages. + """ + for (new_link, old_link) in self._unresolved_links: + old_page = old_link.find_referenced_page() + if not old_page: + continue + new_page = self._merged_in_pages.get(old_page) + if new_page is None: + continue + new_link.patch_reference(self, new_page) + def write_stream(self, stream: StreamType) -> None: if hasattr(stream, "mode") and "b" not in stream.mode: logger_warning( @@ -1361,6 +1392,7 @@ def write_stream(self, stream: StreamType) -> None: # if not self._root: # self._root = self._add_object(self._root_object) # self._sweep_indirect_references(self._root) + self._resolve_links() if self.incremental: self._reader.stream.seek(0) diff --git a/pypdf/generic/__init__.py b/pypdf/generic/__init__.py index dc4545993..4df7852db 100644 --- a/pypdf/generic/__init__.py +++ b/pypdf/generic/__init__.py @@ -62,6 +62,7 @@ ) from ._files import EmbeddedFile from ._fit import Fit +from ._link import DirectReferenceLink, NamedReferenceLink, ReferenceLink, extract_links from ._outline import OutlineItem from ._rectangle import RectangleObject from ._utils import ( @@ -208,6 +209,7 @@ def link( "DecodedStreamObject", "Destination", "DictionaryObject", + "DirectReferenceLink", "EmbeddedFile", "EncodedStreamObject", "Field", @@ -215,12 +217,14 @@ def link( "FloatObject", "IndirectObject", "NameObject", + "NamedReferenceLink", "NullObject", "NumberObject", "OutlineFontFlag", "OutlineItem", "PdfObject", "RectangleObject", + "ReferenceLink", "StreamObject", "TextStringObject", "TreeObject", @@ -229,6 +233,7 @@ def link( 
"create_string_object", "decode_pdfdocencoding", "encode_pdfdocencoding", + "extract_links", "hex_to_rgb", "is_null_or_none", "read_hex_string_from_stream", diff --git a/pypdf/generic/_data_structures.py b/pypdf/generic/_data_structures.py index 6d5abfaba..c67b6c98b 100644 --- a/pypdf/generic/_data_structures.py +++ b/pypdf/generic/_data_structures.py @@ -1707,8 +1707,8 @@ def title(self) -> Optional[str]: return self.get("/Title") @property - def page(self) -> Optional[int]: - """Read-only property accessing the destination page number.""" + def page(self) -> Optional[IndirectObject]: + """Read-only property accessing the IndirectObject of the destination page.""" return self.get("/Page") @property diff --git a/pypdf/generic/_link.py b/pypdf/generic/_link.py new file mode 100644 index 000000000..1750e2b46 --- /dev/null +++ b/pypdf/generic/_link.py @@ -0,0 +1,116 @@ +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * The name of the author may not be used to endorse or promote products +# derived from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + + +# This module contains code used by _writer.py to track links in pages +# being added to the writer until the links can be resolved. + +from typing import TYPE_CHECKING, List, Optional, Tuple, Union, cast + +from . import ArrayObject, DictionaryObject, IndirectObject, PdfObject, TextStringObject + +if TYPE_CHECKING: + from .._page import PageObject + from .._reader import PdfReader + from .._writer import PdfWriter + + +class NamedReferenceLink: + """Named reference link being preserved until we can resolve it correctly.""" + + def __init__(self, reference: TextStringObject, source_pdf: "PdfReader") -> None: + """reference: TextStringObject with named reference""" + self._reference = reference + self._source_pdf = source_pdf + + def find_referenced_page(self) -> Union[IndirectObject, None]: + destination = self._source_pdf.named_destinations.get(str(self._reference)) + return destination.page if destination else None + + def patch_reference(self, target_pdf: "PdfWriter", new_page: IndirectObject) -> None: + """target_pdf: PdfWriter which the new link went into""" + # point named destination in new PDF to the new page + if str(self._reference) not in target_pdf.named_destinations: + target_pdf.add_named_destination(str(self._reference), new_page.page_number) + + +class DirectReferenceLink: + """Direct reference link being preserved until we can resolve it correctly.""" + + def __init__(self, reference: ArrayObject) -> None: + 
"""reference: an ArrayObject whose first element is the Page indirect object""" + self._reference = reference + + def find_referenced_page(self) -> IndirectObject: + return self._reference[0] + + def patch_reference(self, target_pdf: "PdfWriter", new_page: IndirectObject) -> None: + """target_pdf: PdfWriter which the new link went into""" + self._reference[0] = new_page + + +ReferenceLink = Union[NamedReferenceLink, DirectReferenceLink] + + +def extract_links(new_page: "PageObject", old_page: "PageObject") -> List[Tuple[ReferenceLink, ReferenceLink]]: + """Extracts links from two pages on the assumption that the two pages are + the same. Produces one list of (new link, old link) tuples. + """ + new_links = [_build_link(link, new_page) for link in new_page.get("/Annots", [])] + old_links = [_build_link(link, old_page) for link in old_page.get("/Annots", [])] + + return [ + (new_link, old_link) for (new_link, old_link) + in zip(new_links, old_links) + if new_link and old_link + ] + + +def _build_link(indirect_object: IndirectObject, page: "PageObject") -> Optional[ReferenceLink]: + src = cast("PdfReader", page.pdf) + link = cast(DictionaryObject, indirect_object.get_object()) + if link.get("/Subtype") != "/Link": + return None + + if "/A" in link: + action = cast(DictionaryObject, link["/A"]) + if action.get("/S") != "/GoTo": + return None + + return _create_link(action["/D"], src) + + if "/Dest" in link: + return _create_link(link["/Dest"], src) + + return None # Nothing to do here + + +def _create_link(reference: PdfObject, source_pdf: "PdfReader")-> Optional[ReferenceLink]: + if isinstance(reference, TextStringObject): + return NamedReferenceLink(reference, source_pdf) + if isinstance(reference, ArrayObject): + return DirectReferenceLink(reference) + return None diff --git a/tests/example_files.yaml b/tests/example_files.yaml index 4ea82a0d5..249c6a265 100644 --- a/tests/example_files.yaml +++ b/tests/example_files.yaml @@ -110,3 +110,7 @@ url: 
https://github.com/py-pdf/pypdf/files/12483807/AEO.1172.pdf - local_filename: iss3268.pdf url: https://github.com/user-attachments/files/20060394/broken.pdf +- local_filename: direct-link.pdf + url: https://github.com/user-attachments/files/20348304/tst.pdf +- local_filename: named-reference.pdf + url: https://github.com/user-attachments/files/20455804/MinimalJob.pdf diff --git a/tests/test_merger.py b/tests/test_merger.py index e5680d647..2683ba5e5 100644 --- a/tests/test_merger.py +++ b/tests/test_merger.py @@ -409,3 +409,107 @@ def test_deprecate_pdfmerger(): def test_get_reference(): writer = PdfWriter(RESOURCE_ROOT / "crazyones.pdf") assert writer.get_reference(writer.pages[0]) == writer.pages[0].indirect_reference + + +@pytest.mark.enable_socket +def test_direct_link_preserved(pdf_file_path): + # this could be any PDF -- we don't care which + reader = PdfReader(BytesIO(get_data_from_url(name="iss3268.pdf"))) + writer = PdfWriter(clone_from = reader) + + # this PDF has a direct link from p1 to p2 + merger = PdfReader(BytesIO(get_data_from_url(name="direct-link.pdf"))) + for p in merger.pages: + writer.add_page(p) + + writer.write(pdf_file_path) + + check = PdfReader(pdf_file_path) + page3 = check.pages[2] + link = page3["/Annots"][0].get_object() + assert link["/Subtype"] == "/Link" + dest = link["/Dest"][0] # indirect reference of page referred to + + page4 = check.flattened_pages[3] + assert dest == page4.indirect_reference, "Link from page 3 to page 4 is broken" + + +@pytest.mark.enable_socket +def test_direct_link_preserved_reordering(pdf_file_path): + # this could be any PDF -- we don't care which + reader = PdfReader(BytesIO(get_data_from_url(name="iss3268.pdf"))) + writer = PdfWriter(clone_from = reader) + + # this PDF has a direct link from p1 to p2 + merger = PdfReader(BytesIO(get_data_from_url(name="direct-link.pdf"))) + for p in merger.pages: + writer.add_page(p) + + # let's insert a page to mess up the page order + 
writer.insert_page(reader.pages[0], 3) + + writer.write(pdf_file_path) + + check = PdfReader(pdf_file_path) + page3 = check.pages[2] + link = page3["/Annots"][0].get_object() + assert link["/Subtype"] == "/Link" + dest = link["/Dest"][0] # indirect reference of page referred to + + page5 = check.flattened_pages[4] # it moved one out + assert dest == page5.indirect_reference, "Link from page 3 to page 5 is broken" + + +@pytest.mark.enable_socket +def test_direct_link_page_missing(pdf_file_path): + # this could be any PDF -- we don't care which + reader = PdfReader(BytesIO(get_data_from_url(name="iss3268.pdf"))) + writer = PdfWriter(clone_from = reader) + + # this PDF has a direct link from p1 to p2 + merger = PdfReader(BytesIO(get_data_from_url(name="direct-link.pdf"))) + writer.add_page(merger.pages[0]) + # but we're not adding page 2 + + writer.write(pdf_file_path) # verify nothing crashes + + +@pytest.mark.enable_socket +def test_named_reference_preserved(pdf_file_path): + # this could be any PDF -- we don't care which + reader = PdfReader(BytesIO(get_data_from_url(name="iss3268.pdf"))) + writer = PdfWriter(clone_from = reader) + + # this PDF has a named reference from p3 to p5 + merger = PdfReader(BytesIO(get_data_from_url(name="named-reference.pdf"))) + for p in merger.pages: + writer.add_page(p) + + writer.write(pdf_file_path) + + check = PdfReader(pdf_file_path) + page5 = check.pages[4] + page7 = check.flattened_pages[6] + for link in page5["/Annots"]: + action = link["/A"] + assert action.get("/S") == "/GoTo" + dest = str(action["/D"]) + assert dest in check.named_destinations + pref = check.named_destinations[dest].page + + assert pref == page7.indirect_reference, "Link from page 5 to page 7 is broken" + + +@pytest.mark.enable_socket +def test_named_ref_to_page_that_is_gone(pdf_file_path): + source = PdfReader(BytesIO(get_data_from_url(name="named-reference.pdf"))) + buf = BytesIO() + tmp = PdfWriter() + tmp.add_page(source.pages[2]) # we add only the 
page with the reference + tmp.write(buf) + + source = PdfReader(buf) + + writer = PdfWriter() + writer.add_page(source.pages[0]) # now references to non-existent page + writer.write(pdf_file_path) # don't crash