224 lines
8.7 KiB
Python
Executable file
224 lines
8.7 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import sys
|
|
import argparse
|
|
import yaml
|
|
import rapidfuzz as rf
|
|
|
|
|
|
def find_similar(tag, tag_list, score_cutoff=0.23):
    """
    Return the entries of tag_list that are similar to tag.

    Similarity is measured with rapidfuzz's Jaro-Winkler distance, after both
    sides went through rf.utils.default_process.

    tag: string to compare the candidates against.
    tag_list: collection of candidate strings (callers in this file pass
        dict key views).
    score_cutoff: forwarded to rapidfuzz as score_cutoff. With a distance
        scorer this is the largest distance still accepted (see the
        rapidfuzz documentation). Defaults to 0.23, the threshold this tool
        has always used.

    Returns a list with the matching candidates, in the order rapidfuzz
    reports them.
    """
    # Scorers tried before settling on Jaro-Winkler: fuzz.WRatio (cutoff 70)
    # and DamerauLevenshtein.distance (cutoff 5).
    matches = rf.process.extract(
        tag,
        tag_list,
        limit=None,
        scorer=rf.distance.JaroWinkler.distance,
        score_cutoff=score_cutoff,
        processor=rf.utils.default_process,
    )
    # Only the first element of each result (the matched candidate) is kept.
    return [match[0] for match in matches]
|
|
|
|
def merge(old_tag, new_tag, old_tags, new_tags):
    """
    Record that old_tag (a key of old_tags) is merged into new_tag.

    Both mappings are updated in place: new_tag is appended to
    old_tags[old_tag] and old_tag is appended to new_tags[new_tag], the
    latter entry being created when it is missing or has no append method.
    Nothing is modified when new_tag is already listed in old_tags[old_tag].
    """
    print(f"Merging {old_tag} into {new_tag}")
    targets = old_tags[old_tag]
    if new_tag in targets:
        # This merge was recorded earlier: leave both mappings untouched.
        return
    targets.append(new_tag)
    try:
        add_source = new_tags[new_tag].append
    except (KeyError, AttributeError):
        # No usable list stored under new_tag yet: start one.
        new_tags[new_tag] = [old_tag]
    else:
        add_source(old_tag)
|
|
|
|
def unmerge(old_tag, new_tag, old_tags, new_tags):
    """
    Undo the merge of old_tag into new_tag in both mappings.

    new_tag is removed from old_tags[old_tag] and old_tag is removed from
    new_tags[new_tag]. A side on which the link is not recorded is silently
    left as it is.
    """
    print(f"Unmerging {old_tag} from {new_tag}")
    # (mapping, key, member to drop) for each direction of the link.
    links = (
        (old_tags, old_tag, new_tag),
        (new_tags, new_tag, old_tag),
    )
    for mapping, key, member in links:
        try:
            mapping[key].remove(member)
        except ValueError:
            # The link was not recorded on this side.
            pass
|
|
|
|
|
|
def display_options(message, options):
    """
    Print message followed by the entries of options, one per line.

    Each entry is prefixed with a tab and its zero-based index. Nothing at
    all is printed when options is empty.
    """
    count = len(options)
    if count == 0:
        return
    print(message)
    for index, option in enumerate(options):
        print(f"\t{index}. {option}")
|
|
|
|
def pick_option(message, options, default=None):
    """
    Prompt with message until the answer is one of options, then return it.

    message: prompt text handed to input().
    options: container of accepted answers (strings).
    default: when it is not None and the user answers with an empty line,
        default is returned immediately, whether or not it is in options.

    The prompt is repeated for as long as the answer is not accepted.
    """
    while True:
        choice = input(message)
        # Identity test: None is the "no default" marker, while '' is a
        # legitimate default that callers in this file do pass.
        if choice == '' and default is not None:
            return default
        if choice in options:
            return choice
|
|
|
|
def new_from_old_tags(old_tags):
    """
    Build the new-set mapping by inverting old_tags.

    old_tags: dict mapping each dirty tag to the merge tags it was merged
        into. A value may be a list of merge tags, a single merge tag given
        as a plain string, or None for a tag that is not merged yet (the
        command-line help describes yaml values as null or the merge tag).

    Returns a dict mapping each merge tag to the list of dirty tags merged
    into it, in the iteration order of old_tags.
    """
    new_tags = {}
    for tag, merge_tags in old_tags.items():
        if merge_tags is None:
            # Unmerged tag: contributes nothing to the new set.
            continue
        if isinstance(merge_tags, str):
            # A bare string is one merge tag, not a sequence of characters.
            merge_tags = [merge_tags]
        for merge_tag in merge_tags:
            new_tags.setdefault(merge_tag, []).append(tag)
    return new_tags
