#!/usr/bin/env python
'''
Import photos from SD card into folder with todays date + nickname

Use: importphotos (--jpg|--raw|--both)
Add script to path

TODO:
8. Optionally allow specification of a backup location on another disk or NAS
   to ship a 3rd copy to
10. Every config option has an arg override
11. Optionally rename file if event name was passed in
-- STRETCH --
12. Make a graphical interface
'''
import os
import sys
import argparse
import shutil
import hashlib
from datetime import datetime
from pprint import pprint

from tqdm import tqdm
import yaml
import exifread
import ffmpeg

CONFIG_FILE = 'config.yaml'

# Registry of every discovered file, keyed by full source path.  Each value
# holds type, name, extension, capture date, source/destination folders and
# (after gen_hashes) per-copy md5 checksums.
files = {}

# Read configuration from file.
# NOTE(review): FullLoader is used on a local, trusted config file only;
# never point this at untrusted input (yaml.safe_load would be safer).
try:
    with open(CONFIG_FILE, 'r') as cf:
        config = yaml.load(cf, Loader=yaml.FullLoader)
except FileNotFoundError:
    print("Configuration file not found: ", CONFIG_FILE)
    print("Copy config.yaml.EXAMPLE to ", CONFIG_FILE, " and update accordingly.")
    # BUGFIX: the original fell through and crashed later with a NameError on
    # `config`; bail out explicitly instead.
    sys.exit(1)

parser = argparse.ArgumentParser()
parser.add_argument("-e", "--event", help="Event Name")
parser.add_argument("-s", "--source", help="Source Directory to search for files")
parser.add_argument("-d", "--destination", help="Destination Directory to put files")
parser.add_argument("-o", "--create-originals",
                    help="For images only, create an originals folder for safe keeping")
parser.add_argument("-b", "--backup-destination",
                    help="Create a backup of everything at the specified location")
parser.add_argument("-D", "--delete-source-files",
                    help="Delete files from SD after validating checksum of copied files")
parser.add_argument("-c", "--config",
                    help="Load the specified config file instead of the default " + CONFIG_FILE)
parser.add_argument("-g", "--generate-config",
                    help="Generate config file based on options passed from command arguments")
args = parser.parse_args()

# BUGFIX: `event` was previously bound only when -e was passed, which made
# process_file() raise a NameError otherwise.  argparse yields None when the
# flag is absent, so `if event:` works for both cases.
event = args.event
if args.source:
    config['folders']['source']['base'] = args.source
if args.destination:
    # BUGFIX: this previously assigned args.source to the destination base.
    config['folders']['destination']['base'] = args.destination
# The remaining overrides (--create-originals, --backup-destination,
# --delete-source-files, --config, --generate-config) are accepted but not
# implemented yet -- see the TODO list in the module docstring.


def dump_yaml(dictionary, file):
    """ dump a dictionary to a yaml file """
    with open(file, 'w') as f:
        yaml.dump(dictionary, f)


def md5_hash(file):
    """ calculates and returns md5 hash """
    # BUGFIX: use a context manager so the file handle is closed promptly
    # (the original leaked the handle from open(...).read()).
    with open(file, 'rb') as f:
        return hashlib.md5(f.read()).hexdigest()


def cmp_files(file_1, file_2):
    """ Use file hashes to compare files """
    return md5_hash(file_1) == md5_hash(file_2)


