Skip to content

Add new feature to extract annotated text in xml structure and sorting rules #3038

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions changedetectionio/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,114 @@ def process_formdata(self, valuelist):
else:
self.data = {}


class StringSelectorPairListField(StringField):
    """
    Textarea-backed field whose value is a list of selector pairs.

    Each non-blank input line must look like:
        {first_selector}{second_selector}
    or simply:
        {first_selector}
    self.data holds (first_selector, second_selector) tuples; when the
    second selector is omitted it is stored as an empty string.
    Lines that do not match the expected shape are silently skipped.
    """
    widget = widgets.TextArea()

    def _value(self):
        """Render self.data (list of tuples) back into textarea lines."""
        if not self.data:
            return u''
        rendered = []
        for first, second in self.data:
            rendered.append(f"{{{first}}}{{{second}}}" if second else f"{{{first}}}")
        return "\r\n".join(rendered)

    def process_formdata(self, valuelist):
        """Parse raw textarea input into (first_selector, second_selector) tuples."""
        self.data = []
        if not valuelist:
            return
        for raw_line in valuelist[0].split("\n"):
            candidate = raw_line.strip()
            if not candidate:
                # Ignore empty / whitespace-only lines.
                continue
            # One braced selector, optionally followed by a second one.
            parsed = re.match(r'^\{([^}]*)\}(?:\{([^}]*)\})?$', candidate)
            if parsed is None:
                continue
            second = parsed.group(2)
            self.data.append(
                (parsed.group(1).strip(),
                 second.strip() if second is not None else "")
            )


class StringSelectorTagDictField(StringField):
    """
    Textarea-backed field mapping a CSS selector to a list of tags.

    Each non-blank input line must look like:
        {css_selector} tag1, tag2, tag3
    Tags are comma-separated and stripped of ALL whitespace (leading,
    trailing, and internal); a line is only kept when it yields at least
    one non-empty tag.  self.data is a dict of css_selector -> [tag, ...];
    repeated selectors have their tag lists merged.
    """
    widget = widgets.TextArea()

    def _value(self):
        """Render self.data (the dict) back into textarea lines, skipping empty tag lists."""
        if not self.data:
            return u''
        rendered = [
            f"{{{selector}}} " + ", ".join(tag_list)
            for selector, tag_list in self.data.items()
            if tag_list
        ]
        return "\r\n".join(rendered)

    def process_formdata(self, valuelist):
        """Parse raw textarea input into a css_selector -> list-of-tags dictionary."""
        self.data = {}
        if not valuelist:
            return
        for raw_line in valuelist[0].split("\n"):
            candidate = raw_line.strip()
            if not candidate:
                # Ignore empty / whitespace-only lines.
                continue
            # {css_selector} followed by at least one character of tags.
            parsed = re.match(r'^\{([^}]*)\}(.+)$', candidate)
            if parsed is None:
                continue
            selector = parsed.group(1).strip()
            # Split on commas, then drop every whitespace character
            # (including internal ones) from each tag.
            tag_list = []
            for piece in parsed.group(2).split(','):
                compact = ''.join(piece.split())
                if compact:
                    tag_list.append(compact)
            # Only record the line when it produced at least one valid tag.
            if tag_list:
                self.data.setdefault(selector, []).extend(tag_list)


