changeset 5:41f122255d14 (2015-03-19)
previous changeset: 4:0d1557b3d540 (2015-03-19) | next changeset: 6:4393f982d18f (2015-03-19)

commit message:
small update

modified:
__init__.py

added:
static_resources/README.txt

removed:
Rscripts/filter-RIDB.R
Rscripts/ridb-regression.R
test/__init__.py
test/integration_tests.py
test/test_combine_output.py
test/test_export_to_metexp_tabular.py
test/test_library_lookup.py
test/test_match_library.py
test/test_query_mass_repos.py
test/test_query_metexp.py
test/test_query_metexp_LARGE.py
diff -r 0d1557b3d540 -r 41f122255d14 Rscripts/filter-RIDB.R
--- a/Rscripts/filter-RIDB.R	Thu Mar 19 12:10:19 2015 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,56 +0,0 @@
-##
-#
-# Removes duplicates from a RI-database
-#
-# Usage:
-#   Rscript filter-RIDB.R /path/to/retention_db.txt output_RIDB_file.txt
-#
-##
-
-# Commandline arguments
-args <- commandArgs(TRUE)
-ridb <- args[1]
-out_file <- args[2]
-
-# Function to check duplicates
-duplicates <- function(dat) {
-    s <- do.call("order", as.data.frame(dat))
-    non.dup <- !duplicated(dat[s, ])
-    orig.ind <- s[non.dup]
-    first.occ <- orig.ind[cumsum(non.dup)]
-    first.occ[non.dup] <- NA
-    first.occ[order(s)]
-}
-
-# Load CSV file
-ridb <- read.csv(ridb,header=TRUE, sep="\t")
-## Filters on: CAS FORMULA Column type Column phase type Column name
-filter_cols <- c(1, 3, 5, 6, 7)
-cat("RIDB dimensions: ")
-print(dim(ridb))
-deleted <- NULL
-cat("Checking for duplicates...")
-dups <- duplicates(ridb[,filter_cols])
-cat("\t[DONE]\nRemoving duplicates...")
-newridb <- ridb
-newridb["min"] <- NA
-newridb["max"] <- NA
-newridb["orig.columns"] <- NA
-for (i in unique(dups)) {
-    if (!is.na(i)) {
-        rows <- which(dups == i)
-        duprows <- ridb[c(i, rows),]
-        # Replace duplicate rows with one row containing the median value
-        new_RI <- median(duprows$RI)
-        newridb$RI[i] <- median(duprows$RI)
-        newridb$min[i] <- min(duprows$RI)
-        newridb$max[i] <- max(duprows$RI)
-        newridb$orig.columns[i] <- paste(rows, collapse=",")
-        deleted <- c(deleted, rows)
-    }
-}
-cat("\t\t[DONE]\nCreating new dataset...")
-out_ridb <- newridb[-deleted,]
-cat("\t\t[DONE]\nWriting new dataset...")
-write.table(out_ridb, na='', file=out_file, quote=T, sep="\t", row.names=F)
-cat("\t\t[DONE]\n")

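The removed script's core idea is to collapse rows that share the five key columns (CAS, formula, column type, column phase type, column name) into one row whose RI is the group median, recording min/max for the spread. For readers who do not follow the R idiom above, here is a minimal Python sketch of the same idea; the function name dedupe_ridb and the exact column labels are illustrative assumptions, not part of the removed tool:

    import csv
    from statistics import median

    def dedupe_ridb(rows, key_cols=('CAS', 'FORMULA', 'Column type',
                                    'Column phase type', 'Column name')):
        # Group rows on the key columns; keys with more than one row are duplicates
        groups = {}
        for row in rows:
            key = tuple(row[c] for c in key_cols)
            groups.setdefault(key, []).append(row)
        deduped = []
        for dup_rows in groups.values():
            ris = [float(r['RI']) for r in dup_rows]
            kept = dict(dup_rows[0])
            # Keep one representative row; RI becomes the median of the group
            kept.update({'RI': median(ris), 'min': min(ris), 'max': max(ris)})
            deduped.append(kept)
        return deduped

    # Tab-separated RIDB input, as in the R script's command line
    with open('retention_db.txt') as handle:
        rows = list(csv.DictReader(handle, delimiter='\t'))
    print(len(dedupe_ridb(rows)), 'unique records')
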
diff -r 0d1557b3d540 -r 41f122255d14 Rscripts/ridb-regression.R
--- a/Rscripts/ridb-regression.R	Thu Mar 19 12:10:19 2015 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,208 +0,0 @@
-##
-#
-# Performs regression analysis using either 3rd degree polynomial- or linear-method
-#
-##
-
-# Commandline arguments
-args <- commandArgs(TRUE)
-if (length(args) < 7)
-    stop(cat("Missing arguments, usage:\n\tRscript ridb-regression.R RI-database ",
-             "ouput_file logfile min_residuals range_mod pvalue rsquared method ",
-             "plot(yes/no) plot_archive"))
-
-ridb <- args[1]
-out_file <- args[2]
-logfile <- args[3]
-min_residuals <- as.integer(args[4])
-range_mod <- as.integer(args[5])
-pvalue <- as.double(args[6])
-rsquared <- as.double(args[7])
-method <- args[8]
-plot <- tolower(args[9])
-if (plot == 'true')
-    plot_archive = args[10]
-
-# Do not show warnings etc.
-sink(file='/dev/null')
-
-progress <- c()
-logger <- function(logdata) {
-    ## Logs progress, adds a timestamp for each event
-    #cat(paste(Sys.time(), "\t", logdata, "\n", sep="")) ## DEBUG
-    progress <<- c(progress, paste(Sys.time(), "\t", logdata, sep=""))
-}
-
-logger("Reading Retention Index Database..")
-
-# Read Retention Index Database
-ridb <- read.csv(ridb, header=TRUE, sep="\t")
-logger(paste("\t", nrow(ridb), "records read.."))
-# Get a unique list
-gc_columns <- unique(as.vector(as.matrix(ridb['Column.name'])[,1]))
-cas_numbers <- unique(as.vector(as.matrix(ridb['CAS'])[,1]))
-
-add_poly_fit <- function(fit, gc1_index, gc2_index, range) {
-    pval = anova.lm(fit)$Pr
-    r.squared = summary(fit)$r.squared
-
-    data = rep(NA, 11)
-    # Append results to matrix
-    data[1] = gc_columns[gc1_index]   # Column 1
-    data[2] = gc_columns[gc2_index]   # Column 2
-    data[3] = coefficients(fit)[1]    # The 4 coefficients
-    data[4] = coefficients(fit)[2]
-    data[5] = coefficients(fit)[3]
-    data[6] = coefficients(fit)[4]
-    data[7] = range[1]                # Left limit
-    data[8] = range[2]                # Right limit
-    data[9] = length(fit$residuals)   # Number of datapoints analysed
-    data[10] = pval[1]                # p-value for resulting fitting
-    data[11] = r.squared              # R-squared
-    return(data)
-}
-
-
-add_linear_fit <- function(fit, gc1_index, gc2_index, range) {
-    pval = anova.lm(fit)$Pr
-    r.squared = summary(fit)$r.squared
-
-    data = rep(NA, 7)
-    # Append results to matrix
-    data[1] = gc_columns[gc1_index]   # Column 1
-    data[2] = gc_columns[gc2_index]   # Column 2
-    data[3] = coefficients(fit)[1]    # The 4 coefficients
-    data[4] = coefficients(fit)[2]
-    data[7] = length(fit$residuals)   # Number of datapoints analysed
-    data[8] = pval[1]                 # p-value for resulting fitting
-    data[9] = r.squared               # R-squared
-    return(data)
-}
-
-
-add_fit <- function(fit, gc1_index, gc2_index, range, method) {
-    if (method == 'poly')
-        return(add_poly_fit(fit, gc1_index, gc2_index, range))
-    else
-        return(add_linear_fit(fit, gc1_index, gc2_index, range))
-}
-
-
-plot_fit <- function(ri1, ri2, gc1_index, gc2_index, coeff, range, method) {
-    if (method == 'poly')
-        pol <- function(x) coeff[4]*x^3 + coeff[3]*x^2 + coeff[2]*x + coeff[1]
-    else
-        pol <- function(x) coeff[2]*x + coeff[1]
-    pdf(paste('regression_model_',
-              make.names(gc_columns[gc1_index]), '_vs_',
-              make.names(gc_columns[gc2_index]), '.pdf', sep=''))
-    curve(pol, 250:3750, col="red", lwd=2.5, main='Regression Model', xlab=gc_columns[gc1_index],
-          ylab=gc_columns[gc2_index], xlim=c(250, 3750), ylim=c(250, 3750))
-    points(ri1, ri2, lwd=0.4)
-    # Add vertical lines showing left- and right limits when using poly method
-    if (method == 'poly')
-        abline(v=range, col="grey", lwd=1.5)
-    dev.off()
-}
-
-# Initialize output dataframe
-if (method == 'poly') {
-    m <- data.frame(matrix(ncol = 11, nrow = 10))
-} else {
-    m <- data.frame(matrix(ncol = 9, nrow = 10))
-}
-
-
-get_fit <- function(gc1, gc2, method) {
-    if (method == 'poly')
-        return(lm(gc1 ~ poly(gc2, 3, raw=TRUE)))
-    else
-        return(lm(gc1 ~ gc2))
-}
-
-# Permutate
-k <- 1
-logger(paste("Permutating (with ", length(gc_columns), " GC-columns)..", sep=""))
-
-for (i in 1:(length(gc_columns)-1)) {
-    logger(paste("\tCalculating model for ", gc_columns[i], "..", sep=""))
-    breaks <- 0
-    for (j in (i+1):length(gc_columns)) {
-        col1 = ridb[which(ridb['Column.name'][,1] == gc_columns[i]),]
-        col2 = ridb[which(ridb['Column.name'][,1] == gc_columns[j]),]
-
-        # Find CAS numbers for which both columns have data (intersect)
-        cas_intersect = intersect(col1[['CAS']], col2[['CAS']])
-
-        # Skip if number of shared CAS entries is < cutoff
-        if (length(cas_intersect) < min_residuals) {
-            breaks = breaks + 1
-            next
-        }
-        # Gather Retention Indices
-        col1_data = col1[['RI']][match(cas_intersect, col1[['CAS']])]
-        col2_data = col2[['RI']][match(cas_intersect, col2[['CAS']])]
-
-        # Calculate the range within which regression is possible (and move if 'range_mod' != 0)
-        range = c(min(c(min(col1_data), min(col2_data))), max(c(max(col1_data), max(col2_data))))
-        if (range_mod != 0) {
-            # Calculate percentage and add/subtract from range
-            perc = diff(range) / 100
-            perc_cutoff = range_mod * perc
-            range = as.integer(range + c(perc_cutoff, -perc_cutoff))
-        }
-
-        # Calculate model for column1 vs column2 and plot if requested
-        fit = get_fit(col1_data, col2_data, method)
-        m[k,] = add_fit(fit, i, j, range, method)
-
-        if (plot == 'true')
-            plot_fit(col1_data, col2_data, i, j, coefficients(fit), range, method)
-
-        # Calculate model for column2 vs column1 and plot if requested
-        fit = get_fit(col2_data, col1_data, method)
-        m[k + 1,] = add_fit(fit, j, i, range, method)
-
-        if (plot == 'true')
-            plot_fit(col2_data, col1_data, j, i, coefficients(fit), range, method)
-
-        k = k + 2
-    }
-    logger(paste("\t\t", breaks, " comparisons have been skipped due to nr. of datapoints < cutoff", sep=""))
-}
-
-# Filter on pvalue and R-squared
-logger("Filtering on pvalue and R-squared..")
-if (method == 'poly') {
-    pval_index <- which(m[,10] < pvalue)
-    rsquared_index <- which(m[,11] > rsquared)
-} else {
-    pval_index <- which(m[,8] < pvalue)
-    rsquared_index <- which(m[,9] > rsquared)
-}
-logger(paste(nrow(m) - length(pval_index), " models discarded due to pvalue > ", pvalue, sep=""))
-
-logger(paste(nrow(m) - length(rsquared_index), " models discarded due to R-squared < ", rsquared, sep=""))
-
-# Remaining rows
-index = unique(c(pval_index, rsquared_index))
-
-# Reduce dataset
-m = m[index,]
-sink()
-
-# Place plots in the history as a ZIP file
-if (plot == 'true') {
-    logger("Creating archive with model graphics..")
-    system(paste("zip -9 -r models.zip *.pdf > /dev/null", sep=""))
-    system(paste("cp models.zip ", plot_archive, sep=""))
-}
-
-# Save dataframe as tab separated file
-logger("All done, saving data..")
-header = c("Column1", "Column2", "Coefficient1", "Coefficient2", "Coefficient3", "Coefficient4",
-           "LeftLimit", "RightLimit", "Residuals", "pvalue", "Rsquared")
-if (method != 'poly')
-    header = header[c(1:4, 7:11)]
-write(progress, logfile)
-write.table(m, file=out_file, sep="\t", quote=FALSE, col.names=header, row.names=FALSE)

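The essence of the removed regression step: for every pair of GC columns, collect the retention indices of the compounds they share, fit a third-degree polynomial (or a line) mapping one column's RIs onto the other's, and keep the model only if it passes the p-value and R-squared cutoffs. Below is a compact Python sketch of that pairwise fit using numpy in place of R's lm(); the function name, toy data, and default cutoffs are illustrative, not taken from the tool:

    import numpy as np

    def fit_poly_model(ri_col1, ri_col2, min_residuals=10, rsquared_cutoff=0.95):
        # Skip column pairs with too few shared compounds, as the R script does
        if len(ri_col1) < min_residuals:
            return None
        # Third-degree polynomial fit, analogous to lm(gc1 ~ poly(gc2, 3, raw=TRUE))
        coeffs = np.polyfit(ri_col2, ri_col1, deg=3)
        predicted = np.polyval(coeffs, ri_col2)
        # R-squared = 1 - SS_res / SS_tot; discard weak models
        ss_res = np.sum((ri_col1 - predicted) ** 2)
        ss_tot = np.sum((ri_col1 - np.mean(ri_col1)) ** 2)
        r_squared = 1.0 - ss_res / ss_tot
        return coeffs if r_squared > rsquared_cutoff else None

    # Toy data: one column's indices are a slightly nonlinear shift of the other's
    ri2 = np.linspace(600, 2900, 40)
    ri1 = 20.6 + 0.945 * ri2 + 4.0e-05 * ri2 ** 2
    print(fit_poly_model(ri1, ri2))
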
diff -r 0d1557b3d540 -r 41f122255d14 __init__.py
--- a/__init__.py	Thu Mar 19 12:10:19 2015 +0100
+++ b/__init__.py	Thu Mar 19 12:13:13 2015 +0100
@@ -1,6 +0,0 @@
-'''
-Module containing Galaxy tools for the LC or GC/MS pipeline
-Created on Mar , 2014
-
-@author: pieter lukasse
-'''
\ No newline at end of file

diff -r 0d1557b3d540 -r 41f122255d14 static_resources/README.txt
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/static_resources/README.txt	Thu Mar 19 12:13:13 2015 +0100
@@ -0,0 +1,3 @@
+This folder and respective files should be deployed together with the following tools:
+
+ - ../export_to_metexp_tabular.py
\ No newline at end of file

diff -r 0d1557b3d540 -r 41f122255d14 test/__init__.py
--- a/test/__init__.py	Thu Mar 19 12:10:19 2015 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1 +0,0 @@
-''' unit tests '''

diff -r 0d1557b3d540 -r 41f122255d14 test/integration_tests.py
--- a/test/integration_tests.py	Thu Mar 19 12:10:19 2015 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,268 +0,0 @@
-'''Integration tests for the GCMS project'''
-
-from pkg_resources import resource_filename  # @UnresolvedImport # pylint: disable=E0611
-from GCMS import library_lookup, combine_output
-from GCMS.rankfilter_GCMS import rankfilter
-import os.path
-import sys
-import unittest
-import re
-
-
-class IntegrationTest(unittest.TestCase):
-    def test_library_lookup(self):
-        '''
-        Run main for data/NIST_tabular and compare produced files with references determined earlier.
-        '''
-        # Create out folder
-        outdir = "output/"  # tempfile.mkdtemp(prefix='test_library_lookup')
-        if not os.path.exists(outdir):
-            os.makedirs(outdir)
-        outfile_base = os.path.join(outdir, 'produced_library_lookup')
-        outfile_txt = outfile_base + '.txt'
-
-        # Build up arguments and run
-        input_txt = resource_filename(__name__, "data/NIST_tabular.txt")
-        library = resource_filename(__name__, "data/RIDB_subset.txt")
-        regress_model = resource_filename(__name__, "data/ridb_poly_regression.txt")
-        sys.argv = ['test',
-                    library,
-                    input_txt,
-                    'Capillary',
-                    'Semi-standard non-polar',
-                    outfile_txt,
-                    'HP-5',
-                    regress_model]
-        # Execute main function with arguments provided through sys.argv
-        library_lookup.main()
-        # Compare with reference files
-        reference_txt = resource_filename(__name__, 'reference/produced_library_lookup.txt')
-
-        # read both the reference file and actual output files
-        expected = _read_file(reference_txt)
-        actual = _read_file(outfile_txt)
-
-        # convert the read in files to lists we can compare
-        expected = expected.split()
-        actual = actual.split()
-
-        for exp, act in zip(expected, actual):
-            if re.match('\\d+\\.\\d+', exp):
-                exp = float(exp)
-                act = float(act)
-                self.assertAlmostEqual(exp, act, places=5)
-            else:
-                # compare values
-                self.failUnlessEqual(expected, actual)
-
-
-    def test_combine_output_simple(self):
-        '''
-        Run main for data/NIST_tabular and compare produced files with references determined earlier.
-        '''
-        # Create out folder
-        outdir = "output/"  # tempfile.mkdtemp(prefix='test_library_lookup')
-        if not os.path.exists(outdir):
-            os.makedirs(outdir)
-        outfile_base = os.path.join(outdir, 'produced_combine_output')
-        outfile_single_txt = outfile_base + '_single.txt'
-        outfile_multi_txt = outfile_base + '_multi.txt'
-
-        # Build up arguments and run
-        input_rankfilter = resource_filename(__name__, "data/Rankfilter.txt")
-        input_caslookup = resource_filename(__name__, "data/Caslookup.txt")
-        sys.argv = ['test',
-                    input_rankfilter,
-                    input_caslookup,
-                    outfile_single_txt,
-                    outfile_multi_txt]
-        # Execute main function with arguments provided through sys.argv
-        combine_output.main()
-        # Compare with reference files
-        # reference_single_txt = resource_filename(__name__, 'reference/produced_combine_output_single.txt')
-        # reference_multi_txt = resource_filename(__name__, 'reference/produced_combine_output_multi.txt')
-        # self.failUnlessEqual(_read_file(reference_single_txt), _read_file(outfile_single_txt))
-        # self.failUnlessEqual(_read_file(reference_multi_txt), _read_file(outfile_multi_txt))
-
-        # Clean up
-        # shutil.rmtree(tempdir)
-
-
-    def def_test_rank_filter_advanced(self):
-        '''
-        Run main of RankFilter
-        '''
-        # Create out folder
-        outdir = "output/integration/"
-        if not os.path.exists(outdir):

[... middle of this diff is truncated in the page source ...]

-        combine_result_single_items = combine_output._process_data(outfile_single_txt)
-        combine_result_multi_items = combine_output._process_data(outfile_multi_txt)
-        self.assertGreater(len(combine_result_single_items['Centrotype']),
-                           len(combine_result_multi_items['Centrotype']))
-
-
-        # Check 3: library_lookup RI column, centrotype column, ri_svr column are correct:
-        caslookup_items = combine_output._process_data(input_caslookup)
-        rankfilter_items = combine_output._process_data(input_rankfilter)
-
-        # check that the caslookup RI column is correctly maintained in its original order in
-        # the combined file:
-        ri_caslookup = caslookup_items['RI']
-        ri_combine_single = combine_result_single_items['RI']
-        self.assertListEqual(ri_caslookup, ri_combine_single)
-
-        # check the centrotype column's integrity:
-        centrotype_caslookup = caslookup_items['Centrotype']
-        centrotype_combine_single = combine_result_single_items['Centrotype']
-        centrotype_rankfilter = _get_centrotype_rankfilter(rankfilter_items['ID'])
-        self.assertListEqual(centrotype_caslookup, centrotype_combine_single)
-        self.assertListEqual(centrotype_caslookup, centrotype_rankfilter)
-
-        # integration and integrity checks:
-        file_NIST = resource_filename(__name__, "data/integration/NIST_identification_results_tabular.txt")
-        file_NIST_items = combine_output._process_data(file_NIST)
-        # check that rank filter output has exactly the same ID items as the original NIST input file:
-        self.assertListEqual(file_NIST_items['ID'], rankfilter_items['ID'])
-        # check the same for the CAS column:
-        self.assertListEqual(_get_strippedcas(file_NIST_items['CAS']), rankfilter_items['CAS'])
-        # now check the NIST CAS column against the cas lookup results:
-        cas_NIST = _get_processedcas(file_NIST_items['CAS'])
-        self.assertListEqual(cas_NIST, caslookup_items['CAS'])
-        # now check the CAS of the combined result. If all checks are OK, it means the CAS column's order
-        # and values remained stable throughout all steps:
-        self.assertListEqual(rankfilter_items['CAS'], combine_result_single_items['CAS'])
-
-        # check that the rankfilter RIsvr column is correctly maintained in its original order in
-        # the combined file:
-        risvr_rankfilter = rankfilter_items['RIsvr']
-        risvr_combine_single = combine_result_single_items['RIsvr']
-        self.assertListEqual(risvr_rankfilter, risvr_combine_single)
-
-
-
-def _get_centrotype_rankfilter(id_list):
-    '''
-    returns the list of centrotype ids given a list of ID in the
-    form e.g. 74-1.0-564-1905200-7, where the numbers before the
-    first "-" are the centrotype id
-    '''
-    result = []
-    for compound_id_idx in xrange(len(id_list)):
-        compound_id = id_list[compound_id_idx]
-        centrotype = compound_id.split('-')[0]
-        result.append(centrotype)
-
-    return result
-
-
-def _get_processedcas(cas_list):
-    '''
-    returns the list cas numbers in the form C64175 instead of 64-17-5
-    '''
-    result = []
-    for cas_id_idx in xrange(len(cas_list)):
-        cas = cas_list[cas_id_idx]
-        processed_cas = 'C' + str(cas.replace('-', '').strip())
-        result.append(processed_cas)
-
-    return result
-
-
-def _get_strippedcas(cas_list):
-    '''
-    removes the leading white space from e.g. " 64-17-5"
-    '''
-    result = []
-    for cas_id_idx in xrange(len(cas_list)):
-        cas = cas_list[cas_id_idx]
-        processed_cas = cas.strip()
-        result.append(processed_cas)
-
-    return result
-
-
-def _read_file(filename):
-    '''
-    Helper method to quickly read a file
-    @param filename:
-    '''
-    with open(filename) as handle:
-        return handle.read()

diff -r 0d1557b3d540 -r 41f122255d14 test/test_combine_output.py
--- a/test/test_combine_output.py	Thu Mar 19 12:10:19 2015 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,106 +0,0 @@
-'''
-Created on Mar 27, 2012
-
-@author: marcelk
-'''
-from GCMS import combine_output
-from pkg_resources import resource_filename  # @UnresolvedImport # pylint: disable=E0611
-import os
-import shutil
-import tempfile
-import unittest
-
-
-class Test(unittest.TestCase):
-    '''
-    Tests for the 'combine_output' Galaxy tool
-    '''
-
-    def setUp(self):
-        self.rf_output = resource_filename(__name__, "data/RankFilter.txt")
-        self.cl_output = resource_filename(__name__, "data/CasLookup.txt")
-
-    def test_process_data(self):
-        '''
-        Tests the processing of the RankFilter and CasLookup files into dictionaries
-        '''
-        rfdata = combine_output._process_data(self.rf_output)
-        cldata = combine_output._process_data(self.cl_output)
-        self.assertEqual(set([' 18457-04-0', ' 55133-95-4', ' 58-08-2', ' 112-34-5']), set(rfdata['CAS']))
-        self.assertEqual(set(['C58082', 'C18457040', 'C55133954', 'C112345']), set(cldata['CAS']))
-
-    def test_add_hit(self):
-        '''
-        Tests the combination of two records from both the RankFilter- and CasLookup-tools
-        '''
-        rfdata = combine_output._process_data(self.rf_output)
-        cldata = combine_output._process_data(self.cl_output)
-        index = 0
-        rf_record = dict(zip(rfdata.keys(), [rfdata[key][index] for key in rfdata.keys()]))
-        cl_record = dict(zip(cldata.keys(), [cldata[key][index] for key in cldata.keys()]))
-
-        hit = combine_output._add_hit(rf_record, cl_record)
-        self.assertEqual(len(hit), 27)
-
-        # Pass empty record, should fail combination
-        self.assertRaises(KeyError, combine_output._add_hit, rf_record, {})
-
-    def test_merge_data(self):
-        '''
-        Tests the merging of the RankFilter and CasLookup data
-        '''
-        rfdata = combine_output._process_data(self.rf_output)
-        cldata = combine_output._process_data(self.cl_output)
-        merged, _ = combine_output._merge_data(rfdata, cldata)
-        centrotypes = _get_centrotypes(merged)
-        self.failUnless(all(centrotype in centrotypes for centrotype in ('2716','12723', '3403', '12710')))
-
-def _get_centrotypes(merged):
-    '''
-    returns centrotype codes found in merged set
-    '''
-    result = []
-    for item_idx in xrange(len(merged)):
-        item = merged[item_idx]
-        centrotype = item[0][0]
-        result.append(centrotype)
-
-    return result
-
-    def test_remove_formula(self):
-        '''
-        Tests the removal of the Formula from the 'Name' field (RankFilter output)
-        '''
-        name = "Caffeine C8H10N4O2"
-        compound_name, compound_formula = combine_output._remove_formula(name)
-        self.assertEqual(compound_name, 'Caffeine')
-        self.assertEqual(compound_formula, 'C8H10N4O2')
-        name = "Ethanol C2H6O"
-        compound_name, compound_formula = combine_output._remove_formula(name)
-        self.assertEqual(compound_name, 'Ethanol')
-        self.assertEqual(compound_formula, 'C2H6O')
-        # No formula to remove
-        name = "Butanoic acid, 4-[(trimethylsilyl)oxy]-, trimethylsilyl ester"
-        compound_name, compound_formula = combine_output._remove_formula(name)
-        self.assertEqual(compound_name, name)
-        self.assertEqual(compound_formula, False)
-
-    def test_save_data(self):
-        '''
-        Tests the creation of the output tabular files (no content testing)
-        '''
-        temp_folder = tempfile.mkdtemp(prefix='gcms_combine_output_')
-        saved_single_data = '{0}/{1}'.format(temp_folder, 'output_single.tsv')
-        saved_multi_data = '{0}/{1}'.format(temp_folder, 'output_multi.tsv')
-        rfdata = combine_output._process_data(self.rf_output)
-        cldata = combine_output._process_data(self.cl_output)
-        merged, nhits = combine_output._merge_data(rfdata, cldata)
-        combine_output._save_data(merged, nhits, saved_single_data, saved_multi_data)
-        self.failUnless(os.path.exists(saved_single_data))
-        self.failUnless(os.path.exists(saved_multi_data))
-        shutil.rmtree(temp_folder)
-
-
-if __name__ == "__main__":
-    #import sys;sys.argv = ['', 'Test.testName']
-    unittest.main()

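The _remove_formula implementation itself is not part of this changeset; the assertions above pin down its contract, though: split a trailing elemental formula off the 'Name' field, and return False for the formula when the name does not end in one. A hypothetical Python equivalent, with an invented regex that happens to satisfy the three test cases:

    import re

    # A trailing token counts as a formula if it is only element symbols
    # with optional counts, e.g. "C8H10N4O2" (heuristic, not from the tool)
    FORMULA_RE = re.compile(r'^(?:[A-Z][a-z]?\d*)+$')

    def remove_formula(name):
        parts = name.rsplit(' ', 1)
        if len(parts) == 2 and FORMULA_RE.match(parts[1]):
            return parts[0], parts[1]   # ('Caffeine', 'C8H10N4O2')
        return name, False              # no formula to strip

    print(remove_formula("Caffeine C8H10N4O2"))
    print(remove_formula("Butanoic acid, 4-[(trimethylsilyl)oxy]-, trimethylsilyl ester"))
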
diff -r 0d1557b3d540 -r 41f122255d14 test/test_export_to_metexp_tabular.py
--- a/test/test_export_to_metexp_tabular.py	Thu Mar 19 12:10:19 2015 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,112 +0,0 @@
-'''Integration tests for the GCMS project'''
-
-from pkg_resources import resource_filename  # @UnresolvedImport # pylint: disable=E0611
-from GCMS import export_to_metexp_tabular
-import os.path
-import sys
-import unittest
-
-
-class IntegrationTest(unittest.TestCase):
-
-
-    def test_MM_calculations(self):
-        '''
-        test the implemented method for MM calculations for
-        given chemical formulas
-        '''
-        export_to_metexp_tabular.init_elements_and_masses_map()
-
-        formula = "C8H18O3"
-        # should be = 12.01*8 + 1.01*18 + 16*3 = 162.26
-        result = export_to_metexp_tabular.get_molecular_mass(formula)
-        self.assertEqual(162.26, result)
-
-        formula = "CH2O3Fe2Ni"
-        # should be = 12.01*1 + 1.01*2 + 16*3 + 55.85*2 + 58.71 = 232.44
-        result = export_to_metexp_tabular.get_molecular_mass(formula)
-        self.assertAlmostEqual(232.44, result, 2)
-
-
-    def test_combine_output_simple(self):
-        '''
-        comment me
-        '''
-        # Create out folder
-        outdir = "output/metexp/"
-        if not os.path.exists(outdir):
-            os.makedirs(outdir)
-
-        # Build up arguments and run
-        rankfilter_and_caslookup_combined_file = resource_filename(__name__, "data/dummy1_produced_combine_output_single.txt")
-        msclust_quantification_and_spectra_file = resource_filename(__name__, "data/dummy1_sim.txt")
-        output_csv = resource_filename(__name__, outdir + "metexp_tabular.txt")
-
-        sys.argv = ['test',
-                    rankfilter_and_caslookup_combined_file,
-                    msclust_quantification_and_spectra_file,
-                    output_csv,
-                    'tomato',
-                    'leafs',
-                    'test experiment',
-                    'pieter',
-                    'DB5 column']
-
-        # Execute main function with arguments provided through sys.argv
-        export_to_metexp_tabular.main()
-
-        '''
-        # Asserts are based on reading in with process_data and comparing values of
-        # certain columns
-
-        # Check 3: library_lookup RI column, centrotype column, ri_svr column are correct:
-        caslookup_items = combine_output._process_data(input_caslookup)
-        rankfilter_items = combine_output._process_data(input_rankfilter)
-
-        # check that the caslookup RI column is correctly maintained in its original order in
-        # the combined file:
-        ri_caslookup = caslookup_items['RI']
-        ri_combine_single = combine_result_single_items['RI']
-        self.assertListEqual(ri_caslookup, ri_combine_single)
-
-        # check the centrotype column's integrity:
-        centrotype_caslookup = caslookup_items['Centrotype']
-        centrotype_combine_single = combine_result_single_items['Centrotype']
-        centrotype_rankfilter = _get_centrotype_rankfilter(rankfilter_items['ID'])
-        self.assertListEqual(centrotype_caslookup, centrotype_combine_single)
-        self.assertListEqual(centrotype_caslookup, centrotype_rankfilter)
-
-        # integration and integrity checks:
-        file_NIST = resource_filename(__name__, "data/integration/NIST_identification_results_tabular.txt")
-        file_NIST_items = combine_output._process_data(file_NIST)
-        # check that rank filter output has exactly the same ID items as the original NIST input file:
-        self.assertListEqual(file_NIST_items['ID'], rankfilter_items['ID'])
-        # check the same for the CAS column:
-        self.assertListEqual(_get_strippedcas(file_NIST_items['CAS']), rankfilter_items['CAS'])
-        # now check the NIST CAS column against the cas lookup results:
-        cas_NIST = _get_processedcas(file_NIST_items['CAS'])
-        self.assertListEqual(cas_NIST, caslookup_items['CAS'])
-        # now check the CAS of the combined result. If all checks are OK, it means the CAS column's order
-        # and values remained stable throughout all steps:
-        self.assertListEqual(rankfilter_items['CAS'], combine_result_single_items['CAS'])
-
-        # check that the rankfilter RIsvr column is correctly maintained in its original order in
-        # the combined file:
-        risvr_rankfilter = rankfilter_items['RIsvr']
-        risvr_combine_single = combine_result_single_items['RIsvr']
-        self.assertListEqual(risvr_rankfilter, risvr_combine_single)
-        '''
-
-
-def _read_file(filename):
-    '''
-    Helper method to quickly read a file
-    @param filename:
-    '''
-    with open(filename) as handle:
-        return handle.read()

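The MM test above fully specifies the arithmetic: two-decimal atomic masses (C = 12.01, H = 1.01, O = 16, Fe = 55.85, Ni = 58.71) summed per element count. A minimal sketch of how such a calculation can be implemented; this is a guess at the approach, not the removed tool's actual get_molecular_mass or its masses map:

    import re

    # Masses taken from the expected values in the test above (subset only)
    MASSES = {'C': 12.01, 'H': 1.01, 'O': 16.0, 'Fe': 55.85, 'Ni': 58.71}

    def get_molecular_mass(formula):
        # Walk element/count pairs, e.g. 'C8H18O3' -> ('C','8'), ('H','18'), ('O','3')
        total = 0.0
        for element, count in re.findall(r'([A-Z][a-z]?)(\d*)', formula):
            total += MASSES[element] * (int(count) if count else 1)
        return total

    assert abs(get_molecular_mass('C8H18O3') - 162.26) < 1e-6   # 12.01*8 + 1.01*18 + 16*3
    assert abs(get_molecular_mass('CH2O3Fe2Ni') - 232.44) < 1e-2
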
diff -r 0d1557b3d540 -r 41f122255d14 test/test_library_lookup.py
--- a/test/test_library_lookup.py	Thu Mar 19 12:10:19 2015 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,180 +0,0 @@
-'''
-Created on Mar 6, 2012
-
-@author: marcelk
-'''
-from GCMS import library_lookup, match_library
-from pkg_resources import resource_filename  # @UnresolvedImport # pylint: disable=E0611
-import os
-import shutil
-import tempfile
-import unittest
-
-
-class Test(unittest.TestCase):
-    '''
-    Tests the 'library_lookup' Galaxy tool
-    '''
-
-    def setUp(self):
-        self.ri_database = resource_filename(__name__, "data/RIDB_subset.txt")
-        self.nist_output = resource_filename(__name__, "data/NIST_tabular.txt")
-        self.ridb_poly_regress = resource_filename(__name__, "data/ridb_poly_regression.txt")
-        self.ridb_linear_regress = resource_filename(__name__, "data/ridb_linear_regression.txt")
-
-    def test_create_lookup_table(self):
-        '''
-        Tests the 'create_lookup_table' function
-        '''
-        column_type = 'Capillary'
-        polarity = 'Semi-standard non-polar'
-        lookup_dict = library_lookup.create_lookup_table(self.ri_database, column_type, polarity)
-        self.assertFalse(False in [res[4] == 'Capillary' for res in lookup_dict['4177166']])
-        self.assertEqual(['C51276336', '2,6-Dimethyl-octa-1,7-dien-3,6-diol', 'C10H18O2',
-                          '1277', 'Capillary', 'Semi-standard non-polar', 'DB-5MS', '1',
-                          'C51276336_DB-5MS', '', '', ''], lookup_dict['51276336'][1])
-
-    def test_read_model(self):
-        '''
-        Tests reading the regression model data containing the parameters required for converting
-        retention indices between GC-columns
-        '''
-        model, _ = library_lookup._read_model(self.ridb_poly_regress)
-        # Order of values: coefficient 1 through 4, left limit, right limit
-        # Polynomial model
-        self.assertEqual([20.6155874639486, 0.945187096379008, 3.96480787567566e-05, -9.04377237159287e-09,
-                          628.0, 2944.0, 405.0, 0, 0.998685262365514], model['HP-5']['SE-54'])
-        self.assertEqual([-92.3963391356951, 1.26116176393346, -0.000191991657547972, 4.15387371263164e-08,
-                          494.0, 2198.0, 407.0, 0, 0.996665023122993], model['Apiezon L']['Squalane'])
-        # Linear model
-        model, _ = library_lookup._read_model(self.ridb_linear_regress)
-        self.assertEqual([2.81208738561543, 0.99482475526584, 628.0, 2944.0, 405.0, 0, 0.998643883946458],
-                         model['HP-5']['SE-54'])
-        self.assertEqual([19.979922768462, 0.993741869298272, 494.0, 2198.0, 407.0, 0, 0.99636062891041],
-                         model['Apiezon L']['Squalane'])
-
-    def test_apply_regression(self):
-        '''
-        Tests the regression model on some arbitrary retention indices
-        '''
-        poly_model, _ = library_lookup._read_model(self.ridb_poly_regress)
-        linear_model, _ = library_lookup._read_model(self.ridb_linear_regress)
-        retention_indices = [1000, 1010, 1020, 1030, 1040, 1050]
-        converted_poly = []
-        converted_linear = []
-        for ri in retention_indices:
-            converted_poly.append(library_lookup._apply_poly_regression('HP-5', 'DB-5', ri, poly_model))
-            converted_linear.append(library_lookup._apply_linear_regression('HP-5', 'DB-5', ri, linear_model))
-
-        self.assertEqual([1003.0566541860778, 1013.0979459524663, 1023.1358645806529, 1033.170466241159,
-                          1043.2018071045052, 1053.2299433412131], converted_poly)
-        self.assertEqual([1001.8127584915925, 1011.830140783027, 1021.8475230744615, 1031.864905365896,
-                          1041.8822876573306, 1051.899669948765], converted_linear)
-
-        # Test polynomial limit detection, the following RI falls outside of the possible limits
-        ri = 3400
-        converted_poly = library_lookup._apply_poly_regression('HP-5', 'DB-5', ri, poly_model)
-        self.assertEqual(False, converted_poly)
-
-    def test_preferred_hit(self):

[... middle of this diff is truncated in the page source ...]

-            'Semi-standard non-polar', 'SE-52', '', 'C150867_SE-52', '', '', ''], 'SE-52')
-        self.assertEqual(expected, match)
-
-    def test_format_result(self):
-        '''
-        Tests the 'format_result' function
-        '''
-        column_type = 'Capillary'
-        polarity = 'Semi-standard non-polar'
-
-        # Look for DB-5
-        pref_column = ['DB-5']
-        model, method = library_lookup._read_model(self.ridb_poly_regress)
-        lookup_dict = library_lookup.create_lookup_table(self.ri_database, column_type, polarity)
-        data = library_lookup.format_result(lookup_dict, self.nist_output, pref_column, column_type,
-                                            polarity, model, method)#False, None)
-
-        # remove non-hits from set:
-        data = _get_hits_only(data)
-        self.assertEqual(['C544354', 'Ethyl linoleate', 'C20H36O2', '2155', 'Capillary', 'Semi-standard non-polar',
-                          'DB-5', '1', 'C544354_DB-5', '1810', 'None', '', '', '0'], data[20])
-        self.assertEqual(111, len(data))
-
-        # Look for both DB-5 and HP-5
-        pref_column = ['DB-5', 'HP-5']
-        data = library_lookup.format_result(lookup_dict, self.nist_output, pref_column, column_type,
-                                            polarity, False, None)
-        # remove non-hits from set:
-        data = _get_hits_only(data)
-        self.assertEqual(['C502614', '.beta.-(E)-Farnesene', 'C15H24', '1508', 'Capillary', 'Semi-standard non-polar',
-                          'DB-5', '1', 'C502614_DB-5', '942', 'None', '1482', '1522', '22'], data[50])
-        self.assertEqual(106, len(data))
-
-
-    def test_save_data(self):
-        '''
-        Tests the creation of the output tabular file
-        '''
-        temp_folder = tempfile.mkdtemp(prefix='gcms_combine_output_')
-        saved_data = '{0}/{1}'.format(temp_folder, 'output.tsv')
-        column_type = 'Capillary'
-        polarity = 'Semi-standard non-polar'
-        pref_column = ['DB-5']
-        lookup_dict = library_lookup.create_lookup_table(self.ri_database, column_type, polarity)
-        data = library_lookup.format_result(lookup_dict, self.nist_output, pref_column, column_type, polarity, False, None)
-        library_lookup._save_data(data, saved_data)
-        self.failUnless(os.path.exists(saved_data))
-        shutil.rmtree(temp_folder)
-
-
-    def test_match_library_get_lib_files(self):
-        '''
-        Tests the match_library.py functionality
-        '''
-        riqc_libs_dir = resource_filename(__name__, "../repositories/PRIMS-metabolomics/RI_DB_libraries")
-        get_library_files_output = match_library.get_directory_files(riqc_libs_dir)
-        self.assertEqual(2, len(get_library_files_output))
-        self.assertEqual("Library_RI_DB_capillary_columns-noDuplicates", get_library_files_output[0][0])
-        #TODO change assert below to assert that the result is a file, so the test can run on other dirs as well:
-        #self.assertEqual("E:\\workspace\\PRIMS-metabolomics\\python-tools\\tools\\GCMS\\test\\data\\riqc_libs\\RI DB library (capillary columns) Dec.2012.txt", get_library_files_output[0][1])
-        #self.assertEqual("RI DB library (capillary columns) Jan.2013", get_library_files_output[1][0])
-        try:
-            get_library_files_output = match_library.get_directory_files("/blah")
-            # should not come here
-            self.assertTrue(False)
-        except:
-            # should come here
-            self.assertTrue(True)
-
-
-def _get_hits_only(data):
-    '''
-    removes items that have RI == 0.0 and Name == '' (these are dummy lines just for the output
-    '''
-    result = []
-    for item_idx in xrange(len(data)):
-        item = data[item_idx]
-        if item[1] != '' and item[3] > 0.0 :
-            result.append(item)
-
-    return result
-
-
-if __name__ == "__main__":
-    #import sys;sys.argv = ['', 'Test.testName']
-    unittest.main()

diff -r 0d1557b3d540 -r 41f122255d14 test/test_match_library.py
--- a/test/test_match_library.py	Thu Mar 19 12:10:19 2015 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,51 +0,0 @@
-'''
-Created on Mar 6, 2012
-
-@author: marcelk
-'''
-from GCMS import match_library
-import unittest
-from pkg_resources import resource_filename  # @UnresolvedImport # pylint: disable=E0611
-
-
-class Test(unittest.TestCase):
-    '''
-    Tests the 'match_library' Galaxy tool
-    '''
-    nist_db = resource_filename(__name__, "data/RIDB_subset.txt")
-
-    def test_get_column_type(self):
-        '''
-        Tests the 'get_column_type' function that returns tuples of unique elements
-        for column types in the RI database
-        '''
-        galaxy_output = match_library.get_column_type(self.nist_db)
-        self.assertEqual([('Capillary(9999)', 'Capillary', False)], galaxy_output)
-
-    def test_filter_column(self):
-        '''
-        Tests the 'filter_column' function showing the column phase for all 'Capillary' columns
-        '''
-        galaxy_output = match_library.filter_column(self.nist_db, 'Capillary')
-        self.assertEqual([('Semi-standard non-polar(9999)', 'Semi-standard non-polar', False)], galaxy_output)
-
-    def test_filter_column2(self):
-        '''
-        Tests the 'filter_column' function showing all possibilities for columns having both the
-        'Capillary' and 'Semi-standard non-polar' properties
-        '''
-        galaxy_output = match_library.filter_column2(self.nist_db, 'Capillary', 'Semi-standard non-polar')
-        self.failUnless(('Apiezon M(6)', 'Apiezon M', False) in galaxy_output)
-
-    def test_count_occurrence(self):
-        '''
-        Tests the 'count_occurrence' function
-        '''
-        test_list = [2, 0, 0, 2, 1, 3, 4, 5, 3, 2, 3, 4, 5, 5, 4, 2, 5, 3, 4, 3, 5, 4, 2, 0, 4]
-        counts = match_library.count_occurrence(test_list)
-        self.assertEqual({0: 3, 1: 1, 2: 5, 3: 5, 4: 6, 5: 5}, counts)
-
-
-if __name__ == "__main__":
-    #import sys;sys.argv = ['', 'Test.testName']
-    unittest.main()

diff -r 0d1557b3d540 -r 41f122255d14 test/test_query_mass_repos.py
--- a/test/test_query_mass_repos.py	Thu Mar 19 12:10:19 2015 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,62 +0,0 @@
-'''Integration tests for the GCMS project'''
-
-from pkg_resources import resource_filename  # @UnresolvedImport # pylint: disable=E0611
-from MS import query_mass_repos
-import os.path
-import sys
-import unittest
-
-
-class IntegrationTest(unittest.TestCase):
-
-
-    def test_simple(self):
-        '''
-        Simple initial test
-        '''
-        # Create out folder
-        outdir = "output/query_mass_repos/"
-        if not os.path.exists(outdir):
-            os.makedirs(outdir)
-
-        # Build up arguments and run
-
-        # input_file = sys.argv[1]
-        # molecular_mass_col = sys.argv[2]
-        # repository_file = sys.argv[3]
-        # mass_tolerance = float(sys.argv[4])
-        # output_result = sys.argv[5]
-
-        input_file = resource_filename(__name__, "data/service_query_tabular.txt")
-        molecular_mass_col = "mass (Da)"
-        dblink_file = resource_filename(__name__, "data/MFSearcher ExactMassDB service.txt")
-        output_result = resource_filename(__name__, outdir + "metexp_query_results_added.txt")
-
-        sys.argv = ['test',
-                    input_file,
-                    molecular_mass_col,
-                    dblink_file,
-                    '0.001',
-                    'ms',
-                    output_result]
-
-        # Execute main function with arguments provided through sys.argv
-        query_mass_repos.main()
-
-
-def _read_file(filename):
-    '''
-    Helper method to quickly read a file
-    @param filename:
-    '''
-    with open(filename) as handle:
-        return handle.read()

diff -r 0d1557b3d540 -r 41f122255d14 test/test_query_metexp.py
--- a/test/test_query_metexp.py	Thu Mar 19 12:10:19 2015 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,79 +0,0 @@
-'''Integration tests for the GCMS project'''
-
-from pkg_resources import resource_filename  # @UnresolvedImport # pylint: disable=E0611
-from GCMS import query_metexp
-import os.path
-import sys
-import unittest
-
-
-class IntegrationTest(unittest.TestCase):
-
-
-#    def test_MM_calculations(self):
-#        '''
-#        test the implemented method for MM calculations for
-#        given chemical formulas
-#        '''
-#        export_to_metexp_tabular.init_elements_and_masses_map()
-#
-#        formula = "C8H18O3"
-#        # should be = 12.01*8 + 1.01*18 + 16*3 = 162.26
-#        result = export_to_metexp_tabular.get_molecular_mass(formula)
-#        self.assertEqual(162.26, result)
-#
-#        formula = "CH2O3Fe2Ni"
-#        # should be = 12.01*1 + 1.01*2 + 16*3 + 55.85*2 + 58.71 = 232.44
-#        result = export_to_metexp_tabular.get_molecular_mass(formula)
-#        self.assertAlmostEqual(232.44, result, 2)
-#
-#
-#
-
-
-    def test_simple(self):
-        '''
-        Simple initial test
-        '''
-        # Create out folder
-        outdir = "output/metexp_query/"
-        if not os.path.exists(outdir):
-            os.makedirs(outdir)
-
-        # Build up arguments and run
-
-        # input_file = sys.argv[1]
-        # molecular_mass_col = sys.argv[2]
-        # formula_col = sys.argv[3]
-        # metexp_dblink_file = sys.argv[4]
-        # output_result = sys.argv[5]
-
-        input_file = resource_filename(__name__, "data/metexp_query_tabular.txt")
-        casid_col = "CAS"
-        formula_col = "FORMULA"
-        molecular_mass_col = "MM"
-        metexp_dblink_file = resource_filename(__name__, "data/METEXP Test DB.txt")
-        output_result = resource_filename(__name__, outdir + "metexp_query_results_added.txt")
-
-        sys.argv = ['test',
-                    input_file,
-                    casid_col,
-                    formula_col,
-                    molecular_mass_col,
-                    metexp_dblink_file,
-                    'GC',
-                    output_result]
-
-        # Execute main function with arguments provided through sys.argv
-        query_metexp.main()
-
-
-def _read_file(filename):
-    '''
-    Helper method to quickly read a file
-    @param filename:
-    '''
-    with open(filename) as handle:
-        return handle.read()

diff -r 0d1557b3d540 -r 41f122255d14 test/test_query_metexp_LARGE.py
--- a/test/test_query_metexp_LARGE.py	Thu Mar 19 12:10:19 2015 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,79 +0,0 @@
-'''Integration tests for the GCMS project'''
-
-from pkg_resources import resource_filename  # @UnresolvedImport # pylint: disable=E0611
-from GCMS import query_metexp
-import os.path
-import sys
-import unittest
-
-
-class IntegrationTest(unittest.TestCase):
-
-
-#    def test_MM_calculations(self):
-#        '''
-#        test the implemented method for MM calculations for
-#        given chemical formulas
-#        '''
-#        export_to_metexp_tabular.init_elements_and_masses_map()
-#
-#        formula = "C8H18O3"
-#        # should be = 12.01*8 + 1.01*18 + 16*3 = 162.26
-#        result = export_to_metexp_tabular.get_molecular_mass(formula)
-#        self.assertEqual(162.26, result)
-#
-#        formula = "CH2O3Fe2Ni"
-#        # should be = 12.01*1 + 1.01*2 + 16*3 + 55.85*2 + 58.71 = 232.44
-#        result = export_to_metexp_tabular.get_molecular_mass(formula)
-#        self.assertAlmostEqual(232.44, result, 2)
-#
-#
-#
-
-
-    def test_large(self):
-        '''
-        Simple test, but on larger set, last test executed in 28s
-        '''
-        # Create out folder
-        outdir = "output/metexp_query/"
-        if not os.path.exists(outdir):
-            os.makedirs(outdir)
-
-        # Build up arguments and run
-
-        # input_file = sys.argv[1]
-        # molecular_mass_col = sys.argv[2]
-        # formula_col = sys.argv[3]
-        # metexp_dblink_file = sys.argv[4]
-        # output_result = sys.argv[5]
-
-        input_file = resource_filename(__name__, "data/metexp_query_tabular_large.txt")
-        casid_col = "CAS"
-        formula_col = "FORMULA"
-        molecular_mass_col = "MM"
-        metexp_dblink_file = resource_filename(__name__, "data/METEXP Test DB.txt")
-        output_result = resource_filename(__name__, outdir + "metexp_query_results_added_LARGE.txt")
-
-        sys.argv = ['test',
-                    input_file,
-                    casid_col,
-                    formula_col,
-                    molecular_mass_col,
-                    metexp_dblink_file,
-                    'GC',
-                    output_result]
-
-        # Execute main function with arguments provided through sys.argv
-        query_metexp.main()
-
-
-def _read_file(filename):
-    '''
-    Helper method to quickly read a file
-    @param filename:
-    '''
-    with open(filename) as handle:
-        return handle.read()