from builtins import range
from builtins import object
import threading
import logging
import os
from biomaj.process.metaprocess import MetaProcess
[docs]class ProcessFactory(object):
'''
Manage process execution
'''
NB_THREAD = 2
[docs] def __init__(self, bank, redis_client=None, redis_prefix=None):
self.bank = bank
self.threads_tasks = []
if self.bank.session:
self.meta_data = self.bank.session.get('per_process_metadata')
else:
self.meta_data = {}
self.redis_client = redis_client
self.redis_prefix = redis_prefix
[docs] def run(self, simulate=False):
'''
Run processes
:param simulate: does not execute process
:type simulate: bool
:return: status of execution - bool
'''
pass
[docs] def run_threads(self, simulate=False):
'''
Start meta threads
:param simulate: do not execute processes
:type simulate: bool
:return: tuple global execution status and status per meta process
'''
logging.debug('Start meta threads')
os.chdir(self.bank.config.get('process.dir'))
threads = []
running_th = []
for thread_tasks in self.threads_tasks:
meta_thread = MetaProcess(self.bank, thread_tasks, self.meta_status, self.meta_data, simulate)
meta_thread._lock = ProcessFactory._LOCK
meta_thread.workflow = self.workflow
meta_thread.start()
threads.append(meta_thread)
running_th.append(meta_thread)
# Wait for the end of the threads
kill_received = False
while len(running_th) > 0:
try:
# Join all threads using a timeout so it doesn't block
# Filter out threads which have been joined or are None
# Check for cancel request
if self.redis_client and self.redis_client.get(self.redis_prefix + ':' + self.bank.name + ':action:cancel'):
logging.warn('Cancel requested, stopping process update')
self.redis_client.delete(self.redis_prefix + ':' + self.bank.name + ':session:' + self.session)
kill_received = True
for t in running_th:
t.kill_received = True
running_th = [t.join(1000) for t in running_th if t is not None and t.is_alive()]
except KeyboardInterrupt:
logging.warn("Ctrl-c received! Sending kill to threads...")
logging.warn("Running tasks will continue and process will stop.")
kill_received = True
for t in running_th:
t.kill_received = True
for meta_thread in threads:
meta_thread.join()
global_meta_status = {}
global_status = True
for meta_thread in threads:
for meta in meta_thread.meta_status:
global_meta_status[meta] = meta_thread.meta_status[meta]
if not meta_thread.global_status:
global_status = False
if kill_received:
global_status = False
logging.debug('Meta threads are over')
return (global_status, global_meta_status)
[docs] def fill_tasks_in_threads(self, metas):
'''
Dispatch meta processes in available threads
'''
self.threads_tasks = []
for i in range(0, ProcessFactory.NB_THREAD):
# Fill array of meta process in future threads
self.threads_tasks.append([])
thread_id = 0
for meta in metas:
meta_process = meta.strip()
if thread_id == ProcessFactory.NB_THREAD:
thread_id = 0
self.threads_tasks[thread_id].append(meta_process)
thread_id += 1
[docs]class PreProcessFactory(ProcessFactory):
'''
Manage preprocesses
'''
[docs] def __init__(self, bank, metas=None, redis_client=None, redis_prefix=None):
'''
Creates a preprocess factory
:param bank: Bank
:type bank: :class:`biomaj.bank.Bank`
:param metas: initial status of meta processes
:type metas: dict
'''
ProcessFactory.__init__(self, bank, redis_client, redis_prefix)
self.meta_status = None
if metas is not None:
self.meta_status = metas
self.workflow = 'preprocess'
[docs] def run(self, simulate=False):
'''
Run processes
:param simulate: does not execute process
:type simulate: bool
:return: status of execution - bool
'''
logging.info('PROC:PRE')
if self.bank.config.get('db.pre.process') is None:
metas = []
else:
metas = self.bank.config.get('db.pre.process').split(',')
self.fill_tasks_in_threads(metas)
(status, self.meta_status) = self.run_threads(simulate)
return status
[docs]class RemoveProcessFactory(ProcessFactory):
'''
Manage remove processes
'''
[docs] def __init__(self, bank, metas=None, redis_client=None, redis_prefix=None):
'''
Creates a remove process factory
:param bank: Bank
:type bank: :class:`biomaj.bank.Bank`
:param metas: initial status of meta processes
:type metas: dict
'''
ProcessFactory.__init__(self, bank, redis_client, redis_prefix)
self.meta_status = None
if metas is not None:
self.meta_status = metas
self.workflow = 'removeprocess'
[docs] def run(self, simulate=False):
'''
Run processes
:param simulate: does not execute process
:type simulate: bool
:return: status of execution - bool
'''
logging.info('PROC:REMOVE')
if self.bank.config.get('db.remove.process') is None:
metas = []
else:
metas = self.bank.config.get('db.remove.process').split(',')
self.fill_tasks_in_threads(metas)
(status, self.meta_status) = self.run_threads(simulate)
return status
[docs]class PostProcessFactory(ProcessFactory):
'''
Manage postprocesses
self.blocks: dict of meta processes status
Each meta process status is a dict of process status
'''
[docs] def __init__(self, bank, blocks=None, redis_client=None, redis_prefix=None):
'''
Creates a postprocess factory
:param bank: Bank
:type bank: :class:`biomaj.bank.Bank`
:param blocks: initial status of block processes
:type blocks: dict
'''
ProcessFactory.__init__(self, bank, redis_client, redis_prefix)
self.blocks = {}
if blocks is not None:
self.blocks = blocks
self.workflow = 'postprocess'
[docs] def run(self, simulate=False):
'''
Run processes
:param simulate: does not execute process
:type simulate: bool
:return: status of execution - bool
'''
logging.info('PROC:POST:BLOCK')
blocks = self.bank.config.get('BLOCKS')
if blocks is None or blocks == '':
process_blocks = []
else:
process_blocks = blocks.split(',')
metas = []
self.meta_status = None
global_status = True
for process_block in process_blocks:
if not global_status:
continue
logging.info('PROC:POST:BLOCK:' + process_block)
if process_block in self.blocks:
self.meta_status = self.blocks[process_block]
# run each block
metas = self.bank.config.get(process_block.strip() + '.db.post.process').split(',')
self.fill_tasks_in_threads(metas)
(status, self.blocks[process_block]) = self.run_threads(simulate)
if not status:
global_status = False
return global_status
ProcessFactory._LOCK = threading.Lock()