Skip to content

Commit

Permalink
Merge pull request #144 from pomfort/dev/fixes-and-renaming
Browse files Browse the repository at this point in the history
Enable detection of renamed files, check ascmhl folder, new tests and fixes, update Python version
  • Loading branch information
ptrpfn authored Mar 8, 2024
2 parents 47773a8 + 8b828f7 commit 5970ecb
Show file tree
Hide file tree
Showing 60 changed files with 776 additions and 142 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build+test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest]
python-version: [3.7, 3.8, 3.9]
python-version: ["3.11", "3.12"]
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,14 @@ The `create` command hashes all files given with the different options and creat
with records for all hashed files. The command compares the hashes against the hashes stored in previous generations
if available.

In general, the following options can be added to the command:
- v, --verbose: this option should be set in order to obtain a verbose output. Otherwise there will be no output with
information about the MHL file creation process
- h, --hash_format: Specifies the algorithm with which the hash is to be generated. The following values are allowed:
md5, sha1, xxh128, xxh3, xxh64, c4
- n, --no_directory_hashes: Skip creation of directory hashes, only reference directories without hash
- dr, --detect_renaming: enables the detection of renamed files based on their hash value

#### `create` default behavior (for file hierarchy, with completeness check)

The `create` command traverses through a folder hierarchy (such as a folder with media files, a camera card, or an
Expand Down
1 change: 1 addition & 0 deletions ascmhl/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
__maintainer__ = "Patrick Renner, Alexander Sahm"
__email__ = "opensource@pomfort.com"
"""

try:
from importlib.metadata import version
except ImportError:
Expand Down
1 change: 1 addition & 0 deletions ascmhl/chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
__maintainer__ = "Patrick Renner, Alexander Sahm"
__email__ = "opensource@pomfort.com"
"""

from typing import List

from . import logger
Expand Down
2 changes: 1 addition & 1 deletion ascmhl/cli/ascmhl.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def mhltool_cli():
pass


@mhltool_cli.resultcallback()
@mhltool_cli.result_callback()
def update(*args, **kwargs):
updater.join(timeout=1)
if updater.needs_update:
Expand Down
2 changes: 1 addition & 1 deletion ascmhl/cli/ascmhl_debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def mhldebugtool_cli():
pass


@mhldebugtool_cli.resultcallback()
@mhldebugtool_cli.result_callback()
def update(*args, **kwargs):
updater.join(timeout=1)
if updater.needs_update:
Expand Down
157 changes: 131 additions & 26 deletions ascmhl/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from . import errors
from . import ignore
from . import utils
from . import hasher
from .ignore import MHLIgnoreSpec
from .__version__ import (
ascmhl_supported_hashformats,
Expand Down Expand Up @@ -61,6 +62,13 @@
is_flag=True,
help="Skip creation of directory hashes, only reference directories without hash",
)
@click.option(
"--detect_renaming",
"-dr",
default=False,
is_flag=True,
help="Detect automatically renamed files",
)
# creatorinfo values
@click.option(
"--author_name",
Expand Down Expand Up @@ -119,6 +127,7 @@ def create(
verbose,
hash_format,
no_directory_hashes,
detect_renaming,
single_file,
ignore_list,
ignore_spec_file,
Expand All @@ -142,6 +151,7 @@ def create(
create_for_single_files_subcommand(
root_path,
verbose,
detect_renaming,
hash_format,
single_file,
author_name,
Expand All @@ -157,6 +167,7 @@ def create(
create_for_folder_subcommand(
root_path,
verbose,
detect_renaming,
hash_format,
no_directory_hashes,
author_name,
Expand All @@ -174,6 +185,7 @@ def create(
def create_for_folder_subcommand(
root_path,
verbose,
detect_renaming,
hash_formats,
no_directory_hashes,
author_name,
Expand Down Expand Up @@ -208,6 +220,10 @@ def create_for_folder_subcommand(
# we collect all paths we expect to find first and remove every path that we actually found while
# traversing the file system, so this set will at the end contain the file paths not found in the file system
not_found_paths = existing_history.set_of_file_paths()
renamed_files = existing_history.renamed_path_with_previous_path()
not_found_paths = {p if renamed_files.get(p, None) is None else renamed_files[p] for p in not_found_paths}
new_paths = set()
missing_asc_mhl_folder = set()

# create the ignore specification
ignore_spec = ignore.MHLIgnoreSpec(existing_history.latest_ignore_patterns(), ignore_list, ignore_spec_file)
Expand All @@ -234,6 +250,12 @@ def create_for_folder_subcommand(
for item_name, is_dir in children:
file_path = os.path.join(folder_path, item_name)
not_found_paths.discard(file_path)
for hash_list in existing_history.hash_lists:
for media_hash in hash_list.media_hashes:
if media_hash.path == existing_history.get_relative_file_path(file_path):
break
else:
new_paths.add(file_path)
if is_dir:
if not no_directory_hashes:
path_content_hash_lookup = dir_content_hash_mapping_lookup.pop(file_path)
Expand Down Expand Up @@ -289,6 +311,73 @@ def create_for_folder_subcommand(
folder_path, modification_date, dir_content_hash_lookup, dir_structure_hash_lookup
)

if len(existing_history.hash_lists) > 0:
for ref in existing_history.hash_lists[-1].hash_list_references:
referenced_asc_folder = os.path.join(
os.path.dirname(existing_history.asc_mhl_path), os.path.dirname(ref.path)
)
if not os.path.exists(referenced_asc_folder):
missing_asc_mhl_folder.add(os.path.dirname(referenced_asc_folder))

if detect_renaming:
found_file_paths = set()
for new_path in new_paths:
for not_found_path in not_found_paths:
# find hashes to not_found_path and new_path
not_found_path_history, relative_not_found_path = existing_history.find_history_for_path(
existing_history.get_relative_file_path(not_found_path)
)
not_found_path_hash = not_found_path_history.find_first_hash_entry_for_path(relative_not_found_path)

new_path_history, new_path_media_hash = None, None
for history, hash_list in session.new_hash_lists.items():
new_path_media_hash = hash_list.find_media_hash_for_path(history.get_relative_file_path(new_path))
if new_path_media_hash is not None:
new_path_history = history
break
new_path_hash = new_path_media_hash.find_hash_entry_for_format(not_found_path_hash.hash_format)
# compare found hashes
if new_path_hash:
if new_path_hash.hash_string == not_found_path_hash.hash_string:
if os.path.basename(new_path) != os.path.basename(not_found_path):
logger.info(
"a renamed {} was detected: from {} to {}".format(
"folder" if os.path.isdir(new_path) else "file",
relative_not_found_path,
existing_history.get_relative_file_path(new_path),
)
)
if new_path_media_hash.path == ".":
root_hash = session.new_hash_lists[
new_path_history.parent_history
].find_media_hash_for_path(new_path_history.parent_history.get_relative_file_path(new_path))
if root_hash:
root_hash.previous_path = relative_not_found_path
else:
new_path_media_hash.previous_path = relative_not_found_path
if not_found_path in missing_asc_mhl_folder:
if os.path.exists(os.path.join(new_path, ascmhl_folder_name)):
missing_asc_mhl_folder.discard(not_found_path)
else:
missing_asc_mhl_folder.discard(not_found_path)
missing_asc_mhl_folder.add(new_path)
found_file_paths.add(not_found_path)
else:
old_hash_format_for_new_path = hasher.hash_file(
os.path.join(root_path, new_path), not_found_path_hash.hash_format
)
if old_hash_format_for_new_path == not_found_path_hash.hash_string:
if os.path.basename(new_path) != os.path.basename(not_found_path):
logger.info(
"a renamed {} was detected: from {} to {}".format(
"folder" if os.path.isdir(new_path) else "file",
relative_not_found_path,
existing_history.get_relative_file_path(new_path),
)
)
new_path_media_hash.previous_path = relative_not_found_path
found_file_paths.add(not_found_path)
not_found_paths = not_found_paths - found_file_paths
commit_session(session, author_name, author_email, author_phone, author_role, location, comment)

exception = test_for_missing_files(not_found_paths, root_path, ignore_spec)
Expand All @@ -298,10 +387,14 @@ def create_for_folder_subcommand(
if exception:
raise exception

if len(missing_asc_mhl_folder) > 0:
raise errors.NoMHLHistoryException(", ".join(missing_asc_mhl_folder))


def create_for_single_files_subcommand(
root_path,
verbose,
detect_renaming,
hash_formats,
single_file,
author_name,
Expand Down Expand Up @@ -516,6 +609,8 @@ def verify_entire_folder(
# we collect all paths we expect to find first and remove every path that we actually found while
# traversing the file system, so this set will at the end contain the file paths not found in the file system
not_found_paths = existing_history.set_of_file_paths()
renamed_files = existing_history.renamed_path_with_previous_path()
not_found_paths = {p if renamed_files.get(p, None) is None else renamed_files[p] for p in not_found_paths}

num_failed_verifications = 0
num_new_files = 0
Expand All @@ -534,6 +629,13 @@ def verify_entire_folder(
# TODO: find new directories here
continue

for hash_list in existing_history.hash_lists:
for media_hash in hash_list.media_hashes:
if media_hash.path != history_relative_path:
continue
history_relative_path = media_hash.previous_path or history_relative_path
break

if single_file is None or os.path.realpath(single_file) == os.path.realpath(file_path):
# check if there is an existing hash in the other generations and verify
original_hash_entry = history.find_original_hash_entry_for_path(history_relative_path)
Expand All @@ -560,8 +662,8 @@ def verify_entire_folder(

exception = test_for_missing_files(not_found_paths, root_path, ignore_spec)

if found_single_file == False:
exception = errors.SingelFileNotFoundException()
if not found_single_file:
exception = errors.SingleFileNotFoundException()

if num_new_files > 0:
exception = errors.NewFilesFoundException()
Expand Down Expand Up @@ -923,6 +1025,9 @@ def diff_entire_folder_against_full_history_subcommand(root_path, verbose, ignor
# we collect all paths we expect to find first and remove every path that we actually found while
# traversing the file system, so this set will at the end contain the file paths not found in the file system
not_found_paths = existing_history.set_of_file_paths()
renamed_files = existing_history.renamed_path_with_previous_path()
not_found_paths = {p if renamed_files.get(p, None) is None else renamed_files[p] for p in not_found_paths}

num_failed_verifications = 0
num_new_files = 0

Expand All @@ -938,6 +1043,13 @@ def diff_entire_folder_against_full_history_subcommand(root_path, verbose, ignor
# TODO: find new directories here
continue

for hash_list in existing_history.hash_lists:
for media_hash in hash_list.media_hashes:
if media_hash.path != history_relative_path:
continue
history_relative_path = media_hash.previous_path or history_relative_path
break

# check if there is an existing hash in the other generations and verify
original_hash_entry = history.find_original_hash_entry_for_path(history_relative_path)

Expand All @@ -948,10 +1060,10 @@ def diff_entire_folder_against_full_history_subcommand(root_path, verbose, ignor
continue

exception = test_for_missing_files(not_found_paths, root_path, ignore_spec)
if num_new_files > 0:
exception = errors.NewFilesFoundException()
if num_failed_verifications > 0:
exception = errors.VerificationFailedException()
if not exception and num_new_files > 0:
exception = errors.NewFilesFoundException()

if exception:
raise exception
Expand Down Expand Up @@ -1157,8 +1269,8 @@ def info(verbose, single_file, root_path):
root_path = current_dir
break
current_dir = os.path.dirname(current_dir)
if root_path == None:
raise errors.NoMHLHistoryExceptionForPath(single_file[0])
if root_path is None:
raise errors.NoMHLHistoryException(single_file[0])
else:
info_for_single_file(root_path, verbose, single_file)
return
Expand All @@ -1184,7 +1296,11 @@ def info_for_entire_history(root_path, verbose):
if len(existing_history.hash_lists) == 0:
raise errors.NoMHLHistoryException(root_path)

for hash_list in existing_history.hash_lists:
log_child_histories(existing_history)


def log_child_histories(history):
for hash_list in history.hash_lists:
if logger.verbose_logging == True:
creatorInfo = hash_list.creator_info.summary()
processInfo = hash_list.process_info.summary()
Expand All @@ -1196,25 +1312,9 @@ def info_for_entire_history(root_path, verbose):
else:
logger.info(f" Generation {hash_list.generation_number} ({hash_list.creator_info.creation_date})")

if len(existing_history.child_histories) > 0:
if logger.verbose_logging == True:
for child_history in existing_history.child_histories:
logger.info(f"\nChild History at {child_history.get_root_path()}:")
for hash_list in child_history.hash_lists:
if logger.verbose_logging == True:
creatorInfo = hash_list.creator_info.summary()
processInfo = hash_list.process_info.summary()
logger.info(
f" Generation {hash_list.generation_number} ({hash_list.creator_info.creation_date})\n"
f" CreatorInfo: {creatorInfo}\n"
f" ProcessInfo: {processInfo}"
)
else:
logger.info(
f" Generation {hash_list.generation_number} ({hash_list.creator_info.creation_date})"
)
else:
logger.info(f" Child Histories: {len(existing_history.child_histories)}")
for child_history in history.child_histories:
logger.info(f"\nChild History at {child_history.get_root_path()}:")
log_child_histories(child_history)


def info_for_single_file(root_path, verbose, single_file):
Expand Down Expand Up @@ -1253,6 +1353,11 @@ def info_for_single_file(root_path, verbose, single_file):
f" CreatorInfo: {creatorInfo}\n"
f" ProcessInfo: {processInfo}"
)
if media_hash.previous_path and relative_path == media_hash.path:
logger.info(
" In previous generations the file was named: {}\n\n".format(media_hash.previous_path)
)
info_for_single_file(root_path, verbose, [os.path.join(root_path, media_hash.previous_path)])
else:
logger.info(
f" Generation {hash_list.generation_number} ({hash_list.creator_info.creation_date})"
Expand Down
Loading

0 comments on commit 5970ecb

Please sign in to comment.