742 lines
30 KiB
Python
Executable File
742 lines
30 KiB
Python
Executable File
#!/usr/bin/env python
|
|
import os
|
|
from os import path
|
|
from os import rename
|
|
import sys
|
|
|
|
from PyQt6.QtCore import QThreadPool
|
|
from PyQt6.QtGui import QIcon, QPixmap
|
|
from PyQt6.QtWidgets import QMainWindow, QApplication, QFileDialog
|
|
|
|
from _BitMover_MainWindow import Ui_MainWindow
|
|
from _configure import CONFIG_FILE, Configure
|
|
from _find_files import FindFiles
|
|
from _dialog_find_files import FindProgress
|
|
from _dialog_import import DialogImport
|
|
from _dialog_checksum_progress import DialogChecksumProgress
|
|
# from _media_import import MediaImporter
|
|
from _preview import MediaPreview
|
|
from _thread_my_stuff import Worker
|
|
from _media_file import MediaFile
|
|
from _file_stuff import path_exists,is_file,create_folder,cmp_hashes
|
|
from _verify_file_checksum import FileHash
|
|
|
|
|
|
# Absolute directory that contains this script; asset paths are built from it.
# abspath() keeps the value valid even when __file__ is a relative path.
basedir = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
# TODO: verify source dir actually exists
|
|
|
|
class MainWindow(QMainWindow, Ui_MainWindow):
|
|
def __init__(self, *args, **kwargs):
    """Build the main window, load the configuration and wire up the widgets."""
    super().__init__(*args, **kwargs)
    self.setupUi(self)
    self.setWindowTitle("BitMover")
    icon_path = os.path.join(basedir, 'assets', 'forklift.ico')
    self.setWindowIcon(QIcon(icon_path))
    self.threadpool = QThreadPool()

    # Settings; load_config() below fills these in from the configuration.
    self.config = None
    self.src_dir = None
    self.dst_dir = None
    self.verify_checksum = None
    self.cleanup_files = None
    self.store_originals = None
    self.file_types = None
    self.source_path_hash = None
    self.search_types = None
    self.chunk_size = 16 * 1024
    self.load_config()

    # Paths of the file currently being copied or checksummed.
    self.path_file_source = None
    self.path_file_destination = None
    self.path_file_destination_original = None
    self.checksum_fph = None

    # File bookkeeping
    self.total_files = 0
    self.file_total = 0
    self.files = {}

    self.event_name = None

    # Progress dialogs
    self.imp_dialog = DialogImport()
    self.find_files_dialog = FindProgress()
    self.checksum_progress_dialog = DialogChecksumProgress()

    self.widgets_config()
|
|
|
|
def load_config(self):
    """Load the configuration and cache the commonly used settings.

    The source and destination base folders are required.  The boolean
    options default to ``False`` and ``file_types`` to ``{}`` when absent.
    If a required key is missing, the error is logged and the process exits
    with status 1.
    """
    # Local import: the original referenced an undefined ``log`` name here,
    # which turned a missing key into a NameError.
    import logging

    try:
        self.config = Configure(CONFIG_FILE).load_config()
        folders = self.config['folders']
        self.src_dir = folders['source']['base']
        self.dst_dir = folders['destination']['base']
        self.verify_checksum = self.config.get('verify_checksum', False)
        self.cleanup_files = self.config.get('cleanup_sd', False)
        self.store_originals = self.config.get('store_originals', False)
        self.file_types = self.config.get('file_types', {})
    except KeyError as e:
        logging.getLogger(__name__).error("Missing configuration key: %s", e)
        sys.exit(1)  # or provide a fallback
