view model.py @ 5:86a9d8d5b291 draft default tip

Uploaded
author jaredgk
date Wed, 17 Oct 2018 17:34:34 -0400
parents 54c84f7dcb2c
children
line wrap: on
line source

import os
import sys
import json
import subprocess
import argparse
import logging
import itertools
import copy

import numpy as np

from collections import defaultdict, OrderedDict

# Insert Jared's directory path, required for calling Jared's functions. Change when directory structure changes.
sys.path.insert(0, os.path.abspath(os.path.join(os.pardir, 'jared')))

from logging_module import initLogger

class ModelFile(dict):
    """Dictionary of models keyed by model name.

    Besides the mapping itself the object tracks:

    inds         -- sorted list of the unique individuals found in the
                    stored Model objects
    ind_file     -- path of the file written by create_ind_file ('' if none)
    exclude_file -- path of the file written by create_exclude_ind_file
                    ('' if none)

    Only values that are Model instances contribute to ``inds``.
    """

    def __init__(self, *arg, **kw):
        super(ModelFile, self).__init__(*arg, **kw)
        self.inds = []
        self.ind_file = ''
        self.exclude_file = ''

        # dict() accepts at most one positional argument, so the initial
        # models can only be found among the stored values (never arg[1])
        if self:
            self.update_inds()

    def __setitem__(self, key, value):
        """Store value under key and keep the unique individuals current.

        Raises IOError (after storing) if value is a Model with no
        individuals.
        """
        # Replacing a key may drop individuals, which requires a rebuild
        replaced = key in self

        super(ModelFile, self).__setitem__(key, value)

        if self.confirm_model_instance(value):
            # Validates the model and merges its individuals
            self.update_inds(model = value)

        if replaced:
            # Rebuild to discard individuals only found in the old value
            self.update_inds()

    def __delitem__(self, key):
        """Delete key and rebuild the unique individuals."""
        super(ModelFile, self).__delitem__(key)
        self.update_inds()

    def confirm_model_instance (self, unknown):
        """Return True if unknown is a Model instance, otherwise False."""
        return isinstance(unknown, Model)

    def copy_model (self, src_model_name, new_model_name):
        """Store a deep copy of src_model_name under new_model_name.

        Raises KeyError if src_model_name is not stored.
        """
        src_model = super(ModelFile, self).__getitem__(src_model_name)

        src_model_copy = copy.deepcopy(src_model)
        src_model_copy.name = new_model_name

        # Stored directly, so an empty model may be copied without the
        # IOError raised by item assignment
        super(ModelFile, self).__setitem__(new_model_name, src_model_copy)

        # Rebuild in case new_model_name replaced a different model
        self.update_inds()

    def rename_model (self, src_model_name, new_model_name):
        """Move the model stored as src_model_name to new_model_name.

        Raises KeyError if src_model_name is not stored.
        """
        src_model = super(ModelFile, self).pop(src_model_name)
        src_model.name = new_model_name

        super(ModelFile, self).__setitem__(new_model_name, src_model)

        # Rebuild in case new_model_name replaced a different model
        self.update_inds()

    def update_inds (self, model = None):
        """Update the sorted list of unique individuals.

        If model is a Model instance its individuals are merged into the
        current list (IOError is raised if it has none). Otherwise the list
        is rebuilt from every Model instance currently stored.
        """
        if model is not None and self.confirm_model_instance(model):

            # Return error if inds is empty
            if not model.inds:
                raise IOError('No individuals found in %s.' % model.name)

            unique_inds = set(self.inds).union(model.inds)

        else:

            unique_inds = set()

            # Loop the models in the file
            for model_in_file in super(ModelFile, self).values():
                if self.confirm_model_instance(model_in_file):
                    unique_inds.update(model_in_file.inds)

        # Sorted so that the list (and files written from it) is reproducible
        self.inds = sorted(unique_inds)

    def _write_ind_file (self, base_name, inds, file_ext, file_path, overwrite):
        """Write inds, one per line, and return the path of the file."""
        ind_filename = base_name + file_ext

        # If a path is assigned, create the file at the specified location
        if file_path:
            ind_filename = os.path.join(file_path, ind_filename)

        # Refuse to replace a previous file unless overwrite was requested
        if not overwrite and os.path.isfile(ind_filename):
            raise IOError('Individuals file exists.')

        with open(ind_filename, 'w') as ind_file:
            ind_file.write('%s\n' % '\n'.join(inds))

        return ind_filename

    def create_ind_file (self, file_ext = '', file_path = '', overwrite = False):
        """Write the unique individuals to 'unique_individuals' + file_ext.

        The file is created inside file_path when given. Raises IOError if
        the file exists and overwrite is False. The path is saved in
        self.ind_file.
        """
        self.ind_file = self._write_ind_file('unique_individuals', self.inds,
                                             file_ext, file_path, overwrite)

    def delete_ind_file (self):
        """Delete the file created by create_ind_file, if any."""
        if self.ind_file:

            os.remove(self.ind_file)

            # Remove the filename
            self.ind_file = ''

    def create_exclude_ind_file (self, inds_to_include = None, file_ext = '', file_path = '', overwrite = False):
        """Write the individuals not in inds_to_include to 'exclude_individuals' + file_ext.

        The file is created inside file_path when given. Raises IOError if
        the file exists and overwrite is False. The path is saved in
        self.exclude_file.
        """
        # Create exclude list by removing included individuals
        exclude_inds = sorted(set(self.inds) - set(inds_to_include or []))

        self.exclude_file = self._write_ind_file('exclude_individuals', exclude_inds,
                                                 file_ext, file_path, overwrite)

    def delete_exclude_ind_file (self):
        """Delete the file created by create_exclude_ind_file, if any."""
        if self.exclude_file:

            os.remove(self.exclude_file)

            # Remove the filename
            self.exclude_file = ''

    def to_json (self):
        """Return a list holding the to_json() result of each stored model."""
        return [model_data.to_json() for model_data in super(ModelFile, self).values()]


