#!/usr/bin/env python import os import sys from PyQt6.QtCore import QThreadPool from PyQt6.QtGui import QIcon, QPixmap from PyQt6.QtWidgets import QMainWindow, QApplication, QFileDialog from _BitMover_MainWindow import Ui_MainWindow from _configure import CONFIG_FILE, Configure from _find_files import FindFiles from _find_files_dialog import FindProgress from _import_dialog import DialogImport from _media_import import MediaImporter from _preview import MediaPreview from _thread_my_stuff import Worker basedir = os.path.dirname(__file__) # TODO: verify source dir actually exists # Subclass QMainWindow to customize your application's main window class MainWindow(QMainWindow, Ui_MainWindow): def __init__(self, *args, **kwargs): super(MainWindow,self).__init__(*args,**kwargs) self.setupUi(self) self.setWindowTitle("BitMover") self.setWindowIcon(QIcon(os.path.join(basedir,'assets', 'forklift.ico'))) self.threadpool = QThreadPool() c = Configure(CONFIG_FILE) self.config = c.load_config() self.src_dir = self.config['folders']['source']['base'] self.dst_dir = self.config['folders']['destination']['base'] self.verify_checksum = self.config['verify_checksum'] self.cleanup_files = self.config['cleanup_sd'] self.store_originals = self.config['store_originals'] self.file_types = self.config['file_types'] # File Stuff self.total_files = 0 self.files = {} self.imp_dialog = DialogImport() self.find_files_dialog = FindProgress() self.widgets_config() def widgets_config(self): # Button Setup self.pushButton_src_browse.clicked.connect(self.select_src_directory) self.pushButton_dst_browse.clicked.connect(self.select_dst_directory) self.pushButton_3_scan_dir.clicked.connect(self.find_files) self.pushButton_import.clicked.connect(self.import_files) # Initialize widgets self.lineEdit_src_dir.setText(self.src_dir) self.lineEdit_dst_dir.setText(self.dst_dir) self.toggle_scan_button(True) self.toggle_import_button(False) self.lcd_files_found.display(int(0)) self.set_default_thumbnail() self.file_list.currentItemChanged.connect(self.index_changed) self.checkBox_verify_checksum.setChecked(self.verify_checksum) self.checkBox_cleanup_files.setChecked(self.cleanup_files) self.checkBox_store_originals.setChecked(self.store_originals) self.checkBox_verify_checksum.checkStateChanged.connect(self.verify_checksum_changed) self.checkBox_cleanup_files.checkStateChanged.connect(self.cleanup_files_changed) self.checkBox_store_originals.checkStateChanged.connect(self.store_originals_changed) self.clear_metadata() # Setup thread pool print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount()) def toggle_scan_button(self,enable=True): print(f'toggle_scan_button.enabled: {enable}') self.pushButton_3_scan_dir.setEnabled(enable) def toggle_import_button(self,enable=True): self.pushButton_import.setEnabled(enable) def select_src_directory(self): directory = QFileDialog.getExistingDirectory(self, "Select Directory", self.src_dir) if directory: print("Selected Directory:", directory) # path = Path(directory) self.src_dir = directory self.lineEdit_src_dir.setText(self.src_dir) def select_dst_directory(self): directory = QFileDialog.getExistingDirectory(self, "Select Directory", self.dst_dir) if directory: print("Selected Directory:", directory) # path = Path(directory) self.dst_dir = directory self.lineEdit_dst_dir.setText(self.dst_dir) def verify_checksum_changed(self): if self.checkBox_verify_checksum.isChecked(): self.config['verify_checksum'] = True else: self.config['verify_checksum'] = False print(f"verify_checksums: {self.config['verify_checksums']}") def cleanup_files_changed(self): if self.checkBox_cleanup_files.isChecked(): self.config['cleanup_sd'] = True else: self.config['cleanup_sd'] = False print(f"cleanup_sd: {self.config['cleanup_sd']}") def store_originals_changed(self): if self.checkBox_store_originals.isChecked(): self.config['store_originals'] = True else: self.config['store_originals'] = False print(f"store_originals: {self.config['store_originals']}") def set_thumbnail(self,thumb_file,scaled=True,ratio=None): self.img_preview.setPixmap(QPixmap(thumb_file)) self.img_preview.setScaledContents(scaled) if ratio is not None: self.img_preview.setFixedHeight(self.img_preview.width() / ratio) def set_default_thumbnail(self): self.set_thumbnail(os.path.join(basedir, 'assets', 'preview_placeholder.jpg')) def get_preview(self,i): preview = MediaPreview(path_file_name = i.text(), media_files = self.files) return preview def update_preview(self,preview): self.set_thumbnail(preview.thumbnail,ratio=preview.thumbnail_ratio) def update_metadata(self,preview): self.clear_metadata() path_hash = preview.source_path_hash f = self.files[path_hash] f_date = f['date']['capture_date'] f_video = f['video'] self.l_data_file_source_path.setText( self.files[path_hash]['folders']['source_path']) self.l_data_file_dest_path.setText( self.files[path_hash]['folders']['destination']) self.l_meta_content_date_time_c.setText( f"{f_date['y']}/{f_date['m']}/{f_date['d']}" ) if preview.file_type == 'image': self.l_meta_01.setText('Size') self.l_meta_02.setText('dpi') self.l_meta_03.setText('ISO') self.l_meta_04.setText('Lens') self.l_meta_05.setText('Focal Length') self.l_meta_06.setText('Camera') self.l_meta_07.setText('Aperture') self.l_meta_08.setText('Megapixels') self.l_meta_content_01.setText(str(f['photo']['size']['width_height'])) self.l_meta_content_02.setText(str(f['photo']['dpi'])) self.l_meta_content_03.setText(str(f['photo']['iso'])) self.l_meta_content_04.setText(str(f['photo']['lens_model'])) self.l_meta_content_05.setText(str(f['photo']['focal_length'])) self.l_meta_content_06.setText(str(f"{f['photo']['camera_brand']} {f['photo']['camera_model']}")) self.l_meta_content_07.setText(str(f['photo']['aperture'])) self.l_meta_content_08.setText(str(f['photo']['megapixels'])) elif preview.file_type == 'video': self.l_meta_01.setText('Size') self.l_meta_02.setText('Frames / Second') self.l_meta_03.setText('Bit Depth') self.l_meta_04.setText('Duration') self.l_meta_05.setText('Encoder') self.l_meta_06.setText('Codec') self.l_meta_07.setText('Profile') self.l_meta_08.setText('Pix Format') self.l_meta_content_01.setText(str(f_video['size']['width_height'])) self.l_meta_content_02.setText(str(f_video['r_framerate'])) self.l_meta_content_03.setText(str(f_video['bit_depth'])) self.l_meta_content_04.setText(str(f_video['duration'])) self.l_meta_content_05.setText(str(f_video['encoding'])) self.l_meta_content_06.setText(str(f_video['codec'])) self.l_meta_content_07.setText(str(f_video['profile'])) self.l_meta_content_08.setText(str(f_video['pix_format'])) def clear_metadata(self): self.l_meta_01.setText('') self.l_meta_02.setText('') self.l_meta_03.setText('') self.l_meta_04.setText('') self.l_meta_05.setText('') self.l_meta_06.setText('') self.l_meta_07.setText('') self.l_meta_08.setText('') self.l_meta_content_01.setText('') self.l_meta_content_02.setText('') self.l_meta_content_03.setText('') self.l_meta_content_04.setText('') self.l_meta_content_05.setText('') self.l_meta_content_06.setText('') self.l_meta_content_07.setText('') self.l_meta_content_08.setText('') def index_changed(self,i): self.clear_metadata() if i is None: self.set_default_thumbnail() else: preview = self.get_preview(i) self.update_preview(preview) self.update_metadata(preview) def get_event(self): event_name = self.eventName.text() return event_name def set_total_files(self,t): total_file_count = t self.lcd_files_found.display(total_file_count) def set_imported_files(self,t): self.lcd_files_imported.display(t) @staticmethod def print_output(s): print(s) def worker_thread_started(self): print('scan thread started') self.toggle_scan_button(False) self.toggle_import_button(False) def worker_thread_done(self): print('scan thread complete.') self.toggle_scan_button(True) if 0 < len(self.files): self.toggle_import_button(True) else: self.toggle_import_button(False) @staticmethod def thread_complete(): print("THREAD COMPLETE.") def add_found_file_to_list(self,f): self.file_list.addItem(f) def gen_file_dict(self,d): self.files = d def find_files(self): """ find files to build a dictionary out of """ self.files = {} self.set_total_files(0) file_finder = FindFiles() worker = Worker(file_finder.t_find_files) worker.signals.started.connect( self.worker_thread_started) worker.signals.started.connect( self.find_files_dialog.open_find_files_dialog) worker.signals.progress.connect( self.find_files_dialog.set_progress_finding_files) worker.signals.found_file.connect( self.add_found_file_to_list) worker.signals.total_file_count.connect( self.set_total_files) worker.signals.result.connect( self.gen_file_dict) worker.signals.finished.connect( self.thread_complete) worker.signals.finished.connect( self.worker_thread_done) worker.signals.finished.connect( self.find_files_dialog.close_find_files_dialog) # Execute. self.threadpool.start(worker) def import_files(self): """ Import found files """ # Initialize Widgets self.lcd_files_imported.display(int(0)) self.imp_dialog.set_progress_importing(0) self.imp_dialog.set_progress_current_file(0) importer = MediaImporter() worker = Worker(importer.t_copy_files) worker.signals.started.connect( self.worker_thread_started) worker.signals.started.connect( self.imp_dialog.open_import_dialog) worker.signals.import_progress.connect( self.imp_dialog.set_progress_importing) worker.signals.current_file_progress.connect( self.imp_dialog.set_progress_current_file) worker.signals.imported_file_count.connect( self.set_imported_files) worker.signals.result.connect( self.print_output) worker.signals.finished.connect( self.thread_complete) worker.signals.finished.connect( self.worker_thread_done) # Execute self.threadpool.start(worker) def verify_checksum(self): # fh_match = FileHash() print(self.config) app = QApplication(sys.argv) window = MainWindow() window.show() app.exec()