|
|
|
|
def widgets_config(self):
    """Connect widget signals to their handlers and set initial widget state."""
    # Button setup
    button_handlers = (
        (self.pushButton_src_browse, self.select_src_directory),
        (self.pushButton_dst_browse, self.select_dst_directory),
        (self.pushButton_3_scan_dir, self.find_files),
        (self.pushButton_import, self.import_files),
    )
    for button, handler in button_handlers:
        button.clicked.connect(handler)

    # Initial widget state
    self.lineEdit_src_dir.setText(self.src_dir)
    self.lineEdit_dst_dir.setText(self.dst_dir)
    self.toggle_scan_button(True)
    self.toggle_import_button(False)
    self.lcd_files_found.display(0)
    self.set_default_thumbnail()
    self.file_list.currentItemChanged.connect(self.index_changed)

    # Set the check boxes first and connect afterwards, so that applying the
    # initial state does not trigger the change handlers.
    option_boxes = (
        (self.checkBox_verify_checksum, self.verify_checksum, self.verify_checksum_changed),
        (self.checkBox_cleanup_files, self.cleanup_files, self.cleanup_files_changed),
        (self.checkBox_store_originals, self.store_originals, self.store_originals_changed),
    )
    for box, initial, _handler in option_boxes:
        box.setChecked(initial)
    for box, _initial, handler in option_boxes:
        box.checkStateChanged.connect(handler)
    self.clear_metadata()

    # Report the thread pool size
    print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount())
|
|
|
|
def toggle_scan_button(self, enable=True):
    """Print the requested state, then enable or disable the scan button."""
    message = f'toggle_scan_button.enabled: {enable}'
    print(message)
    scan_button = self.pushButton_3_scan_dir
    scan_button.setEnabled(enable)
|
|
|
|
def toggle_import_button(self, enable=True):
    """Enable or disable the import button."""
    import_button = self.pushButton_import
    import_button.setEnabled(enable)
|
|
|
|
def select_src_directory(self):
    """Ask the user for a source directory; store and display it if one is chosen."""
    chosen = QFileDialog.getExistingDirectory(self, "Select Directory", self.src_dir)
    if not chosen:
        # Nothing was selected; keep the current source directory.
        return

    print("Selected Directory:", chosen)
    self.src_dir = chosen
    self.lineEdit_src_dir.setText(self.src_dir)
|
|
|
|
def select_dst_directory(self):
    """Ask the user for a destination directory; store and display it if one is chosen."""
    chosen = QFileDialog.getExistingDirectory(self, "Select Directory", self.dst_dir)
    if not chosen:
        # Nothing was selected; keep the current destination directory.
        return

    print("Selected Directory:", chosen)
    self.dst_dir = chosen
    self.lineEdit_dst_dir.setText(self.dst_dir)
|
|
|
|
def get_source_path_hash(self, f):
    """Return the ``path_hash`` of ``FileHash(f)`` and remember it on the instance."""
    hasher = FileHash(f)
    self.source_path_hash = hasher.path_hash
    return self.source_path_hash
|
|
|
|
def verify_checksum_changed(self):
    """Store the state of the 'verify checksum' check box.

    Updates both ``self.config['verify_checksum']`` and the cached
    ``self.verify_checksum`` attribute so the two cannot drift apart.
    """
    enabled = self.checkBox_verify_checksum.isChecked()
    self.config['verify_checksum'] = enabled
    self.verify_checksum = enabled
    # The original read back 'verify_checksums' (plural), a key that is never
    # written, so this line raised KeyError.
    print(f"verify_checksums: {self.config['verify_checksum']}")
|
|
|
|
def cleanup_files_changed(self):
    """Store the state of the 'cleanup files' check box in ``config['cleanup_sd']``."""
    checked = self.checkBox_cleanup_files.isChecked()
    self.config['cleanup_sd'] = True if checked else False
    print(f"cleanup_sd: {self.config['cleanup_sd']}")
|
|
|
|
def store_originals_changed(self):
    """Store the state of the 'store originals' check box.

    Updates ``self.config['store_originals']`` (read by
    ``check_store_original``) and also ``self.store_originals`` (read by
    ``get_checksum``), so copying and checksumming see the same setting.
    """
    enabled = self.checkBox_store_originals.isChecked()
    self.config['store_originals'] = enabled
    self.store_originals = enabled
    print(f"store_originals: {self.config['store_originals']}")
