Source code for src.models.distance_tuning

import os
import re
import logging
import pandas as pd
from src.utils.utils import check_path_exists
from sklearn.tree import DecisionTreeClassifier
from sklearn.tree import export_text
from src.models.face_recognition import FaceRecognition
from src.data.knowledge_graphs import download_images

LOGGER = logging.getLogger('distance-tuning')


[docs]def tune_distance_threshold(video_path='data/datasets/youtube_faces', sample_per_person=5, model='Dlib') -> float: """ Finds the optimal distance threshold Args: video_path (str): Path to the dataset on which the threshold should be investigated sample_per_person (int): Number of frames to compare in the data set model (str): The face recognition model to tune Returns: distance_threshold (float): The optimal threshold """ csv_path = os.path.join(video_path, 'information.csv') # { file, entities } # 1 clean, match and download thumbnails (Match only names that do not have duplicate names ) thumbnail_path = 'data/thumbnails/tuning/wikidata_Thumbnails_links.csv' thumbnail_dir = 'data/thumbnails/tuning/' if not os.path.exists(thumbnail_path): LOGGER.info('1/7 cleaning and matching thumbnails...') video_df = pd.read_csv(csv_path) video_name_df = pd.DataFrame(video_df.entities.unique(), columns=['entities']) thumbnail_df = pd.concat([pd.read_csv('data/thumbnails/actor_wikidata_Thumbnails_links.csv'), pd.read_csv('data/thumbnails/except_actor_wikidata_Thumbnails_links.csv')]) # clean thumbnails 1 : remove people with the same name check_duplicate = thumbnail_df[['norm_name', 'folder_name']].copy().drop_duplicates() check_duplicate = check_duplicate.groupby(['norm_name']).size() check_duplicate = pd.DataFrame(check_duplicate[check_duplicate == 1].index, columns=['norm_name']) video_name_df = video_name_df.merge(check_duplicate, how='inner', left_on='entities', right_on='norm_name').drop(['norm_name'], axis=1) tmp = video_name_df.merge(thumbnail_df, how='inner', left_on='entities', right_on='norm_name') # clean thumbnails 2 : Remove the norm_name of multiple people who are regularized into one person check_error = tmp[['norm_name', 'name']].copy().drop_duplicates() check_error = check_error.groupby(['norm_name']).size() check_error = pd.DataFrame(check_error[check_error == 1].index, columns=['norm_name']) tmp = tmp.merge(check_error, how='inner') check_path_exists(thumbnail_dir) tmp.to_csv(thumbnail_path) LOGGER.info(f'the distance threshold tuning is running on {len(tmp.name.unique())} celebrities') # TODO : test LOGGER.info('2/7 downloading thumbnails...') download_images(path=thumbnail_dir) # 2 create thumbnails embeddings thumbnails_path = 'data/thumbnails/tuning/thumbnails' fr_dir = 'data/embeddings/tuning' check_path_exists(fr_dir) embeddings_path = os.path.join(fr_dir, 'embeddings.pickle') labels_path = os.path.join(fr_dir, 'labels.pickle') if os.path.exists(embeddings_path): os.remove(embeddings_path) if os.path.exists(labels_path): os.remove(labels_path) LOGGER.info('3/7 creating thumbnails embeddings...') fr = FaceRecognition(thumbnails_path=thumbnails_path, encoder_name=model, labels_path=labels_path, embeddings_path=embeddings_path) # 3 create train data set LOGGER.info('4/7 creating train data set...') dataset = pd.DataFrame(fr.labels, columns=['labels']) # TODO(honglin): only keep ix later dataset['ix'] = dataset.index # train data set: matching part matches = pd.DataFrame(pd.read_csv(thumbnail_path).norm_name.unique(), columns=['entities']) video_df = pd.read_csv(csv_path).merge(matches, how='inner') sample_frames = video_df.groupby(['entities']).sample(n=sample_per_person, random_state=42) # { file, entities } match_dataset = dataset.merge(sample_frames, how='inner', left_on='labels', right_on='entities') # { ix, labels, file, entities } match_dataset['identical'] = 1 # { ix, labels, file, entities, identical } match_dataset = match_dataset[['ix', 'labels', 'file', 'entities', 'identical']] # train data set: unmatched part unmatch_data = [] for i, row in dataset.iterrows(): # for every thumbnail ix = row['ix'] label = row['labels'] sample_per_thumbnail = match_dataset[match_dataset['labels'] != label].sample(n=sample_per_person, random_state=42) for j, r in sample_per_thumbnail.iterrows(): unmatch_data.append([ix, label, r['file'], r['entities'], 0]) unmatch_dataset = pd.DataFrame(unmatch_data, columns=['ix', 'labels', 'file', 'entities', 'identical']) dataset = pd.concat([match_dataset, unmatch_dataset]).reset_index(drop=True) # 4 create embeddings of frames LOGGER.info('5/7 creating embeddings of frames...') file_l = [] frame_embedding_l = [] remove_number = 0 for i, row in dataset[dataset['identical'] == 1].iterrows(): path = row['file'] if not os.path.exists(path): LOGGER.info('{path} does not exist.') continue if path not in file_l: embeddings = fr.represent(path, return_face_number=True) if isinstance(embeddings, int): # the frames that no faces or more than 1 face in remove_number += 1 else: file_l.append(path) frame_embedding_l.append(embeddings[0]) LOGGER.info(f'{remove_number} frames contains more than 1 face') frame_embeddings = pd.DataFrame({'file': file_l, 'embeddings': frame_embedding_l}).set_index('file') # remove those frames from train data set dataset = dataset.merge(pd.DataFrame(frame_embeddings.index, columns=['file']), how='inner') # balance train dataset dataset_match = dataset[dataset['identical'] == 1] dataset_unmatch = dataset[dataset['identical'] == 0] dataset_match_count = dataset_match.shape[0] dataset_unmatch_count = dataset_unmatch.shape[0] if dataset_match_count > dataset_unmatch_count: dataset_match_downsampled = dataset_match.sample(n=dataset_unmatch_count, random_state=42) dataset = pd.concat([dataset_match_downsampled, dataset_unmatch]) else: dataset_unmatch_downsampled = dataset_unmatch.sample(n=dataset_match_count, random_state=42) dataset = pd.concat([dataset_match, dataset_unmatch_downsampled]) # 5s update dataset with distance LOGGER.info('6/7 calculating distances...') cosine_distances = [] euclidean_distances = [] euclidean_l2_distances = [] for i, row in dataset.iterrows(): embedding_thumbnail = fr.embeddings[row['ix']] embedding_frame = frame_embeddings.loc[row['file'], 'embeddings'] cosine_distance = 1 - np.matmul(embedding_thumbnail, embedding_frame) / ( np.linalg.norm(embedding_thumbnail) * np.linalg.norm(embedding_frame)) cosine_distances.append(cosine_distance) euclidean_distance = np.linalg.norm(embedding_thumbnail - embedding_frame) euclidean_distances.append(euclidean_distance) euclidean_l2_distance = np.linalg.norm( embedding_thumbnail / np.linalg.norm(embedding_thumbnail) - embedding_frame / np.linalg.norm( embedding_frame)) euclidean_l2_distances.append(euclidean_l2_distance) dataset[ 'cosine_distances'] = cosine_distances # { ix, labels, file, entities, identical, cosine_distances, euclidean_distances, euclidean_l2_distances } dataset['euclidean_distances'] = euclidean_distances dataset['euclidean_l2_distances'] = euclidean_l2_distances dataset.to_csv('data/datasets/youtube_faces/dataset.csv') # TODO # 6 run decision tree on dataset to get tuned distance threshold, distance as train data, identical as label LOGGER.info('7/7 running decision tree and evaluating...') dataset = pd.read_csv('data/datasets/youtube_faces/dataset.csv') # TODO for distance in ['cosine_distances', 'euclidean_distances', 'euclidean_l2_distances']: decision_tree = DecisionTreeClassifier(max_depth=1) decision_tree.fit(dataset[[distance]], dataset[['identical']]) tree = export_text(decision_tree, feature_names=['distance']) LOGGER.info(tree) number_pattern = r'\d+.?\d*' distance_threshold = float(re.search(number_pattern, tree).group()) # 7 evaluation dataset['prediction'] = 0 idx = dataset[dataset[distance] <= distance_threshold].index dataset.loc[idx, 'prediction'] = 1 tp = (dataset.prediction & dataset.identical).sum() precision = tp / dataset.prediction.sum() recall = tp / dataset.identical.sum() LOGGER.info(f'{distance} threshold: {distance_threshold}. precision: {precision}. recall: {recall}') return distance_threshold