Source code for orkgnlp.common.service.base

# -*- coding: utf-8 -*-
""" Base interfaces. """
import os
from typing import Any, Dict, Generator, Tuple, Union

from overrides import EnforceOverrides

from orkgnlp.common.config import orkgnlp_context
from orkgnlp.common.tools import downloader
from orkgnlp.common.util.decorators import singleton
from orkgnlp.common.util.exceptions import (
    ORKGNLPIllegalStateError,
    ORKGNLPValidationError,
)


[docs] class PipelineExecutorComponent: """ The PipelineExecutorComponent represents a component of a PipelineExecutor """
[docs] def release_memory(self): """ Releases the memory of all available attributes in a pipeline component. """ attributes = list(self.__dict__.keys()) for attribute in attributes: delattr(self, attribute)
[docs] class ORKGNLPBaseEncoder(EnforceOverrides, PipelineExecutorComponent): """ The ORKGNLPBaseEncoder is the base encoder class. You can freely inherit this class, implement its ``encode(raw_input, **kwargs)`` function and use it to encode your user input to a model-friendly format. Using the ORKGNLPBaseEncoder as your service encoder results in passing the same user's input to the model. """ def __init__(self): pass
[docs] def encode(self, raw_input: Any, **kwargs: Any) -> Tuple[Any, Dict[str, Any]]: """ Encodes the ``raw_input`` to a model-friendly format. :param raw_input: The user's input to be encoded. :return: The model-friendly output and kwargs. """ return (raw_input,), kwargs
[docs] class ORKGNLPBaseDecoder(EnforceOverrides, PipelineExecutorComponent): """ The ORKGNLPBaseDecoder is the base decoder class. You can freely inherit this class, implement its ``decode(model_output, **kwargs)`` function and use it to decode your model output to a user-friendly format. Using the ORKGNLPBaseDecoder as your service decoder results in returning the same model's output to the user. """ def __init__(self): pass
[docs] def decode(self, model_output: Union[Any, Generator[Any, None, None]], **kwargs: Any) -> Any: """ Decodes the model's ``output`` to a user-friendly format. :param model_output: The model's output to be decoded. :return: The user-friendly output. """ return model_output, kwargs
[docs] class ORKGNLPBaseRunner(EnforceOverrides, PipelineExecutorComponent): """ The ORKGNLPBaseRunner is the base runner class. It requires a model object while initialization. This runner must be inherited and the `run(*args, **kwargs)` must be overridden, thus running this runner raises an ``NotImplementedError``. """ def __init__(self, model: Any): """ :param model: The model to be run. :type model: Model object. See the inheriting classes for further information. """ self._model: Any = model
[docs] def run(self, *args: Any, **kwargs: Any): """ :raise: NotImplementedError """ raise NotImplementedError("Subclass must implement abstract method")
[docs] class PipelineExecutor: """ The PipelineExecutor executes a full service workflow given its encoder, runner and decoder. See the ``run`` function description for further information. """ def __init__( self, encoder: ORKGNLPBaseEncoder, runner: ORKGNLPBaseRunner, decoder: ORKGNLPBaseDecoder, ): """ :param encoder: Service's encoder. :param runner: Service's runner. :param decoder: Service's decoder. """ self._encoder: ORKGNLPBaseEncoder = encoder self._runner: ORKGNLPBaseRunner = runner self._decoder: ORKGNLPBaseDecoder = decoder
[docs] def run(self, raw_input: Any, **kwargs: Any) -> Any: """ Executes a full pipline of the common service workflow: 1. Runs the service encoder with the user's input. 2. The encoded input is passed to the model runner, which in turn is executed. 3. The model's output is decoded to a user-friendly format using the service's decoder. Note that the kwargs can be updated and passed through the pipeline components. :param raw_input: User's input to be encoded. :param kwargs: Named parameters for further processing config. Please check your used component documentation for specific parameter description. :return: The decoded user-friendly output. :raise orkgnlp.common.util.exceptions.ORKGNLPIllegalStateException: If either [Encoder, Runner, Decoder] is not initialized. """ if not (self._encoder and self._runner and self._decoder): raise ORKGNLPIllegalStateError("Encoder, Runner and Decoder must be initialized!") inputs, additional_properties = self._encoder.encode(raw_input, **kwargs) kwargs.update(additional_properties) output, additional_properties = self._runner.run(inputs, **kwargs) kwargs.update(additional_properties) return self._decoder.decode(output, **kwargs)
[docs] def release_memory(self): """ Releases the memory of all available pipeline components. """ self._encoder.release_memory() self._runner.release_memory() self._decoder.release_memory()
[docs] class ORKGNLPBaseConfig: """ The ORKGNLPBaseConfig encapsulates the required configurations for a service given its name. """ def __init__(self, service: str): """ :param service: The service name. """ service_data_dir = os.path.join(orkgnlp_context.get("ORKG_NLP_DATA_CACHE_ROOT"), service) self.service_name: str = service self.service_dir: str = service_data_dir self.requirements = self._get_requirement_paths() def _get_requirement_paths(self) -> Dict[str, str]: paths = {} for repo in orkgnlp_context.get("HUGGINGFACE_REPOS")[self.service_name]: for file_obj in repo["files"]: paths[file_obj["internal_name"]] = os.path.join( self.service_dir, file_obj.get("subbdir", ""), file_obj["filename"] ) return paths
[docs] class ORKGNLPBaseService: """ Base class for shared config parameters and functionalities. All ORKG-NLP services must inherit from this class. This class follows the singleton pattern, i.e. only one instance can be obtained from it or its subclasses. """ @singleton def __new__(cls, *args, **kwargs): pass def __init__( self, service: str, *, force_download: bool = False, batch_size: int = 16, _unittest: bool = False ): """ :param service: Service name. :param force_download: Indicates whether the required files are to be downloaded again. Defaults to False. :param batch_size: Size of the batches used during model prediction. This argument is used by services that applies batch evaluation. Defaults to 16. """ self._pipeline_executors: Dict[str, PipelineExecutor] = {} self._config: ORKGNLPBaseConfig = ORKGNLPBaseConfig(service) self._force_download: bool = force_download self._batch_size: int = batch_size self._unittest: bool = _unittest self._download(service) def _download(self, service: str): """ Downloads the required files for the given service name based on the ``force_download`` class attribute. :param service: a string representing a ORKG-NLP service name. Check :doc:`../services/services` for a full list. :return: """ downloader.download(service, self._force_download) def _run(self, raw_input: Any, pipline_executor_name: str = None, **kwargs: Any): """ Executes the only PipelineExecutor registered for the service, or one of them given its name. :param raw_input: User's input to be encoded. :param pipline_executor_name: Name of the PipelineExecutor to run. :param kwargs: Named parameters for further processing config. Please check your used component documentation for specific parameter description. :return: The decoded user-friendly output. :raise orkgnlp.common.util.exceptions.ORKGNLPIllegalStateException: If either [Encoder, Runner, Decoder] is not initialized. :raise orkgnlp.common.util.exceptions.ORKGNLPValidationError: If the given ``pipline_executor_name`` is unknown or not given, in case of multiple registered ones. """ if not self._pipeline_executors: raise ORKGNLPIllegalStateError( "There is no PipelineRunner registered. Please consider registering" "one using the _register_pipline_runner() function." ) if pipline_executor_name: if pipline_executor_name not in self._pipeline_executors: raise ORKGNLPValidationError("PipelineExecutor name is unknown.") return self._pipeline_executors[pipline_executor_name].run(raw_input, **kwargs) if len(self._pipeline_executors) > 1: raise ORKGNLPValidationError( "PipelineExecutor is ambiguous. Consider passing pipline_executor_name in the " "input." ) return next(iter(self._pipeline_executors.values())).run(raw_input, **kwargs) def _register_pipeline( self, name: str, encoder: ORKGNLPBaseEncoder, runner: ORKGNLPBaseRunner, decoder: ORKGNLPBaseDecoder, ): """ Registers a PipelineExecutor to the service. :param name: PipelineExecutors name. :param encoder: Service's encoder. :param runner: Service's runner. :param decoder: Service's decoder. """ if name in self._pipeline_executors: raise ORKGNLPValidationError("PipelineExecutor name already exists.") self._pipeline_executors[name] = PipelineExecutor(encoder, runner, decoder)
[docs] def release_memory(self): """ Releases the memory of all available executors. """ for executor in self._pipeline_executors.values(): executor.release_memory()