Mercurial > repos > ufz > omero_metadata_import
diff omero_roi_upload.py @ 1:588d6fa22fc4 draft
planemo upload for repository https://github.com/Helmholtz-UFZ/galaxy-tools/tree/main/tools/omero commit 5b1b30d409355cee98485815c1dd4ac48649bcc1
author | ufz |
---|---|
date | Thu, 05 Sep 2024 11:55:55 +0000 |
parents | |
children | e41f70e69349 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/omero_roi_upload.py Thu Sep 05 11:55:55 2024 +0000 @@ -0,0 +1,165 @@ +import argparse +import re + +import pandas as pd +from ezomero import connect, post_roi +from ezomero.rois import Ellipse, Label, Line, Point, Polygon, Polyline, Rectangle + + +def parse_color(color_str): + if not color_str: + return None + return tuple(map(int, re.findall(r'\d+', color_str))) + + +def parse_points(points_str): + if not points_str: + return None + # Remove leading and trailing brackets and split into individual points + points_str = points_str.strip("[]") + points = points_str.split("),(") + points = [point.strip("()") for point in points] # Remove any remaining parentheses + return [tuple(map(float, point.split(','))) for point in points] + + +def create_shape(row): + shape_type = row['shape'] + shape = None + + if shape_type == 'Ellipse': + shape = Ellipse( + x=row['x'], + y=row['y'], + x_rad=row['x_rad'], + y_rad=row['y_rad'], + z=row.get('z'), + c=row.get('c'), + t=row.get('t'), + fill_color=parse_color(row.get('fill_color')), + stroke_color=parse_color(row.get('stroke_color')), + stroke_width=row.get('stroke_width') + ) + elif shape_type == 'Label': + shape = Label( + x=row['x'], + y=row['y'], + label=row['label'], + fontSize=row['fontSize'], + z=row.get('z'), + c=row.get('c'), + t=row.get('t'), + fill_color=parse_color(row.get('fill_color')), + stroke_color=parse_color(row.get('stroke_color')), + stroke_width=row.get('stroke_width') + ) + elif shape_type == 'Line': + shape = Line( + x1=row['x1'], + y1=row['y1'], + x2=row['x2'], + y2=row['y2'], + markerStart=row.get('markerStart', None), + markerEnd=row.get('markerEnd', None), + label=row.get('label', None), + z=row.get('z'), + c=row.get('c'), + t=row.get('t'), + fill_color=parse_color(row.get('fill_color')), + stroke_color=parse_color(row.get('stroke_color')), + stroke_width=row.get('stroke_width') + ) + elif shape_type == 'Point': + shape = Point( + x=row['x'], + y=row['y'], + z=row.get('z'), + c=row.get('c'), + t=row.get('t'), + fill_color=parse_color(row.get('fill_color')), + stroke_color=parse_color(row.get('stroke_color')), + stroke_width=row.get('stroke_width') + ) + elif shape_type == 'Polygon': + shape = Polygon( + points=parse_points(row['points']), + z=row.get('z'), + c=row.get('c'), + t=row.get('t'), + fill_color=parse_color(row.get('fill_color')), + stroke_color=parse_color(row.get('stroke_color')), + stroke_width=row.get('stroke_width') + ) + elif shape_type == 'Polyline': + shape = Polyline( + points=parse_points(row['points']), + z=row.get('z'), + c=row.get('c'), + t=row.get('t'), + fill_color=parse_color(row.get('fill_color')), + stroke_color=parse_color(row.get('stroke_color')), + stroke_width=row.get('stroke_width') + ) + elif shape_type == 'Rectangle': + shape = Rectangle( + x=row['x'], + y=row['y'], + width=row['width'], + height=row['height'], + z=row.get('z'), + c=row.get('c'), + t=row.get('t'), + fill_color=parse_color(row.get('fill_color')), + stroke_color=parse_color(row.get('stroke_color')), + stroke_width=row.get('stroke_width') + ) + return shape + + +def main(input_file, conn, image_id, log_file): + # Open log file + with open(log_file, 'w') as log: + df = pd.read_csv(input_file, sep='\t') + for index, row in df.iterrows(): + msg = f"Processing row {index + 1}/{len(df)}: {row.to_dict()}" + print(msg) + log.write(msg + "\n") + shape = create_shape(row) + if shape: + roi_name = row['roi_name'] if 'roi_name' in row else None + roi_description = row['roi_description'] if 'roi_description' in row else None + roi_id = post_roi(conn, image_id, [shape], name=roi_name, description=roi_description) + msg = f"ROI ID: {roi_id} for row {index + 1}" + print(msg) + log.write(msg + "\n") + else: + msg = f"Skipping row {index + 1}: Unable to create shape" + print(msg) + log.write(msg + "\n") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Create shapes from a tabular file and optionally post them as an ROI to OMERO.") + parser.add_argument("--input_file", help="Path to the input tabular file.") + parser.add_argument("--image_id", type=int, required=True, help="ID of the image to which the ROI will be linked") + parser.add_argument("--host", type=str, required=True, help="OMERO server host") + parser.add_argument("--user", type=str, required=True, help="OMERO username") + parser.add_argument("--psw", type=str, required=True, help="OMERO password") + parser.add_argument("--port", type=int, default=4064, help="OMERO server port") + parser.add_argument("--log_file", type=str, default="process.txt", help="Log file path") + + args = parser.parse_args() + + conn = connect( + host=args.host, + user=args.user, + password=args.psw, + port=args.port, + group="", + secure=True + ) + + try: + main(args.input_file, conn, args.image_id, args.log_file) + finally: + conn.close()