diff --git a/absl/testing/absltest.py b/absl/testing/absltest.py index e7b15b1..d9675cb 100644 --- a/absl/testing/absltest.py +++ b/absl/testing/absltest.py @@ -195,12 +195,14 @@ def _get_default_randomize_ordering_seed() -> int: 'test_srcdir', get_default_test_srcdir(), 'Root of directory tree where source files live', - allow_override_cpp=True) + allow_override_cpp=True, +) TEST_TMPDIR = flags.DEFINE_string( 'test_tmpdir', get_default_test_tmpdir(), 'Directory for temporary testing files', - allow_override_cpp=True) + allow_override_cpp=True, +) flags.DEFINE_integer( 'test_random_seed', @@ -208,7 +210,8 @@ def _get_default_randomize_ordering_seed() -> int: 'Random seed for testing. Some test frameworks may ' 'change the default value of this flag between runs, so ' 'it is not appropriate for seeding probabilistic tests.', - allow_override_cpp=True) + allow_override_cpp=True, +) flags.DEFINE_string( 'test_randomize_ordering_seed', '', @@ -217,7 +220,8 @@ def _get_default_randomize_ordering_seed() -> int: 'random seed to use. If 0 or not set, do not randomize ' 'test case execution order. This flag also overrides ' 'the TEST_RANDOMIZE_ORDERING_SEED environment variable.', - allow_override_cpp=True) + allow_override_cpp=True, +) flags.DEFINE_string('xml_output_file', '', 'File to store XML test results') @@ -238,16 +242,22 @@ def wasSuccessful(self) -> bool: Returns: Whether or not this result was a success. """ - return (len(self.failures) == len(self.errors) == - len(self.unexpectedSuccesses) == 0) + return ( + len(self.failures) + == len(self.errors) + == len(self.unexpectedSuccesses) + == 0 + ) test_result = unittest.TestResult() test_result.addUnexpectedSuccess(unittest.FunctionTestCase(lambda: None)) if test_result.wasSuccessful(): # The bug is present. unittest.TestResult.wasSuccessful = wasSuccessful # type: ignore[method-assign] if test_result.wasSuccessful(): # Warn the user if our hot-fix failed. - sys.stderr.write('unittest.result.TestResult monkey patch to report' - ' unexpected passes as failures did not work.\n') + sys.stderr.write( + 'unittest.result.TestResult monkey patch to report' + ' unexpected passes as failures did not work.\n' + ) _monkey_patch_test_result_for_unexpected_passes() @@ -315,9 +325,9 @@ def create_file( Args: file_path: Optional file path for the temp file. If not given, a unique file name will be generated and used. Slashes are allowed in the name; - any missing intermediate directories will be created. NOTE: This path - is the path that will be cleaned up, including any directories in the - path, e.g., 'foo/bar/baz.txt' will `rm -r foo` + any missing intermediate directories will be created. NOTE: This path is + the path that will be cleaned up, including any directories in the path, + e.g., 'foo/bar/baz.txt' will `rm -r foo` content: Optional string or bytes to initially write to the file. If not specified, then an empty file is created. mode: Mode string to use when writing content. Only used if `content` is @@ -330,16 +340,17 @@ def create_file( Returns: A _TempFile representing the created file. """ - tf, _ = _TempFile._create(self._path, file_path, content, mode, encoding, - errors) + tf, _ = _TempFile._create( + self._path, file_path, content, mode, encoding, errors + ) return tf def mkdir(self, dir_path: Optional[str] = None) -> '_TempDir': """Create a directory in the directory. Args: - dir_path: Optional path to the directory to create. If not given, - a unique name will be generated and used. + dir_path: Optional path to the directory to create. If not given, a unique + name will be generated and used. Returns: A _TempDir representing the created directory. @@ -442,9 +453,9 @@ def write_text( """Write text to the file. Args: - text: Text to write. In Python 2, it can be bytes, which will be - decoded using the `encoding` arg (this is as an aid for code that - is 2 and 3 compatible). + text: Text to write. In Python 2, it can be bytes, which will be decoded + using the `encoding` arg (this is as an aid for code that is 2 and 3 + compatible). mode: The mode to open the file for writing. encoding: The encoding to use when writing the text to the file. errors: The error handling strategy to use when converting text to bytes. @@ -457,8 +468,8 @@ def write_bytes(self, data: bytes, mode: str = 'wb') -> None: Args: data: bytes to write. - mode: Mode to open the file for writing. The "b" flag is implicit if - not already present. It must not have the "t" flag. + mode: Mode to open the file for writing. The "b" flag is implicit if not + already present. It must not have the "t" flag. """ with self.open_bytes(mode) as fp: fp.write(data) @@ -481,8 +492,10 @@ def open_text( ValueError: if invalid inputs are provided. """ if 'b' in mode: - raise ValueError('Invalid mode {!r}: "b" flag not allowed when opening ' - 'file in text mode'.format(mode)) + raise ValueError( + 'Invalid mode {!r}: "b" flag not allowed when opening ' + 'file in text mode'.format(mode) + ) if 't' not in mode: mode += 't' cm = self._open(mode, encoding, errors) @@ -502,8 +515,10 @@ def open_bytes(self, mode: str = 'rb') -> ContextManager[BinaryIO]: ValueError: if invalid inputs are provided. """ if 't' in mode: - raise ValueError('Invalid mode {!r}: "t" flag not allowed when opening ' - 'file in binary mode'.format(mode)) + raise ValueError( + 'Invalid mode {!r}: "t" flag not allowed when opening ' + 'file in binary mode'.format(mode) + ) if 'b' not in mode: mode += 'b' cm = self._open(mode, encoding=None, errors=None) @@ -637,8 +652,8 @@ def test_foo(self): See also: :meth:`create_tempfile` for creating temporary files. Args: - name: Optional name of the directory. If not given, a unique - name will be generated and used. + name: Optional name of the directory. If not given, a unique name will be + generated and used. cleanup: Optional cleanup policy on when/if to remove the directory (and all its contents) at the end of the test. If None, then uses :attr:`tempfile_cleanup`. @@ -700,8 +715,7 @@ def test_foo(self): any missing intermediate directories will be created. NOTE: This path is the path that will be cleaned up, including any directories in the path, e.g., ``'foo/bar/baz.txt'`` will ``rm -r foo``. - content: Optional string or - bytes to initially write to the file. If not + content: Optional string or bytes to initially write to the file. If not specified, then an empty file is created. mode: Mode string to use when writing content. Only used if `content` is non-empty. @@ -718,9 +732,14 @@ def test_foo(self): usage. """ test_path = self._get_tempdir_path_test() - tf, cleanup_path = _TempFile._create(test_path, file_path, content=content, - mode=mode, encoding=encoding, - errors=errors) + tf, cleanup_path = _TempFile._create( + test_path, + file_path, + content=content, + mode=mode, + encoding=encoding, + errors=errors, + ) self._maybe_add_temp_path_cleanup(cleanup_path, cleanup) return tf @@ -754,7 +773,8 @@ def enter_context(self, manager: ContextManager[_T]) -> _T: if not self._exit_stack: raise AssertionError( 'self._exit_stack is not set: enter_context is Py3-only; also make ' - 'sure that AbslTest.setUp() is called.') + 'sure that AbslTest.setUp() is called.' + ) return self._exit_stack.enter_context(manager) @enter_context.classmethod @@ -766,13 +786,15 @@ def _enter_context_cls(cls, manager: ContextManager[_T]) -> _T: if not cls._cls_exit_stack: raise AssertionError( 'cls._cls_exit_stack is not set: cls.enter_context requires ' - 'Python 3.8+; also make sure that AbslTest.setUpClass() is called.') + 'Python 3.8+; also make sure that AbslTest.setUpClass() is called.' + ) return cls._cls_exit_stack.enter_context(manager) @classmethod def _get_tempdir_path_cls(cls) -> str: - return os.path.join(TEST_TMPDIR.value, - cls.__qualname__.replace('__main__.', '')) + return os.path.join( + TEST_TMPDIR.value, cls.__qualname__.replace('__main__.', '') + ) def _get_tempdir_path_test(self) -> str: return os.path.join(self._get_tempdir_path_cls(), self._testMethodName) @@ -811,11 +833,14 @@ def _internal_add_cleanup_on_success( + len(outcome.result.errors) + len(outcome.result.unexpectedSuccesses) ) + def _call_cleaner_on_success(*args, **kwargs): if not self._internal_ran_and_passed_when_called_during_cleanup( - previous_failure_count): + previous_failure_count + ): return function(*args, **kwargs) + self.addCleanup(_call_cleaner_on_success, *args, **kwargs) def _internal_ran_and_passed_when_called_during_cleanup( @@ -856,7 +881,7 @@ def shortDescription(self) -> str: # Omit the main name so that test name can be directly copy/pasted to # the command line. if desc.startswith('__main__.'): - desc = desc[len('__main__.'):] + desc = desc[len('__main__.') :] # NOTE: super() is used here instead of directly invoking # unittest.TestCase.shortDescription(self), because of the @@ -942,24 +967,30 @@ def assertSequenceStartsWith(self, prefix, whole, msg=None): try: whole_len = len(whole) except (TypeError, NotImplementedError): - self.fail('For whole: len(%s) is not supported, it appears to be type: ' - '%s' % (whole, type(whole)), msg) + self.fail( + 'For whole: len(%s) is not supported, it appears to be type: %s' + % (whole, type(whole)), + msg, + ) assert prefix_len <= whole_len, self._formatMessage( msg, - 'Prefix length (%d) is longer than whole length (%d).' % - (prefix_len, whole_len) + 'Prefix length (%d) is longer than whole length (%d).' + % (prefix_len, whole_len), ) if not prefix_len and whole_len: - self.fail('Prefix length is 0 but whole length is %d: %s' % - (len(whole), whole), msg) + self.fail( + 'Prefix length is 0 but whole length is %d: %s' % (len(whole), whole), + msg, + ) try: self.assertSequenceEqual(prefix, whole[:prefix_len], msg) except AssertionError: - self.fail('prefix: %s not found at start of whole: %s.' % - (prefix, whole), msg) + self.fail( + 'prefix: %s not found at start of whole: %s.' % (prefix, whole), msg + ) def assertEmpty(self, container, msg=None): """Asserts that an object has zero length. @@ -969,8 +1000,10 @@ def assertEmpty(self, container, msg=None): msg: Optional message to report on failure. """ if not isinstance(container, abc.Sized): - self.fail('Expected a Sized object, got: ' - '{!r}'.format(type(container).__name__), msg) + self.fail( + 'Expected a Sized object, got: {!r}'.format(type(container).__name__), + msg, + ) # explicitly check the length since some Sized objects (e.g. numpy.ndarray) # have strange __nonzero__/__bool__ behavior. @@ -985,8 +1018,10 @@ def assertNotEmpty(self, container, msg=None): msg: Optional message to report on failure. """ if not isinstance(container, abc.Sized): - self.fail('Expected a Sized object, got: ' - '{!r}'.format(type(container).__name__), msg) + self.fail( + 'Expected a Sized object, got: {!r}'.format(type(container).__name__), + msg, + ) # explicitly check the length since some Sized objects (e.g. numpy.ndarray) # have strange __nonzero__/__bool__ behavior. @@ -1002,15 +1037,22 @@ def assertLen(self, container, expected_len, msg=None): msg: Optional message to report on failure. """ if not isinstance(container, abc.Sized): - self.fail('Expected a Sized object, got: ' - '{!r}'.format(type(container).__name__), msg) + self.fail( + 'Expected a Sized object, got: {!r}'.format(type(container).__name__), + msg, + ) if len(container) != expected_len: container_repr = unittest.util.safe_repr(container) # pytype: disable=module-attr - self.fail('{} has length of {}, expected {}.'.format( - container_repr, len(container), expected_len), msg) + self.fail( + '{} has length of {}, expected {}.'.format( + container_repr, len(container), expected_len + ), + msg, + ) - def assertSequenceAlmostEqual(self, expected_seq, actual_seq, places=None, - msg=None, delta=None): + def assertSequenceAlmostEqual( + self, expected_seq, actual_seq, places=None, msg=None, delta=None + ): """An approximate equality assertion for ordered sequences. Fail if the two sequences are unequal as determined by their value @@ -1032,8 +1074,12 @@ def assertSequenceAlmostEqual(self, expected_seq, actual_seq, places=None, delta: The OK difference between compared values. """ if len(expected_seq) != len(actual_seq): - self.fail('Sequence size mismatch: {} vs {}'.format( - len(expected_seq), len(actual_seq)), msg) + self.fail( + 'Sequence size mismatch: {} vs {}'.format( + len(expected_seq), len(actual_seq) + ), + msg, + ) err_list = [] for idx, (exp_elem, act_elem) in enumerate(zip(expected_seq, actual_seq)): @@ -1042,8 +1088,9 @@ def assertSequenceAlmostEqual(self, expected_seq, actual_seq, places=None, # `delta`. However, it's okay for assertSequenceAlmostEqual to pass # both because we want the latter to fail if the former does. # pytype: disable=wrong-keyword-args - self.assertAlmostEqual(exp_elem, act_elem, places=places, msg=msg, - delta=delta) + self.assertAlmostEqual( + exp_elem, act_elem, places=places, msg=msg, delta=delta + ) # pytype: enable=wrong-keyword-args except self.failureException as err: err_list.append(f'At index {idx}: {err}') @@ -1060,8 +1107,11 @@ def assertContainsSubset(self, expected_subset, actual_set, msg=None): if not missing: return - self.fail('Missing elements %s\nExpected: %s\nActual: %s' % ( - missing, expected_subset, actual_set), msg) + self.fail( + 'Missing elements %s\nExpected: %s\nActual: %s' + % (missing, expected_subset, actual_set), + msg, + ) def assertNoCommonElements(self, expected_seq, actual_seq, msg=None): """Checks whether actual iterable and expected iterable are disjoint.""" @@ -1069,8 +1119,11 @@ def assertNoCommonElements(self, expected_seq, actual_seq, msg=None): if not common: return - self.fail('Common elements %s\nExpected: %s\nActual: %s' % ( - common, expected_seq, actual_seq), msg) + self.fail( + 'Common elements %s\nExpected: %s\nActual: %s' + % (common, expected_seq, actual_seq), + msg, + ) def assertItemsEqual(self, expected_seq, actual_seq, msg=None): """Deprecated, please use assertCountEqual instead. @@ -1108,11 +1161,14 @@ def assertSameElements(self, expected_seq, actual_seq, msg=None): # Fail on strings: empirically, passing strings to this test method # is almost always a bug. If comparing the character sets of two strings # is desired, cast the inputs to sets or lists explicitly. - if (isinstance(expected_seq, _TEXT_OR_BINARY_TYPES) or - isinstance(actual_seq, _TEXT_OR_BINARY_TYPES)): - self.fail('Passing string/bytes to assertSameElements is usually a bug. ' - 'Did you mean to use assertEqual?\n' - 'Expected: %s\nActual: %s' % (expected_seq, actual_seq)) + if isinstance(expected_seq, _TEXT_OR_BINARY_TYPES) or isinstance( + actual_seq, _TEXT_OR_BINARY_TYPES + ): + self.fail( + 'Passing string/bytes to assertSameElements is usually a bug. ' + 'Did you mean to use assertEqual?\n' + 'Expected: %s\nActual: %s' % (expected_seq, actual_seq) + ) try: expected = {element: None for element in expected_seq} actual = {element: None for element in actual_seq} @@ -1142,10 +1198,12 @@ def assertSameElements(self, expected_seq, actual_seq, msg=None): # has a different error format. However, I find this slightly more readable. def assertMultiLineEqual(self, first, second, msg=None, **kwargs): """Asserts that two multi-line strings are equal.""" - assert isinstance(first, - str), ('First argument is not a string: %r' % (first,)) - assert isinstance(second, - str), ('Second argument is not a string: %r' % (second,)) + assert isinstance(first, str), 'First argument is not a string: %r' % ( + first, + ) + assert isinstance(second, str), 'Second argument is not a string: %r' % ( + second, + ) line_limit = kwargs.pop('line_limit', 0) if kwargs: raise TypeError(f'Unexpected keyword args {tuple(kwargs)}') @@ -1167,15 +1225,17 @@ def assertMultiLineEqual(self, first, second, msg=None, **kwargs): failure_message = failure_message[:line_limit] failure_message.append( '(... and {} more delta lines omitted for brevity.)\n'.format( - n_omitted)) + n_omitted + ) + ) raise self.failureException(''.join(failure_message)) def assertBetween(self, value, minv, maxv, msg=None): """Asserts that value is between minv and maxv (inclusive).""" - msg = self._formatMessage(msg, - '"%r" unexpectedly not between "%r" and "%r"' % - (value, minv, maxv)) + msg = self._formatMessage( + msg, '"%r" unexpectedly not between "%r" and "%r"' % (value, minv, maxv) + ) self.assertTrue(minv <= value, msg) self.assertTrue(maxv >= value, msg) @@ -1208,13 +1268,12 @@ def assertRegexMatch(self, actual_str, regexes, message=None): Args: actual_str: The string we try to match with the items in regexes. - regexes: The regular expressions we want to match against str. - See "Notes" above for detailed notes on how this is interpreted. + regexes: The regular expressions we want to match against str. See + "Notes" above for detailed notes on how this is interpreted. message: The message to be printed if the test fails. """ if isinstance(regexes, _TEXT_OR_BINARY_TYPES): - self.fail('regexes is string or bytes; use assertRegex instead.', - message) + self.fail('regexes is string or bytes; use assertRegex instead.', message) if not regexes: self.fail('No regexes specified.', message) @@ -1233,28 +1292,33 @@ def assertRegexMatch(self, actual_str, regexes, message=None): if regex_type is str: regex = '(?:%s)' % ')|(?:'.join(regexes) elif regex_type is bytes: - regex = b'(?:' + (b')|(?:'.join(regexes)) + b')' + regex = b'(?:' + b')|(?:'.join(regexes) + b')' else: - self.fail('Only know how to deal with unicode str or bytes regexes.', - message) + self.fail( + 'Only know how to deal with unicode str or bytes regexes.', message + ) if not re.search(regex, actual_str, re.MULTILINE): - self.fail('"%s" does not contain any of these regexes: %s.' % - (actual_str, regexes), message) + self.fail( + '"%s" does not contain any of these regexes: %s.' + % (actual_str, regexes), + message, + ) - def assertCommandSucceeds(self, command, regexes=(b'',), env=None, - close_fds=True, msg=None): + def assertCommandSucceeds( + self, command, regexes=(b'',), env=None, close_fds=True, msg=None + ): """Asserts that a shell command succeeds (i.e. exits with code 0). Args: command: List or string representing the command to run. regexes: List of regular expression byte strings that match success. env: Dictionary of environment variable settings. If None, no environment - variables will be set for the child process. This is to make tests - more hermetic. NOTE: this behavior is different than the standard - subprocess module. + variables will be set for the child process. This is to make tests more + hermetic. NOTE: this behavior is different than the standard subprocess + module. close_fds: Whether or not to close all open fd's in the child after - forking. + forking. msg: Optional message to report on failure. """ (ret_code, err) = get_command_stderr(command, env, close_fds) @@ -1267,13 +1331,17 @@ def assertCommandSucceeds(self, command, regexes=(b'',), env=None, command_string = get_command_string(command) self.assertEqual( - ret_code, 0, - self._formatMessage(msg, - 'Running command\n' - '%s failed with error code %s and message\n' - '%s' % (_quote_long_string(command_string), - ret_code, - _quote_long_string(err))) + ret_code, + 0, + self._formatMessage( + msg, + 'Running command\n%s failed with error code %s and message\n%s' + % ( + _quote_long_string(command_string), + ret_code, + _quote_long_string(err), + ), + ), ) self.assertRegexMatch( err, @@ -1282,25 +1350,47 @@ def assertCommandSucceeds(self, command, regexes=(b'',), env=None, msg, 'Running command\n' '%s failed with error code %s and message\n' - '%s which matches no regex in %s' % ( + '%s which matches no regex in %s' + % ( _quote_long_string(command_string), ret_code, _quote_long_string(err), - regexes))) + regexes, + ), + ), + ) + + def executeCommand(self, command, env=None, close_fds=True): + """Executes a shell command and returns the std out and std error. + + Args: + command: List or string representing the command to run. + env: Dictionary of environment variable settings. If None, no environment + variables will be set for the child process. This is to make tests more + hermetic. NOTE: this behavior is different than the standard subprocess + module. + close_fds: Whether or not to close all open fd's in the child after + forking. + + Returns: + A tuple of (return code, std error). + """ + return get_command_stderr(command, env, close_fds) - def assertCommandFails(self, command, regexes, env=None, close_fds=True, - msg=None): + def assertCommandFails( + self, command, regexes, env=None, close_fds=True, msg=None + ): """Asserts a shell command fails and the error matches a regex in a list. Args: command: List or string representing the command to run. regexes: the list of regular expression strings. env: Dictionary of environment variable settings. If None, no environment - variables will be set for the child process. This is to make tests - more hermetic. NOTE: this behavior is different than the standard - subprocess module. + variables will be set for the child process. This is to make tests more + hermetic. NOTE: this behavior is different than the standard subprocess + module. close_fds: Whether or not to close all open fd's in the child after - forking. + forking. msg: Optional message to report on failure. """ (ret_code, err) = get_command_stderr(command, env, close_fds) @@ -1313,10 +1403,14 @@ def assertCommandFails(self, command, regexes, env=None, close_fds=True, command_string = get_command_string(command) self.assertNotEqual( - ret_code, 0, - self._formatMessage(msg, 'The following command succeeded ' - 'while expected to fail:\n%s' % - _quote_long_string(command_string))) + ret_code, + 0, + self._formatMessage( + msg, + 'The following command succeeded while expected to fail:\n%s' + % _quote_long_string(command_string), + ), + ) self.assertRegexMatch( err, regexes, @@ -1324,11 +1418,15 @@ def assertCommandFails(self, command, regexes, env=None, close_fds=True, msg, 'Running command\n' '%s failed with error code %s and message\n' - '%s which matches no regex in %s' % ( + '%s which matches no regex in %s' + % ( _quote_long_string(command_string), ret_code, _quote_long_string(err), - regexes))) + regexes, + ), + ), + ) class _AssertRaisesContext: @@ -1343,8 +1441,9 @@ def __enter__(self): def __exit__(self, exc_type, exc_value, tb): if exc_type is None: - self.test_case.fail(self.expected_exception.__name__ + ' not raised', - self.msg) + self.test_case.fail( + self.expected_exception.__name__ + ' not raised', self.msg + ) if not issubclass(exc_type, self.expected_exception): return False self.test_func(exc_value) @@ -1354,27 +1453,34 @@ def __exit__(self, exc_type, exc_value, tb): @typing.overload def assertRaisesWithPredicateMatch( - self, expected_exception, predicate) -> _AssertRaisesContext: + self, expected_exception, predicate + ) -> _AssertRaisesContext: # The purpose of this return statement is to work around # https://github.com/PyCQA/pylint/issues/5273; it is otherwise ignored. return self._AssertRaisesContext(None, None, None) @typing.overload def assertRaisesWithPredicateMatch( - self, expected_exception, predicate, callable_obj: Callable[..., Any], - *args, **kwargs) -> None: + self, + expected_exception, + predicate, + callable_obj: Callable[..., Any], + *args, + **kwargs, + ) -> None: # The purpose of this return statement is to work around # https://github.com/PyCQA/pylint/issues/5273; it is otherwise ignored. return self._AssertRaisesContext(None, None, None) # type: ignore[return-value] - def assertRaisesWithPredicateMatch(self, expected_exception, predicate, - callable_obj=None, *args, **kwargs): + def assertRaisesWithPredicateMatch( + self, expected_exception, predicate, callable_obj=None, *args, **kwargs + ): """Asserts that exception is thrown and predicate(exception) is true. Args: expected_exception: Exception class expected to be raised. predicate: Function of one argument that inspects the passed-in exception - and returns True (success) or False (please fail the test). + and returns True (success) or False (please fail the test). callable_obj: Function to be called. *args: Extra args. **kwargs: Extra keyword args. @@ -1385,9 +1491,11 @@ def assertRaisesWithPredicateMatch(self, expected_exception, predicate, Raises: self.failureException if callable_obj does not raise a matching exception. """ + def Check(err): - self.assertTrue(predicate(err), - '%r does not match predicate %r' % (err, predicate)) + self.assertTrue( + predicate(err), '%r does not match predicate %r' % (err, predicate) + ) context = self._AssertRaisesContext(expected_exception, self, Check) if callable_obj is None: @@ -1405,15 +1513,25 @@ def assertRaisesWithLiteralMatch( @typing.overload def assertRaisesWithLiteralMatch( - self, expected_exception, expected_exception_message, - callable_obj: Callable[..., Any], *args, **kwargs) -> None: + self, + expected_exception, + expected_exception_message, + callable_obj: Callable[..., Any], + *args, + **kwargs, + ) -> None: # The purpose of this return statement is to work around # https://github.com/PyCQA/pylint/issues/5273; it is otherwise ignored. return self._AssertRaisesContext(None, None, None) # type: ignore[return-value] - def assertRaisesWithLiteralMatch(self, expected_exception, - expected_exception_message, - callable_obj=None, *args, **kwargs): + def assertRaisesWithLiteralMatch( + self, + expected_exception, + expected_exception_message, + callable_obj=None, + *args, + **kwargs, + ): """Asserts that the message in a raised exception equals the given string. Unlike assertRaisesRegex, this method takes a literal string, not @@ -1425,8 +1543,8 @@ def assertRaisesWithLiteralMatch(self, expected_exception, Args: expected_exception: Exception class expected to be raised. expected_exception_message: String message expected in the raised - exception. For a raise exception e, expected_exception_message must - equal str(e). + exception. For a raise exception e, expected_exception_message must + equal str(e). callable_obj: Function to be called, or None to return a context. *args: Extra args. **kwargs: Extra kwargs. @@ -1437,13 +1555,14 @@ def assertRaisesWithLiteralMatch(self, expected_exception, Raises: self.failureException if callable_obj does not raise a matching exception. """ + def Check(err): actual_exception_message = str(err) - self.assertTrue(expected_exception_message == actual_exception_message, - 'Exception message does not match.\n' - 'Expected: %r\n' - 'Actual: %r' % (expected_exception_message, - actual_exception_message)) + self.assertTrue( + expected_exception_message == actual_exception_message, + 'Exception message does not match.\nExpected: %r\nActual: %r' + % (expected_exception_message, actual_exception_message), + ) context = self._AssertRaisesContext(expected_exception, self, Check) if callable_obj is None: @@ -1458,8 +1577,8 @@ def assertContainsInOrder(self, strings, target, msg=None): Args: strings: A list of strings, such as [ 'fox', 'dog' ] - target: A target string in which to look for the strings, such as - 'The quick brown fox jumped over the lazy dog'. + target: A target string in which to look for the strings, such as 'The + quick brown fox jumped over the lazy dog'. msg: Optional message to report on failure. """ if isinstance(strings, (bytes, unicode if str is bytes else str)): @@ -1470,11 +1589,13 @@ def assertContainsInOrder(self, strings, target, msg=None): for string in strings: index = target.find(str(string), current_index) if index == -1 and current_index == 0: - self.fail("Did not find '%s' in '%s'" % - (string, target), msg) + self.fail("Did not find '%s' in '%s'" % (string, target), msg) elif index == -1: - self.fail("Did not find '%s' after '%s' in '%s'" % - (string, last_string, target), msg) + self.fail( + "Did not find '%s' after '%s' in '%s'" + % (string, last_string, target), + msg, + ) last_string = string current_index = index @@ -1502,8 +1623,11 @@ def assertContainsSubsequence(self, container, subsequence, msg=None): pass if first_nonmatching is not None: - self.fail('%s not a subsequence of %s. First non-matching element: %s' % - (subsequence, container, first_nonmatching), msg) + self.fail( + '%s not a subsequence of %s. First non-matching element: %s' + % (subsequence, container, first_nonmatching), + msg, + ) def assertContainsExactSubsequence(self, container, subsequence, msg=None): """Asserts that "container" contains "subsequence" as an exact subsequence. @@ -1525,15 +1649,19 @@ def assertContainsExactSubsequence(self, container, subsequence, msg=None): if longest_match == len(subsequence): break index = 0 - while (index < len(subsequence) and - subsequence[index] == container[start + index]): + while ( + index < len(subsequence) + and subsequence[index] == container[start + index] + ): index += 1 longest_match = max(longest_match, index) if longest_match < len(subsequence): - self.fail('%s not an exact subsequence of %s. ' - 'Longest matching prefix: %s' % - (subsequence, container, subsequence[:longest_match]), msg) + self.fail( + '%s not an exact subsequence of %s. Longest matching prefix: %s' + % (subsequence, container, subsequence[:longest_match]), + msg, + ) def assertTotallyOrdered(self, *groups, **kwargs): """Asserts that total ordering has been implemented correctly. @@ -1569,74 +1697,90 @@ def __lt__(self, other): Args: *groups: A list of groups of elements. Each group of elements is a list - of objects that are equal. The elements in each group must be less - than the elements in the group after it. For example, these groups are + of objects that are equal. The elements in each group must be less than + the elements in the group after it. For example, these groups are totally ordered: ``[None]``, ``[1]``, ``[2, 2]``, ``[3]``. **kwargs: optional msg keyword argument can be passed. """ def CheckOrder(small, big): """Ensures small is ordered before big.""" - self.assertFalse(small == big, - self._formatMessage(msg, '%r unexpectedly equals %r' % - (small, big))) - self.assertTrue(small != big, - self._formatMessage(msg, '%r unexpectedly equals %r' % - (small, big))) + self.assertFalse( + small == big, + self._formatMessage(msg, '%r unexpectedly equals %r' % (small, big)), + ) + self.assertTrue( + small != big, + self._formatMessage(msg, '%r unexpectedly equals %r' % (small, big)), + ) self.assertLess(small, big, msg) - self.assertFalse(big < small, - self._formatMessage(msg, - '%r unexpectedly less than %r' % - (big, small))) + self.assertFalse( + big < small, + self._formatMessage( + msg, '%r unexpectedly less than %r' % (big, small) + ), + ) self.assertLessEqual(small, big, msg) - self.assertFalse(big <= small, self._formatMessage( - '%r unexpectedly less than or equal to %r' % (big, small), msg - )) + self.assertFalse( + big <= small, + self._formatMessage( + '%r unexpectedly less than or equal to %r' % (big, small), msg + ), + ) self.assertGreater(big, small, msg) - self.assertFalse(small > big, - self._formatMessage(msg, - '%r unexpectedly greater than %r' % - (small, big))) + self.assertFalse( + small > big, + self._formatMessage( + msg, '%r unexpectedly greater than %r' % (small, big) + ), + ) self.assertGreaterEqual(big, small) - self.assertFalse(small >= big, self._formatMessage( - msg, - '%r unexpectedly greater than or equal to %r' % (small, big))) + self.assertFalse( + small >= big, + self._formatMessage( + msg, '%r unexpectedly greater than or equal to %r' % (small, big) + ), + ) def CheckEqual(a, b): """Ensures that a and b are equal.""" self.assertEqual(a, b, msg) - self.assertFalse(a != b, - self._formatMessage(msg, '%r unexpectedly unequals %r' % - (a, b))) + self.assertFalse( + a != b, + self._formatMessage(msg, '%r unexpectedly unequals %r' % (a, b)), + ) # Objects that compare equal must hash to the same value, but this only # applies if both objects are hashable. - if (isinstance(a, abc.Hashable) and - isinstance(b, abc.Hashable)): + if isinstance(a, abc.Hashable) and isinstance(b, abc.Hashable): self.assertEqual( - hash(a), hash(b), + hash(a), + hash(b), self._formatMessage( - msg, 'hash %d of %r unexpectedly not equal to hash %d of %r' % - (hash(a), a, hash(b), b))) - - self.assertFalse(a < b, - self._formatMessage(msg, - '%r unexpectedly less than %r' % - (a, b))) - self.assertFalse(b < a, - self._formatMessage(msg, - '%r unexpectedly less than %r' % - (b, a))) + msg, + 'hash %d of %r unexpectedly not equal to hash %d of %r' + % (hash(a), a, hash(b), b), + ), + ) + + self.assertFalse( + a < b, + self._formatMessage(msg, '%r unexpectedly less than %r' % (a, b)), + ) + self.assertFalse( + b < a, + self._formatMessage(msg, '%r unexpectedly less than %r' % (b, a)), + ) self.assertLessEqual(a, b, msg) self.assertLessEqual(b, a, msg) # pylint: disable=arguments-out-of-order - self.assertFalse(a > b, - self._formatMessage(msg, - '%r unexpectedly greater than %r' % - (a, b))) - self.assertFalse(b > a, - self._formatMessage(msg, - '%r unexpectedly greater than %r' % - (b, a))) + self.assertFalse( + a > b, + self._formatMessage(msg, '%r unexpectedly greater than %r' % (a, b)), + ) + self.assertFalse( + b > a, + self._formatMessage(msg, '%r unexpectedly greater than %r' % (b, a)), + ) self.assertGreaterEqual(a, b, msg) self.assertGreaterEqual(b, a, msg) # pylint: disable=arguments-out-of-order @@ -1647,7 +1791,7 @@ def CheckEqual(a, b): for elements in itertools.product(*groups): elements = list(elements) for index, small in enumerate(elements[:-1]): - for big in elements[index + 1:]: + for big in elements[index + 1 :]: CheckOrder(small, big) # Check that every element in each group is equal. @@ -1930,11 +2074,16 @@ def assertUrlEqual(self, a, b, msg=None): self.assertEqual(parsed_a.netloc, parsed_b.netloc, msg) self.assertEqual(parsed_a.path, parsed_b.path, msg) self.assertEqual(parsed_a.fragment, parsed_b.fragment, msg) - self.assertEqual(sorted(parsed_a.params.split(';')), - sorted(parsed_b.params.split(';')), msg) + self.assertEqual( + sorted(parsed_a.params.split(';')), + sorted(parsed_b.params.split(';')), + msg, + ) self.assertDictEqual( parse.parse_qs(parsed_a.query, keep_blank_values=True), - parse.parse_qs(parsed_b.query, keep_blank_values=True), msg) + parse.parse_qs(parsed_b.query, keep_blank_values=True), + msg, + ) def assertSameStructure(self, a, b, aname='a', bname='b', msg=None): """Asserts that two values contain the same structural content. @@ -1962,14 +2111,15 @@ def assertSameStructure(self, a, b, aname='a', bname='b', msg=None): # rather than just stopping at the first problems = [] - _walk_structure_for_problems(a, b, aname, bname, problems, - self.assertEqual, self.failureException) + _walk_structure_for_problems( + a, b, aname, bname, problems, self.assertEqual, self.failureException + ) # Avoid spamming the user toooo much if self.maxDiff is not None: max_problems_to_show = self.maxDiff // 80 if len(problems) > max_problems_to_show: - problems = problems[0:max_problems_to_show-1] + ['...'] + problems = problems[0 : max_problems_to_show - 1] + ['...'] if problems: self.fail('; '.join(problems), msg) @@ -1988,19 +2138,28 @@ def assertJsonEqual(self, first, second, msg=None): try: first_structured = json.loads(first) except ValueError as e: - raise ValueError(self._formatMessage( - msg, - 'could not decode first JSON value %s: %s' % (first, e))) + raise ValueError( + self._formatMessage( + msg, 'could not decode first JSON value %s: %s' % (first, e) + ) + ) try: second_structured = json.loads(second) except ValueError as e: - raise ValueError(self._formatMessage( - msg, - 'could not decode second JSON value %s: %s' % (second, e))) + raise ValueError( + self._formatMessage( + msg, 'could not decode second JSON value %s: %s' % (second, e) + ) + ) - self.assertSameStructure(first_structured, second_structured, - aname='first', bname='second', msg=msg) + self.assertSameStructure( + first_structured, + second_structured, + aname='first', + bname='second', + msg=msg, + ) def _getAssertEqualityFunc( self, first: Any, second: Any @@ -2035,6 +2194,7 @@ def _sorted_list_difference( Args: expected: The list we expected. actual: The list we actually got. + Returns: (missing, unexpected) missing: items in expected that are not in actual. @@ -2078,9 +2238,12 @@ def _are_both_of_integer_type(a: object, b: object) -> bool: def _are_both_of_sequence_type(a: object, b: object) -> bool: - return isinstance(a, abc.Sequence) and isinstance( - b, abc.Sequence) and not isinstance( - a, _TEXT_OR_BINARY_TYPES) and not isinstance(b, _TEXT_OR_BINARY_TYPES) + return ( + isinstance(a, abc.Sequence) + and isinstance(b, abc.Sequence) + and not isinstance(a, _TEXT_OR_BINARY_TYPES) + and not isinstance(b, _TEXT_OR_BINARY_TYPES) + ) def _are_both_of_set_type(a: object, b: object) -> bool: @@ -2088,8 +2251,7 @@ def _are_both_of_set_type(a: object, b: object) -> bool: def _are_both_of_mapping_type(a: object, b: object) -> bool: - return isinstance(a, abc.Mapping) and isinstance( - b, abc.Mapping) + return isinstance(a, abc.Mapping) and isinstance(b, abc.Mapping) def _walk_structure_for_problems( @@ -2097,20 +2259,23 @@ def _walk_structure_for_problems( ): """The recursive comparison behind assertSameStructure.""" if type(a) != type(b) and not ( # pylint: disable=unidiomatic-typecheck - _are_both_of_integer_type(a, b) or _are_both_of_sequence_type(a, b) or - _are_both_of_set_type(a, b) or _are_both_of_mapping_type(a, b)): + _are_both_of_integer_type(a, b) + or _are_both_of_sequence_type(a, b) + or _are_both_of_set_type(a, b) + or _are_both_of_mapping_type(a, b) + ): # We do not distinguish between int and long types as 99.99% of Python 2 # code should never care. They collapse into a single type in Python 3. - problem_list.append('%s is a %r but %s is a %r' % - (aname, type(a), bname, type(b))) + problem_list.append( + '%s is a %r but %s is a %r' % (aname, type(a), bname, type(b)) + ) # If they have different types there's no point continuing return if isinstance(a, abc.Set): for k in a: if k not in b: - problem_list.append( - '%s has %r but %s does not' % (aname, k, bname)) + problem_list.append('%s has %r but %s does not' % (aname, k, bname)) for k in b: if k not in a: problem_list.append('%s lacks %r but %s has it' % (aname, k, bname)) @@ -2121,32 +2286,47 @@ def _walk_structure_for_problems( for k in a: if k in b: _walk_structure_for_problems( - a[k], b[k], '%s[%r]' % (aname, k), '%s[%r]' % (bname, k), - problem_list, leaf_assert_equal_func, failure_exception) + a[k], + b[k], + '%s[%r]' % (aname, k), + '%s[%r]' % (bname, k), + problem_list, + leaf_assert_equal_func, + failure_exception, + ) else: problem_list.append( - "%s has [%r] with value %r but it's missing in %s" % - (aname, k, a[k], bname)) + "%s has [%r] with value %r but it's missing in %s" + % (aname, k, a[k], bname) + ) for k in b: if k not in a: problem_list.append( - '%s lacks [%r] but %s has it with value %r' % - (aname, k, bname, b[k])) + '%s lacks [%r] but %s has it with value %r' + % (aname, k, bname, b[k]) + ) # Strings/bytes are Sequences but we'll just do those with regular != - elif (isinstance(a, abc.Sequence) and - not isinstance(a, _TEXT_OR_BINARY_TYPES)): + elif isinstance(a, abc.Sequence) and not isinstance(a, _TEXT_OR_BINARY_TYPES): minlen = min(len(a), len(b)) for i in range(minlen): _walk_structure_for_problems( - a[i], b[i], '%s[%d]' % (aname, i), '%s[%d]' % (bname, i), - problem_list, leaf_assert_equal_func, failure_exception) + a[i], + b[i], + '%s[%d]' % (aname, i), + '%s[%d]' % (bname, i), + problem_list, + leaf_assert_equal_func, + failure_exception, + ) for i in range(minlen, len(a)): - problem_list.append('%s has [%i] with value %r but %s does not' % - (aname, i, a[i], bname)) + problem_list.append( + '%s has [%i] with value %r but %s does not' % (aname, i, a[i], bname) + ) for i in range(minlen, len(b)): - problem_list.append('%s lacks [%i] but %s has it with value %r' % - (aname, i, bname, b[i])) + problem_list.append( + '%s lacks [%i] but %s has it with value %r' % (aname, i, bname, b[i]) + ) else: try: @@ -2160,6 +2340,7 @@ def get_command_string(command): Args: command: List or string representing the command to run. + Returns: A string suitable for use as a shell command. """ @@ -2183,16 +2364,17 @@ def get_command_stderr(command, env=None, close_fds=True): Args: command: List or string representing the command to run. env: Dictionary of environment variable settings. If None, no environment - variables will be set for the child process. This is to make tests - more hermetic. NOTE: this behavior is different than the standard - subprocess module. + variables will be set for the child process. This is to make tests more + hermetic. NOTE: this behavior is different than the standard subprocess + module. close_fds: Whether or not to close all open fd's in the child after forking. - On Windows, this is ignored and close_fds is always False. + On Windows, this is ignored and close_fds is always False. Returns: Tuple of (exit status, text printed to stdout and stderr by the command). """ - if env is None: env = {} + if env is None: + env = {} if os.name == 'nt': # Windows does not support setting close_fds to True while also redirecting # standard handles. @@ -2237,18 +2419,17 @@ def _quote_long_string(s: Union[str, bytes, bytearray]) -> str: s = s.decode('utf-8') except UnicodeDecodeError: s = str(s) - return ('8<-----------\n' + - s + '\n' + - '----------->8\n') + return '8<-----------\n' + s + '\n' + '----------->8\n' def print_python_version() -> None: # Having this in the test output logs by default helps debugging when all # you've got is the log and no other idea of which Python was used. - sys.stderr.write('Running tests under Python {0[0]}.{0[1]}.{0[2]}: ' - '{1}\n'.format( - sys.version_info, - sys.executable if sys.executable else 'embedded.')) + sys.stderr.write( + 'Running tests under Python {0[0]}.{0[1]}.{0[2]}: {1}\n'.format( + sys.version_info, sys.executable if sys.executable else 'embedded.' + ) + ) def main(*args: str, **kwargs: Any) -> None: @@ -2261,9 +2442,9 @@ def main(*args: str, **kwargs: Any) -> None: Args: *args: Positional arguments passed through to - ``unittest.TestProgram.__init__``. + ``unittest.TestProgram.__init__``. **kwargs: Keyword arguments passed through to - ``unittest.TestProgram.__init__``. + ``unittest.TestProgram.__init__``. """ print_python_version() _run_in_app(run_tests, args, kwargs) @@ -2287,8 +2468,9 @@ def _register_sigterm_with_faulthandler() -> None: try: faulthandler.register(signal.SIGTERM, chain=True) # pytype: disable=module-attr except Exception as e: # pylint: disable=broad-except - sys.stderr.write('faulthandler.register(SIGTERM) failed ' - '%r; ignoring.\n' % e) + sys.stderr.write( + 'faulthandler.register(SIGTERM) failed %r; ignoring.\n' % e + ) def _run_in_app( @@ -2329,8 +2511,8 @@ def _run_in_app( Args: function: absltest.run_tests or a similar function. It will be called as - function(argv, args, kwargs) where argv is a list containing the - elements of sys.argv without the command-line flags. + function(argv, args, kwargs) where argv is a list containing the elements + of sys.argv without the command-line flags. args: Positional arguments passed through to unittest.TestProgram.__init__. kwargs: Keyword arguments passed through to unittest.TestProgram.__init__. """ @@ -2383,9 +2565,13 @@ def _is_suspicious_attribute( attr = getattr(testCaseClass, name) if inspect.isfunction(attr) or inspect.ismethod(attr): args = inspect.getfullargspec(attr) - return (len(args.args) == 1 and args.args[0] == 'self' and - args.varargs is None and args.varkw is None and - not args.kwonlyargs) + return ( + len(args.args) == 1 + and args.args[0] == 'self' + and args.varargs is None + and args.varkw is None + and not args.kwonlyargs + ) return False @@ -2498,10 +2684,13 @@ def getTestCaseNames(self, testCaseClass): # pylint:disable=invalid-name names = list(super().getTestCaseNames(testCaseClass)) if self._randomize_ordering_seed is not None: logging.info( - 'Randomizing test order with seed: %d', self._randomize_ordering_seed) + 'Randomizing test order with seed: %d', self._randomize_ordering_seed + ) logging.info( 'To reproduce this order, re-run with ' - '--test_randomize_ordering_seed=%d', self._randomize_ordering_seed) + '--test_randomize_ordering_seed=%d', + self._randomize_ordering_seed, + ) self._random.shuffle(names) return names @@ -2514,7 +2703,8 @@ def get_default_xml_output_filename() -> Optional[str]: elif os.environ.get('TEST_XMLOUTPUTDIR'): return os.path.join( os.environ['TEST_XMLOUTPUTDIR'], - os.path.splitext(os.path.basename(sys.argv[0]))[0] + '.xml') + os.path.splitext(os.path.basename(sys.argv[0]))[0] + '.xml', + ) return None @@ -2605,8 +2795,10 @@ def _setup_sharding( with open(os.environ['TEST_SHARD_STATUS_FILE'], 'w') as f: f.write('') except OSError: - sys.stderr.write('Error opening TEST_SHARD_STATUS_FILE (%s). Exiting.' - % os.environ['TEST_SHARD_STATUS_FILE']) + sys.stderr.write( + 'Error opening TEST_SHARD_STATUS_FILE (%s). Exiting.' + % os.environ['TEST_SHARD_STATUS_FILE'] + ) sys.exit(1) base_loader = custom_loader or TestLoader() @@ -2618,8 +2810,10 @@ def _setup_sharding( shard_index = int(os.environ['TEST_SHARD_INDEX']) if shard_index < 0 or shard_index >= total_shards: - sys.stderr.write('ERROR: Bad sharding values. index=%d, total=%d\n' % - (shard_index, total_shards)) + sys.stderr.write( + 'ERROR: Bad sharding values. index=%d, total=%d\n' + % (shard_index, total_shards) + ) sys.exit(1) # Replace the original getTestCaseNames with one that returns @@ -2734,11 +2928,14 @@ def _run_and_get_tests_result( # We can reuse testRunner if it supports XML output (e. g. by inheriting # from xml_reporter.TextAndXMLTestRunner). Otherwise we need to use # xml_reporter.TextAndXMLTestRunner. - if (kwargs.get('testRunner') is not None - and not hasattr(kwargs['testRunner'], 'set_default_xml_stream')): - sys.stderr.write('WARNING: XML_OUTPUT_FILE or --xml_output_file setting ' - 'overrides testRunner=%r setting (possibly from --pdb)' - % (kwargs['testRunner'])) + if kwargs.get('testRunner') is not None and not hasattr( + kwargs['testRunner'], 'set_default_xml_stream' + ): + sys.stderr.write( + 'WARNING: XML_OUTPUT_FILE or --xml_output_file setting ' + 'overrides testRunner=%r setting (possibly from --pdb)' + % (kwargs['testRunner']) + ) # Passing a class object here allows TestProgram to initialize # instances based on its kwargs and/or parsed command-line args. kwargs['testRunner'] = xml_test_runner_class @@ -2753,7 +2950,8 @@ def _run_and_get_tests_result( # If we've used a seed to randomize test case ordering, we want to record it # as a top-level attribute in the `testsuites` section of the XML output. randomize_ordering_seed = getattr( - kwargs['testLoader'], '_randomize_ordering_seed', None) + kwargs['testLoader'], '_randomize_ordering_seed', None + ) setter = getattr(kwargs['testRunner'], 'set_testsuites_property', None) if randomize_ordering_seed and setter: setter('test_randomize_ordering_seed', randomize_ordering_seed) @@ -2767,9 +2965,10 @@ def _run_and_get_tests_result( # Overriding testRunner isn't uncommon, so only enable the debugging # integration if the runner claims it does; we don't want to accidentally # clobber something on the runner. - if ((isinstance(runner, type) and - issubclass(runner, _pretty_print_reporter.TextTestRunner)) or - isinstance(runner, _pretty_print_reporter.TextTestRunner)): + if ( + isinstance(runner, type) + and issubclass(runner, _pretty_print_reporter.TextTestRunner) + ) or isinstance(runner, _pretty_print_reporter.TextTestRunner): runner.run_for_debugging = True # Make sure tmpdir exists.