Mercurial > repos > astroteam > analyse_short_astro_text_astro_tool
view pipeline_ra_dec.py @ 0:a35056104c2c draft default tip
planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit da42ae0d18f550dec7f6d7e29d297e7cf1909df2
author | astroteam |
---|---|
date | Fri, 13 Jun 2025 13:26:36 +0000 |
parents | |
children |
line wrap: on
line source
from astropy.coordinates import SkyCoord from astropy import units as u import pandas as pd import numpy as np import re def split_text_in_phrases(atel_, text_): list_proto_phrases = re.split(r"(\. [a-z])", text_) for i in range(1, len(list_proto_phrases) - 1, 2): back_ = list_proto_phrases[i][0] front_ = list_proto_phrases[i][-1] list_proto_phrases[i+1] = front_ + list_proto_phrases[i+1] list_proto_phrases[i-1] = list_proto_phrases[i-1] + back_ list_phrases = [] for i in range(0, len(list_proto_phrases), 2): list_phrases.append(list_proto_phrases[i]) text_check = " ".join(list_phrases) if text_check != text_: print(atel_) return list_phrases def create_pattern_list(): pattern_list_ra = [] pattern_list_dec = [] pattern_list_ra_dec = [] pattern_list_table = [] ra_text = "r(\\.|)a(\\.|\\:|)" dec_text = "dec(l|)(\\.|)" units_minutes = "(\\'|m|\\:|\\' |m |\\: |)" # units_minutes_mix_all = "(\\'|m|\\:| )" units_seconds = '(\\"|s|\\:|\\" |s |\\: |)' # units_seconds_mix_all = '(\\"|s|\\:| |)' degree_ = "((deg)|d|)" assignment_char = "(( |)(=|\\:)( |))" units_dec_min = "\\'m" units_dec_sec = '\\"s' units_dec_deg = "dego" ra_value = f"([0-9\\.\\:\\s{units_dec_min}{units_dec_sec}hdeg]{{2,}})" dec_value = f"(\\+|-|)([0-9\\.\\:\\s{units_dec_min}{units_dec_sec}{units_dec_deg}]{{2,}})" ra_value_deg = "(\\d{1,3}(\\.|)\\d{0,})" dec_value_deg = "(\\+|-|)(\\d{1,2}(\\.|)\\d{0,})" ra_dec = f"({ra_text},( |){dec_text})" # GOOD pattern_list_ra += [f"\\b({ra_text}{assignment_char}{ra_value})\\b"] pattern_list_dec += [f"\\b({dec_text}{assignment_char}{dec_value})\\b"] pattern_J2000 = r"( |)(-|)((\((((j|)2000(.0|))|(deg))\))|(2000))" pattern_list_ra += [f"({ra_text}{pattern_J2000}{assignment_char}{ra_value})"] pattern_list_dec += [f"({dec_text}{pattern_J2000}{assignment_char}{dec_value})"] # TABLES pattern_list_table += ["([0-9]{1,2}(\\:)[0-9]{1,2}(\\:)[0-9]{1,2}(\\.|)[0-9]{0,})((( |)(,|\\|)( |))|( ))(\\+|-|)([0-9]{1,2}(\\:)[0-9]{1,2}(\\:)[0-9]{1,2}(\\.|)[0-9]{0,})"] pattern_list_table += ["([0-9]{1,2}( )[0-9]{1,2}( )[0-9]{1,2}(\\.|)[0-9]{0,})((( |)(,|\\|)( |))|( ))(\\+|-|)([0-9]{1,2}( )[0-9]{1,2}( )[0-9]{1,2}(\\.|)[0-9]{0,})"] pattern_list_table += [f"([0-9]{{1,2}}(h)[0-9]{{1,2}}{units_minutes}[0-9]{{1,2}}(\\.|)[0-9]{{0,}}{units_seconds})((( |)(,|\\|)( |))|( ))(\\+|-|)([0-9]{{1,2}}(d|(deg))[0-9]{{1,2}}{units_minutes}[0-9]{{1,2}}(\\.|)[0-9]{{0,}}{units_seconds})"] # PAIRS pattern_list_ra_dec += [fr"\(j2000 {ra_dec}\){assignment_char}\({ra_value_deg}( |)(,|)( |){dec_value_deg}\)( |){degree_}"] pattern_list_ra_dec += [fr"\({ra_dec}( |)(j|)2000(.0|)\){assignment_char}(\(|){ra_value}( |)(,|)( |){dec_value}(\)|)"] pattern_list_ra_dec += [fr"\({ra_dec} {ra_value}( |)(,)( |){dec_value}\)"] pattern_list_ra_dec += [fr"({ra_text}( |)\((j|)2000(.0|)\) {ra_value}), ({dec_text}( |)\((j|)2000(.0|)\) {dec_value})"] pattern_list_ra_dec += [f"\\b({ra_text} {ra_value}(, | |; |,|;){dec_text} {dec_value})\\b"] pattern_list_ra_dec += [f"\\b({ra_text}{assignment_char}{ra_value})( )({dec_text}{assignment_char}{dec_value})\\b"] pattern_list_ra_dec += [f"\\b{ra_dec}{assignment_char}{ra_value}( |)(,|)( |){dec_value}\\b"] pattern_list_ra_dec += [fr"\({ra_dec}\){assignment_char}(\(|){ra_value}( |)(,|)( |){dec_value}(\)|)"] pattern_list_ra_dec += [fr"({ra_text}(\\/|,|, ){dec_text}( |)(\((j|)2000(.0|)\)|){assignment_char}{ra_value}( |,|, ){dec_value})"] pattern_list_ra_dec += [f"({ra_text}( and ){dec_text} {ra_value}( and ){dec_value})"] return pattern_list_ra_dec, pattern_list_ra, pattern_list_dec, pattern_list_table def ra_dec_detector(text_id, text_id_text): pattern_list_ra_dec, pattern_list_ra, pattern_list_dec, pattern_list_table = create_pattern_list() text_id_text = " ".join(text_id_text.split()).replace("°", "o").replace("º", "o").replace("−", "-").replace('°', "o") list_phrases = split_text_in_phrases(text_id, text_id_text.lower()) dict_data = {"TEXT_ID": [], "Positions": [], "Start": [], "End": [], "Phrase": []} for phrase_ in list_phrases: for pattern_ in pattern_list_ra_dec + pattern_list_ra + pattern_list_dec + pattern_list_table: for m in re.finditer(pattern_, phrase_.lower()): pos_ = m.group(0) start, end = m.span() dict_data["TEXT_ID"].append(text_id) dict_data["Start"].append(start) dict_data["End"].append(end) dict_data["Positions"].append(pos_) dict_data["Phrase"].append(phrase_) df_data = pd.DataFrame(dict_data) return df_data def merge_ra_dec(text_id, df_init): df_init.drop_duplicates(inplace=True) dict_data = {"TEXT_ID": [], "Positions": [], "Start": [], "End": [], "Phrase": []} phrases_, c = np.unique(df_init["Phrase"].values, return_counts=True) for p_n, phrase_ in enumerate(phrases_): df_tmp0 = df_init[df_init["Phrase"] == phrase_] if len(df_tmp0) > 1: df_tmp = df_tmp0.sort_values("Start") start_ = df_tmp["Start"].values end_ = df_tmp["End"].values for i in range(1, len(start_)): if start_[i] <= end_[i-1]: start_[i] = start_[i-1] max_ = max(end_[i-1], end_[i]) end_[i-1], end_[i] = max_, max_ end_[i-1] = -1 start_[i-1] = -1 for s_i, e_i in zip(start_, end_): if s_i != -1: dict_data["TEXT_ID"] += [text_id] dict_data["Start"] += [s_i] dict_data["End"] += [e_i] dict_data["Positions"] += [phrase_[s_i: e_i]] dict_data["Phrase"] += [phrase_] df_data = pd.DataFrame(dict_data) df_data.drop_duplicates(inplace=True) return df_data def clean_ra(ra, ra_text, pattern_J2000): ra_new = " ".join(str(ra).split()).replace("±", "+/-").replace("—", "-").replace("−", "-").replace("−", "-") ra_new = re.sub(f"{ra_text}{pattern_J2000}", "", ra_new) ra_new = re.sub(f"{ra_text}", "", ra_new) ra_new = re.sub(r"[^0-9+-\.deg]", ":", ra_new) while len(ra_new) > 1 and (ra_new[-1] in [":", "."]): ra_new = ra_new[:-1] while len(ra_new) > 1 and (ra_new[0] in [":", "."]): ra_new = ra_new[1:] result = re.match("(\\+|)[0-9]{1,2}[:]{1,2}[0-9]{1,2}[:]{1,2}[0-9]{1,2}(:\\.|\\.|)([0-9]){0,}", ra_new) if result: if result.group(0) == ra_new: ra_new = ra_new.replace("::", ":") ra_new = ra_new.replace(":.", ".") # Remove some incorect pos result = re.match("(\\+|)[0-9]{4,}(\\.|)([0-9]){0,}", ra_new) if result: if result.group(0) == ra_new: ra_new = ":" ra_new = ra_new.replace(":deg", " deg") return ra_new def clean_dec(dec, dec_text, pattern_J2000): dec_new = " ".join(str(dec).split()).replace("±", "+/-").replace("—", "-").replace("−", "-").replace("−", "-").replace("--", "-") dec_new = re.sub(f"{dec_text}{pattern_J2000}", "", dec_new) dec_new = re.sub(f"{dec_text}", "", dec_new) dec_new = re.sub(r"[^0-9+-\.deg]", ":", dec_new) while len(dec_new) != 1 and (dec_new[-1] in [":", "."]): dec_new = dec_new[:-1] while len(dec_new) != 1 and (dec_new[0] in [":", "."]): dec_new = dec_new[1:] result = re.match("(\\+|\\-|)[0-9]{1,2}(deg|d|:)[:]{0,1}[0-9]{1,2}[:]{1,2}[0-9]{1,2}(:\\.|\\.|)([0-9]){0,}", dec_new) if result: if result.group(0) == dec_new: dec_new = dec_new.replace("deg:", ":") dec_new = dec_new.replace("deg", ":") dec_new = dec_new.replace("d:", ":") dec_new = dec_new.replace("d", ":") dec_new = dec_new.replace("::", ":") dec_new = dec_new.replace(":.", ".") dec_new = dec_new.replace(":deg", " deg") # Remove some incorect pos result = re.match("(\\+|\\-|)[0-9]{4,}(\\.|)([0-9]){0,}", dec_new) if result: if result.group(0) == dec_new: dec_new = ":" return dec_new def clean_ra_dec(ra_dec, ra_text, dec_text, pattern_J2000): ra_dec_n = re.sub(f"{pattern_J2000}", "", ra_dec) ra_dec_n = re.sub(f"{ra_text}", "", ra_dec_n) ra_dec_n = re.sub(f"{dec_text}", "", ra_dec_n) if ra_dec_n[-1] == "o": ra_dec_n = ra_dec_n[:-1] ra_dec_n = re.sub("(o)", "d", ra_dec_n) ra_dec_n = re.sub("('')", "", ra_dec_n) ra_dec_n = re.sub("(')", "m", ra_dec_n) ra_dec_n = re.sub(r"[^0-9+-\.hmd\s:]", "", ra_dec_n) ra_dec_n = re.sub("[,]", "", ra_dec_n) while len(ra_dec_n) != 1 and (ra_dec_n[-1] in [":", ".", " "]): ra_dec_n = ra_dec_n[:-1] while len(ra_dec_n) != 1 and (ra_dec_n[0] in [":", ".", " "]): ra_dec_n = ra_dec_n[1:] return ra_dec_n def astropy_test(df_init): ra_text = "(r(\\.|)a(\\.|\\:|))" dec_text = "(dec(l|)(\\.|\\:|))" # ra_dec_pattern = f"({ra_text},( |){dec_text})" pattern_J2000 = r"( |)(-|)((\((((j|)2000(.0|))|(deg))\))|(2000))" rest_ra_dec = [] counter_rest = 0 counter_ra_dec_try = 0 good_ra_dec = [] for text_ in list(set(df_init.Phrase)): df_tmp1 = df_init[df_init.Phrase == text_] ra_values = [] dec_values = [] ra_start = [] dec_start = [] ra_end = [] dec_end = [] df_tmp1.sort_values("Start") for ra_dec, s_, e_ in zip(df_tmp1.Positions, df_tmp1.Start, df_tmp1.End): try: ra_dec_n = ra_dec.replace("|", " ") ra_dec_n = ra_dec_n.replace(",", " ") cc = SkyCoord(ra_dec_n, unit=(u.hourangle, u.deg)) good_ra_dec.append(cc) except ValueError: ra_s = re.findall(ra_text, ra_dec) dec_s = re.findall(dec_text, ra_dec) if len(ra_s) > 0 and len(dec_s) > 0: counter_ra_dec_try += 1 ra_dec_n = clean_ra_dec(ra_dec, ra_text, dec_text, pattern_J2000) try: cc = SkyCoord(ra_dec_n, unit=(u.hourangle, u.deg)) good_ra_dec.append(cc) except ValueError: rest_ra_dec.append(ra_dec) elif len(ra_s) > 0: ra_values.append(ra_dec) ra_start.append(s_) ra_end.append(e_) elif len(dec_s) > 0: dec_values.append(ra_dec) dec_start.append(s_) dec_end.append(e_) else: counter_rest += 1 if len(ra_values) < len(dec_values): for ra_, s_ra, e_ra in zip(ra_values, ra_start, ra_end): min_diff = 1000 dec_pair = "" for dec_, s_dec, e_dec in zip(dec_values, dec_start, dec_end): diff_ = s_dec - e_ra if diff_ < min_diff: min_diff = diff_ dec_pair = dec_ c_ra = clean_ra(ra_, ra_text, pattern_J2000) c_dec = clean_dec(dec_pair, dec_text, pattern_J2000) try: cc = SkyCoord(ra=c_ra, dec=c_dec, unit=(u.hourangle, u.deg)) good_ra_dec.append(cc) except ValueError: try: cc = SkyCoord(ra=c_ra, dec=c_dec, unit=(u.deg, u.deg)) good_ra_dec.append(cc) except ValueError: rest_ra_dec.append(f"{ra_}|{dec_pair}") else: for dec_, s_dec, e_dec in zip(dec_values, dec_start, dec_end): min_diff = 1000 ra_pair = "" for ra_, s_ra, e_ra in zip(ra_values, ra_start, ra_end): diff_ = s_dec - e_ra if diff_ < min_diff: min_diff = diff_ ra_pair = ra_ c_ra = clean_ra(ra_pair, ra_text, pattern_J2000) c_dec = clean_dec(dec_, dec_text, pattern_J2000) try: cc = SkyCoord(ra=c_ra, dec=c_dec, unit=(u.hourangle, u.deg)) good_ra_dec.append(cc) except ValueError: try: cc = SkyCoord(ra=c_ra, dec=c_dec, unit=(u.deg, u.deg)) good_ra_dec.append(cc) except ValueError: rest_ra_dec.append(f"{ra_pair}|{dec_}") return good_ra_dec def rule_based_ra_dec_detector(text_id, text_id_text): df_init = ra_dec_detector(text_id, text_id_text) df_final = merge_ra_dec(text_id, df_init) good_ra_dec = astropy_test(df_final) print(good_ra_dec) dict_out = {"TEXT_ID": [], "RA": [], "Dec": [], "Main ID Name": []} for ra_dec in good_ra_dec: dict_out["TEXT_ID"].append(text_id) dict_out["Main ID Name"].append("NoName") dict_out["RA"].append(ra_dec.ra.deg) dict_out["Dec"].append(ra_dec.dec.deg) return pd.DataFrame(dict_out)