Mercurial > repos > uga-galaxy-group > webservice_toolsuite_v1_1
view WebServiceExtensionsV1.1/WebServiceToolWorkflow_REST_SOAP/clientGenerator/creatorEngineComplex.py @ 0:049760c677de default tip
Galaxy WSExtensions added successfully
author | uga-galaxy-group |
---|---|
date | Tue, 05 Jul 2011 19:34:18 -0400 |
parents | |
children |
line wrap: on
line source
'''
@author  Rui Wang
@see     LICENSE (MIT style license file).
'''

# NOTE(review): this module targets Python 2 (ZSI, unbound methods).  The
# statements below are written so that they also parse under Python 3, but the
# runtime behaviour has only been reasoned about for Python 2.
from types import *
import copy
import ZSI.TC
from introspect import *
from msHandler import *
from paramConverter import *

__author__="Rui Wang"


class ClientCreator(Introspector):
    """Introspect a generated web-service module and invoke its operations.

    Every public method takes ``modulePath`` and hands it to ``path2Class``
    (not defined in this class; presumably inherited from ``Introspector`` --
    confirm), which is expected to return a dict mapping class names to class
    objects.  Classes are then recognised purely by naming convention:

    * a class whose name ends in ``SOAP`` holds the operations,
    * a class whose name ends in ``Locator`` creates the service instance,
    * any class with a ``typecode`` attribute is treated as a message class.
    """

    def _requireOperation(self, opName, modulePath):
        '''Return the operation dictionary of the module after checking that
        opName is one of its keys.

        Raises NameError (same message as before) when the operation is
        unknown.'''
        allops = self.path2Ops(modulePath)
        if opName not in allops:
            raise NameError('Warning: No operation has the given name!!')
        return allops

    def _findMessageClass(self, pname, modulePath):
        '''Return the first message class whose typecode.pname, converted to
        str, equals pname.

        Raises NameError when nothing matches.  The previous code fell through
        to a "return" of an unbound local, which raised UnboundLocalError; that
        is a subclass of NameError, so callers catching it keep working.'''
        for ms in self.path2messageClassOb(modulePath):
            pnameOb = getattr(getattr(ms, 'typecode'), 'pname')
            if pname == str(pnameOb):
                return ms
        raise NameError('No message class with typecode pname %r found in %s'
                        % (pname, modulePath))

    def path2Ops(self, modulePath):
        '''given module (the service python file generated by wsdl2py) path,
        return dictionary of operation name: object (functions of class-**SOAP)

        Raises KeyError when the module has no class whose name ends in
        "SOAP".'''
        allclass = self.path2Class(modulePath)
        opclass = ''
        # No break on purpose: as before, the last matching name in the
        # dictionary's iteration order wins.
        for c in allclass.keys():
            if c.endswith('SOAP'):
                opclass = c
        if not opclass:
            # Previously surfaced as an uninformative KeyError: ''.
            raise KeyError('no class ending in "SOAP" found in %s'
                           % (modulePath,))
        # class object which holds all ops
        opclassOb = allclass[opclass]
        allops = {}
        for opt in dir(opclassOb):
            # Skip special attributes by *attribute* name as well: on new-style
            # classes dir() lists __class__, whose value (the metaclass) is
            # callable and has a non-dunder __name__, so it used to leak into
            # the result as a bogus operation.
            if opt.startswith('__'):
                continue
            optemp = getattr(opclassOb, opt)
            # keep callables that are not private such as __init__
            if callable(optemp) and not optemp.__name__.startswith('__'):
                allops[optemp.__name__] = optemp
        return allops

    def path2messageClassOb(self, modulePath):
        '''given module path, return a list of all message class objects,
        i.e. every class of the module that has a "typecode" attribute'''
        allclassOb = self.path2Class(modulePath)
        return [c for c in allclassOb.values() if hasattr(c, 'typecode')]

    def opname2inputClassOb(self, opName, modulePath):
        '''given operation name, module path, return input class object

        The input message class is the one whose typecode.pname equals the
        operation name.  Raises NameError when the operation does not exist or
        no such message class is found.'''
        self._requireOperation(opName, modulePath)
        return self._findMessageClass(opName, modulePath)

    def opname2outputClassOb(self, opName, modulePath):
        '''given operation name, module path, return output class object

        The output message class is the one whose typecode.pname equals the
        operation name + "Response".  Raises NameError when the operation does
        not exist or no such message class is found.'''
        self._requireOperation(opName, modulePath)
        return self._findMessageClass(opName + 'Response', modulePath)

    def opname2outputs(self, opName, modulePath):
        '''given operation name, module path, return the outputs of the
        operation as produced by MessageHandler.msParser on a fresh instance
        of the output message class

        (presumably a mapping keyed by output name -- confirm against
        msHandler).  Raises NameError for an unknown operation.'''
        # opname2outputClassOb already validates opName, so the module is no
        # longer introspected a second time just to repeat the same check.
        outputClass = self.opname2outputClassOb(opName, modulePath)
        msinstance = outputClass()
        mshandler = MessageHandler()
        return mshandler.msParser(msinstance)

    def opname2inputs(self, opName, modulePath):
        '''given operation name, module path, return the inputs of the
        operation as produced by MessageHandler.msParser on a fresh instance
        of the input message class

        (presumably a mapping keyed by input name -- confirm against
        msHandler).  Raises NameError for an unknown operation.

        Side effect: writes the parsed parameters and every parameter name to
        stdout; these diagnostics are kept from the previous version.'''
        inputClass = self.opname2inputClassOb(opName, modulePath)
        mshandler = MessageHandler()
        parameters = mshandler.msParser(inputClass())
        # Single-argument print calls produce the same text as the former
        # Python 2 print statements and also parse under Python 3.
        print('The parametrrs after flattening :  %s' % (parameters,))
        # The former code also built a name -> type-string table by pairing
        # parameters.keys() with typecode.ofwhat by position.  Dict iteration
        # order is not tied to ofwhat order, the lookup could raise IndexError,
        # and the table was never returned, so it has been dropped.
        for name in parameters.keys():
            print('paramters %s' % (name,))
        return parameters

    def path2service(self, modulePath):
        '''given module path, return service instance
        instead of: dbfetchSrv = WSDBFetchServerLegacyServiceLocator().getWSDBFetchServerLegacy()

        Raises KeyError when no class name ends in "Locator" and NameError
        when that class has no suitable factory method.'''
        allclass = self.path2Class(modulePath)
        locator = ''
        # No break on purpose: the last matching name wins, as before.
        for c in allclass.keys():
            if c.endswith('Locator'):
                locator = c
        if not locator:
            # Previously surfaced as an uninformative KeyError: ''.
            raise KeyError('no class ending in "Locator" found in %s'
                           % (modulePath,))
        locatorClassOb = allclass[locator]
        # dir() is sorted, so the first method whose name does not end in
        # 'Address' is taken as the service factory.
        # NOTE(review): the MethodType test relies on Python 2, where a
        # function looked up on its class is an (unbound) method object.
        for obname in dir(locatorClassOb):
            obtemp = getattr(locatorClassOb, obname)
            if (type(obtemp) is MethodType) and not obtemp.__name__.endswith('Address'):
                # Create a service interface
                return obtemp(locatorClassOb())
        # Used to be an UnboundLocalError (a NameError subclass) on the
        # never-assigned serverMethod.
        raise NameError('class %s has no service factory method' % (locator,))

    def invokeOp(self, opName, modulePath, inputs):
        '''invoke operation opName of the module and return the response
        message instance

        inputs is handed to MessageHandler.msAssign together with a fresh
        input message instance; when inputs is empty (or None) the bare
        instance is sent instead.  Raises NameError for an unknown operation.'''
        # dictionary of operation name: object (def); validated once and
        # reused instead of introspecting the module again further down.
        opDict = self._requireOperation(opName, modulePath)
        # Create a service interface
        serviceInstance = self.path2service(modulePath)
        # input class object of given opName
        inputClass = self._findMessageClass(opName, modulePath)
        mshandler = MessageHandler()
        if inputs:
            inputClassInstance = mshandler.msAssign(inputClass(), inputs)
        else:
            inputClassInstance = inputClass()
        opOb = opDict[opName]
        # invoke operation: response = dbfetchSrv.fetchData(request)
        responseInstance = opOb(serviceInstance, inputClassInstance)
        # NOTE(review): the flattened result is not returned (the original
        # "return flat" is commented out).  The two calls are kept only because
        # their side effects, if any, cannot be seen from this file -- confirm
        # against msHandler / paramConverter and drop them if they are pure.
        nested2flatDict(mshandler.flatten(responseInstance))
        return responseInstance


