Source code for orkgnlp.annotation.csner.encoder

# -*- coding: utf-8 -*-
""" CS-NER service encoder. """

import sys
from typing import Any, Dict, Tuple

import numpy as np
import spacy
import torch
from overrides import overrides

from orkgnlp.common.service.base import ORKGNLPBaseEncoder


[docs] class CSNerEncoder(ORKGNLPBaseEncoder): """ The CSNerEncoder encodes the given input to the arguments needed to execute a Sequence labeling TorchScript model. """ def __init__(self, alphabet: Dict[str, Dict[str, int]]): """ :param alphabet: Dict representing the word, char and label alphabets. :type alphabet: Dict[str, Dict[str, int]] """ super().__init__() self._alphabet = alphabet self._UNKNOWN = "</unk>" self._spacy_nlp = spacy.load( "en_core_web_md", disable=["tokenizer", "tagger", "ner", "textcat", "lemmatizer"], )
[docs] @overrides def encode(self, raw_input: Any, **kwargs: Any) -> Tuple[Any, Dict[str, Any]]: raw_texts, raw_ids = self._read_instance(raw_input) batch_size = 10 total_batch = len(raw_ids) // batch_size + 1 model_input = [] decoder_input = {"raw_texts": raw_texts, "recover": []} for batch_id in range(total_batch): start = batch_id * batch_size end = (batch_id + 1) * batch_size if end > len(raw_ids): end = len(raw_ids) instance = raw_ids[start:end] if not instance: continue ( word, features, word_len, word_recover, char, char_len, char_recover, label, mask, ) = self._batchify(instance) model_input.append( ( word, features, word_len, char, char_recover, mask, 1, ) ) decoder_input["recover"].append((label, mask, word_recover)) kwargs.update(decoder_input) return model_input, kwargs
def _read_instance(self, q, char_padding_size=-1, char_padding_symbol="</pad>"): document = self._spacy_nlp(q.strip()) instance_texts = [] instance_ids = [] # for sequence labeling text format for span in document.sents: ( words, features, chars, labels, word_ids, feature_ids, char_ids, label_ids, ) = ([], [], [], [], [], [], [], []) sentence = [document[i] for i in range(span.start, span.end)] for token in sentence: word = token.text.strip() if not word: continue if sys.version_info[0] < 3: word = word.decode("utf-8") words.append(word) if self._alphabet["number_normalized"]: word = self._normalize_word(word) label = "O" labels.append(label) word_ids.append( self._alphabet["word"].get(word) or self._alphabet["word"].get(self._UNKNOWN) ) label_ids.append( self._alphabet["label"].get(label) or self._alphabet["label"].get(self._UNKNOWN) ) # get features feat_list = [] feat_Id = [] features.append(feat_list) feature_ids.append(feat_Id) # get char char_list = [] char_id = [] for char in word: char_list.append(char) if char_padding_size > 0: char_number = len(char_list) if char_number < char_padding_size: char_list = char_list + [char_padding_symbol] * ( char_padding_size - char_number ) assert len(char_list) == char_padding_size else: # not padding pass for char in char_list: char_id.append( self._alphabet["char"].get(char) or self._alphabet["char"].get(self._UNKNOWN) ) chars.append(char_list) char_ids.append(char_id) instance_texts.append([words, features, chars, labels]) instance_ids.append([word_ids, feature_ids, char_ids, label_ids]) return instance_texts, instance_ids @staticmethod def _batchify(input_batch_list, if_train=False): """ input: list of words, chars and labels, various length. [[words, features, chars, labels],[words, features, chars,labels],...] words: word ids for one sentence. (batch_size, sent_len) features: features ids for one sentence. (batch_size, sent_len, feature_num) chars: char ids for on sentences, various length. (batch_size, sent_len, each_word_length) labels: label ids for one sentence. (batch_size, sent_len) output: zero padding for word and char, with their batch length word_seq_tensor: (batch_size, max_sent_len) Variable feature_seq_tensors: [(batch_size, max_sent_len),...] list of Variable word_seq_lengths: (batch_size,1) Tensor char_seq_tensor: (batch_size*max_sent_len, max_word_len) Variable char_seq_lengths: (batch_size*max_sent_len,1) Tensor char_seq_recover: (batch_size*max_sent_len,1) recover char sequence order label_seq_tensor: (batch_size, max_sent_len) mask: (batch_size, max_sent_len) """ batch_size = len(input_batch_list) words = [sent[0] for sent in input_batch_list] features = [np.asarray(sent[1]) for sent in input_batch_list] feature_num = len(features[0][0]) chars = [sent[2] for sent in input_batch_list] labels = [sent[3] for sent in input_batch_list] word_seq_lengths = torch.LongTensor(list(map(len, words))) max_seq_len = word_seq_lengths.max().item() word_seq_tensor = torch.zeros((batch_size, max_seq_len), requires_grad=if_train).long() label_seq_tensor = torch.zeros((batch_size, max_seq_len), requires_grad=if_train).long() feature_seq_tensors = [] for idx in range(feature_num): feature_seq_tensors.append( torch.zeros((batch_size, max_seq_len), requires_grad=if_train).long() ) mask = torch.zeros((batch_size, max_seq_len), requires_grad=if_train).bool() for idx, (seq, label, seqlen) in enumerate(zip(words, labels, word_seq_lengths)): seqlen = seqlen.item() word_seq_tensor[idx, :seqlen] = torch.LongTensor(seq) label_seq_tensor[idx, :seqlen] = torch.LongTensor(label) mask[idx, :seqlen] = torch.Tensor([1] * seqlen) for idy in range(feature_num): feature_seq_tensors[idy][idx, :seqlen] = torch.LongTensor(features[idx][:, idy]) word_seq_lengths, word_perm_idx = word_seq_lengths.sort(0, descending=True) word_seq_tensor = word_seq_tensor[word_perm_idx] for idx in range(feature_num): feature_seq_tensors[idx] = feature_seq_tensors[idx][word_perm_idx] label_seq_tensor = label_seq_tensor[word_perm_idx] mask = mask[word_perm_idx] # deal with char # pad_chars (batch_size, max_seq_len) pad_chars = [ chars[idx] + [[0]] * (max_seq_len - len(chars[idx])) for idx in range(len(chars)) ] length_list = [list(map(len, pad_char)) for pad_char in pad_chars] max_word_len = max(map(max, length_list)) char_seq_tensor = torch.zeros( (batch_size, max_seq_len, max_word_len), requires_grad=if_train ).long() char_seq_lengths = torch.LongTensor(length_list) for idx, (seq, seqlen) in enumerate(zip(pad_chars, char_seq_lengths)): for idy, (word, wordlen) in enumerate(zip(seq, seqlen)): # print len(word), wordlen char_seq_tensor[idx, idy, :wordlen] = torch.LongTensor(word) char_seq_tensor = char_seq_tensor[word_perm_idx].view(batch_size * max_seq_len, -1) char_seq_lengths = char_seq_lengths[word_perm_idx].view( batch_size * max_seq_len, ) char_seq_lengths, char_perm_idx = char_seq_lengths.sort(0, descending=True) char_seq_tensor = char_seq_tensor[char_perm_idx] _, char_seq_recover = char_perm_idx.sort(0, descending=False) _, word_seq_recover = word_perm_idx.sort(0, descending=False) return ( word_seq_tensor, torch.tensor(feature_seq_tensors), word_seq_lengths, word_seq_recover, char_seq_tensor, char_seq_lengths, char_seq_recover, label_seq_tensor, mask, ) @staticmethod def _normalize_word(word): new_word = "" for char in word: if char.isdigit(): new_word += "0" else: new_word += char return new_word