# Source code for optimus.engines.base.io.save

import warnings

from optimus.helpers.types import *

DEFAULT_MODE = "w"
DEFAULT_NUM_PARTITIONS = 1


class BaseSave:
    """Base class for dataframe save operations.

    Concrete engines override the per-format writers (``csv``, ``json``,
    ``parquet``, ...); this base keeps a reference to the dataframe wrapper
    that those writers serialize.
    """

    def __init__(self, root: 'DataFrameType'):
        # Dataframe wrapper whose data the save methods will persist.
        self.root = root
[docs] def file(self, path: str, *args, **kwargs): """ :param path: :param args: :param kwargs: :return: """ if "." not in path: warnings.warn("No file extension found in path, saving to Parquet file.") file_ext = "parquet" else: file_ext = path.split(".")[-1] funcs = { 'xls': 'excel', 'xlsx': 'excel' } func_name = funcs.get(file_ext, file_ext.lower()) func = getattr(self, func_name, None) if not callable(func): raise ValueError(f"No function found for extension '{file_ext}'") return func(path, *args, **kwargs)
[docs] def csv(self, path, *args, **kwargs): """ Save data frame to a CSV file. :param path: path where the spark will be saved. :param mode: 'rb', 'wt', etc it uses the default value. :return: Dataframe in a CSV format in the specified path. """ raise NotImplementedError("Not implemented yet")
[docs] def xml(self, filename: str = None, mode: str = 'w'): """ :param filename: :param mode: :return: """ dfd = self.root.data columns = list(dfd.columns) xml_data = ['<root>'] for i in range(len(dfd)): xml_data.append('<row>') # Opening element tag for column in columns: # writing sub-elements xml_data.append(f' <{column}>{dfd[column][i]}</{column}>') xml_data.append('</row>') # Closing element tag xml_data.append('</root>') if filename is None: return '\n'.join(xml_data) folder_name = "/".join(filename.split('/')[0:-1]) from pathlib import Path Path(folder_name).mkdir(parents=True, exist_ok=True) with open(filename, mode, encoding="utf-8") as f: f.write('\n'.join(xml_data))
[docs] def json(self, path, *args, **kwargs): """ Save data frame in a json file :param path: path where the spark will be saved. :param mode: Specifies the behavior of the save operation when data already exists. "append": Append contents of this DataFrame to existing data. "overwrite" (default case): Overwrite existing data. "ignore": Silently ignore this operation if data already exists. "error": Throw an exception if data already exists. :param num_partitions: the number of partitions of the DataFrame :return: """ raise NotImplementedError("Not implemented yet")
[docs] def excel(self, path, mode=DEFAULT_MODE, *args, **kwargs): """ Save data frame to a CSV file. :param path: File path or object :param mode: Python write mode, default ‘w’. it uses the default value. :return: Dataframe in a CSV format in the specified path. """ raise NotImplementedError("Not implemented yet")
def avro(self, path, *args, **kwargs): raise NotImplementedError("Not implemented yet")
[docs] def parquet(self, path, mode=DEFAULT_MODE, num_partitions=DEFAULT_NUM_PARTITIONS, *args, **kwargs): """ Save data frame to a parquet file :param path: File path or object :param mode: Specifies the behavior of the save operation when data already exists. "append": Append contents of this DataFrame to existing data. "overwrite" (default case): Overwrite existing data. "ignore": Silently ignore this operation if data already exists. "error": Throw an exception if data already exists. :param num_partitions: the number of partitions of the DataFrame :return: """ raise NotImplementedError("Not implemented yet")
[docs] def orc(self, path, *args, **kwargs): """ :param path: :param args: :param kwargs: :return: """ raise NotImplementedError("Not implemented yet")
[docs] def hdf5(self, path, *args, **kwargs): """ :param path: :param args: :param kwargs: :return: """ raise NotImplementedError("Not implemented yet")
[docs] def database_table(self, table, db, *args, **kwargs): """ :param table: :param db: :param args: :param kwargs: :return: """ raise NotImplementedError("Not implemented yet")