Refactor
This commit is contained in:
parent
ba580b64ec
commit
451d8e79f0
1 changed files with 125 additions and 74 deletions
199
tagmerger
199
tagmerger
|
@ -7,6 +7,9 @@ import rapidfuzz as rf
|
|||
|
||||
|
||||
def find_similar(tag, tag_list):
|
||||
"""
|
||||
Return a list of tags similar to tag.
|
||||
"""
|
||||
#result = rf.process.extract("feminisme", tags, limit=None, scorer=rf.fuzz.WRatio, score_cutoff=70)
|
||||
#result = rf.process.extract("feminisme", tags, limit=None, scorer=rf.distance.DamerauLevenshtein.distance, score_cutoff=5, processor=rf.utils.default_process)
|
||||
#result = rf.process.extract("feminisme", tags, limit=None, scorer=rf.distance.JaroWinkler.distance, score_cutoff=0.23, processor=rf.utils.default_process)
|
||||
|
@ -19,67 +22,50 @@ def find_similar(tag, tag_list):
|
|||
processor=rf.utils.default_process)
|
||||
return [r[0] for r in result]
|
||||
|
||||
def merge(tag, old_tags, new_tags):
|
||||
merge_candidates = find_similar(tag, new_tags.keys())
|
||||
def merge(old_tag, new_tag, old_tags, new_tags):
    """
    Merge old_tag of the old_tags set into new_tag in the new_tags set.

    old_tags maps each old tag to the list of new tags it was merged into;
    new_tags maps each new tag to the list of old tags merged into it.
    Both mappings are updated in place. Nothing happens (and nothing is
    printed) if old_tag is already merged into new_tag.
    """
    targets = old_tags[old_tag]
    if new_tag in targets:  # old_tag already merged in new_tag
        return
    # Announce the merge only once we know it will actually take place.
    print(f"Merging {old_tag} into {new_tag}")
    targets.append(new_tag)
    try:  # new_tag already exists in new_tags
        new_tags[new_tag].append(old_tag)
    except (KeyError, AttributeError):
        # new_tag is missing, or its value is not a list (e.g. None):
        # start a fresh member list for it.
        new_tags[new_tag] = [old_tag]
|
||||
|
||||
print(f"\nTag : {tag}")
|
||||
if len(merge_candidates) > 0:
|
||||
print("Merge candidates:")
|
||||
for i in range(len(merge_candidates)):
|
||||
print(f"\t{i}. {merge_candidates[i]}")
|
||||
def display_options(message, options):
    """
    Print message followed by a numbered list of options.

    Nothing at all is printed when options is empty. Numbering starts at 0
    so that a displayed number can be used directly as an index in options.
    """
    if not options:
        return
    print(message)
    for index, option in enumerate(options):
        print(f"\t{index}. {option}")
|
||||
|
||||
choice = input(f"Tag name or [0-{len(merge_candidates) - 1}] or new merge tag > ")
|
||||
else:
|
||||
print("No reasonable merge candidate")
|
||||
choice = input("New merge tag > ")
|
||||
|
||||
try:
|
||||
# Existing tag chosen by index
|
||||
choice = int(choice)
|
||||
merge_tag = merge_candidates[choice]
|
||||
print(f"Merging in tag {merge_tag}")
|
||||
except ValueError:
|
||||
merge_tag = choice
|
||||
if merge_tag in merge_candidates: # Existing tag chosen by name
|
||||
print(f"Merging in tag {merge_tag}")
|
||||
else: # New tag
|
||||
while len(merge_tag) == 0:
|
||||
merge_tag = input("New tag cannot be empty. Enter new tag > ")
|
||||
while merge_tag in new_tags.keys():
|
||||
print(f"{merge_tag} already exists in the new tag set !",
|
||||
"Appending '(dup)'")
|
||||
merge_tag = merge_tag + "(dup)"
|
||||
print(f"Merging in new tag {merge_tag}")
|
||||
new_tags[merge_tag] = []
|
||||
old_tags[tag] = merge_tag
|
||||
new_tags[merge_tag].append(tag)
|
||||
|
||||
# Suggest merging more similar unmerged tags the merge tag
|
||||
unmerged_tags = [t for t, m in old_tags.items() if m is None]
|
||||
similar_unmerged_tags = find_similar(tag, unmerged_tags)
|
||||
if len(similar_unmerged_tags) > 0:
|
||||
print(f"Might also fit in {merge_tag}:")
|
||||
for i in range(len(similar_unmerged_tags)):
|
||||
print(f"\t{i}. {similar_unmerged_tags[i]}")
|
||||
|
||||
choice = input(f"Space-separated indices from [0-{len(similar_unmerged_tags) - 1}] > ")
|
||||
|
||||
# list(set()) removes duplicates
|
||||
choice = list(set([int(c) for c in choice.split()]))
|
||||
|
||||
for c in choice:
|
||||
chosen_tag = similar_unmerged_tags[c]
|
||||
print(f"Merging old tag {chosen_tag} in new tag {merge_tag}")
|
||||
old_tags[chosen_tag] = merge_tag
|
||||
new_tags[merge_tag].append(chosen_tag)
|
||||
def pick_option(message, options, default=None):
    """
    Prompt with message for one of the entries of options and return it.

    Prompt again as long as the input is not a valid option.
    If default is set (i.e. is not None) and the input is empty, return
    default immediately, even if default is not itself in options.
    """
    while True:
        choice = input(message)
        if choice == '' and default is not None:
            return default
        if choice in options:
            return choice
