Mercurial > repos > astroteam > analyse_short_astro_text_astro_tool
comparison pipeline_ra_dec.py @ 0:a35056104c2c draft default tip
planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit da42ae0d18f550dec7f6d7e29d297e7cf1909df2
author | astroteam |
---|---|
date | Fri, 13 Jun 2025 13:26:36 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:a35056104c2c |
---|---|
1 from astropy.coordinates import SkyCoord | |
2 from astropy import units as u | |
3 import pandas as pd | |
4 import numpy as np | |
5 import re | |
6 | |
7 | |
def split_text_in_phrases(atel_, text_):
    """Split ``text_`` into phrases at ". " followed by a lowercase letter.

    The full stop stays at the end of the preceding phrase and the lowercase
    letter stays at the start of the following one; the single space between
    them is dropped.

    Parameters
    ----------
    atel_ : identifier of the text; only printed when the consistency check
        below fails.
    text_ : str, the text to split.

    Returns
    -------
    list of str, the phrases in their original order.
    """
    # With one capturing group, re.split alternates text and separator:
    # even indices are text, odd indices are the ". x" separators.
    pieces = re.split(r"(\. [a-z])", text_)

    for sep_idx in range(1, len(pieces) - 1, 2):
        separator = pieces[sep_idx]
        # Give the "." back to the phrase on the left and the letter to the
        # phrase on the right.
        pieces[sep_idx - 1] += separator[0]
        pieces[sep_idx + 1] = separator[-1] + pieces[sep_idx + 1]

    list_phrases = pieces[::2]

    # Sanity check: re-joining with single spaces must give back the input.
    if " ".join(list_phrases) != text_:
        print(atel_)

    return list_phrases
