import numpy as np
import tqdm
import math
import itertools
import logging
import pandas as pd
import tensorflow as tf
from collections import namedtuple
from tensorflow.python.data import Dataset
from finetune.config import get_config
from finetune.saver import Saver, InitializeHook
from finetune.base import BaseModel
from finetune.target_models.comparison import ComparisonPipeline, Comparison
from finetune.target_models.sequence_labeling import SequencePipeline, SequenceLabeler
from finetune.target_models.association import AssociationPipeline
from finetune.target_models.classifier import ClassificationPipeline
from finetune.base_models import GPTModel, GPTModelSmall, BERTModelCased, GPT2Model
from finetune.model import get_separate_model_fns, PredictMode
from finetune.errors import FinetuneError
from finetune.input_pipeline import BasePipeline
LOGGER = logging.getLogger('finetune')
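# Pair of InitializeHooks used at predict time: one restores featurizer weights, the other target-model weights.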
PredictHook = namedtuple('PredictHook', 'feat_hook target_hook')
class TaskMode:
SEQUENCE_LABELING = "Sequence_Labeling"
CLASSIFICATION = "Classification"
COMPARISON = "Comparison"
ASSOCIATION = "Association"
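# These string values double as keys into the pipeline mapping in DeploymentPipeline.get_active_pipeline.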
class DeploymentPipeline(BasePipeline):
def __init__(self, config):
super().__init__(config)
self.pipeline_type = None
self.pipeline = None
def _target_encoder(self):
raise NotImplementedError
def _dataset_without_targets(self, Xs, train):
if not callable(Xs):
Xs_fn = lambda: self.wrap_tqdm(Xs, train)
else:
Xs_fn = lambda: self.wrap_tqdm(Xs(), train)
dataset_encoded = lambda: itertools.chain.from_iterable(map(self.get_text_token_mask, Xs_fn()))
types, shapes = self.feed_shape_type_def()
return Dataset.from_generator(dataset_encoded, output_types=types[0])
def get_active_pipeline(self):
        pipelines = {
            'Classification': ClassificationPipeline,
            'Comparison': ComparisonPipeline,
            'Sequence_Labeling': SequencePipeline,
            'Association': AssociationPipeline,
        }
self.pipeline_type = pipelines[self.task]
        if type(self.pipeline) is not self.pipeline_type:  # avoid re-instantiating the same type of pipeline on every call
if self.pipeline_type == SequencePipeline:
self.pipeline = self.pipeline_type(self.config, multi_label=self.multi_label)
else:
self.pipeline = self.pipeline_type(self.config)
return self.pipeline_type
def get_text_token_mask(self, X):
_ = self.get_active_pipeline()
return self.pipeline.text_to_tokens_mask(X)
def get_shapes(self):
_ = self.get_active_pipeline()
types, shapes = self.pipeline.feed_shape_type_def()
        if hasattr(self, 'dataset'):
            self.dataset.output_shapes = shapes[0]
return shapes[0]
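    # `features` arrives as a list of per-example dicts produced by the featurizer
    # (see DeploymentModel._inference); pandas flattens it into a dict of columns
    # so it can be fed to the target estimator through numpy_input_fn.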
def get_target_input_fn(self, features, batch_size=None):
batch_size = batch_size or self.config.batch_size
features = pd.DataFrame(features).to_dict('list')
for key in features:
features[key] = np.array(features[key])
return tf.estimator.inputs.numpy_input_fn(features, batch_size=batch_size, shuffle=False)
class DeploymentModel(BaseModel):
"""
    Implements cached inference for arbitrary tasks by compiling the featurizer graph once and loading
    weights efficiently, allowing saved models to be swapped quickly without slow graph recompilation.
    A minimal usage sketch appears at the end of this module.
:param config: A :py:class:`finetune.config.Settings` object or None (for default config).
:param \**kwargs: key-value pairs of config items to override.
"""
def __init__(self, featurizer, **kwargs):
"""
For a full list of configuration options, see `finetune.config`.
        :param featurizer: One of the supported base models from finetune.base_models: GPTModel, GPTModelSmall, BERTModelCased, or GPT2Model.
:param **kwargs: key-value pairs of config items to override.
"""
if featurizer not in [GPTModel, GPTModelSmall, BERTModelCased, GPT2Model]:
raise FinetuneError("Selected base model not supported.")
self.config = get_config(**kwargs)
self.validate_config()
self.input_pipeline = DeploymentPipeline(self.config)
super().__init__(**kwargs)
self.config.base_model = featurizer
self.task = TaskMode.CLASSIFICATION
self.input_pipeline.task = self.task
self.featurizer_loaded = False
self.adapters = False
self.loaded_custom_previously = False
    def load_featurizer(self):
"""
        Compiles the featurizer graph up front so that most compilation overhead is paid here rather than at predict time.
        Must be called after initialization but BEFORE any calls to load_custom_model or predict.
"""
self.featurizer_est = self._get_estimator('featurizer')
self.predict_hooks.feat_hook.model_portion = 'whole_featurizer'
for hook in self.predict_hooks:
hook.need_to_refresh = True
        self.predict(['finetune'], exclude_target=True)  # run an arbitrary predict call to compile the featurizer graph
self.featurizer_loaded = True
    def load_custom_model(self, path):
"""
        Loads the target model, plus either adapter weights or the entire featurizer, from a save file. Must be called after load_featurizer.
"""
if not self.featurizer_loaded:
raise FinetuneError('Need to call load_featurizer before loading weights from file.')
original_model = self.saver.load(path)
if original_model.config.adapter_size is None:
LOGGER.warning("Loading without adapters will result in slightly slower load time than models that use adapters, and will also slow the next switch to an adapter model.")
            self.predict_hooks.feat_hook.model_portion = 'whole_featurizer'  # need to load everything from the save file rather than the standard base model file
elif original_model.config.adapter_size != self.config.adapter_size:
            raise FinetuneError('adapter_size in config is incompatible with this model.')
        if self.config.base_model is not original_model.config.base_model:  # base models are classes, so compare them directly
raise FinetuneError('Loaded file has incompatible base model.')
if original_model.config.max_length != self.config.max_length:
raise FinetuneError('Loaded model has a different config.max_length than current value. Changing max_length between loads is not yet supported.')
        if not self.adapters or not self.loaded_custom_previously:  # previous model did not use adapters (or nothing has been loaded yet), so we have to update everything
self.predict_hooks.feat_hook.refresh_base_model = True
self.adapters = original_model.config.adapter_size is not None
self._target_model = original_model._target_model
self._predict_op = original_model._predict_op
self._predict_proba_op = original_model._predict_proba_op
for hook in self.predict_hooks:
hook.need_to_refresh = True
self._to_pull = 0
self.loaded_custom_previously = True
self._update_pipeline(original_model)
def _update_pipeline(self, original_model):
"""
Refresh necessary attributes of DeploymentModel's input_pipeline so that it can support a newly loaded model
"""
self.input_pipeline.target_dim = original_model.input_pipeline.target_dim
self.input_pipeline.label_encoder = original_model.input_pipeline.label_encoder
self.input_pipeline.text_encoder = original_model.input_pipeline.text_encoder
self.input_pipeline._target_encoder = original_model.input_pipeline._target_encoder
self.input_pipeline._post_data_initialization = original_model.input_pipeline._post_data_initialization
self.input_pipeline._format_for_inference = original_model.input_pipeline._format_for_inference
self.input_pipeline._format_for_encoding = original_model.input_pipeline._format_for_encoding
        if isinstance(original_model.input_pipeline, SequencePipeline):
self.task = TaskMode.SEQUENCE_LABELING
self.input_pipeline.multi_label = original_model.input_pipeline.multi_label
self.multi_label = original_model.config.multi_label_sequences
original_model.multi_label = self.multi_label
self._initialize = original_model._initialize
        elif isinstance(original_model.input_pipeline, ComparisonPipeline):
            self.task = TaskMode.COMPARISON
        elif isinstance(original_model.input_pipeline, AssociationPipeline):
            self.task = TaskMode.ASSOCIATION
        elif isinstance(original_model.input_pipeline, BasePipeline):  # every pipeline subclasses BasePipeline, so this catch-all must be checked last
            self.task = TaskMode.CLASSIFICATION
        else:
            raise FinetuneError("Invalid pipeline in loaded file.")
self.input_pipeline.task = self.task
def _get_estimator(self, portion):
assert portion in ['featurizer', 'target'], "Can only split model into featurizer and target."
config = self._get_estimator_config()
fn = get_separate_model_fns(
target_model_fn=self._target_model if portion == 'target' else None,
predict_op=self._predict_op,
predict_proba_op=self._predict_proba_op,
build_target_model=self.input_pipeline.target_dim is not None,
encoder=self.input_pipeline.text_encoder,
target_dim=self.input_pipeline.target_dim if portion == 'target' else None,
label_encoder=self.input_pipeline.label_encoder if portion == 'target' else None,
saver=self.saver,
portion=portion,
build_attn=not isinstance(self.input_pipeline, ComparisonPipeline)
)
estimator = tf.estimator.Estimator(
model_dir=self.estimator_dir,
model_fn=fn,
config=config,
params=self.config
)
        if hasattr(self, 'predict_hooks') and portion == 'featurizer':
            for hook in self.predict_hooks:
                hook.need_to_refresh = True
        elif not hasattr(self, 'predict_hooks'):
feat_hook = InitializeHook(self.saver, model_portion='featurizer')
target_hook = InitializeHook(self.saver, model_portion='target')
self.predict_hooks = PredictHook(feat_hook, target_hook)
return estimator
def _get_input_pipeline(self):
return self.input_pipeline
    def featurize(self, X):
"""
        Embeds inputs in the learned feature space. Must be called after :meth:`load_featurizer`.
:param X: list or array of text to embed.
:returns: np.array of features of shape (n_examples, embedding_size).
"""
        features = self.predict(X, exclude_target=True)
return features['features']
def _get_input_fn(self, gen):
return self.input_pipeline.get_predict_input_fn(gen)
def _inference(self, Xs, predict_keys=[PredictMode.NORMAL], exclude_target=False, n_examples=None):
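        # Two-stage inference: a featurizer estimator embeds the inputs, then a
        # separate target estimator maps those features to predictions. Keeping
        # the stages split is what lets load_custom_model swap target (or adapter)
        # weights without recompiling the expensive featurizer graph.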
Xs = self.input_pipeline._format_for_inference(Xs)
self._data = Xs
self._closed = False
n = n_examples or len(self._data)
if self.adapters:
self.predict_hooks.feat_hook.model_portion = 'featurizer'
else:
self.predict_hooks.feat_hook.model_portion = 'whole_featurizer'
if self._predictions is None:
featurizer_est = self._get_estimator('featurizer')
            self._predictions = featurizer_est.predict(
                input_fn=self._get_input_fn(self._data_generator),
                predict_keys=None,
                hooks=[self.predict_hooks.feat_hook],
                yield_single_examples=False,
            )
self._clear_prediction_queue()
num_batches = math.ceil(n / self.config.batch_size)
features = [None]*n
for i in tqdm.tqdm(range(num_batches), total=num_batches, desc="Featurization by Batch"):
y = next(self._predictions)
            for j in range(self.config.batch_size):  # needed because yield_single_examples is False, so each prediction holds batch_size examples
                single_example = {key: value[j] for key, value in y.items()}
                if self.config.batch_size * i + j > n - 1:  # the generator pads the final batch with cached examples (via cached_example and to_pull), so stop once the real examples from self._data are exhausted
                    break
                features[self.config.batch_size * i + j] = single_example
        if exclude_target:  # load_featurizer uses this path just to initialize the featurizer weights
return features
preds = None
if features is not None:
self.predict_hooks.target_hook.need_to_refresh = True
target_est = self._get_estimator('target')
target_fn = self.input_pipeline.get_target_input_fn(features)
preds = target_est.predict(
input_fn=target_fn, predict_keys=predict_keys, hooks=[self.predict_hooks.target_hook])
predictions = [None]*n
for i in tqdm.tqdm(range(n), total=n, desc="Target Model"):
y = next(preds)
try:
y = y[predict_keys[0]] if len(predict_keys) == 1 else y
except ValueError:
raise FinetuneError("Cannot call `predict()` on a model that has not been fit.")
predictions[i] = y
self._clear_prediction_queue()
return predictions
    def predict(self, X, exclude_target=False):
"""
        Performs inference using the weights and targets from the model file most recently loaded via :meth:`load_custom_model`.
:param X: list or array of text to embed.
:returns: list of class labels.
"""
if self.task == TaskMode.SEQUENCE_LABELING and not exclude_target:
return SequenceLabeler.predict(self, X)
else:
raw_preds = self._inference(X, exclude_target=exclude_target)
if exclude_target:
return raw_preds
return self.input_pipeline.label_encoder.inverse_transform(np.asarray(raw_preds))
    def predict_proba(self, X):
"""
Produces a probability distribution over classes for each example in X.
:param X: list or array of text to embed.
:returns: list of dictionaries. Each dictionary maps from a class label to its assigned class probability.
"""
return super().predict_proba(X)
def finetune(self, X, Y=None, batch_size=None):
raise NotImplementedError
@classmethod
def get_eval_fn(cls):
raise NotImplementedError
@staticmethod
def _target_model(config, featurizer_state, targets, n_outputs, train=False, reuse=None, **kwargs):
raise NotImplementedError
def _predict_op(self, logits, **kwargs):
raise NotImplementedError
def _predict_proba_op(self, logits, **kwargs):
raise NotImplementedError
    def fit(self, *args, **kwargs):
raise NotImplementedError
def attention_weights(self, Xs):
raise NotImplementedError
    def generate_text(self, seed_text, max_length, use_extra_toks):
raise NotImplementedError
    def save(self, path):
raise NotImplementedError
    def create_base_model(self, filename, exists_ok):
raise NotImplementedError
    @classmethod
    def load(cls, path, **kwargs):
raise NotImplementedError
    @classmethod
    def finetune_grid_search(cls, Xs, Y, *, test_size, eval_fn, probs, return_all, **kwargs):
raise NotImplementedError
    @classmethod
    def finetune_grid_search_cv(cls, Xs, Y, *, n_splits, test_size, eval_fn, probs,
                                return_all, **kwargs):
raise NotImplementedError
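# Minimal usage sketch (not part of the library). The save path and input text
# below are hypothetical; it assumes a classifier was previously trained and
# saved with finetune using one of the supported base models.
if __name__ == '__main__':
    model = DeploymentModel(featurizer=GPTModel)
    model.load_featurizer()                       # compile the featurizer graph once
    model.load_custom_model('my_classifier.jl')   # hypothetical save file; swaps in target weights
    print(model.predict(['some document text']))  # fast inference, no graph recompilation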