#!/usr/bin/env python
# coding: utf-8

# In[1]:

# get_ipython().system('pip3 install datasets')
# get_ipython().system('wget https://raw.githubusercontent.com/sighsmile/conlleval/master/conlleval.py')

import requests

url = "https://raw.githubusercontent.com/sighsmile/conlleval/master/conlleval.py"
response = requests.get(url)
with open("conlleval.py", "wb") as f:
    f.write(response.content)

# In[36]:

# get_ipython().system('pip install presidio-analyzer')

# In[38]:

# get_ipython().system('pip install flair')

# In[19]:

import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import streamlit as st
import keras
import numpy as np
import tensorflow as tf
from keras import layers
from datasets import load_dataset
from collections import Counter
from conlleval import evaluate
import pandas as pd

# from google.colab import files
import matplotlib.pyplot as plt
from transformers import AutoModel, AutoTokenizer
import logging
from typing import Optional, List, Tuple, Set
from presidio_analyzer import (
    RecognizerResult,
    EntityRecognizer,
    AnalysisExplanation,
)
from presidio_analyzer.nlp_engine import NlpArtifacts
from flair.data import Sentence
from flair.models import SequenceTagger
import tempfile

# In[4]:

class TransformerBlock(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super().__init__()
        self.att = keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim
        )
        self.ffn = keras.Sequential(
            [
                keras.layers.Dense(ff_dim, activation="relu"),
                keras.layers.Dense(embed_dim),
            ]
        )
        self.layernorm1 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = keras.layers.Dropout(rate)
        self.dropout2 = keras.layers.Dropout(rate)

    def call(self, inputs, training=False):
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

# In[5]:

class TokenAndPositionEmbedding(layers.Layer):
    def __init__(self, maxlen, vocab_size, embed_dim):
        super().__init__()
        self.token_emb = keras.layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.pos_emb = keras.layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, inputs):
        maxlen = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=maxlen, delta=1)
        position_embeddings = self.pos_emb(positions)
        token_embeddings = self.token_emb(inputs)
        return token_embeddings + position_embeddings

# In[6]:

class NERModel(keras.Model):
    def __init__(
        self, num_tags, vocab_size, maxlen=128, embed_dim=32, num_heads=2, ff_dim=32
    ):
        super().__init__()
        self.embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim)
        self.transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim)
        self.dropout1 = layers.Dropout(0.1)
        self.ff = layers.Dense(ff_dim, activation="relu")
        self.dropout2 = layers.Dropout(0.1)
        self.ff_final = layers.Dense(num_tags, activation="softmax")

    def call(self, inputs, training=False):
        x = self.embedding_layer(inputs)
        x = self.transformer_block(x)
        x = self.dropout1(x, training=training)
        x = self.ff(x)
        x = self.dropout2(x, training=training)
        x = self.ff_final(x)
        return x

# In[7]:

@st.cache_data
def load_data(dataset):
    return load_dataset("conll2003")

conll_data = load_data("conll2003")

# In[8]:

def dataset_to_dataframe(dataset):
    data_dict = {key: dataset[key] for key in dataset.features}
    return pd.DataFrame(data_dict)
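# Hedged sanity check of the NERModel defined above (placeholder sizes, not the
# training configuration used later in this script): the model maps a batch of
# integer token IDs with shape (batch, seq_len) to per-token tag probabilities
# with shape (batch, seq_len, num_tags).
_demo_ner_model = NERModel(num_tags=10, vocab_size=100, embed_dim=32, num_heads=2, ff_dim=32)
_demo_token_ids = tf.random.uniform((2, 7), minval=0, maxval=100, dtype=tf.int32)
print("demo NERModel output shape:", _demo_ner_model(_demo_token_ids).shape)  # (2, 7, 10)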
# Combine all splits (train, validation, test) into a single DataFrame
conll_df = pd.concat(
    [dataset_to_dataframe(conll_data[split]) for split in conll_data.keys()]
)

# In[7]:

csv_file_path = "conll_data.csv"
# conll_df.to_csv(csv_file_path, index=False)

# Download the CSV file to local machine
# files.download(csv_file_path)

# *****************************My code********************
# Function to download the CSV file
@st.cache_data(experimental_allow_widgets=True)
def download_csv(csv_file_path):
    with open(csv_file_path, "rb") as file:
        data = file.read()
    # Wrap the download button inside a div with style="display: none;"
    st.markdown('<div style="display: none;">', unsafe_allow_html=True)
    st.download_button(
        label="Download CSV", data=data, file_name="data.csv", mime="text/csv"
    )
    st.markdown("</div>", unsafe_allow_html=True)

# Create a temporary file to save the CSV data
temp_file = tempfile.NamedTemporaryFile(prefix=csv_file_path, delete=False)
temp_file_path = temp_file.name
conll_df.to_csv(temp_file_path, index=False)
temp_file.close()

# Trigger the download automatically when the app starts
download_csv(temp_file_path)
st.markdown(
    '<div style="display: none;">Hidden download button</div>',
    unsafe_allow_html=True,
)
", unsafe_allow_html=True) #**************************MY code ********************************* # In[8]: # print(conll_df.head()) # In[10]: # print(conll_df.describe()) # In[11]: # print(conll_df.dtypes) # In[12]: # print(conll_df.isnull().sum()) # In[13]: label_counts = conll_df['ner_tags'].value_counts() print(label_counts) # In[14]: top_10_labels = label_counts.head(10) # Plot the distribution of the top 10 NER tags # plt.figure(figsize=(10, 6)) # top_10_labels.plot(kind='bar') # plt.title('Top 10 Most Common NER Tags') # plt.xlabel('NER Tag') # plt.ylabel('Count') # plt.show() # In[9]: @st.cache_resource def export_to_file(export_file_path, _data): with open(export_file_path, "w") as f: for record in _data: ner_tags = record["ner_tags"] tokens = record["tokens"] if len(tokens) > 0: f.write( str(len(tokens)) + "\t" + "\t".join(tokens) + "\t" + "\t".join(map(str, ner_tags)) + "\n" ) os.makedirs("data", exist_ok=True) export_to_file("./data/conll_train.txt", conll_data["train"]) export_to_file("./data/conll_val.txt", conll_data["validation"]) # In[10]: def make_tag_lookup_table(): iob_labels = ["B", "I"] ner_labels = ["PER", "ORG", "LOC", "MISC"] all_labels = [(label1, label2) for label2 in ner_labels for label1 in iob_labels] all_labels = ["-".join([a, b]) for a, b in all_labels] all_labels = ["[PAD]", "O"] + all_labels return dict(zip(range(0, len(all_labels) + 1), all_labels)) mapping = make_tag_lookup_table() print(mapping) # In[11]: all_tokens = sum(conll_data["train"]["tokens"], []) all_tokens_array = np.array(list(map(str.lower, all_tokens))) counter = Counter(all_tokens_array) # print(len(counter)) num_tags = len(mapping) vocab_size = 20000 # We only take (vocab_size - 2) most commons words from the training data since # the `StringLookup` class uses 2 additional tokens - one denoting an unknown # token and another one denoting a masking token vocabulary = [token for token, count in counter.most_common(vocab_size - 2)] # The StringLook class will convert tokens to token IDs lookup_layer = keras.layers.StringLookup(vocabulary=vocabulary) # In[12]: train_data = tf.data.TextLineDataset("./data/conll_train.txt") val_data = tf.data.TextLineDataset("./data/conll_val.txt") # In[13]: print(list(train_data.take(1).as_numpy_iterator())) # In[14]: def map_record_to_training_data(record): record = tf.strings.split(record, sep="\t") length = tf.strings.to_number(record[0], out_type=tf.int32) tokens = record[1 : length + 1] tags = record[length + 1 :] tags = tf.strings.to_number(tags, out_type=tf.int64) tags += 1 return tokens, tags def lowercase_and_convert_to_ids(tokens): tokens = tf.strings.lower(tokens) return lookup_layer(tokens) # We use `padded_batch` here because each record in the dataset has a # different length. 
# We use `padded_batch` here because each record in the dataset has a
# different length.
batch_size = 32
train_dataset = (
    train_data.map(map_record_to_training_data)
    .map(lambda x, y: (lowercase_and_convert_to_ids(x), y))
    .padded_batch(batch_size)
)
val_dataset = (
    val_data.map(map_record_to_training_data)
    .map(lambda x, y: (lowercase_and_convert_to_ids(x), y))
    .padded_batch(batch_size)
)

# ner_model = NERModel(num_tags, vocab_size, embed_dim=32, num_heads=4, ff_dim=64)

# In[15]:

class CustomNonPaddingTokenLoss(keras.losses.Loss):
    def __init__(self, name="custom_ner_loss"):
        super().__init__(name=name)

    def call(self, y_true, y_pred):
        loss_fn = keras.losses.SparseCategoricalCrossentropy(
            from_logits=False, reduction="none"
        )
        loss = loss_fn(y_true, y_pred)
        mask = tf.cast((y_true > 0), dtype=tf.float32)
        loss = loss * mask
        return tf.reduce_sum(loss) / tf.reduce_sum(mask)

# loss = CustomNonPaddingTokenLoss()
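# Hedged toy example of the loss above (illustrative values only): positions where
# the true tag ID is 0 (the [PAD] slots created by padded_batch) contribute nothing
# to the averaged per-token cross-entropy.
_toy_loss = CustomNonPaddingTokenLoss()
_toy_y_true = tf.constant([[2, 3, 0, 0]], dtype=tf.int64)  # last two positions are padding
_toy_probs = tf.random.uniform((1, 4, num_tags))
_toy_probs = _toy_probs / tf.reduce_sum(_toy_probs, axis=-1, keepdims=True)  # fake softmax output
print("masked toy loss:", float(_toy_loss(_toy_y_true, _toy_probs)))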
# In[16]:

# ner_model.compile(optimizer="adam", loss=loss)
# ner_model.fit(train_dataset, epochs=10)

def tokenize_and_convert_to_ids(text):
    tokens = text.split()
    return lowercase_and_convert_to_ids(tokens)

# Sample inference using the trained model
# sample_input = tokenize_and_convert_to_ids(
#     "eu rejects german call to boycott british lamb"
# )
# sample_input = tf.reshape(sample_input, shape=[1, -1])
# print(sample_input)

# output = ner_model.predict(sample_input)
# prediction = np.argmax(output, axis=-1)[0]
# prediction = [mapping[i] for i in prediction]

# eu -> B-ORG, german -> B-MISC, british -> B-MISC
# print(prediction)

# In[17]:

@st.cache_data
def calculate_metrics(_dataset):
    all_true_tag_ids, all_predicted_tag_ids = [], []

    for x, y in _dataset:
        output = ner_model.predict(x, verbose=0)
        predictions = np.argmax(output, axis=-1)
        predictions = np.reshape(predictions, [-1])

        true_tag_ids = np.reshape(y, [-1])

        mask = (true_tag_ids > 0) & (predictions > 0)
        true_tag_ids = true_tag_ids[mask]
        predicted_tag_ids = predictions[mask]

        all_true_tag_ids.append(true_tag_ids)
        all_predicted_tag_ids.append(predicted_tag_ids)

    all_true_tag_ids = np.concatenate(all_true_tag_ids)
    all_predicted_tag_ids = np.concatenate(all_predicted_tag_ids)

    predicted_tags = [mapping[tag] for tag in all_predicted_tag_ids]
    real_tags = [mapping[tag] for tag in all_true_tag_ids]

    evaluate(real_tags, predicted_tags)

# calculate_metrics(val_dataset)

# In[18]:

@st.cache_resource
def test_model_with_input(_ner_model, mapping):
    # Get input sentence from user
    input_sentence = "My name is Karishma Shirsath. I live in Toronto Canada."

    # Tokenize and convert input sentence to IDs
    sample_input = tokenize_and_convert_to_ids(input_sentence)
    sample_input = tf.reshape(sample_input, shape=[1, -1])

    # Predict tags using the trained model
    output = _ner_model.predict(sample_input)
    predictions = np.argmax(output, axis=-1)[0]
    predicted_tags = [mapping[i] for i in predictions]

    # Print the predicted tags for each token in the input sentence
    print("Input sentence:", input_sentence)
    print("Predicted tags:", predicted_tags)

# Test the model with user input
# test_model_with_input(ner_model, mapping)

# In[20]:

logger = logging.getLogger("presidio-analyzer")


class FlairRecognizer(EntityRecognizer):
    """
    Wrapper for a Flair model, if needed to be used within Presidio Analyzer.

    :example:
    >from presidio_analyzer import AnalyzerEngine, RecognizerRegistry
    >flair_recognizer = FlairRecognizer()
    >registry = RecognizerRegistry()
    >registry.add_recognizer(flair_recognizer)
    >analyzer = AnalyzerEngine(registry=registry)
    >results = analyzer.analyze(
    >    "My name is Christopher and I live in Irbid.",
    >    language="en",
    >    return_decision_process=True,
    >)
    >for result in results:
    >    print(result)
    >    print(result.analysis_explanation)
    """

    ENTITIES = [
        "LOCATION",
        "PERSON",
        "ORGANIZATION",
        # "MISCELLANEOUS"  # - There is no direct correlation with Presidio entities.
    ]

    DEFAULT_EXPLANATION = "Identified as {} by Flair's Named Entity Recognition"

    CHECK_LABEL_GROUPS = [
        ({"LOCATION"}, {"LOC", "LOCATION"}),
        ({"PERSON"}, {"PER", "PERSON"}),
        ({"ORGANIZATION"}, {"ORG"}),
        # ({"MISCELLANEOUS"}, {"MISC"}),  # Probably not PII
    ]

    MODEL_LANGUAGES = {"en": "flair/ner-english-large"}

    PRESIDIO_EQUIVALENCES = {
        "PER": "PERSON",
        "LOC": "LOCATION",
        "ORG": "ORGANIZATION",
        # 'MISC': 'MISCELLANEOUS'  # - Probably not PII
    }

    def __init__(
        self,
        supported_language: str = "en",
        supported_entities: Optional[List[str]] = None,
        check_label_groups: Optional[Tuple[Set, Set]] = None,
        model: SequenceTagger = None,
        model_path: Optional[str] = None,
    ):
        self.check_label_groups = (
            check_label_groups if check_label_groups else self.CHECK_LABEL_GROUPS
        )

        supported_entities = supported_entities if supported_entities else self.ENTITIES

        if model and model_path:
            raise ValueError("Only one of model or model_path should be provided.")
        elif model and not model_path:
            self.model = model
        elif not model and model_path:
            print(f"Loading model from {model_path}")
            self.model = SequenceTagger.load(model_path)
        else:
            print(f"Loading model for language {supported_language}")
            self.model = SequenceTagger.load(
                self.MODEL_LANGUAGES.get(supported_language)
            )

        super().__init__(
            supported_entities=supported_entities,
            supported_language=supported_language,
            name="Flair Analytics",
        )

    def load(self) -> None:
        """Load the model, not used. Model is loaded during initialization."""
        pass

    def get_supported_entities(self) -> List[str]:
        """
        Return supported entities by this model.

        :return: List of the supported entities.
        """
        return self.supported_entities

    # Method to use Flair with Presidio as an external recognizer.
    def analyze(
        self, text: str, entities: List[str], nlp_artifacts: NlpArtifacts = None
    ) -> List[RecognizerResult]:
        """
        Analyze text using the Flair NER model.

        :param text: The text for analysis.
        :param entities: Not working properly for this recognizer.
        :param nlp_artifacts: Not used by this recognizer.
        :param language: Text language. Supported languages in MODEL_LANGUAGES
        :return: The list of Presidio RecognizerResult constructed from the recognized
            Flair detections.
        """
        results = []

        sentences = Sentence(text)
        self.model.predict(sentences)

        # If no specific list of entities is requested, look for all supported entities.
        if not entities:
            entities = self.supported_entities
        for entity in entities:
            if entity not in self.supported_entities:
                continue

            for ent in sentences.get_spans("ner"):
                if not self.__check_label(
                    entity, ent.labels[0].value, self.check_label_groups
                ):
                    continue
                textual_explanation = self.DEFAULT_EXPLANATION.format(
                    ent.labels[0].value
                )
                explanation = self.build_flair_explanation(
                    round(ent.score, 2), textual_explanation
                )
                flair_result = self._convert_to_recognizer_result(ent, explanation)

                results.append(flair_result)

        return results

    def _convert_to_recognizer_result(self, entity, explanation) -> RecognizerResult:
        entity_type = self.PRESIDIO_EQUIVALENCES.get(entity.tag, entity.tag)
        flair_score = round(entity.score, 2)

        flair_results = RecognizerResult(
            entity_type=entity_type,
            start=entity.start_position,
            end=entity.end_position,
            score=flair_score,
            analysis_explanation=explanation,
        )

        return flair_results

    def build_flair_explanation(
        self, original_score: float, explanation: str
    ) -> AnalysisExplanation:
        """
        Create explanation for why this result was detected.

        :param original_score: Score given by this recognizer
        :param explanation: Explanation string
        :return:
        """
        explanation = AnalysisExplanation(
            recognizer=self.__class__.__name__,
            original_score=original_score,
            textual_explanation=explanation,
        )
        return explanation

    @staticmethod
    def __check_label(
        entity: str, label: str, check_label_groups: Tuple[Set, Set]
    ) -> bool:
        return any(
            [entity in egrp and label in lgrp for egrp, lgrp in check_label_groups]
        )

# In[21]:

# # Use Flair NER for identifying PII
# sentence = Sentence(input_text)
# tagger.predict(sentence)
# entities = sentence.to_dict(tag_type='ner')['entities']

# # Mask PII using Presidio analyzer
# masked_text = analyzer.analyze(input_text, entities=entities)

from flair.data import Sentence
from flair.models import SequenceTagger


def predict_ner_tags(input_text):
    # load tagger
    tagger = SequenceTagger.load("flair/ner-english-large")

    # make example sentence
    # sentence = Sentence("My name is Karishma Shirsath. I live in Toronto Canada.")
    sentence = Sentence(input_text)

    # predict NER tags
    tagger.predict(sentence)

    # print sentence
    print(sentence)

    # print predicted NER spans
    print("The following NER tags are found:")
    # iterate over entities and print
    for entity in sentence.get_spans("ner"):
        print(entity)

# In[33]:

def analyze_text(input_text):
    # load tagger
    tagger = SequenceTagger.load("flair/ner-english-large")

    # make example sentence
    sentence = Sentence(input_text)

    # predict NER tags
    tagger.predict(sentence)

    # print sentence
    print(sentence)

    # Anonymize identified named entities
    anonymized_sentence = str(sentence)
    for entity in sentence.get_spans("ner"):
        entity_text = entity.text
        anonymized_text = "*" * len(entity_text)
        anonymized_sentence = anonymized_sentence.replace(entity_text, anonymized_text)

    # remove the part that includes named entity annotations
    anonymized_sentence = anonymized_sentence.split("→")[0].strip()
    anonymized_sentence = anonymized_sentence.split(":")[1].strip()

    # Run the Presidio-based anonymizer defined below on the original text
    a = FlairRecognizer2.anonymize(input_text, "replace")
    print("a sentence:")
    print(a)

    # print anonymized sentence
    print("Anonymized sentence:")
    print(anonymized_sentence)
    return anonymized_sentence


from presidio_anonymizer import AnonymizerEngine
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer.entities import (
    OperatorConfig,
    RecognizerResult,
    EngineResult,
    ConflictResolutionStrategy,
)
from typing import List, Dict, Optional, Type


class FlairRecognizer2:
    @staticmethod
    def anonymize(
        text: str,
        operator: str,
        # analyze_results: List[RecognizerResult],
        mask_char: Optional[str] = None,
        number_of_chars: Optional[int] = None,
        encrypt_key: Optional[str] = None,
    ):
        """Anonymize identified input using Presidio Anonymizer.

        :param text: Full text
        :param operator: Operator name
        :param mask_char: Mask char (for mask operator)
        :param number_of_chars: Number of characters to mask (for mask operator)
        :param encrypt_key: Encryption key (for encrypt operator)
        :param analyze_results: list of results from presidio analyzer engine
        """
        # Define operator config
        if operator == "mask":
            operator_config = {
                "type": "mask",
                "masking_char": mask_char,
                "chars_to_mask": number_of_chars,
                "from_end": False,
            }
        elif operator == "encrypt":
            operator_config = {"key": encrypt_key}
        elif operator == "highlight":
            operator_config = {"lambda": lambda x: x}
        else:
            operator_config = None

        # Change operator if needed as intermediate step
        if operator == "highlight":
            operator = "custom"
        elif operator == "synthesize":
            operator = "replace"

        # res = AnonymizerEngine().anonymize(
        #     text,
        #     analyze_results,
        #     operators={"DEFAULT": OperatorConfig("redact", operator_config)},
        # )

        entitiesToRecognize = [
            "PHONE_NUMBER",
            "PERSON",
            "ID",
            "LOCATION",
            "EMAIL",
            "URL",
            "CREDIT_CARD",
            "AGE",
            "DATE_TIME",
            "CRYPTO",
            "IP_ADDRESS",
            "US_PASSPORT",
            "US_BANK_NUMBER",
        ]

        analyzer = AnalyzerEngine()
        results = analyzer.analyze(
            text=text, entities=entitiesToRecognize, language="en"
        )  # noqa D501
        print("results:")
        print(results)

        engine = AnonymizerEngine()

        # Invoke the anonymize function with the text, analyzer results and
        # operators to define the anonymization type.
        result = engine.anonymize(
            text=text,
            analyzer_results=results,
            operators={"DEFAULT": OperatorConfig(operator, {"new_value": "BIP"})},
        )

        print("res:")
        print(result)
        print(result.text)
        print(type(result.text))
        return result.text
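# Hedged usage sketch, kept commented out because AnalyzerEngine() loads Presidio's
# default spaCy model on first run: with the "replace" operator configured above,
# every entity Presidio detects would be substituted with the literal value "BIP".
# masked = FlairRecognizer2.anonymize(
#     "My name is Karishma Shirsath. I live in Toronto Canada.", "replace"
# )
# print(masked)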