|
|
|
|
def load_tags_yaml(path):
    """
    Load the dirty set from a yaml file and derive the new set from it.

    path: path of a yaml file holding a flat mapping with one key per dirty
        tag. A value may be null (not merged yet), a single merge tag, or a
        list of merge tags; all three are normalised to a list.

    Returns (old_tags, new_tags): old_tags maps each dirty tag to its list
    of merge tags, new_tags is the inverse mapping built by
    new_from_old_tags.

    Raises ValueError when the document is not a mapping.
    """
    print(f"Loading tags from yaml file {path}")
    # The YAML specification defines UTF-8 as the default encoding, so do not
    # depend on the locale.
    with open(path, "r", encoding="utf-8") as filein:
        loaded = yaml.safe_load(filein)
    if loaded is None:
        # An empty document loads as None: treat it as an empty dirty set.
        loaded = {}
    if not isinstance(loaded, dict):
        raise ValueError(f"{path} does not contain a yaml mapping of tags")

    # Every value must be a list, because the rest of the tool calls len()
    # and append() on it.
    old_tags = {}
    for tag, merge_tags in loaded.items():
        if merge_tags is None:
            old_tags[tag] = []
        elif isinstance(merge_tags, str):
            old_tags[tag] = [merge_tags]
        else:
            old_tags[tag] = list(merge_tags)
    print(f"{len(old_tags)} tags loaded in dirty set")

    print("Extracting new_tags from yaml")
    new_tags = new_from_old_tags(old_tags)
    print(f"{len(new_tags)} tags loaded in new set")
    return old_tags, new_tags
|
|
|
|
def load_tags_plain(path):
    """
    Load the dirty set from a plain text file holding one tag per line.

    path: path of the text file. Surrounding whitespace is stripped from
        every line, and lines that are empty after stripping are ignored so
        that blank lines do not create an empty-string tag. A tag that
        appears several times is kept once.

    Returns (old_tags, new_tags): old_tags maps each tag to an empty list
    (nothing merged yet) and new_tags is an empty dict.
    """
    print(f"Loading tags from plain file {path}")
    with open(path, "r") as filein:
        stripped = (line.strip() for line in filein)
        old_tags = {tag: [] for tag in stripped if tag}
    print(f"{len(old_tags)} tags loaded in dirty set")
    print("Initialising empty new set")
    new_tags = {}
    return old_tags, new_tags
|
|
|
|
def export_tags(old_tags):
    """
    Ask for an export path and write old_tags there, retrying on failure.

    old_tags: mapping handed to export_tags_path.

    The prompt is repeated until an export succeeds. An error raised while
    reading the path (for instance EOFError when stdin is closed) is not
    caught and propagates to the caller.
    """
    while True:
        # Read the path outside the try block: if EOFError were caught here,
        # a closed stdin would make this loop spin forever.
        path = input("Export path > ")
        try:
            export_tags_path(old_tags, path)
        except Exception as e:
            # Any export failure is reported and the user may try another
            # path, so the work of the session is not lost.
            print(f"An error occured during export: {e}")
        else:
            return
|
|
|
|
def export_tags_path(old_tags, path):
    """
    Dump old_tags as yaml into the file at path and report it on stdout.

    The file is opened in write mode, so existing content is replaced.
    """
    with open(path, mode="w") as sink:
        yaml.dump(old_tags, stream=sink)
    print(f"Tags exported to {path}")
|
|
|
|
def display_progress(old_tags):
    """
    Print the fraction of merged tags from the dirty set.

    old_tags: mapping of dirty tag to its merge tags; a tag counts as merged
        when that value is non-empty.

    An empty dirty set is reported as 0/0 (0.0%) instead of raising
    ZeroDivisionError.
    """
    merged = sum(1 for merge_tags in old_tags.values() if merge_tags)
    ntags = len(old_tags)
    percent = merged / ntags * 100 if ntags else 0.0
    print(f"{merged}/{ntags} ({percent}%) tags merged")
|
|
|
|
def propose_merge_similar(tag, merge_tag, old_tags, new_tags):
    """
    Offer to merge the dirty tags that resemble tag into merge_tag as well.

    Candidates are the keys of old_tags reported by find_similar for tag,
    minus those already merged into merge_tag. They are listed once, then
    the user is prompted repeatedly: an index merges that candidate, an
    empty answer ends the prompt. Nothing happens when there is no
    candidate.
    """
    candidates = []
    for other in find_similar(tag, old_tags.keys()):
        if merge_tag not in old_tags[other]:
            candidates.append(other)
    if not candidates:
        return

    display_options(f"Might also fit in {merge_tag}:", candidates)
    indices = [str(i) for i in range(len(candidates))]

    def ask():
        return pick_option("Skip (default) or index > ", indices, default='')

    # iter() with a sentinel keeps asking until the answer is ''.
    for answer in iter(ask, ''):
        merge(candidates[int(answer)], merge_tag, old_tags, new_tags)
