# BitMover/import_media.py
#!/usr/bin/env python
'''
Import photos from SD card into a folder named with today's date + nickname
Use: importphotos (--jpg|--raw|--both) <nickname of folder (optional)>
Add script to path
TODO:
8. Optionally allow specification of a backup location on another disk
or NAS to ship a 3rd copy to
10. Every config option has an arg override
11. Optionally rename file if EVENT name was passed in
-- STRETCH --
12. Make a graphical interface
'''
import os
from pprint import pprint
import argparse
import shutil
import hashlib
from datetime import datetime
from tqdm import tqdm
import yaml
import exifread
import ffmpeg
CONFIG_FILE = 'config.yaml'
# Global registry of discovered media files, keyed by source path.
files = {}

# Read configuration from file.  Everything downstream dereferences
# `config`, so a missing config file must stop the run here -- the
# original printed the hint and then fell through to a NameError.
try:
    with open(CONFIG_FILE, 'r') as cf:
        # safe_load: config data never needs arbitrary-object YAML tags.
        config = yaml.safe_load(cf)
except FileNotFoundError:
    print("Configuration file not found: ", CONFIG_FILE)
    print("Copy config.yaml.EXAMPLE to ", CONFIG_FILE, " and update accordingly.")
    raise SystemExit(1)
# Command-line interface.  Options mirror the config file; several are
# accepted but not yet wired up (see the module TODO list).
parser = argparse.ArgumentParser()
parser.add_argument("-e", "--event", help="Event Name")
parser.add_argument("-s", "--source", help="Source Directory to search for files")
parser.add_argument("-d", "--destination", help="Destination Directory to put files")
# -o and -D are on/off switches per their help text; the original made
# them value-taking options (a bare `-o` would have consumed the next
# argument), so they now use store_true.
parser.add_argument("-o", "--create-originals", action="store_true",
                    help="For images only, create an originals folder for safe keeping")
parser.add_argument("-b", "--backup-destination",
                    help="Create a backup of everything at the specified location")
parser.add_argument("-D", "--delete-source-files", action="store_true",
                    help="Delete files from SD after validating checksum of copied files")
parser.add_argument("-c", "--config",
                    help="Load the specified config file instead of the default " + CONFIG_FILE)
parser.add_argument("-g", "--generate-config",
                    help="Generate config file based on options passed from command arguments")
args = parser.parse_args()
# Apply CLI overrides on top of the loaded configuration.
# EVENT is the optional folder-name suffix; False means "no event given".
EVENT = args.event if args.event else False
if args.source:
    config['folders']['source']['base'] = args.source
if args.destination:
    # BUG FIX: the original assigned args.source here, so -d/--destination
    # was silently ignored (and -d without -s set the base to None).
    config['folders']['destination']['base'] = args.destination
# Not yet wired up (see module TODO): --create-originals,
# --backup-destination, --delete-source-files, --config, --generate-config.
def dump_yaml(dictionary, file):
    """ Serialize *dictionary* to *file* as YAML (overwrites the file). """
    with open(file, 'w') as out:
        yaml.dump(dictionary, out)
def md5_hash(file):
    """ Return the hex MD5 digest of *file*'s contents.

    Reads in 1 MiB chunks under a context manager: the original opened
    the file without ever closing it (leaked handle) and slurped the
    whole file into memory, which is painful for large video files.
    """
    digest = hashlib.md5()
    with open(file, 'rb') as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b''):
            digest.update(chunk)
    return digest.hexdigest()
def cmp_files(file_1, file_2):
    """ Return True when the two files have identical MD5 digests. """
    first = md5_hash(file_1)
    second = md5_hash(file_2)
    return first == second
