view multi_obgrep.py @ 14:e376ba0fcde0 draft

"planemo upload for repository https://github.com/bgruening/galaxytools/tree/master/chemicaltoolbox/openbabel commit 327c29cc43f56d7067ab9fa51323ea31951db98b"
author bgruening
date Tue, 10 Nov 2020 20:34:01 +0000
parents b578d0c101cd
children 22a866858b81
line wrap: on
line source

#!/usr/bin/env python
"""
    Input: Molecules in SDF, SMILES ...
    Output: Molecule file filtered with obgrep.
    Copyright 2013, Bjoern Gruening and Xavier Lucas
"""
import argparse
import multiprocessing
import os
import shlex
import shutil
import subprocess
import tempfile


def parse_command_line():
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--infile', required=True, help='Molecule file.')
    parser.add_argument('-q', '--query', required=True, help='Query file, containing different SMARTS in each line.')
    parser.add_argument('-o', '--outfile', required=True, help='Path to the output file.')
    parser.add_argument("--iformat", help="Input format, like smi, sdf, inchi")
    parser.add_argument("--n-times", dest="n_times", type=int,
                        default=0, help="Print a molecule only if the pattern occurs # times inside the molecule.")
    parser.add_argument('-p', '--processors', type=int, default=multiprocessing.cpu_count())
    parser.add_argument("--invert-matches", dest="invert_matches", action="store_true",
                        default=False, help="Invert the matching, print non-matching molecules.")
    parser.add_argument("--only-name", dest="only_name", action="store_true",
                        default=False, help="Only print the name of the molecules.")
    parser.add_argument("--full-match", dest="full_match", action="store_true",
                        default=False, help="Full match, print matching-molecules only when the number of heavy atoms is also equal to the number of atoms in the SMARTS pattern.")
    parser.add_argument("--number-of-matches", dest="number_of_matches", action="store_true",
                        default=False, help="Print the number of matches.")
    return parser.parse_args()


results = list()


def mp_callback(res):
    results.append(res)


def mp_helper(query, args):
    """
        Helper function for multiprocessing.
        That function is a wrapper around obgrep.
    """

    cmd_list = []
    if args.invert_matches:
        cmd_list.append('-v')
    if args.only_name:
        cmd_list.append('-n')
    if args.full_match:
        cmd_list.append('-f')
    if args.number_of_matches:
        cmd_list.append('-c')
    if args.n_times:
        cmd_list.append('-t %s' % str(args.n_times))

    tmp = tempfile.NamedTemporaryFile(delete=False)
    cmd = 'obgrep %s "%s" %s' % (' '.join(cmd_list), query, args.infile)
    child = subprocess.Popen(shlex.split(cmd), stdout=open(tmp.name, 'w+'), stderr=subprocess.PIPE)

    stdout, stderr = child.communicate()
    return (tmp.name, query)


def obgrep(args):
    temp_file = tempfile.NamedTemporaryFile()
    temp_link = "%s.%s" % (temp_file.name, args.iformat)
    temp_file.close()
    os.symlink(args.infile, temp_link)
    args.infile = temp_link

    pool = multiprocessing.Pool(args.processors)
    for query in open(args.query):
        pool.apply_async(mp_helper, args=(query.strip(), args), callback=mp_callback)
        # mp_callback(mp_helper(query.strip(), args))
    pool.close()
    pool.join()

    out_handle = open(args.outfile, 'wb')
    for result_file, query in results:
        res_handle = open(result_file, 'rb')
        shutil.copyfileobj(res_handle, out_handle)
        res_handle.close()
        os.remove(result_file)
    out_handle.close()

    os.remove(temp_link)


def __main__():
    """
        Multiprocessing obgrep search.
    """
    args = parse_command_line()
    obgrep(args)


if __name__ == "__main__":
    __main__()