|
|
|
|
def set_thumbnail(self, thumb_file, scaled=True, ratio=None):
    """Show ``thumb_file`` in the preview label.

    :param thumb_file: path of the image to display.
    :param scaled: passed to ``setScaledContents``.
    :param ratio: width / height ratio; when given (and non-zero) the label
        height is fixed to ``width / ratio``.
    """
    self.img_preview.setPixmap(QPixmap(thumb_file))
    self.img_preview.setScaledContents(scaled)
    if ratio:
        # setFixedHeight() takes an int; the plain division yields a float.
        # A ratio of 0 is skipped rather than dividing by zero.
        self.img_preview.setFixedHeight(round(self.img_preview.width() / ratio))
|
|
|
|
def set_default_thumbnail(self):
    """Show the bundled placeholder image in the preview label."""
    placeholder = os.path.join(basedir, 'assets', 'preview_placeholder.jpg')
    self.set_thumbnail(placeholder)
|
|
|
|
def get_preview(self, path_file_name):
    """Create a ``MediaPreview`` for ``path_file_name`` using the current file dictionary."""
    return MediaPreview(path_file_name=path_file_name, media_files=self.files)
|
|
|
|
def update_preview(self, preview):
    """Show the preview's thumbnail with a ratio chosen by its file type.

    Videos use 16:9, images use ``preview.thumbnail_ratio``, anything else 1.
    """
    kind = preview.file_type.lower()
    if kind == 'video':
        thumb_ratio = 16 / 9  # 1.77
    elif kind == 'image':
        thumb_ratio = preview.thumbnail_ratio
    else:
        thumb_ratio = 1

    self.set_thumbnail(preview.thumbnail, ratio=thumb_ratio)
|
|
|
|
def update_metadata(self, preview):
    """Fill the metadata panel for the file identified by ``preview.source_path_hash``.

    Source path, destination path and capture date are always shown.  The
    eight caption/value rows are filled for 'image' and 'video' files and
    left cleared for any other file type.
    """
    self.clear_metadata()
    entry = self.files[preview.source_path_hash]
    captured = entry['date']['capture_date']
    self.l_data_file_source_path.setText(entry['folders']['source_path'])
    self.l_data_file_dest_path.setText(entry['folders']['destination'])
    self.l_meta_content_date_time_c.setText(
        f"{captured['y']}/{captured['m']}/{captured['d']}"
    )

    kind = entry['file_type']
    if kind == 'image':
        meta = entry['image_meta']['photo']
        captions = ('Size', 'dpi', 'ISO', 'Lens',
                    'Focal Length', 'Camera', 'Aperture', 'Megapixels')
        # Values are read lazily, one row at a time, so a missing key
        # surfaces at the same row as it would with explicit statements.
        readers = (
            lambda: meta['size']['width_height'],
            lambda: meta['dpi'],
            lambda: meta['iso'],
            lambda: meta['lens_model'],
            lambda: meta['lens_focal_length'],
            lambda: f"{meta['camera_brand']} {meta['camera_model']}",
            lambda: meta['aperture'],
            lambda: meta['size']['megapixels'],
        )
    elif kind == 'video':
        meta = entry['video_meta']['video']
        captions = ('Size', 'Frames / Second', 'Bit Depth', 'Duration',
                    'Encoder', 'Codec', 'Profile', 'Pix Format')
        readers = (
            lambda: meta['size']['width_height'],
            lambda: meta['r_frame_rate'],
            lambda: meta['bits_per_raw_sample'],
            lambda: meta['duration'],
            lambda: meta['encoding_brand'],
            lambda: meta['codec_long_name'],
            lambda: meta['profile'],
            lambda: meta['pix_fmt'],
        )
    else:
        return

    # Captions first, then values.
    for row, caption in enumerate(captions, start=1):
        getattr(self, f'l_meta_{row:02d}').setText(caption)
    for row, read in enumerate(readers, start=1):
        getattr(self, f'l_meta_content_{row:02d}').setText(str(read()))
|
|
|
|
def clear_metadata(self):
    """Blank the eight metadata caption labels and the eight value labels."""
    # Captions (l_meta_01..08) first, then values (l_meta_content_01..08).
    for prefix in ('l_meta_', 'l_meta_content_'):
        for row in range(1, 9):
            getattr(self, f'{prefix}{row:02d}').setText('')
