Source code for finetune.sequence_labeling

import itertools
import math
import warnings
import copy

import tensorflow as tf
import numpy as np

from finetune.base import BaseModel, PredictMode
from finetune.target_encoders import SequenceLabelingEncoder, SequenceMultiLabelingEncoder
from finetune.network_modules import sequence_labeler
from finetune.crf import sequence_decode
from finetune.utils import indico_to_finetune_sequence, finetune_to_indico_sequence
from finetune.input_pipeline import BasePipeline, ENCODER
from finetune.estimator_utils import ProgressHook


class SequencePipeline(BasePipeline):
    """
    Input pipeline for sequence labeling: yields per-token features and,
    when labels are supplied, their encoded per-token targets.

    :param config: configuration object forwarded to :class:`BasePipeline`.
    :param multi_label: whether targets are multi-label; selects the pad token
        form, the target shape and the target encoder used by this pipeline.
    """

    def __init__(self, config, multi_label):
        super().__init__(config)
        self.multi_label = multi_label

    def _post_data_initialization(self, Y):
        """Flatten the per-document label sequences one level before delegating to the base class."""
        flat_labels = [label for doc_labels in Y for label in doc_labels]
        super()._post_data_initialization(flat_labels)

    def text_to_tokens_mask(self, X, Y=None):
        """
        Generate model inputs (and targets, if ``Y`` is given) for ``X``.

        :param X: input passed straight through to ``self._text_to_ids``.
        :param Y: optional labels; when ``None`` only feature dicts are yielded.
        :returns: a generator of ``{"tokens": ..., "mask": ...}`` dicts, or of
            ``(features, encoded_labels)`` tuples when ``Y`` is provided.
        """
        # In multi-label mode the pad token is wrapped in a list.
        if self.multi_label:
            pad_token = [self.config.pad_token]
        else:
            pad_token = self.config.pad_token

        with_targets = Y is not None
        for encoded in self._text_to_ids(X, Y=Y, pad_token=pad_token):
            features = {"tokens": encoded.token_ids, "mask": encoded.mask}
            if with_targets:
                yield features, self.label_encoder.transform(encoded.labels)
            else:
                yield features

    def _format_for_encoding(self, X):
        """Wrap ``X`` in a single-element list."""
        return [X]

    def _format_for_inference(self, X):
        """Wrap every element of ``X`` in its own single-element list."""
        return [[item] for item in X]

    def feed_shape_type_def(self):
        """
        Describe the dtypes and static shapes of what the pipeline yields.

        :returns: a ``(types, shapes)`` pair; each is a
            ``(feature_dict, target)`` tuple mirroring the structure yielded by
            :meth:`text_to_tokens_mask` when labels are present.
        """
        max_length = self.config.max_length

        # Multi-label targets carry an extra axis of size `target_dim`.
        if self.multi_label:
            target_dims = [max_length, self.label_encoder.target_dim]
        else:
            target_dims = [max_length]

        types = ({"tokens": tf.int32, "mask": tf.float32}, tf.int32)
        shapes = (
            {
                # NOTE(review): the trailing 2 is presumably (token id, position id) -- confirm.
                "tokens": tf.TensorShape([max_length, 2]),
                "mask": tf.TensorShape([max_length]),
            },
            tf.TensorShape(target_dims),
        )
        return types, shapes

    def _target_encoder(self):
        """Return a new target encoder matching the single-/multi-label mode."""
        if self.multi_label:
            encoder_cls = SequenceMultiLabelingEncoder
        else:
            encoder_cls = SequenceLabelingEncoder
        return encoder_cls()


