Unit 25 - Spatio-temporal parallelizationΒΆ

This unit is focused on improving script created in Unit 24 - Spatio-temporal scripting by processing Sentinel scenes in parallel. The paralelization is done by ParallelModuleQueue, see line 217 similarly to Unit 17 - DTM script parallelization.

New feature of PyGRASS library is introduced on line 146. By MultiModule you can define list of modules which will work as isolated units not influenced by other processes running parallel. By setting set_temp_region the computation region settings will be not influenced by other processes running in parallel.

New script option ncproc on line 43 enables controlling number of processes running in parallel.

  1#!/usr/bin/env python
  2#
  3##############################################################################
  4#
  5# MODULE:       ndvi-tgrass-v2
  6#
  7# AUTHOR(S):    martin
  8#
  9# PURPOSE:      NDVI TGRASS version 2
 10#
 11# DATE:         Sat Feb  3 15:45:35 2018
 12#
 13##############################################################################
 14
 15#%module
 16#% description: NDVI TGRASS script version 2
 17#%end                
 18#%option G_OPT_STRDS_INPUT
 19#% key: b4
 20#% description: Name of the input 4th band space time raster dataset
 21#%end
 22#%option G_OPT_STRDS_INPUT
 23#% key: b8
 24#% description: Name of the input 4th band space time raster dataset
 25#%end
 26#%option G_OPT_STRDS_INPUT
 27#% key: mask
 28#% description: Name of the input mask space time raster dataset
 29#%end
 30#%option G_OPT_F_OUTPUT
 31#%end
 32#%option
 33#% key: basename
 34#% description: Basename for output raster maps
 35#% required: yes
 36#%end
 37#%option
 38#% key: threshold
 39#% description: Threshold for removing small areas
 40#% answer: 1600
 41#%end
 42#%option
 43#% key: nprocs
 44#% description: Number of processes
 45#% answer: 1
 46#% type: integer
 47#%end
 48
 49import sys
 50import os
 51import atexit
 52
 53from grass.pygrass.modules import Module, MultiModule, ParallelModuleQueue
 54from grass.script import parser
 55from grass.script.vector import vector_db_select
 56    
 57def cleanup(idx):
 58    Module('g.remove', flags='f', name='mask' + idx, type='raster')
 59    Module('g.remove', flags='f', name='ndvi' + idx, type='raster')
 60    Module('g.remove', flags='f', name='ndvi_class' + idx, type='raster')
 61    Module('g.remove', flags='f', name='ndvi_class' + idx, type='vector')
 62
 63def compute(b4, b8, msk, output, idx):
 64
 65    modules = []
 66    modules.append(
 67        Module("g.region",
 68               overwrite = True,
 69               vector = msk,
 70               align = b4,
 71               run_ = False)
 72    )
 73    modules.append(
 74        Module("v.to.rast",
 75               overwrite = True,
 76               input = msk,
 77               output = 'mask' + idx,
 78               type = 'area',
 79               use = 'val',
 80               value='1',
 81               run_ = False)
 82    )
 83    modules.append(
 84        Module("r.mapcalc",
 85               overwrite = True,
 86               expression = "ndvi{idx} = if(isnull({clouds}), null(), float({b8} - {b4}) / ({b8} + {b4}))".format(
 87                   idx=idx, clouds=msk, b8=b8, b4=b4),
 88               run_ = False)
 89    )
 90                
 91    recode_str="""-1:0.1:1
 920.1:0.5:2
 930.5:1:3"""
 94
 95    modules.append(
 96        Module("r.recode",
 97               overwrite = True,
 98               input = "ndvi" + idx,
 99               output = "ndvi_class" + idx,
100               rules = "-",
101               stdin_ = recode_str,
102               run_ = False)
103    )
104    
105    colors_str="""1 grey
1062 255 255 0
1073 green"""
108    modules.append(
109        Module("r.colors",
110               map = "ndvi_class" + idx,
111               rules = "-",
112               stdin_ = colors_str,
113               run_ = False)
114    )
115
116    modules.append(
117        Module("r.to.vect",
118               flags = 'sv',
119               overwrite = True,
120               input = "ndvi_class" + idx,
121               output = "ndvi_class" + idx,
122               type = "area",
123               run_ = False)
124    )
125
126    modules.append(
127        Module("v.clean",
128               overwrite = True,
129               input = "ndvi_class" + idx,
130               output = output,
131               tool = "rmarea",
132               threshold = options['threshold'],
133               run_ = False)
134    )
135
136    modules.append(
137        Module('v.rast.stats',
138               flags='c',
139               map=output,
140               raster='ndvi'+idx,
141               column_prefix='ndvi',
142               method=['minimum','maximum','average'],
143               run_ = False)
144    )
145
146    queue.put(MultiModule(modules, sync=False, set_temp_region=True))
147
148def stats(output, date, fd):
149    fd.write('-' * 80)
150    fd.write(os.linesep)
151    fd.write('NDVI class statistics ({0}: {1})'.format(output, date))
152    fd.write(os.linesep)
153    fd.write('-' * 80)
154    fd.write(os.linesep)
155    from subprocess import PIPE
156    ret = Module('v.report', map=output, option='area',
157                 stdout_=PIPE)
158    for line in ret.outputs.stdout.splitlines()[1:]: # skip first line (cat|label|area)
159        # parse line (eg. 1||2712850)
160        data = line.split('|')
161        cat = data[0]
162        area = float(data[-1])
163        fd.write('NDVI class {0}: {1:.1f} ha'.format(cat, area/1e4))
164        fd.write(os.linesep)
165
166    data = vector_db_select(output)
167    for vals in data['values'].itervalues():
168        # unfortunately we need to cast values by float
169        fd.write('NDVI class {0}: {1:.4f} (min) {2:.4f} (max) {3:.4f} (mean)'.format(
170            vals[0], float(vals[2]), float(vals[3]), float(vals[4])))
171        fd.write(os.linesep)
172        
173def main():
174    import grass.temporal as tgis
175
176    tgis.init()
177
178    sp4 = tgis.open_old_stds(options['b4'], 'raster')
179    sp8 = tgis.open_old_stds(options['b8'], 'raster')
180    msk = tgis.open_old_stds(options['mask'], 'raster')
181
182    idx = 1
183    data = []
184    for item in sp4.get_registered_maps(columns='name,start_time'):
185        b4 = item[0]
186        date=item[1]
187        b8 = sp8.get_registered_maps(columns='name',
188                                     where="start_time = '{}'".format(date))[0][0]
189        ms = msk.get_registered_maps(columns='name',
190                                     where="start_time = '{}'".format(date))[0][0]
191        output = '{}_{}'.format(options['basename'], idx)
192        compute(b4, b8, ms, output, str(idx))
193
194        data.append(
195            (output, date)
196        )
197            
198        idx += 1
199
200    queue.wait()
201
202    idx = 1
203    fd = open(options['output'], 'w')
204    for output, date in data:
205        stats(output, date, fd)
206        cleanup(str(idx))
207        idx += 1
208
209    fd.close()
210    
211    return 0
212
213if __name__ == "__main__":
214    options, flags = parser()
215
216    # queue for parallel jobs
217    queue = ParallelModuleQueue(int(options['nprocs']))
218
219    sys.exit(main())

Sample script to download: ndvi-tgrass-v2.py