|
|
|
|
def index_changed(self, i):
    """React to a change of the selected item in the file list.

    With no current item the placeholder thumbnail is shown; otherwise the
    file is processed and its preview and metadata are displayed.
    """
    self.clear_metadata()
    if i is None:
        self.set_default_thumbnail()
        return

    selected = i.text()
    self.process_file(selected)
    print(f'index changed to: {selected}')
    preview = self.get_preview(selected)
    self.update_preview(preview)
    self.update_metadata(preview)
|
|
|
|
def get_event(self):
    """Read the event name from its text field, remember it and return it."""
    name = self.eventName.text()
    self.event_name = name
    return name
|
|
|
|
def get_search_types(self):
    """Collect the media types whose search check boxes are ticked.

    The result, in the order image, video, audio, is stored in
    ``self.search_types`` and returned.
    """
    toggles = (
        ('image', self.checkBox_search_for_images),
        ('video', self.checkBox_search_for_video),
        ('audio', self.checkBox_search_for_audio),
    )
    self.search_types = [kind for kind, box in toggles if box.isChecked()]
    return self.search_types
|
|
|
|
def get_t_files(self):
    """Count the files under ``self.src_dir`` that match the selected media types.

    Clears the file list and resets the preview to the placeholder image
    before walking the source tree.  A file is counted once for every
    configured extension it ends with, as before.

    :return: number of matching entries that ``is_file`` accepts.
    """
    file_total = 0
    self.file_list.clear()
    self.img_preview.setPixmap(QPixmap(os.path.join(basedir,
                                                    'assets',
                                                    'preview_placeholder.jpg')))

    # search_types is None until get_search_types() has run.
    search_types = self.search_types or ()
    for folder, _subfolders, filenames in os.walk(self.src_dir):
        for f_type in search_types:
            # A ticked media type may have no extensions configured
            # (file_types defaults to {}); treat that as "no matches"
            # instead of raising KeyError inside a Qt slot.
            for ext in self.file_types.get(f_type, ()):
                for file in filenames:
                    if not file.lower().endswith(ext):
                        continue
                    current_file = os.path.join(folder, file)
                    if is_file(current_file):
                        file_total += 1
                    else:
                        print(f"Skipping {current_file} as it does not look like a real file.")
    return file_total
|
|
|
|
def set_total_files(self, t):
    """Show ``t`` on the 'files found' LCD counter."""
    self.lcd_files_found.display(t)
|
|
|
|
def set_imported_files(self, t):
    """Show ``t`` on the 'files imported' LCD counter."""
    counter = self.lcd_files_imported
    counter.display(t)
|
|
|
|
@staticmethod
|
|
def print_output(s):
|
|
print(f'output: {s}')
|
|
|
|
def worker_thread_started(self):
    """Disable the scan and import buttons while a worker is running."""
    print('scan thread started')
    for toggle in (self.toggle_scan_button, self.toggle_import_button):
        toggle(False)
|
|
|
|
def worker_thread_done(self):
    """Re-enable the scan button; enable import only if the file list is non-empty."""
    print('scan thread complete.')
    self.toggle_scan_button(True)
    has_files = 0 < len(self.file_list)
    self.toggle_import_button(has_files)
|
|
|
|
@staticmethod
|
|
def thread_complete():
|
|
print("THREAD COMPLETE.")
|
|
|
|
def add_found_file_to_list(self, f):
    """Register a found file: add an empty entry under its path hash and list it."""
    key = self.get_source_path_hash(f)
    self.files.update({key: {}})
    self.file_list.addItem(f)
|
|
|
|
def gen_file_dict(self, d):
    """Replace the file dictionary with ``d``."""
    setattr(self, 'files', d)
|
|
|
|
def process_file(self, p):
    """Gather information about ``p`` and store it in the file dictionary.

    The entry is keyed by the path hash of ``p`` and holds the ``media``
    attribute of a ``MediaFile`` built for the current config and event name.
    """
    details = MediaFile(path_file_name=p,
                        config=self.config,
                        event_name=self.event_name)
    key = self.get_source_path_hash(p)
    print(f'processing: {p}\nhash: {key}')
    self.files[key] = details.media
