1
+ #!/usr/bin/python
2
+ # Complete example for uploading, updating and querying a Labcas dataset.
3
+
4
+ # Workflow Manager XML/RPC supported methods are defined in org.apache.oodt.cas.workflow.system.XmlRpcWorkflowManager.java:
5
+ # o public boolean handleEvent(String eventName, Hashtable metadata)
6
+ # o public String executeDynamicWorkflow(Vector<String> taskIds, Hashtable metadata)
7
+ # o public Hashtable getWorkflowInstanceById(String wInstId)
8
+
9
+ # File Manager XML/RPC supported methods are defined in org.apache.oodt.cas.filemgr.system.XmlRpcFileManager.java:
10
+ # o public Hashtable<String, Object> getProductByName(String productName)
11
+ # o public Vector<Hashtable<String, Object>> getProductTypes()
12
+
13
+ # Solr queries are based on optional module solrpy
14
+
15
+ import xmlrpclib
16
+ import time
17
+ import solr
18
+
19
# global objects
verbose = False  # set True to dump XML/RPC request/response traffic for debugging

# XML/RPC proxy to the OODT Workflow Manager (port 9001)
workflowManagerServerProxy = xmlrpclib.ServerProxy('http://localhost:9001/', verbose=verbose)
# XML/RPC proxy to the OODT File Manager (port 9000)
# NOTE(review): "fileManaferServerProxy" is a typo for "fileManagerServerProxy";
# kept as-is because every File Manager helper below references this exact name.
fileManaferServerProxy = xmlrpclib.ServerProxy('http://localhost:9000/', verbose=verbose)
# solrpy connection used for product/metadata queries
solrServerProxy = solr.SolrConnection("http://localhost:8080/solr")
24
+
25
+
26
def uploadDataset(dataset):
    '''Submits the LabCAS upload workflow for the given dataset and blocks
    until the workflow instance completes.

    dataset -- name of the dataset staged on the server (sent as the
               'Dataset' workflow metadata field).
    '''

    # NOTE: if you start a named workflow, the XML/RPC interface only returns
    # True/False, not a workflow instance identifier...
    #tf = workflowManagerServerProxy.workflowmgr.handleEvent('labcas-upload', {'Dataset': dataset})

    # ...consequently, submit the equivalent dynamic workflow, which does
    # return the workflow instance id.
    # FIX: pass the 'dataset' argument instead of the hard-coded 'mydata'.
    wInstId = workflowManagerServerProxy.workflowmgr.executeDynamicWorkflow(
        ['urn:edrn:LabcasUploadInitTask', 'urn:edrn:LabcasUploadExecuteTask'],
        {'Dataset': dataset})

    # poll the workflow instance until it finishes
    waitForCompletion(wInstId)
36
+
37
def updateDataset(dataset):
    '''Submits the LabCAS metadata-update workflow for the given dataset and
    blocks until the workflow instance completes (no new version is created).

    dataset -- name of the dataset to update (sent as the 'Dataset'
               workflow metadata field).
    '''

    # submit the "labcas-update" workflow as a dynamic workflow so we get
    # back the workflow instance id.
    # FIX: pass the 'dataset' argument instead of the hard-coded 'mydata'.
    wInstId = workflowManagerServerProxy.workflowmgr.executeDynamicWorkflow(
        ['urn:edrn:LabcasUpdateTask'],
        {'Dataset': dataset})

    # poll the workflow instance until it finishes
    waitForCompletion(wInstId)