25 | |
26 | |
def create_pattern_list():
    """Build the regular expressions used to spot RA/Dec mentions.

    All patterns are written for lowercase text.

    Returns
    -------
    tuple of four lists of str, in this order:
        pattern_list_ra_dec : patterns matching an RA and a Dec together,
        pattern_list_ra     : patterns matching a labelled RA only,
        pattern_list_dec    : patterns matching a labelled Dec only,
        pattern_list_table  : patterns matching unlabelled sexagesimal
                              value pairs (e.g. table rows).
    """
    pattern_list_ra = []
    pattern_list_dec = []
    pattern_list_ra_dec = []
    pattern_list_table = []

    # Labels: "ra", "r.a.", "ra:" ... and "dec", "decl", "dec." ...
    ra_text = "r(\\.|)a(\\.|\\:|)"
    dec_text = "dec(l|)(\\.|)"

    # Optional unit markers after minutes / seconds in table rows.
    units_minutes = "(\\'|m|\\:|\\' |m |\\: |)"
    units_seconds = '(\\"|s|\\:|\\" |s |\\: |)'

    degree_ = "((deg)|d|)"
    assignment_char = "(( |)(=|\\:)( |))"

    # Characters allowed inside a free-form RA / Dec value.
    units_dec_min = "\\'m"
    units_dec_sec = '\\"s'
    units_dec_deg = "dego"
    ra_value = f"([0-9\\.\\:\\s{units_dec_min}{units_dec_sec}hdeg]{{2,}})"
    dec_value = f"(\\+|-|)([0-9\\.\\:\\s{units_dec_min}{units_dec_sec}{units_dec_deg}]{{2,}})"
    ra_value_deg = "(\\d{1,3}(\\.|)\\d{0,})"
    dec_value_deg = "(\\+|-|)(\\d{1,2}(\\.|)\\d{0,})"

    ra_dec = f"({ra_text},( |){dec_text})"

    # Single labelled values: "ra = ..." / "dec = ..."
    pattern_list_ra.append(f"\\b({ra_text}{assignment_char}{ra_value})\\b")
    pattern_list_dec.append(f"\\b({dec_text}{assignment_char}{dec_value})\\b")

    # Same, with an epoch/unit tag between label and value.
    # The dot in "2000.0" is escaped so that only a literal ".0" is accepted.
    pattern_J2000 = r"( |)(-|)((\((((j|)2000(\.0|))|(deg))\))|(2000))"
    pattern_list_ra.append(f"({ra_text}{pattern_J2000}{assignment_char}{ra_value})")
    pattern_list_dec.append(f"({dec_text}{pattern_J2000}{assignment_char}{dec_value})")

    # TABLES: unlabelled "hh:mm:ss.s  +dd:mm:ss.s" style pairs.
    pattern_list_table.append("([0-9]{1,2}(\\:)[0-9]{1,2}(\\:)[0-9]{1,2}(\\.|)[0-9]{0,})((( |)(,|\\|)( |))|( ))(\\+|-|)([0-9]{1,2}(\\:)[0-9]{1,2}(\\:)[0-9]{1,2}(\\.|)[0-9]{0,})")
    pattern_list_table.append("([0-9]{1,2}( )[0-9]{1,2}( )[0-9]{1,2}(\\.|)[0-9]{0,})((( |)(,|\\|)( |))|( ))(\\+|-|)([0-9]{1,2}( )[0-9]{1,2}( )[0-9]{1,2}(\\.|)[0-9]{0,})")
    pattern_list_table.append(f"([0-9]{{1,2}}(h)[0-9]{{1,2}}{units_minutes}[0-9]{{1,2}}(\\.|)[0-9]{{0,}}{units_seconds})((( |)(,|\\|)( |))|( ))(\\+|-|)([0-9]{{1,2}}(d|(deg))[0-9]{{1,2}}{units_minutes}[0-9]{{1,2}}(\\.|)[0-9]{{0,}}{units_seconds})")

    # PAIRS: RA and Dec introduced by one combined label.
    pattern_list_ra_dec.append(fr"\(j2000 {ra_dec}\){assignment_char}\({ra_value_deg}( |)(,|)( |){dec_value_deg}\)( |){degree_}")
    pattern_list_ra_dec.append(fr"\({ra_dec}( |)(j|)2000(\.0|)\){assignment_char}(\(|){ra_value}( |)(,|)( |){dec_value}(\)|)")
    pattern_list_ra_dec.append(fr"\({ra_dec} {ra_value}( |)(,)( |){dec_value}\)")
    pattern_list_ra_dec.append(fr"({ra_text}( |)\((j|)2000(\.0|)\) {ra_value}), ({dec_text}( |)\((j|)2000(\.0|)\) {dec_value})")

    pattern_list_ra_dec.append(f"\\b({ra_text} {ra_value}(, | |; |,|;){dec_text} {dec_value})\\b")
    pattern_list_ra_dec.append(f"\\b({ra_text}{assignment_char}{ra_value})( )({dec_text}{assignment_char}{dec_value})\\b")

    pattern_list_ra_dec.append(f"\\b{ra_dec}{assignment_char}{ra_value}( |)(,|)( |){dec_value}\\b")
    pattern_list_ra_dec.append(fr"\({ra_dec}\){assignment_char}(\(|){ra_value}( |)(,|)( |){dec_value}(\)|)")

    # "ra/dec", "ra,dec" or "ra, dec" followed by an optional epoch.
    # In a raw string the separator must be a plain "/": the former "\\/"
    # required a literal backslash before the slash and so never matched
    # ordinary "ra/dec" text.
    pattern_list_ra_dec.append(fr"({ra_text}(/|,|, ){dec_text}( |)(\((j|)2000(\.0|)\)|){assignment_char}{ra_value}( |,|, ){dec_value})")

    pattern_list_ra_dec.append(f"({ra_text}( and ){dec_text} {ra_value}( and ){dec_value})")

    return pattern_list_ra_dec, pattern_list_ra, pattern_list_dec, pattern_list_table
