python多線程卡主

CSDN問答 2022-01-07 06:57:42 阅读数:695

python
# coding=utf-8import datetimeimport osimport sysimport reimport itertoolsimport getoptimport timefrom collections import OrderedDictimport subprocessimport getpassimport signalimport tracebackfrom Queue import Queuefrom Queue import Emptyfrom threading import Threadimport threadimport loggingdef exe_query(sql_str, dbname, gpport, gphost, gpuser): lst_of_col = [] rc = None out = None err = None CMD = "PGDATABASE=%s PGPORT=%s PGHOST=%s PGUSER=%s PGOPTIONS='-c optimizer=off -c client_encoding=UTF8' " % ( dbname, gpport, gphost, gpuser) CMD = CMD + "psql -R '%s' -tAXF '%s' -v ON_ERROR_STOP=1 <<END_OF_SQL \n" % ('\n', '@|$') CMD = CMD + sql_str + "\n" CMD = CMD + "END_OF_SQL" try: result = subprocess.Popen(CMD, shell = True, stdout = subprocess.PIPE, stderr = subprocess.PIPE, universal_newlines = True) out, err = result.communicate() rc = result.returncode except Exception as e: Logger(1).log(e) finally: if rc != 0: Logger(1).log(sql_str + '==>' + err) return err, rc else: lst_of_line = out.strip().split('\n', -1) for line in lst_of_line: lst_of_col.append(line.split('@|$', -1)) return lst_of_col, rcclass Logger(object): def __init__(self, level=3): self.DEBUG = 5 self.LOG = 4 self.INFO = 3 self.WARN = 2 self.ERROR = 1 self.options = Options() self.options.l = None self.options.qv = self.INFO self.level = level if self.options.l is None: self.options.l = os.path.join(os.environ.get('HOME', '.'), 'gpAdminLogs') if not os.path.isdir(self.options.l): os.mkdir(self.options.l) self.options.l = os.path.join(self.options.l, 'dataload_' + datetime.date.today().strftime('%Y%m%d') + '.log') try: self.logfile = open(self.options.l, 'a') except Exception, e: self.log("could not open logfile %s:%s" % (self.options.l, e)) # def __str__(self): # print "Logger\n" def level_transfer(self, level): if level == self.DEBUG: return "DEBUG" elif level == self.LOG: return "LOG" elif level == self.INFO: return "INFO" elif level == self.ERROR: return "ERROR" elif level == self.WARN: return "WARN" else: self.log("unknown log type %i" % level) def log(self, a): """ Level is either DEBUG, LOG, INFO, ERROR. a is the message """ message = '' try: message = '|'.join( [datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S') + ' ', self.level_transfer(self.level), a]) + '\n' message = message.encode('utf-8') except Exception, e: self.logfile.write("\nWarning: Log() threw an exception: {0} \n".format(e)) if self.level <= self.options.qv: sys.stdout.write(message) if self.level <= self.options.qv or self.level <= self.DEBUG: try: self.logfile.write(message) self.logfile.flush() except AttributeError, e: pass if self.level == self.ERROR: passclass Options(object): def __init__(self): passclass WorkerPool(object): """TODO:""" halt_command = 'halt command' def __init__(self, numWorkers=4, items=None, daemonize=False): if numWorkers <= 0: raise Exception("WorkerPool(): numWorkers should be greater than 0.") self.workers = [] self.should_stop = False self.work_queue = Queue() self.completed_queue = Queue() self.num_assigned = 0 self.daemonize = daemonize if items is not None: for item in items: self.work_queue.put(item) self.num_assigned += 1 for i in range(0, numWorkers): w = Workers("worker%d" % i, self) self.workers.append(w) w.start() self.numWorkers = numWorkers ### def getNumWorkers(self): return self.numWorkers def getNextWorkItem(self): return self.work_queue.get(block = True) def addFinishedWorkItem(self, command): self.completed_queue.put(command) self.work_queue.task_done() def markTaskDone(self): self.work_queue.task_done() def addCommand(self, cmd): Logger().log("Adding cmd to work_queue: %s" % cmd.cmd_str) self.work_queue.put(cmd) self.num_assigned += 1 def wait_and_printdots(self, command_count, quiet=True): while self.completed_queue.qsize() < command_count: time.sleep(1) if not quiet: sys.stdout.write(".") sys.stdout.flush() if not quiet: print " " self.join() def print_progress(self, command_count): while True: num_completed = self.completed_queue.qsize() num_completed_percentage = 0 if command_count: num_completed_percentage = float(num_completed) / command_count # self.logger.info('%0.2f%% of jobs completed' % (num_completed_percentage * 100)) if num_completed >= command_count: return self._join_work_queue_with_timeout(10) def _join_work_queue_with_timeout(self, timeout): done_condition = self.work_queue.all_tasks_done done_condition.acquire() try: while self.work_queue.unfinished_tasks: if timeout <= 0: # Timed out. return start_time = time.time() done_condition.wait(timeout) timeout -= (time.time() - start_time) finally: done_condition.release() def join(self): self.work_queue.join() return True def joinWorkers(self): for w in self.workers: w.join() def getCompletedItems(self): completed_list = [] try: while True: item = self.completed_queue.get(False) # will throw Empty if item is not None: completed_list.append(item) except Empty: return completed_list # def check_results(self): # """ goes through all items in the completed_queue and throws an exception at the # first one that didn't execute successfully # # throws ExecutionError # """ # try: # while True: # item = self.completed_queue.get(False) # if not item.get_results().wasSuccessful(): # raise Exception("error") # except Empty: # return def empty_completed_items(self): while not self.completed_queue.empty(): self.completed_queue.get(False) def isDone(self): # TODO: not sure that qsize() is safe return self.num_assigned == self.completed_queue.qsize() def haltWork(self): Logger.log("WorkerPool haltWork()") self.should_stop = True for w in self.workers: w.haltWork() self.work_queue.put(self.halt_command)class Workers(Thread): """TODO:""" pool = None cmd = None name = None logger = None def __init__(self, name, pool): self.name = name self.pool = pool # self.logger = Logger() Thread.__init__(self) self.daemon = pool.daemonize def run(self): while True: try: try: self.cmd = self.pool.getNextWorkItem() except TypeError: # misleading exception raised during interpreter shutdown return # we must have got a command to run here if self.cmd is None: Logger().log("[%s] got a None cmd" % self.name) self.pool.markTaskDone() elif self.cmd is self.pool.halt_command: Logger().log("[%s] got a halt cmd" % self.name) self.pool.markTaskDone() self.cmd = None return elif self.pool.should_stop: Logger().log("[%s] got cmd and pool is stopped: %s" % (self.name, self.cmd)) self.pool.markTaskDone() self.cmd = None else: Logger().log("[%s] got cmd: %s" % (self.name, self.cmd.cmd_str)) self.cmd.run() Logger().log("[%s] finished cmd: %s" % (self.name, self.cmd)) # self.pool.addFinishedWorkItem(self.cmd) self.pool.markTaskDone() self.cmd = None except Exception, e: Logger(1).log(e.message) if self.cmd: Logger(1).log("[%s] finished cmd with exception: %s" % (self.name, self.cmd)) # self.pool.addFinishedWorkItem(self.cmd) self.pool.markTaskDone() self.cmd = None def haltWork(self): Logger().log("[%s] haltWork" % self.name) c = self.cmd if c is not None and isinstance(c, Command): c.interrupt() c.cancel()class Command(object): """ TODO: """ def __init__(self, name, cmd_str, remoteHost=None): self.cmd_str = cmd_str self.name = name self.result = None def __str__(self): # print "Command" return "Command" def run(self): pass def interrupt(self): pass def cancel(self): passclass CreateExterlTable(Command): def __init__(self): Command.__init__(self, name = 'Create Exterl Table', cmd_str = None)class GenerateInsertSql(Command): def __init__(self): Command.__init__(self, name = 'Generate Insert Sql', cmd_str = None)class DataManage(Command): name = "Data Manage" def __init__(self, name, dest_host, dest_port, dest_user, dest_db, sour_host, sour_port, sour_user, sour_db, table_pair): self._pool = None self._dest_host = dest_host self._dest_port = dest_port self._dest_user = dest_user self._dest_db = dest_db self._sour_user = sour_user self._sour_port = sour_port self._sour_host = sour_host self._sour_db = sour_db self._table_pair = table_pair self.name = name self.log = Logger() Command.__init__(self, self.name, None) def __str__(self): return self.name def parse_table_pair(self, flag=True): # Logger().log('starting parse table %s\n' % self._table_pair) query_str = 'insert into from ' return query_str def _insert_from_source(self): Logger().log("starting insert from source for %s" % self._table_pair) try: # self._table_pair query_str = self.parse_table_pair() # exe_query(query_str, self._sour_db, self._sour_port, self._sour_host, self._sour_user) except Exception, e: Logger().log(e) finally: thread.exit() def _insert_to_destination(self): Logger().log("starting insert to destination for %s" % self._table_pair) try: # self._table_pair query_str = self.parse_table_pair() # exe_query(query_str, self._sour_db, self._sour_port, self._sour_host, self._sour_user) except Exception, e: Logger().log(e) finally: thread.exit() def _copy_data(self): # self._pool.empty_completed_items() source_thread = Thread(target = self._insert_from_source) source_thread.start() destination_thread = Thread(target = self._insert_to_destination) destination_thread.start() while True: if source_thread.is_alive(): source_thread.join() elif destination_thread.is_alive(): destination_thread.join() else: break Logger().log('copy data success') def run(self): self._copy_data()class DataCopy(object): def __init__(self): # self.table_pair = ['public.test01', 'public.test02', 'public.test03', 'public.test04'] self.table_pair = ['public.test01', 'public.test02'] self._pool = None self._dest_host = '192.168.126.30' self._dest_port = '5432' self._dest_user = 'gpadmin' self._dest_db = 'work' self._sour_user = 'gpadmin6' self._sour_port = '5436' self._sour_host = '192.168.126.30' self._sour_db = 'work' self.name = 'data manager' def run(self): self._pool = WorkerPool(2) for t in self.table_pair: cmd = DataManage(name = t, dest_host = self._dest_host, dest_port = self._dest_port, dest_db = self._dest_db, dest_user = self._dest_user, sour_db = self._sour_db, sour_host = self._sour_host, sour_port = self._sour_port, sour_user = self._sour_user, table_pair = t) self._pool.addCommand(cmd) self._pool.join() # self._pool.joinWorkers() Logger().log('======>Exiting Main Work.<======')if __name__ == '__main__': dc = DataCopy() dc.run()

這個樣例程序運行的時候總是會卡主,線程退不出來。有哪比特大神能幫忙看看的




采納答案:

加了empty判斷,空則break否則繼續while循環,這樣能正常退出來


版权声明:本文为[CSDN問答]所创,转载请带上原文链接,感谢。 https://gsmany.com/2022/01/202201070657422623.html