#!/usr/bin/env python
"""
Import photos from an SD card into folders named for each file's capture
date, optionally suffixed with an event nickname.

Use: import_media.py [-e EVENT] [-s SOURCE] [-d DESTINATION] [-o] [-b BACKUP]
                     [-D] [-v VERIFY] [-c CONFIG] [-g]

Add this script to your PATH.

TODO:
8.  Optionally allow specification of a backup location on another disk or
    NAS to ship a third copy to
10. Every config option has an arg override
11. Optionally rename the file if an EVENT name was passed in

-- STRETCH --
12. Make a graphical interface
"""

import os
import sys
from pprint import pprint
import argparse
import shutil    # retained for the disabled shutil.copy() path below
import hashlib   # retained for the disabled md5 code paths below
import xxhash
from datetime import datetime
from tqdm import tqdm
import yaml
import exifread
import ffmpeg

CONFIG_FILE = 'config.yaml'
files = {}

# Read configuration from file
try:
    with open(CONFIG_FILE, 'r') as cf:
        config = yaml.load(cf, Loader=yaml.FullLoader)
except FileNotFoundError:
    print("Configuration file not found:", CONFIG_FILE)
    print("Copy config.yaml.EXAMPLE to", CONFIG_FILE, "and update accordingly.")
    sys.exit(1)

parser = argparse.ArgumentParser()
parser.add_argument("-e", "--event",
                    help="Event name")
parser.add_argument("-s", "--source",
                    help="Source directory to search for files")
parser.add_argument("-d", "--destination",
                    help="Destination directory to put files in")
parser.add_argument("-o", "--create-originals",
                    help="For images only, create an ORIGINALS folder "
                         "for safekeeping")
parser.add_argument("-b", "--backup-destination",
                    help="Create a backup of everything at the specified "
                         "location")
parser.add_argument("-D", "--delete-source-files",
                    help="Delete files from the SD card after validating the "
                         "checksum of the copied files")
parser.add_argument("-v", "--verify",
                    help="[True|False] Verify the checksum of the copied file")
parser.add_argument("-c", "--config",
                    help="Load the specified config file instead of the "
                         "default " + CONFIG_FILE)
parser.add_argument("-g", "--generate-config",
                    help="Generate a config file based on the options passed "
                         "as command arguments")
args = parser.parse_args()

if args.event:
    EVENT = args.event
else:
    EVENT = False

if args.source:
    config['folders']['source']['base'] = args.source
if args.destination:
    config['folders']['destination']['base'] = args.destination

# Remaining overrides are not implemented yet. Note argparse exposes these
# flags with underscores: args.create_originals, args.backup_destination, etc.
#if args.create_originals:
#    pass
#if args.backup_destination:
#    pass
#if args.delete_source_files:
#    pass
#if args.config:
#    pass
#if args.generate_config:
#    pass
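
# Expected shape of config.yaml, inferred from the keys this script reads.
# The paths and the exact extension lists below are illustrative only;
# jpg/jpeg are the only extensions the script itself treats specially.
#
#   folders:
#     source:
#       base: /media/sdcard
#     destination:
#       base: /home/user/media
#     backup: /mnt/nas/media
#   verify_checksum: true
#   store_originals: true
#   store_backup: false
#   cleanup_sd: false
#   file_types:
#     image: [jpg, jpeg, dng]
#     video: [mp4, mov]
#     audio: [wav, mp3]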
""" return bool(os.path.isfile(file)) ''' def md5_hash(file): """ calculates and returns md5 hash """ if config['verify_checksum']: #print("calculating md5 for ", f) md5 = hashlib.md5(open(file, 'rb').read()).hexdigest() #with open(file, 'r') as f: # md5 = hashlib.md5(f).hexdigest() else: md5 = 'no_verify' return md5 ''' def xx_hash(file): """ calculates and returns file hash based on xxHash """ if config['verify_checksum']: size = os.path.getsize(file) hasher = xxhash.xxh64() with open(file, 'rb') as f: with tqdm(total=size, unit='B', unit_scale=True, desc=f'Getting hash for {os.path.basename(file)}') as pbar: for chunk in iter(lambda: f.read(4096), b""): hasher.update(chunk) pbar.update(len(chunk)) file_hash = hasher.hexdigest() else: file_hash = 'no_verify' return file_hash def cmp_files(file_1,file_2): """ Use file hashes to compare files """ hash1 = xx_hash(file_1) hash2 = xx_hash(file_2) print(f'\n{hash1}') print(f'\n{hash2}') return hash1 == hash2 def get_capture_date(path, f_type): """ get capture date from meta """ if f_type == 'image': with open(path, "rb") as file: tags = exifread.process_file(file) if 'EXIF DateTimeOriginal' in tags: try: stamp = datetime.strptime( str(tags['EXIF DateTimeOriginal']), '%Y:%m:%d %H:%M:%S') except ValueError as ve_dte: print(f"\nError: {ve_dte}") print("\nTrying digitized") try: stamp = datetime.strptime( str(tags['EXIF DateTimeDigitized']), '%Y:%m:%d %H:%M:%S') except ValueError as ve_dtd: print(f"\nError: {ve_dtd}") print("\nTrying Image DateTime") try: stamp = datetime.strptime( str(tags['Image DateTime']), '%Y:%m:%d %H:%M:%S') except ValueError as ve_idt: print(f"\nError: {ve_idt}") print(f"\nGiving up... Please inspect {path} and try again\n") sys.exit() elif 'Image DateTime' in tags: stamp = datetime.strptime( str(tags['Image DateTime']), '%Y:%m:%d %H:%M:%S') else: stamp = datetime.strptime( str('1900:01:01 00:00:00'), '%Y:%m:%d %H:%M:%S') elif f_type == 'video': try: stamp = datetime.strptime( ffmpeg.probe(path)['format']['tags']['creation_time'], '%Y-%m-%dT%H:%M:%S.%f%z') except: print(f"\n{path} had an error. Please inspect the file and try again.") sys.exit() elif f_type == 'audio': try: stamp = datetime.strptime(ffmpeg.probe( path)['format']['tags']['date'], '%Y-%m-%d') except KeyError as ke: print(f'\nError: {ke} for {path}. Trying getctime...') try: stamp = datetime.fromtimestamp(os.path.getctime(path)) except: print(f'\nCould not get timestamp for {path}. Giving up.') sys.exit() else: try: stamp = datetime.fromtimestamp(os.path.getctime(path)) except: print(f'\nCould not get timestamp for {path}. Giving up.') sys.exit() year = stamp.strftime("%Y") month = stamp.strftime("%m") day = stamp.strftime("%d") return year, month, day def path_exists(path): """ Does the path exist """ return os.path.exists(path) def is_dir(path): """ determine if the argument passed is a directory """ p_exists = path_exists(path) if p_exists is True: it_is_dir = os.path.isdir(path) else: it_is_dir = p_exists return it_is_dir def path_access_read(path): """ make sure we can read from the path """ val = os.access(path, os.R_OK) if val is False: print(f'Can not read from {path}') return val def path_access_write(path): """ make sure we can write to the path """ val = os.access(path, os.W_OK) if val is False: print(f'Can not write to {path}') return val def create_folder(file): """ Function to create folder """ if path_exists(file) is False: os.makedirs(file) elif is_dir(file) is False: pass # this needs to turn into bailing out as there is a collision. 

def copy_with_progress(s, d, f):
    """ Copy a file while displaying a progress bar. """
    size = os.path.getsize(s)
    with open(s, 'rb') as fs:
        with open(d, 'wb') as fd:
            with tqdm(total=size, unit='B', unit_scale=True,
                      desc=f'Copying {f}') as pbar:
                while True:
                    chunk = fs.read(4096)
                    if not chunk:
                        break
                    fd.write(chunk)
                    pbar.update(len(chunk))


def copy_from_source(source_path, dest_path, file_name):
    """ Copy a file from source to destination. """
    file_exists = path_exists(os.path.join(dest_path, file_name))
    if file_exists is True:
        print(f'\nFound {file_name} at destination, checking if they match.')
        check_match = cmp_files(os.path.join(source_path, file_name),
                                os.path.join(dest_path, file_name))
        if check_match is False:
            # Same name, different content: keep the existing destination
            # file, but rename it with its hash appended before copying.
            print(f'\nFound a name collision for {source_path}/{file_name}, '
                  f'renaming the destination file with its hash appended.')
            base, extension = os.path.splitext(file_name)
            #md5 = md5_hash(os.path.join(dest_path, file_name))
            f_xxhash = xx_hash(os.path.join(dest_path, file_name))
            #file_name_hash = base + '_' + md5 + extension
            file_name_hash = base + '_' + f_xxhash + extension
            os.rename(os.path.join(dest_path, file_name),
                      os.path.join(dest_path, file_name_hash))
        else:
            # Identical file already present; nothing to copy.
            print(f'\n{file_name} hashes match')
            return
    create_folder(dest_path)
    #shutil.copy(os.path.join(source_path, file_name), dest_path)
    copy_with_progress(os.path.join(source_path, file_name),
                       os.path.join(dest_path, file_name),
                       file_name)
    os.system('clear')


def process_file(path, f_type, f_name, ext):
    """ Gather information about a file and add it to the files dictionary. """
    i = os.path.join(path, f_name)
    files[i] = {
        'folders': {},
        'date': {}
    }
    files[i]['folders']['source_path'] = path
    files[i]['type'] = f_type
    files[i]['name'] = f_name
    files[i]['extension'] = ext
    files[i]['date']['capture_date'] = get_capture_date(
        os.path.join(files[i]['folders']['source_path'], files[i]['name']),
        files[i]['type'])
    files[i]['date']['y'] = files[i]['date']['capture_date'][0]
    files[i]['date']['m'] = files[i]['date']['capture_date'][1]
    files[i]['date']['d'] = files[i]['date']['capture_date'][2]

    # Build <base>/<YYYY>/<YYYY-MM>/<YYYY-MM-DD>, appending the event name
    # to the day folder when one was given.
    y = files[i]['date']['y']
    m = files[i]['date']['m']
    d = files[i]['date']['d']
    destination = (config['folders']['destination']['base'] +
                   '/' + y +
                   '/' + y + '-' + m +
                   '/' + y + '-' + m + '-' + d)
    if EVENT is not False:
        destination = destination + '-' + EVENT
    files[i]['folders']['destination'] = destination

    if files[i]['type'] == 'image':
        files[i]['folders']['destination'] += '/PHOTO'
        if files[i]['extension'] in ('jpg', 'jpeg'):
            if config['store_originals'] is True:
                files[i]['folders']['destination_original'] = \
                    files[i]['folders']['destination'] + '/ORIGINALS/JPG'
            files[i]['folders']['destination'] += '/JPG'
        else:
            if config['store_originals'] is True:
                files[i]['folders']['destination_original'] = \
                    files[i]['folders']['destination'] + '/ORIGINALS/RAW'
            files[i]['folders']['destination'] += '/RAW'
    elif files[i]['type'] == 'video':
        files[i]['folders']['destination'] += '/VIDEO'
    elif files[i]['type'] == 'audio':
        files[i]['folders']['destination'] += '/AUDIO'
    else:
        print('WARN:', files[i]['type'],
              'is not a known type and you never should have landed here.')
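
# Illustrative layout produced by process_file (base path and event name
# are hypothetical):
#
#   /home/user/media/2024/2024-06/2024-06-01-Wedding/PHOTO/JPG/IMG_0001.JPG
#   /home/user/media/2024/2024-06/2024-06-01-Wedding/PHOTO/ORIGINALS/JPG/IMG_0001.JPG
#   /home/user/media/2024/2024-06/2024-06-01-Wedding/VIDEO/CLIP0001.MP4
#
# Without an event name, the day folder is just .../2024-06-01.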

def find_files(directory):
    """ Find files to build a dictionary out of. """
    os.system('clear')
    for folder, subfolders, filenames in os.walk(directory):
        for f_type in config['file_types']:
            for ext in config['file_types'][f_type]:
                for file in tqdm(filenames,
                                 desc='Finding ' + ext + ' Files in ' + folder):
                    if file.lower().endswith(ext):
                        current_file = os.path.join(folder, file)
                        if is_file(current_file):
                            process_file(folder, f_type, file, ext)
                        else:
                            print(f"Skipping {current_file} as it does not "
                                  f"look like a real file.")


def validate_config_dir_access():
    """ Validate that we can operate in the configured directories. """
    if path_access_write(config['folders']['destination']['base']) is False:
        return False
    if path_access_read(config['folders']['source']['base']) is False:
        return False
    if config['store_backup'] is True:
        if path_access_write(config['folders']['backup']) is False:
            return False
    return True


def copy_files():
    """ Copy files. """
    os.system('clear')
    for file in tqdm(files, desc="Copying Files:"):
        create_folder(files[file]['folders']['destination'])
        copy_from_source(files[file]['folders']['source_path'],
                         files[file]['folders']['destination'],
                         files[file]['name'])
        if config['store_originals'] is True:
            if files[file]['type'] == 'image':
                create_folder(files[file]['folders']['destination_original'])
                copy_from_source(files[file]['folders']['destination'],
                                 files[file]['folders']['destination_original'],
                                 files[file]['name'])


'''
def gen_hashes():
    """ Generate md5 hashes. """
    for file in tqdm(files, desc="Generating MD5 Hashes:", ncols=100):
        files[file]['md5_checksums'] = {}
        for folder in files[file]['folders']:
            k = os.path.join(files[file]['folders'][folder], files[file]['name'])
            files[file]['md5_checksums'][k] = md5_hash(k)
'''


def gen_xxhashes():
    """ Generate xxHashes for every copy of every file. """
    os.system('clear')
    for file in tqdm(files, desc="Generating xx Hashes:"):
        files[file]['xx_checksums'] = {}
        for folder in files[file]['folders']:
            k = os.path.join(files[file]['folders'][folder], files[file]['name'])
            files[file]['xx_checksums'][k] = xx_hash(k)
            print(f"{k}: {files[file]['xx_checksums'][k]}")


'''
def validate_checksums():
    """ Validate md5 checksums. """
    for file in tqdm(files, desc="Verifying Checksums:", ncols=100):
        i = 0
        c = {}
        for checksum in files[file]['md5_checksums']:
            c[i] = files[file]['md5_checksums'][checksum]
            if i > 0:
                p = i - 1
                if c[i] == c[p]:
                    files[file]['source_cleanable'] = True
                else:
                    files[file]['source_cleanable'] = False
                    print(f'FATAL: Checksum validation failed for: '
                          f'{files[file]["name"]} \n{c[i]}\n is not equal to \n{c[p]}\n')
                    print('\n File Meta:\n')
                    pprint(files[file])
            i = i + 1
'''


def validate_xx_checksums():
    """ Validate checksums across all copies of each file. """
    os.system('clear')
    for file in tqdm(files, desc="Verifying Checksums:"):
        # Assume the source is cleanable until any pair of hashes disagrees;
        # a single mismatch must not be overwritten by a later match.
        files[file]['source_cleanable'] = True
        i = 0
        c = {}
        for checksum in files[file]['xx_checksums']:
            c[i] = files[file]['xx_checksums'][checksum]
            if i > 0:
                p = i - 1
                if c[i] != c[p]:
                    files[file]['source_cleanable'] = False
                    print(f'FATAL: Checksum validation failed for: '
                          f'{files[file]["name"]} \n{c[i]}\n is not equal to \n{c[p]}\n')
                    print('\n File Meta:\n')
                    pprint(files[file])
            i = i + 1
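
# Illustrative shape of the per-file checksum bookkeeping built above
# (paths and hash values are hypothetical):
#
#   files['/media/sdcard/DCIM/IMG_0001.JPG']['xx_checksums'] = {
#       '/media/sdcard/DCIM/IMG_0001.JPG': 'b542f29e61e2458a',
#       '/home/user/media/2024/2024-06/2024-06-01/PHOTO/JPG/IMG_0001.JPG':
#           'b542f29e61e2458a',
#   }
#
# validate_xx_checksums() marks a file 'source_cleanable' only when every
# hash in this dictionary agrees.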
""" if config['cleanup_sd'] is True: os.system('clear') for file in tqdm(files, desc = "Cleaning Up SD:"): if files[file]['source_cleanable'] is True: os.remove(os.path.join(files[file]['folders']['source_path'],files[file]['name'])) GO = validate_config_dir_access() if GO is True: find_files(config['folders']['source']['base']) copy_files() gen_xxhashes() validate_xx_checksums() cleanup_sd() else: print("There was a problem accessing one or more directories defined in the configuration.") dump_yaml(files, 'files_dict.yaml') print('done.')