83 | |
84 | |
def ra_dec_detector(text_id, text_id_text):
    """Run every RA/Dec pattern over each phrase of a text.

    The text is whitespace-normalised, degree-like symbols are replaced by
    "o", the Unicode minus by "-", and everything is lowercased before it is
    split into phrases.

    Parameters
    ----------
    text_id : identifier copied into the ``TEXT_ID`` column.
    text_id_text : str, raw text to scan.

    Returns
    -------
    pandas.DataFrame with columns ``TEXT_ID``, ``Positions`` (matched text),
    ``Start``/``End`` (match span inside the phrase) and ``Phrase``.
    """
    pair_patterns, ra_patterns, dec_patterns, table_patterns = create_pattern_list()
    # Pairs first, then single RA, single Dec and finally table rows.
    all_patterns = pair_patterns + ra_patterns + dec_patterns + table_patterns

    normalised = " ".join(text_id_text.split())
    for old_, new_ in (("°", "o"), ("º", "o"), ("−", "-"), ('°', "o")):
        normalised = normalised.replace(old_, new_)

    dict_data = {"TEXT_ID": [], "Positions": [], "Start": [], "End": [], "Phrase": []}

    for phrase_ in split_text_in_phrases(text_id, normalised.lower()):
        lowered = phrase_.lower()
        for pattern_ in all_patterns:
            for found in re.finditer(pattern_, lowered):
                first, last = found.span()
                dict_data["TEXT_ID"].append(text_id)
                dict_data["Start"].append(first)
                dict_data["End"].append(last)
                dict_data["Positions"].append(found.group(0))
                dict_data["Phrase"].append(phrase_)

    return pd.DataFrame(dict_data)
107 | |
108 | |
def merge_ra_dec(text_id, df_init):
    """Merge overlapping detections that belong to the same phrase.

    Within each phrase the detections are ordered by ``Start``; every run of
    detections whose spans touch or overlap is collapsed into one span going
    from the first start to the largest end of the run. A phrase with a
    single detection is passed through unchanged instead of being lost.

    Parameters
    ----------
    text_id : identifier copied into the ``TEXT_ID`` column of the result.
    df_init : pandas.DataFrame with at least the columns ``Start``, ``End``
        and ``Phrase``. Duplicate rows are dropped from it in place, as
        before.

    Returns
    -------
    pandas.DataFrame with columns ``TEXT_ID``, ``Positions``, ``Start``,
    ``End`` and ``Phrase``; ``Positions`` is the slice of the phrase covered
    by the merged span. Phrases are emitted in sorted order.
    """
    df_init.drop_duplicates(inplace=True)
    dict_data = {"TEXT_ID": [], "Positions": [], "Start": [], "End": [], "Phrase": []}

    for phrase_ in sorted(set(df_init["Phrase"])):
        df_phrase = df_init[df_init["Phrase"] == phrase_].sort_values("Start")

        # Classic interval merge on plain Python lists, so nothing is written
        # back into the arrays backing the DataFrame.
        merged_spans = []
        for s_i, e_i in zip(df_phrase["Start"], df_phrase["End"]):
            if merged_spans and s_i <= merged_spans[-1][1]:
                merged_spans[-1][1] = max(merged_spans[-1][1], e_i)
            else:
                merged_spans.append([s_i, e_i])

        for s_i, e_i in merged_spans:
            dict_data["TEXT_ID"].append(text_id)
            dict_data["Start"].append(s_i)
            dict_data["End"].append(e_i)
            dict_data["Positions"].append(phrase_[s_i:e_i])
            dict_data["Phrase"].append(phrase_)

    df_data = pd.DataFrame(dict_data)
    df_data.drop_duplicates(inplace=True)
    return df_data
