diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b800d86..203736d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -26,11 +26,14 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install tox + pip install tox black - name: Run Tox run: tox + - name: Run linter + run: black --check . + test_python_2: runs-on: ubuntu-latest diff --git a/backtracepython/__init__.py b/backtracepython/__init__.py index 282fd7c..869697b 100644 --- a/backtracepython/__init__.py +++ b/backtracepython/__init__.py @@ -17,10 +17,20 @@ else: from urllib import urlencode -__all__ = ["BacktraceReport", "initialize", "finalize", "terminate", "version", "version_string", "send_last_exception", "send_report"] +__all__ = [ + "BacktraceReport", + "initialize", + "finalize", + "terminate", + "version", + "version_string", + "send_last_exception", + "send_report", +] attribute_manager = AttributeManager() + class globs: endpoint = None next_except_hook = None @@ -32,35 +42,43 @@ class globs: worker = None next_source_code_id = 0 + child_py_path = os.path.join(os.path.dirname(__file__), "child.py") + def get_python_version(): return "{} {}.{}.{}-{}".format( platform.python_implementation(), sys.version_info.major, sys.version_info.minor, sys.version_info.micro, - sys.version_info.releaselevel) + sys.version_info.releaselevel, + ) + def send_worker_msg(msg): - payload = json.dumps(msg, ignore_nan=True).encode('utf-8') + payload = json.dumps(msg, ignore_nan=True).encode("utf-8") globs.worker.stdin.write(payload) - globs.worker.stdin.write("\n".encode('utf-8')) + globs.worker.stdin.write("\n".encode("utf-8")) globs.worker.stdin.flush() + def walk_tb_backwards(tb): while tb is not None: yield tb.tb_frame, tb.tb_lineno tb = tb.tb_next + def walk_tb(tb): return reversed(list(walk_tb_backwards(tb))) + def make_unique_source_code_id(): result = str(globs.next_source_code_id) globs.next_source_code_id += 1 return result + def add_source_code(source_path, source_code_dict, source_path_dict, line): try: the_id = source_path_dict[source_path] @@ -68,38 +86,43 @@ def add_source_code(source_path, source_code_dict, source_path_dict, line): the_id = make_unique_source_code_id() source_path_dict[source_path] = the_id source_code_dict[the_id] = { - 'minLine': line, - 'maxLine': line, - 'path': source_path, + "minLine": line, + "maxLine": line, + "path": source_path, } return the_id - if line < source_code_dict[the_id]['minLine']: - source_code_dict[the_id]['minLine'] = line - if line > source_code_dict[the_id]['maxLine']: - source_code_dict[the_id]['maxLine'] = line + if line < source_code_dict[the_id]["minLine"]: + source_code_dict[the_id]["minLine"] = line + if line > source_code_dict[the_id]["maxLine"]: + source_code_dict[the_id]["maxLine"] = line return the_id + def process_frame(tb_frame, line, source_code_dict, source_path_dict): source_file = os.path.abspath(tb_frame.f_code.co_filename) frame = { - 'funcName': tb_frame.f_code.co_name, - 'line': line, - 'sourceCode': add_source_code(source_file, source_code_dict, source_path_dict, line), + "funcName": tb_frame.f_code.co_name, + "line": line, + "sourceCode": add_source_code( + source_file, source_code_dict, source_path_dict, line + ), } return frame + def get_main_thread(): if sys.version_info.major >= 3: return threading.main_thread() first = None for thread in threading.enumerate(): - if thread.name == 'MainThread': + if thread.name == "MainThread": return thread if first is None: first = thread return first + class BacktraceReport: def __init__(self): self.fault_thread = threading.current_thread() @@ -107,99 +130,114 @@ def __init__(self): self.source_path_dict = {} entry_source_code_id = None import __main__ + cwd_path = os.path.abspath(os.getcwd()) entry_thread = get_main_thread() - if hasattr(__main__, '__file__'): - entry_source_code_id = add_source_code(__main__.__file__, self.source_code, self.source_path_dict, 1) if hasattr(__main__, '__file__') else None - - init_attrs = { - 'error.type': 'Exception' - } + if hasattr(__main__, "__file__"): + entry_source_code_id = ( + add_source_code( + __main__.__file__, self.source_code, self.source_path_dict, 1 + ) + if hasattr(__main__, "__file__") + else None + ) + + init_attrs = {"error.type": "Exception"} init_attrs.update(attribute_manager.get()) self.log_lines = [] self.report = { - 'uuid': str(uuid.uuid4()), - 'timestamp': int(time.time()), - 'lang': "python", - 'langVersion': get_python_version(), - 'agent': "backtrace-python", - 'agentVersion': version_string, - 'mainThread': str(self.fault_thread.ident), - 'entryThread': str(entry_thread.ident), - 'cwd': cwd_path, - 'attributes': init_attrs, - 'annotations': { - 'Environment Variables': dict(os.environ), + "uuid": str(uuid.uuid4()), + "timestamp": int(time.time()), + "lang": "python", + "langVersion": get_python_version(), + "agent": "backtrace-python", + "agentVersion": version_string, + "mainThread": str(self.fault_thread.ident), + "entryThread": str(entry_thread.ident), + "cwd": cwd_path, + "attributes": init_attrs, + "annotations": { + "Environment Variables": dict(os.environ), }, } if entry_source_code_id is not None: - self.report['entrySourceCode'] = entry_source_code_id + self.report["entrySourceCode"] = entry_source_code_id def set_exception(self, garbage, ex_value, ex_traceback): - self.report['classifiers'] = [ex_value.__class__.__name__] - self.report['attributes']['error.message'] = str(ex_value) + self.report["classifiers"] = [ex_value.__class__.__name__] + self.report["attributes"]["error.message"] = str(ex_value) threads = {} for thread in threading.enumerate(): if thread.ident == self.fault_thread.ident: threads[str(self.fault_thread.ident)] = { - 'name': self.fault_thread.name, - 'stack': [process_frame(frame, line, self.source_code, - self.source_path_dict) for frame, line in walk_tb(ex_traceback)], + "name": self.fault_thread.name, + "stack": [ + process_frame( + frame, line, self.source_code, self.source_path_dict + ) + for frame, line in walk_tb(ex_traceback) + ], } else: threads[str(thread.ident)] = { - 'name': thread.name, + "name": thread.name, } - self.report['threads'] = threads + self.report["threads"] = threads def capture_last_exception(self): self.set_exception(*sys.exc_info()) def set_attribute(self, key, value): - self.report['attributes'][key] = value + self.report["attributes"][key] = value def set_dict_attributes(self, target_dict): - self.report['attributes'].update(target_dict) + self.report["attributes"].update(target_dict) def set_annotation(self, key, value): - self.report['annotations'][key] = value - - def get_attributes(self): - return self.report['attributes'] + self.report["annotations"][key] = value + + def get_attributes(self): + return self.report["attributes"] def set_dict_annotations(self, target_dict): - self.report['annotations'].update(target_dict) + self.report["annotations"].update(target_dict) def log(self, line): - self.log_lines.append({ - 'ts': time.time(), - 'msg': line, - }) + self.log_lines.append( + { + "ts": time.time(), + "msg": line, + } + ) def send(self): - if len(self.log_lines) != 0 and 'Log' not in self.report['annotations']: - self.report['annotations']['Log'] = self.log_lines - send_worker_msg({ - 'id': 'send', - 'report': self.report, - 'context_line_count': globs.context_line_count, - 'timeout': globs.timeout, - 'endpoint': globs.endpoint, - 'tab_width': globs.tab_width, - 'debug_backtrace': globs.debug_backtrace, - 'source_code': self.source_code, - }) + if len(self.log_lines) != 0 and "Log" not in self.report["annotations"]: + self.report["annotations"]["Log"] = self.log_lines + send_worker_msg( + { + "id": "send", + "report": self.report, + "context_line_count": globs.context_line_count, + "timeout": globs.timeout, + "endpoint": globs.endpoint, + "tab_width": globs.tab_width, + "debug_backtrace": globs.debug_backtrace, + "source_code": self.source_code, + } + ) + def create_and_send_report(ex_type, ex_value, ex_traceback): report = BacktraceReport() report.set_exception(ex_type, ex_value, ex_traceback) - report.set_attribute('error.type', 'Unhandled exception') + report.set_attribute("error.type", "Unhandled exception") report.send() + def bt_except_hook(ex_type, ex_value, ex_traceback): if globs.debug_backtrace: # Go back to normal exceptions while we do our work here. @@ -221,45 +259,60 @@ def bt_except_hook(ex_type, ex_value, ex_traceback): # Send the exception on to the next thing in the chain. globs.next_except_hook(ex_type, ex_value, ex_traceback) -def initialize(**kwargs): - globs.endpoint = construct_submission_url(kwargs['endpoint'], kwargs.get('token', None)) - globs.debug_backtrace = kwargs.get('debug_backtrace', False) - globs.timeout = kwargs.get('timeout', 4) - globs.tab_width = kwargs.get('tab_width', 8) - globs.context_line_count = kwargs.get('context_line_count', 200) - attribute_manager.add(kwargs.get('attributes', {})) +def initialize(**kwargs): + globs.endpoint = construct_submission_url( + kwargs["endpoint"], kwargs.get("token", None) + ) + globs.debug_backtrace = kwargs.get("debug_backtrace", False) + globs.timeout = kwargs.get("timeout", 4) + globs.tab_width = kwargs.get("tab_width", 8) + globs.context_line_count = kwargs.get("context_line_count", 200) + + attribute_manager.add(kwargs.get("attributes", {})) stdio_value = None if globs.debug_backtrace else subprocess.PIPE - globs.worker = subprocess.Popen([sys.executable, child_py_path], - stdin=subprocess.PIPE, stdout=stdio_value, stderr=stdio_value) - - disable_global_handler = kwargs.get('disable_global_handler', False) + globs.worker = subprocess.Popen( + [sys.executable, child_py_path], + stdin=subprocess.PIPE, + stdout=stdio_value, + stderr=stdio_value, + ) + + disable_global_handler = kwargs.get("disable_global_handler", False) if not disable_global_handler: globs.next_except_hook = sys.excepthook sys.excepthook = bt_except_hook + def construct_submission_url(endpoint, token): if "submit.backtrace.io" in endpoint or token is None: return endpoint - return "{}/post?{}".format(endpoint, urlencode({ - 'token': token, - 'format': "json", - })) - + return "{}/post?{}".format( + endpoint, + urlencode( + { + "token": token, + "format": "json", + } + ), + ) + + def finalize(): - send_worker_msg({ 'id': 'terminate' }) + send_worker_msg({"id": "terminate"}) if not globs.debug_backtrace: globs.worker.stdout.close() globs.worker.stderr.close() globs.worker.wait() + def send_last_exception(**kwargs): report = BacktraceReport() report.capture_last_exception() - report.set_dict_attributes(kwargs.get('attributes', {})) - report.set_dict_annotations(kwargs.get('annotations', {})) - report.set_attribute('error.type', 'Exception') + report.set_dict_attributes(kwargs.get("attributes", {})) + report.set_dict_annotations(kwargs.get("annotations", {})) + report.set_attribute("error.type", "Exception") report.send() @@ -269,11 +322,12 @@ def make_an_exception(): except: return sys.exc_info() + def send_report(msg, **kwargs): report = BacktraceReport() report.set_exception(*make_an_exception()) - report.set_dict_attributes(kwargs.get('attributes', {})) - report.set_dict_annotations(kwargs.get('annotations', {})) - report.set_attribute('error.message', msg) - report.set_attribute('error.type', 'Message') + report.set_dict_attributes(kwargs.get("attributes", {})) + report.set_dict_annotations(kwargs.get("annotations", {})) + report.set_attribute("error.message", msg) + report.set_attribute("error.type", "Message") report.send() diff --git a/backtracepython/child.py b/backtracepython/child.py index 99ba8d9..c3dbdf6 100644 --- a/backtracepython/child.py +++ b/backtracepython/child.py @@ -12,28 +12,32 @@ from urllib2 import urlopen from urllib2 import Request + class globs: tab_width = None debug_backtrace = None context_line_count = None + def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs) + def post_json(full_url, obj): if globs.debug_backtrace: eprint(full_url) eprint(json.dumps(obj, indent=2, ignore_nan=True)) - payload = json.dumps(obj, ignore_nan=True).encode('utf-8') + payload = json.dumps(obj, ignore_nan=True).encode("utf-8") headers = { - 'Content-Type': "application/json", - 'Content-Length': len(payload), + "Content-Type": "application/json", + "Content-Length": len(payload), } req = Request(full_url, payload, headers) resp = urlopen(req) if resp.code != 200: raise Exception(resp.code, resp.read()) + def read_file_or_none(file_path): try: with open(file_path) as f: @@ -41,6 +45,7 @@ def read_file_or_none(file_path): except: return None + def create_source_object(source_path, min_line, max_line): ext = os.path.splitext(source_path)[1] if ext != ".pyc": @@ -50,37 +55,42 @@ def create_source_object(source_path, min_line, max_line): line_start = max(min_line - globs.context_line_count - 1, 0) line_end = min(len(lines), max_line + globs.context_line_count + 1) return { - 'text': "\n".join(lines[line_start:line_end]), - 'startLine': line_start + 1, - 'startColumn': 1, - 'path': source_path, - 'tabWidth': globs.tab_width, + "text": "\n".join(lines[line_start:line_end]), + "startLine": line_start + 1, + "startColumn": 1, + "path": source_path, + "tabWidth": globs.tab_width, } - return { 'path': source_path } + return {"path": source_path} + def collect_source_code(report, source_code_dict): out_source_code = {} for key in source_code_dict: item = source_code_dict[key] - out_source_code[key] = create_source_object(item['path'], item['minLine'], item['maxLine']) - report['sourceCode'] = out_source_code + out_source_code[key] = create_source_object( + item["path"], item["minLine"], item["maxLine"] + ) + report["sourceCode"] = out_source_code + def prepare_and_send_report(msg): - globs.tab_width = msg['tab_width'] - globs.debug_backtrace = msg['debug_backtrace'] - globs.context_line_count = msg['context_line_count'] + globs.tab_width = msg["tab_width"] + globs.debug_backtrace = msg["debug_backtrace"] + globs.context_line_count = msg["context_line_count"] - report = msg['report'] - source_code = msg['source_code'] + report = msg["report"] + source_code = msg["source_code"] collect_source_code(report, source_code) - post_json(msg['endpoint'], report) + post_json(msg["endpoint"], report) + for line in sys.stdin: msg = json.loads(line) - if msg['id'] == 'terminate': + if msg["id"] == "terminate": sys.exit(0) - elif msg['id'] == 'send': + elif msg["id"] == "send": prepare_and_send_report(msg) else: - raise Exception("invalid message id", msg['id']) + raise Exception("invalid message id", msg["id"]) diff --git a/example/__init__.py b/example/__init__.py index 0a26ece..894d098 100644 --- a/example/__init__.py +++ b/example/__init__.py @@ -6,14 +6,18 @@ def open_file(name): open(name).read() + def main(): backtrace.initialize( - endpoint= os.getenv('BACKTRACE_SUBMISSION_URL', '"https://submit.backtrace.io/your-universe/token/json"'), - attributes = { - 'application': 'example-app', - 'application.version': '1.0.0', - 'version': '1.0.0' - } + endpoint=os.getenv( + "BACKTRACE_SUBMISSION_URL", + '"https://submit.backtrace.io/your-universe/token/json"', + ), + attributes={ + "application": "example-app", + "application.version": "1.0.0", + "version": "1.0.0", + }, ) # send an exception from the try/catch block @@ -22,11 +26,12 @@ def main(): except: backtrace.send_last_exception() - # send a message + # send a message backtrace.send_report("test message") # generate and send unhandled exception open_file("not existing file") + if __name__ == "__main__": main() diff --git a/setup.py b/setup.py index 20f87f3..0786186 100644 --- a/setup.py +++ b/setup.py @@ -5,27 +5,27 @@ import backtracepython setup( - name='backtracepython', + name="backtracepython", version=backtracepython.version_string, - description='Backtrace.io error reporting tool for Python', - author='Backtrace.io', - author_email='team@backtrace.io', - packages=find_packages(), + description="Backtrace.io error reporting tool for Python", + author="Backtrace.io", + author_email="team@backtrace.io", + packages=find_packages(), test_suite="tests", - url='https://github.com/backtrace-labs/backtrace-python', + url="https://github.com/backtrace-labs/backtrace-python", install_requires=[ - 'six', - 'simplejson', + "six", + "simplejson", ], extras_require={ - 'test': ['pytest'], + "test": ["pytest"], }, - python_requires='>=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4', + python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4", classifiers=[ - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: 3.8', - 'Programming Language :: Python :: 3.9', - 'Programming Language :: Python :: 3.10', + "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", ], ) diff --git a/tests/exe/afile.py b/tests/exe/afile.py index 9bd882c..70c05f2 100644 --- a/tests/exe/afile.py +++ b/tests/exe/afile.py @@ -1,10 +1,13 @@ import bfile + def three(): bfile.bar("this is not valid json") + def two(): three() + def foo(): two() diff --git a/tests/exe/bfile.py b/tests/exe/bfile.py index f428c2f..1de4f95 100644 --- a/tests/exe/bfile.py +++ b/tests/exe/bfile.py @@ -1,4 +1,5 @@ import simplejson as json + def bar(s): return json.loads(s) diff --git a/tests/exe/multi_file.py b/tests/exe/multi_file.py index 9781c47..0a49ffa 100644 --- a/tests/exe/multi_file.py +++ b/tests/exe/multi_file.py @@ -1,6 +1,7 @@ import sys import os import afile + root_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) sys.path.insert(0, root_dir) import backtracepython as bt @@ -9,15 +10,14 @@ endpoint = "http://{}:{}".format(host, port) -bt.initialize( - endpoint=endpoint, - token="FakeToken", - debug_backtrace=True) +bt.initialize(endpoint=endpoint, token="FakeToken", debug_backtrace=True) + def call_a_file(arg): if arg: afile.foo() + call_a_file(False) call_a_file(True) call_a_file(False) diff --git a/tests/exe/send_report.py b/tests/exe/send_report.py index 4fb75c0..5d10651 100644 --- a/tests/exe/send_report.py +++ b/tests/exe/send_report.py @@ -1,4 +1,5 @@ import sys, os + root_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) sys.path.insert(0, root_dir) import backtracepython as bt @@ -14,7 +15,11 @@ context_line_count=2, ) + def do_the_thing(): - bt.send_report("dsa", annotations={"color": "blue"}, attributes={"genre": "happy hardcore"}) + bt.send_report( + "dsa", annotations={"color": "blue"}, attributes={"genre": "happy hardcore"} + ) + do_the_thing() diff --git a/tests/exe/simple_report.py b/tests/exe/simple_report.py index 1042802..bd4e984 100644 --- a/tests/exe/simple_report.py +++ b/tests/exe/simple_report.py @@ -1,4 +1,5 @@ import sys, os + root_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) sys.path.insert(0, root_dir) import backtracepython as bt @@ -11,7 +12,7 @@ endpoint=endpoint, token="FakeToken", debug_backtrace=True, - attributes={'a': 1, 'b': "bar"}, + attributes={"a": 1, "b": "bar"}, ) example_local_var = "hello this is my value" diff --git a/tests/exe/threads.py b/tests/exe/threads.py index 23d5e97..90240d8 100644 --- a/tests/exe/threads.py +++ b/tests/exe/threads.py @@ -1,4 +1,5 @@ import sys, os + root_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) sys.path.insert(0, root_dir) import backtracepython as bt @@ -22,10 +23,12 @@ def thread_one(): while True: time.sleep(0.1) + def thread_two(): while True: time.sleep(0.2) + threading.Thread(target=thread_one, name="Uno", daemon=True).start() threading.Thread(target=thread_one, name="Dos", daemon=True).start() threading.Thread(target=thread_two, name="Tres", daemon=True).start() diff --git a/tests/test_basic_flow.py b/tests/test_basic_flow.py index 540f751..63d906e 100644 --- a/tests/test_basic_flow.py +++ b/tests/test_basic_flow.py @@ -13,50 +13,62 @@ exe_dir = os.path.join(tests_dir, "exe") debug_backtrace = False + def check_basic_report(obj): - assert obj['lang'] == "python" - assert obj['agent'] == "backtrace-python" - assert obj['classifiers'][0] == "NameError" + assert obj["lang"] == "python" + assert obj["agent"] == "backtrace-python" + assert obj["classifiers"][0] == "NameError" - if obj['langVersion'].startswith("PyPy"): - assert obj['attributes']['error.message'] == "global name 'b' is not defined" + if obj["langVersion"].startswith("PyPy"): + assert obj["attributes"]["error.message"] == "global name 'b' is not defined" else: - assert obj['attributes']['error.message'] == "name 'b' is not defined" + assert obj["attributes"]["error.message"] == "name 'b' is not defined" + + source_code_id = obj["threads"][obj["mainThread"]]["stack"][0]["sourceCode"] + assert obj["sourceCode"][source_code_id]["path"].endswith( + "tests/exe/simple_report.py" + ) + assert obj["sourceCode"][source_code_id]["text"].endswith("\na = b\n") - source_code_id = obj['threads'][obj['mainThread']]['stack'][0]['sourceCode'] - assert obj['sourceCode'][source_code_id]['path'].endswith("tests/exe/simple_report.py") - assert obj['sourceCode'][source_code_id]['text'].endswith("\na = b\n") + assert obj["attributes"]["a"] == 1 + assert obj["attributes"]["b"] == "bar" - assert obj['attributes']['a'] == 1 - assert obj['attributes']['b'] == "bar" -def check_multi_file(obj): - if obj['langVersion'].startswith("PyPy"): - assert obj['classifiers'][0] == "ValueError" - assert obj['attributes']['error.message'] == "Error when decoding true at char 1" +def check_multi_file(obj): + if obj["langVersion"].startswith("PyPy"): + assert obj["classifiers"][0] == "ValueError" + assert ( + obj["attributes"]["error.message"] == "Error when decoding true at char 1" + ) else: - assert obj['classifiers'][0] == "JSONDecodeError" - assert obj['attributes']['error.message'] == "Expecting value: line 1 column 1 (char 0)" + assert obj["classifiers"][0] == "JSONDecodeError" + assert ( + obj["attributes"]["error.message"] + == "Expecting value: line 1 column 1 (char 0)" + ) + + fault_stack = obj["threads"][obj["mainThread"]]["stack"] + source_code_id = fault_stack[-1]["sourceCode"] + assert obj["sourceCode"][source_code_id]["path"].endswith("tests/exe/multi_file.py") + lines = obj["sourceCode"][source_code_id]["text"].split("\n") + assert lines[fault_stack[-1]["line"] - 1] == "call_a_file(True)" - fault_stack = obj['threads'][obj['mainThread']]['stack'] - source_code_id = fault_stack[-1]['sourceCode'] - assert obj['sourceCode'][source_code_id]['path'].endswith("tests/exe/multi_file.py") - lines = obj['sourceCode'][source_code_id]['text'].split("\n") - assert lines[fault_stack[-1]['line'] - 1] == 'call_a_file(True)' + assert fault_stack[-6]["funcName"] == "bar" + assert fault_stack[-6]["line"] == 5 - assert fault_stack[-6]['funcName'] == "bar" - assert fault_stack[-6]['line'] == 4 def check_send_report(obj): if sys.version_info.major >= 3: - assert obj['attributes']['error.message'] == "dsa" + assert obj["attributes"]["error.message"] == "dsa" + + assert obj["attributes"]["genre"] == "happy hardcore" + assert obj["annotations"]["color"] == "blue" - assert obj['attributes']['genre'] == 'happy hardcore' - assert obj['annotations']['color'] == 'blue' def check_threads(obj): if sys.version_info.major >= 3: - assert len(obj['threads']) == 4 + assert len(obj["threads"]) == 4 + def run_one_test(check_fn, exe_name): requested_server_address = ("127.0.0.1", 0) @@ -69,7 +81,7 @@ def do_POST(self): self.send_response(200) self.end_headers() payload = self.rfile.read() - json_string = payload.decode('utf-8', 'strict') + json_string = payload.decode("utf-8", "strict") non_local.json_object = json.loads(json_string) def log_message(self, format, *args): @@ -80,8 +92,11 @@ def log_message(self, format, *args): exe_path = os.path.join(exe_dir, exe_name) stdio_action = None if debug_backtrace else subprocess.PIPE - child = subprocess.Popen([sys.executable, exe_path, host, str(port)], - stdout=stdio_action, stderr=stdio_action) + child = subprocess.Popen( + [sys.executable, exe_path, host, str(port)], + stdout=stdio_action, + stderr=stdio_action, + ) httpd.handle_request() check_fn(non_local.json_object) @@ -90,16 +105,19 @@ def log_message(self, format, *args): child.stdout.close() child.stderr.close() httpd.server_close() - + def test_basic_report(): run_one_test(check_basic_report, "simple_report.py") + def test_multi_file(): run_one_test(check_multi_file, "multi_file.py") + def test_send_report(): run_one_test(check_send_report, "send_report.py") + def test_threads(): run_one_test(check_threads, "threads.py") diff --git a/tests/test_report_attributes.py b/tests/test_report_attributes.py index 528dd73..fdae45a 100644 --- a/tests/test_report_attributes.py +++ b/tests/test_report_attributes.py @@ -2,18 +2,25 @@ report = BacktraceReport() + def test_report_session_attribute_is_defined(): - assert report.get_attributes()['application.session'] is not None + assert report.get_attributes()["application.session"] is not None + def test_report_session_attribute_doesnt_change(): compare_report = BacktraceReport() - assert report.get_attributes()['application.session'] == compare_report.get_attributes()['application.session'] + assert ( + report.get_attributes()["application.session"] + == compare_report.get_attributes()["application.session"] + ) + def test_report_backtrace_reporter_attributes(): attributes = report.get_attributes() - assert attributes['backtrace.agent'] == 'backtrace-python' - assert attributes['backtrace.version'] is not None + assert attributes["backtrace.agent"] == "backtrace-python" + assert attributes["backtrace.version"] is not None + def test_report_attribute_override(): override_report = BacktraceReport() @@ -22,4 +29,4 @@ def test_report_attribute_override(): override_attribute_name = "hostname" override_report.set_attribute(override_attribute_name, expected_value) - assert override_report.get_attributes()[override_attribute_name] == expected_value \ No newline at end of file + assert override_report.get_attributes()[override_attribute_name] == expected_value diff --git a/tests/test_submission_url.py b/tests/test_submission_url.py index e70c9f4..8a73350 100644 --- a/tests/test_submission_url.py +++ b/tests/test_submission_url.py @@ -5,12 +5,18 @@ token = "1234567812345678123456781234567812345678123456781234567812345678" submit_url = "https://submit.backtrace.io/{}/{}/json".format(universe, token) + + def test_direct_submission_url(): - expected_direct_submission_url = "{}/post?token={}&format=json".format(hostname, token) + expected_direct_submission_url = "{}/post?token={}&format=json".format( + hostname, token + ) assert construct_submission_url(hostname, token) == expected_direct_submission_url + def test_submit_submission_url(): assert construct_submission_url(submit_url, None) == submit_url + def test_submit_submission_url_when_token_available(): assert construct_submission_url(submit_url, token) == submit_url