Skip to content

Commit d76976b

Browse files
PJBrs and stefan6419846 authored
ENH: Implement flattening for writer (#3312)
Closes #232. --------- Co-authored-by: Stefan <96178532+stefan6419846@users.noreply.github.com>
1 parent 8cda64d commit d76976b

File tree

4 files changed

+215
-16
lines changed

4 files changed

+215
-16
lines changed

pypdf/_page.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,13 @@ def compress(matrix: TransformationMatrixType) -> CompressedTransformationMatrix
196196
matrix[2][1],
197197
)
198198

199+
def _to_cm(self) -> str:
200+
# Returns the cm operation string for the given transformation matrix
201+
return (
202+
f"{self.ctm[0]:.4f} {self.ctm[1]:.4f} {self.ctm[2]:.4f} "
203+
f"{self.ctm[3]:.4f} {self.ctm[4]:.4f} {self.ctm[5]:.4f} cm"
204+
)
205+
199206
def transform(self, m: "Transformation") -> "Transformation":
200207
"""
201208
Apply one transformation to another.

pypdf/_writer.py

Lines changed: 121 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
from ._cmap import _default_fonts_space_width, build_char_map_from_dict
5656
from ._doc_common import DocumentInformation, PdfDocCommon
5757
from ._encryption import EncryptAlgorithm, Encryption
58-
from ._page import PageObject
58+
from ._page import PageObject, Transformation
5959
from ._page_labels import nums_clear_range, nums_insert, nums_next
6060
from ._reader import PdfReader
6161
from ._utils import (
@@ -865,12 +865,102 @@ def append_pages_from_reader(
865865
if callable(after_page_append):
866866
after_page_append(writer_page)
867867

868+
def _merge_content_stream_to_page(
869+
self,
870+
page: PageObject,
871+
new_content_data: bytes,
872+
) -> None:
873+
"""
874+
Combines existing content stream(s) with new content (as bytes),
875+
and stores the result in the page's /Contents entry.
876+
877+
Args:
878+
page: The page to which the new content data will be added.
879+
new_content_data: A binary-encoded new content stream, for
880+
instance the commands to draw an XObject.
881+
"""
882+
# First resolve the existing page content. This is always an IndirectObject:
883+
# PDF Explained by John Whitington
884+
# https://www.oreilly.com/library/view/pdf-explained/9781449321581/ch04.html
885+
if NameObject("/Contents") in page:
886+
existing_content_ref = page[NameObject("/Contents")]
887+
existing_content = existing_content_ref.get_object()
888+
889+
if isinstance(existing_content, ArrayObject):
890+
# Create a new StreamObject for the new_content_data
891+
new_stream_obj = StreamObject()
892+
new_stream_obj.set_data(new_content_data)
893+
existing_content.append(self._add_object(new_stream_obj))
894+
page[NameObject("/Contents")] = self._add_object(existing_content)
895+
if isinstance(existing_content, StreamObject):
896+
# Merge new content to existing StreamObject
897+
merged_data = existing_content.get_data() + b"\n" + new_content_data
898+
new_stream = StreamObject()
899+
new_stream.set_data(merged_data)
900+
page[NameObject("/Contents")] = self._add_object(new_stream)
901+
else:
902+
# If no existing content, then we have an empty page.
903+
# Create a new StreamObject in a new /Contents entry.
904+
new_stream = StreamObject()
905+
new_stream.set_data(new_content_data)
906+
page[NameObject("/Contents")] = self._add_object(new_stream)
907+
908+
def _add_apstream_object(
909+
self,
910+
page: PageObject,
911+
appearance_stream_obj: StreamObject,
912+
object_name: str,
913+
x_offset: float,
914+
y_offset: float,
915+
font_res: Optional[DictionaryObject] = None
916+
) -> None:
917+
"""
918+
Adds an appearance stream to the page content in the form of
919+
an XObject.
920+
921+
Args:
922+
page: The page to which to add the appearance stream.
923+
appearance_stream_obj: The appearance stream.
924+
object_name: The name of the appearance stream.
925+
x_offset: The horizontal offset for the appearance stream.
926+
y_offset: The vertical offset for the appearance stream.
927+
font_res: The appearance stream's font resource (if given).
928+
"""
929+
# Prepare XObject resource dictionary on the page
930+
pg_res = cast(DictionaryObject, page[PG.RESOURCES])
931+
if font_res is not None:
932+
font_name = font_res["/BaseFont"] # ["/Name"] often also exists, but is deprecated
933+
if "/Font" not in pg_res:
934+
pg_res[NameObject("/Font")] = DictionaryObject()
935+
pg_ft_res = cast(DictionaryObject, pg_res[NameObject("/Font")])
936+
if font_name not in pg_ft_res:
937+
pg_ft_res[NameObject(font_name)] = font_res
938+
# Always add the resolved stream object to the writer to get a new IndirectObject.
939+
# This ensures we have a valid IndirectObject managed by *this* writer.
940+
xobject_ref = self._add_object(appearance_stream_obj)
941+
xobject_name = NameObject(f"/Fm_{object_name}")._sanitize()
942+
if "/XObject" not in pg_res:
943+
pg_res[NameObject("/XObject")] = DictionaryObject()
944+
pg_xo_res = cast(DictionaryObject, pg_res["/XObject"])
945+
if xobject_name not in pg_xo_res:
946+
pg_xo_res[xobject_name] = xobject_ref
947+
else:
948+
logger_warning(
949+
f"XObject {xobject_name!r} already added to page resources. This might be an issue.",
950+
__name__
951+
)
952+
xobject_cm = Transformation().translate(x_offset, y_offset)
953+
xobject_drawing_commands = f"q\n{xobject_cm._to_cm()}\n{xobject_name} Do\nQ".encode()
954+
self._merge_content_stream_to_page(page, xobject_drawing_commands)
955+
868956
def _update_field_annotation(
869957
self,
958+
page: PageObject,
870959
field: DictionaryObject,
871960
annotation: DictionaryObject,
872961
font_name: str = "",
873962
font_size: float = -1,
963+
flatten: bool = False,
874964
) -> None:
875965
# Calculate rectangle dimensions
876966
_rct = cast(RectangleObject, annotation[AA.Rect])
@@ -1013,6 +1103,10 @@ def _update_field_annotation(
10131103
self._objects[n - 1] = dct
10141104
dct.indirect_reference = IndirectObject(n, 0, self)
10151105

1106+
if flatten:
1107+
field_name = self._get_qualified_field_name(annotation)
1108+
self._add_apstream_object(page, dct, field_name, _rct[0], _rct[1], font_res)
1109+
10161110
FFBITS_NUL = FA.FfBits(0)
10171111

10181112
def update_page_form_field_values(
@@ -1021,6 +1115,7 @@ def update_page_form_field_values(
10211115
fields: Dict[str, Union[str, List[str], Tuple[str, str, float]]],
10221116
flags: FA.FfBits = FFBITS_NUL,
10231117
auto_regenerate: Optional[bool] = True,
1118+
flatten: bool = False,
10241119
) -> None:
10251120
"""
10261121
Update the form field values for a given page from a fields dictionary.
@@ -1047,6 +1142,10 @@ def update_page_form_field_values(
10471142
auto_regenerate: Set/unset the need_appearances flag;
10481143
the flag is unchanged if auto_regenerate is None.
10491144
1145+
flatten: Whether or not to flatten the annotation. If True, this adds the annotation's
1146+
appearance stream to the page contents. Note that this option does not remove the
1147+
annotation itself.
1148+
10501149
"""
10511150
if CatalogDictionary.ACRO_FORM not in self._root_object:
10521151
raise PyPdfError("No /AcroForm dictionary in PDF of PdfWriter Object")
@@ -1061,7 +1160,7 @@ def update_page_form_field_values(
10611160
if isinstance(page, list):
10621161
for p in page:
10631162
if PG.ANNOTS in p: # just to prevent warnings
1064-
self.update_page_form_field_values(p, fields, flags, None)
1163+
self.update_page_form_field_values(p, fields, flags, None, flatten=flatten)
10651164
return
10661165
if PG.ANNOTS not in page:
10671166
logger_warning("No fields to update on this page", __name__)
@@ -1090,35 +1189,43 @@ def update_page_form_field_values(
10901189
del parent_annotation["/I"]
10911190
if flags:
10921191
annotation[NameObject(FA.Ff)] = NumberObject(flags)
1093-
if isinstance(value, list):
1094-
lst = ArrayObject(TextStringObject(v) for v in value)
1095-
parent_annotation[NameObject(FA.V)] = lst
1096-
elif isinstance(value, tuple):
1097-
annotation[NameObject(FA.V)] = TextStringObject(
1098-
value[0],
1099-
)
1100-
else:
1101-
parent_annotation[NameObject(FA.V)] = TextStringObject(value)
1192+
if not (value is None and flatten): # Only change values if given by user and not flattening.
1193+
if isinstance(value, list):
1194+
lst = ArrayObject(TextStringObject(v) for v in value)
1195+
parent_annotation[NameObject(FA.V)] = lst
1196+
elif isinstance(value, tuple):
1197+
annotation[NameObject(FA.V)] = TextStringObject(
1198+
value[0],
1199+
)
1200+
else:
1201+
parent_annotation[NameObject(FA.V)] = TextStringObject(value)
11021202
if parent_annotation.get(FA.FT) == "/Btn":
11031203
# Checkbox button (no /FT found in Radio widgets)
11041204
v = NameObject(value)
11051205
ap = cast(DictionaryObject, annotation[NameObject(AA.AP)])
1106-
if v not in cast(ArrayObject, ap[NameObject("/N")]):
1206+
normal_ap = cast(DictionaryObject, ap["/N"])
1207+
if v not in normal_ap:
11071208
v = NameObject("/Off")
1209+
appearance_stream_obj = normal_ap.get(v)
11081210
# other cases will be updated through the for loop
11091211
annotation[NameObject(AA.AS)] = v
11101212
annotation[NameObject(FA.V)] = v
1213+
if flatten and appearance_stream_obj is not None:
1214+
# We basically copy the entire appearance stream, which should be an XObject that
1215+
# is already registered. No need to add font resources.
1216+
rct = cast(RectangleObject, annotation[AA.Rect])
1217+
self._add_apstream_object(page, appearance_stream_obj, field, rct[0], rct[1])
11111218
elif (
11121219
parent_annotation.get(FA.FT) == "/Tx"
11131220
or parent_annotation.get(FA.FT) == "/Ch"
11141221
):
11151222
# textbox
11161223
if isinstance(value, tuple):
11171224
self._update_field_annotation(
1118-
parent_annotation, annotation, value[1], value[2]
1225+
page, parent_annotation, annotation, value[1], value[2], flatten=flatten
11191226
)
11201227
else:
1121-
self._update_field_annotation(parent_annotation, annotation)
1228+
self._update_field_annotation(page, parent_annotation, annotation, flatten=flatten)
11221229
elif (
11231230
annotation.get(FA.FT) == "/Sig"
11241231
): # deprecated # not implemented yet

pypdf/generic/_base.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -841,6 +841,21 @@ def renumber(self) -> bytes:
841841
out += c.encode("utf-8")
842842
return out
843843

844+
def _sanitize(self) -> "NameObject":
845+
"""
846+
Sanitize the NameObject's name to be a valid PDF name part
847+
(alphanumeric, underscore, hyphen). The _sanitize method replaces
848+
spaces and any non-alphanumeric/non-underscore/non-hyphen with
849+
underscores.
850+
851+
Returns:
852+
NameObject with sanitized name.
853+
"""
854+
name = str(self)[1:] # Remove leading forward slash
855+
name = re.sub(r"\ ", "_", name)
856+
name = re.sub(r"[^a-zA-Z0-9_-]", "_", name)
857+
return NameObject("/" + name)
858+
844859
@classproperty
845860
def surfix(cls) -> bytes: # noqa: N805
846861
deprecate_with_replacement("surfix", "prefix", "6.0.0")

tests/test_writer.py

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
from pypdf.errors import PageSizeNotDefinedError, PyPdfError
2424
from pypdf.generic import (
2525
ArrayObject,
26+
ByteStringObject,
2627
ContentStream,
28+
DecodedStreamObject,
2729
Destination,
2830
DictionaryObject,
2931
Fit,
@@ -513,12 +515,12 @@ def test_fill_form(pdf_file_path):
513515
writer.append(RESOURCE_ROOT / "crazyones.pdf", [0])
514516

515517
writer.update_page_form_field_values(
516-
writer.pages[0], {"foo": "some filled in text"}, flags=1
518+
writer.pages[0], {"foo": "some filled in text"}, flags=1, flatten=True
517519
)
518520

519521
# check if no fields to fill in the page
520522
writer.update_page_form_field_values(
521-
writer.pages[1], {"foo": "some filled in text"}, flags=1
523+
writer.pages[1], {"foo": "some filled in text"}, flags=1, flatten=True
522524
)
523525

524526
writer.update_page_form_field_values(
@@ -1526,13 +1528,21 @@ def test_update_form_fields(tmp_path):
15261528
"DropList1": "DropListe3",
15271529
},
15281530
auto_regenerate=False,
1531+
flatten=True,
15291532
)
15301533
del writer.pages[0]["/Annots"][1].get_object()["/AP"]["/N"]
1534+
del writer.pages[0]["/Resources"]["/Font"]
15311535
writer.update_page_form_field_values(
15321536
writer.pages[0],
15331537
{"Text1": "my Text1", "Text2": "ligne1\nligne2\nligne3"},
15341538
auto_regenerate=False,
15351539
)
1540+
writer.update_page_form_field_values(
1541+
writer.pages[0],
1542+
{"Text1": None, "Text2": None},
1543+
auto_regenerate=False,
1544+
flatten=True,
1545+
)
15361546

15371547
writer.write(write_data_here)
15381548
reader = PdfReader(write_data_here)
@@ -1575,11 +1585,71 @@ def test_update_form_fields(tmp_path):
15751585
None,
15761586
{"Text1": "my Text1", "Text2": "ligne1\nligne2\nligne3"},
15771587
auto_regenerate=False,
1588+
flatten=True
15781589
)
15791590

15801591
Path(write_data_here).unlink()
15811592

15821593

1594+
def test_add_apstream_object():
1595+
writer = PdfWriter()
1596+
page = writer.add_blank_page(1000, 1000)
1597+
assert NameObject("/Contents") not in page
1598+
apstream_object = DecodedStreamObject.initialize_from_dictionary(
1599+
{
1600+
NameObject("/Type"): NameObject("/XObject"),
1601+
NameObject("/Subtype"): NameObject("/Form"),
1602+
NameObject("/BBox"): RectangleObject([0.0, 0.0, 10.5, 10.5]),
1603+
"__streamdata__": ByteStringObject(b"BT /F1 12 Tf (Hello World) Tj ET")
1604+
}
1605+
)
1606+
writer._add_object(apstream_object)
1607+
object_name = "AA2342!@#$% ^^##aa:-)"
1608+
x_offset = 200
1609+
y_offset = 200
1610+
writer._add_apstream_object(page, apstream_object, object_name, x_offset, y_offset)
1611+
assert NameObject("/XObject") in page[NameObject("/Resources")]
1612+
assert "/Fm_AA2342__________aa_-_" in page[NameObject("/Resources")][NameObject("/XObject")]
1613+
assert NameObject("/Contents") in page
1614+
contents_obj = page[NameObject("/Contents")]
1615+
stream = contents_obj.get_object()
1616+
assert isinstance(stream, StreamObject)
1617+
assert stream.get_data() == (
1618+
b"q\n1.0000 0.0000 0.0000 1.0000 200.0000 200.0000 cm\n/Fm_AA2342__________aa_-_ Do\nQ"
1619+
)
1620+
1621+
1622+
def test_merge_content_stream_to_page():
1623+
"""Test that new content data is correctly added to page contents
1624+
in the form of an ArrayObject or StreamObject. The
1625+
test_add_apstream_object code already correctly checks that
1626+
_merge_content_stream_to_page works for an empty page.
1627+
"""
1628+
writer = PdfWriter()
1629+
page = writer.add_blank_page(100, 100)
1630+
new_content = b"BT /F1 12 Tf (Hello World) Tj ET"
1631+
# Call the method under test
1632+
writer._merge_content_stream_to_page(page, new_content)
1633+
more_content = b"BT /F1 12 Tf (Hello Again, World) Tj ET"
1634+
writer._merge_content_stream_to_page(page, more_content)
1635+
contents_obj = page[NameObject("/Contents")]
1636+
stream = contents_obj.get_object()
1637+
assert isinstance(stream, StreamObject)
1638+
assert stream.get_data() == b"BT /F1 12 Tf (Hello World) Tj ET\nBT /F1 12 Tf (Hello Again, World) Tj ET"
1639+
new_stream_obj = StreamObject()
1640+
new_stream_obj.set_data(new_content)
1641+
content = ArrayObject()
1642+
content.append(new_stream_obj)
1643+
page[NameObject("/Contents")] = writer._add_object(content)
1644+
writer._merge_content_stream_to_page(page, more_content)
1645+
contents_obj = page[NameObject("/Contents")]
1646+
array = contents_obj.get_object()
1647+
assert isinstance(array, ArrayObject)
1648+
contents = page[NameObject("/Contents")].get_object()
1649+
assert contents[0].get_object().get_data() == new_content
1650+
assert contents[1].get_object().get_data() == more_content
1651+
1652+
15831653
@pytest.mark.enable_socket
15841654
def test_update_form_fields2():
15851655
my_files = {

0 commit comments

Comments
 (0)