Unit 23 - Spatio-temporal parallelizationΒΆ
This unit is focused on improving the script created in Unit 22 - Spatio-temporal scripting by processing Sentinel scenes in parallel. Paralelization is performed by ParallelModuleQueue similarly as in Unit 19 - DSM script parallelization, see line 209.
New PyGRASS functionality is introduced on line 132. By
MultiModule it is possible to define a list of
modules which will work as isolated units not influenced by other
processes running parallel. By setting set_temp_region
computation
region settings will be also isolated from other processes. By
sync=True
all modules will run in serial order in a background
process. Tools are added to the queue (132) with
run_=False
to prevent them from running immediately, see
eg. 72. Queue with calculation tools is processed on line
192.
The option ncproc on line 43 enables controlling number of processes running in parallel.
Compared to ndvi-tgrass-v1.py: NDVI is computed by r.mapcalc (74) instead of combination of r.mask and i.vi. The concept of raster masks created by r.mask would not be functional in the case of parallel computation (the module creates raster map named MASK). The for loop (179) is split into two parts: 1) perform computation in parallel (192) and 2) compute statistics (196).
1#!/usr/bin/env python3
2#
3##############################################################################
4#
5# MODULE: ndvi-tgrass-v2
6#
7# AUTHOR(S): martin
8#
9# PURPOSE: NDVI TGRASS version 2
10#
11# DATE: Sat Feb 3 15:45:35 2018
12#
13##############################################################################
14
15# %module
16# % description: NDVI TGRASS script version 2
17# %end
18# %option G_OPT_STRDS_INPUT
19# % key: b4
20# % description: Name of input 4th band space time raster dataset
21# %end
22# %option G_OPT_STRDS_INPUT
23# % key: b8
24# % description: Name of input 8th band space time raster dataset
25# %end
26# %option G_OPT_STRDS_INPUT
27# % key: mask
28# % description: Name of input mask space time raster dataset
29# %end
30# %option G_OPT_F_OUTPUT
31# %end
32# %option
33# % key: basename
34# % description: Basename for output raster maps
35# % required: yes
36# %end
37# %option
38# % key: min_area
39# % description: Threshold for removing small areas in m2
40# % answer: 1600
41# %end
42# %option
43# % key: nprocs
44# % description: Number of processes
45# % answer: 1
46# % type: integer
47# %end
48
49import sys
50import os
51import atexit
52from subprocess import PIPE
53
54from grass.script import parser, parse_key_val
55from grass.pygrass.modules import Module, MultiModule, ParallelModuleQueue
56
57def cleanup(idx):
58 Module('g.remove', flags='f', name='region_mask' + idx, type='vector')
59 Module('g.remove', flags='f', name='ndvi' + idx, type='raster')
60 Module('g.remove', flags='f', name='ndvi_class' + idx, type='raster')
61 Module('g.remove', flags='f', name='ndvi_class_area' + idx, type='raster')
62 Module('g.remove', flags='f', name='ndvi_class_filled_i' + idx, type='raster')
63 Module('g.remove', flags='f', name='ndvi_class_filled' + idx, type='vector')
64
65def compute(b4, b8, msk, min_area_ha, output, idx, queue):
66 modules = []
67 modules.append(
68 Module("g.region",
69 overwrite=True,
70 raster=msk,
71 align=b4,
72 run_=False)
73 )
74
75 modules.append(
76 Module("r.mask",
77 overwrite=True,
78 maskcats="*",
79 raster=msk,
80 layer="1",
81 run_=False)
82 )
83
84 modules.append(
85 Module("r.mapcalc",
86 overwrite=True,
87 expression="ndvi{idx} = if(isnull({clouds}), null(), float({b8} - {b4}) / ({b8} + {b4}))".format(
88 idx=idx, clouds=msk, b8=b8, b4=b4),
89 run_=False)
90 )
91
92 modules.append(
93 Module("r.recode",
94 overwrite=True,
95 input="ndvi" + idx,
96 output="ndvi_class" + idx,
97 rules="-",
98 stdin_="-1:0.1:1\n0.1:0.5:2\n0.5:1:3",
99 run_=False)
100 )
101
102 modules.append(
103 Module("r.reclass.area",
104 overwrite=True,
105 input="ndvi_class" + idx,
106 output="ndvi_class_area" + idx,
107 value=min_area_ha,
108 mode="greater",
109 method="reclass",
110 run_=False)
111 )
112
113 modules.append(
114 Module("r.grow.distance",
115 overwrite=True,
116 input="ndvi_class_area" + idx,
117 value=output,
118 metric="euclidean",
119 run_=False)
120 )
121
122 modules.append(
123 Module("r.colors",
124 map=output,
125 rules="-",
126 stdin_="1 grey\n2 255 255 0\n3 green",
127 offset=0,
128 scale=1,
129 run_=False)
130 )
131
132 queue.put(MultiModule(modules, sync=False, set_temp_region=True))
133
134def stats(output, date, fd, idx):
135 fd.write('-' * 80)
136 fd.write('\n')
137 fd.write('NDVI class statistics ({0}: {1})'.format(output, date))
138 fd.write('\n')
139 fd.write('-' * 80)
140 fd.write('\n')
141
142 ret = Module('r.stats', input=output, flags='ian', stdout_=PIPE)
143 for line in ret.outputs.stdout.splitlines():
144 # parse line (eg. 1 2737300.000000)
145 data = line.split(' ')
146 cat = data[0]
147 area = float(data[-1])
148 fd.write('NDVI class {0}: {1:.1f} ha\n'.format(cat, area/1e4))
149
150 fd.write('-' * 80)
151 fd.write('\n')
152 # we need integer map
153 Module('r.mapcalc', expression='ndvi_class_filled_i = int({})'.format(output), overwrite=True)
154 Module('r.to.vect', flags='v', input='ndvi_class_filled_i', output='ndvi_class_filled', type='area', overwrite=True)
155
156 Module('v.rast.stats', flags='c', map='ndvi_class_filled', raster='ndvi'+idx,
157 column_prefix='ndvi', method=['minimum','maximum','average'])
158 # v.db.select: don't print column names (-c)
159 ret = Module('v.db.select', flags='c', map='ndvi_class_filled', separator='comma', stdout_=PIPE)
160 for line in ret.outputs.stdout.splitlines():
161 # parse line (eg. 1,,-0.433962264150943,0.740350877192983,0.051388909449992)
162 cat,label,min,max,mean = line.split(',')
163 fd.write('NDVI class {0}: {1:.4f} (min) {2:.4f} (max) {3:.4f} (mean)\n'.format(
164 cat, float(min), float(max), float(mean)))
165
166def main(queue):
167 import grass.temporal as tgis
168
169 tgis.init()
170
171 sp4 = tgis.open_old_stds(options['b4'], 'raster')
172 sp8 = tgis.open_old_stds(options['b8'], 'raster')
173 msk = tgis.open_old_stds(options['mask'], 'raster')
174
175 min_area = int(options['min_area']) / 1e4
176 idx = 1
177 data = []
178 fd = open(options['output'], 'w')
179 for item in sp4.get_registered_maps(columns='name,start_time'):
180 b4 = item[0]
181 date = item[1]
182 b8 = sp8.get_registered_maps(columns='name',
183 where="start_time='{}'".format(date))[0][0]
184 ms = msk.get_registered_maps(columns='name',
185 where="start_time='{}'".format(date))[0][0]
186 output = '{}_{}'.format(options['basename'], idx)
187 compute(b4, b8, ms, min_area, output, str(idx), queue)
188
189 data.append((output, date))
190 idx += 1
191
192 queue.wait()
193
194 idx = 1
195 fd = open(options['output'], 'w')
196 for output, date in data:
197 stats(output, date, fd, str(idx))
198 cleanup(str(idx))
199 idx += 1
200
201 fd.close()
202
203 return 0
204
205if __name__ == "__main__":
206 options, flags=parser()
207
208 # queue for parallel jobs
209 queue = ParallelModuleQueue(int(options['nprocs']))
210
211 sys.exit(main(queue))
Sample script to download: ndvi-tgrass-v2.py
Task
Compare ndvi-tgrass-v1.py / ndvi-tgrass-v2.py processing time