Mercurial > repos > ecology > landcover_subindicator
view land_cover.py @ 0:828c02027180 draft default tip
planemo upload for repository https://github.com/galaxyecology/tools-ecology/tree/master/tools/earth commit 4cbace8c3a71b7a1638cfd44a21f5d4b84d2fd15
author | ecology |
---|---|
date | Mon, 21 Oct 2024 16:47:10 +0000 |
parents | |
children |
line wrap: on
line source
# Land Cover TE Algorithm Without GEE # How to Execute: # - Load the two images (.TIFF) of the rasters # to be processed in the "/data/land_cover/input" folder # - Setting the two filenames in the specific cell of this script (0) # - Run all cells of the script # - Check the script results in the "/data/land_cover/output" folder # - Land Cover Degradation "/data/land_cover/output/lc_dg.tiff" # - Land Cover Transition "/data/land_cover/output/lc_tr.tiff" # Librairies import import argparse import json import os import sys import matplotlib.pyplot as plt import numpy as np import rasterio from rasterio.plot import show from te_schemas.land_cover import LCLegendNesting from te_schemas.land_cover import LCTransitionDefinitionDeg # Methods to manage Rasters def remap(raster, problem_numbers, alternative_numbers): n_min, n_max = raster.min(), raster.max() replacer = np.arange(n_min, n_max + 1) mask = (problem_numbers >= n_min) & (problem_numbers <= n_max) replacer[problem_numbers[mask] - n_min] = alternative_numbers[mask] raster_replaced = replacer[raster - n_min] return raster_replaced def saveRaster(dataset, datasetPath, cols, rows, projection, namedataset=None): if namedataset: rasterSet = rasterio.open(datasetPath, 'w', driver='GTiff', height=rows, width=cols, count=1, dtype=np.int8, crs=projection, transform=transform, ) rasterSet.write(dataset, 1) rasterSet.close() else: rasterSet = rasterio.open(datasetPath, 'w', driver='GTiff', height=rows, width=cols, count=1, dtype=np.int8, crs=projection, transform=transform, ) rasterSet.write(dataset, 1) rasterSet.set_band_description(1, namedataset) rasterSet.close() def plot(ndviImage, cmap): src = rasterio.open(ndviImage, 'r') fig, ax = plt.subplots(1, figsize=(12, 12)) show(src, cmap=cmap, ax=ax) ax.set_xlabel('Est') ax.set_ylabel('Nord') plt.show() def plotContour(ndviImage, cmap): src = rasterio.open(ndviImage, 'r') fig, ax = plt.subplots(1, figsize=(12, 12)) show(src, cmap=cmap, contour=True, ax=ax) ax.set_xlabel('Est') ax.set_ylabel('Nord') plt.show() # Setting inputs command_line_args = sys.argv[1:] parser = argparse.ArgumentParser(description="landcover inputs and outputs") # Add arguments parser.add_argument("--raster_1", help="raster 1") parser.add_argument("--raster_2", help="raster 2") parser.add_argument("--json", help="json") args = parser.parse_args(command_line_args) # Parse the command line arguments # Import data path_raster_yi = args.raster_1 path_raster_yf = args.raster_2 fjson = args.json # Input Rasters # Outputs out_dir = os.path.join(os.getcwd(), "data/land_cover/output/") if not os.path.exists(out_dir): os.makedirs(out_dir) # Output Rasters path_lc_tr = os.path.join(out_dir, 'lc_tr.tiff') path_lc_bl = os.path.join(out_dir, 'lc_bl.tiff') path_lc_dg = os.path.join(out_dir, 'lc_dg.tiff') path_lc_tg = os.path.join(out_dir, 'lc_tg.tiff') path_change_yf_yi = os.path.join(out_dir, 'change_yf_yi.tiff') path_lc_baseline_esa = os.path.join(out_dir, 'lc_baseline_esa.tiff') path_lc_target_esa = os.path.join(out_dir, 'lc_target_esa.tiff') path_out_img = os.path.join(out_dir, 'stack.tiff') # Contours contours_change_yf_yi = os.path.join(out_dir, '/change_yf_yi0.shp') # Parsing Inputs # Load static inputs # Transition Matrix, ESA Legend, IPCC Legend # Input Raster and Vector Paths params = json.load(open(fjson)) crs = params.get("crs") trans_matrix_val = params.get("trans_matrix") trans_matrix = LCTransitionDefinitionDeg.Schema().load(trans_matrix_val) esa_to_custom_nesting = LCLegendNesting.Schema().load( params.get("legend_nesting_esa_to_custom") ) ipcc_nesting = LCLegendNesting.Schema().load( params.get("legend_nesting_custom_to_ipcc") ) class_codes = sorted([c.code for c in esa_to_custom_nesting.parent.key]) class_positions = [*range(1, len(class_codes) + 1)] # Load dynamic inputs # Baseline ESA Raster raster_yi = rasterio.open(path_raster_yi) lc_baseline_esa = raster_yi.read(1) yi_dict_profile = dict(raster_yi.profile) for k in yi_dict_profile: print(k.upper(), yi_dict_profile[k]) # Target ESA Raster raster_yf = rasterio.open(path_raster_yf) lc_target_esa = raster_yf.read(1) yf_dict_profile = dict(raster_yf.profile) for k in yf_dict_profile: print(k.upper(), yf_dict_profile[k]) cols = raster_yi.width rows = raster_yf.height transform = raster_yi.transform projection = raster_yi.crs # Setting up output saveRaster(lc_baseline_esa.astype('int8'), path_lc_baseline_esa, cols, rows, projection, "Land_cover_yi") saveRaster(lc_target_esa.astype('int8'), path_lc_target_esa, cols, rows, projection, "Land_cover_yf") # Algorithm execution # Transition codes are based on the class code indices # (i.e. their order when sorted by class code) - # not the class codes themselves. # So need to reclass the land cover used for the transition calculations # from the raw class codes to the positional indices of those class codes. # And before doing that, need to reclassified initial and # final layers to the IPCC (or custom) classes. # Processing baseline raster bl_remap_1 = remap(lc_baseline_esa, np.asarray(esa_to_custom_nesting.get_list()[0]), np.asarray(esa_to_custom_nesting.get_list()[1])) lc_bl = remap(bl_remap_1, np.asarray(class_codes), np.asarray(class_positions)) saveRaster(lc_bl.astype('int8'), path_lc_bl, cols, rows, projection, "Land_cover_yi") # Processing Target Raster tg_remap_1 = remap(lc_target_esa, np.asarray(esa_to_custom_nesting.get_list()[0]), np.asarray(esa_to_custom_nesting.get_list()[1])) lc_tg = remap(tg_remap_1, np.asarray(class_codes), np.asarray(class_positions)) saveRaster(lc_tg.astype('int8'), path_lc_tg, cols, rows, projection, "Land_cover_yf") # Processing Transition Map # Compute transition map (first digit for baseline land cover, # and second digit for target year land cover) lc_tr = (lc_bl * esa_to_custom_nesting.parent.get_multiplier()) + lc_tg # Compute land cover transition # Remap persistence classes so they are sequential. # This makes it easier to assign a clear color ramp in QGIS. lc_tr_pre_remap = remap(lc_tr, np.asarray(trans_matrix.get_persistence_list()[0]), np.asarray(trans_matrix.get_persistence_list()[1])) saveRaster(lc_tr_pre_remap.astype('int8'), path_lc_tr, cols, rows, projection, "Land_cover_transitions_yi-yf") # Compute land cover degradation # definition of land cover transitions as degradation (-1), # improvement (1), or no relevant change (0) lc_dg = remap(lc_tr, np.asarray(trans_matrix.get_list()[0]), np.asarray(trans_matrix.get_list()[1])) saveRaster(lc_dg.astype('int8'), path_lc_dg, cols, rows, projection, "Land_cover_degradation") # Compute Mutlibands stack # Land Cover Degradation + Baseline ESA # + Target ESA + Land Cover Transition dg_raster = rasterio.open(path_lc_dg, "r") dg = dg_raster.read(1, masked=True) baseline_esa = rasterio.open(path_lc_baseline_esa, "r").read(1, masked=True) target_esa = rasterio.open(path_lc_target_esa, "r").read(1, masked=True) tr = rasterio.open(path_lc_tr, "r").read(1, masked=True) band_list = [dg, lc_baseline_esa, lc_target_esa, tr] out_meta = dg_raster.meta.copy() out_meta.update({"count": 4}) with rasterio.open(path_out_img, 'w', **out_meta) as dest: for band_nr, src in enumerate(band_list, start=1): dest.write(src, band_nr)