139 | |
140 | |
def clean_ra(ra, ra_text, pattern_J2000):
    """Reduce a detected RA snippet to a bare value string.

    Parameters
    ----------
    ra : detected text (converted with ``str``).
    ra_text : str, regex matching the RA label to strip.
    pattern_J2000 : str, regex matching an epoch/unit tag after the label.

    Returns
    -------
    str: the cleaned value; ":" when the value is just a run of four or more
    digits (optionally with a fraction), which is treated as not a position.
    """
    cleaned = " ".join(str(ra).split())
    # The Unicode minus appears twice, exactly as in the chained form.
    for old_, new_ in (("±", "+/-"), ("—", "-"), ("−", "-"), ("−", "-")):
        cleaned = cleaned.replace(old_, new_)

    # Strip the label, first together with an epoch tag and then on its own.
    for label_regex in (f"{ra_text}{pattern_J2000}", f"{ra_text}"):
        cleaned = re.sub(label_regex, "", cleaned)

    # Everything outside the kept set becomes ":". Note that "+-\." is a
    # character range, so "+", ",", "-" and "." are all kept.
    cleaned = re.sub(r"[^0-9+-\.deg]", ":", cleaned)

    # Trim separator characters from both ends, leaving at least one char.
    while len(cleaned) > 1 and cleaned[-1] in (":", "."):
        cleaned = cleaned[:-1]
    while len(cleaned) > 1 and cleaned[0] in (":", "."):
        cleaned = cleaned[1:]

    # A complete sexagesimal value: collapse doubled separators.
    sexagesimal = re.match("(\\+|)[0-9]{1,2}[:]{1,2}[0-9]{1,2}[:]{1,2}[0-9]{1,2}(:\\.|\\.|)([0-9]){0,}", cleaned)
    if sexagesimal and sexagesimal.group(0) == cleaned:
        cleaned = cleaned.replace("::", ":").replace(":.", ".")

    # Remove some incorrect positions: a plain number with 4+ leading digits.
    long_number = re.match("(\\+|)[0-9]{4,}(\\.|)([0-9]){0,}", cleaned)
    if long_number and long_number.group(0) == cleaned:
        cleaned = ":"

    return cleaned.replace(":deg", " deg")
170 | |
171 | |
def clean_dec(dec, dec_text, pattern_J2000):
    """Reduce a detected Dec snippet to a bare value string.

    Parameters
    ----------
    dec : detected text (converted with ``str``); may be empty.
    dec_text : str, regex matching the Dec label to strip.
    pattern_J2000 : str, regex matching an epoch/unit tag after the label.

    Returns
    -------
    str: the cleaned value; ":" when the value is just a run of four or more
    digits (optionally signed / with a fraction), which is treated as not a
    position. An empty input yields an empty string.
    """
    dec_new = " ".join(str(dec).split()).replace("±", "+/-").replace("—", "-").replace("−", "-").replace("−", "-").replace("--", "-")

    # Strip the label, first together with an epoch tag and then on its own.
    dec_new = re.sub(f"{dec_text}{pattern_J2000}", "", dec_new)
    dec_new = re.sub(f"{dec_text}", "", dec_new)

    # Everything outside the kept set becomes ":". Note that "+-\." is a
    # character range, so "+", ",", "-" and "." are all kept.
    dec_new = re.sub(r"[^0-9+-\.deg]", ":", dec_new)

    # Trim separators from both ends. The guard is "> 1" (as in clean_ra) so
    # an empty string is left alone instead of raising IndexError.
    while len(dec_new) > 1 and dec_new[-1] in (":", "."):
        dec_new = dec_new[:-1]

    while len(dec_new) > 1 and dec_new[0] in (":", "."):
        dec_new = dec_new[1:]

    # A complete sexagesimal value: turn degree markers into ":" and collapse
    # doubled separators.
    result = re.match("(\\+|\\-|)[0-9]{1,2}(deg|d|:)[:]{0,1}[0-9]{1,2}[:]{1,2}[0-9]{1,2}(:\\.|\\.|)([0-9]){0,}", dec_new)
    if result and result.group(0) == dec_new:
        dec_new = dec_new.replace("deg:", ":")
        dec_new = dec_new.replace("deg", ":")
        dec_new = dec_new.replace("d:", ":")
        dec_new = dec_new.replace("d", ":")
        dec_new = dec_new.replace("::", ":")
        dec_new = dec_new.replace(":.", ".")

    dec_new = dec_new.replace(":deg", " deg")

    # Remove some incorrect positions: a plain number with 4+ leading digits.
    result = re.match("(\\+|\\-|)[0-9]{4,}(\\.|)([0-9]){0,}", dec_new)
    if result and result.group(0) == dec_new:
        dec_new = ":"

    return dec_new