class ValidateContentFetcherIsReady(object):
"""
Validates that anything that looks like a regex passes as a regex
Expand Down Expand Up @@ -580,6 +688,10 @@ class processor_text_json_diff_form(commonSettingsForm):
remove_duplicate_lines = BooleanField('Remove duplicate lines of text', default=False)
sort_text_alphabetically = BooleanField('Sort text alphabetically', default=False)
trim_text_whitespace = BooleanField('Trim whitespace before and after text', default=False)
extraction_method = RadioField('Extraction method', choices=[('TEXT', 'Extract plain text'),('ANNOTATED_TEXT', 'Extract text with custom annotations')], default='TEXT')
annotation_rules = StringSelectorTagDictField('Annotation rules', [validators.Optional()])

annotated_sort_selectors = StringSelectorPairListField('Sort annotated text', [validators.Optional()])

filter_text_added = BooleanField('Added lines', default=True)
filter_text_replaced = BooleanField('Replaced/changed lines', default=True)
Expand Down
254 changes: 253 additions & 1 deletion changedetectionio/html_tools.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from loguru import logger
from lxml import etree
from typing import List
from typing import List, Dict, Tuple, Any
import json
import re

Expand Down Expand Up @@ -453,6 +453,258 @@ def html_to_text(html_content: str, render_anchor_tag_content=False, is_rss=Fals
return text_content


def get_annotated_text(
    html_content: str,
    annotation_rules: Dict[str, List[str]],
    *,
    insert_block_newlines: bool = True,
    strip_edge_whitespace: bool = True,
    collapse_whitespace: bool = True,
    normalize_whitespace: bool = True,
) -> Dict[str, Any]:
    """
    Extract text with custom whitespace handling from HTML while annotating matched elements using CSS selectors.

    The DOM is walked once; every element's contribution to the output text
    is recorded as a (start, end) character range keyed by id(element).
    Afterwards each CSS selector in annotation_rules is matched against the
    tree, and every matched element that produced text yields one
    (start, end, label) annotation per configured label.

    :param html_content: The HTML content as a string.
    :param annotation_rules: A dictionary mapping CSS selectors to lists of labels.
    :param insert_block_newlines: Insert newlines around block elements.
    :param strip_edge_whitespace: Remove leading and trailing whitespace (except newlines).
    :param collapse_whitespace: Replace consecutive whitespace characters with a single space.
    :param normalize_whitespace: Convert tabs, carriage returns, and newlines to spaces.
    :return: A dictionary with 'text' (extracted plain text) and 'label' (annotations with start/end indices).
    """

    # Tags treated as inline: they never trigger the surrounding newlines
    # that block-level tags receive when insert_block_newlines is enabled.
    inline_tags = {
        "a", "span", "em", "strong", "b", "i", "u", "small", "sup", "sub", "mark", "cite", "abbr",
        "acronym", "dfn", "kbd", "var", "samp", "code", "tt"
    }

    from bs4 import BeautifulSoup, NavigableString, Tag
    soup = BeautifulSoup(html_content, "html.parser")

    # Remove unwanted tags and their content entirely (their text must not
    # leak into the extracted output).
    for tag in soup.find_all(["script", "style"]):
        tag.extract()

    # Collect the final text chunks; text_char_count tracks the running
    # character offset across all appended segments.
    extracted_text_segments = []
    text_char_count = 0

    # Maps id(element) -> (start_index, end_index) into the extracted text;
    # consumed below when resolving annotation rules to character ranges.
    index_map: Dict[int, Tuple[int, int]] = {}

    # Helper to add text segments, updating index_map for elements
    def add_text_segment(text_segment: str) -> "Tuple[int, int] | None":
        """
        Append text_segment to the global extracted_text_segments, return its (start, end) coverage.
        Returns None if empty string.
        """
        nonlocal text_char_count
        if not text_segment:
            return None

        start_index = text_char_count
        extracted_text_segments.append(text_segment)
        text_char_count += len(text_segment)
        end_index = text_char_count
        return (start_index, end_index)

    def traverse(node) -> "Tuple[int, int] | None":
        """
        Recursively traverse the DOM. Return a tuple (start, end) indicating
        the coverage of text contributed by this node (including children).
        If no text was generated by this node, return None.
        """

        node_start_index = None
        node_end_index = None

        if isinstance(node, NavigableString):
            text = str(node)

            # Whitespace handling is applied in a fixed order: normalize
            # (tabs/CR/LF -> space), then strip the edges, then collapse runs.
            if normalize_whitespace:
                text = text.translate(str.maketrans({'\t': ' ', '\r': ' ', '\n': ' '}))
            if strip_edge_whitespace:
                text = text.lstrip(" \t\r\f\v").rstrip(" \t\r\f\v")
            if collapse_whitespace:
                text = " ".join(text.split())

            if text:
                text_range = add_text_segment(text)
                if text_range:
                    node_start_index, node_end_index = text_range

            return (node_start_index, node_end_index) if node_start_index is not None else None

        if not isinstance(node, Tag):
            # e.g., Comment or Doctype - skip
            return None

        # node is a Tag -> figure out block or inline
        tag_name = node.name.lower() if node.name else ""

        is_block = insert_block_newlines and (tag_name not in inline_tags)

        # If block, add newline before (if necessary: only when the output
        # does not already end with one, so newlines never stack up)
        if is_block:
            if extracted_text_segments and not extracted_text_segments[-1].endswith("\n"):
                text_range = add_text_segment("\n")
                # coverage of a newline is ephemeral,
                # but we'll just fold it into this node's coverage
                if text_range:
                    # If node_start_index not set, adopt it
                    if node_start_index is None:
                        node_start_index = text_range[0]
                    node_end_index = text_range[1]

        # Recurse children; node coverage grows to span all child coverage.
        any_text = False
        for child in node.children:
            text_range = traverse(child)
            if text_range:
                if node_start_index is None:
                    node_start_index = text_range[0]
                node_end_index = text_range[1]
                any_text = True

        # If block, add newline after
        if is_block:
            # If last appended chunk didn't end with newline
            if extracted_text_segments and not extracted_text_segments[-1].endswith("\n"):
                text_range = add_text_segment("\n")
                if text_range:
                    if node_start_index is None:
                        node_start_index = text_range[0]
                    node_end_index = text_range[1]

        # If this node contributed any text (directly or via children),
        # record (start, end) in index_map.
        # NOTE(review): any_text is only set by child text, so a block element
        # whose sole contribution was the inserted newlines is not recorded.
        if any_text and node_start_index is not None:
            node_id = id(node)
            index_map[node_id] = (node_start_index, node_end_index)

        return (node_start_index, node_end_index) if node_start_index is not None else None

    traverse(soup)

    # Resolve annotation rules: every selector match that produced text
    # yields one (start, end, label) tuple per configured label.
    annotations = []
    for css_selector, labels in annotation_rules.items():
        for element in soup.select(css_selector):
            elem_id = id(element)
            if elem_id in index_map:
                start_index, end_index = index_map[elem_id]
                # apply each label
                for label in labels:
                    annotations.append((start_index, end_index, label))

    return {"text": "".join(extracted_text_segments), "label": annotations}


def html_to_annotated_text(
    html_content: str,
    annotation_rules: Dict[str, List[str]],
    *,
    insert_block_newlines: bool = True,
    strip_edge_whitespace: bool = True,
    collapse_whitespace: bool = True,
    normalize_whitespace: bool = True,
) -> str:
    """
    Render annotated text as a pseudo-XML string wrapped in <text>...</text>.

    Each annotation produced by get_annotated_text() becomes an opening
    <label> tag at its start offset and a closing </label> tag at its end
    offset.  At any given offset, closing tags are emitted before opening
    tags; ties are ordered by span length so longer (outer) spans wrap
    shorter (inner) ones.
    """
    from collections import defaultdict

    extraction = get_annotated_text(
        html_content,
        annotation_rules,
        insert_block_newlines=insert_block_newlines,
        strip_edge_whitespace=strip_edge_whitespace,
        collapse_whitespace=collapse_whitespace,
        normalize_whitespace=normalize_whitespace,)

    # Per character offset, the (label, span_length) pairs to emit there;
    # closing labels carry a "/" prefix.
    boundary_tags = defaultdict(list)
    for span_start, span_end, label in sorted(extraction["label"]):
        span_length = span_end - span_start
        boundary_tags[span_start].append((label, span_length))
        boundary_tags[span_end].append(("/" + label, span_length))

    plain_text = extraction["text"]
    pieces = ['<text>']
    cursor = 0
    for offset, tags_here in sorted(boundary_tags.items()):
        # Emit the untagged text leading up to this boundary.
        pieces.append(plain_text[cursor:offset])

        closers = sorted(
            (entry for entry in tags_here if entry[0].startswith("/")),
            key=lambda entry: (entry[1], entry[0]),
        )
        openers = sorted(
            (entry for entry in tags_here if not entry[0].startswith("/")),
            key=lambda entry: (entry[1], entry[0]),
            reverse=True,
        )

        # Shorter (inner) spans close first, longer (outer) spans open first.
        for label, _ in closers:
            pieces.append(f"<{label}>")
        for label, _ in openers:
            pieces.append(f"<{label}>")

        cursor = offset

    # Trailing text after the last boundary, then the document wrapper.
    pieces.append(plain_text[cursor:])
    pieces.append('</text>')

    return "".join(pieces)


def sort_annotated_text_by_selectors(annotated_xml: str, selector_pairs: List[Tuple[str, str]]) -> str:
    """
    Sort sibling elements of an annotated XML/HTML document.

    :param annotated_xml: The annotated markup to rearrange.
    :param selector_pairs: A list of (element_selector, sort_key_selector)
        tuples.  element_selector picks the elements to sort — CSS by
        default, or XPath when prefixed with "xpath:"/"xpath1:" or starting
        with "//".  sort_key_selector may be an empty string; when set, the
        first match inside each element supplies the sort key text,
        otherwise the element's own text is used.
    :return: The modified document serialized back to an XML string.
    """
    from lxml.html import fromstring, tostring
    from collections import defaultdict

    def find_elements(root, selector):
        # XPath selectors are marked by a prefix or by starting with "//";
        # everything else is treated as a CSS selector.
        if selector.startswith(("xpath:", "xpath1:", "//")):
            xpath_selector = (selector
                              .removeprefix("xpath:")
                              .removeprefix("xpath1:"))
            return root.xpath(xpath_selector)
        else:
            return root.cssselect(selector)

    html_tree = fromstring(annotated_xml.strip())

    for element_to_sort_selector, sort_identifier_selector in selector_pairs:
        # Group the matched elements by their parent so each sibling group
        # is sorted independently.
        parent_map = defaultdict(list)
        for el in find_elements(html_tree, element_to_sort_selector):
            parent_map[el.getparent()].append(el)

        # Sort each group's elements by the text of the sort-key element.
        for parent, siblings in parent_map.items():
            if parent is None:
                # The selector matched the document root itself; there is no
                # parent to detach from / re-attach into, so skip it.
                continue

            def get_sort_key(element):
                if sort_identifier_selector:
                    # Use the first sort-key element matched inside `element`
                    sort_element_matches = find_elements(element, sort_identifier_selector)
                    if sort_element_matches and sort_element_matches[0].text:
                        return sort_element_matches[0].text.strip()
                elif element.text:
                    return element.text.strip()
                return ""

            sorted_elements = sorted(siblings, key=get_sort_key)

            # Detach the originals, then re-attach them in sorted order
            # (appended at the end of the parent).
            for element in siblings:
                parent.remove(element)
            for element in sorted_elements:
                parent.append(element)

    # Finally, convert the modified DOM back into a string.
    return tostring(html_tree, pretty_print=False, method="xml").decode("utf-8")


# Does LD+JSON exist with a @type=='product' and a .price set anywhere?
def has_ldjson_product_info(content):
try:
Expand Down
Loading
Loading