Source code for bentoml._internal.io_descriptors.pandas

from __future__ import annotations

import functools
import io
import logging
import os
import typing as t
from concurrent.futures import ThreadPoolExecutor
from enum import Enum

from starlette.requests import Request
from starlette.responses import Response

from ...exceptions import BadInput
from ...exceptions import InvalidArgument
from ...exceptions import MissingDependencyException
from ...exceptions import UnprocessableEntity
from ...grpc.utils import import_generated_stubs
from ..service.openapi import SUCCESS_DESCRIPTION
from ..service.openapi.specification import MediaType
from ..service.openapi.specification import Schema
from ..types import LazyType
from ..utils.http import set_cookies
from ..utils.lazy_loader import LazyLoader
from ..utils.pkg import find_spec
from .base import IODescriptor

EXC_MSG = "'pandas' is required to use PandasDataFrame or PandasSeries. Install with 'pip install bentoml[io-pandas]'"

if t.TYPE_CHECKING:
    import numpy as np
    import pandas as pd
    import pyarrow
    import pyspark.sql.types
    from google.protobuf import message as _message

    from bentoml.grpc.v1 import service_pb2 as pb
    from bentoml.grpc.v1alpha1 import service_pb2 as pb_v1alpha1

    from .. import external_typing as ext
    from ..context import ServiceContext as Context
    from .base import OpenAPIResponse

else:
    pb, _ = import_generated_stubs("v1")
    pb_v1alpha1, _ = import_generated_stubs("v1alpha1")
    pd = LazyLoader("pd", globals(), "pandas", exc_msg=EXC_MSG)
    np = LazyLoader("np", globals(), "numpy")

logger = logging.getLogger(__name__)


# Check for parquet support
@functools.lru_cache(maxsize=1)
def get_parquet_engine() -> str:
    if find_spec("pyarrow") is not None:
        return "pyarrow"
    elif find_spec("fastparquet") is not None:
        return "fastparquet"
    else:
        logger.warning(
            "Neither pyarrow nor fastparquet packages found. Parquet de/serialization will not be available."
        )
        raise MissingDependencyException(
            "Parquet serialization is not available. Try installing pyarrow or fastparquet first."
        )
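
# Illustrative sketch (comments only, not part of the original module): because
# get_parquet_engine() is wrapped in functools.lru_cache(maxsize=1), the engine probe runs
# once per process and the result is reused afterwards. If pyarrow or fastparquet is
# installed later in the same process, the standard lru_cache API can reset the probe:
#
#   get_parquet_engine.cache_clear()   # drop the cached result
#   engine = get_parquet_engine()      # probes pyarrow, then fastparquet, else raises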


def _openapi_types(item: t.Any) -> str:  # pragma: no cover
    # convert pandas types to OpenAPI types
    if pd.api.types.is_integer_dtype(item):
        return "integer"
    elif pd.api.types.is_float_dtype(item):
        return "number"
    elif pd.api.types.is_string_dtype(item) or pd.api.types.is_datetime64_dtype(item):
        return "string"
    elif pd.api.types.is_bool_dtype(item):
        return "boolean"
    else:
        return "object"


def _dataframe_openapi_schema(
    dtype: bool | ext.PdDTypeArg | None,
    orient: ext.DataFrameOrient = None,
) -> Schema:  # pragma: no cover
    if isinstance(dtype, dict):
        if orient == "records":
            return Schema(
                type="array",
                items=Schema(
                    type="object",
                    properties={
                        k: Schema(type=_openapi_types(v)) for k, v in dtype.items()
                    },
                ),
            )
        if orient == "index":
            return Schema(
                type="object",
                additionalProperties=Schema(
                    type="object",
                    properties={
                        k: Schema(type=_openapi_types(v)) for k, v in dtype.items()
                    },
                ),
            )
        if orient == "columns":
            return Schema(
                type="object",
                properties={
                    k: Schema(
                        type="object",
                        additionalProperties=Schema(type=_openapi_types(v)),
                    )
                    for k, v in dtype.items()
                },
            )

        return Schema(
            type="object",
            properties={
                k: Schema(type="array", items=Schema(type=_openapi_types(v)))
                for k, v in dtype.items()
            },
        )
    else:
        return Schema(type="object")
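
# Illustrative sketch (comments only, not part of the original module): for a column dtype
# mapping such as {"age": "int64", "score": "float64"} with orient="records", the helper
# above produces roughly:
#
#   {"type": "array",
#    "items": {"type": "object",
#              "properties": {"age": {"type": "integer"}, "score": {"type": "number"}}}}
#
# Any non-dict dtype (None, True/False, or a single dtype string) falls back to a plain
# {"type": "object"} schema.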


def _series_openapi_schema(
    dtype: bool | ext.PdDTypeArg | None, orient: ext.SeriesOrient = None
) -> Schema:  # pragma: no cover
    if isinstance(dtype, str):
        if orient in ["index", "values"]:
            return Schema(
                type="object", additionalProperties=Schema(type=_openapi_types(dtype))
            )
        if orient in ["records", "columns"]:
            return Schema(type="array", items=Schema(type=_openapi_types(dtype)))
    return Schema(type="object")
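
# Illustrative sketch (comments only, not part of the original module): for a series the
# schema only becomes more specific when dtype is a single dtype string. For example,
# dtype="float64" with orient="records" yields {"type": "array", "items": {"type": "number"}},
# while orient="index" yields an object schema whose additionalProperties are of type "number".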


class SerializationFormat(Enum):
    JSON = "application/json"
    PARQUET = "application/octet-stream"
    CSV = "text/csv"

    def __init__(self, mime_type: str):
        self.mime_type = mime_type

    def __str__(self) -> str:
        if self == SerializationFormat.JSON:
            return "json"
        elif self == SerializationFormat.PARQUET:
            return "parquet"
        elif self == SerializationFormat.CSV:
            return "csv"
        else:
            raise ValueError(f"Unknown serialization format: {self}")
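
# Illustrative sketch (comments only, not part of the original module): each member's value
# is its MIME type, which the Enum __init__ above also exposes as .mime_type, while str()
# gives the short name used in user-facing options:
#
#   SerializationFormat["CSV"].mime_type                                  # -> "text/csv"
#   str(SerializationFormat.PARQUET)                                      # -> "parquet"
#   SerializationFormat("application/json") is SerializationFormat.JSON   # -> True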


def _infer_serialization_format_from_request(
    request: Request, default_format: SerializationFormat
) -> SerializationFormat:
    """Determine the serialization format from the request's headers['content-type']"""

    content_type = request.headers.get("content-type")

    if content_type == "application/json":
        return SerializationFormat.JSON
    elif content_type == "application/octet-stream":
        return SerializationFormat.PARQUET
    elif content_type == "text/csv":
        return SerializationFormat.CSV
    elif content_type:
        logger.debug(
            "Unknown Content-Type ('%s'), falling back to '%s' serialization format.",
            content_type,
            default_format,
        )
        return default_format
    else:
        logger.debug(
            "Content-Type not specified, falling back to '%s' serialization format.",
            default_format,
        )
        return default_format
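
# Illustrative sketch (comments only, not part of the original module): clients select the
# wire format purely through the Content-Type header, and an unrecognized or missing header
# falls back to the descriptor's default_format. The endpoint name "/predict" and host below
# are assumptions for the example:
#
#   import requests
#
#   with open("input.parquet", "rb") as f:       # parsed via pd.read_parquet on the server
#       requests.post(
#           "http://127.0.0.1:3000/predict",
#           headers={"content-type": "application/octet-stream"},
#           data=f.read(),
#       )
#
#   requests.post(                               # parsed via pd.read_csv on the server
#       "http://127.0.0.1:3000/predict",
#       headers={"content-type": "text/csv"},
#       data="0,1,2,3\n5,4,3,2\n",
#   )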


def _validate_serialization_format(serialization_format: SerializationFormat):
    if (
        serialization_format is SerializationFormat.PARQUET
        and get_parquet_engine() is None
    ):
        raise MissingDependencyException(
            "Parquet serialization is not available. Try installing pyarrow or fastparquet first."
        )


class PandasDataFrame(
    IODescriptor["ext.PdDataFrame"],
    descriptor_id="bentoml.io.PandasDataFrame",
    proto_fields=("dataframe",),
):
    """
    :obj:`PandasDataFrame` defines API specification for the inputs/outputs of a Service, where
    either inputs will be converted to or outputs will be converted from type
    :code:`pd.DataFrame` as specified in your API function signature.

    A sample service implementation:

    .. code-block:: python
       :caption: `service.py`

       from __future__ import annotations

       import bentoml
       import pandas as pd
       import numpy as np
       from bentoml.io import PandasDataFrame

       input_spec = PandasDataFrame.from_sample(pd.DataFrame(np.array([[5,4,3,2]])))

       runner = bentoml.sklearn.get("sklearn_model_clf").to_runner()

       svc = bentoml.Service("iris-classifier", runners=[runner])

       @svc.api(input=input_spec, output=PandasDataFrame())
       def predict(input_arr):
           res = runner.run(input_arr)
           return pd.DataFrame(res)

    Users can then serve this service with :code:`bentoml serve`:

    .. code-block:: bash

       % bentoml serve ./service.py:svc --reload

    Users can then send requests to the newly started services with any client:

    .. tab-set::

       .. tab-item:: Bash

          .. code-block:: bash

             % curl -X POST -H "Content-Type: application/json" \\
                    --data '[{"0":5,"1":4,"2":3,"3":2}]' http://0.0.0.0:3000/predict

             # [{"0": 1}]%

       .. tab-item:: Python

          .. code-block:: python
             :caption: `request.py`

             import requests

             requests.post(
                 "http://0.0.0.0:3000/predict",
                 headers={"content-type": "application/json"},
                 data='[{"0":5,"1":4,"2":3,"3":2}]'
             ).text

    Args:
        orient: Indication of expected JSON string format. Compatible JSON strings can be
                produced by :func:`pandas.io.json.to_json()` with a corresponding orient value.
                Possible orients are:

                - :obj:`split` - :code:`dict[str, Any]` ↦ {``idx`` ↠ ``[idx]``, ``columns`` ↠ ``[columns]``, ``data`` ↠ ``[values]``}
                - :obj:`records` - :code:`list[Any]` ↦ [{``column`` ↠ ``value``}, ..., {``column`` ↠ ``value``}]
                - :obj:`index` - :code:`dict[str, Any]` ↦ {``idx`` ↠ {``column`` ↠ ``value``}}
                - :obj:`columns` - :code:`dict[str, Any]` ↦ {``column`` ↠ {``index`` ↠ ``value``}}
                - :obj:`values` - :code:`dict[str, Any]` ↦ Values arrays
        columns: List of column names that users wish to update.
        apply_column_names: Whether to update incoming DataFrame columns. If :code:`apply_column_names=True`,
                            then ``columns`` must be specified.
        dtype: Data type users wish to convert their inputs/outputs to. If it is a boolean,
               then pandas will infer dtypes. Else if it is a dictionary of column to ``dtype``,
               then applies those to incoming dataframes. If ``False``, then don't infer dtypes at all
               (only applies to the data). This is not applicable for :code:`orient='table'`.
        enforce_dtype: Whether to enforce a certain data type. If :code:`enforce_dtype=True` then
                       :code:`dtype` must be specified.
        shape: Optional shape check that users can specify for their incoming HTTP requests. We will
               only check the number of columns you specified for your given shape:

               .. code-block:: python
                  :caption: `service.py`

                  import pandas as pd
                  from bentoml.io import PandasDataFrame

                  df = pd.DataFrame([[1, 2, 3]])  # shape (1, 3)
                  inp = PandasDataFrame.from_sample(df)

                  @svc.api(
                      input=PandasDataFrame(shape=(51, 10), enforce_shape=True),
                      output=PandasDataFrame()
                  )
                  def predict(input_df: pd.DataFrame) -> pd.DataFrame:
                      # if input_df has shape (40, 9),
                      # it will raise an error
                      ...
        enforce_shape: Whether to enforce a certain shape. If ``enforce_shape=True`` then ``shape``
                       must be specified.
        default_format: The default serialization format to use if the request does not specify a
                        ``Content-Type`` header. It is also the serialization format used for the
                        response. Possible values are:

                        - :obj:`json` - JSON text format (inferred from content-type ``"application/json"``)
                        - :obj:`parquet` - Parquet binary format (inferred from content-type ``"application/octet-stream"``)
                        - :obj:`csv` - CSV text format (inferred from content-type ``"text/csv"``)

    Returns:
        :obj:`PandasDataFrame`: IO Descriptor that represents a :code:`pd.DataFrame`.
    """

    def __init__(
        self,
        orient: ext.DataFrameOrient = "records",
        columns: list[str] | None = None,
        apply_column_names: bool = False,
        dtype: bool | ext.PdDTypeArg | None = None,
        enforce_dtype: bool = False,
        shape: tuple[int, ...] | None = None,
        enforce_shape: bool = False,
        default_format: t.Literal["json", "parquet", "csv"] = "json",
    ):
        self._orient: ext.DataFrameOrient = orient
        self._columns = columns
        self._apply_column_names = apply_column_names
        # TODO: convert dtype to numpy dtype
        self._dtype = dtype
        self._enforce_dtype = enforce_dtype
        self._shape = shape
        self._enforce_shape = enforce_shape
        self._default_format = SerializationFormat[default_format.upper()]
        _validate_serialization_format(self._default_format)
        self._mime_type = self._default_format.mime_type

    def _from_sample(self, sample: ext.PdDataFrame) -> ext.PdDataFrame:
        """
        Create a :class:`~bentoml._internal.io_descriptors.pandas.PandasDataFrame` IO Descriptor from given inputs.

        Args:
            sample: Given sample ``pd.DataFrame`` data
            orient: Indication of expected JSON string format. Compatible JSON strings can be
                    produced by :func:`pandas.io.json.to_json()` with a corresponding orient value.
                    Possible orients are:

                    - :obj:`split` - :code:`dict[str, Any]` ↦ {``idx`` ↠ ``[idx]``, ``columns`` ↠ ``[columns]``, ``data`` ↠ ``[values]``}
                    - :obj:`records` - :code:`list[Any]` ↦ [{``column`` ↠ ``value``}, ..., {``column`` ↠ ``value``}]
                    - :obj:`index` - :code:`dict[str, Any]` ↦ {``idx`` ↠ {``column`` ↠ ``value``}}
                    - :obj:`columns` - :code:`dict[str, Any]` ↦ {``column`` ↠ {``index`` ↠ ``value``}}
                    - :obj:`values` - :code:`dict[str, Any]` ↦ Values arrays
                    - :obj:`table` - :code:`dict[str, Any]` ↦ {``schema``: { schema }, ``data``: { data }}
            apply_column_names: Update incoming DataFrame columns. ``columns`` must be specified at
                                function signature. If you don't want to enforce specific column
                                names then set ``apply_column_names=False``.
            enforce_dtype: Enforce a certain data type. ``dtype`` must be specified at function
                           signature. If you don't want to enforce a specific dtype then set
                           ``enforce_dtype=False``.
            enforce_shape: Enforce a certain shape. ``shape`` must be specified at function
                           signature. If you don't want to enforce a specific shape then set
                           ``enforce_shape=False``.
            default_format: The default serialization format to use if the request does not specify a
                            ``Content-Type`` header. It is also the serialization format used for the
                            response. Possible values are:

                            - :obj:`json` - JSON text format (inferred from content-type ``"application/json"``)
                            - :obj:`parquet` - Parquet binary format (inferred from content-type ``"application/octet-stream"``)
                            - :obj:`csv` - CSV text format (inferred from content-type ``"text/csv"``)

        Returns:
            :class:`~bentoml._internal.io_descriptors.pandas.PandasDataFrame`: IODescriptor from given users inputs.

        Example:

        .. code-block:: python
           :caption: `service.py`

           import pandas as pd
           from bentoml.io import PandasDataFrame

           arr = [[1,2,3]]
           input_spec = PandasDataFrame.from_sample(pd.DataFrame(arr))

           @svc.api(input=input_spec, output=PandasDataFrame())
           def predict(inputs: pd.DataFrame) -> pd.DataFrame: ...
        """
        if LazyType["ext.NpNDArray"]("numpy", "ndarray").isinstance(sample):
            logger.warning(
                "'from_sample' from type '%s' is deprecated. Make sure to only pass pandas DataFrame.",
                type(sample),
            )
            sample = pd.DataFrame(sample)
        elif isinstance(sample, str):
            logger.warning(
                "'from_sample' from type '%s' is deprecated. Make sure to only pass pandas DataFrame.",
                type(sample),
            )
            try:
                if os.path.exists(sample):
                    try:
                        ext = os.path.splitext(sample)[-1].strip(".")
                        sample = getattr(
                            pd,
                            {
                                "json": "read_json",
                                "csv": "read_csv",
                                "html": "read_html",
                                "xls": "read_excel",
                                "xlsx": "read_excel",
                                "hdf5": "read_hdf",
                                "parquet": "read_parquet",
                                "pickle": "read_pickle",
                                "sql": "read_sql",
                            }[ext],
                        )(sample)
                    except KeyError:
                        raise InvalidArgument(f"Unsupported sample '{sample}' format.")
                else:
                    # Try to load the string as json.
                    sample = pd.read_json(sample)
            except ValueError as e:
                raise InvalidArgument(
                    f"Failed to create a 'pd.DataFrame' from sample {sample}: {e}"
                ) from None
        if self._shape is None:
            self._shape = sample.shape
        if self._columns is None:
            self._columns = [str(i) for i in list(sample.columns)]
        if self._dtype is None:
            self._dtype = sample.dtypes
        return sample

    def _convert_dtype(
        self, value: ext.PdDTypeArg | None
    ) -> str | dict[str, t.Any] | None:
        # TODO: support extension dtypes
        if LazyType["ext.NpNDArray"]("numpy", "ndarray").isinstance(value):
            return str(value.dtype)
        elif isinstance(value, bool):
            return str(value)
        elif isinstance(value, str):
            return value
        elif isinstance(value, dict):
            return {str(k): self._convert_dtype(v) for k, v in value.items()}
        elif value is None:
            return "null"
        else:
            logger.warning(f"{type(value)} is not yet supported.")
            return None

    def to_spec(self) -> dict[str, t.Any]:
        return {
            "id": self.descriptor_id,
            "args": {
                "orient": self._orient,
                "columns": self._columns,
                "dtype": self._convert_dtype(self._dtype),
                "shape": self._shape,
                "enforce_dtype": self._enforce_dtype,
                "enforce_shape": self._enforce_shape,
                "default_format": str(self._default_format),
            },
        }

    @classmethod
    def from_spec(cls, spec: dict[str, t.Any]) -> t.Self:
        if "args" not in spec:
            raise InvalidArgument(f"Missing args key in PandasDataFrame spec: {spec}")
        res = PandasDataFrame(**spec["args"])
        return res

    def input_type(self) -> LazyType[ext.PdDataFrame]:
        return LazyType("pandas", "DataFrame")

    def openapi_schema(self) -> Schema:
        return _dataframe_openapi_schema(self._dtype, self._orient)

    def openapi_components(self) -> dict[str, t.Any] | None:
        pass

    def openapi_example(self):
        if self.sample is not None:
            return self.sample.to_json(orient=self._orient)

    def openapi_request_body(self) -> dict[str, t.Any]:
        return {
            "content": {
                self._mime_type: MediaType(
                    schema=self.openapi_schema(), example=self.openapi_example()
                )
            },
            "required": True,
            "x-bentoml-io-descriptor": self.to_spec(),
        }

    def openapi_responses(self) -> OpenAPIResponse:
        return {
            "description": SUCCESS_DESCRIPTION,
            "content": {
                self._mime_type: MediaType(
                    schema=self.openapi_schema(), example=self.openapi_example()
                )
            },
            "x-bentoml-io-descriptor": self.to_spec(),
        }

    async def from_http_request(self, request: Request) -> ext.PdDataFrame:
        """
        Process incoming requests and convert incoming objects to ``pd.DataFrame``.

        Args:
            request (`starlette.requests.Request`): Incoming Request

        Returns:
            a ``pd.DataFrame`` object. This can then be used inside users defined logics.

        Raises:
            BadInput: Raised when the incoming requests are badly formatted.
        """
        serialization_format = _infer_serialization_format_from_request(
            request, self._default_format
        )
        _validate_serialization_format(serialization_format)

        obj = await request.body()

        dtype = self._dtype if self._enforce_dtype else None
        if serialization_format is SerializationFormat.JSON:
            if dtype is None:
                dtype = True  # infer dtype automatically
            res = pd.read_json(io.BytesIO(obj), dtype=dtype, orient=self._orient)
        elif serialization_format is SerializationFormat.PARQUET:
            res = pd.read_parquet(io.BytesIO(obj), engine=get_parquet_engine())
        elif serialization_format is SerializationFormat.CSV:
            res: ext.PdDataFrame = pd.read_csv(io.BytesIO(obj), dtype=dtype)
        else:
            raise InvalidArgument(
                f"Unknown serialization format ({serialization_format})."
            ) from None

        assert isinstance(res, pd.DataFrame)
        return self.validate_dataframe(res)

    async def to_http_response(
        self, obj: ext.PdDataFrame, ctx: Context | None = None
    ) -> Response:
        """
        Process given objects and convert it to HTTP response.

        Args:
            obj (`pd.DataFrame`): ``pd.DataFrame`` that will be serialized to JSON or parquet

        Returns:
            HTTP Response of type `starlette.responses.Response`. This can
            be accessed via cURL or any external web traffic.
        """
        obj = self.validate_dataframe(obj)

        # For the response it doesn't make sense to enforce the same serialization format as
        # specified by the request's headers['content-type']. Instead we simply use the
        # _default_format.
        serialization_format = self._default_format

        if not LazyType["ext.PdDataFrame"](pd.DataFrame).isinstance(obj):
            raise InvalidArgument(
                f"return object is not of type `pd.DataFrame`, got type {type(obj)} instead"
            )

        if serialization_format is SerializationFormat.JSON:
            resp = obj.to_json(orient=self._orient)
        elif serialization_format is SerializationFormat.PARQUET:
            resp = obj.to_parquet(engine=get_parquet_engine())
        elif serialization_format is SerializationFormat.CSV:
            resp = obj.to_csv()
        else:
            raise InvalidArgument(
                f"Unknown serialization format ({serialization_format})."
            ) from None

        if ctx is not None:
            res = Response(
                resp,
                media_type=serialization_format.mime_type,
                headers=ctx.response.headers,  # type: ignore (bad starlette types)
                status_code=ctx.response.status_code,
            )
            set_cookies(res, ctx.response.cookies)
            return res
        else:
            return Response(resp, media_type=serialization_format.mime_type)

    def validate_dataframe(
        self, dataframe: ext.PdDataFrame, exception_cls: t.Type[Exception] = BadInput
    ) -> ext.PdDataFrame:
        if not LazyType["ext.PdDataFrame"]("pandas.core.frame.DataFrame").isinstance(
            dataframe
        ):
            raise InvalidArgument(
                f"return object is not of type 'pd.DataFrame', got type '{type(dataframe)}' instead"
            ) from None

        # TODO: dtype check
        # if self._dtype is not None and self._dtype != dataframe.dtypes:
        #     msg = f'{self.__class__.__name__}: Expecting DataFrame of dtype "{self._dtype}", but "{dataframe.dtypes}" was received.'
        #     if self._enforce_dtype:
        #         raise exception_cls(msg) from None

        if self._apply_column_names:
            if not self._columns:
                raise BadInput(
                    "When apply_column_names is set, you must provide columns."
                )
            if len(self._columns) != dataframe.shape[1]:
                raise BadInput(
                    f"length of 'columns' ({len(self._columns)}) does not match the # of columns of incoming data ({dataframe.shape[1]})."
                ) from None
            dataframe.columns = pd.Index(self._columns)

        # TODO: convert from wide to long format (melt())
        if self._shape is not None and self._shape != dataframe.shape:
            msg = f'{self.__class__.__name__}: Expecting DataFrame of shape "{self._shape}", but "{dataframe.shape}" was received.'
            if self._enforce_shape and not all(
                left == right
                for left, right in zip(self._shape, dataframe.shape)
                if left != -1 and right != -1
            ):
                raise exception_cls(msg) from None
        return dataframe
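
    # Illustrative sketch (comments only, not part of the original module): during the shape
    # enforcement above, a -1 entry in either the expected or received shape acts as a
    # wildcard dimension, so a descriptor declared as
    # PandasDataFrame(shape=(-1, 4), enforce_shape=True) accepts any number of rows but
    # rejects a frame whose column count is not 4:
    #
    #   PandasDataFrame(shape=(-1, 4), enforce_shape=True)
    #   #   a (10, 4) frame passes, a (10, 3) frame raises BadInput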

    async def from_proto(self, field: pb.DataFrame | bytes) -> ext.PdDataFrame:
        """
        Process incoming protobuf request and convert it to ``pandas.DataFrame``.

        Args:
            request: Incoming RPC request message.
            context: grpc.ServicerContext

        Returns:
            a ``pandas.DataFrame`` object. This can then be used inside users defined logics.
        """
        # TODO: support different serialization format
        if isinstance(field, bytes):
            # TODO: handle serialized_bytes for dataframe
            raise NotImplementedError(
                'Currently not yet implemented. Use "dataframe" instead.'
            )
        else:
            # note that there is a current bug where we don't check for
            # dtype of given fields per Series to match with types of a given
            # columns, hence, this would result in a wrong DataFrame that is not
            # expected by our users.

            # columns orient: { column_name : {index : columns.series._value}}
            if self._orient != "columns":
                raise BadInput(
                    f"'dataframe' field currently only supports 'columns' orient. Make sure to set 'orient=columns' in {self.__class__.__name__}."
                ) from None
            data: list[t.Any] = []

            def process_columns_contents(content: pb.Series) -> dict[str, t.Any]:
                # To be used inside a ThreadPoolExecutor to handle
                # large tabular data
                if len(content.ListFields()) != 1:
                    raise BadInput(
                        f"Array contents can only be one of given values key. Use one of '{list(map(lambda f: f[0].name, content.ListFields()))}' instead."
                    ) from None
                return {str(i): c for i, c in enumerate(content.ListFields()[0][1])}

            with ThreadPoolExecutor(max_workers=10) as executor:
                futures = executor.map(process_columns_contents, field.columns)
            data.extend([i for i in list(futures)])
            dataframe = pd.DataFrame(dict(zip(field.column_names, data)))

        return self.validate_dataframe(dataframe)

    @t.overload
    async def _to_proto_impl(
        self, obj: ext.PdDataFrame, *, version: t.Literal["v1"]
    ) -> pb.DataFrame: ...

    @t.overload
    async def _to_proto_impl(
        self, obj: ext.PdDataFrame, *, version: t.Literal["v1alpha1"]
    ) -> pb_v1alpha1.DataFrame: ...

    async def _to_proto_impl(
        self, obj: ext.PdDataFrame, *, version: str
    ) -> _message.Message:
        """
        Process given objects and convert it to grpc protobuf response.

        Args:
            obj: ``pandas.DataFrame`` that will be serialized to protobuf
            context: grpc.aio.ServicerContext from grpc.aio.Server

        Returns:
            ``service_pb2.Response``:
                Protobuf representation of given ``pandas.DataFrame``
        """
        from .numpy import npdtype_to_fieldpb_map

        pb, _ = import_generated_stubs(version)

        # TODO: support different serialization format
        obj = self.validate_dataframe(obj)
        mapping = npdtype_to_fieldpb_map()
        # note that this is not safe, since we are not checking the dtype of the series
        # FIXME(aarnphm): validate and handle mix columns dtype
        # currently we don't support ExtensionDtype
        columns_name: list[str] = list(map(str, obj.columns))
        not_supported: list[ext.PdDType] = list(
            filter(
                lambda x: x not in mapping,
                map(lambda x: t.cast("ext.PdSeries", obj[x]).dtype, columns_name),
            )
        )
        if len(not_supported) > 0:
            raise UnprocessableEntity(
                f'dtype in column "{obj.columns}" is not currently supported.'
            ) from None
        return pb.DataFrame(
            column_names=columns_name,
            columns=[
                pb.Series(
                    **{mapping[t.cast("ext.NpDTypeLike", obj[col].dtype)]: obj[col]}
                )
                for col in columns_name
            ],
        )

    def from_arrow(self, batch: pyarrow.RecordBatch) -> ext.PdDataFrame:
        res = batch.to_pandas()
        if isinstance(res, pd.DataFrame):
            return res
        if isinstance(res, pd.Series):
            return res.to_frame()

    def to_arrow(self, df: ext.PdDataFrame) -> pyarrow.RecordBatch:
        import pyarrow

        return pyarrow.RecordBatch.from_pandas(df)

    def spark_schema(self) -> pyspark.sql.types.StructType:
        from pyspark.pandas.typedef import as_spark_type
        from pyspark.sql.types import StructField
        from pyspark.sql.types import StructType

        if self._dtype is None or self._dtype is True:
            raise InvalidArgument(
                "Cannot perform batch inference with a dataframe output without a known dtype; please provide a dtype."
            )

        if isinstance(self._dtype, dict):
            fields = []
            for col_name, col_type in self._dtype.items():
                try:
                    fields.append(StructField(col_name, as_spark_type(col_type)))
                except TypeError:
                    raise InvalidArgument(
                        f"dtype {col_type} is not supported for batch inference."
                    )
            return StructType(fields)
        else:
            raise NotImplementedError(
                "Only dict dtypes are currently supported for dataframes"
            )

    async def to_proto(self, obj: ext.PdDataFrame) -> pb.DataFrame:
        return await self._to_proto_impl(obj, version="v1")

    async def to_proto_v1alpha1(self, obj: ext.PdDataFrame) -> pb_v1alpha1.DataFrame:
        return await self._to_proto_impl(obj, version="v1alpha1")


class PandasSeries(
    IODescriptor["ext.PdSeries"],
    descriptor_id="bentoml.io.PandasSeries",
    proto_fields=("series",),
):
    """
    ``PandasSeries`` defines API specification for the inputs/outputs of a Service, where
    either inputs will be converted to or outputs will be converted from type
    :code:`pd.Series` as specified in your API function signature.

    A sample service implementation:

    .. code-block:: python
       :caption: `service.py`

       import bentoml
       import pandas as pd
       import numpy as np
       from bentoml.io import PandasSeries

       runner = bentoml.sklearn.get("sklearn_model_clf").to_runner()

       svc = bentoml.Service("iris-classifier", runners=[runner])

       @svc.api(input=PandasSeries(), output=PandasSeries())
       def predict(input_arr):
           res = runner.run(input_arr)  # type: np.ndarray
           return pd.Series(res)

    Users can then serve this service with :code:`bentoml serve`:

    .. code-block:: bash

       % bentoml serve ./service.py:svc --reload

    Users can then send requests to the newly started services with any client:

    .. tab-set::

       .. tab-item:: Bash

          .. code-block:: bash

             % curl -X POST -H "Content-Type: application/json" \\
                    --data '[{"0":5,"1":4,"2":3,"3":2}]' http://0.0.0.0:3000/predict

             # [{"0": 1}]%

       .. tab-item:: Python

          .. code-block:: python
             :caption: `request.py`

             import requests

             requests.post(
                 "http://0.0.0.0:3000/predict",
                 headers={"content-type": "application/json"},
                 data='[{"0":5,"1":4,"2":3,"3":2}]'
             ).text

    Args:
        orient: Indication of expected JSON string format. Compatible JSON strings can be
                produced by :func:`pandas.io.json.to_json()` with a corresponding orient value.
                Possible orients are:

                - :obj:`split` - :code:`dict[str, Any]` ↦ {``idx`` ↠ ``[idx]``, ``columns`` ↠ ``[columns]``, ``data`` ↠ ``[values]``}
                - :obj:`records` - :code:`list[Any]` ↦ [{``column`` ↠ ``value``}, ..., {``column`` ↠ ``value``}]
                - :obj:`index` - :code:`dict[str, Any]` ↦ {``idx`` ↠ {``column`` ↠ ``value``}}
                - :obj:`columns` - :code:`dict[str, Any]` ↦ {``column`` ↠ {``index`` ↠ ``value``}}
                - :obj:`values` - :code:`dict[str, Any]` ↦ Values arrays
        dtype: Data type users wish to convert their inputs/outputs to. If it is a boolean,
               then pandas will infer dtypes. Else if it is a single ``dtype``, it is applied to
               the incoming series. If ``False``, then don't infer dtypes at all (only applies to
               the data). This is not applicable for :code:`orient='table'`.
        enforce_dtype: Whether to enforce a certain data type. If :code:`enforce_dtype=True` then
                       :code:`dtype` must be specified.
        shape: Optional shape check that users can specify for their incoming HTTP requests:

               .. code-block:: python
                  :caption: `service.py`

                  import pandas as pd
                  from bentoml.io import PandasSeries

                  @svc.api(input=PandasSeries(shape=(51,), enforce_shape=True), output=PandasSeries())
                  def infer(input_series: pd.Series) -> pd.Series:
                      # if input_series has shape (40,), it will error
                      ...
        enforce_shape: Whether to enforce a certain shape. If ``enforce_shape=True`` then ``shape``
                       must be specified.

    Returns:
        :obj:`PandasSeries`: IO Descriptor that represents a :code:`pd.Series`.
    """

    _mime_type = "application/json"

    def __init__(
        self,
        orient: ext.SeriesOrient = "records",
        dtype: ext.PdDTypeArg | None = None,
        enforce_dtype: bool = False,
        shape: tuple[int, ...] | None = None,
        enforce_shape: bool = False,
    ):
        self._orient: ext.SeriesOrient = orient
        self._dtype = dtype
        self._enforce_dtype = enforce_dtype
        self._shape = shape
        self._enforce_shape = enforce_shape

    def _from_sample(self, sample: ext.PdSeries | t.Sequence[t.Any]) -> ext.PdSeries:
        """
        Create a :class:`~bentoml._internal.io_descriptors.pandas.PandasSeries` IO Descriptor from given inputs.

        Args:
            sample_input: Given sample ``pd.Series`` data
            orient: Indication of expected JSON string format. Compatible JSON strings can be
                    produced by :func:`pandas.io.json.to_json()` with a corresponding orient value.
                    Possible orients are:

                    - :obj:`split` - :code:`dict[str, Any]` ↦ {``idx`` ↠ ``[idx]``, ``columns`` ↠ ``[columns]``, ``data`` ↠ ``[values]``}
                    - :obj:`records` - :code:`list[Any]` ↦ [{``column`` ↠ ``value``}, ..., {``column`` ↠ ``value``}]
                    - :obj:`index` - :code:`dict[str, Any]` ↦ {``idx`` ↠ {``column`` ↠ ``value``}}
                    - :obj:`table` - :code:`dict[str, Any]` ↦ {``schema``: { schema }, ``data``: { data }}
            enforce_dtype: Enforce a certain data type. ``dtype`` must be specified at function
                           signature. If you don't want to enforce a specific dtype then set
                           ``enforce_dtype=False``.
            enforce_shape: Enforce a certain shape. ``shape`` must be specified at function
                           signature. If you don't want to enforce a specific shape then set
                           ``enforce_shape=False``.

        Returns:
            :class:`~bentoml._internal.io_descriptors.pandas.PandasSeries`: IODescriptor from given users inputs.

        Example:

        .. code-block:: python
           :caption: `service.py`

           import pandas as pd
           from bentoml.io import PandasSeries

           arr = [1,2,3]
           input_spec = PandasSeries.from_sample(pd.Series(arr))

           @svc.api(input=input_spec, output=PandasSeries())
           def predict(inputs: pd.Series) -> pd.Series: ...
        """
        if not isinstance(sample, pd.Series):
            sample = pd.Series(sample)
        if self._dtype is None:
            self._dtype = sample.dtype
        if self._shape is None:
            self._shape = sample.shape
        return sample

    def input_type(self) -> LazyType[ext.PdSeries]:
        return LazyType("pandas", "Series")

    def _convert_dtype(
        self, value: ext.PdDTypeArg | None
    ) -> str | dict[str, t.Any] | None:
        # TODO: support extension dtypes
        if LazyType["ext.NpNDArray"]("numpy", "ndarray").isinstance(value):
            return str(value.dtype)
        elif isinstance(value, np.dtype):
            return str(value)
        elif isinstance(value, str):
            return value
        elif isinstance(value, bool):
            return str(value)
        elif isinstance(value, dict):
            return {str(k): self._convert_dtype(v) for k, v in value.items()}
        elif value is None:
            return "null"
        else:
            logger.warning(f"{type(value)} is not yet supported.")
            return None

    def to_spec(self) -> dict[str, t.Any]:
        return {
            "id": self.descriptor_id,
            "args": {
                "orient": self._orient,
                "dtype": self._convert_dtype(self._dtype),
                "shape": self._shape,
                "enforce_dtype": self._enforce_dtype,
                "enforce_shape": self._enforce_shape,
            },
        }

    @classmethod
    def from_spec(cls, spec: dict[str, t.Any]) -> t.Self:
        if "args" not in spec:
            raise InvalidArgument(f"Missing args key in PandasSeries spec: {spec}")
        res = PandasSeries(**spec["args"])
        return res

    def openapi_schema(self) -> Schema:
        return _series_openapi_schema(self._dtype, self._orient)

    def openapi_components(self) -> dict[str, t.Any] | None:
        pass

    def openapi_example(self):
        if self.sample is not None:
            return self.sample.to_json(orient=self._orient)

    def openapi_request_body(self) -> dict[str, t.Any]:
        return {
            "content": {
                self._mime_type: MediaType(
                    schema=self.openapi_schema(), example=self.openapi_example()
                )
            },
            "required": True,
            "x-bentoml-io-descriptor": self.to_spec(),
        }

    def openapi_responses(self) -> OpenAPIResponse:
        return {
            "description": SUCCESS_DESCRIPTION,
            "content": {
                self._mime_type: MediaType(
                    schema=self.openapi_schema(), example=self.openapi_example()
                )
            },
            "x-bentoml-io-descriptor": self.to_spec(),
        }

    async def from_http_request(self, request: Request) -> ext.PdSeries:
        """
        Process incoming requests and convert incoming objects to ``pd.Series``.

        Args:
            request: Incoming Request

        Returns:
            a ``pd.Series`` object. This can then be used inside users defined logics.
        """
        obj = await request.body()
        res: ext.PdSeries = pd.read_json(
            io.BytesIO(obj),
            typ="series",
            orient=self._orient,
            dtype=self._dtype if self._enforce_dtype else True,
        )
        return self.validate_series(res)

    async def to_http_response(
        self, obj: t.Any, ctx: Context | None = None
    ) -> Response:
        """
        Process given objects and convert it to HTTP response.

        Args:
            obj: ``pd.Series`` that will be serialized to JSON

        Returns:
            HTTP Response of type ``starlette.responses.Response``. This can
            be accessed via cURL or any external web traffic.
        """
        obj = self.validate_series(obj)
        if ctx is not None:
            res = Response(
                obj.to_json(orient=self._orient),
                media_type=self._mime_type,
                headers=ctx.response.headers,  # type: ignore (bad starlette types)
                status_code=ctx.response.status_code,
            )
            set_cookies(res, ctx.response.cookies)
            return res
        else:
            return Response(
                obj.to_json(orient=self._orient), media_type=self._mime_type
            )

    def validate_series(
        self, series: ext.PdSeries, exception_cls: t.Type[Exception] = BadInput
    ) -> ext.PdSeries:
        # TODO: dtype check
        if not LazyType["ext.PdSeries"]("pandas.core.series.Series").isinstance(series):
            raise InvalidArgument(
                f"return object is not of type 'pd.Series', got type '{type(series)}' instead"
            ) from None
        # TODO: convert from wide to long format (melt())
        if self._shape is not None and self._shape != series.shape:
            msg = f"{self.__class__.__name__}: Expecting Series of shape '{self._shape}', but '{series.shape}' was received."
            if self._enforce_shape and not all(
                left == right
                for left, right in zip(self._shape, series.shape)
                if left != -1 and right != -1
            ):
                raise exception_cls(msg) from None
        if self._dtype is not None and self._dtype != series.dtype:
            if np.can_cast(series.dtype, self._dtype, casting="same_kind"):
                series = series.astype(self._dtype)
            else:
                msg = '%s: Expecting series of dtype "%s", but "%s" was received.'
                if self._enforce_dtype:
                    raise exception_cls(
                        msg % (self.__class__.__name__, self._dtype, series.dtype)
                    ) from None
                else:
                    logger.debug(
                        msg, self.__class__.__name__, self._dtype, series.dtype
                    )
        return series
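
    # Illustrative sketch (comments only, not part of the original module): the dtype check
    # above only casts when numpy allows a "same_kind" cast, e.g. an int32 series is silently
    # widened to a declared int64 dtype, while a float64 series declared as int64 is not cast
    # and raises BadInput when enforce_dtype=True:
    #
    #   PandasSeries(dtype="int64", enforce_dtype=True)
    #   #   pd.Series([1, 2], dtype="int32")  -> cast to int64
    #   #   pd.Series([1.5, 2.5])             -> BadInput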

    async def from_proto(self, field: pb.Series | bytes) -> ext.PdSeries:
        """
        Process incoming protobuf request and convert it to ``pandas.Series``.

        Args:
            request: Incoming RPC request message.
            context: grpc.ServicerContext

        Returns:
            a ``pandas.Series`` object. This can then be used inside users defined logics.
        """
        if isinstance(field, bytes):
            # TODO: handle serialized_bytes for dataframe
            raise NotImplementedError(
                'Currently not yet implemented. Use "series" instead.'
            )
        else:
            # The behaviour of `from_proto` will mimic the behaviour of `NumpyNdArray.from_proto`,
            # where we will respect self._dtype if set.
            # since self._dtype uses numpy dtype, we will use some of numpy logics here.
            from .numpy import fieldpb_to_npdtype_map
            from .numpy import npdtype_to_fieldpb_map

            if self._dtype is not None:
                dtype = self._dtype
                data = getattr(field, npdtype_to_fieldpb_map()[self._dtype])
            else:
                fieldpb = [
                    f.name for f, _ in field.ListFields() if f.name.endswith("_values")
                ]
                if len(fieldpb) == 0:
                    # input message doesn't have any fields.
                    return pd.Series()
                elif len(fieldpb) > 1:
                    # when there is more than one value field provided in the proto.
                    raise InvalidArgument(
                        f"Array contents can only be one of given values key. Use one of '{fieldpb}' instead.",
                    ) from None

                dtype = fieldpb_to_npdtype_map()[fieldpb[0]]
                data = getattr(field, fieldpb[0])

        try:
            series = pd.Series(data, dtype=dtype)
        except ValueError:
            series = pd.Series(data)

        return self.validate_series(series)

    @t.overload
    async def _to_proto_impl(
        self, obj: ext.PdSeries, *, version: t.Literal["v1"]
    ) -> pb.Series: ...

    @t.overload
    async def _to_proto_impl(
        self, obj: ext.PdSeries, *, version: t.Literal["v1alpha1"]
    ) -> pb_v1alpha1.Series: ...

    async def _to_proto_impl(
        self, obj: ext.PdSeries, *, version: str
    ) -> _message.Message:
        """
        Process given objects and convert it to grpc protobuf response.

        Args:
            obj: ``pandas.Series`` that will be serialized to protobuf
            context: grpc.aio.ServicerContext from grpc.aio.Server

        Returns:
            ``service_pb2.Response``:
                Protobuf representation of given ``pandas.Series``
        """
        from .numpy import npdtype_to_fieldpb_map

        pb, _ = import_generated_stubs(version)

        try:
            obj = self.validate_series(obj, exception_cls=InvalidArgument)
        except InvalidArgument as e:
            raise e from None

        # NOTE: Currently, if series has mixed dtype, we will raise an error.
        # This has to do with no way to represent mixed dtype in protobuf.
        # User shouldn't use mixed dtype in the first place.
        if obj.dtype.kind == "O":
            raise InvalidArgument(
                "Series has mixed dtype. Please convert it to a single dtype."
            ) from None
        try:
            fieldpb = npdtype_to_fieldpb_map()[obj.dtype]
            return pb.Series(**{fieldpb: obj.tolist()})
        except KeyError:
            raise InvalidArgument(
                f"Unsupported dtype '{obj.dtype}' for response message."
            ) from None

    def from_arrow(self, batch: pyarrow.RecordBatch) -> pd.Series[t.Any]:
        res = batch.to_pandas()
        if isinstance(res, pd.Series):
            return res
        if isinstance(res, pd.DataFrame):
            if len(res.columns) == 1:
                return res.squeeze()
            else:
                raise InvalidArgument(
                    "Multi-column dataframe was passed when trying to convert to a series."
                )

    def to_arrow(self, series: pd.Series[t.Any]) -> pyarrow.RecordBatch:
        import pyarrow

        df = series.to_frame()
        return pyarrow.RecordBatch.from_pandas(df)

    def spark_schema(self) -> pyspark.sql.types.StructType:
        from pyspark.pandas.typedef import as_spark_type
        from pyspark.sql.types import StructField
        from pyspark.sql.types import StructType

        if self._dtype is None or self._dtype is True:
            raise InvalidArgument(
                "Cannot perform batch inference with a pandas series output without a known dtype; please provide a dtype."
            )

        try:
            out_spark_type = as_spark_type(self._dtype)
        except TypeError:
            raise InvalidArgument(
                f"dtype {self._dtype} is not supported for batch inference."
            )

        return StructType([StructField("out", out_spark_type)])

    async def to_proto(self, obj: ext.PdSeries) -> pb.Series:
        return await self._to_proto_impl(obj, version="v1")

    async def to_proto_v1alpha1(self, obj: ext.PdSeries) -> pb_v1alpha1.Series:
        return await self._to_proto_impl(obj, version="v1alpha1")