Source code for biomaj.process.processfactory

from builtins import range
from builtins import object
import threading
import logging
import os
from biomaj.process.metaprocess import MetaProcess


class ProcessFactory(object):
    '''
    Manage process execution
    '''

    # Number of worker threads used to run meta processes in parallel.
    NB_THREAD = 2

    def __init__(self, bank, redis_client=None, redis_prefix=None):
        '''
        Creates a process factory.

        :param bank: bank whose processes will be executed
        :param redis_client: optional redis client, polled for cancel requests
        :param redis_prefix: key prefix for redis entries
        '''
        self.bank = bank
        self.threads_tasks = []
        if self.bank.session:
            self.meta_data = self.bank.session.get('per_process_metadata')
        else:
            self.meta_data = {}
        self.redis_client = redis_client
        self.redis_prefix = redis_prefix

    def run(self, simulate=False):
        '''
        Run processes. Subclasses implement the actual dispatch.

        :param simulate: does not execute process
        :type simulate: bool
        :return: status of execution - bool
        '''
        pass

    def run_threads(self, simulate=False):
        '''
        Start meta threads

        :param simulate: do not execute processes
        :type simulate: bool
        :return: tuple global execution status and status per meta process
        '''
        logging.debug('Start meta threads')
        os.chdir(self.bank.config.get('process.dir'))
        threads = []
        running_th = []
        for thread_tasks in self.threads_tasks:
            meta_thread = MetaProcess(self.bank, thread_tasks, self.meta_status, self.meta_data, simulate)
            meta_thread._lock = ProcessFactory._LOCK
            meta_thread.workflow = self.workflow
            meta_thread.start()
            threads.append(meta_thread)
            running_th.append(meta_thread)
        # Wait for the end of the threads
        kill_received = False
        while len(running_th) > 0:
            try:
                # Check for cancel request
                if self.redis_client and self.redis_client.get(self.redis_prefix + ':' + self.bank.name + ':action:cancel'):
                    logging.warning('Cancel requested, stopping process update')
                    # NOTE(review): self.session is not set anywhere in this
                    # class -- presumably assigned by the caller before run();
                    # confirm against callers.
                    self.redis_client.delete(self.redis_prefix + ':' + self.bank.name + ':session:' + self.session)
                    kill_received = True
                    for t in running_th:
                        t.kill_received = True
                # Join with a timeout so the loop stays responsive, then keep
                # only threads that are still alive.  (The previous version
                # stored join()'s None return value in running_th, which would
                # have crashed the cancel loop above on a later iteration.)
                for t in running_th:
                    t.join(1000)
                running_th = [t for t in running_th if t.is_alive()]
            except KeyboardInterrupt:
                logging.warning("Ctrl-c received! Sending kill to threads...")
                logging.warning("Running tasks will continue and process will stop.")
                kill_received = True
                for t in running_th:
                    t.kill_received = True
        for meta_thread in threads:
            meta_thread.join()
        global_meta_status = {}
        global_status = True
        for meta_thread in threads:
            for meta in meta_thread.meta_status:
                global_meta_status[meta] = meta_thread.meta_status[meta]
            if not meta_thread.global_status:
                global_status = False
        if kill_received:
            global_status = False
        logging.debug('Meta threads are over')
        return (global_status, global_meta_status)

    def fill_tasks_in_threads(self, metas):
        '''
        Dispatch meta processes in available threads (round-robin over
        NB_THREAD buckets).

        :param metas: iterable of meta process names
        '''
        # One task list per future thread.
        self.threads_tasks = [[] for _ in range(ProcessFactory.NB_THREAD)]
        thread_id = 0
        for meta in metas:
            meta_process = meta.strip()
            if thread_id == ProcessFactory.NB_THREAD:
                thread_id = 0
            self.threads_tasks[thread_id].append(meta_process)
            thread_id += 1
