Mercurial > repos > astroteam > analyse_short_astro_text_astro_tool
diff pipeline_astrobert.py @ 0:a35056104c2c draft default tip
planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit da42ae0d18f550dec7f6d7e29d297e7cf1909df2
author | astroteam |
---|---|
date | Fri, 13 Jun 2025 13:26:36 +0000 |
parents | |
children |
line wrap: on
line diff
"""Extract astronomy-related named entities from short texts with astroBERT.

The text is split into sentence-like phrases, each phrase is run through the
astroBERT NER-DEAL token-classification model, and neighbouring predictions
of the same entity group are merged into a single entity.
"""
import re
import tempfile

import numpy as np
import pandas as pd

# Entity groups kept from the model output.  The order matters: it defines
# the row order of the cleaned result.
ENTITY_GROUPS = (
    "Instrument",
    "Telescope",
    "Wavelength",
    "CelestialObject",
    "CelestialRegion",
    "EntityOfFutureInterest",
    "Mission",
    "Observatory",
    "Survey",
)

# Column layout of the raw and of the cleaned result tables.
_RAW_COLUMNS = ["TEXT_ID", "word", "start", "end", "score", "entity_group", "Phrase"]
_CLEAN_COLUMNS = ["TEXT_ID", "word", "start", "end", "Score", "Phrase", "entity_group"]

# Degree sign and masculine ordinal indicator become "o", the Unicode minus
# sign becomes an ASCII hyphen.
_CHAR_TABLE = str.maketrans({"°": "o", "º": "o", "−": "-"})

# A phrase boundary is the single space between a full stop and a capital.
_PHRASE_BOUNDARY = re.compile(r"(?<=\.) (?=[A-Z])")


def split_text_in_phrases(text_id, text_):
    """Split *text_* into phrases at every ". " followed by a capital letter.

    The full stop stays with the phrase it ends and the capital letter stays
    with the phrase it starts; only the separating space is removed.

    Args:
        text_id: Identifier of the text, only used for the diagnostic print.
        text_: Text to split.

    Returns:
        List of phrases; joining them with single spaces gives back *text_*.
    """
    list_phrases = _PHRASE_BOUNDARY.split(text_)

    # Re-joining must reproduce the input; report the text if it ever does not.
    if " ".join(list_phrases) != text_:
        print(text_id)
    return list_phrases


def apply_astroBERT(text_id, body_text_0):
    """Run astroBERT NER-DEAL on every phrase of *body_text_0*.

    Whitespace runs are collapsed to single spaces and a few non-ASCII
    characters are replaced before the text is split into phrases.  The model
    and tokenizer are loaded into a temporary cache directory that is removed
    before returning.

    Any exception raised while loading the model or processing the text is
    printed, not propagated; the rows collected up to that point are returned.

    Args:
        text_id: Identifier stored in the ``TEXT_ID`` column of every row.
        body_text_0: Text to analyse.

    Returns:
        DataFrame with columns ``TEXT_ID``, ``word``, ``start``, ``end``,
        ``score``, ``entity_group`` and ``Phrase``; one row per prediction
        whose entity group is listed in ``ENTITY_GROUPS``.
    """
    # Imported here so that the rest of the module can be imported and used
    # without the transformers package.  Deliberately outside the try block:
    # a missing dependency must not be reported as an empty result.
    from transformers import AutoModelForTokenClassification, AutoTokenizer
    from transformers import TokenClassificationPipeline

    rows = []

    with tempfile.TemporaryDirectory() as tmpdir_:
        try:
            # load astroBERT for NER-DEAL
            remote_model_path = 'adsabs/astroBERT'
            # the astroBERT trained for NER-DEAL lives on a separate branch
            revision = 'NER-DEAL'

            astroBERT_NER_DEAL = AutoModelForTokenClassification.from_pretrained(
                pretrained_model_name_or_path=remote_model_path,
                revision=revision,
                cache_dir=tmpdir_
            )

            astroBERT_tokenizer = AutoTokenizer.from_pretrained(
                pretrained_model_name_or_path=remote_model_path,
                add_special_tokens=True,
                do_lower_case=False,
                model_max_length=512,
                cache_dir=tmpdir_
            )

            # use the Hugging Face Pipeline class
            NER_pipeline = TokenClassificationPipeline(
                model=astroBERT_NER_DEAL,
                tokenizer=astroBERT_tokenizer,
                task='astroBERT NER_DEAL',
                aggregation_strategy='average',
                ignore_labels=['O']
            )

            text = " ".join(body_text_0.split()).translate(_CHAR_TABLE)

            for phrase_ in split_text_in_phrases(text_id, text):
                for u in NER_pipeline(phrase_):
                    ent_ = u["entity_group"]
                    if ent_ not in ENTITY_GROUPS:
                        continue
                    # Build the whole row before storing it so that a failure
                    # cannot leave the table with columns of unequal length.
                    rows.append({
                        "TEXT_ID": text_id,
                        "word": u["word"],
                        "start": u["start"],
                        "end": u["end"],
                        "score": u["score"],
                        "entity_group": ent_,
                        "Phrase": phrase_,
                    })
        except Exception as e:
            print(f"An error occurred in apply_astroBERT: {e}")

    return pd.DataFrame(rows, columns=_RAW_COLUMNS)


