diff --git a/tagmerger b/tagmerger index 9d4bb42..c86b7b9 100755 --- a/tagmerger +++ b/tagmerger @@ -7,6 +7,9 @@ import rapidfuzz as rf def find_similar(tag, tag_list): + """ + Return a list of tags similar to tag. + """ #result = rf.process.extract("feminisme", tags, limit=None, scorer=rf.fuzz.WRatio, score_cutoff=70) #result = rf.process.extract("feminisme", tags, limit=None, scorer=rf.distance.DamerauLevenshtein.distance, score_cutoff=5, processor=rf.utils.default_process) #result = rf.process.extract("feminisme", tags, limit=None, scorer=rf.distance.JaroWinkler.distance, score_cutoff=0.23, processor=rf.utils.default_process) @@ -19,67 +22,50 @@ def find_similar(tag, tag_list): processor=rf.utils.default_process) return [r[0] for r in result] -def merge(tag, old_tags, new_tags): - merge_candidates = find_similar(tag, new_tags.keys()) +def merge(old_tag, new_tag, old_tags, new_tags): + """ + Merge old_tag of the old_tags set into new_tag in the new_tags set. Record nothing if old_tag is already merged into new_tag. + """ + print(f"Merging {old_tag} into {new_tag}") + if new_tag in old_tags[old_tag]: # old_tag already merged in new_tag + return + old_tags[old_tag].append(new_tag) + try: # new_tag already exists in new_tags + new_tags[new_tag].append(old_tag) + except (KeyError, AttributeError): # new_tag does not already exist + new_tags[new_tag] = [old_tag] - print(f"\nTag : {tag}") - if len(merge_candidates) > 0: - print("Merge candidates:") - for i in range(len(merge_candidates)): - print(f"\t{i}. {merge_candidates[i]}") +def display_options(message, options): + """ + Print message and a numbered list of options. Print nothing if options is empty. + """ + if len(options) == 0: + return + print(message) + for i in range(len(options)): + print(f"\t{i}. 
{options[i]}") - choice = input(f"Tag name or [0-{len(merge_candidates) - 1}] or new merge tag > ") - else: - print("No reasonable merge candidate") - choice = input("New merge tag > ") - - try: - # Existing tag chosen by index - choice = int(choice) - merge_tag = merge_candidates[choice] - print(f"Merging in tag {merge_tag}") - except ValueError: - merge_tag = choice - if merge_tag in merge_candidates: # Existing tag chosen by name - print(f"Merging in tag {merge_tag}") - else: # New tag - while len(merge_tag) == 0: - merge_tag = input("New tag cannot be empty. Enter new tag > ") - while merge_tag in new_tags.keys(): - print(f"{merge_tag} already exists in the new tag set !", - "Appending '(dup)'") - merge_tag = merge_tag + "(dup)" - print(f"Merging in new tag {merge_tag}") - new_tags[merge_tag] = [] - old_tags[tag] = merge_tag - new_tags[merge_tag].append(tag) - - # Suggest merging more similar unmerged tags the merge tag - unmerged_tags = [t for t, m in old_tags.items() if m is None] - similar_unmerged_tags = find_similar(tag, unmerged_tags) - if len(similar_unmerged_tags) > 0: - print(f"Might also fit in {merge_tag}:") - for i in range(len(similar_unmerged_tags)): - print(f"\t{i}. {similar_unmerged_tags[i]}") - - choice = input(f"Space-separated indices from [0-{len(similar_unmerged_tags) - 1}] > ") - - # list(set()) removes duplicates - choice = list(set([int(c) for c in choice.split()])) - - for c in choice: - chosen_tag = similar_unmerged_tags[c] - print(f"Merging old tag {chosen_tag} in new tag {merge_tag}") - old_tags[chosen_tag] = merge_tag - new_tags[merge_tag].append(chosen_tag) +def pick_option(message, options, default=None): + """ + Print message and prompt for an option in the options list. Return input. + Prompt again if input is not a valid option. + If default is set and the input is empty, return default. 
+ """ + choice = None + while choice not in options: + choice = input(message) + if choice == '' and default != None: + return default + return choice def new_from_old_tags(old_tags): new_tags = {} - for tag, merge_tag in old_tags.items(): - if merge_tag in new_tags: - new_tags[merge_tag].append(tag) - else: - new_tags[merge_tag] = [tag] + for tag, merge_tags in old_tags.items(): + for merge_tag in merge_tags: + if merge_tag in new_tags: + new_tags[merge_tag].append(tag) + else: + new_tags[merge_tag] = [tag] return new_tags def load_tags_yaml(path): @@ -98,16 +84,46 @@ def load_tags_plain(path): with open(path, "r") as filein: tags = filein.readlines() tags = [t.strip() for t in tags] - old_tags = {t: None for t in tags} + old_tags = {t: [] for t in tags} print(f"{len(old_tags)} tags loaded in dirty set") print("Initialising empty new set") new_tags = {} return old_tags, new_tags -def export_tags(old_tags, new_tags, path): - with open(path, "w") as fileout: - yaml.dump(old_tags, fileout) - print(f"Tags exported to {path}") +def export_tags(old_tags): + while True: + try: + path = input("Export path > ") + with open(path, "w") as fileout: + yaml.dump(old_tags, fileout) + print(f"Tags exported to {path}") + return + except Exception as e: + print(f"An error occured during export: {e}") + +def display_progress(old_tags): + """ + Print the fraction of merged tags from the dirty set. + """ + merged = sum(1 for t in old_tags if len(old_tags[t]) > 0) + ntags = len(old_tags) + print(f"{merged}/{ntags} ({merged/float(ntags)*100}%) tags merged") + +def propose_merge_similar(tag, merge_tag, old_tags, new_tags): + """ + Propose merging tags from old_tags similar to tag into merge_tag. 
+ """ + similar_tags = [t for t in find_similar(tag, old_tags.keys()) + if merge_tag not in old_tags[t]] + if len(similar_tags) > 0: + display_options(f"Might also fit in {merge_tag}:", similar_tags) + while True: + choice = pick_option("Skip (default) or index > ", + [str(i) for i in range(len(similar_tags))], + default='') + if choice == '': + break + merge(similar_tags[int(choice)], merge_tag, old_tags, new_tags) parser = argparse.ArgumentParser(description = "A simple tool to semi-automate merging a dirty set of tags into a new set of tags.") @@ -122,19 +138,54 @@ else: old_tags, new_tags = load_tags_yaml(args.tags) for tag in old_tags.keys(): - if old_tags[tag] == None: - choice = input("\nContinue [c, default] or export merged tags [e] > ") - if choice == "e": - export_path = "" - while len(export_path) == 0: - export_path = input("Export path > ") - export_tags(old_tags, new_tags, export_path) - sys.exit(0) + if len(old_tags[tag]) > 0: + continue + while True: + print("") + display_progress(old_tags) + print(f"Tag: {tag}") + display_options("Merged in:", old_tags[tag]) + similar_unmerged_tags = [t for t in find_similar(tag, old_tags.keys()) + if len(old_tags[t]) == 0 and t != tag] + display_options("Similar unmerged tags:", similar_unmerged_tags) + merge_candidates = [t for t in find_similar(tag, new_tags.keys()) + if t not in old_tags[tag]] + display_options("Merge candidates:", merge_candidates) + + if len(merge_candidates) > 0: + menu_choice = pick_option("Next tag [n], keep as new tag [k], merge with candidate [c], merge in new tag [m], export tags and exit [e] > ", ['n', 'k', 'c', 'm', 'e']) else: - merge(tag, old_tags, new_tags) + menu_choice = pick_option("Next tag [n], keep as new tag [k], merge in new tag [m], export tags and exit [e] > ", ['n', 'k', 'm', 'e']) + + if menu_choice == 'n': + break + elif menu_choice == 'k': + merge(tag, tag, old_tags, new_tags) + propose_merge_similar(tag, tag, old_tags, new_tags) + break + elif menu_choice == 'c': + 
choice = pick_option("Merge candidate index or cancel (default) > ", + [str(i) for i in range(len(merge_candidates))], + default = '') + if choice == '': + break + merge_tag = merge_candidates[int(choice)] + merge(tag, merge_tag, old_tags, new_tags) + propose_merge_similar(tag, merge_tag, old_tags, new_tags) + elif menu_choice == 'm': + merge_tag = input("New merge tag or cancel (default) > ") # NOTE(review): empty input is not treated as cancel in this branch; confirm intent + similar_candidates = find_similar(merge_tag, new_tags.keys()) + if len(similar_candidates) > 0: + display_options("Similar tags already in the new set:", + similar_candidates) + choice = pick_option("Continue with chosen tag (default) or index of replacement > ", similar_candidates, default = '') # NOTE(review): options are tag names here, yet int(choice) below needs an index; confirm + if choice != '': + merge_tag = similar_candidates[int(choice)] + merge(tag, merge_tag, old_tags, new_tags) + propose_merge_similar(tag, merge_tag, old_tags, new_tags) + elif menu_choice == 'e': + export_tags(old_tags) + sys.exit(0) print("\nAll tags merged !") -export_path = "" -while len(export_path) == 0: - export_path = input("Export path > ") -export_tags(old_tags, new_tags, export_path) +export_tags(old_tags)