Skip to content

Commit

Permalink
feat: support mypy, add more types
Browse files Browse the repository at this point in the history
  • Loading branch information
elkaboussi committed Mar 3, 2024
1 parent 5bc5db4 commit 9f86f1b
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 96 deletions.
10 changes: 5 additions & 5 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ repos:
hooks:
- id: flake8

# - repo: /~https://github.com/pre-commit/mirrors-mypy
# rev: v0.961
# hooks:
# - id: mypy
- repo: /~https://github.com/pre-commit/mirrors-mypy
rev: v0.961
hooks:
- id: mypy

- repo: /~https://github.com/PyCQA/isort
rev: 5.13.2
hooks:
- id: isort
args: ["--profile", "black"]
args: ["--profile", "black"]
84 changes: 42 additions & 42 deletions analyzer/Categorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from rich import box, print
from rich.table import Table

from analyzer.AnalyzerInterface import AnalyserInterface
from analyzer.AnalyzerInterface import AnalyserInterface, PathLike


class FileInfo(BaseModel):
Expand All @@ -26,69 +26,69 @@ class CategoryInfo(BaseModel):
files: List[FileInfo]


"""
this is an example of CategoryInfo
[
(
'Text', <--- key: category
CategoryInfo( <--- value: CategoryInfo
name='Other',
number_of_files=1,
total_size=0,
files=[FileInfo(
filename=PosixPath('/tmp.txt'),
extension='.txt',
category='Text',
size=0) <--- size in Bytes
]
)
),
(
'Video',
CategoryInfo(
name='Other',
number_of_files=1,
total_size=0,
files=[FileInfo(
filename=PosixPath('/video.mp4'),
extension='.mp4',
category='Video',
size=0)
]
)
),
]
"""
# this is an example of CategoryInfo
# [
# (
# 'Text', <--- key: category
# CategoryInfo( <--- value: CategoryInfo
# name='Other',
# number_of_files=1,
# total_size=0,
# files=[FileInfo(
# filename=PosixPath('/tmp.txt'),
# extension='.txt',
# category='Text',
# size=0) <--- size in Bytes
# ]
# )
# ),
# (
# 'Video',
# CategoryInfo(
# name='Other',
# number_of_files=1,
# total_size=0,
# files=[FileInfo(
# filename=PosixPath('/video.mp4'),
# extension='.mp4',
# category='Video',
# size=0)
# ]
# )
# ),
# ]


file_path = "config/category.json"
category_mapping = Dict[str, List[str]]
category_mapping: Dict[str, List[str]] = {}
try:
with open(file_path, "r") as file:
with open(file_path, "r", encoding="utf-8") as file:
category_mapping = json.load(file)
except Exception as e:
print(f"[red]Error reading category file: due to {e}[/red]", file=sys.stderr)
raise SystemExit(1)
raise SystemExit(1) from e


class Categorization(AnalyserInterface):