|
|
|
|
def unmerge_all_from(merge_tag, old_tags, new_tags):
    """
    Unmerge every tag of the old set that was merged into merge_tag.

    merge_tag: key of new_tags whose merges are undone (KeyError if absent).
    old_tags, new_tags: the two mappings, updated in place by unmerge.
    """
    # Iterate over a snapshot: unmerge removes entries from
    # new_tags[merge_tag], and removing from a list while iterating over it
    # skips every other element.
    for tag in list(new_tags[merge_tag]):
        unmerge(tag, merge_tag, old_tags, new_tags)
|
|
|
|
|
|
# Command-line interface: one positional input file and two optional switches.
parser = argparse.ArgumentParser(
    description="A simple tool to semi-automate merging a dirty set of tags into a new set of tags.",
)
parser.add_argument(
    "tags",
    type=str,
    help="Path to a yaml file containing the dirty set of tags. The yaml directory must be flat, with one key per tag in the dirty set with value null or set to the merge tag in the new set.",
)
parser.add_argument(
    "--plain-input",
    action="store_true",
    help="If set, the input file is assumed to contain one tag of the dirty set per line. No character is escaped, the newline character is used as separator.",
)
parser.add_argument(
    "--export-to",
    type=str,
    help="If set, export the tags after each iteration.",
)

args = parser.parse_args()

# Load the dirty set with the reader matching the declared input format.
if not args.plain_input:
    old_tags, new_tags = load_tags_yaml(args.tags)
else:
    old_tags, new_tags = load_tags_plain(args.tags)
|
|
|
|
# Interactive session: walk through every dirty tag that is not merged yet and
# let the user decide what to do with it. The inner loop redisplays the menu
# for the same tag until the user moves on ('n', 'k') or exits ('e').
for tag in old_tags:
    if old_tags[tag]:
        # Already merged, for instance in a previous session loaded from yaml.
        continue
    while True:
        print("")
        display_progress(old_tags)
        if args.export_to is not None:
            # Automatic export on every menu display, as requested by
            # --export-to.
            export_tags_path(old_tags, path=args.export_to)
        print(f"Tag: {tag}")
        display_options("Merged in:", old_tags[tag])
        similar_unmerged_tags = [t for t in find_similar(tag, old_tags.keys())
                                 if not old_tags[t] and t != tag]
        display_options("Similar unmerged tags:", similar_unmerged_tags)
        merge_candidates = [t for t in find_similar(tag, new_tags.keys())
                            if t not in old_tags[tag]]
        display_options("Merge candidates:", merge_candidates)

        # The 'c' entry is only offered when there is a candidate to pick.
        if merge_candidates:
            menu_choice = pick_option("Next tag [n], keep as new tag [k], merge with candidate [c], merge in new tag [m], export tags and exit [e] > ", ['n', 'k', 'c', 'm', 'e'])
        else:
            menu_choice = pick_option("Next tag [n], keep as new tag [k], merge in new tag [m], export tags and exit [e] > ", ['n', 'k', 'm', 'e'])

        if menu_choice == 'n':
            break
        elif menu_choice == 'k':
            # Keep the tag under its own name in the new set.
            merge(tag, tag, old_tags, new_tags)
            propose_merge_similar(tag, tag, old_tags, new_tags)
            break
        elif menu_choice == 'c':
            choice = pick_option("Merge candidate index or cancel (default) > ",
                                 [str(i) for i in range(len(merge_candidates))],
                                 default='')
            if choice == '':
                # Cancel returns to the menu of the current tag; moving on to
                # the next tag is what 'n' is for.
                continue
            merge_tag = merge_candidates[int(choice)]
            merge(tag, merge_tag, old_tags, new_tags)
            propose_merge_similar(tag, merge_tag, old_tags, new_tags)
        elif menu_choice == 'm':
            # Strip surrounding whitespace, as load_tags_plain does for tags
            # read from a file.
            merge_tag = input("New merge tag or cancel (default) > ").strip()
            if merge_tag == '':
                # The prompt offers an empty answer as cancel; without this
                # check the tag would be merged into the empty tag ''.
                continue
            similar_candidates = find_similar(merge_tag, new_tags.keys())
            if similar_candidates:
                # Offer an existing, similar new tag in place of the typed
                # one.
                display_options("Similar tags already in the new set:",
                                similar_candidates)
                choice = pick_option("Continue with chosen tag (default) or index of replacement > ",
                                     [str(i) for i in range(len(similar_candidates))],
                                     default='')
                if choice != '':
                    merge_tag = similar_candidates[int(choice)]
            merge(tag, merge_tag, old_tags, new_tags)
            propose_merge_similar(tag, merge_tag, old_tags, new_tags)
        elif menu_choice == 'e':
            export_tags(old_tags)
            sys.exit(0)

# Manual maintenance hook, kept disabled:
#unmerge_all_from("__lieux__", old_tags, new_tags)

print("\nAll tags merged !")
export_tags(old_tags)
|