directory stuff

Kameron Kenny 2023-08-10 14:52:27 -04:00
parent c23164d428
commit 912ef08783
1 changed file with 132 additions and 86 deletions


@@ -74,27 +74,64 @@ def get_capture_date(p, t):
     month = stamp.strftime("%m")
     day = stamp.strftime("%d")
     return year, month, day
 
+def path_exists(path):
+    """ Does the path exist """
+    return os.path.exists(path)
+
+def is_dir(path):
+    """ determine if the argument passed is a directory """
+    p_exists = path_exists(path)
+    if p_exists is True:
+        return os.path.isdir(path)
+    else:
+        return p_exists
+
+def path_access_read(path):
+    """ make sure we can read from the path """
+    val = os.access(path, os.R_OK)
+    if val is False:
+        print(f'Can not read from {path}')
+    return val
+
+def path_access_write(path):
+    """ make sure we can write to the path """
+    val = os.access(path, os.W_OK)
+    if val is False:
+        print(f'Can not write to {path}')
+    return val
+
 def create_folder(f):
     """ Function to create folder """
-    try:
-        os.makedirs(f)
-    except FileExistsError:
-        pass
+    if path_exists(f) is False:
+        os.makedirs(f)
+    elif is_dir(f) is False:
+        pass  # this needs to turn into bailing out as there is a collision.
-def copy_from_source(p, d, f):
+def copy_from_source(source_path, dest_path, file_name):
     """ Copy file from source to destination """
-    if os.path.exists(os.path.join(d, f)):
-        check_match = cmp_files(p, os.path.join(d, f))
+    file_exists = path_exists(os.path.join(dest_path, file_name))
+    if file_exists is True:
+        check_match = cmp_files(os.path.join(source_path, file_name),
+                                os.path.join(dest_path, file_name))
         if check_match is False:
-            print(f'Found duplicate for {p}, renaming destination with md5 appended.')
-            base, extension = os.path.splitext(f)
-            file_name_hash = base + '_' + md5_hash(os.path.join(d, f)) + extension
-            os.rename(os.path.join(d, f), os.path.join(d, file_name_hash))
+            print(f'Found duplicate for {source_path}, renaming destination with md5 appended.')
+            base, extension = os.path.splitext(file_name)
+            md5 = md5_hash(os.path.join(dest_path, file_name))
+            file_name_hash = base + '_' + md5 + extension
+            os.rename(os.path.join(dest_path, file_name),
+                      os.path.join(dest_path, file_name_hash))
         else:
             return
-    shutil.copy(p, d)
+    create_folder(dest_path)
+    shutil.copy(os.path.join(source_path, file_name), dest_path)
 
 def process_file(path, f_type, f_name, ext):
     """ gather information and add to dictionary """
@@ -136,21 +173,16 @@ def process_file(path, f_type, f_name, ext):
     if files[i]['type'] == 'image':
         files[i]['folders']['destination'] = files[i]['folders']['destination'] + '/PHOTO'
-        if config['store_originals'] is True:
-            files[i]['folders']['destination_original'] = files[i]['folders']['destination'] + \
-                '/ORIGINALS'
         if files[i]['extension'] in ('jpg', 'jpeg'):
             files[i]['folders']['destination'] = files[i]['folders']['destination'] + \
                 '/JPG'
-            if files[i]['folders']['destination_original']:
-                files[i]['folders']['destination_original'] = \
-                    files[i]['folders']['destination_original'] + '/JPG'
+            if config['store_originals'] is True:
+                files[i]['folders']['destination_original'] = files[i]['folders']['destination'] + '/ORIGINALS/JPG'
         else:
             files[i]['folders']['destination'] = files[i]['folders']['destination'] + '/RAW'
-            if files[i]['folders']['destination_original']:
-                files[i]['folders']['destination_original'] = \
-                    files[i]['folders']['destination_original'] + '/RAW'
+            if config['store_originals'] is True:
+                files[i]['folders']['destination_original'] = files[i]['folders']['destination'] + '/ORIGINALS/RAW'
     elif files[i]['type'] == 'video':
         files[i]['folders']['destination'] = files[i]['folders']['destination'] + '/VIDEO'
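Note: because destination_original is now derived from the already-suffixed destination, the originals tree nests under the per-format folder. Assuming a date-based destination root such as DEST/2023/08/10 (built earlier in process_file from get_capture_date; the root and file names here are illustrative), images land as:

DEST/2023/08/10/PHOTO/JPG                  photo.jpg
DEST/2023/08/10/PHOTO/JPG/ORIGINALS/JPG    untouched original, when store_originals is True
DEST/2023/08/10/PHOTO/RAW                  photo.cr2
DEST/2023/08/10/PHOTO/RAW/ORIGINALS/RAW    untouched original, when store_originals is True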
@@ -161,81 +193,95 @@ def process_file(path, f_type, f_name, ext):
     else:
         print('WARN: ', files[i]['type'], ' is not a known type and you never should have landed here.')
-    try:
-        files[i]['folders']['destination_original']
-    except (NameError, KeyError):
-        files[i]['folders']['destination_original'] = False
-
-# copy_from_source(p, d, o, file)
 
 def find_files(directory):
     """ find files to build a dictionary out of """
-    for folder, subfolders, filename in tqdm(os.walk(directory), desc = 'Finding Files', ncols = 100):
+    for folder, subfolders, filename in os.walk(directory):
         for f_type in config['file_types']:
-            for ext in tqdm(config['file_types'][f_type], desc = 'Finding ' + f_type + ' Files', ncols = 100):
+            for ext in config['file_types'][f_type]:
                 for file in tqdm(filename, desc = 'Finding ' + ext + ' Files', ncols = 100):
                     if file.lower().endswith(ext):
                         process_file(folder, f_type, file, ext)
-find_files(config['folders']['source']['base'])
-
-#pprint(files)
-
-#for file in files:
-#    pprint(files[file]['folders'])
-
-# Copy File
-for file in tqdm(files, desc = "Copying Files:", ncols = 100):
-    create_folder(files[file]['folders']['destination'])
-    copy_from_source(os.path.join(files[file]['folders']['source_path'],files[file]['name']),
-                     files[file]['folders']['destination'],
-                     files[file]['name'])
-    if files[file]['folders']['destination_original'] is not False:
-        create_folder(files[file]['folders']['destination_original'])
-        copy_from_source(os.path.join(files[file]['folders']['destination'],files[file]['name']),
-                         files[file]['folders']['destination_original'],
-                         files[file]['name'])
-
-# validate checksum
-for file in tqdm(files, desc = "Generating MD5 Hashes:", ncols = 100):
-    #print(files[file])
-    files[file]['md5_checksums'] = {}
-    for folder in files[file]['folders']:
-        #print(files[file]['folders'][folder])
-        if files[file]['folders'][folder] is not False:
-            #print('folder is not false.')
-            k = os.path.join(files[file]['folders'][folder], files[file]['name'])
-            #print(k)
-            files[file]['md5_checksums'][k] = md5_hash(k)
-            #print(files[file]['md5_checksums'][k])
-
-for file in tqdm(files, desc = "Verifying Checksums:", ncols = 100):
-    i = 0
-    c = {}
-    for checksum in files[file]['md5_checksums']:
-        c[i] = files[file]['md5_checksums'][checksum]
-        if i > 0:
-            P = i - 1
-            if c[i] == c[P]:
-                files[file]['source_cleanable'] = True
-            else:
-                files[file]['source_cleanable'] = False
-                print(f'FATAL: Checksum validation failed for: \
-{files[file]["name"]} \n{c[i]}\n is not equal to \n{c[P]}\n')
-                print('\n File Meta:\n')
-                pprint(files[file])
-        i = i + 1
-
-# cleanup sd
-if config['cleanup_sd'] is True:
-    for file in tqdm(files, desc = "Cleaning Up SD:", ncols = 100):
-        if files[file]['source_cleanable'] is True:
-            os.remove(os.path.join(files[file]['folders']['source_path'],files[file]['name']))
+def validate_config_dir_access():
+    """ Validate we can op in the defined directories """
+    check = path_access_write(config['folders']['destination']['base'])
+    if check is False:
+        return False
+    else:
+        check = path_access_read(config['folders']['source']['base'])
+        if check is False:
+            return False
+        else:
+            if config['store_backup'] is True:
+                check = path_access_write(config['folders']['backup'])
+                if check is False:
+                    return False
+                else:
+                    return True
+            else:
+                return True
+
+def copy_files():
+    """ Copy Files. """
+    for file in tqdm(files, desc = "Copying Files:", ncols = 100):
+        create_folder(files[file]['folders']['destination'])
+        copy_from_source(files[file]['folders']['source_path'],
+                         files[file]['folders']['destination'],
+                         files[file]['name'])
+        if config['store_originals'] is True:
+            if files[file]['type'] == 'image':
+                create_folder(files[file]['folders']['destination_original'])
+                copy_from_source(files[file]['folders']['destination'],
+                                 files[file]['folders']['destination_original'],
+                                 files[file]['name'])
+
+def gen_hashes():
+    """ Generate Hashes """
+    for file in tqdm(files, desc = "Generating MD5 Hashes:", ncols = 100):
+        files[file]['md5_checksums'] = {}
+        for folder in files[file]['folders']:
+            k = os.path.join(files[file]['folders'][folder], files[file]['name'])
+            files[file]['md5_checksums'][k] = md5_hash(k)
+
+def validate_checksums():
+    """ Validate Checksums """
+    for file in tqdm(files, desc = "Verifying Checksums:", ncols = 100):
+        i = 0
+        c = {}
+        for checksum in files[file]['md5_checksums']:
+            c[i] = files[file]['md5_checksums'][checksum]
+            if i > 0:
+                P = i - 1
+                if c[i] == c[P]:
+                    files[file]['source_cleanable'] = True
+                else:
+                    files[file]['source_cleanable'] = False
+                    print(f'FATAL: Checksum validation failed for: \
+{files[file]["name"]} \n{c[i]}\n is not equal to \n{c[P]}\n')
+                    print('\n File Meta:\n')
+                    pprint(files[file])
+            i = i + 1
+
+def cleanup_sd():
+    """ If we should cleanup the SD, nuke the copied files. """
+    if config['cleanup_sd'] is True:
+        for file in tqdm(files, desc = "Cleaning Up SD:", ncols = 100):
+            if files[file]['source_cleanable'] is True:
+                os.remove(os.path.join(files[file]['folders']['source_path'],files[file]['name']))
 
 #pprint(files)
+go = validate_config_dir_access()
+if go is True:
+    find_files(config['folders']['source']['base'])
+    copy_files()
+    gen_hashes()
+    validate_checksums()
+    cleanup_sd()
+else:
+    print("There was a problem accessing one or more directories defined in the configuration.")
 
 print('done.')
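Note: validate_config_dir_access() and the main flow above assume a config shaped roughly like the following. The keys are taken from the lookups in this diff; the values and extension lists are purely illustrative.

config = {
    'store_backup': False,
    'store_originals': True,
    'cleanup_sd': False,
    'file_types': {
        'image': ['jpg', 'jpeg', 'cr2'],   # extension lists are illustrative
        'video': ['mp4', 'mov'],
    },
    'folders': {
        'source': {'base': '/media/sdcard/DCIM'},
        'destination': {'base': '/photos'},
        'backup': '/backup/photos',   # only checked when store_backup is True
    },
}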