def get_capture_date(path, f_type):
    """ get capture date from meta

    Returns a (year, month, day) tuple of strings.  Images are read via
    EXIF; video/audio via ffprobe tags.  Images with no usable EXIF fall
    back to the sentinel date 1900-01-01; unknown types fall back to the
    filesystem ctime.
    """
    if f_type == 'image':
        with open(path, 'rb') as file:
            tags = exifread.process_file(file)
        if 'EXIF DateTimeOriginal' in tags:
            stamp = datetime.strptime(str(tags['EXIF DateTimeOriginal']),
                                      '%Y:%m:%d %H:%M:%S')
        elif 'Image DateTime' in tags:
            stamp = datetime.strptime(str(tags['Image DateTime']),
                                      '%Y:%m:%d %H:%M:%S')
        else:
            stamp = datetime.strptime('1900:01:01 00:00:00', '%Y:%m:%d %H:%M:%S')
    elif f_type == 'video':
        stamp = datetime.strptime(ffmpeg.probe(path)['format']['tags']['creation_time'],
                                  '%Y-%m-%dT%H:%M:%S.%f%z')
    elif f_type == 'audio':
        stamp = datetime.strptime(ffmpeg.probe(path)['format']['tags']['date'],
                                  '%Y-%m-%d')
    else:
        stamp = datetime.fromtimestamp(os.path.getctime(path))
    return stamp.strftime("%Y"), stamp.strftime("%m"), stamp.strftime("%d")


def path_exists(path):
    """ Does the path exist """
    return os.path.exists(path)


def is_dir(path):
    """ determine if the argument passed is a directory """
    if path_exists(path) is True:
        return os.path.isdir(path)
    # Path does not exist, so it cannot be a directory.
    return False


def path_access_read(path):
    """ make sure we can read from the path """
    val = os.access(path, os.R_OK)
    if val is False:
        print(f'Can not read from {path}')
    return val


def path_access_write(path):
    """ make sure we can write to the path """
    val = os.access(path, os.W_OK)
    if val is False:
        print(f'Can not write to {path}')
    return val


def create_folder(file):
    """ Function to create folder """
    if path_exists(file) is False:
        os.makedirs(file)
    elif is_dir(file) is False:
        # TODO: a non-directory already exists at this path -- this is a
        # collision and should abort instead of silently continuing.
        pass


def copy_from_source(source_path, dest_path, file_name):
    """ Copy file from source to destination

    If a different file with the same name already exists at the
    destination, the existing file is renamed with its md5 appended before
    the new copy is made.  If an identical file already exists, nothing is
    copied.
    """
    target = os.path.join(dest_path, file_name)
    if path_exists(target) is True:
        if cmp_files(os.path.join(source_path, file_name), target) is False:
            print(f'Found duplicate for {source_path}, renaming destination with md5 appended.')
            base, extension = os.path.splitext(file_name)
            md5 = md5_hash(target)
            os.rename(target, os.path.join(dest_path, base + '_' + md5 + extension))
        else:
            # Identical file already present -- skip the copy.
            return
    create_folder(dest_path)
    shutil.copy(os.path.join(source_path, file_name), dest_path)


def process_file(path, f_type, f_name, ext):
    """ gather information about one file and add it to the global dict

    Destination layout:
        <base>/<YYYY>/<YYYY-MM>/<YYYY-MM-DD>[-<event>]/<TYPE>[/JPG|/RAW]
    """
    i = os.path.join(path, f_name)
    files[i] = {'folders': {}, 'date': {}}
    files[i]['folders']['source_path'] = path
    files[i]['type'] = f_type
    files[i]['name'] = f_name
    files[i]['extension'] = ext
    files[i]['date']['capture_date'] = get_capture_date(i, f_type)
    year, month, day = files[i]['date']['capture_date']
    files[i]['date']['y'] = year
    files[i]['date']['m'] = month
    files[i]['date']['d'] = day

    destination = (config['folders']['destination']['base'] +
                   '/' + year +
                   '/' + year + '-' + month +
                   '/' + year + '-' + month + '-' + day)
    if event:
        destination = destination + '-' + event
    # Insert the key now so 'destination' keeps its position before
    # 'destination_original' -- gen_hashes()/validate_checksums() iterate the
    # folders dict in insertion order.
    files[i]['folders']['destination'] = destination

    if f_type == 'image':
        destination = destination + '/PHOTO'
        if ext in ('jpg', 'jpeg'):
            if config['store_originals'] is True:
                files[i]['folders']['destination_original'] = destination + '/ORIGINALS/JPG'
            destination = destination + '/JPG'
        else:
            if config['store_originals'] is True:
                files[i]['folders']['destination_original'] = destination + '/ORIGINALS/RAW'
            destination = destination + '/RAW'
    elif f_type == 'video':
        destination = destination + '/VIDEO'
    elif f_type == 'audio':
        destination = destination + '/AUDIO'
    else:
        print('WARN: ', f_type,
              ' is not a known type and you never should have landed here.')
    files[i]['folders']['destination'] = destination


