Source code for fast_conformation.msa_generation.msa_utils
# Copyright 2021 DeepMind Technologies Limited## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License."""Common utilities for data pipeline tools."""importcontextlibimportshutilimporttempfileimporttimefromtypingimportOptionalimportsubprocessfromgetpassimportgetpassimportosfromabslimportlogging
@contextlib.contextmanager
def tmpdir_manager(base_dir: Optional[str] = None):
    """Yield a freshly created temporary directory and remove it afterwards.

    Args:
        base_dir: Directory in which the temporary directory is created.
            ``None`` lets ``tempfile.mkdtemp`` choose its default location.

    Yields:
        The path of the newly created temporary directory.
    """
    scratch_path = tempfile.mkdtemp(dir=base_dir)
    try:
        yield scratch_path
    finally:
        # Best-effort cleanup: errors while deleting the tree are ignored.
        shutil.rmtree(scratch_path, ignore_errors=True)
@contextlib.contextmanager
def timing(msg: str):
    """Log when the wrapped block starts and how long it took to finish.

    Args:
        msg: Description of the timed work; interpolated into both log lines.

    Note:
        The "Finished" line is only logged when the wrapped block exits
        normally. There is no ``try``/``finally`` around the ``yield``, so an
        exception raised inside the block propagates before it is emitted.
    """
    logging.info('Started %s', msg)
    started_at = time.time()
    yield
    elapsed_seconds = time.time() - started_at
    logging.info('Finished %s in %.3f seconds', msg, elapsed_seconds)
def create_ram_disk():
    """Create and mount a 9 GB tmpfs RAM disk at ``/tmp/ramdisk``.

    Prompts interactively for the sudo password, then runs two privileged
    commands through ``sudo -S`` (password supplied on standard input):

    1. ``mkdir -m 777 --parents /tmp/ramdisk``
    2. ``mount -t tmpfs -o size=9G ramdisk /tmp/ramdisk``

    If a command exits with a non-zero status the failure is logged and the
    remaining steps are skipped; no exception is raised for a failing command.
    """
    password = getpass("Enter your sudo password to create Ram Disk for running HMMER faster: ")
    # `sudo -S` reads the password from stdin and expects it to be terminated
    # by a newline, so add one explicitly.
    sudo_stdin = (password + "\n").encode()

    # Each command is run as `sudo -S <command>`; the commands themselves must
    # not start with `sudo`, otherwise sudo would be invoked twice.
    privileged_commands = (
        # Create the ramdisk directory
        ["mkdir", "-m", "777", "--parents", "/tmp/ramdisk"],
        # Mount the ramdisk
        ["mount", "-t", "tmpfs", "-o", "size=9G", "ramdisk", "/tmp/ramdisk"],
    )
    for command in privileged_commands:
        completed = subprocess.run(["sudo", "-S"] + command, input=sudo_stdin)
        if completed.returncode != 0:
            # Mounting onto a directory that could not be created is pointless,
            # so stop at the first failing step.
            logging.error(
                "Command '%s' failed with exit code %d; RAM disk was not set up.",
                " ".join(command),
                completed.returncode,
            )
            return
def read_fasta(file_path):
    """Parse a FASTA file into a ``{header: sequence}`` dictionary.

    Every line is stripped of surrounding whitespace and blank lines are
    skipped. A line starting with ``>`` opens a new record whose name is the
    rest of that line; the following lines are concatenated to form its
    sequence. A repeated header name overwrites the earlier entry.

    Args:
        file_path: Path of the FASTA file to read.

    Returns:
        Dictionary mapping each header (without the leading ``>``) to its
        sequence string.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        ValueError: If a named record has no sequence data, if sequence data
            appears while no non-empty header is active, or if the file
            contains no header line at all.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"FASTA file {file_path} not found.")

    records = {}

    def commit(name, parts):
        # Store a finished record, refusing headers without any residues.
        if not parts:
            raise ValueError(f"Sequence data for {name} is missing.")
        records[name] = ''.join(parts)

    current_name = ''
    current_parts = []
    saw_header = False

    with open(file_path, 'r') as handle:
        for raw_line in handle:
            text = raw_line.strip()
            if not text:
                continue

            if text.startswith('>'):
                # A new header closes the record that was being collected.
                if current_name:
                    commit(current_name, current_parts)
                current_parts = []
                current_name = text[1:]  # Drop the '>' marker
                saw_header = True
                continue

            if not current_name:
                raise ValueError("FASTA file is missing a sequence header before sequence data.")
            current_parts.append(text)

    # The final record has no following header to trigger its commit.
    if current_name:
        commit(current_name, current_parts)

    if not saw_header:
        raise ValueError("No valid FASTA format detected in the file.")

    return records
def save_dict_to_fasta(seq_dict, output_path, jobname):
    """Write the first sequence of ``seq_dict`` to the job's target FASTA file.

    Only the first value of ``seq_dict`` (in iteration order) is written; any
    further entries are ignored. The record header is ``jobname``, not the
    dictionary key. The file is written to
    ``{output_path}/{jobname}/target_seq/{jobname}.fasta``; this function does
    not create that directory.

    Args:
        seq_dict: Mapping of sequence names to sequence strings.
        output_path: Root output directory.
        jobname: Job name, used for the sub-directory, the file name and the
            FASTA header.

    Returns:
        None. Nothing is written when ``seq_dict`` is empty.
    """
    if not seq_dict:
        return

    # Take the first sequence explicitly rather than returning from inside a
    # loop over all items.
    first_sequence = next(iter(seq_dict.values()))
    fasta_path = f'{output_path}/{jobname}/target_seq/{jobname}.fasta'
    with open(fasta_path, 'w') as file:
        file.write(f">{jobname}\n")
        file.write(f"{first_sequence}\n")
def create_directory(path):
    """Create ``path`` as a new, empty directory, replacing whatever is there.

    WARNING: this is destructive. An existing directory at ``path`` is deleted
    recursively, and an existing regular file or symbolic link at ``path`` is
    removed, before the directory is created. Missing parent directories are
    created as well.

    Args:
        path: Directory path to (re)create.
    """
    if os.path.islink(path) or os.path.isfile(path):
        # shutil.rmtree refuses symlinks and regular files, and a dangling
        # symlink is invisible to os.path.exists, so remove these directly.
        # For a symlink only the link itself is removed, never its target.
        os.remove(path)
    elif os.path.exists(path):
        shutil.rmtree(path)
    os.makedirs(path)
    print(f"Directory '{path}' created successfully.")