#!/usr/bin/env python
"""BitMover main window: scan a source folder for media files and preview them."""
import os
import sys
import traceback

from PyQt6.QtCore import pyqtSignal, pyqtSlot, QRunnable, QObject, QThreadPool
from PyQt6.QtWidgets import QMainWindow, QApplication, QFileDialog
from PyQt6.QtGui import QIcon, QPixmap
from PIL import Image

from configure import CONFIG_FILE, Configure
from file_stuff import is_file
from BitMover_MainWindow import Ui_MainWindow
from get_image_tag import get_exif_tag
from media import Media
from lumberjack import timber
from raw_photo import extract_jpg_thumb, get_raw_image_dimensions

log = timber(__name__)

# Suffixes treated as JPEG when choosing how to read dimensions / previews.
_JPEG_SUFFIXES = ("jpg", "jpeg")


class WorkerSignals(QObject):
    """
    Defines the signals available from a running worker thread.

    Supported signals are:

    started
        No data; emitted just before the callback is invoked
    finished
        No data; emitted after the callback returned or raised
    error
        tuple (exctype, value, traceback.format_exc())
    result
        object data returned from processing, anything
    progress
        int indicating % progress
    """
    started = pyqtSignal()
    finished = pyqtSignal()
    error = pyqtSignal(tuple)
    result = pyqtSignal(object)
    progress = pyqtSignal(int)


class Worker(QRunnable):
    """
    Worker thread.

    Inherits from QRunnable to handle worker thread setup, signals and
    wrap-up.

    :param fn: The function to run on this worker thread. Supplied args and
               kwargs are passed through to it, plus a ``progress_callback``
               keyword holding the ``progress`` signal.
    :param args: Positional arguments to pass to ``fn``
    :param kwargs: Keyword arguments to pass to ``fn``
    """

    def __init__(self, fn, *args, **kwargs):
        super().__init__()
        # Store constructor args (re-used for processing)
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()
        # Hand the progress signal to the callback so it can report progress
        self.kwargs['progress_callback'] = self.signals.progress

    @pyqtSlot()
    def run(self):
        """
        Run the stored function with the stored args and kwargs.

        Emits ``started`` first, then ``result`` on success or ``error`` with
        ``(exctype, value, formatted_traceback)`` on failure, and always
        ``finished`` last.
        """
        try:
            self.signals.started.emit()
            result = self.fn(*self.args, **self.kwargs)
        except Exception:
            # Thread boundary: report the failure through the error signal
            # instead of letting it escape into the thread pool.
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit((exctype, value, traceback.format_exc()))
        else:
            self.signals.result.emit(result)
        finally:
            self.signals.finished.emit()


