1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
|
import logging import os import sqlite3
import imagehash import magic import matplotlib.pyplot as plt from PIL import Image from pybktree import BKTree
# Emit INFO-level progress messages; every function below reports via logging.
logging.basicConfig(level=logging.INFO)

# Path of the SQLite database file that stores the computed image hashes.
DB_FILENAME = os.path.expanduser("~/Documents/pixiv.image.hash.db")
# Name of the table that holds one row per hashed file.
DB_NAME = "hash"
# DDL used to create that table: auto-increment id, file path, hash as text.
CREATE_SQL = """CREATE TABLE hash ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, filename TEXT NOT NULL, hash TEXT NOT NULL ) """
# Root directory that is scanned recursively for images.
IMG_DIR = "/mnt/hdd43/imgs/pixiv"
# Directory that receives the side-by-side preview PNGs of duplicate groups.
PREVIEW_DIR = os.path.expanduser("~/Pictures/dups")

# libmagic handle configured to report MIME types (used by is_image).
file_magic = magic.Magic(mime=True)
# Single module-wide database connection shared by all functions.
sqlite3_conn = sqlite3.connect(DB_FILENAME)
def ensure_table() -> None:
    """Create the hash table in the database unless it already exists.

    The existence check looks the table up in ``sqlite_master``.  It is
    restricted to entries of type ``table`` so that another schema object
    that happens to share the name (for example a trigger) is not mistaken
    for the table, which would leave the table uncreated.
    """
    cur = sqlite3_conn.cursor()
    row = cur.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?",
        (DB_NAME,),
    ).fetchone()
    if row is not None:
        logging.info(f"table {DB_NAME} exists.")
        return

    cur.execute(CREATE_SQL)
    # Harmless when the DDL was auto-committed; required if the connection
    # runs with an open transaction.
    sqlite3_conn.commit()
    logging.info(f"create table {DB_NAME}.")
def is_image(filename: str) -> bool:
    """Report whether the detected MIME type of *filename* starts with "image".

    The type is obtained from the module-level ``file_magic`` detector, i.e.
    from the file itself rather than from its name.
    """
    mime_type = file_magic.from_file(filename=filename)
    return mime_type.startswith("image")
def in_db(filename: str) -> bool:
    """Return True when a row for *filename* is already stored in the hash table."""
    cursor = sqlite3_conn.cursor()
    cursor.execute("SELECT id FROM hash WHERE filename = ?", (filename,))
    existing = cursor.fetchone()
    return existing is not None
def cal_hash_recursively(directory: str = ".") -> None:
    """Hash every not-yet-stored image below *directory* and save it to the DB.

    Walks *directory* recursively.  Files that are not images (per
    ``is_image``) or that already have a row (per ``in_db``) are skipped.
    For the rest a perceptual hash with ``hash_size=16`` is computed and
    inserted together with the normalized file path.

    Files that cannot be opened or decoded as an image are logged and
    skipped instead of aborting the whole scan.  Rows inserted so far are
    committed even when the scan is interrupted by an error, so a later run
    can resume without re-hashing them.

    Args:
        directory: Root of the directory tree to scan.
    """
    cur = sqlite3_conn.cursor()
    try:
        for dirpath, _, filenames in os.walk(directory):
            for filename in filenames:
                f = os.path.normpath(os.path.join(dirpath, filename))
                logging.info(f"processing {f}...")
                if not is_image(f):
                    logging.info(f"{f}: not an image, skipping ...")
                    continue
                if in_db(f):
                    logging.info(f"{f}: already in db, skipping ...")
                    continue
                try:
                    # Context manager closes the file handle right after
                    # hashing; a long scan would otherwise pile up open files.
                    with Image.open(f) as img:
                        digest = str(imagehash.phash(img, hash_size=16))
                except OSError as exc:
                    # Covers unidentifiable and truncated image files.
                    logging.warning(f"{f}: unreadable image ({exc}), skipping ...")
                    continue
                cur.execute(
                    "INSERT INTO hash (filename, hash) VALUES (?, ?)",
                    (f, digest),
                )
                logging.info(f"{f}: hash saved")
    finally:
        # Persist whatever was hashed, also when the walk ends with an error.
        sqlite3_conn.commit()
