From d9c93c74b52b583aef213cd56b5f19e54ec44afb Mon Sep 17 00:00:00 2001 From: fanqienano <923641962@qq.com> Date: Mon, 13 Mar 2017 23:42:10 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E6=96=87=E4=BB=B6=E7=9B=AE?= =?UTF-8?q?=E5=BD=95=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ProcessManager.py | 48 ++++++++++++++++ TaskManager.py | 65 +-------------------- ThreadManager.py | 142 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 191 insertions(+), 64 deletions(-) create mode 100644 ProcessManager.py create mode 100644 ThreadManager.py diff --git a/ProcessManager.py b/ProcessManager.py new file mode 100644 index 0000000..c960539 --- /dev/null +++ b/ProcessManager.py @@ -0,0 +1,48 @@ +#!/usr/bin/python +#coding=UTF-8 + +from multiprocessing import Process +from multiprocessing import JoinableQueue +import time +import random +from collections import OrderedDict +import inspect +import ctypes +import signal + +from TaskManager import TaskManager + +class TaskProcess(Process): + def __init__(self, func, *args, **kwargs): + super(TaskProcess, self).__init__() + self.func = func + self.args = args + self.callback = kwargs.get('callback', self.callback) + self.isFork = kwargs.get('fork', False) + self.timeout = kwargs.get('timeout', 0) + + def start(self): + if self.isFork: + super(TaskProcess, self).start() + + def run(self): + signal.signal(signal.SIGALRM, self.exceptionHandler) + signal.alarm(self.timeout) + self.func(*self.args) + self.callback(self) + + def exceptionHandler(self, signum, frame): + raise AssertionError + + def callback(self, t): + pass + +class ProcessManager(TaskManager): + def __init__(self, *args, **kwargs): + self.waitQueue = OrderedDict() + self.cancel = False + self.num = 5 + if 'num' in kwargs: + self.num = kwargs['num'] + self.threadQueue = {} + self.isRun = False diff --git a/TaskManager.py b/TaskManager.py index 91ba81f..5906012 100644 --- a/TaskManager.py +++ b/TaskManager.py @@ -1,77 +1,14 @@ #!/usr/bin/python #coding=UTF-8 -from multiprocessing import Process -from multiprocessing import JoinableQueue -from threading import Thread -from threading import Event -from threading import Timer import time import random from collections import OrderedDict -import inspect -import ctypes -import signal words = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789' -class TimeoutException(Exception): - pass - -class TaskThread(Thread): - def __init__(self, func, *args, **kwargs): - super(TaskThread, self).__init__() - self.func = func - self.args = args - self.callback = kwargs.get('callback', self.callback) - self.excTimeout = kwargs.get('exc_timeout', None) - self.timeout = kwargs.get('timeout', None) - - def run(self): - if self.excTimeout is not None: - timeoutThread = Timer(self.excTimeout, self.exceptionHandler, (self.ident,)) - timeoutThread.start() - self.func(*self.args) - self.callback(self) - - def exceptionHandler(self, tid): - self.callback(self) - ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(TimeoutException)) - - def callback(self, t): - pass - -class TaskProcess(Process): - def __init__(self, func, *args, **kwargs): - super(TaskProcess, self).__init__() - self.func = func - self.args = args - self.callback = kwargs.get('callback', self.callback) - self.isFork = kwargs.get('fork', False) - self.timeout = kwargs.get('timeout', 0) - - def start(self): - if self.isFork: - super(TaskProcess, self).start() - - def run(self): - signal.signal(signal.SIGALRM, self.exceptionHandler) - signal.alarm(self.timeout) - self.func(*self.args) - self.callback(self) - - def exceptionHandler(self, signum, frame): - raise AssertionError - - def callback(self, t): - pass - class TaskManager(object): - def __init__(self, mode, *args, **kwargs): - if mode.lower() == 'process': - self.taskClass = TaskProcess - else: - self.taskClass = TaskThread + def __init__(self, *args, **kwargs): self.waitQueue = OrderedDict() self.cancel = False self.num = 5 diff --git a/ThreadManager.py b/ThreadManager.py new file mode 100644 index 0000000..db2e5bc --- /dev/null +++ b/ThreadManager.py @@ -0,0 +1,142 @@ +#!/usr/bin/python +#coding=UTF-8 + +from multiprocessing import Process +from multiprocessing import JoinableQueue +from threading import Thread +from threading import Event +from threading import Timer +import time +import random +from collections import OrderedDict +import inspect +import ctypes +import signal + +from TaskManager import TaskManager + +words = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789' + +class TimeoutException(Exception): + pass + +class TaskThread(Thread): + def __init__(self, func, *args, **kwargs): + super(TaskThread, self).__init__() + self.func = func + self.args = args + self.callback = kwargs.get('callback', self.callback) + self.excTimeout = kwargs.get('exc_timeout', None) + self.timeout = kwargs.get('timeout', None) + + def run(self): + if self.excTimeout is not None: + timeoutThread = Timer(self.excTimeout, self.exceptionHandler, (self.ident,)) + timeoutThread.start() + self.func(*self.args) + self.callback(self) + + def exceptionHandler(self, tid): + self.callback(self) + ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(TimeoutException)) + + def callback(self, t): + pass + +class ThreadManager(TaskManager): + def __init__(self, *args, **kwargs): + self.waitQueue = OrderedDict() + self.cancel = False + self.num = 5 + if 'num' in kwargs: + self.num = kwargs['num'] + self.threadQueue = {} + self.isRun = False + + def callback(self, t): + try: + del self.threadQueue[t.name] + except KeyError: + pass + if self.isRun: + self.startTask() + + def start(self): + ''' + Start all tasks. + ''' + self.isRun = True + self.startTask() + + def hold(self): + ''' + No new tasks are pending. + ''' + self.isRun = False + + def wait(self, **kwargs): + ''' + Waiting for all tasks. + name: Waiting for the name of the task. + ''' + name = kwargs.get('name', None) + if name is None: + while len(self.threadQueue) > 0: + t = self.threadQueue.values()[0] + t.join(t.timeout) + self.callback(t) + else: + while True: + try: + t = self.threadQueue[name] + t.join(t.timeout) + self.callback(t) + break + except KeyError: + if name in self.waitQueue.keys() + self.threadQueue.keys(): + continue + else: + break + + def dismiss(self, **kwargs): + ''' + Dismiss all the not starting tasks. + name: Dismiss for the name of the task. + return Dismiss result. + ''' + name = kwargs.get('name', None) + if name is None: + self.waitQueue.clear() + return True + else: + try: + del self.waitQueue[name] + return True + except KeyError: + return False + return False + + def addTask(self, func, args = (), **kwargs): + ''' + add one task to queue. + kwargs: callback; timeout; exc_timeout; daemonic; + return task name. + ''' + name = kwargs.get('name', None) + kwargs['callback'] = kwargs.get('callback', self.callback) + if name is None: + name = ''.join(random.sample(words, 5)) + '_' + str(time.time()) + self.waitQueue[name] = (func, args, kwargs) + if self.isRun: + self.startTask() + return name + + def startTask(self): + while len(self.threadQueue) <= self.num and len(self.waitQueue) > 0: + item = self.waitQueue.popitem(0) + daemonic = item[1][2].get('daemonic', False) + t = TaskThread(item[1][0], *item[1][1], **item[1][2]) + self.threadQueue[t.name] = t + t.name = item[0] + t.setDaemon(daemonic) + t.start() \ No newline at end of file