From 7931e8f220a08af6523cc81fef54c66518d23a3b Mon Sep 17 00:00:00 2001 From: Nolan Parker Date: Sat, 18 Oct 2025 01:49:51 -0500 Subject: [PATCH 1/3] Significantly faster graceful exit from CTRL+C (from 60+ to around 15-30 seconds) --- sherlock_project/sherlock.py | 578 ++++++++++++++++++----------------- 1 file changed, 296 insertions(+), 282 deletions(-) diff --git a/sherlock_project/sherlock.py b/sherlock_project/sherlock.py index 75b3e3d70..2dfb81dac 100644 --- a/sherlock_project/sherlock.py +++ b/sherlock_project/sherlock.py @@ -108,6 +108,29 @@ def response_time(resp, *args, **kwargs): return super(SherlockFuturesSession, self).request( method, url, hooks=hooks, *args, **kwargs ) + + def __enter__(self): + # Called when the 'with' block is entered. + # It just returns the instance itself. + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + # Called when the 'with' block is exited, regardless of success or exception. + + # If an exception occurred and the exception is a Keyboard Interrupt exception, + # do a fast shutdown of the thread. + if exc_type is not None: + if exc_type is KeyboardInterrupt: + print("\nCtrl+C detected. Initiating thread shutdown...") + self.executor.shutdown(wait=False, cancel_futures=True) + # Propagate the KeyboardInterrupt exception. + return False + + # Shut down the thread normally if there was no KeyboardInterrupt exception. + self.executor.shutdown(wait=True, cancel_futures=False) + + # Ensure any other exceptions are still propagated. + return False def get_response(request_future, error_type, social_network): @@ -217,289 +240,287 @@ def sherlock( else: max_workers = len(site_data) - # Create multi-threaded session for all requests. - session = SherlockFuturesSession( - max_workers=max_workers, session=underlying_session - ) - - # Results from analysis of all sites - results_total = {} + # Use 'with' to trigger the explicit shutdown of the thread pool (executor). + # This is faster (approx.
14s) and cleaner than waiting for Python's default cleanup. + with SherlockFuturesSession(max_workers=max_workers, session=underlying_session) as session: + # Results from analysis of all sites + results_total = {} - # First create futures for all requests. This allows for the requests to run in parallel - for social_network, net_info in site_data.items(): - # Results from analysis of this specific site - results_site = {"url_main": net_info.get("urlMain")} + # First create futures for all requests. This allows for the requests to run in parallel + for social_network, net_info in site_data.items(): + # Results from analysis of this specific site + results_site = {"url_main": net_info.get("urlMain")} - # Record URL of main site + # Record URL of main site - # A user agent is needed because some sites don't return the correct - # information since they think that we are bots (Which we actually are...) - headers = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:129.0) Gecko/20100101 Firefox/129.0", - } + # A user agent is needed because some sites don't return the correct + # information since they think that we are bots (Which we actually are...) + headers = { + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:129.0) Gecko/20100101 Firefox/129.0", + } - if "headers" in net_info: - # Override/append any extra headers required by a given site. - headers.update(net_info["headers"]) + if "headers" in net_info: + # Override/append any extra headers required by a given site. + headers.update(net_info["headers"]) - # URL of user on site (if it exists) - url = interpolate_string(net_info["url"], username.replace(' ', '%20')) - - # Don't make request if username is invalid for the site - regex_check = net_info.get("regexCheck") - if regex_check and re.search(regex_check, username) is None: - # No need to do the check at the site: this username is not allowed. 
- results_site["status"] = QueryResult( - username, social_network, url, QueryStatus.ILLEGAL - ) - results_site["url_user"] = "" - results_site["http_status"] = "" - results_site["response_text"] = "" - query_notify.update(results_site["status"]) - else: # URL of user on site (if it exists) - results_site["url_user"] = url - url_probe = net_info.get("urlProbe") - request_method = net_info.get("request_method") - request_payload = net_info.get("request_payload") - request = None - - if request_method is not None: - if request_method == "GET": - request = session.get - elif request_method == "HEAD": - request = session.head - elif request_method == "POST": - request = session.post - elif request_method == "PUT": - request = session.put - else: - raise RuntimeError(f"Unsupported request_method for {url}") - - if request_payload is not None: - request_payload = interpolate_string(request_payload, username) - - if url_probe is None: - # Probe URL is normal one seen by people out on the web. - url_probe = url - else: - # There is a special URL for probing existence separate - # from where the user profile normally can be found. - url_probe = interpolate_string(url_probe, username) - - if request is None: - if net_info["errorType"] == "status_code": - # In most cases when we are detecting by status code, - # it is not necessary to get the entire body: we can - # detect fine with just the HEAD response. - request = session.head - else: - # Either this detect method needs the content associated - # with the GET response, or this specific website will - # not respond properly unless we request the whole page. - request = session.get - - if net_info["errorType"] == "response_url": - # Site forwards request to a different URL if username not - # found. Disallow the redirect so we can capture the - # http status from the original URL request. - allow_redirects = False - else: - # Allow whatever redirect that the site wants to do. 
- # The final result of the request will be what is available. - allow_redirects = True - - # This future starts running the request in a new thread, doesn't block the main thread - if proxy is not None: - proxies = {"http": proxy, "https": proxy} - future = request( - url=url_probe, - headers=headers, - proxies=proxies, - allow_redirects=allow_redirects, - timeout=timeout, - json=request_payload, + url = interpolate_string(net_info["url"], username.replace(' ', '%20')) + + # Don't make request if username is invalid for the site + regex_check = net_info.get("regexCheck") + if regex_check and re.search(regex_check, username) is None: + # No need to do the check at the site: this username is not allowed. + results_site["status"] = QueryResult( + username, social_network, url, QueryStatus.ILLEGAL ) + results_site["url_user"] = "" + results_site["http_status"] = "" + results_site["response_text"] = "" + query_notify.update(results_site["status"]) else: - future = request( - url=url_probe, - headers=headers, - allow_redirects=allow_redirects, - timeout=timeout, - json=request_payload, - ) + # URL of user on site (if it exists) + results_site["url_user"] = url + url_probe = net_info.get("urlProbe") + request_method = net_info.get("request_method") + request_payload = net_info.get("request_payload") + request = None + + if request_method is not None: + if request_method == "GET": + request = session.get + elif request_method == "HEAD": + request = session.head + elif request_method == "POST": + request = session.post + elif request_method == "PUT": + request = session.put + else: + raise RuntimeError(f"Unsupported request_method for {url}") - # Store future in data for access later - net_info["request_future"] = future - - # Add this site's results into final dictionary with all the other results. 
- results_total[social_network] = results_site - - # Open the file containing account links - for social_network, net_info in site_data.items(): - # Retrieve results again - results_site = results_total.get(social_network) - - # Retrieve other site information again - url = results_site.get("url_user") - status = results_site.get("status") - if status is not None: - # We have already determined the user doesn't exist here - continue - - # Get the expected error type - error_type = net_info["errorType"] - if isinstance(error_type, str): - error_type: list[str] = [error_type] - - # Retrieve future and ensure it has finished - future = net_info["request_future"] - r, error_text, exception_text = get_response( - request_future=future, error_type=error_type, social_network=social_network - ) + if request_payload is not None: + request_payload = interpolate_string(request_payload, username) - # Get response time for response of our request. - try: - response_time = r.elapsed - except AttributeError: - response_time = None + if url_probe is None: + # Probe URL is normal one seen by people out on the web. + url_probe = url + else: + # There is a special URL for probing existence separate + # from where the user profile normally can be found. + url_probe = interpolate_string(url_probe, username) + + if request is None: + if net_info["errorType"] == "status_code": + # In most cases when we are detecting by status code, + # it is not necessary to get the entire body: we can + # detect fine with just the HEAD response. + request = session.head + else: + # Either this detect method needs the content associated + # with the GET response, or this specific website will + # not respond properly unless we request the whole page. + request = session.get + + if net_info["errorType"] == "response_url": + # Site forwards request to a different URL if username not + # found. Disallow the redirect so we can capture the + # http status from the original URL request. 
+ allow_redirects = False + else: + # Allow whatever redirect that the site wants to do. + # The final result of the request will be what is available. + allow_redirects = True + + # This future starts running the request in a new thread, doesn't block the main thread + if proxy is not None: + proxies = {"http": proxy, "https": proxy} + future = request( + url=url_probe, + headers=headers, + proxies=proxies, + allow_redirects=allow_redirects, + timeout=timeout, + json=request_payload, + ) + else: + future = request( + url=url_probe, + headers=headers, + allow_redirects=allow_redirects, + timeout=timeout, + json=request_payload, + ) - # Attempt to get request information - try: - http_status = r.status_code - except Exception: - http_status = "?" - try: - response_text = r.text.encode(r.encoding or "UTF-8") - except Exception: - response_text = "" - - query_status = QueryStatus.UNKNOWN - error_context = None - - # As WAFs advance and evolve, they will occasionally block Sherlock and - # lead to false positives and negatives. Fingerprints should be added - # here to filter results that fail to bypass WAFs. Fingerprints should - # be highly targetted. Comment at the end of each fingerprint to - # indicate target and date fingerprinted. 
- WAFHitMsgs = [ - r'.loading-spinner{visibility:hidden}body.no-js .challenge-running{display:none}body.dark{background-color:#222;color:#d9d9d9}body.dark a{color:#fff}body.dark a:hover{color:#ee730a;text-decoration:underline}body.dark .lds-ring div{border-color:#999 transparent transparent}body.dark .font-red{color:#b20f03}body.dark', # 2024-05-13 Cloudflare - r'', # 2024-11-11 Cloudflare error page - r'AwsWafIntegration.forceRefreshToken', # 2024-11-11 Cloudfront (AWS) - r'{return l.onPageView}}),Object.defineProperty(r,"perimeterxIdentifiers",{enumerable:' # 2024-04-09 PerimeterX / Human Security - ] - - if error_text is not None: - error_context = error_text - - elif any(hitMsg in r.text for hitMsg in WAFHitMsgs): - query_status = QueryStatus.WAF + # Store future in data for access later + net_info["request_future"] = future + + # Add this site's results into final dictionary with all the other results. + results_total[social_network] = results_site + + # Open the file containing account links + for social_network, net_info in site_data.items(): + # Retrieve results again + results_site = results_total.get(social_network) + + # Retrieve other site information again + url = results_site.get("url_user") + status = results_site.get("status") + if status is not None: + # We have already determined the user doesn't exist here + continue + + # Get the expected error type + error_type = net_info["errorType"] + if isinstance(error_type, str): + error_type: list[str] = [error_type] + + # Retrieve future and ensure it has finished + future = net_info["request_future"] + r, error_text, exception_text = get_response( + request_future=future, error_type=error_type, social_network=social_network + ) - else: - if any(errtype not in ["message", "status_code", "response_url"] for errtype in error_type): - error_context = f"Unknown error type '{error_type}' for {social_network}" - query_status = QueryStatus.UNKNOWN - else: - if "message" in error_type: - # error_flag True 
denotes no error found in the HTML - # error_flag False denotes error found in the HTML - error_flag = True - errors = net_info.get("errorMsg") - # errors will hold the error message - # it can be string or list - # by isinstance method we can detect that - # and handle the case for strings as normal procedure - # and if its list we can iterate the errors - if isinstance(errors, str): - # Checks if the error message is in the HTML - # if error is present we will set flag to False - if errors in r.text: - error_flag = False - else: - # If it's list, it will iterate all the error message - for error in errors: - if error in r.text: - error_flag = False - break - if error_flag: - query_status = QueryStatus.CLAIMED - else: - query_status = QueryStatus.AVAILABLE - - if "status_code" in error_type and query_status is not QueryStatus.AVAILABLE: - error_codes = net_info.get("errorCode") - query_status = QueryStatus.CLAIMED - - # Type consistency, allowing for both singlets and lists in manifest - if isinstance(error_codes, int): - error_codes = [error_codes] - - if error_codes is not None and r.status_code in error_codes: - query_status = QueryStatus.AVAILABLE - elif r.status_code >= 300 or r.status_code < 200: - query_status = QueryStatus.AVAILABLE - - if "response_url" in error_type and query_status is not QueryStatus.AVAILABLE: - # For this detection method, we have turned off the redirect. - # So, there is no need to check the response URL: it will always - # match the request. Instead, we will ensure that the response - # code indicates that the request was successful (i.e. no 404, or - # forward to some odd redirect). 
- if 200 <= r.status_code < 300: - query_status = QueryStatus.CLAIMED - else: - query_status = QueryStatus.AVAILABLE - - if dump_response: - print("+++++++++++++++++++++") - print(f"TARGET NAME : {social_network}") - print(f"USERNAME : {username}") - print(f"TARGET URL : {url}") - print(f"TEST METHOD : {error_type}") + # Get response time for response of our request. try: - print(f"STATUS CODES : {net_info['errorCode']}") - except KeyError: - pass - print("Results...") + response_time = r.elapsed + except AttributeError: + response_time = None + + # Attempt to get request information try: - print(f"RESPONSE CODE : {r.status_code}") + http_status = r.status_code except Exception: - pass - try: - print(f"ERROR TEXT : {net_info['errorMsg']}") - except KeyError: - pass - print(">>>>> BEGIN RESPONSE TEXT") + http_status = "?" try: - print(r.text) + response_text = r.text.encode(r.encoding or "UTF-8") except Exception: - pass - print("<<<<< END RESPONSE TEXT") - print("VERDICT : " + str(query_status)) - print("+++++++++++++++++++++") - - # Notify caller about results of query. - result: QueryResult = QueryResult( - username=username, - site_name=social_network, - site_url_user=url, - status=query_status, - query_time=response_time, - context=error_context, - ) - query_notify.update(result) + response_text = "" + + query_status = QueryStatus.UNKNOWN + error_context = None + + # As WAFs advance and evolve, they will occasionally block Sherlock and + # lead to false positives and negatives. Fingerprints should be added + # here to filter results that fail to bypass WAFs. Fingerprints should + # be highly targetted. Comment at the end of each fingerprint to + # indicate target and date fingerprinted. 
+ WAFHitMsgs = [ + r'.loading-spinner{visibility:hidden}body.no-js .challenge-running{display:none}body.dark{background-color:#222;color:#d9d9d9}body.dark a{color:#fff}body.dark a:hover{color:#ee730a;text-decoration:underline}body.dark .lds-ring div{border-color:#999 transparent transparent}body.dark .font-red{color:#b20f03}body.dark', # 2024-05-13 Cloudflare + r'', # 2024-11-11 Cloudflare error page + r'AwsWafIntegration.forceRefreshToken', # 2024-11-11 Cloudfront (AWS) + r'{return l.onPageView}}),Object.defineProperty(r,"perimeterxIdentifiers",{enumerable:' # 2024-04-09 PerimeterX / Human Security + ] + + if error_text is not None: + error_context = error_text + + elif any(hitMsg in r.text for hitMsg in WAFHitMsgs): + query_status = QueryStatus.WAF + + else: + if any(errtype not in ["message", "status_code", "response_url"] for errtype in error_type): + error_context = f"Unknown error type '{error_type}' for {social_network}" + query_status = QueryStatus.UNKNOWN + else: + if "message" in error_type: + # error_flag True denotes no error found in the HTML + # error_flag False denotes error found in the HTML + error_flag = True + errors = net_info.get("errorMsg") + # errors will hold the error message + # it can be string or list + # by isinstance method we can detect that + # and handle the case for strings as normal procedure + # and if its list we can iterate the errors + if isinstance(errors, str): + # Checks if the error message is in the HTML + # if error is present we will set flag to False + if errors in r.text: + error_flag = False + else: + # If it's list, it will iterate all the error message + for error in errors: + if error in r.text: + error_flag = False + break + if error_flag: + query_status = QueryStatus.CLAIMED + else: + query_status = QueryStatus.AVAILABLE + + if "status_code" in error_type and query_status is not QueryStatus.AVAILABLE: + error_codes = net_info.get("errorCode") + query_status = QueryStatus.CLAIMED - # Save status of request - 
results_site["status"] = result + # Type consistency, allowing for both singlets and lists in manifest + if isinstance(error_codes, int): + error_codes = [error_codes] + + if error_codes is not None and r.status_code in error_codes: + query_status = QueryStatus.AVAILABLE + elif r.status_code >= 300 or r.status_code < 200: + query_status = QueryStatus.AVAILABLE + + if "response_url" in error_type and query_status is not QueryStatus.AVAILABLE: + # For this detection method, we have turned off the redirect. + # So, there is no need to check the response URL: it will always + # match the request. Instead, we will ensure that the response + # code indicates that the request was successful (i.e. no 404, or + # forward to some odd redirect). + if 200 <= r.status_code < 300: + query_status = QueryStatus.CLAIMED + else: + query_status = QueryStatus.AVAILABLE + + if dump_response: + print("+++++++++++++++++++++") + print(f"TARGET NAME : {social_network}") + print(f"USERNAME : {username}") + print(f"TARGET URL : {url}") + print(f"TEST METHOD : {error_type}") + try: + print(f"STATUS CODES : {net_info['errorCode']}") + except KeyError: + pass + print("Results...") + try: + print(f"RESPONSE CODE : {r.status_code}") + except Exception: + pass + try: + print(f"ERROR TEXT : {net_info['errorMsg']}") + except KeyError: + pass + print(">>>>> BEGIN RESPONSE TEXT") + try: + print(r.text) + except Exception: + pass + print("<<<<< END RESPONSE TEXT") + print("VERDICT : " + str(query_status)) + print("+++++++++++++++++++++") + + # Notify caller about results of query. 
+ result: QueryResult = QueryResult( + username=username, + site_name=social_network, + site_url_user=url, + status=query_status, + query_time=response_time, + context=error_context, + ) + query_notify.update(result) - # Save results from request - results_site["http_status"] = http_status - results_site["response_text"] = response_text + # Save status of request + results_site["status"] = result - # Add this site's results into final dictionary with all of the other results. - results_total[social_network] = results_site + # Save results from request + results_site["http_status"] = http_status + results_site["response_text"] = response_text - return results_total + # Add this site's results into final dictionary with all of the other results. + results_total[social_network] = results_site + + return results_total def timeout_check(value): @@ -526,15 +547,6 @@ def timeout_check(value): return float_value - -def handler(signal_received, frame): - """Exit gracefully without throwing errors - - Source: https://www.devdungeon.com/content/python-catch-sigint-ctrl-c - """ - sys.exit(0) - - def main(): parser = ArgumentParser( formatter_class=RawDescriptionHelpFormatter, @@ -703,9 +715,6 @@ def main(): args = parser.parse_args() - # If the user presses CTRL-C, exit gracefully without throwing errors - signal.signal(signal.SIGINT, handler) - # Check for newer version of Sherlock. 
If it exists, let the user know about it try: latest_release_raw = requests.get(forge_api_latest_release, timeout=10).text @@ -821,14 +830,19 @@ def main(): else: all_usernames.append(username) for username in all_usernames: - results = sherlock( - username, - site_data, - query_notify, - dump_response=args.dump_response, - proxy=args.proxy, - timeout=args.timeout, - ) + # Catch propagated KeyboardInterrupt exception from sherlock function + try: + results = sherlock( + username, + site_data, + query_notify, + dump_response=args.dump_response, + proxy=args.proxy, + timeout=args.timeout, + ) + except KeyboardInterrupt: + print("Exiting program immediately after cleanup") + sys.exit(0) if args.output: result_file = args.output From 5f54f68952795cddae7a2d91c328fae2887a59d4 Mon Sep 17 00:00:00 2001 From: Nolan Parker Date: Sat, 18 Oct 2025 02:03:12 -0500 Subject: [PATCH 2/3] Adjusted new comments for CTRL+C handling in sherlock.py for clarity --- sherlock_project/sherlock.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sherlock_project/sherlock.py b/sherlock_project/sherlock.py index 2dfb81dac..b845e8163 100644 --- a/sherlock_project/sherlock.py +++ b/sherlock_project/sherlock.py @@ -109,16 +109,15 @@ def response_time(resp, *args, **kwargs): method, url, hooks=hooks, *args, **kwargs ) + # Called when the 'with' block in sherlock is entered (necessary for using 'with'). def __enter__(self): - # Called when the 'with' block is entered. - # It just returns the instance itself. + # Just returns the instance itself. return self + # Called when the 'with' block in sherlock is exited, regardless of success or exception. def __exit__(self, exc_type, exc_val, exc_tb): - # Called when the 'with' block is exited, regardless of success or exception. - # If an exception occurred and the exception is a Keyboard Interrupt exception, - # do a fast shutdown of the thread. + # do a fast shutdown of the thread and cancel all futures.
if exc_type is not None: if exc_type is KeyboardInterrupt: print("\nCtrl+C detected. Initiating thread shutdown...") @@ -241,7 +240,8 @@ def sherlock( max_workers = len(site_data) # Use 'with' to trigger the explicit shutdown of the thread pool (executor). - # This is faster (approx. 14s) and cleaner than waiting for Python's default cleanup. + # This is faster (approx. 15-30s vs. 60s+) and cleaner than waiting for Python's + # default cleanup. with SherlockFuturesSession(max_workers=max_workers, session=underlying_session) as session: # Results from analysis of all sites results_total = {} From a742c325e82e94767aefa4b6e6caad66f2b41415 Mon Sep 17 00:00:00 2001 From: Nolan Parker Date: Sat, 18 Oct 2025 03:51:02 -0500 Subject: [PATCH 3/3] Removed unnecessary "import signal" line. signal is no longer used. --- sherlock_project/sherlock.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sherlock_project/sherlock.py b/sherlock_project/sherlock.py index b845e8163..bfff59482 100644 --- a/sherlock_project/sherlock.py +++ b/sherlock_project/sherlock.py @@ -17,7 +17,6 @@ sys.exit(1) import csv -import signal import pandas as pd import os import re