def find_files(directory):
    """ find files to build a dictionary out of """
    for folder, subfolders, filename in os.walk(directory):
        for f_type in config['file_types']:
            for ext in config['file_types'][f_type]:
                for file in tqdm(filename, desc='Finding ' + ext + ' Files', ncols=100):
                    if file.lower().endswith(ext):
                        # NOTE(review): debug print inside a tqdm loop will
                        # interleave with the progress bar.
                        print(file)
                        process_file(folder, f_type, file, ext)


def validate_config_dir_access():
    """ Validate we can op in the defined directories

    Returns True only when the destination is writable, the source is
    readable and (when store_backup is enabled) the backup location is
    writable.  The path_access_* helpers print the failure reason.
    """
    if path_access_write(config['folders']['destination']['base']) is False:
        return False
    if path_access_read(config['folders']['source']['base']) is False:
        return False
    if config['store_backup'] is True:
        if path_access_write(config['folders']['backup']) is False:
            return False
    return True


def copy_files():
    """ Copy Files. """
    for file in tqdm(files, desc="Copying Files:", ncols=100):
        create_folder(files[file]['folders']['destination'])
        copy_from_source(files[file]['folders']['source_path'],
                         files[file]['folders']['destination'],
                         files[file]['name'])
        # Images optionally get a second, untouched copy under ORIGINALS.
        if config['store_originals'] is True:
            if files[file]['type'] == 'image':
                create_folder(files[file]['folders']['destination_original'])
                copy_from_source(files[file]['folders']['destination'],
                                 files[file]['folders']['destination_original'],
                                 files[file]['name'])


def gen_hashes():
    """ Generate Hashes

    Hashes every copy of every file (source, destination and, for images
    with store_originals, the originals copy).
    """
    for file in tqdm(files, desc="Generating MD5 Hashes:", ncols=100):
        files[file]['md5_checksums'] = {}
        for folder in files[file]['folders']:
            k = os.path.join(files[file]['folders'][folder], files[file]['name'])
            files[file]['md5_checksums'][k] = md5_hash(k)


def validate_checksums():
    """ Validate Checksums

    Compares each consecutive pair of checksums.  'source_cleanable'
    reflects the last comparison; a mismatch marks the file unsafe to
    delete from the source and dumps its metadata for inspection.
    """
    for file in tqdm(files, desc="Verifying Checksums:", ncols=100):
        sums = list(files[file]['md5_checksums'].values())
        for idx in range(1, len(sums)):
            if sums[idx] == sums[idx - 1]:
                files[file]['source_cleanable'] = True
            else:
                files[file]['source_cleanable'] = False
                print(f'FATAL: Checksum validation failed for: '
                      f'{files[file]["name"]} \n{sums[idx]}\n '
                      f'is not equal to \n{sums[idx - 1]}\n')
                print('\n File Meta:\n')
                pprint(files[file])


def cleanup_sd():
    """ If we should cleanup the SD, nuke the copied files. """
    if config['cleanup_sd'] is True:
        for file in tqdm(files, desc="Cleaning Up SD:", ncols=100):
            if files[file]['source_cleanable'] is True:
                os.remove(os.path.join(files[file]['folders']['source_path'],
                                       files[file]['name']))


GO = validate_config_dir_access()
if GO is True:
    find_files(config['folders']['source']['base'])
    copy_files()
    gen_hashes()
    validate_checksums()
    cleanup_sd()
else:
    print("There was a problem accessing one or more directories defined in the configuration.")
# Persist the full file dictionary (paths, dates, checksums) for auditing.
dump_yaml(files, 'files_dict.yaml')
print('done.')