Source code for biomaj.process.metaprocess

from builtins import str
import threading
import logging
import os

from biomaj_process.process import Process, DrmaaProcess, DockerProcess
from biomaj_process.process import RemoteProcess
from biomaj.mongo_connector import MongoConnector
from biomaj_zipkin.zipkin import Zipkin


[docs]class MetaProcess(threading.Thread): ''' Meta process in biomaj process workflow. Meta processes are executed in parallel. Each meta process defined a list of Process to execute sequentially '''
[docs] def __init__(self, bank, metas, meta_status=None, meta_data=None, simulate=False): ''' Creates a meta process thread :param bank: Bank :type bank: :class:`biomak.bank` :param meta: list of meta processes to execute in thread :type meta: list of str :param meta_status: initial status of the meta processes :type meta_status: bool :param simulate: does not execute process :type simulate: bool ''' if meta_data is None: meta_data = {} threading.Thread.__init__(self) self._lock = None self.kill_received = False self.workflow = None self.simulate = simulate self.bank = bank self.metas = metas self.meta_data = meta_data self.meta_status = {} for meta in self.metas: self.meta_status[meta] = {} if meta_status is not None: self.meta_status = meta_status self._stopevent = threading.Event() self.bmaj_env = os.environ.copy() self.bmaj_only_env = {} # The root directory where all databases are stored. # If your data is not stored under one directory hirearchy # you can override this value in the database properties file. for conf in dict(self.bank.config.config_bank.items('GENERAL')): self.bmaj_env[conf] = self.bank.config.config_bank.get('GENERAL', conf) if self.bmaj_env[conf] is None: self.bmaj_env[conf] = '' self.bmaj_only_env[conf] = self.bmaj_env[conf] self.bmaj_env['dbname'] = self.bank.name self.bmaj_only_env['dbname'] = self.bmaj_env['dbname'] self.bmaj_env['datadir'] = self.bank.config.get('data.dir') self.bmaj_only_env['datadir'] = self.bmaj_env['datadir'] self.bmaj_env['data.dir'] = self.bmaj_env['datadir'] self.bmaj_only_env['data.dir'] = self.bmaj_env['data.dir'] if self.bank.config.get('mail.admin'): self.bmaj_env['mailadmin'] = self.bank.config.get('mail.admin') self.bmaj_only_env['mailadmin'] = self.bmaj_env['mailadmin'] if self.bank.config.get('mail.smtp.host'): self.bmaj_env['mailsmtp'] = self.bank.config.get('mail.smtp.host') self.bmaj_only_env['mailsmtp'] = self.bmaj_env['mailsmtp'] self.bmaj_env['processdir'] = self.bank.config.get('process.dir', default='') self.bmaj_only_env['processdir'] = self.bmaj_env['processdir'] if 'PATH' in self.bmaj_env: self.bmaj_env['PATH'] += ':' + self.bmaj_env['processdir'] self.bmaj_only_env['PATH'] = self.bmaj_env['PATH'] else: self.bmaj_env['PATH'] = self.bmaj_env['processdir'] + ':/usr/local/bin:/usr/sbin:/usr/bin' self.bmaj_only_env['PATH'] = self.bmaj_env['PATH'] self.bmaj_env['PP_DEPENDENCE'] = '#' self.bmaj_only_env['PP_DEPENDENCE'] = '#' self.bmaj_env['PP_DEPENDENCE_VOLATILE'] = '#' self.bmaj_only_env['PP_DEPENDENCE_VOLATILE'] = '#' self.bmaj_env['PP_WARNING'] = '#' self.bmaj_only_env['PP_WARNING'] = '#' self.bmaj_env['PATH_PROCESS_BIOMAJ'] = self.bank.config.get('process.dir') self.bmaj_only_env['PATH_PROCESS_BIOMAJ'] = self.bank.config.get('process.dir') # Set some session specific env if self.bank.session is not None: if self.bank.session.get('log_file') is not None: log_file = self.bank.session.get('log_file') log_dir = os.path.dirname(log_file) self.bmaj_env['logdir'] = log_dir self.bmaj_only_env['logdir'] = log_dir self.bmaj_env['logfile'] = log_file self.bmaj_only_env['logfile'] = log_file self.bmaj_env['offlinedir'] = self.bank.session.get_offline_directory() self.bmaj_only_env['offlinedir'] = self.bmaj_env['offlinedir'] self.bmaj_env['dirversion'] = self.bank.config.get('dir.version') self.bmaj_only_env['dirversion'] = self.bmaj_env['dirversion'] self.bmaj_env['noextract'] = self.bank.config.get('no.extract') if self.bmaj_env['noextract'] is None: self.bmaj_env['noextract'] = '' self.bmaj_only_env['noextract'] = self.bmaj_env['noextract'] self.bmaj_env['localrelease'] = self.bank.session.get_release_directory() self.bmaj_only_env['localrelease'] = self.bmaj_env['localrelease'] if self.bank.session.get('release') is not None: self.bmaj_env['remoterelease'] = self.bank.session.get('remoterelease') self.bmaj_only_env['remoterelease'] = self.bmaj_env['remoterelease'] self.bmaj_env['removedrelease'] = self.bank.session.get('release') self.bmaj_only_env['removedrelease'] = self.bmaj_env['removedrelease'] for bdep in self.bank.depends: self.bmaj_env[bdep.name + 'source'] = bdep.session.get_full_release_directory() self.bmaj_only_env[bdep.name + 'source'] = self.bmaj_env[bdep.name + 'source'] # Fix case where a var = None for key in list(self.bmaj_only_env.keys()): if self.bmaj_only_env[key] is None: self.bmaj_env[key] = '' self.bmaj_only_env[key] = ''
[docs] def set_progress(self, name, status=None): ''' Update progress on execution :param name: name of process :type name: str :param status: status of process :type status: bool or None ''' logging.debug('Process:progress:' + name + "=" + str(status)) if self.workflow is not None: MongoConnector.banks.update( {'name': self.bank.name}, {'$set': {'status.' + self.workflow + '.progress.' + name: status}} )
[docs] def run(self): # Run meta processes self.global_status = True for meta in self.metas: if not self._stopevent.isSet(): logging.info("PROC:META:RUN:" + meta) processes = [] if self.bank.config.get(meta) is not None: processes = self.bank.config.get(meta).split(',') processes_status = {} for bprocess in processes: if self.kill_received: raise Exception('Kill request received, exiting') # Process status already ok, do not replay if meta in self.meta_status and bprocess in self.meta_status[meta] and self.meta_status[meta][bprocess]: logging.info("PROC:META:SKIP:PROCESS:" + bprocess) processes_status[bprocess] = True continue logging.info("PROC:META:RUN:PROCESS:" + bprocess) # bprocess.name may not be unique name = bprocess desc = self.bank.config.get(bprocess + '.desc') cluster = self.bank.config.get_bool(bprocess + '.cluster', default=False) docker = self.bank.config.get(bprocess + '.docker') proc_type = self.bank.config.get(bprocess + '.type') exe = self.bank.config.get(bprocess + '.exe') args = self.bank.config.get(bprocess + '.args') expand = self.bank.config.get_bool(bprocess + '.expand', default=True) if cluster: native = self.bank.config.get(bprocess + '.native') bmaj_process = DrmaaProcess(meta + '_' + name, exe, args, desc, proc_type, native, expand, self.bmaj_env, os.path.dirname(self.bank.config.log_file)) else: if self.bank.config.get('micro.biomaj.service.process', default=None) == '1': logging.info("PROC:META:RUN:REMOTEPROCESS: " + bprocess) # (self, name, exe, args, desc=None, proc_type=None, expand=True, # bank_env=None, log_dir=None, # rabbit_mq=None, rabbit_mq_port=5672, rabbit_mq_user=None, rabbit_mq_password=None, rabbit_mq_virtualhost=None, # proxy=None, bank=None): proxy = self.bank.config.get('micro.biomaj.proxy.process') if not proxy: proxy = self.bank.config.get('micro.biomaj.proxy') use_sudo = self.bank.config.get_bool('docker.sudo', default=True) bmaj_process = RemoteProcess( meta + '_' + name, exe, args, desc=desc, proc_type=proc_type, expand=expand, docker=docker, docker_sudo=use_sudo, bank_env=self.bmaj_only_env, log_dir=os.path.dirname(self.bank.config.log_file), rabbit_mq=self.bank.config.get('micro.biomaj.rabbit_mq'), rabbit_mq_port=int(self.bank.config.get('micro.biomaj.rabbit_mq_port', default='5672')), rabbit_mq_user=self.bank.config.get('micro.biomaj.rabbit_mq_user'), rabbit_mq_password=self.bank.config.get('micro.biomaj.rabbit_mq_password'), rabbit_mq_virtualhost=self.bank.config.get('micro.biomaj.rabbit_mq_virtualhost', default='/'), proxy=proxy, bank=self.bank.name ) else: if docker: use_sudo = self.bank.config.get_bool('docker.sudo', default=True) bmaj_process = DockerProcess( meta + '_' + name, exe, args, desc=desc, proc_type=proc_type, docker=docker, expand=expand, bank_env=self.bmaj_only_env, log_dir=os.path.dirname(self.bank.config.log_file), use_sudo=use_sudo) else: bmaj_process = Process( meta + '_' + name, exe, args, desc=desc, proc_type=proc_type, expand=expand, bank_env=self.bmaj_env, log_dir=os.path.dirname(self.bank.config.log_file) ) self.set_progress(bmaj_process.name, None) if self.bank.config.get(bprocess + '.format'): bmaj_process.format = self.bank.config.get(bprocess + '.format') if self.bank.config.get(bprocess + '.types'): bmaj_process.types = self.bank.config.get(bprocess + '.types') if self.bank.config.get(bprocess + '.tags'): bmaj_process.tags = self.bank.config.get(bprocess + '.tags') if self.bank.config.get(bprocess + '.files'): bmaj_process.files = self.bank.config.get(bprocess + '.files') span = None if self.bank.config.get('zipkin_trace_id'): span = Zipkin('biomaj-process', bmaj_process.name, trace_id=self.bank.config.get('zipkin_trace_id'), parent_id=self.bank.config.get('zipkin_span_id')) bmaj_process.set_trace(span.get_trace_id(), span.get_span_id()) res = bmaj_process.run(self.simulate) if span: span.add_binary_annotation('status', str(res)) span.trace() processes_status[bprocess] = res self.set_progress(bmaj_process.name, res) if not res: logging.info("PROC:META:RUN:PROCESS:ERROR:" + bmaj_process.name) self.global_status = False break else: logging.info("PROC:META:RUN:PROCESS:OK:" + bmaj_process.name) if not self.simulate: if self._lock: self._lock.acquire() try: self._get_metata_from_outputfile(bmaj_process) except Exception as e: logging.error(e) finally: self._lock.release() # release lock, no matter what else: self._get_metata_from_outputfile(bmaj_process) self.meta_status[meta] = processes_status
[docs] def _get_metata_from_outputfile(self, proc): ''' Extract metadata given by process on stdout. Store metadata in self.metadata :param proc: process :type proc_name: :class:`biomaj.process.Process` ''' proc_name = proc.name output_file = proc.output_file self.meta_data[proc_name] = {} with open(output_file) as f: for line in f: if line.startswith('##BIOMAJ#'): line = line.replace('##BIOMAJ#', '') line = line.strip('\n\r') metas = line.split('#') meta_format = metas[0] if meta_format == '': meta_format = proc.format meta_type = metas[1] if meta_type == '': meta_type = proc.types meta_tags = metas[2] if meta_tags == '': meta_tags = proc.tags meta_files = metas[3] if meta_format not in self.meta_data[proc_name]: self.meta_data[proc_name][meta_format] = [] tags = meta_tags.split(',') tag_list = {} if meta_tags != '': for tag in tags: t = tag.split(':') tag_list[t[0]] = t[1] self.meta_data[proc_name][meta_format].append({ 'tags': tag_list, 'types': meta_type.split(','), 'files': meta_files.split(',')} ) if proc.files and proc.format: tag_list = {} if proc.tags != '': for tag in proc.tags.split(','): t = tag.split(':') tag_list[t[0]] = t[1] self.meta_data[proc_name][proc.format] = [] self.meta_data[proc_name][proc.format].append({ 'tags': tag_list, 'types': proc.types.split(','), 'files': proc.files.split(',')} )
def stop(self): self._stopevent.set()