# BitMover/import_media.py
# (page metadata from the original paste: 242 lines, 9.1 KiB, Python)

#!/usr/bin/env python3
'''
Import photos from SD card into folder with today's date + nickname
Use: importphotos (--jpg|--raw|--both) <nickname of folder (optional)>
Add script to path
TODO:
8. Optionally allow specification of a backup location on another disk
or NAS to ship a 3rd copy to
10. Every config option has an arg override
11. Optionally rename file if event name was passed in
-- STRETCH --
12. Make a graphical interface
'''
import os
import sys
from pprint import pprint
import argparse
import shutil
import hashlib
from datetime import datetime
from tqdm import tqdm
import yaml
import exifread
import ffmpeg
CONFIG_FILE = 'config.yaml'
# Global registry of every media file found on the card, keyed by its full
# source path.  Populated by process_file(), consumed by the copy/verify
# loops at the bottom of the script.
files = {}

# Read configuration from file.  Abort immediately if it is missing: every
# later step reads `config`, and the original code only printed a warning
# and then crashed with a NameError further down.
try:
    with open(CONFIG_FILE, 'r') as f:
        # safe_load never constructs arbitrary Python objects from the YAML.
        config = yaml.safe_load(f)
except FileNotFoundError:
    print("Configuration file not found: ", CONFIG_FILE)
    print("Copy config.yaml.EXAMPLE to ", CONFIG_FILE, " and update accordingly.")
    sys.exit(1)

parser = argparse.ArgumentParser()
parser.add_argument("-e", "--event", help="Event Name")
args = parser.parse_args()
# args.event is None when --event was not passed, so `event` is always
# defined here.  (The original assigned it only inside `if args.event:`,
# leaving the later `if event:` check to raise NameError.)
event = args.event
def md5_hash(f):
    """Return the hex MD5 digest of file *f*.

    Reads in 1 MiB chunks so large media files do not get pulled into
    memory at once, and uses a context manager so the handle is always
    closed (the original `open(f, 'rb').read()` leaked the file object
    and buffered the whole file).
    """
    md5 = hashlib.md5()
    with open(f, 'rb') as fh:
        for chunk in iter(lambda: fh.read(1024 * 1024), b''):
            md5.update(chunk)
    return md5.hexdigest()
def cmp_files(f1, f2):
    """Return True when files *f1* and *f2* have identical MD5 digests."""
    digest_a = md5_hash(f1)
    digest_b = md5_hash(f2)
    return digest_a == digest_b
def get_capture_date(p, t):
    """Return (year, month, day) strings for when media file *p* was captured.

    p -- path to the media file
    t -- media type: 'image', 'video', 'audio', or anything else

    Images use the EXIF DateTimeOriginal tag; video/audio use container
    metadata via ffmpeg.probe.  If the expected tag is missing or does not
    parse, fall back to the filesystem ctime instead of crashing mid-import
    (the original raised KeyError/ValueError on files without metadata).
    """
    stamp = None
    try:
        if t == 'image':
            with open(p, 'rb') as f:
                tags = exifread.process_file(f)
            stamp = datetime.strptime(str(tags['EXIF DateTimeOriginal']),
                                      '%Y:%m:%d %H:%M:%S')
        elif t == 'video':
            stamp = datetime.strptime(
                ffmpeg.probe(p)['format']['tags']['creation_time'],
                '%Y-%m-%dT%H:%M:%S.%f%z')
        elif t == 'audio':
            stamp = datetime.strptime(
                ffmpeg.probe(p)['format']['tags']['date'], '%Y-%m-%d')
    except (KeyError, ValueError):
        # Tag absent or in an unexpected format -- use the filesystem below.
        stamp = None
    if stamp is None:
        # Unknown type, or metadata unavailable: filesystem creation time.
        stamp = datetime.fromtimestamp(os.path.getctime(p))
    return stamp.strftime("%Y"), stamp.strftime("%m"), stamp.strftime("%d")
def create_folder(f):
    """Create directory *f* (including parents); no-op if it already exists.

    exist_ok=True replaces the original try/except FileExistsError dance
    and still raises if *f* exists but is not a directory.
    """
    os.makedirs(f, exist_ok=True)
def copy_from_source(p, d, f):
    """Copy file *p* into directory *d* under name *f*.

    If d/f already exists:
      * identical content (by MD5) -> skip the copy entirely;
      * different content          -> rename the existing destination file
        with its MD5 appended, then copy *p* in under the original name.
    """
    dest = os.path.join(d, f)
    if os.path.exists(dest):
        # Hash each file exactly once.  The original hashed the destination
        # twice: once inside cmp_files and again to build the rename target.
        dest_md5 = md5_hash(dest)
        if md5_hash(p) == dest_md5:
            return  # same bytes already present, nothing to do
        print(f'Found duplicate for {p}, renaming destination with md5 appended.')
        base, extension = os.path.splitext(f)
        file_name_hash = base + '_' + dest_md5 + extension
        os.rename(dest, os.path.join(d, file_name_hash))
    shutil.copy(p, d)
