Source code for pyacs.gts.Sgts_methods.gts_mp_ipyparallel

# def gts_mp(self, method, *args, **kwargs):
#     """
#     Sgts.gts_mp is the parallelized version of Sgts.gts, using ipyparallel

#     :param method: the Gts method to be applied to all time series (gts) of the current Sgts
#     :param *args, **kwargs: arguments of method to be used when calling method
#     :param ncpu: ncpu is an additional kwarg to define the number of CPU used. default is 4

#     :return: the new Sgts including the gts processed by method
#     """
#     from pyacs.gts.Sgts import Sgts
#     from tqdm import tqdm

#     import logging
#     import pyacs.message.message as MESSAGE
#     import pyacs.message.verbose_message as VERBOSE
#     import pyacs.message.error as ERROR
#     import pyacs.message.warning as WARNING
#     import pyacs.message.debug_message as DEBUG
#     import pyacs.debug

#     import ipyparallel as ipp
#     from icecream import ic

#     import inspect
#     VERBOSE("Running Sgts.%s" % inspect.currentframe().f_code.co_name)

#     sgts = self
#     n = self.n()

#     # get func

#     import importlib
#     import inspect
#     import os

#     try:
#         func = getattr(self.__dict__[self.lcode()[0]], method)
#     except:
#         ERROR(("There is no Gts method called %s. Exit" % method), exit=True)

#     # get func path
#     ldir = []
#     OK = False
#     for cdir in inspect.getfile(func).split('/'):
#         if OK:
#             ldir.append(cdir)
#         if cdir == 'gts':
#             OK = True
#     ldir.pop()

#     str_4_import = 'pyacs.gts.' + '.'.join(ldir) + '.' + method
#     mod = importlib.import_module(str_4_import)
#     func = getattr(mod, method)

#     arg = [args, kwargs]

#     verbose = kwargs.get('verbose', False)
#     ncpu = kwargs.get('ncpu', 4)

#     if 'ncpu' in kwargs.keys():
#         kwargs.pop('ncpu')

#     MESSAGE("Will run %s method on %d Gts using %d CPU" % (method, sgts.n(), ncpu))

#     def worker_wrapper(gts, arg):
#         [args, kwargs] = arg
#         #return func(gts, *args, **kwargs)

#         try:
#             return func(gts, *args, **kwargs)
#         except:
#             gts.data = None
#             return gts

#     # request a cluster
#     with ipp.Cluster(n=ncpu, monitor_engines=True, log_level=40) as rc:
#     #with ipp.Cluster(n=ncpu) as rc:
#         # get a view on the cluster
#         view = rc.load_balanced_view()
#         # submit the tasks
#         asyncresult = view.map_async(worker_wrapper, sgts.lGts(), [arg] * n)
#         # wait interactively for results
#         asyncresult.wait_interactive()
#         # retrieve actual results
#         result = asyncresult.get()
#     # at this point, the cluster processes have been shutdown

#     # get results
#     new_ts = Sgts(read=False)
#     for gts in result:
#         if gts is not None:
#             new_ts.append(gts)

#     # print missing
#     lgood_code = new_ts.lcode(force_4_digit=True)
#     for code in sorted(sgts.lcode()):
#         if code not in lgood_code:
#             ERROR(("Problem while applying %s to %s. Skipped from output Sgts." % (method, code)))

#     return (new_ts)
[docs] def gts_mp_ipyparallel(self, method, *args, **kwargs): """Parallel version of Sgts.gts using ipyparallel with fault-tolerant recovery. Deprecated. Parameters ---------- method : str Gts method to apply. *args : tuple Positional arguments for the method. **kwargs : dict Keyword arguments; ncpu (int) sets number of CPUs (default 4). Returns ------- Sgts New Sgts with processed Gts. """ from pyacs.gts.Sgts import Sgts import pyacs.message.message as MESSAGE import pyacs.message.error as ERROR import pyacs.message.warning as WARNING import pyacs.message.verbose_message as VERBOSE from ipyparallel import Cluster from ipyparallel.error import EngineError import traceback import importlib import inspect VERBOSE("Running Sgts.gts_mp") sgts = self n = self.n() ncpu = kwargs.pop('ncpu', 4) verbose = kwargs.get('verbose', False) # Get the method try: func = getattr(self.__dict__[self.lcode()[0]], method) except: ERROR(f"No such method: {method}", exit=True) # Resolve correct import path for func ldir = [] OK = False for cdir in inspect.getfile(func).split('/'): if OK: ldir.append(cdir) if cdir == 'gts': OK = True ldir.pop() str_4_import = 'pyacs.gts.' + '.'.join(ldir) + '.' + method mod = importlib.import_module(str_4_import) func = getattr(mod, method) arg = [args, kwargs] MESSAGE(f"Will run {method} on {n} Gts using {ncpu} CPU(s)") def worker_wrapper(gts, arg): [args, kwargs] = arg #return func(gts, *args, **kwargs) try: return func(gts, *args, **kwargs) except: gts.data = None return gts new_ts = Sgts(read=False) with Cluster(n=ncpu, monitor_engines=True) as rc: view = rc.load_balanced_view() asyncresult = view.map_async(worker_wrapper, sgts.lGts(), [arg] * n) print("⏳ Waiting for tasks...") asyncresult.wait(timeout=600) partial_results = [] try: partial_results = asyncresult.get() except EngineError as ee: ERROR(f"💥 Engine crash: {ee}") # Attempt to retrieve what we can for i in range(len(sgts.lGts())): try: partial_results.append(asyncresult[i]) except Exception as e: ERROR(f"⚠️ Task {i} retrieval failed: {type(e).__name__}: {e}") partial_results.append(None) except Exception as e: ERROR(f"❌ Unexpected exception: {e}") # partial_results = [None] * len(sgts.lGts()) # Attempt to retrieve what we can for i in range(len(sgts.lGts())): try: partial_results.append(asyncresult[i]) except Exception as e: ERROR(f"⚠️ Task {i} retrieval failed: {type(e).__name__}: {e}") partial_results.append(None) for res in partial_results: if isinstance(res, tuple) and len(res) == 3: code, gts_result, error = res if gts_result is None: ERROR(f"⚠️ {code} failed:\n{error}") else: new_ts.append(gts_result) elif res is not None: new_ts.append(res) else: ERROR(f"⚠️ Missing result (engine crash or task failure)") # Collect successful results for gts_result in partial_results: if gts_result is not None: new_ts.append(gts_result) else: ERROR(f"⚠️ Failed to process a Gts") # Check missing outputs lpb = [] lgood_code = new_ts.lcode(force_4_digit=True) for code in sorted(sgts.lcode()): if code not in lgood_code: ERROR(f"❌ Missing output for {code} — skipped in final Sgts.") lpb.append(code) # Check for inconsistencies between input and output Sgts if new_ts.n() != self.n(): ERROR(f"Number of Gts in output Sgts ({new_ts.n()}) does not match input Sgts ({self.n()}).") lcode_output = sorted([x[:4] for x in new_ts.lcode()]) for code in self.lcode(): if code not in lcode_output: WARNING(f"{code} is missing in output Sgts. Check why.") lpb.append(code) else: # Check for None values in l1trend_sts_aicc for code in self.lcode(): if new_ts.__dict__ is None: WARNING(f"{code} was not properly processed and is None.") lpb.append(code) else: if new_ts.__dict__[code].data is None: WARNING(f"{code} has None data in l1trend_sts_aicc") lpb.append(code) if len(lpb) > 0: WARNING(f"Number of missing output time series: {lpb}") else: MESSAGE("All time series were properly processed.") return new_ts.delnone()