Skip to content

Commit 63848b9

Browse files
committed
Refactored Cmd._initialize_history() to handle different exception types between lzma and bz2 libraries.
1 parent ff46021 commit 63848b9

File tree

1 file changed

+52
-42
lines changed

1 file changed

+52
-42
lines changed

cmd2/cmd2.py

Lines changed: 52 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4627,7 +4627,7 @@ def do_ipy(self, _: argparse.Namespace) -> Optional[bool]: # pragma: no cover
46274627
)
46284628

46294629
# Start IPython
4630-
start_ipython(config=config, argv=[], user_ns=local_vars)
4630+
start_ipython(config=config, argv=[], user_ns=local_vars) # type: ignore[no-untyped-call]
46314631
self.poutput("Now exiting IPython shell...")
46324632

46334633
# The IPython application is a singleton and won't be recreated next time
@@ -4814,16 +4814,9 @@ def _initialize_history(self, hist_file: str) -> None:
48144814
previous sessions will be included. Additionally, all history will be written
48154815
to this file when the application exits.
48164816
"""
4817-
import json
4818-
try:
4819-
import lzma
4820-
bz2 = None
4821-
except ModuleNotFoundError: # pragma: no cover
4822-
lzma = None
4823-
import bz2
4824-
48254817
self.history = History()
4826-
# with no persistent history, nothing else in this method is relevant
4818+
4819+
# With no persistent history, nothing else in this method is relevant
48274820
if not hist_file:
48284821
self.persistent_history_file = hist_file
48294822
return
@@ -4844,31 +4837,60 @@ def _initialize_history(self, hist_file: str) -> None:
48444837
self.perror(f"Error creating persistent history file directory '{hist_file_dir}': {ex}")
48454838
return
48464839

4847-
# Read and process history file
4840+
# Read history file
48484841
try:
48494842
with open(hist_file, 'rb') as fobj:
48504843
compressed_bytes = fobj.read()
4851-
if lzma is not None:
4852-
history_json = lzma.decompress(compressed_bytes).decode(encoding='utf-8')
4853-
else:
4854-
history_json = bz2.decompress(compressed_bytes).decode(encoding='utf-8')
4855-
self.history = History.from_json(history_json)
48564844
except FileNotFoundError:
4857-
# Just use an empty history
4858-
pass
4845+
compressed_bytes = bytes()
48594846
except OSError as ex:
48604847
self.perror(f"Cannot read persistent history file '{hist_file}': {ex}")
48614848
return
4862-
except (json.JSONDecodeError, lzma.LZMAError, KeyError, UnicodeDecodeError, ValueError) as ex:
4849+
4850+
# Register a function to write history at save
4851+
import atexit
4852+
4853+
self.persistent_history_file = hist_file
4854+
atexit.register(self._persist_history)
4855+
4856+
# Empty or nonexistent history file. Nothing more to do.
4857+
if not compressed_bytes:
4858+
return
4859+
4860+
# Decompress history data
4861+
try:
4862+
import lzma as decompress_lib
4863+
4864+
decompress_exceptions: Tuple[type[Exception]] = (decompress_lib.LZMAError,)
4865+
except ModuleNotFoundError: # pragma: no cover
4866+
import bz2 as decompress_lib # type: ignore[no-redef]
4867+
4868+
decompress_exceptions: Tuple[type[Exception]] = (OSError, ValueError) # type: ignore[no-redef]
4869+
4870+
try:
4871+
history_json = decompress_lib.decompress(compressed_bytes).decode(encoding='utf-8')
4872+
except decompress_exceptions as ex:
4873+
self.perror(
4874+
f"Error decompressing persistent history data '{hist_file}': {ex}\n"
4875+
f"The history file will be recreated when this application exits."
4876+
)
4877+
return
4878+
4879+
# Decode history json
4880+
import json
4881+
4882+
try:
4883+
self.history = History.from_json(history_json)
4884+
except (json.JSONDecodeError, KeyError, ValueError) as ex:
48634885
self.perror(
4864-
f"Error processing persistent history file '{hist_file}': {ex}\n"
4886+
f"Error processing persistent history data '{hist_file}': {ex}\n"
48654887
f"The history file will be recreated when this application exits."
48664888
)
4889+
return
48674890

48684891
self.history.start_session()
4869-
self.persistent_history_file = hist_file
48704892

4871-
# populate readline history
4893+
# Populate readline history
48724894
if rl_type != RlType.NONE:
48734895
last = None
48744896
for item in self.history:
@@ -4880,33 +4902,21 @@ def _initialize_history(self, hist_file: str) -> None:
48804902
readline.add_history(line)
48814903
last = line
48824904

4883-
# register a function to write history at save
4884-
# if the history file is in plain text format from 0.9.12 or lower
4885-
# this will fail, and the history in the plain text file will be lost
4886-
import atexit
4887-
4888-
atexit.register(self._persist_history)
4889-
48904905
def _persist_history(self) -> None:
48914906
"""Write history out to the persistent history file as compressed JSON"""
4892-
try:
4893-
import lzma
4894-
bz2 = None
4895-
except ModuleNotFoundError: # pragma: no cover
4896-
lzma = None
4897-
import bz2
4898-
48994907
if not self.persistent_history_file:
49004908
return
49014909

4902-
self.history.truncate(self._persistent_history_length)
49034910
try:
4904-
history_json = self.history.to_json()
4905-
if lzma is not None:
4906-
compressed_bytes = lzma.compress(history_json.encode(encoding='utf-8'))
4907-
else:
4908-
compressed_bytes = bz2.compress(history_json.encode(encoding='utf-8'))
4911+
import lzma as compress_lib
4912+
except ModuleNotFoundError: # pragma: no cover
4913+
import bz2 as compress_lib # type: ignore[no-redef]
49094914

4915+
self.history.truncate(self._persistent_history_length)
4916+
history_json = self.history.to_json()
4917+
compressed_bytes = compress_lib.compress(history_json.encode(encoding='utf-8'))
4918+
4919+
try:
49104920
with open(self.persistent_history_file, 'wb') as fobj:
49114921
fobj.write(compressed_bytes)
49124922
except OSError as ex:

0 commit comments

Comments
 (0)