import os
import glob
import ntpath
import psutil
from abc import abstractmethod
from optimus.infer import is_empty_function, is_list, is_str, is_url
from optimus.helpers.core import one_list_to_val, val_to_list
from optimus.helpers.functions import prepare_path, unquote_path
from optimus.helpers.types import DataFrameType, InternalDataFrameType
from optimus.helpers.logger import logger
from optimus.helpers.raiseit import RaiseIt
from optimus.engines.base.meta import Meta
XML_THRESHOLD = 10
JSON_THRESHOLD = 20
BYTES_SIZE = 327680  # number of bytes sampled from a file when inferring its format in file()
class BaseLoad:
def __init__(self, op):
self.op = op
@abstractmethod
def df(self, *args, **kwargs) -> 'DataFrameType':
pass
def _csv(self, *args, **kwargs) -> 'InternalDataFrameType':
pass
def _json(self, *args, **kwargs) -> 'InternalDataFrameType':
pass
def _excel(self, *args, **kwargs) -> 'InternalDataFrameType':
pass
def _avro(self, *args, **kwargs) -> 'InternalDataFrameType':
pass
def _xml(self, *args, **kwargs) -> 'InternalDataFrameType':
pass
def _parquet(self, *args, **kwargs) -> 'InternalDataFrameType':
pass
def csv(self, filepath_or_buffer, sep=",", header=True, infer_schema=True, encoding="UTF-8", n_rows=None,
null_value="None", quoting=3, lineterminator='\r\n', on_bad_lines='warn', cache=False, na_filter=False,
storage_options=None, conn=None, *args, **kwargs) -> 'DataFrameType':
"""
Loads a dataframe from a CSV file. It works like Spark's read.csv function with some predefined
params.
:param filepath_or_buffer: path or location of the file.
:param sep: column delimiter, usually ',' or ';'.
:param header: whether the dataset has a header row. Defaults to True.
:param infer_schema: infer the input schema automatically from the data. It requires one extra pass over the data. Defaults to True.
:param encoding:
:param n_rows:
:param null_value:
:param quoting:
:param lineterminator:
:param on_bad_lines:
:param cache:
:param na_filter:
:param storage_options:
:param conn:
:return: dataframe
"""
if is_empty_function(self._csv):
raise NotImplementedError(f"'load.csv' is not implemented on '{self.op.engine_label}'")
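# Expand any wildcard in the path and record the matched file name(s) as dataframe metadata.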
unquoted_path = None
if not is_url(filepath_or_buffer):
unquoted_path = glob.glob(unquote_path(filepath_or_buffer))
if unquoted_path and len(unquoted_path):
meta = {"file_name": unquoted_path, "name": ntpath.basename(unquoted_path[0])}
else:
meta = {"file_name": filepath_or_buffer, "name": ntpath.basename(filepath_or_buffer)}
filepath_or_buffer = val_to_list(filepath_or_buffer)
try:
# pandas does not support the '\r\n' line terminator.
if lineterminator and lineterminator.encode(encoding='UTF-8', errors='strict') == b'\r\n':
lineterminator = None
if conn is not None:
filepath_or_buffer = [conn.path(fb) for fb in filepath_or_buffer]
storage_options = conn.storage_options
if kwargs.get("chunk_size") == "auto":
# Chunk size is set to 75% of the free memory (in bytes).
kwargs.pop("chunk_size")
kwargs["chunksize"] = psutil.virtual_memory().free * 0.75
na_filter = na_filter if null_value else False
if not is_str(on_bad_lines):
on_bad_lines = 'error' if on_bad_lines else 'skip'
def _read(_filepath_or_buffer):
return self._csv(_filepath_or_buffer, sep=sep, header=0 if header else None, encoding=encoding,
nrows=n_rows, quoting=quoting, lineterminator=lineterminator,
on_bad_lines=on_bad_lines, na_filter=na_filter,
na_values=val_to_list(null_value), index_col=False,
storage_options=storage_options, *args, **kwargs)
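# A list of paths is read file by file and appended into a single dataframe.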
if is_list(filepath_or_buffer):
df = self.op.F.new_df()
for f in filepath_or_buffer:
df = df.append(_read(f))
else:
df = _read(filepath_or_buffer)
df = self.df(df, op=self.op)
df.meta = Meta.set(df.meta, value=meta)
except IOError as error:
logger.print(error)
raise
return df
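# A minimal usage sketch (illustrative engine name and file path, assuming an Optimus
# instance created via the usual factory):
#   op = Optimus("pandas")
#   df = op.load.csv("data.csv", sep=";", n_rows=1000)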
def xml(self, path, n_rows=None, storage_options=None, conn=None, *args, **kwargs) -> 'DataFrameType':
"""
Loads a dataframe from an XML file.
:param path:
:param n_rows:
:param storage_options:
:param conn:
:param args:
:param kwargs:
:return:
"""
if is_empty_function(self._xml):
raise NotImplementedError(f"'load.xml' is not implemented on '{self.op.engine_label}'")
path = unquote_path(path)
if conn is not None:
path = conn.path(path)
storage_options = conn.storage_options
file, file_name = prepare_path(path, "xml")[0]
try:
df = self._xml(file, n_rows, storage_options=storage_options, *args, **kwargs)
df = self.df(df, op=self.op)
df.meta = Meta.set(df.meta, "file_name", ntpath.basename(file_name))
except IOError as error:
logger.print(error)
raise
return df
def json(self, filepath_or_buffer, multiline=False, n_rows=False, storage_options=None,
conn=None, *args, **kwargs) -> 'DataFrameType':
"""
Loads a dataframe from a JSON file.
:param filepath_or_buffer: path or location of the file.
:param multiline:
:param n_rows:
:param storage_options:
:param conn:
:param args:
:param kwargs:
:return:
"""
if is_empty_function(self._json):
raise NotImplementedError(f"'load.json' is not implemented on '{self.op.engine_label}'")
if conn is not None:
filepath_or_buffer = conn.path(filepath_or_buffer)
storage_options = conn.storage_options
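# Readers based on pandas.read_json accept nrows only together with lines=True,
# so requesting n_rows forces line-delimited parsing.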
if n_rows:
kwargs["nrows"] = n_rows
multiline = True
if is_str(filepath_or_buffer):
try:
filepath_or_buffer = unquote_path(filepath_or_buffer)
local_file_names = prepare_path(filepath_or_buffer, "json")
df_list = []
for file_name, j in local_file_names:
df = self._json(file_name, lines=multiline, *args, **kwargs)
df_list.append(df)
df = self.op.F.df_concat(df_list)
df = self.df(df, op=self.op)
df.meta = Meta.set(df.meta, "file_name", local_file_names[0])
except IOError as error:
logger.print(error)
raise
else:
df = self._json(filepath_or_buffer, lines=multiline, storage_options=storage_options, *args, **kwargs)
df = self.df(df, op=self.op)
return df
def excel(self, filepath_or_buffer, header=0, sheet_name=0, merge_sheets=False, skip_rows=0, n_rows=None, storage_options=None,
conn=None, n_partitions=None, *args, **kwargs) -> 'DataFrameType':
"""
Loads a dataframe from an Excel file.
:param filepath_or_buffer: path or location of the file. Must be a string.
:param header:
:param sheet_name: excel sheet name
:param merge_sheets:
:param skip_rows:
:param n_rows:
:param storage_options:
:param conn:
:param n_partitions:
:param args: custom argument to be passed to the excel function
:param kwargs: custom keyword arguments to be passed to the excel function
:return:
"""
if is_empty_function(self._excel):
raise NotImplementedError(f"'load.excel' is not implemented on '{self.op.engine_label}'")
filepath_or_buffer = unquote_path(filepath_or_buffer)
if conn is not None:
filepath_or_buffer = conn.path(filepath_or_buffer)
storage_options = conn.storage_options
file, file_name = prepare_path(filepath_or_buffer, "xls")[0]
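# merge_sheets=True is forwarded to the engine-specific _excel reader as skip_rows=-1,
# which acts as a sentinel for combining every sheet into a single dataframe.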
if merge_sheets is True:
skip_rows = -1
df, sheet_names = self._excel(file, sheet_name=sheet_name, skiprows=skip_rows, header=header, nrows=n_rows,
storage_options=storage_options, n_partitions=n_partitions, *args, **kwargs)
df = self.df(df, op=self.op)
df.meta = Meta.set(df.meta, "file_name", ntpath.basename(file_name))
df.meta = Meta.set(df.meta, "sheet_names", sheet_names)
return df
def avro(self, filepath_or_buffer, n_rows=None, storage_options=None, conn=None,
*args, **kwargs) -> 'DataFrameType':
"""
Loads a dataframe from an Avro file.
:param filepath_or_buffer: path or location of the file. Must be a string.
:param n_rows:
:param storage_options:
:param conn:
:param args: custom argument to be passed to the spark avro function
:param kwargs: custom keyword arguments to be passed to the spark avro function
"""
if is_empty_function(self._avro):
raise NotImplementedError(f"'load.avro' is not implemented on '{self.op.engine_label}'")
filepath_or_buffer = unquote_path(filepath_or_buffer)
if conn is not None:
logger.warn("'load.avro' does not support connection options ('conn')")
if storage_options is not None:
logger.warn("'load.avro' does not support 'storage_options'")
file, file_name = prepare_path(filepath_or_buffer, "avro")[0]
try:
df = self._avro(filepath_or_buffer, nrows=n_rows, *args, **kwargs)
df = self.df(df, op=self.op)
df.meta = Meta.set(df.meta, value={"file_name": file_name, "name": ntpath.basename(filepath_or_buffer)})
except IOError as error:
logger.print(error)
raise
return df
def parquet(self, filepath_or_buffer, columns=None, n_rows=None, storage_options=None, conn=None,
*args, **kwargs) -> 'DataFrameType':
"""
Loads a dataframe from a parquet file.
:param filepath_or_buffer: path or location of the file. Must be a string.
:param columns: select the columns to load, so you do not need to load the whole dataframe.
:param n_rows:
:param storage_options:
:param conn:
:param args: custom argument to be passed to the spark parquet function
:param kwargs: custom keyword arguments to be passed to the spark parquet function
"""
if is_empty_function(self._parquet):
raise NotImplementedError(f"'load.parquet' is not implemented on '{self.op.engine_label}'")
filepath_or_buffer = unquote_path(filepath_or_buffer)
if conn is not None:
filepath_or_buffer = conn.path(filepath_or_buffer)
storage_options = conn.storage_options
try:
dfd = self._parquet(filepath_or_buffer, columns=columns, nrows=n_rows,
storage_options=storage_options, *args, **kwargs)
df = self.df(dfd, op=self.op)
df.meta = Meta.set(df.meta, value={"file_name": filepath_or_buffer, "name": ntpath.basename(filepath_or_buffer)})
except IOError as error:
logger.print(error)
raise
return df
def orc(self, path, columns, storage_options=None, conn=None, n_partitions=None, *args,
**kwargs) -> 'DataFrameType':
"""
Loads a dataframe from an ORC file.
:param path: path or location of the file. Must be a string.
:param columns: specific column names to be loaded from the file.
:param storage_options:
:param conn:
:param n_partitions:
:param args: custom arguments to be passed to the orc function.
:param kwargs: custom keyword arguments to be passed to the orc function.
"""
raise NotImplementedError('Not implemented yet')
def zip(self, path, filename, dest=None, columns=None, storage_options=None, conn=None, n_partitions=None,
*args, **kwargs) -> 'DataFrameType':
pass
def hdf5(self, path, columns=None, n_partitions=None, *args, **kwargs) -> 'DataFrameType':
"""
Loads a dataframe from a HDF5 file.
:param path: path or location of the file. Must be a string.
:param columns: specific column names to be loaded from the file.
:param n_partitions:
:param args: custom arguments to be passed to the hdf5 function.
:param kwargs: custom keyword arguments to be passed to the hdf5 function.
:return:
"""
raise NotImplementedError('Not implemented yet')
def tsv(self, filepath_or_buffer, header=True, infer_schema=True, *args, **kwargs):
"""
Loads a dataframe from a TSV (tab-separated values) file.
:param filepath_or_buffer: path or location of the file. Must be a string.
:param header:
:param infer_schema:
:param args: custom arguments to be passed to the csv function.
:param kwargs: custom keyword arguments to be passed to the csv function.
:return:
"""
return self.csv(filepath_or_buffer, sep='\t', header=header, infer_schema=infer_schema, *args, **kwargs)
def file(self, path, *args, **kwargs) -> 'DataFrameType':
"""
Tries to infer the file's data format and encoding and loads the data into a dataframe.
:param path: path to the file you want to load.
:param args: custom arguments to be passed to the underlying load function.
:param kwargs: custom keyword arguments to be passed to the underlying load function.
:return:
"""
conn = kwargs.get("conn")
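# Only the first BYTES_SIZE bytes are read; that sample is enough to sniff the file
# type, encoding and CSV dialect below.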
if conn:
import boto3
remote_obj = boto3.resource(
conn.type, **conn.boto).Object(conn.options.get("bucket"), path)
body = remote_obj.get()['Body']
buffer = body.read(amt=BYTES_SIZE)
full_path = conn.path(path)
file_name = os.path.basename(path)
else:
full_path, file_name = prepare_path(path)[0]
file = open(full_path, "rb")
buffer = file.read(BYTES_SIZE)
# Detect the file type
try:
file_ext = os.path.splitext(file_name)[1].replace(".", "")
import magic
mime, encoding = magic.Magic(
mime=True, mime_encoding=True).from_buffer(buffer).split(";")
mime_info = {"mime": mime, "encoding": encoding.strip().split("=")[
1], "file_ext": file_ext}
except Exception as e:
print(getattr(e, 'message', repr(e)))
full_path = path
file_name = path.split('/')[-1]
file_ext = file_name.split('.')[-1]
mime = False
mime_info = {"file_type": file_ext, "encoding": False}
file_type = file_ext
if mime:
if mime in ["text/plain", "application/csv"]:
if mime_info["file_ext"] == "json":
file_type = "json"
else:
file_type = "csv"
elif mime == "application/json":
file_type = "json"
elif mime == "text/xml":
file_type = "xml"
elif mime in ["application/vnd.ms-excel",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"]:
file_type = "excel"
else:
RaiseIt.value_error(
mime, ["csv", "json", "xml", "xls", "xlsx"])
# Detect the file encoding
if file_type == "csv":
# In some cases magic returns "unknown-8bit", which cannot be used to decode the file; use latin-1 instead.
if mime_info.get("encoding", None) == "unknown-8bit":
mime_info["encoding"] = "latin-1"
if mime:
import csv
# Decode the sampled bytes (latin-1 maps every byte) so the sniffer sees text rather than a bytes repr.
dialect = csv.Sniffer().sniff(buffer.decode("latin-1"))
mime_info["file_type"] = "csv"
r = {"properties": {"sep": dialect.delimiter,
"doublequote": dialect.doublequote,
"escapechar": dialect.escapechar,
"lineterminator": dialect.lineterminator,
"quotechar": dialect.quotechar,
"quoting": dialect.quoting,
"skipinitialspace": dialect.skipinitialspace}}
mime_info.update(r)
kwargs.update({
"encoding": mime_info.get("encoding", None),
**mime_info.get("properties", {})
})
df = self.csv(filepath_or_buffer=path, *args, **kwargs)
elif file_type == "json":
mime_info["file_type"] = "json"
df = self.json(full_path, *args, **kwargs)
elif file_type == "xml":
mime_info["file_type"] = "xml"
df = self.xml(full_path, **kwargs)
elif file_type == "excel":
mime_info["file_type"] = "excel"
df = self.excel(full_path, **kwargs)
else:
RaiseIt.value_error(
file_type, ["csv", "json", "xml", "xls", "xlsx"])
return df
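# A minimal usage sketch for format inference (illustrative path, assuming an existing
# Optimus instance `op`):
#   df = op.load.file("data.csv")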
@staticmethod
def model(path):
"""
Load a machine learning model from a file.
:param path: Path to the file we want to load.
:return:
"""
import joblib
return joblib.load(path)