|
||||
|
||||
def new_from_old_tags(old_tags):
    """
    Build the new tag set from the merge decisions stored in old_tags.

    old_tags maps each old tag to the list of new tags it was merged into.
    The returned dict maps each new tag to the list of old tags merged
    into it. Old tags that are not merged yet (empty list or None) do not
    contribute anything to the result.
    """
    new_tags = {}
    for tag, merge_tags in old_tags.items():
        # `or ()` tolerates entries whose value is None instead of a list.
        for merge_tag in merge_tags or ():
            new_tags.setdefault(merge_tag, []).append(tag)
    return new_tags
|
||||
|
||||
def load_tags_yaml(path):
|
||||
|
@ -98,16 +84,46 @@ def load_tags_plain(path):
|
|||
with open(path, "r") as filein:
|
||||
tags = filein.readlines()
|
||||
tags = [t.strip() for t in tags]
|
||||
old_tags = {t: None for t in tags}
|
||||
old_tags = {t: [] for t in tags}
|
||||
print(f"{len(old_tags)} tags loaded in dirty set")
|
||||
print("Initialising empty new set")
|
||||
new_tags = {}
|
||||
return old_tags, new_tags
|
||||
|
||||
def export_tags(old_tags, new_tags, path):
    """
    Dump old_tags as YAML into the file at path and report the export.

    new_tags is accepted but not written: only old_tags is dumped.
    """
    with open(path, "w") as handle:
        yaml.dump(old_tags, handle)
    print(f"Tags exported to {path}")
|
||||
def export_tags(old_tags):
    """
    Ask for an export path and dump old_tags as YAML into that file.

    If writing fails (bad path, permission problem, ...), the error is
    printed and the path is asked again until an export succeeds.

    The prompt is deliberately kept outside the try block: an EOFError
    raised by input() on a closed stdin must propagate instead of being
    caught and re-prompted forever.
    """
    while True:
        path = input("Export path > ")
        try:
            with open(path, "w") as fileout:
                yaml.dump(old_tags, fileout)
        except Exception as e:
            # Interactive boundary: report the failure and let the user
            # try another path.
            print(f"An error occured during export: {e}")
        else:
            print(f"Tags exported to {path}")
            return
|
||||
|
||||
def display_progress(old_tags):
    """
    Print the fraction of merged tags from the dirty set.

    A tag counts as merged when its entry in old_tags is non-empty.
    An empty dirty set is reported as 0/0 (0.0%) instead of dividing by
    zero.
    """
    ntags = len(old_tags)
    merged = sum(1 for targets in old_tags.values() if targets)
    percent = merged / float(ntags) * 100 if ntags else 0.0
    print(f"{merged}/{ntags} ({percent}%) tags merged")