|
|
|
|
def find_files(self):
    """Scan the source directory on a worker thread and fill the file list.

    Resets the file dictionary and counters, counts candidate files up
    front, then runs ``FindFiles.t_find_files`` in the thread pool with its
    signals connected to the progress dialog and the list/counter handlers.
    """
    self.files = {}
    self.set_total_files(0)
    self.get_event()
    self.get_search_types()
    self.file_total = self.get_t_files()
    self.set_total_files(self.file_total)

    finder = FindFiles(search_types=self.search_types,
                       src_dir=self.src_dir,
                       file_types=self.file_types,
                       file_total=self.file_total)
    worker = Worker(finder.t_find_files)

    signals = worker.signals
    dialog = self.find_files_dialog
    # Connection order is preserved: slots on one signal run in this order.
    connections = (
        (signals.started, self.worker_thread_started),
        (signals.started, dialog.open_find_files_dialog),
        (signals.progress, dialog.set_progress_finding_files),
        (signals.found_file, self.add_found_file_to_list),
        (signals.total_file_count, self.set_total_files),
        (signals.finished, self.thread_complete),
        (signals.finished, self.worker_thread_done),
        (signals.finished, dialog.close_find_files_dialog),
    )
    for signal, slot in connections:
        signal.connect(slot)

    # Execute.
    self.threadpool.start(worker)
|
|
|
|
# From _media_import.py
|
|
def is_video(self, sph):
    """Return True when the entry stored under ``sph`` has file type 'video'."""
    print(f"Should be sph: {sph}")
    file_type = self.files[sph]['file_type']
    print(f"Should be filetype: {file_type}")
    return file_type == 'video'
|
|
|
|
def is_image(self, sph):
    """Return True when the entry stored under ``sph`` has file type 'image'."""
    return self.files[sph]['file_type'] == 'image'
|
|
|
|
def t_copy_files(self,
                 progress_callback,
                 current_file_progress_callback,
                 imported_file_count_callback,
                 found_file_callback,
                 total_file_count_callback,
                 checksum_file_callback,
                 checksum_progress_callback,
                 checksum_dialog_open_callback):
    """Copy every file in the file list to its destination (worker entry point).

    For each list item the file is processed, copied to its destination
    folder and, when ``check_store_original`` says so, copied a second time
    to the 'destination_original' folder.  After each file the imported
    count and the overall percentage are emitted.

    All parameters are signal-like objects with an ``emit`` method; they are
    forwarded unchanged to ``copy_a_file``.
    """
    callbacks = (progress_callback,
                 current_file_progress_callback,
                 imported_file_count_callback,
                 found_file_callback,
                 total_file_count_callback,
                 checksum_file_callback,
                 checksum_progress_callback,
                 checksum_dialog_open_callback)

    # Fall back to the list size so the percentage never divides by zero.
    total = self.file_total or self.file_list.count()
    count = 0
    for row in range(self.file_list.count()):
        file = self.file_list.item(row).text()
        self.process_file(file)
        sph = self.get_source_path_hash(file)
        print(sph)

        entry = self.files[sph]
        # Per-file folders stay local.  The original stored them in
        # self.src_dir / self.dst_dir, overwriting the directories the user
        # selected and leaving the next scan pointed at the last file's folder.
        source_dir = entry['folders']['source_path']
        destination_dir = entry['folders']['destination']
        self.path_file_source = path.join(source_dir, entry['name'])
        self.path_file_destination = path.join(destination_dir, entry['name'])
        # NOTE(review): imp_dialog is used directly from this worker
        # function; confirm these dialog methods are safe off the GUI thread.
        self.imp_dialog.set_importing_file(self.path_file_source)

        self.copy_a_file(sph, self.path_file_destination, *callbacks)

        store_original = self.check_store_original(sph)
        print(f'check_store_original({sph}): {store_original}')

        if store_original is True:
            self.path_file_destination_original = path.join(
                destination_dir,
                entry['folders']['destination_original'],
                entry['name'])
            self.copy_a_file(sph, self.path_file_destination_original, *callbacks)

        count += 1

        imported_file_count_callback.emit(count)
        progress_callback.emit(round((count / total) * 100, 1))
        self.imp_dialog.add_to_imported_list(self.path_file_source)