def __init__(self) -> None:
self.category_data = defaultdict(
self.category_data: Dict[str, CategoryInfo] = defaultdict(
lambda: CategoryInfo(
name="Other", number_of_files=0, total_size=0, files=[]
)
)

def add(self, filename: Path) -> None:
def add(self, filepath: PathLike) -> None:
"""
Add a file to the categorized files.
Parameters:
- filename (Path): Path to the file.
"""

extension = filename.suffix
if isinstance(filepath, str):
filepath = Path(filepath)
extension = filepath.suffix
category = next(
(
category
Expand All @@ -98,15 +98,15 @@ def add(self, filename: Path) -> None:
"Other",
)
try:
size = filename.stat().st_size
size = filepath.stat().st_size
except FileNotFoundError:
return

self.category_data[category].number_of_files += 1
self.category_data[category].total_size += size
self.category_data[category].files.append(
FileInfo(
filename=filename, extension=extension, category=category, size=size
filename=filepath, extension=extension, category=category, size=size
)
)

Expand Down
11 changes: 5 additions & 6 deletions analyzer/LargeFiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,15 @@

from analyzer.AnalyzerInterface import AnalyserInterface

PathLike = Union[Path, str]


@dataclass
class FileEntry:
file_path: Path
file_path: PathLike
size: bitmath.Byte


PathLike = Union[Path, str]


class LargeFileIdentifier(AnalyserInterface):
DEFAULT_THRESHOLD = bitmath.MiB(1)

Expand Down Expand Up @@ -99,7 +98,7 @@ def delete_reported_files(self) -> None:
"""
for entry in self.large_files:
try:
entry.file_path.unlink()
Path(entry.file_path).unlink()
print(f"[green]Deleted:[/green] [cyan]{entry.file_path}[/cyan]")
except Exception as e:
print(f"[red]Error deleting file {entry.file_path}:[/red] {e}")
Expand All @@ -118,7 +117,7 @@ def delete_one_file_at_a_time(self) -> None:
)
if response.lower() != "y":
continue
entry.file_path.unlink()
Path(entry.file_path).unlink()
print(f"[green]Deleted:[/green] [cyan]{entry.file_path}[/cyan]")
except Exception as e:
print(f"[red]Error deleting file {entry.file_path}:[/red] {e}")
16 changes: 8 additions & 8 deletions analyzer/Summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Union

import bitmath
from rich import print
import rich


class FileStatisticsCollector:
Expand Down Expand Up @@ -40,10 +40,10 @@ def report_statistics(self) -> None:
self.average_size = self.total_size / self.total_files
bitmath.format_string = "{value:.2f} {unit}"

print("\n[bold][underline]Statistics[/underline][/bold]")
print(f"{'Total Files:':<{self.report_key_len}} {self.total_files} file")
print(self._format_size_line("Total Size:", self.total_size))
print(self._format_size_line("Average File Size:", self.average_size))
print(self._format_size_line("Smallest File Size:", self.smallest_file_size))
print(self._format_size_line("Largest File Size:", self.largest_file_size))
print(f"{'Time Elapsed:':<{self.report_key_len}} {elapsed_time:.2f} seconds")
rich.print("\n[bold][underline]Statistics[/underline][/bold]")
rich.print(f"{'Total Files:':<{self.report_key_len}} {self.total_files} file")
rich.print(self._format_size_line("Total Size:", self.total_size))
rich.print(self._format_size_line("Average File Size:", self.average_size))
rich.print(self._format_size_line("Smallest File Size:", self.smallest_file_size))
rich.print(self._format_size_line("Largest File Size:", self.largest_file_size))
rich.print(f"{'Time Elapsed:':<{self.report_key_len}} {elapsed_time:.2f} seconds")
30 changes: 22 additions & 8 deletions analyzer/utils/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ def configure_log_file(log_file):
Configure log file redirection if provided.
"""
if log_file:
sys.stdout = open(log_file, "a")
sys.stderr = open(log_file, "a")
sys.stdout = open(file=log_file, mode="a", encoding="utf-8")
sys.stderr = open(file=log_file, mode="a", encoding="utf-8")


def log_intro(loginfo: LogInfo):
Expand All @@ -43,15 +43,29 @@ def log_intro(loginfo: LogInfo):

logger = logging.getLogger()
logger.info("File System Analysis - Starting Analysis")
logger.info(f"Date and Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
logger.info(f"Target Directory: {loginfo.target_dir}")
logger.info(f"Size Threshold for Large Files: {loginfo.size_threshold} bytes")
logger.info(f"Delete Files Flag: {loginfo.delete_files}")
logger.info(f"Log File: {loginfo.log_file}")
logger.info("Date and Time: %s", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
logger.info("Target Directory: %s", loginfo.target_dir)
logger.info("Size Threshold for Large Files: %s bytes", loginfo.size_threshold)
logger.info("Delete Files Flag: %s", loginfo.delete_files)
logger.info("Log File: %s", loginfo.log_file)
logger.info("=" * 50)


def setup_logging(log_file, target_dir, size_threshold, delete_files):
def setup_logging(
log_file: Optional[str], target_dir: Path, size_threshold: str, delete_files: bool
) -> None:
"""
Set up logging configuration.
Args:
log_file (Optional[str]): Path to the log file (optional).
target_dir (Path): Path to the target directory.
size_threshold (str): Size threshold for identifying large files.
delete_files (bool): Flag indicating whether file deletion prompt is enabled.
Returns:
None
"""
logging.basicConfig(
filename=log_file,
level=logging.INFO,
Expand Down
11 changes: 5 additions & 6 deletions analyzer/utils/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
from typing import Optional

import bitmath
from pydantic import BaseModel, Field
from rich import print
from pydantic import BaseModel

RED = "\033[91m"
RESET = "\033[0m"
Expand Down Expand Up @@ -38,10 +37,10 @@ def valid_path(path: str) -> Path:
Returns:
Path: The validated path as a Path object.
"""
path = Path(path)
if not path.exists():
vpath = Path(path)
if not vpath.exists():
raise argparse.ArgumentTypeError(f"{RED}Invalid path: {path}{RESET}")
return path
return vpath


def parse_args() -> Optional[ParsedArgs]:
Expand Down Expand Up @@ -78,7 +77,7 @@ def parse_args() -> Optional[ParsedArgs]:
)

parser.add_argument(
"-l", "--log", type=valid_path, default=None, help="Path to the log file"
"-l", "--log", type=str, default=None, help="Path to the log file"
)

parser.add_argument(
Expand Down
14 changes: 8 additions & 6 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
from bitmath import Byte, KiB, MiB
from pyfakefs.fake_filesystem import FakeFile, FakeFilesystem

PathLike = Union[Path, str]

# list of fake files to be created in the fake file system
fake_filesystem_files: List[Dict[str, Union[str, Byte, oct]]] = [
fake_filesystem_files: List[Dict[str, Union[str, Byte, int]]] = [
{
"name": "/root_dir/file_100_byte_0644.txt",
"size": Byte(100).to_Byte(),
Expand Down Expand Up @@ -161,11 +163,11 @@


def create_fakefs_file(
fs: FakeFilesystem, filepath: str, mode: oct = 0o644, size: Byte = Byte(100)
fs: FakeFilesystem, filepath: PathLike, mode: int = 0o644, size: Byte = Byte(100)
) -> Path:
"""Create a fake file in the fake file system."""
try:
file: FakeFile = fs.create_file(
fs.create_file(
file_path=filepath,
st_mode=mode,
contents="",
Expand All @@ -178,7 +180,7 @@ def create_fakefs_file(
)
except FileExistsError:
pass
return Path(file.path)
return Path(filepath)


@pytest.fixture
Expand All @@ -187,8 +189,8 @@ def app_file_system(fs: FakeFilesystem):
for file_info in fake_filesystem_files:
create_fakefs_file(
fs=fs,
filepath=file_info["name"],
mode=file_info["perm"],
filepath=str(file_info["name"]),
mode=int(file_info["perm"]),
size=file_info["size"],
)
yield fs
Expand Down
2 changes: 1 addition & 1 deletion tests/test_file_categorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
def categorization(fs: FakeFilesystem, app_file_system): # noqa F811
categorization_instance = Categorization()
for file in fake_filesystem_files:
categorization_instance.add(Path(file["name"]))
categorization_instance.add(str(file["name"]))
yield categorization_instance


Expand Down
8 changes: 5 additions & 3 deletions tests/test_large_files_identification.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
import os
from collections import Counter
from pathlib import Path
from typing import Dict, List, Union

import bitmath
import pytest
from bitmath import Byte
from pyfakefs.fake_filesystem import FakeFilesystem

from analyzer.LargeFiles import LargeFileIdentifier
Expand All @@ -13,15 +15,15 @@

def get_large_files_in_fake_files(
size: bitmath.Byte = LargeFileIdentifier.DEFAULT_THRESHOLD.to_Byte(),
):
) -> List[Dict[str, Union[str, Byte, int]]]:
return [file for file in fake_filesystem_files if file["size"] >= size]


@pytest.fixture(scope="function")
def LargeFileIdentifier_(fs: FakeFilesystem, app_file_system): # noqa F811
instance = LargeFileIdentifier(size_threshold="1024 KiB")
for file in fake_filesystem_files:
instance.add(Path(file["name"]))
instance.add(str(file["name"]))
yield instance


Expand Down Expand Up @@ -195,7 +197,7 @@ def test_report(LargeFileIdentifier_, capsys: pytest.CaptureFixture):
# content
for file in list_of_large_files:
assert file["name"] in output
assert str(file["size"].best_prefix(bitmath.SI)) in output
assert str(Byte(int(file["size"])).best_prefix(bitmath.SI)) in output


def test_delete_files(LargeFileIdentifier_):
Expand Down
Loading

0 comments on commit 9f86f1b

Please sign in to comment.