Source code for fgivenx.parallel

import warnings
try:
    from tqdm import tqdm, tqdm_notebook
except ImportError:
    # tqdm is an optional dependency. When it is missing, substitute
    # pass-through stand-ins so callers can wrap iterables unconditionally.
    def tqdm(x, **kwargs):
        """Return `x` unchanged; any progress-bar keyword args are ignored."""
        return x

    def tqdm_notebook(x, **kwargs):
        """Return `x` unchanged; any progress-bar keyword args are ignored."""
        return x


try:
    from joblib import Parallel, delayed, cpu_count
except ImportError:
    # joblib is an optional dependency. Record that it is unavailable and
    # provide serial stand-ins exposing the same call pattern:
    #     Parallel(n_jobs=n)(delayed(f)(args) for ...)
    PARALLEL = False

    class Parallel(object):
        """Serial stand-in: evaluates the supplied iterable into a list."""

        def __init__(self, n_jobs=None):
            # n_jobs is accepted for call compatibility and then ignored.
            pass

        def __call__(self, x):
            return list(x)

    def delayed(x):
        """Return `x` itself, so `delayed(f)(...)` calls `f` immediately."""
        return x

    def cpu_count():
        """Report a single CPU when joblib is not available."""
        return 1
else:
    PARALLEL = True


def parallel_apply(f, array, **kwargs):
    """ Apply a function to an array with joblib parallelisation.

    Equivalent to `[f(x) for x in array]`, but parallelised if required.

    Parameters
    ----------
    f: function
        Univariate function to apply to each element of array

    array: array-like
        Array to apply f to

    parallel: int or bool, optional
        int > 0: number of processes to parallelise over

        int < 0 or bool=True: use `cpu_count()` to choose parallelisation

        bool=False or int=0: do not parallelise

    tqdm_kwargs: dict, optional
        additional kwargs for tqdm progress bars.

    precurry: tuple, optional
        immutable arguments to pass to f before x,
        i.e. `[f(precurry,x) for x in array]`

    postcurry: tuple, optional
        immutable arguments to pass to f after x
        i.e. `[f(x,postcurry) for x in array]`

    Returns
    -------
    list:
        `[f(precurry,x,postcurry) for x in array]`
        parallelised according to parallel

    Raises
    ------
    TypeError
        If any keyword argument other than those listed above is supplied.

    ValueError
        If `parallel` is truthy but is neither a bool nor an int.

    Warns
    -----
    UserWarning
        If parallelisation is requested but joblib is not installed; the
        computation then proceeds with the serial fallback.
    """
    precurry = tuple(kwargs.pop('precurry', ()))
    postcurry = tuple(kwargs.pop('postcurry', ()))
    parallel = kwargs.pop('parallel', False)
    tqdm_kwargs = kwargs.pop('tqdm_kwargs', {})
    if kwargs:
        raise TypeError('Unexpected **kwargs: %r' % kwargs)

    try:
        # If running in a jupyter notebook then use tqdm_notebook.
        # get_ipython is only defined inside an IPython session, hence the
        # NameError handler below.
        in_notebook = get_ipython().has_trait('kernel')  # noqa: F821
        progress = tqdm_notebook if in_notebook else tqdm
    except (NameError, AssertionError):
        # Otherwise use regular tqdm progress bar
        progress = tqdm

    if not parallel:
        # Serial path: covers parallel=False, parallel=0 and other falsy
        # values.
        return [f(*(precurry + (x,) + postcurry))
                for x in progress(array, **tqdm_kwargs)]
    elif parallel is True:
        parallel = cpu_count()
    elif isinstance(parallel, int):
        # Negative values mean "choose automatically"; positive values are
        # used as the number of jobs unchanged.
        if parallel < 0:
            parallel = cpu_count()
    else:
        raise ValueError("parallel keyword must be an integer or bool")

    if parallel and not PARALLEL:
        # The trailing space matters: adjacent string literals are joined
        # with no separator.
        warnings.warn("You need to install the package joblib "
                      "if you want to use parallelisation")

    return Parallel(n_jobs=parallel)(
        delayed(f)(*(precurry + (x,) + postcurry))
        for x in progress(array, **tqdm_kwargs)
    )