|
|
|
|
def copy_a_file(self,
                fph,
                target,
                progress_callback,
                current_file_progress_callback,
                imported_file_count_callback,
                found_file_callback,
                total_file_count_callback,
                checksum_file_callback,
                checksum_progress_callback,
                checksum_dialog_open_callback):
    """Copy ``self.path_file_source`` to ``target`` in chunks, emitting progress.

    The target folder is created first.  Nothing is copied unless
    ``check_duplicate`` returns ``False``.  Videos are copied in 5 MiB
    chunks, everything else in 16 KiB chunks; the chosen size is stored in
    ``self.chunk_size``.  After each chunk the percentage written is sent
    through ``current_file_progress_callback``.

    :param fph: key of the file's entry in ``self.files``.
    :param target: full path of the file to create.
    The remaining parameters are forwarded unchanged to ``check_duplicate``.
    """
    callbacks = (progress_callback,
                 current_file_progress_callback,
                 imported_file_count_callback,
                 found_file_callback,
                 total_file_count_callback,
                 checksum_file_callback,
                 checksum_progress_callback,
                 checksum_dialog_open_callback)

    size = path.getsize(self.path_file_source)
    target_dir = os.path.dirname(target)
    create_folder(target_dir)

    if self.check_duplicate(fph, target, target_dir, *callbacks) is not False:
        return

    if self.is_video(fph):
        self.chunk_size = (1024 * 1024) * 5
    else:
        self.chunk_size = (16 * 1024) * 1

    written = 0
    with open(self.path_file_source, 'rb') as fs, open(target, 'wb') as fd:
        for chunk in iter(lambda: fs.read(self.chunk_size), b''):
            fd.write(chunk)
            # Count bytes handed to the writer instead of stat()-ing the
            # target after every chunk: no extra syscall, and unflushed
            # data is not missed.
            written += len(chunk)
            # max() keeps the divisor non-zero and caps the value at 100
            # if the source grew after it was measured.
            current_file_progress_callback.emit(
                round((written / max(size, written)) * 100, 1))
|
|
|
|
def check_store_original(self, sph):
    """Return True when originals are to be stored and ``sph`` refers to an image."""
    if self.config['store_originals'] is not True:
        return False
    return True if self.is_image(sph) else False
|
|
|
|
def get_checksum(self, _pf,
                 progress_callback,
                 current_file_progress_callback,
                 imported_file_count_callback,
                 found_file_callback,
                 total_file_count_callback,
                 checksum_file_callback,
                 checksum_progress_callback,
                 checksum_dialog_open_callback):
    """Compute and cache checksums for a file's source and destination copies.

    Checksums are stored in ``self.files[<path hash>]['xx_checksum']``,
    keyed by full path.  A path that already has a checksum is not hashed
    again, and a path that does not exist (or is not a regular file) is
    skipped with a message.  ``self.path_file_source``,
    ``self.path_file_destination`` and, when originals are stored,
    ``self.path_file_destination_original`` are updated as a side effect.

    :param _pf: path of the source file; its path hash selects the entry.
    The remaining parameters are forwarded unchanged to ``FileHash.get_hash``.
    """
    callbacks = (progress_callback,
                 current_file_progress_callback,
                 imported_file_count_callback,
                 found_file_callback,
                 total_file_count_callback,
                 checksum_file_callback,
                 checksum_progress_callback,
                 checksum_dialog_open_callback)

    sph = FileHash(_pf).path_hash
    entry = self.files[sph]
    folders = entry['folders']
    name = entry['name']

    # Read the live setting from the config, which is what the check box
    # handler updates and what check_store_original() uses; the cached
    # attribute is only a fallback.
    store_originals = self.config.get('store_originals', self.store_originals) is True

    self.path_file_source = os.path.join(folders['source_path'], name)
    self.path_file_destination = os.path.join(folders['destination'], name)
    candidates = [self.path_file_source, self.path_file_destination]
    if store_originals:
        # Built the same way as in t_copy_files (destination folder first).
        # When 'destination_original' is absolute, os.path.join discards the
        # leading part, so the result is the same as before.
        self.path_file_destination_original = os.path.join(
            folders['destination'],
            folders['destination_original'],
            name)
        candidates.append(self.path_file_destination_original)

    if entry.get('xx_checksum') is None:
        entry['xx_checksum'] = {}
    checksums = entry['xx_checksum']

    for candidate in candidates:
        if checksums.get(candidate) is not None:
            continue  # already hashed
        print(f'set_checksum, _pf: {candidate}')
        if path_exists(candidate) and is_file(candidate):
            checksums[candidate] = FileHash(candidate).get_hash(candidate, *callbacks)
        else:
            print(f"set_checksum: {candidate} doesn't exist yet or isn't a real file.")
