2
2
3
3
import re
4
4
import time
5
- from typing import AsyncGenerator , List , Tuple
5
+ from typing import AsyncGenerator , Dict , List , Tuple
6
6
7
7
from loguru import logger
8
8
12
12
from .vocabulary import tokenize
13
13
from ...structures .schemas import NormalizationOptions
14
14
15
# Pre-compiled regex patterns for performance.
# Matches custom phoneme markup of the form "[text](/phonemes/)":
# a bracketed display-text group immediately followed by a
# slash-delimited phoneme group in parentheses. Newlines are allowed
# inside either group; both groups are non-greedy so adjacent markers
# on one line match separately. Compiled once at module load so hot
# paths don't recompile it.
CUSTOM_PHONEMES = re.compile(r"(\[([^\]]|\n)*?\])(\(\/([^\/)]|\n)*?\/\))")
17
+
15
18
def process_text_chunk (
16
19
text : str , language : str = "a" , skip_phonemize : bool = False
17
20
) -> List [int ]:
@@ -85,12 +88,21 @@ def process_text(text: str, language: str = "a") -> List[int]:
85
88
return process_text_chunk (text , language )
86
89
87
90
88
- def get_sentence_info (text : str ) -> List [Tuple [str , List [int ], int ]]:
91
+ def get_sentence_info (text : str , custom_phenomes_list : Dict [ str , str ] ) -> List [Tuple [str , List [int ], int ]]:
89
92
"""Process all sentences and return info."""
90
93
sentences = re .split (r"([.!?;:])(?=\s|$)" , text )
94
+ phoneme_length , min_value = len (custom_phenomes_list ), 0
95
+
91
96
results = []
92
97
for i in range (0 , len (sentences ), 2 ):
93
98
sentence = sentences [i ].strip ()
99
+ for replaced in range (min_value , phoneme_length ):
100
+ current_id = f"</|custom_phonemes_{ replaced } |/>"
101
+ if current_id in sentence :
102
+ sentence = sentence .replace (current_id , custom_phenomes_list .pop (current_id ))
103
+ min_value += 1
104
+
105
+
94
106
punct = sentences [i + 1 ] if i + 1 < len (sentences ) else ""
95
107
96
108
if not sentence :
@@ -102,6 +114,10 @@ def get_sentence_info(text: str) -> List[Tuple[str, List[int], int]]:
102
114
103
115
return results
104
116
117
def handle_custom_phonemes(s: re.Match[str], phenomes_list: Dict[str, str]) -> str:
    """Stash a matched custom-phoneme span and return its placeholder id.

    The placeholder id encodes the current size of *phenomes_list*, so ids
    are unique and sequential ("</|custom_phonemes_0|/>", "_1", ...). The
    full matched text (stripped of surrounding whitespace) is stored under
    that id so it can be substituted back in later.

    Args:
        s: Regex match covering the custom phoneme markup.
        phenomes_list: Mapping of placeholder id -> original markup;
            mutated in place.

    Returns:
        The placeholder id that replaces the markup in the text.
    """
    placeholder = "</|custom_phonemes_{}|/>".format(len(phenomes_list))
    phenomes_list[placeholder] = s.group(0).strip()
    return placeholder
105
121
106
122
async def smart_split (
107
123
text : str ,
@@ -114,15 +130,18 @@ async def smart_split(
114
130
chunk_count = 0
115
131
logger .info (f"Starting smart split for { len (text )} chars" )
116
132
133
+ custom_phoneme_list = {}
134
+
117
135
# Normalize text
118
136
if settings .advanced_text_normalization and normalization_options .normalize :
119
137
if lang_code in ["a" ,"b" ,"en-us" ,"en-gb" ]:
138
+ text = CUSTOM_PHONEMES .sub (lambda s : handle_custom_phonemes (s , custom_phoneme_list ), text )
120
139
text = normalize_text (text ,normalization_options )
121
140
else :
122
141
logger .info ("Skipping text normalization as it is only supported for english" )
123
142
124
143
# Process all sentences
125
- sentences = get_sentence_info (text )
144
+ sentences = get_sentence_info (text , custom_phoneme_list )
126
145
127
146
current_chunk = []
128
147
current_tokens = []
@@ -245,4 +264,4 @@ async def smart_split(
245
264
total_time = time .time () - start_time
246
265
logger .info (
247
266
f"Split completed in { total_time * 1000 :.2f} ms, produced { chunk_count } chunks"
248
- )
267
+ )
0 commit comments