Unit 23 - Spatio-temporal parallelizationΒΆ

This unit is focused on improving the script created in Unit 22 - Spatio-temporal scripting by processing Sentinel scenes in parallel. Paralelization is performed by ParallelModuleQueue similarly as in Unit 19 - DSM script parallelization, see line 209.

New PyGRASS functionality is introduced on line 132. By MultiModule it is possible to define a list of modules which will work as isolated units not influenced by other processes running parallel. By setting set_temp_region computation region settings will be also isolated from other processes. By sync=True all modules will run in serial order in a background process. Tools are added to the queue (132) with run_=False to prevent them from running immediately, see eg. 72. Queue with calculation tools is processed on line 192.

The option ncproc on line 43 enables controlling number of processes running in parallel.

Compared to ndvi-tgrass-v1.py: NDVI is computed by r.mapcalc (74) instead of combination of r.mask and i.vi. The concept of raster masks created by r.mask would not be functional in the case of parallel computation (the module creates raster map named MASK). The for loop (179) is split into two parts: 1) perform computation in parallel (192) and 2) compute statistics (196).

  1#!/usr/bin/env python3
  2#
  3##############################################################################
  4#
  5# MODULE:       ndvi-tgrass-v2
  6#
  7# AUTHOR(S):    martin
  8#
  9# PURPOSE:      NDVI TGRASS version 2
 10#
 11# DATE:         Sat Feb  3 15:45:35 2018
 12#
 13##############################################################################
 14
 15# %module
 16# % description: NDVI TGRASS script version 2
 17# %end
 18# %option G_OPT_STRDS_INPUT
 19# % key: b4
 20# % description: Name of input 4th band space time raster dataset
 21# %end
 22# %option G_OPT_STRDS_INPUT
 23# % key: b8
 24# % description: Name of input 8th band space time raster dataset
 25# %end
 26# %option G_OPT_STRDS_INPUT
 27# % key: mask
 28# % description: Name of input mask space time raster dataset
 29# %end
 30# %option G_OPT_F_OUTPUT
 31# %end
 32# %option
 33# % key: basename
 34# % description: Basename for output raster maps
 35# % required: yes
 36# %end
 37# %option
 38# % key: min_area
 39# % description: Threshold for removing small areas in m2
 40# % answer: 1600
 41# %end
 42# %option
 43# % key: nprocs
 44# % description: Number of processes
 45# % answer: 1
 46# % type: integer
 47# %end
 48
 49import sys
 50import os
 51import atexit
 52from subprocess import PIPE
 53
 54from grass.script import parser, parse_key_val
 55from grass.pygrass.modules import Module, MultiModule, ParallelModuleQueue
 56    
 57def cleanup(idx):
 58    Module('g.remove', flags='f', name='region_mask' + idx, type='vector')
 59    Module('g.remove', flags='f', name='ndvi' + idx, type='raster')
 60    Module('g.remove', flags='f', name='ndvi_class' + idx, type='raster')
 61    Module('g.remove', flags='f', name='ndvi_class_area' + idx, type='raster')
 62    Module('g.remove', flags='f', name='ndvi_class_filled_i' + idx, type='raster')
 63    Module('g.remove', flags='f', name='ndvi_class_filled' + idx, type='vector')
 64
 65def compute(b4, b8, msk, min_area_ha, output, idx, queue):
 66    modules = []
 67    modules.append(
 68        Module("g.region",
 69               overwrite=True,
 70               raster=msk,
 71               align=b4,
 72               run_=False)
 73    )
 74
 75    modules.append(
 76        Module("r.mask",
 77               overwrite=True,
 78               maskcats="*",
 79               raster=msk,
 80               layer="1",
 81               run_=False)
 82    )
 83
 84    modules.append(
 85        Module("r.mapcalc",
 86               overwrite=True,
 87               expression="ndvi{idx} = if(isnull({clouds}), null(), float({b8} - {b4}) / ({b8} + {b4}))".format(
 88                   idx=idx, clouds=msk, b8=b8, b4=b4),
 89               run_=False)
 90    )
 91
 92    modules.append(
 93        Module("r.recode",
 94               overwrite=True,
 95               input="ndvi" + idx,
 96               output="ndvi_class" + idx,
 97               rules="-",
 98               stdin_="-1:0.1:1\n0.1:0.5:2\n0.5:1:3",
 99               run_=False)
100    )
101
102    modules.append(
103        Module("r.reclass.area",
104               overwrite=True,
105               input="ndvi_class" + idx,
106               output="ndvi_class_area" + idx,
107               value=min_area_ha,
108               mode="greater",
109               method="reclass",
110               run_=False)
111    )
112
113    modules.append(
114        Module("r.grow.distance",
115               overwrite=True,
116               input="ndvi_class_area" + idx,
117               value=output,
118               metric="euclidean",
119               run_=False)
120    )
121
122    modules.append(
123        Module("r.colors",
124               map=output,
125               rules="-",
126               stdin_="1 grey\n2 255 255 0\n3 green",
127               offset=0,
128               scale=1,
129               run_=False)
130    )
131    
132    queue.put(MultiModule(modules, sync=False, set_temp_region=True))
133
134def stats(output, date, fd, idx):
135    fd.write('-' * 80)
136    fd.write('\n')
137    fd.write('NDVI class statistics ({0}: {1})'.format(output, date))
138    fd.write('\n')
139    fd.write('-' * 80)
140    fd.write('\n')
141
142    ret = Module('r.stats', input=output, flags='ian', stdout_=PIPE)
143    for line in ret.outputs.stdout.splitlines():
144        # parse line (eg. 1 2737300.000000)
145        data = line.split(' ')
146        cat = data[0]
147        area = float(data[-1])
148        fd.write('NDVI class {0}: {1:.1f} ha\n'.format(cat, area/1e4)) 
149
150    fd.write('-' * 80)
151    fd.write('\n')
152    # we need integer map
153    Module('r.mapcalc', expression='ndvi_class_filled_i = int({})'.format(output), overwrite=True)
154    Module('r.to.vect', flags='v', input='ndvi_class_filled_i', output='ndvi_class_filled', type='area', overwrite=True)
155
156    Module('v.rast.stats', flags='c', map='ndvi_class_filled', raster='ndvi'+idx,
157           column_prefix='ndvi', method=['minimum','maximum','average'])
158    # v.db.select: don't print column names (-c)
159    ret = Module('v.db.select', flags='c', map='ndvi_class_filled', separator='comma', stdout_=PIPE)
160    for line in ret.outputs.stdout.splitlines():
161        # parse line (eg. 1,,-0.433962264150943,0.740350877192983,0.051388909449992)
162        cat,label,min,max,mean = line.split(',')
163        fd.write('NDVI class {0}: {1:.4f} (min) {2:.4f} (max) {3:.4f} (mean)\n'.format(
164        cat, float(min), float(max), float(mean)))
165        
166def main(queue):
167    import grass.temporal as tgis
168
169    tgis.init()
170
171    sp4 = tgis.open_old_stds(options['b4'], 'raster')
172    sp8 = tgis.open_old_stds(options['b8'], 'raster')
173    msk = tgis.open_old_stds(options['mask'], 'raster')
174
175    min_area = int(options['min_area']) / 1e4
176    idx = 1
177    data = []
178    fd = open(options['output'], 'w')
179    for item in sp4.get_registered_maps(columns='name,start_time'):
180        b4 = item[0]
181        date = item[1]
182        b8 = sp8.get_registered_maps(columns='name',
183                                     where="start_time='{}'".format(date))[0][0]
184        ms = msk.get_registered_maps(columns='name',
185                                     where="start_time='{}'".format(date))[0][0]
186        output = '{}_{}'.format(options['basename'], idx)
187        compute(b4, b8, ms, min_area, output, str(idx), queue)
188
189        data.append((output, date))
190        idx += 1
191
192    queue.wait()
193
194    idx = 1
195    fd = open(options['output'], 'w')
196    for output, date in data:
197        stats(output, date, fd, str(idx))
198        cleanup(str(idx))
199        idx += 1
200
201    fd.close()
202    
203    return 0
204
205if __name__ == "__main__":
206    options, flags=parser()
207
208    # queue for parallel jobs
209    queue = ParallelModuleQueue(int(options['nprocs']))
210    
211    sys.exit(main(queue))

Sample script to download: ndvi-tgrass-v2.py

Task

Compare ndvi-tgrass-v1.py / ndvi-tgrass-v2.py processing time