|
|
|
|
# self.checksum_progress_dialog.close_dialog()
|
|
|
|
def check_duplicate(self,
                    sph,
                    target,
                    target_dir,
                    progress_callback,
                    current_file_progress_callback,
                    imported_file_count_callback,
                    found_file_callback,
                    total_file_count_callback,
                    checksum_file_callback,
                    checksum_progress_callback,
                    checksum_dialog_open_callback):
    """Decide whether ``target`` already holds the file about to be copied.

    :param sph: key of the file's entry in ``self.files``.
    :param target: full path the file would be copied to.
    :param target_dir: folder containing ``target``.
    The remaining parameters are forwarded unchanged to the checksum code.
    :return: ``True`` when ``target`` exists with a matching checksum (skip
        the copy).  ``False`` when ``target`` does not exist, or exists with
        different content, in which case the existing file is renamed with
        its checksum appended so the copy can proceed.
    """
    if not path_exists(target):
        # No duplicate
        return False

    callbacks = (progress_callback,
                 current_file_progress_callback,
                 imported_file_count_callback,
                 found_file_callback,
                 total_file_count_callback,
                 checksum_file_callback,
                 checksum_progress_callback,
                 checksum_dialog_open_callback)

    print(f'Path Exists: path_file_source: {self.path_file_source}\ntarget: {target}')
    self.get_checksum(self.path_file_source, *callbacks)

    checksums = self.files[sph]['xx_checksum']
    if checksums.get(target) is None:
        # get_checksum() hashes only the paths it derives itself; make sure
        # the file being compared has a checksum, to avoid a KeyError below.
        checksums[target] = FileHash(target).get_hash(target, *callbacks)

    hash1 = checksums[self.path_file_source]
    hash2 = checksums[target]
    if cmp_hashes(hash1, hash2) is False:
        print(f'\nFound duplicate for {self.path_file_source}, renaming destination with hash appended.')
        base, extension = path.splitext(self.files[sph]['name'])
        file_name_hash = base + '_' + hash2 + extension
        rename(target, path.join(target_dir, file_name_hash))
        # The cached checksum belonged to the file that was just moved away.
        # Drop it so the copy written to ``target`` is hashed afresh later.
        checksums.pop(target, None)
        return False  # The conflicting file was moved out of the way.

    print(f"\n########################\nFound duplicate for: {self.path_file_source}\n"
          f"at: {target}... \nHashes MATCH\n"
          f"########################")
    return True
|
|
|
|
# END from _media_import.py
|
|
|
|
def import_files(self):
    """Import the found files on a worker thread.

    Resets the imported counter and the import dialog's progress bars, then
    runs ``t_copy_files`` in the thread pool with its signals connected to
    the import and checksum dialogs.  When the worker finishes,
    ``compare_imported_checksums`` is started.
    """
    # Initialize widgets
    self.lcd_files_imported.display(0)
    import_dialog = self.imp_dialog
    import_dialog.set_progress_importing(0)
    import_dialog.set_progress_current_file(0)

    worker = Worker(self.t_copy_files)
    signals = worker.signals
    checksum_dialog = self.checksum_progress_dialog
    # Connection order is preserved: slots on one signal run in this order.
    connections = (
        (signals.started, self.worker_thread_started),
        (signals.started, import_dialog.open_import_dialog),
        (signals.progress, import_dialog.set_progress_importing),
        (signals.current_file_progress, import_dialog.set_progress_current_file),
        (signals.imported_file_count, self.set_imported_files),
        (signals.result, self.print_output),
        (signals.finished, self.thread_complete),
        (signals.finished, self.worker_thread_done),
        (signals.finished, self.compare_imported_checksums),
        (signals.checksum_dialog_open, checksum_dialog.open_dialog),
        (signals.checksum_file, checksum_dialog.set_file),
        (signals.checksum_progress, checksum_dialog.set_progress),
        (signals.finished, checksum_dialog.close_dialog),
    )
    for signal, slot in connections:
        signal.connect(slot)

    # Execute
    self.threadpool.start(worker)