def img_hamming(a: tuple, b: tuple):
    """Distance function for the BK-tree: difference of the items' hashes.

    Each item is indexed as ``(filename, hash)``; only element 1 is used.
    The result is whatever ``hash_a - hash_b`` yields; for image hash
    objects this is presumably the Hamming distance, as the name suggests.
    """
    hash_a, hash_b = a[1], b[1]
    return hash_a - hash_b
def construct_bktree() -> BKTree:
    """Build a BK-tree over every ``(filename, hash)`` row stored in the DB.

    The hex string from the database is converted back into a hash object
    before insertion; ``img_hamming`` serves as the tree's distance function.
    """
    cursor = sqlite3_conn.cursor()
    cursor.execute("SELECT filename, hash FROM hash")
    items = [
        (name, imagehash.hex_to_hash(hex_digest)) for name, hex_digest in cursor
    ]
    return BKTree(img_hamming, items)
def find_dups(tree: BKTree, max_distance: int = 25) -> list[list[str]]:
    """Group stored images whose hashes lie within *max_distance* of each other.

    Every row of the hash table is used as a query against *tree*.  A query
    always matches itself, so only result sets with more than one member
    count as a duplicate group.  The same group is found once per member;
    groups are therefore normalized (sorted filenames) and de-duplicated.

    Args:
        tree: BK-tree of ``(filename, hash)`` items, as built by
            ``construct_bktree``.
        max_distance: Largest distance, as measured by the tree's distance
            function, at which two images are still treated as duplicates.
            Defaults to 25, the threshold this script has always used.

    Returns:
        A list of groups, each a sorted list of filenames.  The groups
        themselves are sorted too, so the result (and the numbering of the
        preview files derived from it) is stable between runs.
    """
    cur = sqlite3_conn.cursor()
    cur.execute("SELECT filename, hash FROM hash")
    groups = set()
    for filename, hex_digest in cur:
        query = (filename, imagehash.hex_to_hash(hex_digest))
        # tree.find yields (distance, item) pairs; item[0] is the filename.
        matches = tree.find(query, max_distance)
        if len(matches) > 1:
            groups.add(tuple(sorted(item[0] for _, item in matches)))
    return [list(group) for group in sorted(groups)]
def generate_dup_previews(dups: list[list]) -> None:
    """Render one side-by-side preview PNG per duplicate group.

    For group number ``i`` the images are drawn next to each other, each
    titled with its base name, and saved as ``dup.<i>.png`` in
    ``PREVIEW_DIR``.  The directory is created when missing.  Empty groups
    are skipped because a figure with zero columns cannot be created.

    Args:
        dups: Groups of image file paths, as returned by ``find_dups``.
    """
    os.makedirs(PREVIEW_DIR, exist_ok=True)
    for i, dup in enumerate(dups):
        if not dup:
            continue
        titles = [os.path.basename(path) for path in dup]
        logging.info(f"processing dup #{i}: {dup}")
        # squeeze=False always returns a 2-D array of axes, so a group with
        # a single image does not yield a bare Axes without .flatten().
        fig, axes = plt.subplots(
            nrows=1,
            ncols=len(dup),
            figsize=(3 * len(dup), 3),
            squeeze=False,
        )
        try:
            for ax, path, title in zip(axes.flatten(), dup, titles):
                with Image.open(path) as img:
                    # Downscale in place so large originals stay cheap to draw.
                    img.thumbnail((1024, 1024))
                    ax.imshow(img)
                ax.set_title(title, fontsize=8)
                ax.axis("off")
            fig.tight_layout()
            fig.savefig(os.path.join(PREVIEW_DIR, f"dup.{i}.png"), dpi=300)
        finally:
            # Release the figure even if rendering failed, otherwise pyplot
            # keeps it alive for the rest of the run.
            plt.close(fig)
# Script pipeline, executed at module level in this order:
# 1. make sure the hash table exists,
# 2. hash all new images below IMG_DIR,
# 3. index every stored hash in a BK-tree,
# 4. query the tree for groups of near-identical images,
# 5. write a preview image for each group.
ensure_table()
cal_hash_recursively(IMG_DIR)
tree = construct_bktree()
dups = find_dups(tree)
generate_dup_previews(dups)
|