Module jumpscale.tools.poolexecutor.poolexecutor
Expand source code
import gevent
class Job:
def __init__(self, fun, *args, **kwargs):
self.fun = fun
self.args = args
self.kwargs = kwargs
class PoolExecutor:
def __init__(self):
self.jobs = []
def __enter__(self):
return self
def __exit__(self, type, value, tb):
pass
def task_add(self, fun, *args, **kwargs):
self.jobs.append(Job(fun, *args, **kwargs))
def run(self, die=True):
try:
greenlets = [gevent.spawn(job.fun, *job.args, **job.kwargs) for job in self.jobs]
# print("greenlets: ", greenlets)
gevent.joinall(greenlets, raise_error=die)
except Exception as e:
self.jobs = []
raise e
else:
self.jobs = []
return greenlets
def results(self, greenlets):
return [greenlet.value for greenlet in greenlets]
# def test_simple(self):
# with j.tools.poolexecutor.PoolExecutor() as p:
# for i in range(5):
# p.task_add(sleepf, i, name="fun{}".format(i))
# gs = p.run()
# p.results(gs)
# def sleepf(howlong, name="fun"):
# print("{} is sleeping for {}".format(name, howlong))
# for i in range(howlong):
# print("{} is sleeping slept for {}".format(name, howlong - i))
# gevent.sleep(i)
# for i in range(5):
# self.task_add(sleepf, i, name="fun{}".format(i))
# self.run()
# def test_with_errors(self):
# def sleepf(howlong, name="fun"):
# print("{} is sleeping for {}".format(name, howlong))
# for i in range(howlong):
# print("{} is sleeping slept for {}".format(name, howlong - i))
# gevent.sleep(i)
# def sleepf_with_error(howlong, name="fun"):
# print("{} is sleeping for {}".format(name, howlong))
# for i in range(howlong):
# print("{} is sleeping slept for {}".format(name, howlong - i))
# gevent.sleep(i)
# raise RuntimeError("error here in sleepf_with_error")
# for i in range(5):
# self.task_add(sleepf, i, name="fun{}".format(i))
# self.task_add(sleepf_with_error, i, name="error_fun")
# try:
# self.run()
# except:
# print("run has a function that raises and we caught it.")
# def test_with_results(self):
# def sleepf(howlong, name="fun"):
# print("{} is sleeping for {}".format(name, howlong))
# for i in range(howlong):
# print("{} is sleeping slept for {}".format(name, howlong - i))
# gevent.sleep(i)
# return 7
# for i in range(5):
# self.task_add(sleepf, i, name="fun{}".format(i))
# greenlets = self.run()
# results = [greenlet.value for greenlet in greenlets]
# assert all(map(lambda x: x == 7, results)) == True
# print(results)
# def test(self):
# for f in [self.test_simple, self.test_with_results, self.test_with_errors]:
# f()
Classes
class Job (fun, *args, **kwargs)
-
Expand source code
class Job: def __init__(self, fun, *args, **kwargs): self.fun = fun self.args = args self.kwargs = kwargs
class PoolExecutor
-
Expand source code
class PoolExecutor: def __init__(self): self.jobs = [] def __enter__(self): return self def __exit__(self, type, value, tb): pass def task_add(self, fun, *args, **kwargs): self.jobs.append(Job(fun, *args, **kwargs)) def run(self, die=True): try: greenlets = [gevent.spawn(job.fun, *job.args, **job.kwargs) for job in self.jobs] # print("greenlets: ", greenlets) gevent.joinall(greenlets, raise_error=die) except Exception as e: self.jobs = [] raise e else: self.jobs = [] return greenlets def results(self, greenlets): return [greenlet.value for greenlet in greenlets] # def test_simple(self): # with j.tools.poolexecutor.PoolExecutor() as p: # for i in range(5): # p.task_add(sleepf, i, name="fun{}".format(i)) # gs = p.run() # p.results(gs) # def sleepf(howlong, name="fun"): # print("{} is sleeping for {}".format(name, howlong)) # for i in range(howlong): # print("{} is sleeping slept for {}".format(name, howlong - i)) # gevent.sleep(i) # for i in range(5): # self.task_add(sleepf, i, name="fun{}".format(i)) # self.run() # def test_with_errors(self): # def sleepf(howlong, name="fun"): # print("{} is sleeping for {}".format(name, howlong)) # for i in range(howlong): # print("{} is sleeping slept for {}".format(name, howlong - i)) # gevent.sleep(i) # def sleepf_with_error(howlong, name="fun"): # print("{} is sleeping for {}".format(name, howlong)) # for i in range(howlong): # print("{} is sleeping slept for {}".format(name, howlong - i)) # gevent.sleep(i) # raise RuntimeError("error here in sleepf_with_error") # for i in range(5): # self.task_add(sleepf, i, name="fun{}".format(i)) # self.task_add(sleepf_with_error, i, name="error_fun") # try: # self.run() # except: # print("run has a function that raises and we caught it.") # def test_with_results(self): # def sleepf(howlong, name="fun"): # print("{} is sleeping for {}".format(name, howlong)) # for i in range(howlong): # print("{} is sleeping slept for {}".format(name, howlong - i)) # gevent.sleep(i) # return 7 # for i in range(5): # self.task_add(sleepf, i, name="fun{}".format(i)) # greenlets = self.run() # results = [greenlet.value for greenlet in greenlets] # assert all(map(lambda x: x == 7, results)) == True # print(results) # def test(self): # for f in [self.test_simple, self.test_with_results, self.test_with_errors]: # f()
Methods
def results(self, greenlets)
-
Expand source code
def results(self, greenlets): return [greenlet.value for greenlet in greenlets]
def run(self, die=True)
-
Expand source code
def run(self, die=True): try: greenlets = [gevent.spawn(job.fun, *job.args, **job.kwargs) for job in self.jobs] # print("greenlets: ", greenlets) gevent.joinall(greenlets, raise_error=die) except Exception as e: self.jobs = [] raise e else: self.jobs = [] return greenlets
def task_add(self, fun, *args, **kwargs)
-
Expand source code
def task_add(self, fun, *args, **kwargs): self.jobs.append(Job(fun, *args, **kwargs))