|
||||
|
||||
def propose_merge_similar(tag, merge_tag, old_tags, new_tags):
    """
    Propose merging tags from old_tags similar to tag into merge_tag.

    Old tags already merged into merge_tag are not proposed. The user may
    pick indices one after the other; an empty input ends the proposal.
    """
    similar_tags = [
        candidate
        for candidate in find_similar(tag, old_tags.keys())
        if merge_tag not in old_tags[candidate]
    ]
    if not similar_tags:
        return

    display_options(f"Might also fit in {merge_tag}:", similar_tags)
    # The candidate list is fixed for the whole dialogue, so the accepted
    # index strings can be computed once.
    valid_indices = [str(i) for i in range(len(similar_tags))]
    while True:
        choice = pick_option("Skip (default) or index > ",
                             valid_indices,
                             default='')
        if choice == '':
            return
        merge(similar_tags[int(choice)], merge_tag, old_tags, new_tags)
|
||||
|
||||
parser = argparse.ArgumentParser(description = "A simple tool to semi-automate merging a dirty set of tags into a new set of tags.")
|
||||
|
||||
|
@ -122,19 +138,54 @@ else:
|
|||
old_tags, new_tags = load_tags_yaml(args.tags)
|
||||
|
||||
# Interactive main loop: walk through every old tag that is not merged yet
# and let the user decide what to do with it.
for tag in old_tags:
    if old_tags[tag]:
        # Already merged into at least one new tag.
        continue
    while True:
        print("")
        display_progress(old_tags)
        print(f"Tag: {tag}")
        display_options("Merged in:", old_tags[tag])
        similar_unmerged_tags = [t for t in find_similar(tag, old_tags.keys())
                                 if len(old_tags[t]) == 0 and t != tag]
        display_options("Similar unmerged tags:", similar_unmerged_tags)
        merge_candidates = [t for t in find_similar(tag, new_tags.keys())
                            if t not in old_tags[tag]]
        display_options("Merge candidates:", merge_candidates)

        # The [c] entry is only offered when there is a candidate to pick.
        if merge_candidates:
            menu_choice = pick_option("Next tag [n], keep as new tag [k], merge with candidate [c], merge in new tag [m], export tags and exit [e] > ", ['n', 'k', 'c', 'm', 'e'])
        else:
            menu_choice = pick_option("Next tag [n], keep as new tag [k], merge in new tag [m], export tags and exit [e] > ", ['n', 'k', 'm', 'e'])

        if menu_choice == 'n':
            break
        elif menu_choice == 'k':
            # Keep the tag under its own name in the new set.
            merge(tag, tag, old_tags, new_tags)
            propose_merge_similar(tag, tag, old_tags, new_tags)
            break
        elif menu_choice == 'c':
            choice = pick_option("Merge candidate index or cancel (default) > ",
                                 [str(i) for i in range(len(merge_candidates))],
                                 default='')
            if choice == '':
                # NOTE(review): cancelling here moves on to the next tag
                # rather than returning to the menu -- confirm intended.
                break
            merge_tag = merge_candidates[int(choice)]
            merge(tag, merge_tag, old_tags, new_tags)
            propose_merge_similar(tag, merge_tag, old_tags, new_tags)
        elif menu_choice == 'm':
            merge_tag = input("New merge tag or cancel (default) > ")
            if merge_tag == '':
                # Honour the advertised cancel: never create an empty tag,
                # go back to the menu for the current tag instead.
                continue
            similar_candidates = find_similar(merge_tag, new_tags.keys())
            if similar_candidates:
                display_options("Similar tags already in the new set:",
                                similar_candidates)
                # The prompt asks for an index, so the valid answers are
                # the index strings, not the tag names themselves.
                choice = pick_option("Continue with chosen tag (default) or index of replacement > ",
                                     [str(i) for i in range(len(similar_candidates))],
                                     default='')
                if choice != '':
                    merge_tag = similar_candidates[int(choice)]
            merge(tag, merge_tag, old_tags, new_tags)
            propose_merge_similar(tag, merge_tag, old_tags, new_tags)
        elif menu_choice == 'e':
            export_tags(old_tags)
            sys.exit(0)

print("\nAll tags merged !")
export_tags(old_tags)
|
||||
|
|
Loading…
Reference in a new issue