Source code for processfiles.files
import ast
import os
from processfiles.filetools import write_to_file_with_retries, open_file_with_retries
from processfiles.timing import TimeTracker
class FileProcessTracker:
    """Track which files in a folder still have to be processed.

    The names of files that have been handled are persisted to a
    ``completed.txt`` file inside the folder, so a later instance pointed at
    the same folder skips them unless ``restart=True`` is given.

    Attributes:
        folder: Absolute path of the folder being processed.
        completed_list_path: Path of the ``completed.txt`` bookkeeping file.
        restart: The ``restart`` flag passed to the constructor.
        completed_list: Names of files already marked as completed.
        process_list: Names of files that still need processing.
    """

    def __init__(self, folder=None, restart=False, file_types=('csv',)):
        """Set up tracking for ``folder``.

        Args:
            folder: Folder whose files are processed. Defaults to the current
                working directory.
            restart: If true, the existing completed-file list is deleted so
                every matching file is processed again.
            file_types: File name endings that select which files in
                ``folder`` are processed.
        """
        self.folder = os.getcwd() if folder is None else os.path.abspath(folder)
        self.completed_list_path = os.path.join(self.folder, 'completed.txt')
        if restart:
            self.delete_completed_files()
        self.restart = restart
        self.load_completed_files()
        self.load_process_files(file_types=file_types)

    def file_generator(self):
        """Yield the full path of each file that still needs processing.

        A file is recorded as completed only once the consumer asks for the
        next item, i.e. after the consumer's work on that file has finished.
        """
        timer = TimeTracker(self.folder, restart=self.restart)
        num_items = len(self.process_list)
        for file in self.process_list:
            yield os.path.join(self.folder, file)
            self.add_file_to_completed(file)
            timer.time_estimate(num_items)
        # time_estimate ends its output with \r, so emit a newline to keep
        # the next output from overwriting the final time estimate.
        print('\n')

    def add_file_to_completed(self, file):
        """Mark ``file`` as completed and persist the updated list."""
        self.completed_list.append(file)
        _update_completed_files(self.completed_list_path, self.completed_list)

    def load_completed_files(self):
        """Load the persisted completed list into ``self.completed_list``."""
        self.completed_list = _load_completed_files(self.completed_list_path)

    def load_process_files(self, file_types):
        """Build ``self.process_list`` from the files not yet completed.

        Args:
            file_types: File name endings that select which files qualify.
        """
        candidates = _load_to_process_files(self.folder, self.completed_list, file_types)
        # The bookkeeping file lives in the processed folder; without this
        # filter it would be handed out as an input file on a resumed run
        # whenever file_types matches its name (e.g. 'txt').
        tracker_file = os.path.basename(self.completed_list_path)
        self.process_list = [name for name in candidates if name != tracker_file]

    def delete_completed_files(self):
        """Delete the persisted completed list, if there is one."""
        _delete_completed_files(self.completed_list_path)
def _load_to_process_files(folder, completed_list, file_types):
    """Return the matching files in ``folder`` that are not yet completed.

    Args:
        folder: Folder to look in.
        completed_list: Names of files already completed. Entries must be
            hashable; this module only ever stores file-name strings.
        file_types: File name endings that select which files qualify.

    Returns:
        List of file names (not full paths), in the order in which
        ``_load_initial_file_list`` returned them.
    """
    candidates = _load_initial_file_list(folder, file_types)
    # Build the lookup once so each membership test is O(1) instead of a
    # scan over the whole completed list.
    completed = set(completed_list)
    return [name for name in candidates if name not in completed]
def _update_completed_files(completed_list_path, completed_list):
    """Persist ``completed_list`` to the file at ``completed_list_path``.

    The write is delegated to ``write_to_file_with_retries``.
    NOTE(review): the on-disk format and the retry behaviour are defined by
    that helper and are not visible here; ``_load_completed_files`` expects
    to read back a Python list literal.
    """
    write_to_file_with_retries(completed_list_path, completed_list)
def _load_completed_files(completed_list_path):
    """Read the list of completed file names from ``completed_list_path``.

    Returns:
        The stored list, or an empty list when the file does not exist
        (nothing has been completed yet).

    Raises:
        ValueError: If the file content parses to something other than a
            list.
    """
    if os.path.exists(completed_list_path):
        # The content is parsed as a Python literal; literal_eval only
        # accepts literals, it does not execute arbitrary code.
        stored = ast.literal_eval(open_file_with_retries(completed_list_path))
        if isinstance(stored, list):
            return stored
        raise ValueError('completed list file contains other than list')
    # Not started yet, none completed
    return []
def _load_initial_file_list(folder, file_types):
    """List the files directly inside ``folder`` whose names match.

    Only the top level of ``folder`` is inspected; sub-directories are
    neither returned nor descended into.

    Args:
        folder: Folder to list.
        file_types: Iterable of file name endings; a file qualifies when its
            name ends with any of them.

    Returns:
        List of matching file names (not full paths), in directory-listing
        order.

    Raises:
        OSError: If ``folder`` cannot be listed (missing, not a directory,
            no permission).
    """
    def _reraise(error):
        raise error

    # os.walk silently swallows listing errors by default, which made
    # next() leak a bare StopIteration for a bad folder; surface the real
    # OSError instead.
    _, _, file_names = next(os.walk(folder, onerror=_reraise))
    # str.endswith accepts a tuple of suffixes, so one call covers them all.
    endings = tuple(file_types)
    return [name for name in file_names if name.endswith(endings)]
def _delete_completed_files(completed_list_path):
    """Delete the completed-list file; do nothing if it does not exist.

    Args:
        completed_list_path: Path of the file to remove.
    """
    # Attempt the removal directly rather than checking for existence first:
    # the file could disappear between a check and the remove call.
    try:
        os.remove(completed_list_path)
    except FileNotFoundError:
        pass