tagmerger/tagmerger
2024-11-08 15:45:14 +01:00

224 lines
8.7 KiB
Python
Executable file

#!/usr/bin/env python3
import sys
import argparse
import yaml
import rapidfuzz as rf
def find_similar(tag, tag_list):
"""
Return a list of tags similar to tag.
"""
#result = rf.process.extract("feminisme", tags, limit=None, scorer=rf.fuzz.WRatio, score_cutoff=70)
#result = rf.process.extract("feminisme", tags, limit=None, scorer=rf.distance.DamerauLevenshtein.distance, score_cutoff=5, processor=rf.utils.default_process)
#result = rf.process.extract("feminisme", tags, limit=None, scorer=rf.distance.JaroWinkler.distance, score_cutoff=0.23, processor=rf.utils.default_process)
#result = rf.process.extract("feminisme", tags, scorer=rf.distance.DamerauLevenshtein.distance, processor=rf.utils.default_process, score_cutoff=None)
#result = rf.process.cdist(["feminisme"], tags, scorer=rf.distance.DamerauLevenshtein.distance, processor=rf.utils.default_process)
result = rf.process.extract(tag, tag_list,
limit=None,
scorer=rf.distance.JaroWinkler.distance,
score_cutoff=0.23,
processor=rf.utils.default_process)
return [r[0] for r in result]
def merge(old_tag, new_tag, old_tags, new_tags):
"""
Merge old_tag of the old_tags set into new_tag in the new_tags set.
"""
print(f"Merging {old_tag} into {new_tag}")
if new_tag in old_tags[old_tag]: # old_tag already merged in new_tag
return
old_tags[old_tag].append(new_tag)
try: # new_tag already exists in new_tags
new_tags[new_tag].append(old_tag)
except (KeyError, AttributeError): # new_tag does not already exist
new_tags[new_tag] = [old_tag]
def unmerge(old_tag, new_tag, old_tags, new_tags):
"""
Unmerge old_tag from new_tag.
"""
print(f"Unmerging {old_tag} from {new_tag}")
try:
old_tags[old_tag].remove(new_tag)
except ValueError:
pass
try:
new_tags[new_tag].remove(old_tag)
except ValueError:
pass
def display_options(message, options):
"""
Print message and a numbered list of options.
"""
if len(options) == 0:
return
print(message)
for i in range(len(options)):
print(f"\t{i}. {options[i]}")
def pick_option(message, options, default=None):
"""
Print message and prompt for an option in the options list. Return input.
Promt again if input is not a valid option.
If default is set and the input is empty, return default.
"""
choice = None
while choice not in options:
choice = input(message)
if choice == '' and default != None:
return default
return choice
def new_from_old_tags(old_tags):
new_tags = {}
for tag, merge_tags in old_tags.items():
for merge_tag in merge_tags:
if merge_tag in new_tags:
new_tags[merge_tag].append(tag)
else:
new_tags[merge_tag] = [tag]
return new_tags
def load_tags_yaml(path):
print(f"Loading tags from yaml file {path}")
with open(path, "r") as filein:
old_tags = yaml.safe_load(filein)
print(f"{len(old_tags)} tags loaded in dirty set")
print("Extracting new_tags from yaml")
new_tags = new_from_old_tags(old_tags)
print(f"{len(new_tags)} tags loaded in new set")
return old_tags, new_tags
def load_tags_plain(path):
print(f"Loading tags from plain file {path}")
with open(path, "r") as filein:
tags = filein.readlines()
tags = [t.strip() for t in tags]
old_tags = {t: [] for t in tags}
print(f"{len(old_tags)} tags loaded in dirty set")
print("Initialising empty new set")
new_tags = {}
return old_tags, new_tags
def export_tags(old_tags):
while True:
try:
path = input("Export path > ")
export_tags_path(old_tags, path)
return
except Exception as e:
print(f"An error occured during export: {e}")
def export_tags_path(old_tags, path):
with open(path, "w") as fileout:
yaml.dump(old_tags, fileout)
print(f"Tags exported to {path}")
def display_progress(old_tags):
"""
Print the fraction of merged tags from the dirty set.
"""
merged = sum(1 for t in old_tags if len(old_tags[t]) > 0)
ntags = len(old_tags)
print(f"{merged}/{ntags} ({merged/float(ntags)*100}%) tags merged")
def propose_merge_similar(tag, merge_tag, old_tags, new_tags):
"""
Propose merging tags from old_tags similar to tag into merge_tag.
"""
similar_tags = [t for t in find_similar(tag, old_tags.keys())
if merge_tag not in old_tags[t]]
if len(similar_tags) > 0:
display_options(f"Might also fit in {merge_tag}:", similar_tags)
while True:
choice = pick_option("Skip (default) or index > ",
[str(i) for i in range(len(similar_tags))],
default='')
if choice == '':
break
merge(similar_tags[int(choice)], merge_tag, old_tags, new_tags)
def unmerge_all_from(merge_tag, old_tags, new_tags):
"""
Unmerge all tags from the old set from merge_tag.
"""
for tag in new_tags[merge_tag]:
unmerge(tag, merge_tag, old_tags, new_tags)
parser = argparse.ArgumentParser(description = "A simple tool to semi-automate merging a dirty set of tags into a new set of tags.")
parser.add_argument("tags", type=str, help="Path to a yaml file containing the dirty set of tags. The yaml directory must be flat, with one key per tag in the dirty set with value null or set to the merge tag in the new set.")
parser.add_argument("--plain-input", action="store_true", help="If set, the input file is assumed to contain one tag of the dirty set per line. No character is escaped, the newline character is used as separator.")
parser.add_argument("--export-to", type=str, help="If set, export the tags after each iteration.")
args = parser.parse_args()
if args.plain_input == True:
old_tags, new_tags = load_tags_plain(args.tags)
else:
old_tags, new_tags = load_tags_yaml(args.tags)
for tag in old_tags.keys():
if len(old_tags[tag]) > 0:
continue
while True:
print("")
display_progress(old_tags)
if args.export_to != None:
export_tags_path(old_tags, path=args.export_to)
print(f"Tag: {tag}")
display_options("Merged in:", old_tags[tag])
similar_unmerged_tags = [t for t in find_similar(tag, old_tags.keys())
if len(old_tags[t]) == 0 and t != tag]
display_options("Similar unmerged tags:", similar_unmerged_tags)
merge_candidates = [t for t in find_similar(tag, new_tags.keys())
if t not in old_tags[tag]]
display_options("Merge candidates:", merge_candidates)
if len(merge_candidates) > 0:
menu_choice = pick_option("Next tag [n], keep as new tag [k], merge with candidate [c], merge in new tag [m], export tags and exit [e] > ", ['n', 'k', 'c', 'm', 'e'])
else:
menu_choice = pick_option("Next tag [n], keep as new tag [k], merge in new tag [m], export tags and exit [e] > ", ['n', 'k', 'm', 'e'])
if menu_choice == 'n':
break
elif menu_choice == 'k':
merge(tag, tag, old_tags, new_tags)
propose_merge_similar(tag, tag, old_tags, new_tags)
break
elif menu_choice == 'c':
choice = pick_option("Merge candidate index or cancel (default) > ",
[str(i) for i in range(len(merge_candidates))],
default = '')
if choice == '':
break
merge_tag = merge_candidates[int(choice)]
merge(tag, merge_tag, old_tags, new_tags)
propose_merge_similar(tag, merge_tag, old_tags, new_tags)
elif menu_choice == 'm':
merge_tag = input("New merge tag or cancel (default) > ")
similar_candidates = find_similar(merge_tag, new_tags.keys())
if len(similar_candidates) > 0:
display_options("Similar tags already in the new set:",
similar_candidates)
choice = pick_option("Continue with chosen tag (default) or index of replacement > ",
[str(i) for i in range(len(similar_candidates))],
default = '')
if choice != '':
merge_tag = similar_candidates[int(choice)]
merge(tag, merge_tag, old_tags, new_tags)
propose_merge_similar(tag, merge_tag, old_tags, new_tags)
elif menu_choice == 'e':
export_tags(old_tags)
sys.exit(0)
#unmerge_all_from("__lieux__", old_tags, new_tags)
print("\nAll tags merged !")
export_tags(old_tags)