#!/usr/bin/env python import os from os import path from os import rename import sys from PyQt6.QtCore import QThreadPool from PyQt6.QtGui import QIcon, QPixmap from PyQt6.QtWidgets import QMainWindow, QApplication, QFileDialog from _BitMover_MainWindow import Ui_MainWindow from _configure import CONFIG_FILE, Configure from _find_files import FindFiles from _dialog_find_files import FindProgress from _dialog_import import DialogImport from _dialog_checksum_progress import DialogChecksumProgress # from _media_import import MediaImporter from _preview import MediaPreview from _thread_my_stuff import Worker from _media_file import MediaFile from _file_stuff import path_exists,is_file,create_folder,cmp_hashes from _verify_file_checksum import FileHash from _dialog_compare_imported_checksums import DialogCompareImportedChecksums basedir = os.path.dirname(__file__) # TODO: verify source dir actually exists class MainWindow(QMainWindow, Ui_MainWindow): def __init__(self, *args, **kwargs): super(MainWindow,self).__init__(*args,**kwargs) self.setupUi(self) self.setWindowTitle("BitMover") self.setWindowIcon(QIcon(os.path.join(basedir,'assets', 'forklift.ico'))) self.threadpool = QThreadPool() self.config = None self.src_dir = None self.dst_dir = None self.verify_checksum = None self.cleanup_files = None self.store_originals = None self.file_types = None self.source_path_hash = None self.search_types = None self.chunk_size = (16 * 1024) * 1 self.load_config() self.path_file_source = None self.path_file_destination = None self.path_file_destination_original = None self.checksum_fph = None # File Stuff self.total_files = 0 self.file_total = 0 self.files = {} self.event_name = None self.imp_dialog = DialogImport() self.find_files_dialog = FindProgress() self.checksum_progress_dialog = DialogChecksumProgress() self.compare_imported_checksums_dialog = DialogCompareImportedChecksums() self.widgets_config() def load_config(self): try: c = Configure(CONFIG_FILE) self.config = c.load_config() self.src_dir = self.config['folders']['source']['base'] self.dst_dir = self.config['folders']['destination']['base'] self.verify_checksum = self.config.get('verify_checksum', False) self.cleanup_files = self.config.get('cleanup_sd', False) self.store_originals = self.config.get('store_originals', False) self.file_types = self.config.get('file_types', {}) except KeyError as e: log.error(f"Missing configuration key: {e}") sys.exit(1) # or provide a fallback def widgets_config(self): # Button Setup self.pushButton_src_browse.clicked.connect(self.select_src_directory) self.pushButton_dst_browse.clicked.connect(self.select_dst_directory) self.pushButton_3_scan_dir.clicked.connect(self.find_files) self.pushButton_import.clicked.connect(self.import_files) # Initialize widgets self.lineEdit_src_dir.setText(self.src_dir) self.lineEdit_dst_dir.setText(self.dst_dir) self.toggle_scan_button(True) self.toggle_import_button(False) self.lcd_files_found.display(int(0)) self.set_default_thumbnail() self.file_list.currentItemChanged.connect(self.index_changed) self.checkBox_verify_checksum.setChecked(self.verify_checksum) self.checkBox_cleanup_files.setChecked(self.cleanup_files) self.checkBox_store_originals.setChecked(self.store_originals) self.checkBox_verify_checksum.checkStateChanged.connect(self.verify_checksum_changed) self.checkBox_cleanup_files.checkStateChanged.connect(self.cleanup_files_changed) self.checkBox_store_originals.checkStateChanged.connect(self.store_originals_changed) self.clear_metadata() # Setup thread pool print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount()) def toggle_scan_button(self,enable=True): self.pushButton_3_scan_dir.setEnabled(enable) def toggle_import_button(self,enable=True): self.pushButton_import.setEnabled(enable) def select_src_directory(self): directory = QFileDialog.getExistingDirectory(self, "Select Directory", self.src_dir) if directory: # path = Path(directory) self.src_dir = directory self.lineEdit_src_dir.setText(self.src_dir) def select_dst_directory(self): directory = QFileDialog.getExistingDirectory(self, "Select Directory", self.dst_dir) if directory: # path = Path(directory) self.dst_dir = directory self.lineEdit_dst_dir.setText(self.dst_dir) def get_source_path_hash(self,f): self.source_path_hash = FileHash(f).path_hash return self.source_path_hash def verify_checksum_changed(self): if self.checkBox_verify_checksum.isChecked(): self.config['verify_checksum'] = True else: self.config['verify_checksum'] = False def cleanup_files_changed(self): if self.checkBox_cleanup_files.isChecked(): self.config['cleanup_sd'] = True else: self.config['cleanup_sd'] = False def store_originals_changed(self): if self.checkBox_store_originals.isChecked(): self.config['store_originals'] = True else: self.config['store_originals'] = False def set_thumbnail(self,thumb_file,scaled=True,ratio=None): self.img_preview.setPixmap(QPixmap(thumb_file)) self.img_preview.setScaledContents(scaled) if ratio is not None: self.img_preview.setFixedHeight(self.img_preview.width() / ratio) def set_default_thumbnail(self): self.set_thumbnail(os.path.join(basedir, 'assets', 'preview_placeholder.jpg')) def get_preview(self,path_file_name): preview = MediaPreview(path_file_name=path_file_name, media_files=self.files) return preview def update_preview(self,preview): if preview.file_type.lower() == 'video': thumb_ratio = 16 / 9 # 1.77 elif preview.file_type.lower() == 'image': thumb_ratio = preview.thumbnail_ratio else: thumb_ratio = 1 self.set_thumbnail(preview.thumbnail, ratio=thumb_ratio) def update_metadata(self,preview): self.clear_metadata() path_hash = preview.source_path_hash f = self.files[path_hash] f_date = f['date']['capture_date'] self.l_data_file_source_path.setText( self.files[path_hash]['folders']['source_path']) self.l_data_file_dest_path.setText( self.files[path_hash]['folders']['destination']) self.l_meta_content_date_time_c.setText( f"{f_date['y']}/{f_date['m']}/{f_date['d']}" ) if f['file_type'] == 'image': f_photo = f['image_meta']['photo'] self.l_meta_01.setText('Size') self.l_meta_02.setText('dpi') self.l_meta_03.setText('ISO') self.l_meta_04.setText('Lens') self.l_meta_05.setText('Focal Length') self.l_meta_06.setText('Camera') self.l_meta_07.setText('Aperture') self.l_meta_08.setText('Megapixels') self.l_meta_content_01.setText(str(f_photo['size']['width_height'])) self.l_meta_content_02.setText(str(f_photo['dpi'])) self.l_meta_content_03.setText(str(f_photo['iso'])) self.l_meta_content_04.setText(str(f_photo['lens_model'])) self.l_meta_content_05.setText(str(f_photo['lens_focal_length'])) self.l_meta_content_06.setText(str(f"{f_photo['camera_brand']} {f_photo['camera_model']}")) self.l_meta_content_07.setText(str(f_photo['aperture'])) self.l_meta_content_08.setText(str(f_photo['size']['megapixels'])) elif f['file_type'] == 'video': f_video = f['video_meta']['video'] self.l_meta_01.setText('Size') self.l_meta_02.setText('Frames / Second') self.l_meta_03.setText('Bit Depth') self.l_meta_04.setText('Duration') self.l_meta_05.setText('Encoder') self.l_meta_06.setText('Codec') self.l_meta_07.setText('Profile') self.l_meta_08.setText('Pix Format') self.l_meta_content_01.setText(str(f_video['size']['width_height'])) self.l_meta_content_02.setText(str(f_video['r_frame_rate'])) self.l_meta_content_03.setText(str(f_video['bits_per_raw_sample'])) self.l_meta_content_04.setText(str(f_video['duration'])) self.l_meta_content_05.setText(str(f_video['encoding_brand'])) self.l_meta_content_06.setText(str(f_video['codec_long_name'])) self.l_meta_content_07.setText(str(f_video['profile'])) self.l_meta_content_08.setText(str(f_video['pix_fmt'])) def clear_metadata(self): self.l_meta_01.setText('') self.l_meta_02.setText('') self.l_meta_03.setText('') self.l_meta_04.setText('') self.l_meta_05.setText('') self.l_meta_06.setText('') self.l_meta_07.setText('') self.l_meta_08.setText('') self.l_meta_content_01.setText('') self.l_meta_content_02.setText('') self.l_meta_content_03.setText('') self.l_meta_content_04.setText('') self.l_meta_content_05.setText('') self.l_meta_content_06.setText('') self.l_meta_content_07.setText('') self.l_meta_content_08.setText('') def index_changed(self,i): self.clear_metadata() if i is None: self.set_default_thumbnail() else: self.process_file(i.text()) preview = self.get_preview(i.text()) self.update_preview(preview) self.update_metadata(preview) def get_event(self): self.event_name = self.eventName.text() return self.event_name def get_search_types(self): self.search_types = [] if self.checkBox_search_for_images.isChecked(): self.search_types.append('image') if self.checkBox_search_for_video.isChecked(): self.search_types.append('video') if self.checkBox_search_for_audio.isChecked(): self.search_types.append('audio') return self.search_types def get_t_files(self): file_total = 0 self.file_list.clear() self.img_preview.setPixmap(QPixmap(os.path.join(basedir, 'assets', 'preview_placeholder.jpg'))) for folder, subfolders, filename in os.walk(self.src_dir): for f_type in self.search_types: for ext in self.file_types[f_type]: for file in filename: if file.lower().endswith(ext): current_file = os.path.join(folder, file) if is_file(current_file): file_total += int(1) else: print(f"Skipping {current_file} as it does not look like a real file.") return file_total def set_total_files(self,t): total_file_count = t self.lcd_files_found.display(total_file_count) def set_imported_files(self,t): self.lcd_files_imported.display(t) @staticmethod def print_output(s): print(f'output: {s}') def worker_thread_started(self): self.toggle_scan_button(False) self.toggle_import_button(False) def worker_thread_done(self): self.toggle_scan_button(True) if 0 < len(self.file_list): self.toggle_import_button(True) else: self.toggle_import_button(False) @staticmethod def thread_complete(): print("THREAD COMPLETE.") def add_found_file_to_list(self,f): sph = self.get_source_path_hash(f) self.files[sph] = {} self.file_list.addItem(f) def gen_file_dict(self,d): self.files = d def process_file(self,p): """ gather information and add to dictionary """ media_file = MediaFile(path_file_name=p, config=self.config, event_name=self.event_name) i = self.get_source_path_hash(p) print(f'processing: {p}\nhash: {i}') self.files[i] = media_file.media def find_files(self): """ find files to build a dictionary out of """ self.files = {} self.set_total_files(0) self.get_event() self.get_search_types() self.file_total = self.get_t_files() self.set_total_files(self.file_total) # time.sleep(3) file_finder = FindFiles(search_types=self.search_types, src_dir=self.src_dir, file_types=self.file_types, file_total=self.file_total) worker = Worker(file_finder.t_find_files) worker.signals.started.connect( self.worker_thread_started) worker.signals.started.connect( self.find_files_dialog.open_find_files_dialog) worker.signals.progress.connect( self.find_files_dialog.set_progress_finding_files) worker.signals.found_file.connect( self.add_found_file_to_list) # worker.signals.found_file.connect( # self.process_file) worker.signals.total_file_count.connect( self.set_total_files) # worker.signals.result.connect( # self.gen_file_dict) worker.signals.finished.connect( self.thread_complete) worker.signals.finished.connect( self.worker_thread_done) worker.signals.finished.connect( self.find_files_dialog.close_find_files_dialog) # Execute. self.threadpool.start(worker) # From _media_import.py def is_video(self,sph): if self.files[sph]['file_type'] == 'video': r = True else: r = False return r def is_image(self,sph): if self.files[sph]['file_type'] == 'image': r = True else: r = False return r def t_copy_files(self, progress_callback, current_file_progress_callback, imported_file_count_callback, found_file_callback, total_file_count_callback, checksum_file_callback, checksum_progress_callback, checksum_dialog_open_callback, compare_checksums_source_file_callback, compare_checksums_dest_file_callback, compare_checksums_source_hash_callback, compare_checksums_dest_hash_callback, compare_checksums_add_row_callback): """ Copy Files. """ count = int(0) for line in range(self.file_list.count()): item = self.file_list.item(line) file = item.text() self.process_file(file) sph = self.get_source_path_hash(file) self.src_dir = self.files[sph]['folders']['source_path'] self.dst_dir = self.files[sph]['folders']['destination'] self.path_file_source = path.join(self.src_dir, self.files[sph]['name']) self.path_file_destination = path.join(self.dst_dir, self.files[sph]['name']) self.imp_dialog.set_importing_file(self.path_file_source) self.copy_a_file(sph, self.path_file_destination, progress_callback, current_file_progress_callback, imported_file_count_callback, found_file_callback, total_file_count_callback, checksum_file_callback, checksum_progress_callback, checksum_dialog_open_callback, compare_checksums_source_file_callback, compare_checksums_dest_file_callback, compare_checksums_source_hash_callback, compare_checksums_dest_hash_callback, compare_checksums_add_row_callback) if self.check_store_original(sph) is True: self.path_file_destination_original = path.join(self.dst_dir, self.files[sph]['folders']['destination_original'], self.files[sph]['name']) self.copy_a_file(sph, self.path_file_destination_original, progress_callback, current_file_progress_callback, imported_file_count_callback, found_file_callback, total_file_count_callback, checksum_file_callback, checksum_progress_callback, checksum_dialog_open_callback, compare_checksums_source_file_callback, compare_checksums_dest_file_callback, compare_checksums_source_hash_callback, compare_checksums_dest_hash_callback, compare_checksums_add_row_callback) count += 1 imported_file_count_callback.emit(count) progress_callback.emit(round((count / self.file_total) * 100, 1)) self.imp_dialog.add_to_imported_list(self.path_file_source) def copy_a_file(self, fph, target, progress_callback, current_file_progress_callback, imported_file_count_callback, found_file_callback, total_file_count_callback, checksum_file_callback, checksum_progress_callback, checksum_dialog_open_callback, compare_checksums_source_file_callback, compare_checksums_dest_file_callback, compare_checksums_source_hash_callback, compare_checksums_dest_hash_callback, compare_checksums_add_row_callback): size = path.getsize(self.path_file_source) target_dir = os.path.dirname(target) create_folder(target_dir) dup_check = self.check_duplicate(fph,target,target_dir, progress_callback, current_file_progress_callback, imported_file_count_callback, found_file_callback, total_file_count_callback, checksum_file_callback, checksum_progress_callback, checksum_dialog_open_callback, compare_checksums_source_file_callback, compare_checksums_dest_file_callback, compare_checksums_source_hash_callback, compare_checksums_dest_hash_callback, compare_checksums_add_row_callback) if dup_check is False: if self.is_video(fph): self.chunk_size = (1024 * 1024) * 5 else: self.chunk_size = (16 * 1024) * 1 with open(self.path_file_source, 'rb') as fs: with open(target, 'wb') as fd: while True: chunk = fs.read(self.chunk_size) if not chunk: break fd.write(chunk) dst_size = path.getsize(target) current_file_progress_callback.emit(round((dst_size / size) * 100, 1)) def check_store_original(self,sph): if self.config['store_originals'] is True: if self.is_image(sph): r = True else: r = False else: r = False return r def get_checksum(self,_pf, progress_callback, current_file_progress_callback, imported_file_count_callback, found_file_callback, total_file_count_callback, checksum_file_callback, checksum_progress_callback, checksum_dialog_open_callback, compare_checksums_source_file_callback, compare_checksums_dest_file_callback, compare_checksums_source_hash_callback, compare_checksums_dest_hash_callback, compare_checksums_add_row_callback): sph = FileHash(_pf).path_hash self.path_file_source = os.path.join(self.files[sph]['folders']['source_path'], self.files[sph]['name']) self.path_file_destination = os.path.join(self.files[sph]['folders']['destination'], self.files[sph]['name']) if self.store_originals is True: self.path_file_destination_original = os.path.join( self.files[sph]['folders']['destination_original'], self.files[sph]['name']) def set_checksum(_pf): if path_exists(_pf) and is_file(_pf): self.files[sph]['xx_checksum'][_pf] = FileHash(_pf).get_hash(_pf, progress_callback, current_file_progress_callback, imported_file_count_callback, found_file_callback, total_file_count_callback, checksum_file_callback, checksum_progress_callback, checksum_dialog_open_callback, compare_checksums_source_file_callback, compare_checksums_dest_file_callback, compare_checksums_source_hash_callback, compare_checksums_dest_hash_callback, compare_checksums_add_row_callback) else: print(f"set_checksum: {_pf} doesn't exist yet or isn't a real file.") if self.files[sph].get('xx_checksum') is None: self.files[sph]['xx_checksum'] = {} set_checksum(self.path_file_source) set_checksum(self.path_file_destination) if self.store_originals is True: set_checksum(self.path_file_destination_original) else: if self.files[sph]['xx_checksum'].get(self.path_file_source) is None: set_checksum(self.path_file_source) if self.files[sph]['xx_checksum'].get(self.path_file_destination) is None: set_checksum(self.path_file_destination) if self.store_originals is True: if self.files[sph]['xx_checksum'].get(self.path_file_destination_original) is None: set_checksum(self.path_file_destination_original) def check_duplicate(self, sph, target, target_dir, progress_callback, current_file_progress_callback, imported_file_count_callback, found_file_callback, total_file_count_callback, checksum_file_callback, checksum_progress_callback, checksum_dialog_open_callback, compare_checksums_source_file_callback, compare_checksums_dest_file_callback, compare_checksums_source_hash_callback, compare_checksums_dest_hash_callback, compare_checksums_add_row_callback): if path_exists(target): print(f'Path Exists: path_file_source: {self.path_file_source}\ntarget: {target}') self.get_checksum(self.path_file_source, progress_callback, current_file_progress_callback, imported_file_count_callback, found_file_callback, total_file_count_callback, checksum_file_callback, checksum_progress_callback, checksum_dialog_open_callback, compare_checksums_source_file_callback, compare_checksums_dest_file_callback, compare_checksums_source_hash_callback, compare_checksums_dest_hash_callback, compare_checksums_add_row_callback) hash1 = self.files[sph]['xx_checksum'][self.path_file_source] hash2 = self.files[sph]['xx_checksum'][target] check_match = cmp_hashes(hash1,hash2) if check_match is False: print(f'\nFound duplicate for {self.path_file_source}, renaming destination with hash appended.') base, extension = path.splitext(self.files[sph]['name']) file_name_hash = base + '_' + hash2 + extension rename(target, path.join(target_dir, file_name_hash)) r = False # This is false because we move the original conflict out of the way. else: print(f"\n########################\nFound duplicate for: {self.path_file_source}\n" f"at: {target}... \nHashes MATCH\n" f"########################") r = True else: # No duplicate r = False return r # END from _media_import.py def import_files(self): """ Import found files """ # Initialize Widgets self.lcd_files_imported.display(int(0)) self.imp_dialog.set_progress_importing(0) self.imp_dialog.set_progress_current_file(0) worker = Worker(self.t_copy_files) worker.signals.started.connect( self.worker_thread_started) worker.signals.started.connect( self.imp_dialog.open_import_dialog) worker.signals.progress.connect( self.imp_dialog.set_progress_importing) worker.signals.current_file_progress.connect( self.imp_dialog.set_progress_current_file) worker.signals.imported_file_count.connect( self.set_imported_files) worker.signals.result.connect( self.print_output) worker.signals.finished.connect( self.thread_complete) worker.signals.finished.connect( self.worker_thread_done) worker.signals.finished.connect( self.compare_imported_checksums) worker.signals.checksum_dialog_open.connect( self.checksum_progress_dialog.open_dialog) worker.signals.checksum_file.connect( self.checksum_progress_dialog.set_file) worker.signals.checksum_progress.connect( self.checksum_progress_dialog.set_progress) worker.signals.finished.connect( self.checksum_progress_dialog.close_dialog) # Execute self.threadpool.start(worker) def compare_imported_checksums(self): # if self.config['verify_checksum'] is True: # self.checksum_fph = fph # Initialize Widgets self.compare_imported_checksums_dialog.setup_compare_dialog() worker = Worker(self.t_compare_imported_checksums) worker.signals.started.connect( self.worker_thread_started) worker.signals.started.connect( self.compare_imported_checksums_dialog.open_dialog) worker.signals.compare_checksums_source_file.connect( self.compare_imported_checksums_dialog.set_label_path_file_source) worker.signals.compare_checksums_dest_file.connect( self.compare_imported_checksums_dialog.set_label_path_file_dest) worker.signals.compare_checksums_source_hash.connect( self.compare_imported_checksums_dialog.set_label_hash_source) worker.signals.compare_checksums_dest_hash.connect( self.compare_imported_checksums_dialog.set_label_hash_dest) worker.signals.compare_checksums_add_row.connect( self.compare_imported_checksums_dialog.add_table_row) worker.signals.finished.connect( self.thread_complete) worker.signals.finished.connect( self.worker_thread_done) worker.signals.finished.connect( self.checksum_progress_dialog.close_dialog) worker.signals.checksum_dialog_open.connect( self.checksum_progress_dialog.open_dialog) worker.signals.checksum_file.connect( self.checksum_progress_dialog.set_file) worker.signals.checksum_progress.connect( self.checksum_progress_dialog.set_progress) worker.signals.finished.connect( self.checksum_progress_dialog.close_dialog) # Execute self.threadpool.start(worker) def t_compare_imported_checksums(self, progress_callback, current_file_progress_callback, imported_file_count_callback, found_file_callback, total_file_count_callback, checksum_file_callback, checksum_progress_callback, checksum_dialog_open_callback, compare_checksums_source_file_callback, compare_checksums_dest_file_callback, compare_checksums_source_hash_callback, compare_checksums_dest_hash_callback, compare_checksums_add_row_callback): for file in self.files: _pfs = os.path.join(self.files[file]['folders']['source_path'], self.files[file]['name']) _pfd = os.path.join(self.files[file]['folders']['destination'], self.files[file]['name']) compare_checksums_source_file_callback.emit(_pfs) compare_checksums_dest_file_callback.emit(_pfd) compare_checksums_source_hash_callback.emit('Calculating...') compare_checksums_dest_hash_callback.emit('Calculating...') self.get_checksum(_pfs, progress_callback, current_file_progress_callback, imported_file_count_callback, found_file_callback, total_file_count_callback, checksum_file_callback, checksum_progress_callback, checksum_dialog_open_callback, compare_checksums_source_file_callback, compare_checksums_dest_file_callback, compare_checksums_source_hash_callback, compare_checksums_dest_hash_callback, compare_checksums_add_row_callback) compare_checksums_source_hash_callback.emit( self.files[file]['xx_checksum'][_pfs]) compare_checksums_dest_hash_callback.emit( self.files[file]['xx_checksum'][_pfd]) i = 0 c = {} for checksum in self.files[file]['xx_checksum']: c[i] = self.files[file]['xx_checksum'][checksum] if i > 0: p = i - 1 if c[i] == c[p]: self.files[file]['checksum_match'] = True print(f'Checksums match for: {self.files[file]["name"]}') else: self.files[file]['checksum_match'] = False print(f'Checksum failed for: {self.files[file]["name"]}') i += 1 table_row = { 'source_path_file': _pfs, 'dest_path_file': _pfd, 'source_path_hash': self.files[file]['xx_checksum'][_pfs], 'dest_path_hash': self.files[file]['xx_checksum'][_pfd], 'checksum_match': self.files[file]['checksum_match'] } print(f"checksum_match:{self.files[file]['checksum_match']}") print(f"--------Table Row-------\n{table_row}\n--------End-------\n") compare_checksums_add_row_callback.emit(table_row) # Main App app = QApplication(sys.argv) # Main Window window = MainWindow() # Show Window window.show() app.exec()