def process_file(path, f_type, f_name, ext):
    """Record one discovered file in the global `files` dict.

    path   -- directory the file was found in
    f_type -- media type key from config ('image', 'video', 'audio', ...)
    f_name -- bare file name
    ext    -- matched extension as listed in config (lower case)

    Computes the capture date and derives the dated destination folder:
    <base>/<Y>/<Y-m>/<Y-m-d>[-event]/<PHOTO|VIDEO|AUDIO>[/JPG|/RAW],
    plus a parallel ORIGINALS tree for images when config['store_originals'].
    """
    i = os.path.join(path, f_name)
    entry = {'folders': {'source_path': path}, 'date': {},
             'type': f_type, 'name': f_name, 'extension': ext}
    files[i] = entry

    year, month, day = get_capture_date(i, f_type)
    entry['date']['capture_date'] = (year, month, day)
    entry['date']['y'] = year
    entry['date']['m'] = month
    entry['date']['d'] = day

    # `event` is only assigned at module level when --event was passed;
    # default to None instead of raising NameError.
    evt = globals().get('event')
    day_folder = year + '-' + month + '-' + day + ('-' + evt if evt else '')
    dest = (config['folders']['destination']['base'] + '/' + year + '/' +
            year + '-' + month + '/' + day_folder)

    # Default to no separate ORIGINALS copy.  Initialising this up front
    # fixes the original KeyError: the old code read
    # folders['destination_original'] inside the image branch before the
    # key existed whenever store_originals was False.
    dest_original = False
    if f_type == 'image':
        dest += '/PHOTO'
        if config['store_originals'] is True:
            dest_original = dest + '/ORIGINALS'
        # Same JPG/RAW subfolder in both the working and ORIGINALS trees.
        sub = '/JPG' if ext in ('jpg', 'jpeg') else '/RAW'
        dest += sub
        if dest_original:
            dest_original += sub
    elif f_type == 'video':
        dest += '/VIDEO'
    elif f_type == 'audio':
        dest += '/AUDIO'
    else:
        print('WARN: ', f_type, ' is not a known type and you never should have landed here.')

    entry['folders']['destination'] = dest
    entry['folders']['destination_original'] = dest_original
# copy_from_source(p, d, o, file)
def find_files(directory):
    """Walk *directory* and register every file whose extension appears in
    config['file_types'] by calling process_file on it."""
    walker = tqdm(os.walk(directory), desc='Finding Files', ncols=100)
    for folder, _subfolders, names in walker:
        for f_type in config['file_types']:
            ext_bar = tqdm(config['file_types'][f_type],
                           desc='Finding ' + f_type + ' Files', ncols=100)
            for ext in ext_bar:
                name_bar = tqdm(names, desc='Finding ' + ext + ' Files', ncols=100)
                for name in name_bar:
                    if name.lower().endswith(ext):
                        process_file(folder, f_type, name, ext)
# Build the work list from the configured source tree, then copy everything.
find_files(config['folders']['source']['base'])

for file in tqdm(files, desc="Copying Files:", ncols=100):
    meta = files[file]
    create_folder(meta['folders']['destination'])
    copy_from_source(os.path.join(meta['folders']['source_path'], meta['name']),
                     meta['folders']['destination'],
                     meta['name'])
    if meta['folders']['destination_original'] is not False:
        # The ORIGINALS copy is made from the just-written destination
        # file, not from the SD card, so the card is read only once.
        create_folder(meta['folders']['destination_original'])
        copy_from_source(os.path.join(meta['folders']['destination'], meta['name']),
                         meta['folders']['destination_original'],
                         meta['name'])
# Record an MD5 checksum for every on-disk copy of each file (source,
# destination, and ORIGINALS when present) so they can be verified below.
for file in tqdm(files, desc="Generating MD5 Hashes:", ncols=100):
    checksums = {}
    files[file]['md5_checksums'] = checksums
    for folder in files[file]['folders']:
        location = files[file]['folders'][folder]
        if location is not False:
            full_path = os.path.join(location, files[file]['name'])
            checksums[full_path] = md5_hash(full_path)
# Verify that every copy of each file hashed identically before its SD-card
# source is considered safe to delete.
for file in tqdm(files, desc="Verifying Checksums:", ncols=100):
    sums = list(files[file]['md5_checksums'].values())
    # Every copy must agree with the first.  The original compared only
    # consecutive pairs and let each comparison overwrite source_cleanable,
    # so a later matching pair could mask an earlier mismatch and a
    # corrupted copy's source would still be deleted during cleanup.
    ok = bool(sums) and all(s == sums[0] for s in sums[1:])
    files[file]['source_cleanable'] = ok
    if not ok and sums:
        bad = next(s for s in sums if s != sums[0])
        print(f'FATAL: Checksum validation failed for: '
              f'{files[file]["name"]} \n{sums[0]}\n is not equal to \n{bad}\n')
        print('\n File Meta:\n')
        pprint(files[file])
# Optionally delete files from the SD card once all their copies verified.
if config['cleanup_sd'] is True:
    for key in tqdm(files, desc="Cleaning Up SD:", ncols=100):
        entry = files[key]
        if entry['source_cleanable'] is True:
            os.remove(os.path.join(entry['folders']['source_path'], entry['name']))
print('done.')