diff pipeline_ra_dec.py @ 0:a35056104c2c draft default tip

planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit da42ae0d18f550dec7f6d7e29d297e7cf1909df2
author astroteam
date Fri, 13 Jun 2025 13:26:36 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pipeline_ra_dec.py	Fri Jun 13 13:26:36 2025 +0000
@@ -0,0 +1,343 @@
+from astropy.coordinates import SkyCoord
+from astropy import units as u
+import pandas as pd
+import numpy as np
+import re
+
+
+def split_text_in_phrases(atel_, text_):
+    list_proto_phrases = re.split(r"(\. [a-z])", text_)
+    for i in range(1, len(list_proto_phrases) - 1, 2):
+        back_ = list_proto_phrases[i][0]
+        front_ = list_proto_phrases[i][-1]
+        list_proto_phrases[i+1] = front_ + list_proto_phrases[i+1]
+        list_proto_phrases[i-1] = list_proto_phrases[i-1] + back_
+
+    list_phrases = []
+    for i in range(0, len(list_proto_phrases), 2):
+        list_phrases.append(list_proto_phrases[i])
+
+    text_check = " ".join(list_phrases)
+    if text_check != text_:
+        print(atel_)
+
+    return list_phrases
+
+
+def create_pattern_list():
+    pattern_list_ra = []
+    pattern_list_dec = []
+    pattern_list_ra_dec = []
+    pattern_list_table = []
+    ra_text = "r(\\.|)a(\\.|\\:|)"
+    dec_text = "dec(l|)(\\.|)"
+
+    units_minutes = "(\\'|m|\\:|\\' |m |\\: |)"
+    # units_minutes_mix_all = "(\\'|m|\\:| )"
+    units_seconds = '(\\"|s|\\:|\\" |s |\\: |)'
+    # units_seconds_mix_all = '(\\"|s|\\:| |)'
+
+    degree_ = "((deg)|d|)"
+    assignment_char = "(( |)(=|\\:)( |))"
+
+    units_dec_min = "\\'m"
+    units_dec_sec = '\\"s'
+    units_dec_deg = "dego"
+    ra_value = f"([0-9\\.\\:\\s{units_dec_min}{units_dec_sec}hdeg]{{2,}})"
+    dec_value = f"(\\+|-|)([0-9\\.\\:\\s{units_dec_min}{units_dec_sec}{units_dec_deg}]{{2,}})"
+    ra_value_deg = "(\\d{1,3}(\\.|)\\d{0,})"
+    dec_value_deg = "(\\+|-|)(\\d{1,2}(\\.|)\\d{0,})"
+
+    ra_dec = f"({ra_text},( |){dec_text})"
+
+    # GOOD
+    pattern_list_ra += [f"\\b({ra_text}{assignment_char}{ra_value})\\b"]
+    pattern_list_dec += [f"\\b({dec_text}{assignment_char}{dec_value})\\b"]
+
+    pattern_J2000 = r"( |)(-|)((\((((j|)2000(.0|))|(deg))\))|(2000))"
+    pattern_list_ra += [f"({ra_text}{pattern_J2000}{assignment_char}{ra_value})"]
+    pattern_list_dec += [f"({dec_text}{pattern_J2000}{assignment_char}{dec_value})"]
+
+    # TABLES
+    pattern_list_table += ["([0-9]{1,2}(\\:)[0-9]{1,2}(\\:)[0-9]{1,2}(\\.|)[0-9]{0,})((( |)(,|\\|)( |))|( ))(\\+|-|)([0-9]{1,2}(\\:)[0-9]{1,2}(\\:)[0-9]{1,2}(\\.|)[0-9]{0,})"]
+    pattern_list_table += ["([0-9]{1,2}( )[0-9]{1,2}( )[0-9]{1,2}(\\.|)[0-9]{0,})((( |)(,|\\|)( |))|( ))(\\+|-|)([0-9]{1,2}( )[0-9]{1,2}( )[0-9]{1,2}(\\.|)[0-9]{0,})"]
+    pattern_list_table += [f"([0-9]{{1,2}}(h)[0-9]{{1,2}}{units_minutes}[0-9]{{1,2}}(\\.|)[0-9]{{0,}}{units_seconds})((( |)(,|\\|)( |))|( ))(\\+|-|)([0-9]{{1,2}}(d|(deg))[0-9]{{1,2}}{units_minutes}[0-9]{{1,2}}(\\.|)[0-9]{{0,}}{units_seconds})"]
+
+    # PAIRS
+    pattern_list_ra_dec += [fr"\(j2000 {ra_dec}\){assignment_char}\({ra_value_deg}( |)(,|)( |){dec_value_deg}\)( |){degree_}"]
+    pattern_list_ra_dec += [fr"\({ra_dec}( |)(j|)2000(.0|)\){assignment_char}(\(|){ra_value}( |)(,|)( |){dec_value}(\)|)"]
+    pattern_list_ra_dec += [fr"\({ra_dec} {ra_value}( |)(,)( |){dec_value}\)"]
+    pattern_list_ra_dec += [fr"({ra_text}( |)\((j|)2000(.0|)\) {ra_value}), ({dec_text}( |)\((j|)2000(.0|)\) {dec_value})"]
+
+    pattern_list_ra_dec += [f"\\b({ra_text} {ra_value}(, | |; |,|;){dec_text} {dec_value})\\b"]
+    pattern_list_ra_dec += [f"\\b({ra_text}{assignment_char}{ra_value})( )({dec_text}{assignment_char}{dec_value})\\b"]
+
+    pattern_list_ra_dec += [f"\\b{ra_dec}{assignment_char}{ra_value}( |)(,|)( |){dec_value}\\b"]
+    pattern_list_ra_dec += [fr"\({ra_dec}\){assignment_char}(\(|){ra_value}( |)(,|)( |){dec_value}(\)|)"]
+
+    pattern_list_ra_dec += [fr"({ra_text}(\\/|,|, ){dec_text}( |)(\((j|)2000(.0|)\)|){assignment_char}{ra_value}( |,|, ){dec_value})"]
+
+    pattern_list_ra_dec += [f"({ra_text}( and ){dec_text} {ra_value}( and ){dec_value})"]
+
+    return pattern_list_ra_dec, pattern_list_ra, pattern_list_dec, pattern_list_table
+
+
+def ra_dec_detector(text_id, text_id_text):
+    pattern_list_ra_dec, pattern_list_ra, pattern_list_dec, pattern_list_table = create_pattern_list()
+
+    text_id_text = " ".join(text_id_text.split()).replace("°", "o").replace("º", "o").replace("−", "-").replace('°', "o")
+    list_phrases = split_text_in_phrases(text_id, text_id_text.lower())
+
+    dict_data = {"TEXT_ID": [], "Positions": [], "Start": [], "End": [], "Phrase": []}
+
+    for phrase_ in list_phrases:
+        for pattern_ in pattern_list_ra_dec + pattern_list_ra + pattern_list_dec + pattern_list_table:
+            for m in re.finditer(pattern_, phrase_.lower()):
+                pos_ = m.group(0)
+                start, end = m.span()
+
+                dict_data["TEXT_ID"].append(text_id)
+                dict_data["Start"].append(start)
+                dict_data["End"].append(end)
+                dict_data["Positions"].append(pos_)
+                dict_data["Phrase"].append(phrase_)
+
+    df_data = pd.DataFrame(dict_data)
+    return df_data
+
+
+def merge_ra_dec(text_id, df_init):
+    df_init.drop_duplicates(inplace=True)
+    dict_data = {"TEXT_ID": [], "Positions": [], "Start": [], "End": [], "Phrase": []}
+    phrases_, c = np.unique(df_init["Phrase"].values, return_counts=True)
+
+    for p_n, phrase_ in enumerate(phrases_):
+        df_tmp0 = df_init[df_init["Phrase"] == phrase_]
+        if len(df_tmp0) > 1:
+            df_tmp = df_tmp0.sort_values("Start")
+            start_ = df_tmp["Start"].values
+            end_ = df_tmp["End"].values
+            for i in range(1, len(start_)):
+                if start_[i] <= end_[i-1]:
+                    start_[i] = start_[i-1]
+                    max_ = max(end_[i-1], end_[i])
+                    end_[i-1], end_[i] = max_, max_
+                    end_[i-1] = -1
+                    start_[i-1] = -1
+
+            for s_i, e_i in zip(start_, end_):
+                if s_i != -1:
+                    dict_data["TEXT_ID"] += [text_id]
+                    dict_data["Start"] += [s_i]
+                    dict_data["End"] += [e_i]
+                    dict_data["Positions"] += [phrase_[s_i: e_i]]
+                    dict_data["Phrase"] += [phrase_]
+
+    df_data = pd.DataFrame(dict_data)
+    df_data.drop_duplicates(inplace=True)
+    return df_data
+
+
+def clean_ra(ra, ra_text, pattern_J2000):
+    ra_new = " ".join(str(ra).split()).replace("±", "+/-").replace("—", "-").replace("−", "-").replace("−", "-")
+
+    ra_new = re.sub(f"{ra_text}{pattern_J2000}", "", ra_new)
+    ra_new = re.sub(f"{ra_text}", "", ra_new)
+
+    ra_new = re.sub(r"[^0-9+-\.deg]", ":", ra_new)
+
+    while len(ra_new) > 1 and (ra_new[-1] in [":", "."]):
+        ra_new = ra_new[:-1]
+
+    while len(ra_new) > 1 and (ra_new[0] in [":", "."]):
+        ra_new = ra_new[1:]
+
+    result = re.match("(\\+|)[0-9]{1,2}[:]{1,2}[0-9]{1,2}[:]{1,2}[0-9]{1,2}(:\\.|\\.|)([0-9]){0,}", ra_new)
+    if result:
+        if result.group(0) == ra_new:
+            ra_new = ra_new.replace("::", ":")
+            ra_new = ra_new.replace(":.", ".")
+
+    # Remove some incorect pos
+    result = re.match("(\\+|)[0-9]{4,}(\\.|)([0-9]){0,}", ra_new)
+    if result:
+        if result.group(0) == ra_new:
+            ra_new = ":"
+
+    ra_new = ra_new.replace(":deg", " deg")
+
+    return ra_new
+
+
+def clean_dec(dec, dec_text, pattern_J2000):
+    dec_new = " ".join(str(dec).split()).replace("±", "+/-").replace("—", "-").replace("−", "-").replace("−", "-").replace("--", "-")
+
+    dec_new = re.sub(f"{dec_text}{pattern_J2000}", "", dec_new)
+    dec_new = re.sub(f"{dec_text}", "", dec_new)
+
+    dec_new = re.sub(r"[^0-9+-\.deg]", ":", dec_new)
+
+    while len(dec_new) != 1 and (dec_new[-1] in [":", "."]):
+        dec_new = dec_new[:-1]
+
+    while len(dec_new) != 1 and (dec_new[0] in [":", "."]):
+        dec_new = dec_new[1:]
+
+    result = re.match("(\\+|\\-|)[0-9]{1,2}(deg|d|:)[:]{0,1}[0-9]{1,2}[:]{1,2}[0-9]{1,2}(:\\.|\\.|)([0-9]){0,}", dec_new)
+    if result:
+        if result.group(0) == dec_new:
+            dec_new = dec_new.replace("deg:", ":")
+            dec_new = dec_new.replace("deg", ":")
+            dec_new = dec_new.replace("d:", ":")
+            dec_new = dec_new.replace("d", ":")
+            dec_new = dec_new.replace("::", ":")
+            dec_new = dec_new.replace(":.", ".")
+
+    dec_new = dec_new.replace(":deg", " deg")
+
+    # Remove some incorect pos
+    result = re.match("(\\+|\\-|)[0-9]{4,}(\\.|)([0-9]){0,}", dec_new)
+    if result:
+        if result.group(0) == dec_new:
+            dec_new = ":"
+
+    return dec_new
+
+
+def clean_ra_dec(ra_dec, ra_text, dec_text, pattern_J2000):
+    ra_dec_n = re.sub(f"{pattern_J2000}",   "",  ra_dec)
+    ra_dec_n = re.sub(f"{ra_text}",         "",  ra_dec_n)
+    ra_dec_n = re.sub(f"{dec_text}",        "",  ra_dec_n)
+    if ra_dec_n[-1] == "o":
+        ra_dec_n = ra_dec_n[:-1]
+    ra_dec_n = re.sub("(o)", "d", ra_dec_n)
+    ra_dec_n = re.sub("('')", "", ra_dec_n)
+    ra_dec_n = re.sub("(')", "m", ra_dec_n)
+    ra_dec_n = re.sub(r"[^0-9+-\.hmd\s:]", "", ra_dec_n)
+
+    ra_dec_n = re.sub("[,]", "", ra_dec_n)
+    while len(ra_dec_n) != 1 and (ra_dec_n[-1] in [":", ".", " "]):
+        ra_dec_n = ra_dec_n[:-1]
+
+    while len(ra_dec_n) != 1 and (ra_dec_n[0] in [":", ".", " "]):
+        ra_dec_n = ra_dec_n[1:]
+
+    return ra_dec_n
+
+
+def astropy_test(df_init):
+    ra_text = "(r(\\.|)a(\\.|\\:|))"
+    dec_text = "(dec(l|)(\\.|\\:|))"
+
+    # ra_dec_pattern = f"({ra_text},( |){dec_text})"
+    pattern_J2000 = r"( |)(-|)((\((((j|)2000(.0|))|(deg))\))|(2000))"
+
+    rest_ra_dec = []
+    counter_rest = 0
+    counter_ra_dec_try = 0
+    good_ra_dec = []
+
+    for text_ in list(set(df_init.Phrase)):
+        df_tmp1 = df_init[df_init.Phrase == text_]
+        ra_values = []
+        dec_values = []
+
+        ra_start = []
+        dec_start = []
+        ra_end = []
+        dec_end = []
+
+        df_tmp1.sort_values("Start")
+
+        for ra_dec, s_, e_ in zip(df_tmp1.Positions, df_tmp1.Start, df_tmp1.End):
+            try:
+                ra_dec_n = ra_dec.replace("|", " ")
+                ra_dec_n = ra_dec_n.replace(",", " ")
+                cc = SkyCoord(ra_dec_n, unit=(u.hourangle, u.deg))
+                good_ra_dec.append(cc)
+            except ValueError:
+                ra_s = re.findall(ra_text, ra_dec)
+                dec_s = re.findall(dec_text, ra_dec)
+                if len(ra_s) > 0 and len(dec_s) > 0:
+                    counter_ra_dec_try += 1
+                    ra_dec_n = clean_ra_dec(ra_dec, ra_text, dec_text, pattern_J2000)
+                    try:
+                        cc = SkyCoord(ra_dec_n, unit=(u.hourangle, u.deg))
+                        good_ra_dec.append(cc)
+                    except ValueError:
+                        rest_ra_dec.append(ra_dec)
+
+                elif len(ra_s) > 0:
+                    ra_values.append(ra_dec)
+                    ra_start.append(s_)
+                    ra_end.append(e_)
+
+                elif len(dec_s) > 0:
+                    dec_values.append(ra_dec)
+                    dec_start.append(s_)
+                    dec_end.append(e_)
+
+                else:
+                    counter_rest += 1
+
+        if len(ra_values) < len(dec_values):
+
+            for ra_, s_ra, e_ra in zip(ra_values, ra_start, ra_end):
+                min_diff = 1000
+                dec_pair = ""
+                for dec_, s_dec, e_dec in zip(dec_values, dec_start, dec_end):
+                    diff_ = s_dec - e_ra
+                    if diff_ < min_diff:
+                        min_diff = diff_
+                        dec_pair = dec_
+
+                c_ra = clean_ra(ra_, ra_text, pattern_J2000)
+                c_dec = clean_dec(dec_pair, dec_text, pattern_J2000)
+                try:
+                    cc = SkyCoord(ra=c_ra, dec=c_dec, unit=(u.hourangle, u.deg))
+                    good_ra_dec.append(cc)
+                except ValueError:
+                    try:
+                        cc = SkyCoord(ra=c_ra, dec=c_dec, unit=(u.deg, u.deg))
+                        good_ra_dec.append(cc)
+                    except ValueError:
+                        rest_ra_dec.append(f"{ra_}|{dec_pair}")
+        else:
+
+            for dec_, s_dec, e_dec in zip(dec_values, dec_start, dec_end):
+                min_diff = 1000
+                ra_pair = ""
+                for ra_, s_ra, e_ra in zip(ra_values, ra_start, ra_end):
+                    diff_ = s_dec - e_ra
+                    if diff_ < min_diff:
+                        min_diff = diff_
+                        ra_pair = ra_
+
+                c_ra = clean_ra(ra_pair, ra_text, pattern_J2000)
+                c_dec = clean_dec(dec_, dec_text, pattern_J2000)
+                try:
+                    cc = SkyCoord(ra=c_ra, dec=c_dec, unit=(u.hourangle, u.deg))
+                    good_ra_dec.append(cc)
+                except ValueError:
+                    try:
+                        cc = SkyCoord(ra=c_ra, dec=c_dec, unit=(u.deg, u.deg))
+                        good_ra_dec.append(cc)
+                    except ValueError:
+                        rest_ra_dec.append(f"{ra_pair}|{dec_}")
+
+    return good_ra_dec
+
+
+def rule_based_ra_dec_detector(text_id, text_id_text):
+    df_init = ra_dec_detector(text_id, text_id_text)
+    df_final = merge_ra_dec(text_id, df_init)
+    good_ra_dec = astropy_test(df_final)
+    print(good_ra_dec)
+    dict_out = {"TEXT_ID": [], "RA": [], "Dec": [], "Main ID Name": []}
+    for ra_dec in good_ra_dec:
+        dict_out["TEXT_ID"].append(text_id)
+        dict_out["Main ID Name"].append("NoName")
+        dict_out["RA"].append(ra_dec.ra.deg)
+        dict_out["Dec"].append(ra_dec.dec.deg)
+
+    return pd.DataFrame(dict_out)