205 | |
206 | |
def clean_ra_dec(ra_dec, ra_text, dec_text, pattern_J2000):
    """Reduce a detected combined RA/Dec snippet to "<ra> <dec>" text.

    Parameters
    ----------
    ra_dec : str, detected text containing both an RA and a Dec.
    ra_text, dec_text : str, regexes matching the RA / Dec labels to strip.
    pattern_J2000 : str, regex matching an epoch/unit tag to strip.

    Returns
    -------
    str: the snippet with labels, epoch tags, commas and other stray
    characters removed. An input that contains nothing but labels yields an
    empty string.
    """
    ra_dec_n = re.sub(f"{pattern_J2000}", "", ra_dec)
    ra_dec_n = re.sub(f"{ra_text}", "", ra_dec_n)
    ra_dec_n = re.sub(f"{dec_text}", "", ra_dec_n)

    # Drop one trailing "o" before the remaining ones become "d".
    # endswith() is safe on an empty string, unlike indexing [-1].
    if ra_dec_n.endswith("o"):
        ra_dec_n = ra_dec_n[:-1]
    ra_dec_n = ra_dec_n.replace("o", "d")
    ra_dec_n = ra_dec_n.replace("''", "")
    ra_dec_n = ra_dec_n.replace("'", "m")

    # "+-\." is a character range that also keeps ",", hence the explicit
    # comma removal right after.
    ra_dec_n = re.sub(r"[^0-9+-\.hmd\s:]", "", ra_dec_n)
    ra_dec_n = ra_dec_n.replace(",", "")

    # Trim separators from both ends; "> 1" keeps an empty string from
    # raising IndexError.
    while len(ra_dec_n) > 1 and ra_dec_n[-1] in (":", ".", " "):
        ra_dec_n = ra_dec_n[:-1]

    while len(ra_dec_n) > 1 and ra_dec_n[0] in (":", ".", " "):
        ra_dec_n = ra_dec_n[1:]

    return ra_dec_n
226 | |
227 | |
def _try_skycoord(*args, **kwargs):
    """Build a SkyCoord, returning None when it is rejected with ValueError."""
    try:
        return SkyCoord(*args, **kwargs)
    except ValueError:
        return None


def _nearest_by_gap(candidates, gaps, max_gap=1000):
    """Return the candidate with the smallest absolute gap below ``max_gap``.

    ``gaps`` holds one signed character distance per candidate. Ties keep the
    earlier candidate. Returns None when no candidate is close enough.
    """
    best = None
    best_gap = max_gap
    for candidate, gap in zip(candidates, gaps):
        if abs(gap) < best_gap:
            best, best_gap = candidate, abs(gap)
    return best


def _parse_separate_ra_dec(ra_raw, dec_raw, ra_text, dec_text, pattern_J2000):
    """Clean separately detected RA and Dec snippets and parse them.

    The RA is first interpreted as an hour angle and, if that is rejected,
    as degrees. Returns a SkyCoord or None.
    """
    c_ra = clean_ra(ra_raw, ra_text, pattern_J2000)
    c_dec = clean_dec(dec_raw, dec_text, pattern_J2000)
    for ra_unit in (u.hourangle, u.deg):
        cc = _try_skycoord(ra=c_ra, dec=c_dec, unit=(ra_unit, u.deg))
        if cc is not None:
            return cc
    return None


