diff pipeline_astrobert.py @ 0:a35056104c2c draft default tip

planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit da42ae0d18f550dec7f6d7e29d297e7cf1909df2
author astroteam
date Fri, 13 Jun 2025 13:26:36 +0000
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pipeline_astrobert.py	Fri Jun 13 13:26:36 2025 +0000
@@ -0,0 +1,191 @@
+import re
+import pandas as pd
+import numpy as np
+import tempfile
+from transformers import AutoModelForTokenClassification, AutoTokenizer
+from transformers import TokenClassificationPipeline
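+# Note: the transformers classes below also need a model backend such as
+# torch installed.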
+
+# Entity types kept from the NER-DEAL label set; defined once so the two
+# functions below stay in sync.
+DEAL_ENTITIES = [
+    "Instrument", "Telescope", "Wavelength", "CelestialObject",
+    "CelestialRegion", "EntityOfFutureInterest", "Mission",
+    "Observatory", "Survey",
+]
+
+
+def split_text_in_phrases(text_id, text_):
+    # Split on sentence boundaries of the form ". X" (period, space,
+    # capital letter); the capturing group keeps each separator so the
+    # text can be rebuilt exactly.
+    list_proto_phrases = re.split(r"(\. [A-Z])", text_)
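+    # Worked example (illustrative):
+    #   re.split(r"(\. [A-Z])", "First phrase. Second one.")
+    #   -> ['First phrase', '. S', 'econd one.']
+    # The loop below re-attaches the '.' to the left-hand phrase and the
+    # capital to the right-hand one: ['First phrase.', 'Second one.'].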
+    for i in range(1, len(list_proto_phrases) - 1, 2):
+        back_ = list_proto_phrases[i][0]    # the trailing '.'
+        front_ = list_proto_phrases[i][-1]  # the leading capital letter
+        list_proto_phrases[i+1] = front_ + list_proto_phrases[i+1]
+        list_proto_phrases[i-1] = list_proto_phrases[i-1] + back_
+
+    # Even indices now hold complete phrases; odd indices held the spent
+    # separators.
+    list_phrases = list_proto_phrases[0::2]
+
+    # Sanity check: rejoining the phrases must reproduce the input text.
+    text_check = " ".join(list_phrases)
+    if text_check != text_:
+        print(f"Warning: phrase split round-trip mismatch for {text_id}")
+    return list_phrases
+
+
+def apply_astroBERT(text_id, body_text_0):
+    # Run astroBERT NER-DEAL over every phrase of the text and collect the
+    # raw entity hits.
+    dict_out = {"TEXT_ID": [], "word": [], "start": [], "end": [], "score": [], "entity_group": [], "Phrase": []}
+
+    # Cache the model download in a throwaway directory that is cleaned
+    # up in the finally block below.
+    tmpdir_ = tempfile.TemporaryDirectory()
+
+    try:
+        # load astroBERT for NER-DEAL
+        remote_model_path = 'adsabs/astroBERT'
+        # you need to load the astroBERT trained for NER-DEAL, which is on a separate branch
+        revision = 'NER-DEAL'
+
+        astroBERT_NER_DEAL = AutoModelForTokenClassification.from_pretrained(
+            pretrained_model_name_or_path=remote_model_path,
+            revision=revision,
+            cache_dir=tmpdir_.name
+        )
+
+        astroBERT_tokenizer = AutoTokenizer.from_pretrained(
+            pretrained_model_name_or_path=remote_model_path,
+            add_special_tokens=True,
+            do_lower_case=False,
+            model_max_length=512,
+            cache_dir=tmpdir_.name
+        )
+
+        # use the Hugging Face Pipeline class
+        NER_pipeline = TokenClassificationPipeline(
+            model=astroBERT_NER_DEAL,
+            tokenizer=astroBERT_tokenizer,
+            task='astroBERT NER_DEAL',
+            aggregation_strategy='average',  # merge word pieces into whole entities
+            ignore_labels=['O']  # drop tokens outside any entity
+        )
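+        # With an aggregation_strategy set, each pipeline result is a dict
+        # such as (illustrative values only):
+        #   {'entity_group': 'Telescope', 'score': 0.98, 'word': 'Hubble',
+        #    'start': 17, 'end': 23}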
+
+        # Collapse runs of whitespace and normalise Unicode degree and minus
+        # signs that the tokenizer handles poorly.
+        text = " ".join(body_text_0.split()).replace("°", "o").replace("º", "o").replace("−", "-")
+        list_phrases = split_text_in_phrases(text_id, text)
+
+        for phrase_ in list_phrases:
+            result = NER_pipeline(phrase_)
+
+            for u in result:
+                ent_ = u["entity_group"]
+                if ent_ in DEAL_ENTITIES:
+                    dict_out["TEXT_ID"].append(text_id)
+                    dict_out["Phrase"].append(phrase_)
+
+                    dict_out["word"].append(u["word"])
+                    dict_out["score"].append(u["score"])
+                    dict_out["start"].append(u["start"])
+                    dict_out["end"].append(u["end"])
+                    dict_out["entity_group"].append(ent_)
+    except Exception as e:
+        print(f"An error occurred in apply_astroBERT: {e}")
+    finally:
+        tmpdir_.cleanup()
+
+    return pd.DataFrame(dict_out)
+
+
+def get_astroBERT_cleaned_result(text_id, body_text_0):
+    # Run the NER pipeline, then merge touching or overlapping spans of the
+    # same entity type within each phrase.
+    df_raw = apply_astroBERT(text_id, body_text_0)
+    dict_out = {"TEXT_ID": [], "word": [], "start": [], "end": [], "Score": [], "Phrase": [], "entity_group": []}
+
+    for entity_to_study in DEAL_ENTITIES:
+        df_tmp0 = df_raw[df_raw["entity_group"] == entity_to_study]
+        phrases_ = np.unique(df_tmp0["Phrase"])
+
+        for phrase_ in phrases_:
+            df_tmp1 = df_tmp0[df_tmp0["Phrase"] == phrase_]
+            if len(df_tmp1) == 1:
+                dict_out["TEXT_ID"].append(text_id)
+                dict_out["Phrase"].append(df_tmp1.Phrase.values[0])
+                dict_out["word"].append(df_tmp1.word.values[0])
+                dict_out["start"].append(df_tmp1.start.values[0])
+                dict_out["end"].append(df_tmp1.end.values[0])
+                dict_out["Score"].append(df_tmp1.score.values[0])
+                dict_out["entity_group"].append(entity_to_study)
+
+            else:
+                # Multiple hits in this phrase: merge character spans that
+                # touch or overlap (next start within one character of the
+                # previous end) into a single entity.
+                df_tmp1 = df_tmp1.sort_values(by=['start'])
+                for s_i, (s_, e_, sc_) in enumerate(zip(df_tmp1.start.values, df_tmp1.end.values, df_tmp1.score.values)):
+                    if s_i == 0:
+                        # Open the first span.
+                        s_o = s_
+                        e_o = e_
+                        sc_s = sc_
+                        word_size = 1
+                    else:
+                        if s_ <= e_o + 1:
+                            # Extend the open span and accumulate its score.
+                            e_o = e_
+                            sc_s += sc_
+                            word_size += 1
+
+                        else:
+                            # Gap found: emit the merged span with its
+                            # averaged score, then open a new span.
+                            dict_out["TEXT_ID"].append(text_id)
+                            dict_out["Phrase"].append(phrase_)
+                            dict_out["word"].append(phrase_[s_o: e_o])
+                            dict_out["start"].append(s_o)
+                            dict_out["end"].append(e_o)
+                            dict_out["Score"].append(sc_s / word_size)
+                            dict_out["entity_group"].append(entity_to_study)
+
+                            s_o = s_
+                            e_o = e_
+                            sc_s = sc_
+                            word_size = 1
+
+                    # After the last row, flush the span that is still open.
+                    if s_i == len(df_tmp1) - 1:
+                        dict_out["TEXT_ID"].append(text_id)
+                        dict_out["Phrase"].append(phrase_)
+                        dict_out["word"].append(phrase_[s_o: e_o])
+                        dict_out["start"].append(s_o)
+                        dict_out["end"].append(e_o)
+                        dict_out["Score"].append(sc_s / word_size)
+                        dict_out["entity_group"].append(entity_to_study)
+
+    return pd.DataFrame(dict_out)
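+
+
+if __name__ == "__main__":
+    # Illustrative usage only: the sample text and TEXT_ID are hypothetical
+    # and do not come from the upstream tool.
+    sample = ("We observed the Crab Nebula with the Chandra X-ray "
+              "Observatory. Follow-up imaging used the Hubble Space "
+              "Telescope.")
+    df_entities = get_astroBERT_cleaned_result("TEXT_0001", sample)
+    print(df_entities[["word", "entity_group", "Score"]])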