|
|
|
|
def compare_imported_checksums(self):
    """Verify the imported files' checksums on a worker thread.

    Resets the checksum dialog's progress and runs
    ``t_compare_imported_checksums`` in the thread pool with its signals
    connected to the checksum progress dialog.

    NOTE(review): this runs regardless of ``config['verify_checksum']``; a
    check on that setting was commented out in the original. Confirm intent.
    """
    # Initialize widgets
    dialog = self.checksum_progress_dialog
    dialog.set_progress(0)

    worker = Worker(self.t_compare_imported_checksums)
    signals = worker.signals
    # ``finished`` was connected to close_dialog twice, which made the slot
    # fire twice per run; it is connected once here.
    connections = (
        (signals.started, self.worker_thread_started),
        (signals.started, dialog.open_dialog),
        (signals.found_file, dialog.set_file),
        (signals.progress, dialog.set_progress),
        (signals.finished, self.thread_complete),
        (signals.finished, self.worker_thread_done),
        (signals.finished, dialog.close_dialog),
        (signals.checksum_dialog_open, dialog.open_dialog),
        (signals.checksum_file, dialog.set_file),
        (signals.checksum_progress, dialog.set_progress),
    )
    for signal, slot in connections:
        signal.connect(slot)

    # Execute
    self.threadpool.start(worker)
|
|
|
|
def t_compare_imported_checksums(self,
                                 progress_callback,
                                 current_file_progress_callback,
                                 imported_file_count_callback,
                                 found_file_callback,
                                 total_file_count_callback,
                                 checksum_file_callback,
                                 checksum_progress_callback,
                                 checksum_dialog_open_callback):
    """Check that all copies of every imported file share one checksum (worker entry point).

    For each entry in ``self.files`` the checksums are (re)computed through
    ``get_checksum`` and ``checksum_match`` is set to ``True`` only when
    every stored checksum is equal.  Entries with fewer than two checksums
    are left without a ``checksum_match`` key.

    All parameters are forwarded unchanged to ``get_checksum``.
    """
    callbacks = (progress_callback,
                 current_file_progress_callback,
                 imported_file_count_callback,
                 found_file_callback,
                 total_file_count_callback,
                 checksum_file_callback,
                 checksum_progress_callback,
                 checksum_dialog_open_callback)

    for file in self.files:
        entry = self.files[file]
        _pf = os.path.join(entry['folders']['source_path'], entry['name'])
        print(f"source_path: {entry['folders']['source_path']}")
        print(f"name: {entry['name']}")
        print(f'_pf: {_pf}')
        self.get_checksum(_pf, *callbacks)

        checksums = list(self.files[file]['xx_checksum'].values())
        if len(checksums) < 2:
            # Nothing to compare against.
            continue

        # Compare every checksum with the first one.  The original compared
        # adjacent pairs and kept only the last result, so a source/destination
        # mismatch was hidden when the two destination copies agreed.
        matched = all(value == checksums[0] for value in checksums[1:])
        self.files[file]['checksum_match'] = matched
        if matched:
            print(f'Checksums match for: {self.files[file]["name"]}')
        else:
            print(f'Checksum failed for: {self.files[file]["name"]}')
|
|
|
|
# Main App
app = QApplication(sys.argv)
# Main Window
window = MainWindow()
# Show Window
window.show()
# Run the Qt event loop and pass its return code on as the process exit
# status (it was previously discarded).
sys.exit(app.exec())