def astropy_test(df_init):
    """Turn detected position snippets into ``SkyCoord`` objects.

    For every phrase, each snippet is handled as follows:

    1. it is given to ``SkyCoord`` as is ("|" and "," replaced by spaces);
    2. otherwise, if it contains both an RA and a Dec label, it is cleaned
       with ``clean_ra_dec`` and tried again;
    3. otherwise, if it contains only one of the labels, it is kept aside and
       later paired with the nearest snippet of the other kind found in the
       same phrase (within 1000 characters).

    Parameters
    ----------
    df_init : pandas.DataFrame with columns ``Phrase``, ``Positions``,
        ``Start`` and ``End``.

    Returns
    -------
    list of SkyCoord, one per snippet (or RA/Dec pair) that could be parsed.
    """
    ra_text = "(r(\\.|)a(\\.|\\:|))"
    dec_text = "(dec(l|)(\\.|\\:|))"
    # The dot in "2000.0" is escaped so that only a literal ".0" is accepted.
    pattern_J2000 = r"( |)(-|)((\((((j|)2000(\.0|))|(deg))\))|(2000))"

    good_ra_dec = []

    # dict.fromkeys keeps first-appearance order, so the output order is
    # reproducible (iterating a set of strings is not).
    for text_ in dict.fromkeys(df_init.Phrase):
        # sort_values returns a new frame; the result has to be kept.
        df_tmp1 = df_init[df_init.Phrase == text_].sort_values("Start")

        ra_values, ra_end = [], []
        dec_values, dec_start = [], []

        for ra_dec, s_, e_ in zip(df_tmp1.Positions, df_tmp1.Start, df_tmp1.End):
            raw = ra_dec.replace("|", " ").replace(",", " ")
            cc = _try_skycoord(raw, unit=(u.hourangle, u.deg))
            if cc is not None:
                good_ra_dec.append(cc)
                continue

            has_ra = re.search(ra_text, ra_dec) is not None
            has_dec = re.search(dec_text, ra_dec) is not None

            if has_ra and has_dec:
                ra_dec_n = clean_ra_dec(ra_dec, ra_text, dec_text, pattern_J2000)
                cc = _try_skycoord(ra_dec_n, unit=(u.hourangle, u.deg))
                if cc is not None:
                    good_ra_dec.append(cc)
            elif has_ra:
                ra_values.append(ra_dec)
                ra_end.append(e_)
            elif has_dec:
                dec_values.append(ra_dec)
                dec_start.append(s_)

        # Pair from the side of whichever kind is scarcer, measuring the gap
        # between the end of the RA snippet and the start of the Dec snippet.
        if len(ra_values) < len(dec_values):
            pairs = [
                (ra_, _nearest_by_gap(dec_values, [s_dec - e_ra for s_dec in dec_start]))
                for ra_, e_ra in zip(ra_values, ra_end)
            ]
        else:
            pairs = [
                (_nearest_by_gap(ra_values, [s_dec - e_ra for e_ra in ra_end]), dec_)
                for dec_, s_dec in zip(dec_values, dec_start)
            ]

        for ra_, dec_ in pairs:
            # No counterpart close enough: nothing to parse.
            if ra_ is None or dec_ is None:
                continue
            cc = _parse_separate_ra_dec(ra_, dec_, ra_text, dec_text, pattern_J2000)
            if cc is not None:
                good_ra_dec.append(cc)

    return good_ra_dec
329 | |
330 | |
def rule_based_ra_dec_detector(text_id, text_id_text):
    """Detect sky positions in a text and return them in degrees.

    Chains ``ra_dec_detector``, ``merge_ra_dec`` and ``astropy_test``, prints
    the parsed coordinates, and tabulates them.

    Parameters
    ----------
    text_id : identifier copied into the ``TEXT_ID`` column.
    text_id_text : str, raw text to scan.

    Returns
    -------
    pandas.DataFrame with columns ``TEXT_ID``, ``RA``, ``Dec`` (both read
    from the ``.deg`` attribute of each coordinate) and ``Main ID Name``
    (always "NoName").
    """
    detections = ra_dec_detector(text_id, text_id_text)
    merged = merge_ra_dec(text_id, detections)
    good_ra_dec = astropy_test(merged)
    print(good_ra_dec)

    n_found = len(good_ra_dec)
    return pd.DataFrame({
        "TEXT_ID": [text_id] * n_found,
        "RA": [coord.ra.deg for coord in good_ra_dec],
        "Dec": [coord.dec.deg for coord in good_ra_dec],
        "Main ID Name": ["NoName"] * n_found,
    })