Mercurial > repos > astroteam > analyse_short_astro_text_astro_tool
diff pipeline_ra_dec.py @ 0:a35056104c2c draft default tip
planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit da42ae0d18f550dec7f6d7e29d297e7cf1909df2
author | astroteam |
---|---|
date | Fri, 13 Jun 2025 13:26:36 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pipeline_ra_dec.py Fri Jun 13 13:26:36 2025 +0000 @@ -0,0 +1,343 @@ +from astropy.coordinates import SkyCoord +from astropy import units as u +import pandas as pd +import numpy as np +import re + + +def split_text_in_phrases(atel_, text_): + list_proto_phrases = re.split(r"(\. [a-z])", text_) + for i in range(1, len(list_proto_phrases) - 1, 2): + back_ = list_proto_phrases[i][0] + front_ = list_proto_phrases[i][-1] + list_proto_phrases[i+1] = front_ + list_proto_phrases[i+1] + list_proto_phrases[i-1] = list_proto_phrases[i-1] + back_ + + list_phrases = [] + for i in range(0, len(list_proto_phrases), 2): + list_phrases.append(list_proto_phrases[i]) + + text_check = " ".join(list_phrases) + if text_check != text_: + print(atel_) + + return list_phrases + + +def create_pattern_list(): + pattern_list_ra = [] + pattern_list_dec = [] + pattern_list_ra_dec = [] + pattern_list_table = [] + ra_text = "r(\\.|)a(\\.|\\:|)" + dec_text = "dec(l|)(\\.|)" + + units_minutes = "(\\'|m|\\:|\\' |m |\\: |)" + # units_minutes_mix_all = "(\\'|m|\\:| )" + units_seconds = '(\\"|s|\\:|\\" |s |\\: |)' + # units_seconds_mix_all = '(\\"|s|\\:| |)' + + degree_ = "((deg)|d|)" + assignment_char = "(( |)(=|\\:)( |))" + + units_dec_min = "\\'m" + units_dec_sec = '\\"s' + units_dec_deg = "dego" + ra_value = f"([0-9\\.\\:\\s{units_dec_min}{units_dec_sec}hdeg]{{2,}})" + dec_value = f"(\\+|-|)([0-9\\.\\:\\s{units_dec_min}{units_dec_sec}{units_dec_deg}]{{2,}})" + ra_value_deg = "(\\d{1,3}(\\.|)\\d{0,})" + dec_value_deg = "(\\+|-|)(\\d{1,2}(\\.|)\\d{0,})" + + ra_dec = f"({ra_text},( |){dec_text})" + + # GOOD + pattern_list_ra += [f"\\b({ra_text}{assignment_char}{ra_value})\\b"] + pattern_list_dec += [f"\\b({dec_text}{assignment_char}{dec_value})\\b"] + + pattern_J2000 = r"( |)(-|)((\((((j|)2000(.0|))|(deg))\))|(2000))" + pattern_list_ra += [f"({ra_text}{pattern_J2000}{assignment_char}{ra_value})"] + pattern_list_dec += [f"({dec_text}{pattern_J2000}{assignment_char}{dec_value})"] + + # TABLES + pattern_list_table += ["([0-9]{1,2}(\\:)[0-9]{1,2}(\\:)[0-9]{1,2}(\\.|)[0-9]{0,})((( |)(,|\\|)( |))|( ))(\\+|-|)([0-9]{1,2}(\\:)[0-9]{1,2}(\\:)[0-9]{1,2}(\\.|)[0-9]{0,})"] + pattern_list_table += ["([0-9]{1,2}( )[0-9]{1,2}( )[0-9]{1,2}(\\.|)[0-9]{0,})((( |)(,|\\|)( |))|( ))(\\+|-|)([0-9]{1,2}( )[0-9]{1,2}( )[0-9]{1,2}(\\.|)[0-9]{0,})"] + pattern_list_table += [f"([0-9]{{1,2}}(h)[0-9]{{1,2}}{units_minutes}[0-9]{{1,2}}(\\.|)[0-9]{{0,}}{units_seconds})((( |)(,|\\|)( |))|( ))(\\+|-|)([0-9]{{1,2}}(d|(deg))[0-9]{{1,2}}{units_minutes}[0-9]{{1,2}}(\\.|)[0-9]{{0,}}{units_seconds})"] + + # PAIRS + pattern_list_ra_dec += [fr"\(j2000 {ra_dec}\){assignment_char}\({ra_value_deg}( |)(,|)( |){dec_value_deg}\)( |){degree_}"] + pattern_list_ra_dec += [fr"\({ra_dec}( |)(j|)2000(.0|)\){assignment_char}(\(|){ra_value}( |)(,|)( |){dec_value}(\)|)"] + pattern_list_ra_dec += [fr"\({ra_dec} {ra_value}( |)(,)( |){dec_value}\)"] + pattern_list_ra_dec += [fr"({ra_text}( |)\((j|)2000(.0|)\) {ra_value}), ({dec_text}( |)\((j|)2000(.0|)\) {dec_value})"] + + pattern_list_ra_dec += [f"\\b({ra_text} {ra_value}(, | |; |,|;){dec_text} {dec_value})\\b"] + pattern_list_ra_dec += [f"\\b({ra_text}{assignment_char}{ra_value})( )({dec_text}{assignment_char}{dec_value})\\b"] + + pattern_list_ra_dec += [f"\\b{ra_dec}{assignment_char}{ra_value}( |)(,|)( |){dec_value}\\b"] + pattern_list_ra_dec += [fr"\({ra_dec}\){assignment_char}(\(|){ra_value}( |)(,|)( |){dec_value}(\)|)"] + + pattern_list_ra_dec += [fr"({ra_text}(\\/|,|, ){dec_text}( |)(\((j|)2000(.0|)\)|){assignment_char}{ra_value}( |,|, ){dec_value})"] + + pattern_list_ra_dec += [f"({ra_text}( and ){dec_text} {ra_value}( and ){dec_value})"] + + return pattern_list_ra_dec, pattern_list_ra, pattern_list_dec, pattern_list_table + + +def ra_dec_detector(text_id, text_id_text): + pattern_list_ra_dec, pattern_list_ra, pattern_list_dec, pattern_list_table = create_pattern_list() + + text_id_text = " ".join(text_id_text.split()).replace("°", "o").replace("º", "o").replace("−", "-").replace('°', "o") + list_phrases = split_text_in_phrases(text_id, text_id_text.lower()) + + dict_data = {"TEXT_ID": [], "Positions": [], "Start": [], "End": [], "Phrase": []} + + for phrase_ in list_phrases: + for pattern_ in pattern_list_ra_dec + pattern_list_ra + pattern_list_dec + pattern_list_table: + for m in re.finditer(pattern_, phrase_.lower()): + pos_ = m.group(0) + start, end = m.span() + + dict_data["TEXT_ID"].append(text_id) + dict_data["Start"].append(start) + dict_data["End"].append(end) + dict_data["Positions"].append(pos_) + dict_data["Phrase"].append(phrase_) + + df_data = pd.DataFrame(dict_data) + return df_data + + +def merge_ra_dec(text_id, df_init): + df_init.drop_duplicates(inplace=True) + dict_data = {"TEXT_ID": [], "Positions": [], "Start": [], "End": [], "Phrase": []} + phrases_, c = np.unique(df_init["Phrase"].values, return_counts=True) + + for p_n, phrase_ in enumerate(phrases_): + df_tmp0 = df_init[df_init["Phrase"] == phrase_] + if len(df_tmp0) > 1: + df_tmp = df_tmp0.sort_values("Start") + start_ = df_tmp["Start"].values + end_ = df_tmp["End"].values + for i in range(1, len(start_)): + if start_[i] <= end_[i-1]: + start_[i] = start_[i-1] + max_ = max(end_[i-1], end_[i]) + end_[i-1], end_[i] = max_, max_ + end_[i-1] = -1 + start_[i-1] = -1 + + for s_i, e_i in zip(start_, end_): + if s_i != -1: + dict_data["TEXT_ID"] += [text_id] + dict_data["Start"] += [s_i] + dict_data["End"] += [e_i] + dict_data["Positions"] += [phrase_[s_i: e_i]] + dict_data["Phrase"] += [phrase_] + + df_data = pd.DataFrame(dict_data) + df_data.drop_duplicates(inplace=True) + return df_data + + +def clean_ra(ra, ra_text, pattern_J2000): + ra_new = " ".join(str(ra).split()).replace("±", "+/-").replace("—", "-").replace("−", "-").replace("−", "-") + + ra_new = re.sub(f"{ra_text}{pattern_J2000}", "", ra_new) + ra_new = re.sub(f"{ra_text}", "", ra_new) + + ra_new = re.sub(r"[^0-9+-\.deg]", ":", ra_new) + + while len(ra_new) > 1 and (ra_new[-1] in [":", "."]): + ra_new = ra_new[:-1] + + while len(ra_new) > 1 and (ra_new[0] in [":", "."]): + ra_new = ra_new[1:] + + result = re.match("(\\+|)[0-9]{1,2}[:]{1,2}[0-9]{1,2}[:]{1,2}[0-9]{1,2}(:\\.|\\.|)([0-9]){0,}", ra_new) + if result: + if result.group(0) == ra_new: + ra_new = ra_new.replace("::", ":") + ra_new = ra_new.replace(":.", ".") + + # Remove some incorect pos + result = re.match("(\\+|)[0-9]{4,}(\\.|)([0-9]){0,}", ra_new) + if result: + if result.group(0) == ra_new: + ra_new = ":" + + ra_new = ra_new.replace(":deg", " deg") + + return ra_new + + +def clean_dec(dec, dec_text, pattern_J2000): + dec_new = " ".join(str(dec).split()).replace("±", "+/-").replace("—", "-").replace("−", "-").replace("−", "-").replace("--", "-") + + dec_new = re.sub(f"{dec_text}{pattern_J2000}", "", dec_new) + dec_new = re.sub(f"{dec_text}", "", dec_new) + + dec_new = re.sub(r"[^0-9+-\.deg]", ":", dec_new) + + while len(dec_new) != 1 and (dec_new[-1] in [":", "."]): + dec_new = dec_new[:-1] + + while len(dec_new) != 1 and (dec_new[0] in [":", "."]): + dec_new = dec_new[1:] + + result = re.match("(\\+|\\-|)[0-9]{1,2}(deg|d|:)[:]{0,1}[0-9]{1,2}[:]{1,2}[0-9]{1,2}(:\\.|\\.|)([0-9]){0,}", dec_new) + if result: + if result.group(0) == dec_new: + dec_new = dec_new.replace("deg:", ":") + dec_new = dec_new.replace("deg", ":") + dec_new = dec_new.replace("d:", ":") + dec_new = dec_new.replace("d", ":") + dec_new = dec_new.replace("::", ":") + dec_new = dec_new.replace(":.", ".") + + dec_new = dec_new.replace(":deg", " deg") + + # Remove some incorect pos + result = re.match("(\\+|\\-|)[0-9]{4,}(\\.|)([0-9]){0,}", dec_new) + if result: + if result.group(0) == dec_new: + dec_new = ":" + + return dec_new + + +def clean_ra_dec(ra_dec, ra_text, dec_text, pattern_J2000): + ra_dec_n = re.sub(f"{pattern_J2000}", "", ra_dec) + ra_dec_n = re.sub(f"{ra_text}", "", ra_dec_n) + ra_dec_n = re.sub(f"{dec_text}", "", ra_dec_n) + if ra_dec_n[-1] == "o": + ra_dec_n = ra_dec_n[:-1] + ra_dec_n = re.sub("(o)", "d", ra_dec_n) + ra_dec_n = re.sub("('')", "", ra_dec_n) + ra_dec_n = re.sub("(')", "m", ra_dec_n) + ra_dec_n = re.sub(r"[^0-9+-\.hmd\s:]", "", ra_dec_n) + + ra_dec_n = re.sub("[,]", "", ra_dec_n) + while len(ra_dec_n) != 1 and (ra_dec_n[-1] in [":", ".", " "]): + ra_dec_n = ra_dec_n[:-1] + + while len(ra_dec_n) != 1 and (ra_dec_n[0] in [":", ".", " "]): + ra_dec_n = ra_dec_n[1:] + + return ra_dec_n + + +def astropy_test(df_init): + ra_text = "(r(\\.|)a(\\.|\\:|))" + dec_text = "(dec(l|)(\\.|\\:|))" + + # ra_dec_pattern = f"({ra_text},( |){dec_text})" + pattern_J2000 = r"( |)(-|)((\((((j|)2000(.0|))|(deg))\))|(2000))" + + rest_ra_dec = [] + counter_rest = 0 + counter_ra_dec_try = 0 + good_ra_dec = [] + + for text_ in list(set(df_init.Phrase)): + df_tmp1 = df_init[df_init.Phrase == text_] + ra_values = [] + dec_values = [] + + ra_start = [] + dec_start = [] + ra_end = [] + dec_end = [] + + df_tmp1.sort_values("Start") + + for ra_dec, s_, e_ in zip(df_tmp1.Positions, df_tmp1.Start, df_tmp1.End): + try: + ra_dec_n = ra_dec.replace("|", " ") + ra_dec_n = ra_dec_n.replace(",", " ") + cc = SkyCoord(ra_dec_n, unit=(u.hourangle, u.deg)) + good_ra_dec.append(cc) + except ValueError: + ra_s = re.findall(ra_text, ra_dec) + dec_s = re.findall(dec_text, ra_dec) + if len(ra_s) > 0 and len(dec_s) > 0: + counter_ra_dec_try += 1 + ra_dec_n = clean_ra_dec(ra_dec, ra_text, dec_text, pattern_J2000) + try: + cc = SkyCoord(ra_dec_n, unit=(u.hourangle, u.deg)) + good_ra_dec.append(cc) + except ValueError: + rest_ra_dec.append(ra_dec) + + elif len(ra_s) > 0: + ra_values.append(ra_dec) + ra_start.append(s_) + ra_end.append(e_) + + elif len(dec_s) > 0: + dec_values.append(ra_dec) + dec_start.append(s_) + dec_end.append(e_) + + else: + counter_rest += 1 + + if len(ra_values) < len(dec_values): + + for ra_, s_ra, e_ra in zip(ra_values, ra_start, ra_end): + min_diff = 1000 + dec_pair = "" + for dec_, s_dec, e_dec in zip(dec_values, dec_start, dec_end): + diff_ = s_dec - e_ra + if diff_ < min_diff: + min_diff = diff_ + dec_pair = dec_ + + c_ra = clean_ra(ra_, ra_text, pattern_J2000) + c_dec = clean_dec(dec_pair, dec_text, pattern_J2000) + try: + cc = SkyCoord(ra=c_ra, dec=c_dec, unit=(u.hourangle, u.deg)) + good_ra_dec.append(cc) + except ValueError: + try: + cc = SkyCoord(ra=c_ra, dec=c_dec, unit=(u.deg, u.deg)) + good_ra_dec.append(cc) + except ValueError: + rest_ra_dec.append(f"{ra_}|{dec_pair}") + else: + + for dec_, s_dec, e_dec in zip(dec_values, dec_start, dec_end): + min_diff = 1000 + ra_pair = "" + for ra_, s_ra, e_ra in zip(ra_values, ra_start, ra_end): + diff_ = s_dec - e_ra + if diff_ < min_diff: + min_diff = diff_ + ra_pair = ra_ + + c_ra = clean_ra(ra_pair, ra_text, pattern_J2000) + c_dec = clean_dec(dec_, dec_text, pattern_J2000) + try: + cc = SkyCoord(ra=c_ra, dec=c_dec, unit=(u.hourangle, u.deg)) + good_ra_dec.append(cc) + except ValueError: + try: + cc = SkyCoord(ra=c_ra, dec=c_dec, unit=(u.deg, u.deg)) + good_ra_dec.append(cc) + except ValueError: + rest_ra_dec.append(f"{ra_pair}|{dec_}") + + return good_ra_dec + + +def rule_based_ra_dec_detector(text_id, text_id_text): + df_init = ra_dec_detector(text_id, text_id_text) + df_final = merge_ra_dec(text_id, df_init) + good_ra_dec = astropy_test(df_final) + print(good_ra_dec) + dict_out = {"TEXT_ID": [], "RA": [], "Dec": [], "Main ID Name": []} + for ra_dec in good_ra_dec: + dict_out["TEXT_ID"].append(text_id) + dict_out["Main ID Name"].append("NoName") + dict_out["RA"].append(ra_dec.ra.deg) + dict_out["Dec"].append(ra_dec.dec.deg) + + return pd.DataFrame(dict_out)