class Model:
    """A named model: populations and the individuals assigned to each.

    Attributes:
    name      -- model name
    tree      -- tree string (see assign_tree)
    pop_list  -- population names (str), in assignment order
    ind_dict  -- population name -> list of individuals (str)
    nind      -- population name -> number of individuals
    pop_files -- paths written by create_pop_files
    ind_file  -- path written by create_ind_file ('' if none)
    """

    def __init__ (self, name):
        self.name = name
        self.tree = ''
        self.pop_list = []
        self.ind_dict = defaultdict(list)
        self.nind = defaultdict(int)
        self.pop_files = []
        self.ind_file = ''

    @property
    def npop (self):
        """Number of populations in the model."""
        return len(self.pop_list)

    @property
    def inds (self):
        """Flat list of the individuals of every population."""
        return list(itertools.chain.from_iterable(self.ind_dict.values()))

    def assign_tree (self, tree):
        """Store tree as a str."""
        self.tree = str(tree)

    def assign_pop (self, pop, inds = None):
        """Add population pop, optionally with its individuals.

        The population name and the individuals are stored as str. When inds
        is empty or omitted, individuals previously assigned to pop are kept.
        """
        # Use the str form everywhere, so that pop_list, ind_dict and nind
        # always agree on the key
        pop = str(pop)

        # Assigning a pop again updates it rather than listing it twice
        if pop not in self.pop_list:
            self.pop_list.append(pop)

        if inds:
            self.ind_dict[pop] = [str(ind) for ind in inds]

        self.nind[pop] = len(self.ind_dict[pop])

    def _confirm_sample_size (self, sample_size):
        """Return sample_size as an int, raise Exception if not possible."""
        try:
            return int(sample_size)
        except (TypeError, ValueError):
            raise Exception('%s not int' % (sample_size,))

    def _confirm_pop_size (self, pop, sample_size):
        """Raise Exception if sample_size is larger than the size of pop."""
        if sample_size > self.nind[pop]:
            raise Exception('%s is larger than %s' % (sample_size, pop))

    def _replace_with_sample (self, pop, sample_size, with_replacements):
        """Replace the individuals of pop with a random sample of them."""
        # Use numpy choice to randomly sample the pop
        sampled_inds = np.random.choice(self.ind_dict[pop], sample_size, replace = with_replacements)

        # Save the sampled inds as a list of plain str
        self.ind_dict[pop] = [str(ind) for ind in sampled_inds]

        # Keep the stored size in step with the sampled individuals
        self.nind[pop] = len(self.ind_dict[pop])

    def sample_pop (self, pop, sample_size, with_replacements = False):
        """Randomly reduce population pop to sample_size individuals.

        Raises Exception if pop is not in the model, if sample_size cannot
        be converted to int, or if sample_size exceeds the population size.
        """
        pop = str(pop)

        # Confirm the pop is in the model
        if pop not in self.pop_list:
            raise Exception('%s not found' % pop)

        sample_size = self._confirm_sample_size(sample_size)

        self._confirm_pop_size(pop, sample_size)

        self._replace_with_sample(pop, sample_size, with_replacements)

    def sample_pops (self, sample_size, with_replacements = False):
        """Randomly reduce every population to sample_size individuals.

        Raises Exception if sample_size cannot be converted to int or if it
        exceeds the size of any population; no population is altered then.
        """
        sample_size = self._confirm_sample_size(sample_size)

        # Check every pop before sampling, so that an error alters nothing
        for pop in self.pop_list:
            self._confirm_pop_size(pop, sample_size)

        for pop in self.pop_list:
            self._replace_with_sample(pop, sample_size, with_replacements)

    def create_pop_files (self, file_ext = '', file_path = '', overwrite = False):
        """Write one file per population, an individual per line.

        Each file is named pop + file_ext and created inside file_path when
        given. Raises IOError, before writing anything, if a file exists and
        overwrite is False. The paths are saved in self.pop_files.
        """
        # Assign the filename for each population file
        pop_filenames = []
        for pop in self.pop_list:
            pop_filename = pop + file_ext

            # If a path is assigned, create the file at the specified location
            if file_path:
                pop_filename = os.path.join(file_path, pop_filename)

            pop_filenames.append(pop_filename)

        # Check all files first, to avoid leaving a partial set of files
        if not overwrite:
            for pop_filename in pop_filenames:
                if os.path.isfile(pop_filename):
                    raise IOError('Population file exists.')

        for pop, pop_filename in zip(self.pop_list, pop_filenames):

            # Create the population file
            with open(pop_filename, 'w') as pop_file:
                pop_file.write('%s\n' % '\n'.join(self.ind_dict[pop]))

            # Save each filename once, as delete_pop_files removes every entry
            if pop_filename not in self.pop_files:
                self.pop_files.append(pop_filename)

    def delete_pop_files (self):
        """Delete the files created by create_pop_files, if any."""
        if self.pop_files:

            # Loop the created pop files
            for pop_file in self.pop_files:
                os.remove(pop_file)

            # Remove the filenames
            self.pop_files = []

    def create_ind_file (self, file_ext = '', file_path = '', overwrite = False):
        """Write the individuals of the model to 'individual.keep' + file_ext.

        The file is created inside file_path when given. Raises IOError if
        the file exists and overwrite is False. The path is saved in
        self.ind_file.
        """
        ind_filename = 'individual.keep' + file_ext

        # If a path is assigned, create the file at the specified location
        if file_path:
            ind_filename = os.path.join(file_path, ind_filename)

        # Refuse to replace a previous file unless overwrite was requested
        if not overwrite and os.path.isfile(ind_filename):
            raise IOError('Individuals file exists.')

        with open(ind_filename, 'w') as ind_file:
            ind_file.write('%s\n' % '\n'.join(self.inds))

        # Save the individuals filename
        self.ind_file = ind_filename

    def delete_ind_file (self):
        """Delete the file created by create_ind_file, if any."""
        if self.ind_file:

            os.remove(self.ind_file)

            # Remove the filename
            self.ind_file = ''

    def to_json (self):
        """Return the model as JSON-serializable ordered dictionaries.

        Layout: {'name': name, 'pops': {pop: {'inds': [individuals]}}}.
        """
        model_json = OrderedDict()

        model_json['name'] = self.name

        pop_json = OrderedDict()

        for pop in self.pop_list:

            pop_json[pop] = OrderedDict()

            # 'inds' is the key read_model_file looks up for each population
            pop_json[pop]['inds'] = list(self.ind_dict[pop])

        model_json['pops'] = pop_json

        return model_json

