# I don't know the exact reason, but sometimes (in fact, often) multiprocessing doesn't work as I expected. There may be several reasons for this.
from multiprocessing import Pool, cpu_count
def _mp_midi2corpus(self, file_path):
    """Convert one MIDI file into a corpus and pickle it to disk.

    The file is passed through ``self._analyze`` and ``self._midi2corpus``,
    and the result is written to
    ``dataset/corpus/corpus_<ClassName>/<file stem>.pkl``.  Every failure is
    reported through the return value instead of being raised, so the caller
    can simply count outcomes (``_prepare_corpus`` maps this method over
    ``self.midi_list`` with ``Pool.imap``).

    Args:
        file_path: Path-like object exposing ``.stem``; it is handed to
            ``self._analyze`` unchanged.

    Returns:
        ``"success"`` when the corpus was pickled, ``"error"`` otherwise.
    """
    # Assertion messages that mark a known-bad file; these are skipped quietly.
    known_assertions = ("No time_signature_changes", "Measure duration error")
    try:
        # analyze: chords, global_bpm
        midi_obj = self._analyze(file_path)
        # midi2corpus
        corpus = self._midi2corpus(midi_obj)
        # Save the corpus directly in the worker process.
        filename = file_path.stem + ".pkl"  # stem = filename without extension
        save_path = Path("dataset/corpus") / f"corpus_{self.__class__.__name__}" / filename
        with save_path.open('wb') as f:
            pickle.dump(corpus, f)
        return "success"
    except (OSError, EOFError, ValueError, KeyError):
        # Failure types the original code listed explicitly: count the file as broken.
        return "error"
    except AssertionError as e:
        # Only unfamiliar assertion messages are printed.
        if str(e) not in known_assertions:
            print("Other Assertion Error", str(e), file_path)
        return "error"
    except Exception:
        # Deliberate best-effort: any other failure also just marks the file
        # as broken so one bad file cannot abort the whole preprocessing run.
        return "error"
def _prepare_corpus(self, make_corpus):
    """Optionally build the corpus pickles from MIDI files, then load them.

    Args:
        make_corpus: When truthy, every entry of ``self.midi_list`` is run
            through ``self._mp_midi2corpus`` in a process pool and the
            success/broken counts are printed.

    Side effects:
        Sets ``self.corpus`` to a dict built from the ``(name, corpus)``
        pairs that ``self._mp_corpus_loader`` returns for each ``*.pkl``
        found under ``dataset/corpus/corpus_<ClassName>``.
    """
    corpus_dir = Path(f"dataset/corpus/corpus_{self.__class__.__name__}")
    if make_corpus:
        print("preprocessing midi data to corpus data")
        # Make the corpus folder if it does not exist yet; parents=True also
        # creates dataset/corpus, so one call is enough.
        corpus_dir.mkdir(parents=True, exist_ok=True)
        start_time = time.time()
        # multi-processing
        broken_counter = 0
        success_counter = 0
        with Pool(cpu_count()) as p:
            for message in tqdm(p.imap(self._mp_midi2corpus, self.midi_list), total=len(self.midi_list)):
                if message == "error":
                    broken_counter += 1
                elif message == "success":
                    success_counter += 1
        print(f"MIDI data preprocessing takes: {time.time() - start_time}s, {success_counter} samples collected, {broken_counter} broken.")
    # load corpus
    # NOTE(review): indentation was lost in the source; the load step is
    # assumed to run regardless of make_corpus -- confirm against the original.
    print("preprocessed corpus data is being loaded")
    corpus_list = sorted(corpus_dir.rglob("*.pkl"))
    self.corpus = dict(tqdm(map(self._mp_corpus_loader, corpus_list), total=len(corpus_list)))
# Use a single-core process, but apply it on several different screens.
def _make_events(self, make_events):
    """Optionally convert the loaded corpus into events and pickle each one.

    Author's note: ``self.encoding_function`` will be changed according to
    the ``encoding_scheme`` (that selection is not visible in this block).

    Args:
        make_events: When truthy, every ``(name, corpus)`` item of
            ``self.corpus`` is passed to ``self._mp_event_maker``; each
            resulting event is stored in ``self.data[name]`` and pickled to
            ``dataset/<encoding_scheme>_events/events_<ClassName>/<name>``.
            When falsy, the visible part of this method does nothing.
    """
    if make_events:
        print("preprocessing corpus data to events data")
        # Make the events folder if it does not exist yet; parents=True also
        # creates dataset/<encoding_scheme>_events, so one call is enough.
        events_dir = Path(f"dataset/{self.encoding_scheme}_events/events_{self.__class__.__name__}")
        events_dir.mkdir(parents=True, exist_ok=True)
        start_time = time.time()
        self.data = {}
        # single-processing
        # Snapshot the items before iterating, as the original code did.
        iter_list = list(self.corpus.items())
        for filepath_name, event in tqdm(map(self._mp_event_maker, iter_list), total=len(self.corpus)):
            self.data[filepath_name] = event
            with open(Path(f"dataset/{self.encoding_scheme}_events/events_{self.__class__.__name__}/{filepath_name}"), 'wb') as f:
                pickle.dump(event, f)
        print(f"taken time for making events is {time.time()-start_time}")