def get_capture_date(path, f_type):
    """ Return (year, month, day) strings for when *path* was captured.

    Metadata source depends on *f_type*:
      'image' -> EXIF DateTimeOriginal, falling back to Image DateTime,
                 then a 1900-01-01 sentinel when neither tag exists
      'video' -> ffprobe container tag 'creation_time'
      'audio' -> ffprobe container tag 'date'
      other   -> filesystem ctime
    Video/audio containers that lack the expected tag now fall back to
    the filesystem ctime instead of raising KeyError (robustness fix).
    """
    stamp = None
    if f_type == 'image':
        with open(path, 'rb') as file:
            tags = exifread.process_file(file)
        if 'EXIF DateTimeOriginal' in tags:
            stamp = datetime.strptime(str(tags['EXIF DateTimeOriginal']), '%Y:%m:%d %H:%M:%S')
        elif 'Image DateTime' in tags:
            stamp = datetime.strptime(str(tags['Image DateTime']), '%Y:%m:%d %H:%M:%S')
        else:
            # Sentinel date signals "no EXIF timestamp available".
            stamp = datetime(1900, 1, 1)
    elif f_type == 'video':
        try:
            stamp = datetime.strptime(ffmpeg.probe(path)['format']['tags']['creation_time'],
                                      '%Y-%m-%dT%H:%M:%S.%f%z')
        except KeyError:
            stamp = None  # no creation_time tag -> ctime fallback below
    elif f_type == 'audio':
        try:
            stamp = datetime.strptime(ffmpeg.probe(path)['format']['tags']['date'], '%Y-%m-%d')
        except KeyError:
            stamp = None  # no date tag -> ctime fallback below
    if stamp is None:
        stamp = datetime.fromtimestamp(os.path.getctime(path))
    return stamp.strftime("%Y"), stamp.strftime("%m"), stamp.strftime("%d")
def path_exists(path):
    """ Report whether *path* exists on the filesystem. """
    result = os.path.exists(path)
    return result
def is_dir(path):
    """ Return True when *path* exists and is a directory.

    os.path.isdir() already returns False for non-existent paths, so the
    original's separate path_exists() pre-check was redundant.
    """
    return os.path.isdir(path)
def path_access_read(path):
    """ Check read permission on *path*; report to stdout when denied. """
    readable = os.access(path, os.R_OK)
    if not readable:
        print(f'Can not read from {path}')
    return readable
def path_access_write(path):
    """ Check write permission on *path*; report to stdout when denied. """
    writable = os.access(path, os.W_OK)
    if not writable:
        print(f'Can not write to {path}')
    return writable
def create_folder(file):
    """ Ensure the directory *file* exists (creating parents as needed).

    Raises FileExistsError when the path exists but is NOT a directory --
    a name collision the original silently ignored even though its own
    comment said it should bail out.  makedirs(exist_ok=True) gives
    exactly that contract: no-op for an existing directory, error for a
    non-directory collision.
    """
    os.makedirs(file, exist_ok=True)
def copy_from_source(source_path, dest_path, file_name):
    """ Copy *file_name* from *source_path* into *dest_path*.

    If the destination already holds a file by that name:
      * identical content (by MD5) -> nothing to do, skip the copy;
      * different content          -> keep the old file, renamed with its
                                      MD5 appended, then copy the new one.
    """
    src_file = os.path.join(source_path, file_name)
    dst_file = os.path.join(dest_path, file_name)
    if path_exists(dst_file):
        if cmp_files(src_file, dst_file):
            return  # already imported, identical bytes
        print(f'Found duplicate for {source_path}, renaming destination with md5 appended.')
        stem, suffix = os.path.splitext(file_name)
        renamed = stem + '_' + md5_hash(dst_file) + suffix
        os.rename(dst_file, os.path.join(dest_path, renamed))
    create_folder(dest_path)
    shutil.copy(src_file, dest_path)
def process_file(path, f_type, f_name, ext):
    """ Record metadata for one discovered file in the module-level
    `files` dict, keyed by its full source path.

    Captures source folder, type, extension and capture date, then
    derives the destination folder:
        <base>/<YYYY>/<YYYY-MM>/<YYYY-MM-DD>[-EVENT]/<TYPE-SUBFOLDER>
    Images get JPG/RAW subfolders and, when config['store_originals'] is
    set, a parallel ORIGINALS/<JPG|RAW> folder for the untouched copy.
    The original duplicated the whole dated-path expression across the
    EVENT/non-EVENT and JPG/RAW branches; this builds it once.
    """
    key = os.path.join(path, f_name)
    entry = {
        'folders': {'source_path': path},
        'date': {},
        'type': f_type,
        'name': f_name,
        'extension': ext,
    }
    files[key] = entry
    entry['date']['capture_date'] = get_capture_date(key, f_type)
    y, m, d = entry['date']['capture_date']
    entry['date']['y'] = y
    entry['date']['m'] = m
    entry['date']['d'] = d
    # Dated folder layout; only the deepest level differs when an EVENT
    # name was passed in.
    dest = f"{config['folders']['destination']['base']}/{y}/{y}-{m}/{y}-{m}-{d}"
    if EVENT is not False:
        dest = dest + '-' + EVENT
    if f_type == 'image':
        dest = dest + '/PHOTO'
        fmt = 'JPG' if ext in ('jpg', 'jpeg') else 'RAW'
        if config['store_originals'] is True:
            entry['folders']['destination_original'] = dest + '/ORIGINALS/' + fmt
        dest = dest + '/' + fmt
    elif f_type == 'video':
        dest = dest + '/VIDEO'
    elif f_type == 'audio':
        dest = dest + '/AUDIO'
    else:
        print('WARN: ', f_type, ' is not a known type and you never should have landed here.')
    entry['folders']['destination'] = dest