def read_model_file (filename):
    """Read a JSON model file and return its models as a ModelFile.

    The file holds a list of models, each with a 'name' and a 'pops'
    dictionary that maps a population name to a dictionary holding the
    individuals of the population under 'inds' ('indv' is accepted too).

    Raises IOError if filename is not an existing file.
    """

    # Check that the file exists
    if not os.path.isfile(filename):
        raise IOError('Model file not found: %s' % filename)

    # Create ModelFile object
    models_to_return = ModelFile()

    # Check if using python 2 or 3
    if sys.version_info[0] == 2:
        # Open the model file in python 2
        model_file = open(filename, 'rU')
    else:
        # Open the model file in python 3
        model_file = open(filename, 'r', newline=None)

    # Parse the model file using the json reader, closing the file afterwards
    with model_file:
        models_dict = json.load(model_file)

    # Loop the parsed models
    for model_dict in models_dict:

        # Create the model
        model = Model(str(model_dict['name']))

        # Loop the populations in the model
        for pop, pop_dict in model_dict['pops'].items():

            # Model.to_json has used the 'indv' key, so accept it when
            # 'inds' is absent; a missing key still raises KeyError('inds')
            if 'inds' not in pop_dict and 'indv' in pop_dict:
                pop_inds = pop_dict['indv']
            else:
                pop_inds = pop_dict['inds']

            # Convert all individuals names to str
            ind_list = [str(pop_ind) for pop_ind in pop_inds]

            # Assign the population and its individuals to the model
            model.assign_pop(str(pop), ind_list)

        # Save the model
        models_to_return[str(model.name)] = model

    logging.info('Finished reading model file (%s)' % filename)

    # Return the models
    return models_to_return

def write_model_file (model_file, filename, overwrite = False):
    """Write the models of model_file to filename as indented JSON.

    model_file must provide a to_json() method that returns JSON-serializable
    data. Raises Exception if filename exists and overwrite is False.
    """

    # Check if an existing file may be overwritten
    if not overwrite and os.path.exists(filename):
        raise Exception('%s already exists' % filename)

    # Serialize before opening the file, so that a failure neither creates
    # nor truncates the output file
    json_text = json.dumps(model_file.to_json(), indent = 4)

    # Write the json-formatted data to the output file
    with open(filename, 'w') as output_file:
        output_file.write(json_text)

    logging.info('Finished writing model file (%s)' % filename)