BitMover/BitMover_ui.py

338 lines
12 KiB
Python
Executable File

#!/usr/bin/env python
import os
import sys
import traceback

from PyQt6.QtCore import pyqtSignal, pyqtSlot, QRunnable, QObject, QThreadPool
from PyQt6.QtWidgets import QMainWindow, QApplication
from PyQt6.QtWidgets import QFileDialog
from PyQt6.QtGui import QIcon,QPixmap
from PIL import Image

from configure import CONFIG_FILE, Configure
from file_stuff import is_file
from BitMover_MainWindow import Ui_MainWindow
from get_image_tag import get_exif_tag
from media import Media
from lumberjack import timber
from raw_photo import extract_jpg_thumb, get_raw_image_dimensions
# Module-level logger for this file, created by the project's lumberjack.timber helper.
log = timber(__name__)
class WorkerSignals(QObject):
    """
    Signal bundle exposed by a running Worker.

    QRunnable is not a QObject and cannot declare signals itself, so the
    worker owns an instance of this class instead.

    Signals:
        started   -- no payload; emitted right before the wrapped function runs
        finished  -- no payload; emitted once the worker is done, success or not
        error     -- tuple (exctype, value, traceback.format_exc())
        result    -- object; whatever the wrapped function returned
        progress  -- int; percentage of work completed
    """

    progress = pyqtSignal(int)
    result = pyqtSignal(object)
    error = pyqtSignal(tuple)
    started = pyqtSignal()
    finished = pyqtSignal()
class Worker(QRunnable):
    """
    Runnable that executes an arbitrary callable on a thread-pool thread.

    The callable is invoked as ``fn(*args, **kwargs)``. A ``progress_callback``
    keyword (the worker's ``progress`` signal) is always injected into the
    keyword arguments, so the callable must accept it.

    :param fn: The function to run on the worker thread.
    :type fn: function
    :param args: Positional arguments forwarded to ``fn``.
    :param kwargs: Keyword arguments forwarded to ``fn``.
    """

    def __init__(self, fn, *args, **kwargs):
        super().__init__()
        self.signals = WorkerSignals()
        # Keep the call description around; run() replays it on the pool thread.
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        # Hand the progress signal to the callable so it can report back.
        self.kwargs['progress_callback'] = self.signals.progress

    @pyqtSlot()
    def run(self):
        """
        Call the stored function and report the outcome through the signals.

        Emits ``started`` first, then either ``result`` (with the return value)
        or ``error`` (with exception type, value and formatted traceback), and
        always ``finished`` last.
        """
        signals = self.signals
        try:
            signals.started.emit()
            outcome = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            # Deliberately as broad as a bare ``except``: nothing may escape
            # from run(); every failure is reported via the error signal.
            traceback.print_exc()
            signals.error.emit((type(exc), exc, traceback.format_exc()))
        else:
            signals.result.emit(outcome)
        finally:
            signals.finished.emit()