# Subclass QMainWindow to customize your application's main window
class MainWindow(QMainWindow, Ui_MainWindow):
    """Main BitMover window: pick folders, scan for files, preview metadata."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.setupUi(self)

        c = Configure(CONFIG_FILE)
        self.config = c.load_config()

        self.total_files = 0
        self.file_total = 0
        self.threads = {}
        self.files = {}

        self.lcd_files_found.display(0)
        self.set_progress(0, 0)
        self.setWindowTitle("BitMover")
        self.setWindowIcon(QIcon('assets/forklift.png'))

        self.threadpool = QThreadPool()
        print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount())

        self.src_dir = self.config['folders']['source']['base']
        self.dst_dir = self.config['folders']['destination']['base']
        self.file_types = self.config['file_types']

        self.lineEdit_src_dir.setText(self.src_dir)
        self.lineEdit_dst_dir.setText(self.dst_dir)
        self.pushButton_src_browse.clicked.connect(self.select_src_directory)
        self.pushButton_dst_browse.clicked.connect(self.select_dst_directory)
        self.pushButton_3_scan_dir.clicked.connect(self.find_files)
        self.toggle_scan_button(True)

        self.img_preview.setPixmap(QPixmap('assets/preview_placeholder.jpg'))
        self.img_preview.setScaledContents(True)
        self.file_list.currentItemChanged.connect(self.index_changed)

    def toggle_scan_button(self, enable=True):
        """Enable the scan button when ``enable`` is truthy, else disable it."""
        self.pushButton_3_scan_dir.setEnabled(bool(enable))

    def index_changed(self, i):
        """
        Refresh the metadata labels and preview for the newly selected item.

        :param i: The list item that became current. The signal delivers
                  ``None`` when there is no current item (e.g. the list was
                  emptied), in which case nothing is updated.
        """
        if i is None:
            return

        f = i.text()
        m = Media(f, self.get_event(), self.config)

        dtc = f'{m.capture_date[0]}/{m.capture_date[1]}/{m.capture_date[2]}'
        self.label_data_date_time_created.setText(dtc)

        if m.file_type != 'image':
            return

        is_jpeg = f.lower().endswith(_JPEG_SUFFIXES)

        dpi = get_exif_tag(f, "xresolution")
        if is_jpeg:
            # Context manager so the file handle is released right away.
            with Image.open(f) as img:
                width = img.width
                height = img.height
        else:
            # Call the helper once; index 0 is used as height, 1 as width.
            dimensions = get_raw_image_dimensions(f)
            width = dimensions[1]
            height = dimensions[0]

        size = f'{width}x{height}'
        if width is not None and height is not None:
            mpixels = round((width * height) / 1000000, 1)
        else:
            mpixels = ''

        iso = get_exif_tag(f, "iso")
        aperture = get_exif_tag(f, "fnumber")
        camera = get_exif_tag(f, "cameramodelname")
        if camera is None:
            # Fall back to the alternative tag name for the camera model.
            camera = get_exif_tag(f, "image model")
        lens = get_exif_tag(f, "lensmodel")
        zoom = get_exif_tag(f, "focallength")

        print(f'size: {size}')
        print(f'dpi: {dpi}')
        print(f'iso: {iso}')
        print(f'lens: {lens}')
        print(f'zoom: {zoom}')
        print(f'camera: {camera}')
        print(f'aperture: {aperture}')
        print(f'mpixels: {mpixels}')

        self.label_data_width_height.setText(str(size))
        self.label_data_dpi.setText(str(dpi))
        self.label_data_iso.setText(str(iso))
        self.label_data_lens.setText(str(lens))
        self.label_data_zoom.setText(str(zoom))
        self.label_data_camera.setText(str(camera))
        self.label_data_aperture.setText(str(aperture))
        self.label_data_megapixels.setText(str(mpixels))

        if is_jpeg:
            self.img_preview.setPixmap(QPixmap(f))
        else:
            # Non-JPEG images are previewed through the thumbnail helper.
            jpg = extract_jpg_thumb(f)
            self.img_preview.setPixmap(QPixmap(jpg))

    def select_src_directory(self):
        """Ask the user for a source directory and store it if one is chosen."""
        directory = QFileDialog.getExistingDirectory(self, "Select Directory", self.src_dir)
        if directory:
            print("Selected Directory:", directory)
            self.src_dir = directory
            self.lineEdit_src_dir.setText(self.src_dir)

    def select_dst_directory(self):
        """Ask the user for a destination directory and store it if one is chosen."""
        directory = QFileDialog.getExistingDirectory(self, "Select Directory", self.dst_dir)
        if directory:
            print("Selected Directory:", directory)
            self.dst_dir = directory
            self.lineEdit_dst_dir.setText(self.dst_dir)

    def set_total_files(self, t):
        """Record ``t`` as the total file count and show it on the LCD widget."""
        self.total_files = t
        self.lcd_files_found.display(self.total_files)

    def progress_fn(self, n):
        """Set the overall progress bar to ``n`` (truncated to an int)."""
        self.progressBar_overall.setValue(int(n))

    def set_progress(self, p, t):
        """
        Set progress for the bar.

        :param p: progress counter
        :param t: target total; a total that truncates to 0 is bumped by one
                  so the division below is always defined
        """
        if int(t) == 0:
            t += 1
        percent_complete = (int(p) / int(t)) * 100
        self.progressBar_overall.setValue(int(percent_complete))

    def _iter_source_files(self):
        """
        Yield the path of every file under ``self.src_dir`` whose lower-cased
        name ends with one of the configured extensions and passes ``is_file``.

        Files that match an extension but fail ``is_file`` are reported on
        stdout and skipped. Iteration order is folder, then file type, then
        extension, then file name.
        """
        for folder, _subfolders, filenames in os.walk(self.src_dir):
            for f_type in self.file_types:
                for ext in self.file_types[f_type]:
                    for file in filenames:
                        if not file.lower().endswith(ext):
                            continue
                        current_file = os.path.join(folder, file)
                        if is_file(current_file):
                            yield current_file
                        else:
                            print(f"Skipping {current_file} as it does not look like a real file.")

    def get_t_files(self):
        """
        Count the matching files under the source directory into
        ``self.file_total``.

        The counter is reset first so that repeated scans do not accumulate
        the totals of earlier scans.
        """
        self.file_total = 0
        for _current_file in self._iter_source_files():
            self.file_total += 1

    def t_find_files(self, progress_callback):
        """
        Worker entry point: count the matching files, then process each one.

        :param progress_callback: signal with an ``emit(int)`` method; receives
                                  the percentage of files processed so far
        :return: the string ``"Done."``
        """
        # NOTE(review): this runs on a pool thread (see find_files) yet
        # set_total_files / process_file touch widgets directly. Qt expects
        # widget access from the GUI thread only - consider routing these
        # through signals.
        self.get_t_files()
        self.set_total_files(self.file_total)

        total = self.file_total
        for file_count, current_file in enumerate(self._iter_source_files(), start=1):
            self.process_file(current_file)
            if total:
                # The progress signal is declared as int; clamp in case files
                # appeared between the counting pass and this pass.
                progress_callback.emit(min(100, int((file_count / total) * 100)))
        return "Done."

    def print_output(self, s):
        """Print the worker's result value."""
        print(s)

    def scan_thread_started(self):
        """Disable the scan button while a scan is running."""
        print('scan thread started')
        self.toggle_scan_button(False)

    def scan_thread_done(self):
        """Re-enable the scan button once the scan has finished."""
        print('scan thread complete.')
        self.toggle_scan_button(True)

    def thread_complete(self):
        """Report that the worker thread has finished."""
        print("THREAD COMPLETE.")

    def find_files(self):
        """
        Start a background scan that finds files to build a dictionary out of.
        """
        worker = Worker(self.t_find_files)
        worker.signals.started.connect(self.scan_thread_started)
        worker.signals.result.connect(self.print_output)
        worker.signals.finished.connect(self.thread_complete)
        worker.signals.finished.connect(self.scan_thread_done)
        worker.signals.progress.connect(self.progress_fn)

        # Execute.
        self.threadpool.start(worker)

    def get_event(self):
        """Return the text currently entered in the event-name field."""
        event_name = self.eventName.text()
        return event_name

    def process_file(self, p):
        """
        Gather information about file ``p``, add it to ``self.files`` keyed by
        its source path hash, and append it to the file list widget.

        :param p: path of the file to process
        """
        path_name = os.path.dirname(p)
        f_name = os.path.basename(p)
        event = self.get_event()
        c = self.config

        log.debug(f'process_file({path_name}, {f_name}, {event}, {c})')
        m = Media(os.path.join(path_name, f_name), event, c)
        i = m.source_path_hash
        log.debug(f'Source Path Hash: {i}')

        self.files[i] = {
            'folders': {
                'source_path': m.source_path_dir,
                'destination': m.destination_path,
                'destination_original': m.destination_originals_path,
            },
            'date': {
                'capture_date': {
                    'y': m.capture_date[0],
                    'm': m.capture_date[1],
                    'd': m.capture_date[2],
                },
            },
            'type': m.file_type,
            'name': m.file_name,
            'extension': m.file_ext,
        }

        # NOTE(review): called from the scan worker thread; see t_find_files.
        self.file_list.addItem(f"{self.files[i]['folders']['source_path']}/{self.files[i]['name']}")


if __name__ == "__main__":
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    app.exec()