BitMover/BitMover_ui.py

542 lines
22 KiB
Python
Executable File

#!/usr/bin/env python
import os
import sys
from os import path, rename
from PyQt6.QtCore import QThreadPool
from PyQt6.QtGui import QIcon, QPixmap
from PyQt6.QtWidgets import QMainWindow, QApplication, QFileDialog
from _BitMover_MainWindow import Ui_MainWindow
from _import_dialog import DialogImport
from _configure import CONFIG_FILE, Configure
from _file_stuff import is_file, create_folder, path_exists, cmp_files
from _hashing import xx_hash
from _img_preview import ImgPreview
from _lumberjack import timber
from _media import Media
from _thread_my_stuff import Worker
log = timber(__name__)
basedir = os.path.dirname(__file__)
# TODO: verify source dir actually exists
# Subclass QMainWindow to customize your application's main window
class MainWindow(QMainWindow, Ui_MainWindow):
def __init__(self, *args, **kwargs):
super(MainWindow,self).__init__(*args,**kwargs)
self.setupUi(self)
self.setWindowTitle("BitMover")
self.setWindowIcon(QIcon(os.path.join(basedir,'assets', 'forklift.ico')))
self.threadpool = QThreadPool()
c = Configure(CONFIG_FILE)
self.config = c.load_config()
self.src_dir = self.config['folders']['source']['base']
self.dst_dir = self.config['folders']['destination']['base']
self.verify_checksum = self.config['verify_checksum']
self.cleanup_files = self.config['cleanup_sd']
self.store_originals = self.config['store_originals']
self.file_types = self.config['file_types']
# File Stuff
self.total_files = 0
self.file_total = 0
self.files = {}
self.imp_dialog = DialogImport()
self.widgets_config()
def widgets_config(self):
# Button Setup
self.pushButton_src_browse.clicked.connect(self.select_src_directory)
self.pushButton_dst_browse.clicked.connect(self.select_dst_directory)
self.pushButton_3_scan_dir.clicked.connect(self.find_files)
self.pushButton_import.clicked.connect(self.import_files)
# Initialize widgets
self.lineEdit_src_dir.setText(self.src_dir)
self.lineEdit_dst_dir.setText(self.dst_dir)
self.toggle_scan_button(True)
self.toggle_import_button(False)
self.lcd_files_found.display(int(0))
self.set_progress_processing(0)
self.set_progress_importing(0)
self.set_progress_current_file(0)
self.img_preview.setPixmap(QPixmap(os.path.join(basedir,
'assets',
'preview_placeholder.jpg')))
self.img_preview.setScaledContents(True)
self.file_list.currentItemChanged.connect(self.index_changed)
self.checkBox_verify_checksum.setChecked(self.verify_checksum)
self.checkBox_cleanup_files.setChecked(self.cleanup_files)
self.checkBox_store_originals.setChecked(self.store_originals)
self.checkBox_verify_checksum.checkStateChanged.connect(self.verify_checksum_changed)
self.checkBox_cleanup_files.checkStateChanged.connect(self.cleanup_files_changed)
self.checkBox_store_originals.checkStateChanged.connect(self.store_originals_changed)
self.clear_metadata()
# Setup thread pool
print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount())
def verify_checksum_changed(self):
if self.checkBox_verify_checksum.isChecked():
self.config['verify_checksum'] = True
else:
self.config['verify_checksum'] = False
print(f"verify_checksums: {self.config['verify_checksums']}")
def cleanup_files_changed(self):
if self.checkBox_cleanup_files.isChecked():
self.config['cleanup_sd'] = True
else:
self.config['cleanup_sd'] = False
print(f"cleanup_sd: {self.config['cleanup_sd']}")
def store_originals_changed(self):
if self.checkBox_store_originals.isChecked():
self.config['store_originals'] = True
else:
self.config['store_originals'] = False
print(f"store_originals: {self.config['store_originals']}")
def toggle_scan_button(self,enable=True):
self.pushButton_3_scan_dir.setEnabled(enable)
def toggle_import_button(self,enable=True):
self.pushButton_import.setEnabled(enable)
def update_preview(self,i):
preview = ImgPreview(file=i.text(), event=self.get_event(), config=self.config)
self.l_meta_content_date_time_c.setText(preview.dtc)
path_hash = preview.path_hash
self.l_data_file_source_path.setText(
self.files[path_hash]['folders']['source_path'])
self.l_data_file_dest_path.setText(
self.files[path_hash]['folders']['destination'])
self.img_preview.setPixmap(QPixmap(preview.thumbnail))
self.img_preview.setFixedHeight(self.img_preview.width() / preview.thumbnail_ratio)
self.update_metadata(preview)
def update_metadata(self,preview):
self.clear_metadata()
if preview.file_type == 'image':
self.l_meta_01.setText('Size')
self.l_meta_02.setText('dpi')
self.l_meta_03.setText('ISO')
self.l_meta_04.setText('Lens')
self.l_meta_05.setText('Focal Length')
self.l_meta_06.setText('Camera')
self.l_meta_07.setText('Aperture')
self.l_meta_08.setText('Megapixels')
self.l_meta_content_01.setText(str(preview.size))
self.l_meta_content_02.setText(str(preview.dpi))
self.l_meta_content_03.setText(str(preview.iso))
self.l_meta_content_04.setText(str(preview.lens))
self.l_meta_content_05.setText(str(preview.zoom))
self.l_meta_content_06.setText(str(preview.camera))
self.l_meta_content_07.setText(str(preview.aperture))
self.l_meta_content_08.setText(str(preview.mpixels))
elif preview.file_type == 'video':
self.l_meta_01.setText('Size')
self.l_meta_02.setText('Frames / Second')
self.l_meta_03.setText('Bit Depth')
self.l_meta_04.setText('Duration')
self.l_meta_05.setText('Encoder')
self.l_meta_06.setText('Codec')
self.l_meta_07.setText('Profile')
self.l_meta_08.setText('Pix Format')
self.l_meta_content_01.setText(str(preview.size))
self.l_meta_content_02.setText(str(preview.video_framerate))
self.l_meta_content_03.setText(str(preview.video_bit_depth))
self.l_meta_content_04.setText(str(preview.video_duration))
self.l_meta_content_05.setText(str(preview.video_encoding))
self.l_meta_content_06.setText(str(preview.video_codec))
self.l_meta_content_07.setText(str(preview.video_profile))
self.l_meta_content_08.setText(str(preview.video_pix_format))
def clear_metadata(self):
self.l_meta_01.setText('')
self.l_meta_02.setText('')
self.l_meta_03.setText('')
self.l_meta_04.setText('')
self.l_meta_05.setText('')
self.l_meta_06.setText('')
self.l_meta_07.setText('')
self.l_meta_08.setText('')
self.l_meta_content_01.setText('')
self.l_meta_content_02.setText('')
self.l_meta_content_03.setText('')
self.l_meta_content_04.setText('')
self.l_meta_content_05.setText('')
self.l_meta_content_06.setText('')
self.l_meta_content_07.setText('')
self.l_meta_content_08.setText('')
def index_changed(self,i):
self.clear_metadata()
if i is None:
self.img_preview.setPixmap(QPixmap(os.path.join(basedir,
'assets',
'preview_placeholder.jpg')))
else:
self.update_preview(i)
def select_src_directory(self):
directory = QFileDialog.getExistingDirectory(self,
"Select Directory",
self.src_dir)
if directory:
print("Selected Directory:", directory)
# path = Path(directory)
self.src_dir = directory
self.lineEdit_src_dir.setText(self.src_dir)
def select_dst_directory(self):
directory = QFileDialog.getExistingDirectory(self,
"Select Directory",
self.dst_dir)
if directory:
print("Selected Directory:", directory)
# path = Path(directory)
self.dst_dir = directory
self.lineEdit_dst_dir.setText(self.dst_dir)
def set_total_files(self,t):
self.total_files = t
self.lcd_files_found.display(self.total_files)
def set_imported_files(self,t):
self.lcd_files_imported.display(t)
def set_progress_processing(self, n):
# print("%d%% done" % n)
self.progressBar_processing.setValue(int(n))
self.lcd_processing_progress.display(n)
def set_progress_importing(self, n):
# print("%d%% done" % n)
self.progressBar_importing.setValue(int(n))
self.lcd_import_progress.display(n)
def set_progress_current_file(self, n):
# print("%d%% done" % n)
self.progressBar_importing_2.setValue(int(n))
self.lcd_current_file_progress.display(n)
def get_t_files(self,search_types):
for folder, subfolders, filename in os.walk(self.src_dir):
for f_type in search_types:
for ext in self.file_types[f_type]:
for file in filename:
if file.lower().endswith(ext):
current_file = os.path.join(folder, file)
if is_file(current_file):
self.file_total += int(1)
else:
print(f"Skipping {current_file} as it does not look like a real file.")
def t_find_files(self,
progress_callback,
import_progress_callback,
current_file_progress_callback,):
file_count = int(0)
search_types = []
if self.checkBox_search_for_images.isChecked():
search_types.append('image')
if self.checkBox_search_for_video.isChecked():
search_types.append('video')
if self.checkBox_search_for_audio.isChecked():
search_types.append('audio')
self.get_t_files(search_types)
self.set_total_files(self.file_total)
if len(search_types) > 0:
for folder, subfolders, filename in os.walk(self.src_dir):
for f_type in search_types:
for ext in self.file_types[f_type]:
for file in filename:
if file.lower().endswith(ext):
current_file = os.path.join(folder, file)
if is_file(current_file):
file_count += int(1)
self.process_file(current_file)
else:
print(f"Skipping {current_file} as it does not look like a real file.")
progress_callback.emit(round((file_count / self.file_total) * 100, 0))
else:
print("Nothing to search for.")
return "Done."
def t_copy_files(self,
progress_callback,
import_progress_callback,
current_file_progress_callback):
""" Copy Files. """
# imp_dialog = DialogImport()
count = int(0)
chunk_size = 16 * 1024
for file in self.files:
self.imp_dialog.set_importing_file(path.join(
self.files[file]['folders']['source_path'],
self.files[file]['name']))
create_folder(self.files[file]['folders']['destination'])
if self.files[file]['type'] == 'video':
chunk_size = (1024 * 1024) * 5
file_exists = path_exists(path.join(
self.files[file]['folders']['destination'],
self.files[file]['name']))
if file_exists is True:
check_match = cmp_files(
path.join(
self.files[file]['folders']['source_path'],
self.files[file]['name']),
path.join(
self.files[file]['folders']['destination'],
self.files[file]['name']
)
)
if check_match is False:
print(f'\nFound duplicate for {self.files[file]["folders"]["source_path"]}/{self.files[file]["name"]}, renaming destination with hash appended.')
base, extension = path.splitext(self.files[file]['name'])
f_xxhash = xx_hash(path.join(
self.files[file]['folders']['destination'],
self.files[file]['name']))
file_name_hash = base + '_' + f_xxhash + extension
rename(path.join(
self.files[file]['folders']['destination'],
self.files[file]['name']),
path.join(
self.files[file]['folders']['destination'],
file_name_hash))
size = path.getsize(
path.join(
self.files[file]['folders']['source_path'],
self.files[file]['name'])
)
with open(path.join(
self.files[file]['folders']['source_path'],
self.files[file]['name']), 'rb') as fs:
with open(
path.join(
self.files[file]['folders']['destination'],
self.files[file]['name']), 'wb') as fd:
while True:
chunk = fs.read(chunk_size)
if not chunk:
break
fd.write(chunk)
dst_size = path.getsize(
path.join(
self.files[file]['folders']['destination'],
self.files[file]['name']
)
)
current_file_progress_callback.emit(round(( dst_size / size ) * 100, 1))
if self.config['store_originals'] is True:
if self.files[file]['type'] == 'image':
create_folder(self.files[file]['folders']['destination_original'])
file_exists = path_exists(path.join(
self.files[file]['folders']['destination_original'],
self.files[file]['name']))
if file_exists is True:
check_match = cmp_files(
path.join(
self.files[file]['folders']['source_path'],
self.files[file]['name']),
path.join(
self.files[file]['folders']['destination_original'],
self.files[file]['name']
)
)
if check_match is False:
print(f'\nFound duplicate for {self.files[file]["folders"]["source_path"]}/{self.files[file]["name"]}, renaming destination with hash appended.')
base, extension = path.splitext(self.files[file]['name'])
f_xxhash = xx_hash(path.join(
self.files[file]['folders']['destination_original'],
self.files[file]['name']))
file_name_hash = base + '_' + f_xxhash + extension
rename(path.join(
self.files[file]['folders']['destination_original'],
self.files[file]['name']),
path.join(
self.files[file]['folders']['destination_original'],
file_name_hash))
size = path.getsize(
path.join(
self.files[file]['folders']['source_path'],
self.files[file]['name'])
)
with open(path.join(
self.files[file]['folders']['source_path'],
self.files[file]['name']), 'rb') as fs:
with open(
path.join(
self.files[file]['folders']['destination_original'],
self.files[file]['name']), 'wb') as fd:
while True:
chunk = fs.read(chunk_size)
if not chunk:
break
fd.write(chunk)
dst_size = path.getsize(
path.join(
self.files[file]['folders']['destination_original'],
self.files[file]['name']
)
)
current_file_progress_callback.emit(round((dst_size / size) * 100, 1))
count += 1
self.set_imported_files(count)
import_progress_callback.emit(round((count / self.file_total) * 100, 1))
self.imp_dialog.add_to_imported_list(
path.join(
self.files[file]['folders']['source_path'],
self.files[file]['name']))
@staticmethod
def print_output(s):
print(s)
def worker_thread_started(self):
print('scan thread started')
self.toggle_scan_button(False)
self.toggle_import_button(False)
def worker_thread_done(self):
print('scan thread complete.')
self.toggle_scan_button(True)
if 0 < len(self.files):
self.toggle_import_button(True)
else:
self.toggle_import_button(False)
@staticmethod
def thread_complete():
print("THREAD COMPLETE.")
def find_files(self):
""" find files to build a dictionary out of """
# Initialize widgets
self.lcd_files_found.display(int(0))
self.set_progress_processing(0)
self.set_progress_importing(0)
self.img_preview.setPixmap(QPixmap(os.path.join(basedir,
'assets',
'preview_placeholder.jpg')))
self.file_list.clear()
# File Stuff
self.total_files = 0
self.file_total = 0
self.files = {}
worker = Worker(self.t_find_files)
worker.signals.started.connect(self.worker_thread_started)
worker.signals.result.connect(self.print_output)
worker.signals.finished.connect(self.thread_complete)
worker.signals.finished.connect(self.worker_thread_done)
worker.signals.progress.connect(self.set_progress_processing)
# Execute.
self.threadpool.start(worker)
def import_files(self):
"""
Import found files
"""
# Open Dialog
# imp_dialog = DialogImport()
# imp_dialog.open_import_dialog()
# Initialize Widgets
self.lcd_files_imported.display(int(0))
self.set_progress_importing(0)
self.set_progress_current_file(0)
self.imp_dialog.set_progress_importing(0)
self.imp_dialog.set_progress_current_file(0)
worker = Worker(self.t_copy_files)
worker.signals.started.connect(self.worker_thread_started)
worker.signals.started.connect(self.imp_dialog.open_import_dialog)
worker.signals.result.connect(self.print_output)
worker.signals.finished.connect(self.thread_complete)
worker.signals.finished.connect(self.worker_thread_done)
worker.signals.import_progress.connect(self.set_progress_importing)
worker.signals.import_progress.connect(self.imp_dialog.set_progress_importing)
worker.signals.current_file_progress.connect(self.set_progress_current_file)
worker.signals.current_file_progress.connect(self.imp_dialog.set_progress_current_file)
# Execute
self.threadpool.start(worker)
def get_event(self):
event_name = self.eventName.text()
return event_name
def process_file(self,p):
""" gather information and add to dictionary """
path_name = os.path.dirname(p)
f_name = os.path.basename(p)
event = self.get_event()
c = self.config
# print(f'process_file({path_name}, {f_name}, {event}, {c})')
m = Media(os.path.join(path_name,f_name),event, c)
i = m.source_path_hash
# log.debug(f'Source Path Hash: {i}')
self.files[i] = {
'folders': {},
'date': {},
'event': {}
}
self.files[i]['folders']['source_path'] = m.source_path_dir
self.files[i]['type'] = m.file_type
self.files[i]['name'] = m.file_name
self.files[i]['extension'] = m.file_ext
self.files[i]['date']['capture_date'] = {}
self.files[i]['date']['capture_date']['y'] = m.capture_date[0]
self.files[i]['date']['capture_date']['m'] = m.capture_date[1]
self.files[i]['date']['capture_date']['d'] = m.capture_date[2]
self.files[i]['folders']['destination'] = m.destination_path
self.files[i]['folders']['destination_original'] = m.destination_originals_path
self.files[i]['event']['name'] = m.event_name
self.file_list.addItem(f"{self.files[i]['folders']['source_path']}/{self.files[i]['name']}")
app = QApplication(sys.argv)
window = MainWindow()
window.show()
app.exec()