class SequenceLabeler(BaseModel):
    r"""
    Labels each token in a sequence as belonging to 1 of N token classes.

    :param config: A :py:class:`finetune.config.Settings` object or None (for default config).
    :param \**kwargs: key-value pairs of config items to override.
    """

    # Config overrides applied on top of the base defaults; explicit kwargs
    # passed to ``__init__`` take precedence over these.
    defaults = {
        "n_epochs": 5,
        "lr_warmup": 0.1,
        "low_memory_mode": True,
        "chunk_long_sequences": True
    }

    def __init__(self, config=None, **kwargs):
        r"""
        For a full list of configuration options, see `finetune.config`.

        :param config: A config object generated by `finetune.config.get_config` or None (for default config).
        :param n_epochs: defaults to `5`.
        :param lr_warmup: defaults to `0.1`,
        :param low_memory_mode: defaults to `True`,
        :param chunk_long_sequences: defaults to `True`
        :param \**kwargs: key-value pairs of config items to override.
        """
        # Copy so that per-instance overrides never mutate the class-level dict.
        d = copy.deepcopy(SequenceLabeler.defaults)
        d.update(kwargs)
        super().__init__(config=config, **d)

    def _get_input_pipeline(self):
        """Build the sequence-labeling input pipeline for the current config."""
        return SequencePipeline(config=self.config, multi_label=self.config.multi_label_sequences)

    def _initialize(self):
        """Cache the multi-label flag from the config, then run base initialization."""
        self.multi_label = self.config.multi_label_sequences
        return super()._initialize()

    def finetune(self, Xs, Y=None, batch_size=None):
        """
        Fine-tune the model on labeled sequences.

        Inputs are first converted with ``indico_to_finetune_sequence`` (using
        ``"<PAD>"`` as the label for unlabeled text) and then handed to the base
        class ``finetune``.

        :param Xs: input texts, forwarded to ``indico_to_finetune_sequence``.
        :param Y: optional labels for ``Xs``; when ``None`` the base class is called with ``Y=None``.
        :param batch_size: forwarded unchanged to the base class.
        :returns: whatever the base class ``finetune`` returns.
        """
        Xs, Y_new = indico_to_finetune_sequence(Xs, labels=Y, multi_label=self.multi_label, none_value="<PAD>")
        # Only pass converted labels on if the caller actually supplied labels.
        Y = Y_new if Y is not None else None
        return super().finetune(Xs, Y=Y, batch_size=batch_size)

    def predict(self, X):
        """
        Produces labeled spans, with per-class probabilities, for each document in ``X``.

        Each document may be encoded as several chunks. Predictions from the
        chunks are stitched back together per document, consecutive tokens with
        the same label are merged into one span, and the result is converted
        with ``finetune_to_indico_sequence``.

        :param X: A list / array of text, shape [batch]
        :returns: the annotations returned by ``finetune_to_indico_sequence``
            (second element of its return value); see that helper for the exact schema.
        """
        # NOTE(review): the "- 2" presumably reserves room for start/end special tokens -- confirm.
        chunk_size = self.config.max_length - 2
        step_size = chunk_size // 3

        # Re-encode every document to recover the character offsets of each chunk's tokens.
        arr_encoded = list(itertools.chain.from_iterable(self.input_pipeline._text_to_ids([x]) for x in X))

        labels, batch_probas = [], []
        for pred in self._inference(X, mode=None):
            labels.append(self.input_pipeline.label_encoder.inverse_transform(pred[PredictMode.NORMAL]))
            batch_probas.append(pred[PredictMode.PROBAS])

        all_subseqs = []
        all_labels = []
        all_probs = []

        doc_idx = -1
        for chunk_idx, (label_seq, proba_seq) in enumerate(zip(labels, batch_probas)):
            position_seq = arr_encoded[chunk_idx].char_locs
            # A chunk whose first token is the start token opens a new document.
            start_of_doc = arr_encoded[chunk_idx].token_ids[0][0] == ENCODER.start
            end_of_doc = (
                chunk_idx + 1 >= len(arr_encoded)
                or arr_encoded[chunk_idx + 1].token_ids[0][0] == ENCODER.start
            )

            # Chunk idx for prediction.  Dividers at `step_size` increments.
            # [  1  |  1  |  2  |  3  |  3  ]
            start, end = 0, None
            if start_of_doc:
                # if this is the first chunk in a document, start accumulating from scratch
                doc_subseqs = []
                doc_labels = []
                doc_probs = []
                doc_idx += 1
                start_of_token = 0
                if not end_of_doc:
                    end = step_size * 2
            else:
                if end_of_doc:
                    # predict on the rest of sequence
                    start = step_size
                else:
                    # predict only on middle third
                    start, end = step_size, step_size * 2

            label_seq = label_seq[start:end]
            position_seq = position_seq[start:end]
            proba_seq = proba_seq[start:end]

            for label, position, proba in zip(label_seq, position_seq, proba_seq):
                if position == -1:
                    # indicates padding / special tokens
                    continue

                # if there are no current subsequence
                # or the current subsequence has the wrong label
                if not doc_subseqs or label != doc_labels[-1]:
                    # start new subsequence
                    doc_subseqs.append(X[doc_idx][start_of_token:position])
                    doc_labels.append(label)
                    doc_probs.append([proba])
                else:
                    # continue appending to current subsequence
                    doc_subseqs[-1] += X[doc_idx][start_of_token:position]
                    doc_probs[-1].append(proba)

                start_of_token = position

            if end_of_doc:
                # last chunk in a document
                prob_dicts = []
                for prob_seq in doc_probs:
                    # format probabilities as dictionary, averaged over the tokens of the span
                    probs = np.mean(np.vstack(prob_seq), axis=0)
                    prob_dicts.append(dict(zip(self.input_pipeline.label_encoder.classes_, probs)))
                    if self.multi_label:
                        # the pad token is not reported as a class in multi-label mode
                        del prob_dicts[-1][self.config.pad_token]

                all_subseqs.append(doc_subseqs)
                all_labels.append(doc_labels)
                all_probs.append(prob_dicts)

        _, doc_annotations = finetune_to_indico_sequence(
            raw_texts=X,
            subseqs=all_subseqs,
            labels=all_labels,
            probs=all_probs,
            subtoken_predictions=self.config.subtoken_predictions
        )
        return doc_annotations

    def featurize(self, X):
        """
        Embeds inputs in learned feature space. Can be called before or after calling :meth:`finetune`.

        :param X: An iterable of lists or array of text, shape [batch, n_inputs, tokens]
        :returns: np.array of features of shape (n_examples, embedding_size).
        """
        return self._featurize(X)

    def predict_proba(self, X):
        """
        Alias of :meth:`predict`.

        :meth:`predict` already passes per-span class probabilities to
        ``finetune_to_indico_sequence``, so there is no separate probability path.

        :param X: A list / array of text, shape [batch]
        :returns: same value as :meth:`predict`.
        """
        return self.predict(X)

    def _target_model(self, featurizer_state, targets, n_outputs, train=False, reuse=None, **kwargs):
        """Build the sequence-labeling head on top of the featurizer's ``sequence_features``."""
        return sequence_labeler(
            hidden=featurizer_state['sequence_features'],
            targets=targets,
            n_targets=n_outputs,
            pad_id=self.input_pipeline.pad_idx,
            config=self.config,
            train=train,
            multilabel=self.multi_label,
            reuse=reuse,
            **kwargs
        )

    def _predict_op(self, logits, **kwargs):
        """
        Decode label indices and probabilities from ``logits``.

        :param logits: output of the target model; in multi-label mode it is unstacked along its last axis.
        :param kwargs: must provide ``transition_matrix``; in multi-label mode
            it is iterated in lockstep with the unstacked logits.
        :returns: ``(label_idxs, label_probas)`` tensors.
        """
        trans_mats = kwargs.get("transition_matrix")
        if self.multi_label:
            logits = tf.unstack(logits, axis=-1)
            label_idxs = []
            label_probas = []
            for logits_i, trans_mat_i in zip(logits, trans_mats):
                idx, prob = sequence_decode(logits_i, trans_mat_i)
                label_idxs.append(idx)
                # drop index 0 of the last axis
                # NOTE(review): presumably the "negative" class of each per-label decode -- confirm.
                label_probas.append(prob[:, :, 1:])
            label_idxs = tf.stack(label_idxs, axis=-1)
            label_probas = tf.stack(label_probas, axis=-1)
        else:
            label_idxs, label_probas = sequence_decode(logits, trans_mats)
        return label_idxs, label_probas

    def _predict_proba_op(self, logits, **kwargs):
        """Return a no-op; probabilities are already produced by :meth:`_predict_op`."""
        return tf.no_op()