Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions guarddog/analyzer/metadata/go/typosquatting.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import json
import logging
import os
from typing import Optional

from guarddog.analyzer.metadata.typosquatting import TyposquatDetector
from guarddog.utils.config import TOP_PACKAGES_CACHE_LOCATION

log = logging.getLogger("guarddog")


class GoTyposquatDetector(TyposquatDetector):
"""Detector for typosquatting attacks for go modules. Checks for distance one Levenshtein,
Expand All @@ -25,12 +28,7 @@ def _get_top_packages(self) -> set:
)

top_packages_path = os.path.join(resources_dir, top_packages_filename)

top_packages_information = None

if top_packages_filename in os.listdir(resources_dir):
with open(top_packages_path, "r") as top_packages_file:
top_packages_information = json.load(top_packages_file)
top_packages_information = self._get_top_packages_local(top_packages_path)

if top_packages_information is None:
raise Exception(
Expand All @@ -39,6 +37,15 @@ def _get_top_packages(self) -> set:

return set(top_packages_information)

def _get_top_packages_local(self, path: str) -> list[dict] | None:
try:
with open(path, "r") as f:
result = json.load(f)
return result
except FileNotFoundError:
log.error(f"File not found: {path}")
return None

def detect(
self,
package_info,
Expand Down
55 changes: 43 additions & 12 deletions guarddog/analyzer/metadata/npm/typosquatting.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import logging
import os
from datetime import datetime, timedelta
from typing import Optional
Expand All @@ -7,6 +8,8 @@
from guarddog.utils.config import TOP_PACKAGES_CACHE_LOCATION
import requests

log = logging.getLogger("guarddog")


class NPMTyposquatDetector(TyposquatDetector):
"""Detector for typosquatting attacks. Detects if a package name is a typosquat of one of the top 5000 packages.
Expand All @@ -32,24 +35,52 @@ def _get_top_packages(self) -> set:
)

top_packages_path = os.path.join(resources_dir, top_packages_filename)
top_packages_information = self._get_top_packages_local(top_packages_path)

top_packages_information = None

if top_packages_filename in os.listdir(resources_dir):
update_time = datetime.fromtimestamp(os.path.getmtime(top_packages_path))
if self._file_is_expired(top_packages_path, days=30):
new_information = self._get_top_packages_network(popular_packages_url)
if new_information is not None:
top_packages_information = new_information

if datetime.now() - update_time <= timedelta(days=30):
with open(top_packages_path, "r") as top_packages_file:
top_packages_information = json.load(top_packages_file)
with open(top_packages_path, "w+") as f:
json.dump(new_information, f, ensure_ascii=False, indent=4)

if top_packages_information is None:
response = requests.get(popular_packages_url).json()
top_packages_information = list([i["name"] for i in response[0:8000]])
with open(top_packages_path, "w+") as f:
json.dump(top_packages_information, f, ensure_ascii=False, indent=4)

return set()
return set(top_packages_information)

def _file_is_expired(self, path: str, days: int) -> bool:
try:
update_time = datetime.fromtimestamp(os.path.getmtime(path))
return datetime.now() - update_time > timedelta(days=days)
except FileNotFoundError:
return True

def _get_top_packages_local(self, path: str) -> list[dict] | None:
try:
with open(path, "r") as f:
result = json.load(f)
return result
except FileNotFoundError:
log.error(f"File not found: {path}")
return None

def _get_top_packages_network(self, url: str) -> list[dict] | None:
try:
response = requests.get(url)
response.raise_for_status()

response_data = response.json()
result = list([i["name"] for i in response_data[0:8000]])

return result
except json.JSONDecodeError:
log.error(f'Couldn`t convert to json: "{response.text}"')
return None
except requests.exceptions.RequestException as e:
log.error(f"Network error: {e}")
return None

def detect(
self,
package_info,
Expand Down
68 changes: 51 additions & 17 deletions guarddog/analyzer/metadata/pypi/typosquatting.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,61 @@ def _get_top_packages(self) -> set:
)

top_packages_path = os.path.join(resources_dir, top_packages_filename)
top_packages_information = self._get_top_packages_local(top_packages_path)

top_packages_information = None
if self._file_is_expired(top_packages_path, days=30):
new_information = self._get_top_packages_network(popular_packages_url)
if new_information is not None:
top_packages_information = new_information

if top_packages_filename in os.listdir(resources_dir):
update_time = datetime.fromtimestamp(os.path.getmtime(top_packages_path))

if datetime.now() - update_time <= timedelta(days=30):
with open(top_packages_path, "r") as top_packages_file:
top_packages_information = json.load(top_packages_file)["rows"]
with open(top_packages_path, "w+") as f:
json.dump(new_information, f, ensure_ascii=False, indent=4)

if top_packages_information is None:
response = requests.get(popular_packages_url).json()
with open(top_packages_path, "w+") as f:
json.dump(response, f, ensure_ascii=False, indent=4)

top_packages_information = response["rows"]

def get_safe_name(package):
return packaging.utils.canonicalize_name(package["project"])

return set(map(get_safe_name, top_packages_information))
return set()
return set(map(self.get_safe_name, top_packages_information))

@staticmethod
def get_safe_name(package):
return packaging.utils.canonicalize_name(package["project"])

def _file_is_expired(self, path: str, days: int) -> bool:
try:
update_time = datetime.fromtimestamp(os.path.getmtime(path))
return datetime.now() - update_time > timedelta(days=days)
except FileNotFoundError:
return True

def _get_top_packages_local(self, path: str) -> list[dict] | None:
try:
with open(path, "r") as f:
result = json.load(f)
return self.extract_information(result)
except FileNotFoundError:
log.error(f"File not found: {path}")
return None

def _get_top_packages_network(self, url: str) -> list[dict] | None:
try:
response = requests.get(url)
response.raise_for_status()

response_data = response.json()
result = response_data

return self.extract_information(result)
except json.JSONDecodeError:
log.error(f'Couldn`t convert to json: "{response.text}"')
return None
except requests.exceptions.RequestException as e:
log.error(f"Network error: {e}")
return None

@staticmethod
def extract_information(data: dict | None) -> list[dict] | None:
if data is not None:
return data.get("rows")
return None

def detect(
self,
Expand Down