def find_files(directory):
    """ Walk *directory* and register every file whose (lowercased) name
    ends in one of the extensions listed in config['file_types']. """
    for folder, _subfolders, names in os.walk(directory):
        for f_type, extensions in config['file_types'].items():
            for ext in extensions:
                progress = tqdm(names, desc='Finding ' + ext + ' Files', ncols=100)
                for candidate in progress:
                    if candidate.lower().endswith(ext):
                        print(candidate)
                        process_file(folder, f_type, candidate, ext)
def validate_config_dir_access():
    """ Verify access to every directory named in the configuration.

    Requires: destination writable, source readable and -- when backups
    are enabled -- the backup folder writable.  Returns True only when
    every applicable check passes.  (The path_access_* helpers print
    their own diagnostics on failure.)
    """
    if not path_access_write(config['folders']['destination']['base']):
        return False
    if not path_access_read(config['folders']['source']['base']):
        return False
    if config['store_backup'] is True:
        return path_access_write(config['folders']['backup'])
    return True
def copy_files():
    """ Copy every discovered file into its destination folder.

    When originals are being kept, images are additionally mirrored into
    their ORIGINALS folder -- copied from the fresh destination copy,
    not from the SD card, so both copies are byte-identical.
    """
    for key in tqdm(files, desc="Copying Files:", ncols=100):
        meta = files[key]
        create_folder(meta['folders']['destination'])
        copy_from_source(meta['folders']['source_path'],
                         meta['folders']['destination'],
                         meta['name'])
        keep_original = config['store_originals'] is True and meta['type'] == 'image'
        if keep_original:
            create_folder(meta['folders']['destination_original'])
            copy_from_source(meta['folders']['destination'],
                             meta['folders']['destination_original'],
                             meta['name'])
def gen_hashes():
    """ Compute an MD5 digest for each file at every location it now
    lives (source, destination and the optional originals copy), stored
    under files[...]['md5_checksums'] keyed by full path. """
    for key in tqdm(files, desc="Generating MD5 Hashes:", ncols=100):
        checksums = {}
        for folder_path in files[key]['folders'].values():
            located = os.path.join(folder_path, files[key]['name'])
            checksums[located] = md5_hash(located)
        files[key]['md5_checksums'] = checksums
def validate_checksums():
    """ Verify that every copy of each file carries the same MD5 digest.

    Marks files[...]['source_cleanable'] True only when ALL recorded
    checksums agree.  The original compared consecutive pairs and let
    each comparison overwrite the flag, so with three copies an early
    mismatch could be masked by a later matching pair -- and the source
    would then be deleted despite a corrupt copy.  On mismatch the
    file's metadata is printed and the source is left untouched.
    """
    for file in tqdm(files, desc="Verifying Checksums:", ncols=100):
        sums = list(files[file]['md5_checksums'].values())
        consistent = len(set(sums)) <= 1
        files[file]['source_cleanable'] = consistent
        if not consistent:
            print(f'FATAL: Checksum validation failed for: {files[file]["name"]}\n'
                  f'checksums disagree: {sums}\n')
            print('\n File Meta:\n')
            pprint(files[file])
def cleanup_sd():
    """ Delete source (SD-card) files whose copies were checksum-verified.

    Only runs when config['cleanup_sd'] is enabled.  Uses .get() with a
    False default so a file that never went through checksum validation
    is conservatively skipped instead of raising KeyError right before a
    destructive delete.
    """
    if config['cleanup_sd'] is not True:
        return
    for file in tqdm(files, desc="Cleaning Up SD:", ncols=100):
        if files[file].get('source_cleanable', False) is True:
            os.remove(os.path.join(files[file]['folders']['source_path'], files[file]['name']))
# --- main flow -------------------------------------------------------------
# Only proceed when every configured directory is accessible; the import
# pipeline is: discover -> copy -> hash -> verify -> (optionally) clean SD.
GO = validate_config_dir_access()
if GO is True:
    find_files(config['folders']['source']['base'])
    copy_files()
    gen_hashes()
    validate_checksums()
    cleanup_sd()
else:
    print("There was a problem accessing one or more directories defined in the configuration.")
# Persist the collected metadata for post-run inspection/debugging.
dump_yaml(files, 'files_dict.yaml')
print('done.')