[Linux]#5 multiprocessing

Clay Ryu's sound lab·2023년 12월 5일
0

Framework

목록 보기
37/49

Two methods of multiprocessing

1. using multiprocessing library

Sometimes (in fact, quite often) multiprocessing doesn't work the way I expect, and I don't always know the exact reason. There are several possible causes.

  • I/O Bound vs CPU Bound: If your program is I/O bound, meaning it spends most of its time waiting for I/O operations like disk or network operations to complete, multiprocessing won't speed it up much. In fact, it might even slow it down due to the overhead of creating processes and communicating between them.
  • Shared Resources: If your processes need to access a shared resource, they might have to wait for other processes to release the resource. This can cause them to execute sequentially.
  • Improper Use of Multiprocessing: If you're not using the multiprocessing module correctly, it can lead to sequential execution. For instance, if you're joining on a process immediately after starting it, this will cause your program to wait for that process to finish before moving on to the next one, leading to sequential execution.
from multiprocessing import Pool, cpu_count

  def _mp_midi2corpus(self, file_path):
    try:
      # analyze: chords, global_bpm
      midi_obj = self._analyze(file_path)
      # midi2corpus
      corpus = self._midi2corpus(midi_obj)
      # Save the corpus directly in the worker process.
      filename = file_path.stem + ".pkl"  # Get the stem (filename without extension) of the original file path
      save_path = Path("dataset/corpus") / f"corpus_{self.__class__.__name__}" / filename  # Create a new Path object for saving
      with save_path.open('wb') as f:
        pickle.dump(corpus, f)
      return "success"
    except (OSError, EOFError, ValueError, KeyError) as e:
      print(file_path)
      traceback.print_exc(limit=0)
      print()
      return "error"
    except AssertionError as e:
      if str(e) == "No time_signature_changes":
          return "error"
      elif str(e) == "Measure duration error":
          # print("Measure duration error", file_path)
          return "error"
      else:
          print("Other Assertion Error", str(e), file_path)
          return "error"
    except Exception as e:
      print(file_path)
      traceback.print_exc(limit=0)
      print()
      return "error"

  def _prepare_corpus(self, make_corpus):
    if make_corpus:
      print("preprocessing midi data to corpus data")
      # check the corpus folder is already exist and make it if not
      Path("dataset/corpus").mkdir(parents=True, exist_ok=True)
      Path(f"dataset/corpus/corpus_{self.__class__.__name__}").mkdir(parents=True, exist_ok=True)
      start_time = time.time()
      # multi-processing
      broken_counter = 0
      success_counter = 0
      with Pool(cpu_count()) as p:
        for message in tqdm(p.imap(self._mp_midi2corpus, self.midi_list), total=len(self.midi_list)):
          if message == "error":
            broken_counter += 1
          elif message == "success":
            success_counter += 1
      print(f"MIDI data preprocessing takes: {time.time() - start_time}s, {success_counter} samples collected, {broken_counter} broken.")
      # load corpus
      print("preprocessed corpus data is being loaded")
      corpus_list = sorted(list(Path(f"dataset/corpus/corpus_{self.__class__.__name__}").rglob("*.pkl")))
      self.corpus = {filepath_name:corpus for filepath_name, corpus in tqdm(map(self._mp_corpus_loader, corpus_list), total=len(corpus_list))}

2. using screen

Use a single-core process, but run it in several different screen sessions.

  def _make_events(self, make_events):
    '''
    self.encoding_function will be changed according to the encoding_scheme 
    '''
    if make_events:
      print("preprocessing corpus data to events data")
      # check the events folder is already exist and make it if not
      Path(f"dataset/{self.encoding_scheme}_events").mkdir(parents=True, exist_ok=True)
      Path(f"dataset/{self.encoding_scheme}_events/events_{self.__class__.__name__}").mkdir(parents=True, exist_ok=True)
      start_time = time.time()
      self.data = {}
      # single-processing
      iter_list = list(self.corpus.items())
      for filepath_name, event in tqdm(map(self._mp_event_maker, iter_list), total=len(self.corpus)):
        self.data[filepath_name] = event
        with open(Path(f"dataset/{self.encoding_scheme}_events/events_{self.__class__.__name__}/{filepath_name}"), 'wb') as f:
          pickle.dump(event, f)
      print(f"taken time for making events is {time.time()-start_time}")
profile
chords & code // harmony with structure

0개의 댓글