# Subclass QMainWindow to customize your application's main window
class MainWindow(QMainWindow, Ui_MainWindow):
    """
    BitMover main window.

    Lets the user pick source/destination folders, scans the source folder for
    files whose extension is listed in the configuration, lists them, and
    shows metadata plus a preview for the selected file.

    The scan itself runs on a QThreadPool worker. Widgets must only be touched
    from the GUI thread, so the scan reports back through the two signals
    declared below instead of calling widget methods directly.
    """

    # Emitted by the scan with the number of matching files found up front.
    scan_total_counted = pyqtSignal(int)
    # Emitted by process_file() with the list label of a file that was indexed.
    scan_file_added = pyqtSignal(str)

    def __init__(self, *args, **kwargs):
        """Build the UI, load the configuration and wire up all signals."""
        super().__init__(*args, **kwargs)
        self.setupUi(self)

        c = Configure(CONFIG_FILE)
        self.config = c.load_config()

        self.total_files = 0
        self.file_total = 0
        self.threads = {}
        self.files = {}

        self.lcd_files_found.display(0)
        self.set_progress(0, 0)
        self.setWindowTitle("BitMover")
        self.setWindowIcon(QIcon('assets/forklift.png'))

        self.threadpool = QThreadPool()
        print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount())

        self.src_dir = self.config['folders']['source']['base']
        self.dst_dir = self.config['folders']['destination']['base']
        self.file_types = self.config['file_types']
        self.lineEdit_src_dir.setText(self.src_dir)
        self.lineEdit_dst_dir.setText(self.dst_dir)

        self.pushButton_src_browse.clicked.connect(self.select_src_directory)
        self.pushButton_dst_browse.clicked.connect(self.select_dst_directory)
        self.pushButton_3_scan_dir.clicked.connect(self.find_files)
        self.toggle_scan_button(True)

        self.img_preview.setPixmap(QPixmap('assets/preview_placeholder.jpg'))
        self.img_preview.setScaledContents(True)
        self.file_list.currentItemChanged.connect(self.index_changed)

        # Scan results arrive via signals so the widget updates happen on the
        # GUI thread even though the scan runs on a pool thread.
        self.scan_total_counted.connect(self.set_total_files)
        self.scan_file_added.connect(self._append_file_item)

    def toggle_scan_button(self, enable=True):
        """Enable (default) or disable the scan button."""
        self.pushButton_3_scan_dir.setEnabled(bool(enable))

    def index_changed(self, i):
        """
        Show metadata and a preview for the newly selected list item.

        :param i: The current list item, or None when the selection was
                  cleared (currentItemChanged passes None in that case).
        """
        if i is None:
            return

        f = i.text()
        m = Media(f, self.get_event(), self.config)
        dtc = f'{m.capture_date[0]}/{m.capture_date[1]}/{m.capture_date[2]}'
        self.label_data_date_time_created.setText(dtc)

        # Everything below only applies to images.
        if m.file_type != 'image':
            return

        is_jpeg = f.lower().endswith(("jpg", "jpeg"))

        dpi = get_exif_tag(f, "xresolution")
        if is_jpeg:
            # Context manager releases the file handle once the size is read.
            with Image.open(f) as img:
                width = img.width
                height = img.height
        else:
            # Index 1 is used as the width and index 0 as the height.
            dimensions = get_raw_image_dimensions(f)
            width = dimensions[1]
            height = dimensions[0]

        size = f'{width}x{height}'
        if width is not None and height is not None:
            mpixels = round((width * height) / 1000000, 1)
        else:
            mpixels = ''

        iso = get_exif_tag(f, "iso")
        aperture = get_exif_tag(f, "fnumber")
        camera = get_exif_tag(f, "cameramodelname")
        if camera is None:
            # Fall back to the alternative tag name for the camera model.
            camera = get_exif_tag(f, "image model")
        lens = get_exif_tag(f, "lensmodel")
        zoom = get_exif_tag(f, "focallength")

        print(f'size: {size}')
        print(f'dpi: {dpi}')
        print(f'iso: {iso}')
        print(f'lens: {lens}')
        print(f'zoom: {zoom}')
        print(f'camera: {camera}')
        print(f'aperture: {aperture}')
        print(f'mpixels: {mpixels}')

        self.label_data_width_height.setText(str(size))
        self.label_data_dpi.setText(str(dpi))
        self.label_data_iso.setText(str(iso))
        self.label_data_lens.setText(str(lens))
        self.label_data_zoom.setText(str(zoom))
        self.label_data_camera.setText(str(camera))
        self.label_data_aperture.setText(str(aperture))
        self.label_data_megapixels.setText(str(mpixels))

        if is_jpeg:
            self.img_preview.setPixmap(QPixmap(f))
        else:
            # Non-JPEG images are previewed through their extracted thumbnail.
            jpg = extract_jpg_thumb(f)
            self.img_preview.setPixmap(QPixmap(jpg))

    def _choose_directory(self, start_dir):
        """Open a folder picker at start_dir; return the choice ('' if cancelled)."""
        directory = QFileDialog.getExistingDirectory(self,
                                                     "Select Directory",
                                                     start_dir)
        if directory:
            print("Selected Directory:", directory)
        return directory

    def select_src_directory(self):
        """Let the user pick the source folder and show it in the line edit."""
        directory = self._choose_directory(self.src_dir)
        if directory:
            self.src_dir = directory
            self.lineEdit_src_dir.setText(self.src_dir)

    def select_dst_directory(self):
        """Let the user pick the destination folder and show it in the line edit."""
        directory = self._choose_directory(self.dst_dir)
        if directory:
            self.dst_dir = directory
            self.lineEdit_dst_dir.setText(self.dst_dir)

    def set_total_files(self, t):
        """Store the number of files found and show it on the LCD widget."""
        self.total_files = t
        self.lcd_files_found.display(self.total_files)

    def progress_fn(self, n):
        """Set the overall progress bar to n percent."""
        self.progressBar_overall.setValue(int(n))

    def set_progress(self, p, t):
        """
        set progress for bar,
        p = progress counter
        t = target total (a total of 0 is treated as 1 to avoid dividing by zero)
        """
        total = int(t) or 1
        percent_complete = (int(p) / total) * 100
        self.progressBar_overall.setValue(int(percent_complete))

    def _iter_matching_files(self):
        """
        Yield the full path of every real file below src_dir whose lower-cased
        name ends with one of the configured extensions.

        Iteration order is folder, then file type, then extension, then file.
        Paths rejected by is_file() are reported on stdout and skipped.
        """
        for folder, _subfolders, filenames in os.walk(self.src_dir):
            for f_type in self.file_types:
                for ext in self.file_types[f_type]:
                    for file in filenames:
                        if not file.lower().endswith(ext):
                            continue
                        current_file = os.path.join(folder, file)
                        if is_file(current_file):
                            yield current_file
                        else:
                            print(f"Skipping {current_file} as it does not look like a real file.")

    def get_t_files(self):
        """
        Count the matching files below src_dir into self.file_total.

        The counter is recomputed from scratch on every call so repeated scans
        do not accumulate into an ever-growing total.
        """
        self.file_total = sum(1 for _ in self._iter_matching_files())

    def t_find_files(self, progress_callback, event=None):
        """
        Scan src_dir and index every matching file (runs on a worker thread).

        :param progress_callback: Signal taking an int percentage.
        :param event: Event name to use for every file. When None,
                      process_file() reads it from the UI itself.
        :return: The string "Done."
        """
        file_count = 0
        self.get_t_files()
        # Signal instead of calling set_total_files(): that touches a widget.
        self.scan_total_counted.emit(self.file_total)

        for current_file in self._iter_matching_files():
            file_count += 1
            self.process_file(current_file, event)
            if self.file_total:
                # The progress signal carries an int; clamp in case files
                # appeared between the counting pass and this pass.
                percent = int((file_count / self.file_total) * 100)
                progress_callback.emit(min(percent, 100))
        return "Done."

    def print_output(self, s):
        """Print the value delivered by the worker's result signal."""
        print(s)

    def scan_thread_started(self):
        """Disable the scan button while a scan is running."""
        print('scan thread started')
        self.toggle_scan_button(False)

    def scan_thread_done(self):
        """Re-enable the scan button after a scan."""
        print('scan thread complete.')
        self.toggle_scan_button(True)

    def thread_complete(self):
        """Report that a worker finished."""
        print("THREAD COMPLETE.")

    def find_files(self):
        """ find files to build a dictionary out of """
        # Read the event name here, on the GUI thread, and hand it to the
        # worker so the scan never has to query a widget.
        worker = Worker(self.t_find_files, event=self.get_event())
        worker.signals.started.connect(self.scan_thread_started)
        worker.signals.result.connect(self.print_output)
        worker.signals.finished.connect(self.thread_complete)
        worker.signals.finished.connect(self.scan_thread_done)
        worker.signals.progress.connect(self.progress_fn)
        # Execute.
        self.threadpool.start(worker)

    def get_event(self):
        """Return the text currently entered in the event-name field."""
        event_name = self.eventName.text()
        return event_name

    @pyqtSlot(str)
    def _append_file_item(self, label):
        """Append one entry to the file list widget."""
        self.file_list.addItem(label)

    def process_file(self, p, event=None):
        """
        gather information and add to dictionary

        :param p: Full path of the file to index.
        :param event: Event name passed to Media. When None it is read from
                      the event-name field.
        """
        path_name = os.path.dirname(p)
        f_name = os.path.basename(p)
        if event is None:
            event = self.get_event()
        c = self.config
        log.debug(f'process_file({path_name}, {f_name}, {event}, {c})')

        m = Media(os.path.join(path_name, f_name), event, c)
        i = m.source_path_hash
        log.debug(f'Source Path Hash: {i}')

        self.files[i] = {
            'folders': {
                'source_path': m.source_path_dir,
                'destination': m.destination_path,
                'destination_original': m.destination_originals_path,
            },
            'date': {
                'capture_date': {
                    'y': m.capture_date[0],
                    'm': m.capture_date[1],
                    'd': m.capture_date[2],
                },
            },
            'type': m.file_type,
            'name': m.file_name,
            'extension': m.file_ext,
        }

        # Announce the new entry via signal; the list widget is updated by the
        # connected slot rather than from whichever thread is running this.
        self.scan_file_added.emit(
            f"{self.files[i]['folders']['source_path']}/{self.files[i]['name']}")
def main():
    """
    Create the Qt application, show the main window and run the event loop.

    :return: The exit code returned by the Qt event loop.
    """
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    return app.exec()


# Only launch the GUI when executed as a script, not when imported.
if __name__ == "__main__":
    sys.exit(main())