#testing this module
if __name__=="__main__":
    test=ClientCreator()

    #picr web service
    # print 'all operations of picr: \n', test.path2Ops('picr.AccessionMapperService_client').keys()
    # print 'inputs of picr \n', test.opname2inputs('getUPIForSequence', 'picr.AccessionMapperService_client')
    # print 'outputs of picr \n', test.opname2outputs('getUPIForSequence', 'picr.AccessionMapperService_client')

    # Sample FASTA record (unused by the active call below).
    # NOTE(review): line breaks reconstructed from a whitespace-collapsed copy
    # of the file -- confirm against the repository.
    seq = """>Q8E5Q5_STRA3
MKLSKRYRFWQKVIKALGVLALIATLVLVVYLYKLGILNDSNELKDLVHKYEFWGPMIFI
VAQIVQIVFPVIPGGVTTVAGFLIFGPTLGFIYNYIGIIIGSVILFWLVKFYGRKFVLLF
MDQKTFDKYESKLETSGYEKFFIFCMASPISPADIMVMITGLSNMSIKRFVTIIMITKPI
SIIGYSYLWIYGGDILKNFLN"""

    # inp = {'_content': [{'_type': 'sequence', '_content': 'MKLSKRYRFWQKVIKALGVLALIATLVLVVYLYKLGILNDSNELKDLVHKYEFWGPMIFI'}, {'_type': 'sequence', '_content': 'MDQKTFDKYESKLETSGYEKFFIFCMASPISPADIMVMITGLSNMSIKRFVTIIMITKPI'}], '_params': {'_database': 'swissprot', '_email': 'chaitanya@gmail.com', '_program': 'blastp'}}
    inp = {'_parameters': {'_stype': 'protein', '_sequence': 'MKLSKRYRFWQKVIKALGVLALIATLVLVVYLYKLGILNDSNELKDLVHKYEFWGPMIFI','_database':{ '_string' : ['uniprotkb','uniprotkb_swissprot']}, '_program': 'blastp'}, '_email': 'chaitanya@gmail.com'}
    inputDict={'_params':{ '_program' : 'blastp', '_database' :'swissprot', '_email' :'riververy@yahoo.com', '_async': 1}, '_content':[{'_type':'sequence', '_content':'MKLSKRYRFWQKVIKALGVLALIATLVLVVYLYKLGILNDSNELKDLVHKYEFWGPMIFI'}]}
    # inputDict = {}
    inputs ={'_jobId':'wublast-S20110517-051830-0552-49531676-oy','_type':'out'}
    # The active call below takes no inputs, so the sample payload is reset.
    inp = {}

    #print test.invokeOp('run', 'wublast.wublast_services', inp)
    print(test.invokeOp('getParameters', 'wublast.wublast_services', inp))
    #print test.opname2inputs('getResult','wublast.wublast_services')

    # print 'all operations: \n', test.path2Ops('blast.WSWUBlast_client').keys()
    # print 'inputs of runWUBlast \n', test.opname2inputs('runWUBlast', 'blast.WSWUBlast_client')
    # print 'outputs of runWUBlast \n', test.opname2outputs('runWUBlast', 'blast.WSWUBlast_client')

    # modul=test.path2Module('ebiDbfetch.WSDBFetchServerLegacyService_services')
    # print dir(modul)
    # print 'all operations: \n', test.path2Ops('ebiDbfetch.WSDBFetchServerLegacyService_services').keys()
    # print 'inputs of fetchData:\n', test.opname2inputs('fetchData', 'ebiDbfetch.WSDBFetchServerLegacyService_services')
    # print 'all classes:\n', test.path2Class('ebiDbfetch.WSDBFetchServerLegacyService_services').keys()
    # print 'service instance ob:\n', test.path2service('ebiDbfetch.WSDBFetchServerLegacyService_services')
    # print 'outputs of fetchData:\n', test.opname2outputs('fetchData', 'ebiDbfetch.WSDBFetchServerLegacyService_services')
    # inputDict={'_format':'fasta', '_query':'UNIPROT:ADH1A_HUMAN', '_style':'raw'}
    # print test.invokeOp('fetchData', 'ebiDbfetch.WSDBFetchServerLegacyService_services', inputDict).values()[0]

    # print 'all operations: \n', test.path2Ops('dbfetch.WSDBFetchServerLegacyService_services').keys()
    # print 'inputs of fetchData:\n', test.opname2inputs('fetchData', 'dbfetch.WSDBFetchServerLegacyService_services')
    # inputDict={'_format':'fasta', '_query':'UNIPROT:ADH1A_HUMAN', '_style':'raw'}
    # print test.invokeOp('fetchData', 'dbfetch.WSDBFetchServerLegacyService_services', inputDict).values()[0]

    #dbfetch.WSDBFetchServerLegacyService_client
    # print 'all operations: \n', test.path2Ops('dbfetch.WSDBFetchServerLegacyService_client').keys()
    # print 'inputs of fetchData:\n', test.opname2inputs('fetchData', 'dbfetch.WSDBFetchServerLegacyService_client')
    # inputDict={'_format':'fasta', '_query':'UNIPROT:ADH1A_HUMAN', '_style':'raw'}
    # print test.invokeOp('fetchData', 'dbfetch.WSDBFetchServerLegacyService_client', inputDict).values()[0]