#!/usr/bin/env python import os import sys # import time from PyQt6.QtCore import * from PyQt6.QtGui import * from PyQt6.QtWidgets import * import traceback from configure import CONFIG_FILE, Configure from file_stuff import is_file, get_t_files from BitMover_MainWindow import Ui_MainWindow from get_image_tag import get_exif_tag from media import Media from lumberjack import timber from raw_photo import extract_jpg_thumb log = timber(__name__) class WorkerSignals(QObject): """ Defines the signals available from a running worker thread. Supported signals are: finished No data error tuple (exctype, value, traceback.format_exc() ) result object data returned from processing, anything progress int indicating % progress """ finished = pyqtSignal() error = pyqtSignal(tuple) result = pyqtSignal(object) progress = pyqtSignal(int) class Worker(QRunnable): """ Worker thread Inherits from QRunnable to handler worker thread setup, signals and wrap-up. :param callback: The function callback to run on this worker thread. Supplied args and kwargs will be passed through to the runner. :type callback: function :param args: Arguments to pass to the callback function :param kwargs: Keywords to pass to the callback function """ def __init__(self, fn, *args, **kwargs): super(Worker, self).__init__() # Store constructor arguments (re-used for processing) self.fn = fn self.args = args self.kwargs = kwargs self.signals = WorkerSignals() # Add the callback to our kwargs self.kwargs['progress_callback'] = self.signals.progress @pyqtSlot() def run(self): """ Initialise the runner function with passed args, kwargs. """ # Retrieve args/kwargs here; and fire processing using them try: result = self.fn(*self.args, **self.kwargs) except: traceback.print_exc() exctype, value = sys.exc_info()[:2] self.signals.error.emit((exctype, value, traceback.format_exc())) else: self.signals.result.emit(result) # Return the result of the processing finally: self.signals.finished.emit() # Done # Subclass QMainWindow to customize your application's main window class MainWindow(QMainWindow, Ui_MainWindow): def __init__(self, *args, **kwargs): super(MainWindow,self).__init__(*args,**kwargs) self.setupUi(self) # uic.loadUi('BitMover.ui',self) c = Configure(CONFIG_FILE) self.config = c.load_config() self.total_files = 0 # self.t_find_files = threading.Thread(target=self.find_files) self.setWindowTitle("BitMover") self.setWindowIcon(QIcon('assets/forklift.png')) self.src_dir = self.config['folders']['source']['base'] self.dst_dir = self.config['folders']['destination']['base'] self.file_types = self.config['file_types'] self.lineEdit_src_dir.setText(self.src_dir) self.lineEdit_dst_dir.setText(self.dst_dir) self.pushButton_src_browse.clicked.connect(self.select_src_directory) self.pushButton_dst_browse.clicked.connect(self.select_dst_directory) self.pushButton_3_scan_dir.clicked.connect(self.find_files) # self.pushButton_3_scan_dir.clicked.connect(self.t_find_files.start) self.lcd_files_found.display(int(0)) self.set_progress(0,0) self.img_preview.setPixmap(QPixmap('assets/preview_placeholder.jpg')) self.img_preview.setScaledContents(True) # self.img_preview.setFixedWidth(preview_width) # self.img_preview.setFixedHeight(preview_height) self.file_list.currentItemChanged.connect(self.index_changed) self.threadpool = QThreadPool() self.files = {} def index_changed(self,i): f = i.text() event = self.get_event() c = self.config m = Media(f,event,c) dtc = f'{m.capture_date[0]}/{m.capture_date[1]}/{m.capture_date[2]}' self.label_data_date_time_created.setText(dtc) if m.file_type == 'image': # img = Image.open(f) dpi = get_exif_tag(f,"xresolution") width = get_exif_tag(f,"image width") height = get_exif_tag(f,"image height") if width is None: get_exif_tag(f,"exifimagewidth") if height is None: get_exif_tag(f,"exifimagelength") # width = img.width # height = img.height size = f'{width}x{height}' if width is not None and height is not None: mpixels = (width * height) / 1000000 else: mpixels = '' iso = get_exif_tag(f,"iso") aperture = get_exif_tag(f,"fnumber") camera = get_exif_tag(f,"cameramodelname") if camera is None: camera = get_exif_tag(f,"image model") lens = get_exif_tag(f,"lensmodel") zoom = get_exif_tag(f,"focallength") print(f'size: {size}') print(f'dpi: {dpi}') print(f'iso: {iso}') print(f'lens: {lens}') print(f'zoom: {zoom}') print(f'camera: {camera}') print(f'aperture: {aperture}') print(f'mpixels: {mpixels}') self.label_data_width_height.setText(str(size)) self.label_data_dpi.setText(str(dpi)) self.label_data_iso.setText(str(iso)) self.label_data_lens.setText(str(lens)) self.label_data_zoom.setText(str(zoom)) self.label_data_camera.setText(str(camera)) self.label_data_aperture.setText(str(aperture)) self.label_data_megapixels.setText(str(mpixels)) if f.lower().endswith("jpg") or f.lower().endswith("jpeg"): self.img_preview.setPixmap(QPixmap(f)) else: # jpg = img.convert("RGB") jpg = extract_jpg_thumb(f) self.img_preview.setPixmap(QPixmap(jpg)) def select_src_directory(self): directory = QFileDialog.getExistingDirectory(self, "Select Directory", self.src_dir) if directory: print("Selected Directory:", directory) # path = Path(directory) self.src_dir = directory self.lineEdit_src_dir.setText(self.src_dir) def select_dst_directory(self): directory = QFileDialog.getExistingDirectory(self, "Select Directory", self.dst_dir) if directory: print("Selected Directory:", directory) # path = Path(directory) self.dst_dir = directory self.lineEdit_dst_dir.setText(self.dst_dir) def set_progress(self,p,t): """ set progress for bar, p = progress counter t = target total o = progress bar object """ if int(t) == 0: t += 1 # # while QPainter.isActive(Ui_MainWindow.QPainter()): # print('painter active') # time.sleep(0.02) percent_complete = (int(p) / int(t)) * 100 self.progressBar_overall.setValue(int(percent_complete)) def print_output(self, s): print(s) def thread_complete(self): print("THREAD COMPLETE!") def thread_find_files(self): print('in thread_find_files') print(self.src_dir) worker = Worker(self.find_files()) worker.signals.result.connect(self.print_output) worker.signals.finished.connect(self.thread_complete) worker.signals.progress.connect(self.progress_fn) self.threadpool.start(worker) def find_files(self): """ find files to build a dictionary out of """ log.info('In find_files') print(self.src_dir) file_count = int(0) file_total = get_t_files(self.src_dir,self.file_types) for folder, subfolders, filename in os.walk(self.src_dir): for f_type in self.file_types: for ext in self.file_types[f_type]: # for file in tqdm(filename, # desc='Finding ' + ext + ' Files in ' + folder): for file in filename: if file.lower().endswith(ext): current_file = os.path.join(folder, file) if is_file(current_file): file_count += int(1) self.process_file(folder, file) self.set_progress(file_count,file_total) # time.sleep(.02) else: print(f"Skipping {current_file} as it does not look like a real file.") progress_callback.emit((file_count / file_total) * 100) def get_event(self): event_name = self.eventName.text() return event_name def process_file(self,path_name, f_name): """ gather information and add to dictionary """ event = self.get_event() c = self.config log.debug(f'process_file({path_name}, {f_name}, {event}, {c})') m = Media(os.path.join(path_name,f_name),event, c) i = m.source_path_hash log.debug(f'Source Path Hash: {i}') self.files[i] = { 'folders': {}, 'date': {} } self.files[i]['folders']['source_path'] = m.source_path_dir self.files[i]['type'] = m.file_type self.files[i]['name'] = m.file_name self.files[i]['extension'] = m.file_ext self.files[i]['date']['capture_date'] = {} self.files[i]['date']['capture_date']['y'] = m.capture_date[0] self.files[i]['date']['capture_date']['m'] = m.capture_date[1] self.files[i]['date']['capture_date']['d'] = m.capture_date[2] self.files[i]['folders']['destination'] = m.destination_path self.files[i]['folders']['destination_original'] = m.destination_originals_path self.file_list.addItem(f"{self.files[i]['folders']['source_path']}/{self.files[i]['name']}") app = QApplication(sys.argv) window = MainWindow() window.show() app.exec()