44
+
45
def waitForCompletion(wInstId):
    '''Monitors a workflow instance until it reaches a terminal state.

    Polls the Workflow Manager once per second, printing the status at each
    poll; on exit prints the full server response for the instance.

    wInstId -- workflow instance id returned by executeDynamicWorkflow.
    '''

    # wait for the server to instantiate this workflow before querying it
    time.sleep(1)

    # status vocabularies used by the OODT workflow engine / PGE tasks
    running_status = ['CREATED', 'QUEUED', 'STARTED', 'PAUSED']
    pge_task_status = ['STAGING INPUT', 'BUILDING CONFIG FILE', 'PGE EXEC', 'CRAWLING']
    finished_status = ['FINISHED', 'ERROR']

    while True:
        response = workflowManagerServerProxy.workflowmgr.getWorkflowInstanceById(wInstId)
        status = response['status']
        if status in running_status or status in pge_task_status:
            # FIX: corrected message typo "istance" -> "instance"
            print('Workflow instance=%s running with status=%s' % (wInstId, status))
            time.sleep(1)
        elif status in finished_status:
            print('Workflow instance=%s ended with status=%s' % (wInstId, status))
            break
        else:
            # unknown status: bail out rather than loop forever
            print('UNRECOGNIZED WORKFLOW STATUS: %s' % status)
            break

    print(response)
68
+
69
def getProductType(dataset):
    '''Looks up the File Manager product type named after the given dataset
    and prints all of its metadata.'''

    # fetch by name over XML/RPC and hand the resulting dict straight to the printer
    printProductType(fileManaferServerProxy.filemgr.getProductTypeByName(dataset))
75
+
76
def listProductTypes():
    '''Prints every product type currently registered with the File Manager.'''

    for ptype in fileManaferServerProxy.filemgr.getProductTypes():
        printProductType(ptype)
82
+
83
def printProductType(productTypeDict):
    '''Prints a product type's name followed by each of its key/value attributes.'''

    print('PRODUCT TYPE: %s' % productTypeDict['name'])
    for key, value in productTypeDict.items():
        print('\t %s = %s' % (key, value))
87
+
88
def listProducts(dataset):
    '''Queries Solr for the given dataset: prints every file across all
    versions, the per-version file counts, and finally the files belonging
    to the most recent version.'''

    # all files of this dataset, regardless of version
    response = solrServerProxy.query('*:*', fq=['Dataset:%s' % dataset], start=0)
    print("\n Number of files found: %s" % response.numFound)
    for doc in response.results:
        printProduct(doc)

    # facet on 'Version' (rows=0: counts only) to discover every version
    response = solrServerProxy.query('*:*', fq=['Dataset:%s' % dataset],
                                     start=0, rows=0, facet='true',
                                     facet_field='Version')
    versions = response.facet_counts['facet_fields']['Version']
    last_version = 0
    for key, value in versions.items():
        print("\n Version number %s has %s files" % (key, value))
        last_version = max(last_version, int(key))

    # files belonging to the latest version only
    response = solrServerProxy.query('*:*',
                                     fq=['Dataset:%s' % dataset, 'Version:%s' % last_version],
                                     start=0)
    print("\n Latest version: %s number of files: %s" % (last_version, response.numFound))
    for doc in response.results:
        printProduct(doc)
110
+
111
def printProduct(result):
    '''Utility function to print a few fields (id, name, size, location,
    version) of one Solr result document.'''

    print("\n File id=%s" % result['id'])                            # single-valued field
    print("File name=%s" % result['Filename'][0])                    # multi-valued field
    print("File size=%s" % result['FileSize'][0])                    # multi-valued field
    print("File location=%s" % result['CAS.ReferenceDatastore'][0])  # multi-valued field
    print("File version=%s" % result['Version'][0])                  # multi-valued field
119
+
120
+
121
if __name__ == '__main__':

    # name of the staged dataset; becomes the 'Dataset' workflow metadata value
    dataset = 'mydata'

    # upload dataset staged in directory 'mydata'
    uploadDataset(dataset)

    # upload the dataset again:
    # o a new version will be generated
    # o the product type metadata will be completely overridden
    #uploadDataset(dataset)

    # update dataset metadata - no new version is generated
    updateDataset(dataset)

    # list all product types in File manager
    #listProductTypes()

    # query the product types from the XML/RPC File Manager interface
    getProductType(dataset)

    # list all products for given dataset == product type
    listProducts(dataset)
0 commit comments