def merge_adjacent_entities(text_id, df_raw):
    """Merge neighbouring predictions of the same entity group.

    For every entity group and phrase, predictions are ordered by ``start``
    and a prediction that begins at most one character after the end of the
    current run is merged into it.  A merged entity takes its text from the
    phrase (``phrase[start:end]``) and the mean score of its parts.  When a
    phrase has exactly one prediction for an entity group, that row is copied
    unchanged, including the ``word`` reported in *df_raw*.

    Args:
        text_id: Identifier stored in the ``TEXT_ID`` column of every row.
        df_raw: Table with at least the columns ``word``, ``start``, ``end``,
            ``score``, ``entity_group`` and ``Phrase``; ``start`` and ``end``
            are used as character offsets into ``Phrase``.

    Returns:
        DataFrame with columns ``TEXT_ID``, ``word``, ``start``, ``end``,
        ``Score``, ``Phrase`` and ``entity_group``, ordered by entity group
        (in ``ENTITY_GROUPS`` order), then phrase, then position.
    """
    rows = []

    def store(phrase, word, start, end, score, entity):
        rows.append({
            "TEXT_ID": text_id,
            "word": word,
            "start": start,
            "end": end,
            "Score": score,
            "Phrase": phrase,
            "entity_group": entity,
        })

    for entity_to_study in ENTITY_GROUPS:
        df_entity = df_raw[df_raw["entity_group"] == entity_to_study]

        for phrase_ in np.unique(df_entity["Phrase"]):
            df_phrase = df_entity[df_entity["Phrase"] == phrase_]

            if len(df_phrase) == 1:
                store(
                    phrase_,
                    df_phrase["word"].values[0],
                    df_phrase["start"].values[0],
                    df_phrase["end"].values[0],
                    df_phrase["score"].values[0],
                    entity_to_study,
                )
                continue

            # sort_values returns a new frame; the merge below relies on the
            # spans being visited from left to right.
            df_phrase = df_phrase.sort_values(by="start", kind="stable")
            spans = zip(df_phrase["start"].values, df_phrase["end"].values, df_phrase["score"].values)

            run_start, run_end, score_sum = next(spans)
            run_size = 1
            for start_, end_, score_ in spans:
                if start_ <= run_end + 1:
                    # Adjacent or overlapping: extend the run.  max() keeps the
                    # run from shrinking when a span is nested in the run.
                    run_end = max(run_end, end_)
                    score_sum += score_
                    run_size += 1
                else:
                    store(phrase_, phrase_[run_start:run_end], run_start, run_end,
                          score_sum / run_size, entity_to_study)
                    run_start, run_end, score_sum = start_, end_, score_
                    run_size = 1

            store(phrase_, phrase_[run_start:run_end], run_start, run_end,
                  score_sum / run_size, entity_to_study)

    return pd.DataFrame(rows, columns=_CLEAN_COLUMNS)


def get_astroBERT_cleaned_result(text_id, body_text_0):
    """Run astroBERT on *body_text_0* and merge neighbouring predictions.

    Args:
        text_id: Identifier stored in the ``TEXT_ID`` column of every row.
        body_text_0: Text to analyse.

    Returns:
        DataFrame with columns ``TEXT_ID``, ``word``, ``start``, ``end``,
        ``Score``, ``Phrase`` and ``entity_group``.
    """
    df_raw = apply_astroBERT(text_id, body_text_0)
    return merge_adjacent_entities(text_id, df_raw)