class PreProcessFactory(ProcessFactory):
    '''
    Manage preprocesses
    '''

    def __init__(self, bank, metas=None, redis_client=None, redis_prefix=None):
        '''
        Creates a preprocess factory

        :param bank: Bank
        :type bank: :class:`biomaj.bank.Bank`
        :param metas: initial status of meta processes
        :type metas: dict
        '''
        ProcessFactory.__init__(self, bank, redis_client, redis_prefix)
        # Keep caller-provided statuses when given; otherwise stay unset (None).
        self.meta_status = metas
        self.workflow = 'preprocess'

    def run(self, simulate=False):
        '''
        Run processes

        :param simulate: does not execute process
        :type simulate: bool
        :return: status of execution - bool
        '''
        logging.info('PROC:PRE')
        raw = self.bank.config.get('db.pre.process')
        metas = [] if raw is None else raw.split(',')
        self.fill_tasks_in_threads(metas)
        (status, self.meta_status) = self.run_threads(simulate)
        return status
class RemoveProcessFactory(ProcessFactory):
    '''
    Manage remove processes
    '''

    def __init__(self, bank, metas=None, redis_client=None, redis_prefix=None):
        '''
        Creates a remove process factory

        :param bank: Bank
        :type bank: :class:`biomaj.bank.Bank`
        :param metas: initial status of meta processes
        :type metas: dict
        '''
        ProcessFactory.__init__(self, bank, redis_client, redis_prefix)
        # Keep caller-provided statuses when given; otherwise stay unset (None).
        self.meta_status = metas
        self.workflow = 'removeprocess'

    def run(self, simulate=False):
        '''
        Run processes

        :param simulate: does not execute process
        :type simulate: bool
        :return: status of execution - bool
        '''
        logging.info('PROC:REMOVE')
        raw = self.bank.config.get('db.remove.process')
        metas = [] if raw is None else raw.split(',')
        self.fill_tasks_in_threads(metas)
        (status, self.meta_status) = self.run_threads(simulate)
        return status
class PostProcessFactory(ProcessFactory):
    '''
    Manage postprocesses

    self.blocks: dict of meta processes status
    Each meta process status is a dict of process status
    '''

    def __init__(self, bank, blocks=None, redis_client=None, redis_prefix=None):
        '''
        Creates a postprocess factory

        :param bank: Bank
        :type bank: :class:`biomaj.bank.Bank`
        :param blocks: initial status of block processes
        :type blocks: dict
        '''
        ProcessFactory.__init__(self, bank, redis_client, redis_prefix)
        self.blocks = {}
        if blocks is not None:
            self.blocks = blocks
        self.workflow = 'postprocess'

    def run(self, simulate=False):
        '''
        Run processes, block by block; stops launching new blocks after the
        first failing one.

        :param simulate: does not execute process
        :type simulate: bool
        :return: status of execution - bool
        '''
        logging.info('PROC:POST:BLOCK')
        blocks = self.bank.config.get('BLOCKS')
        if blocks is None or blocks == '':
            process_blocks = []
        else:
            process_blocks = blocks.split(',')
        metas = []
        self.meta_status = None
        global_status = True
        for process_block in process_blocks:
            if not global_status:
                continue
            logging.info('PROC:POST:BLOCK:' + process_block)
            if process_block in self.blocks:
                self.meta_status = self.blocks[process_block]
            # run each block
            # Guard against a missing per-block option, consistent with the
            # None checks in PreProcessFactory / RemoveProcessFactory (the
            # previous version called .split() on None, raising AttributeError).
            block_metas = self.bank.config.get(process_block.strip() + '.db.post.process')
            if block_metas is None:
                metas = []
            else:
                metas = block_metas.split(',')
            self.fill_tasks_in_threads(metas)
            (status, self.blocks[process_block]) = self.run_threads(simulate)
            if not status:
                global_status = False
        return global_status
ProcessFactory._LOCK = threading.Lock()