airtunnel package

Submodules

airtunnel.data_asset module

Module for Airtunnel data asset abstractions and data asset IO abstractions.

class airtunnel.data_asset.BaseDataAsset(name: str)

Bases: object

Base class for all Airtunnel data assets.

declarations

Declarations of this data asset

property discovered_files_xcom_key
Returns

the XCOM key for discovered files for this data asset

property ingest_archive_path
Returns

the ingest/archive path for this data asset

property landing_path
Returns

the ingest/landing path for this data asset

make_ready_temp_path(airflow_context: Dict) → str

Makes a temporary path under ready for staging purposes.

It points to a hidden (prefixed by “.”) folder as the last component, so that this path is ignored by frameworks like Spark.

Parameters

airflow_context – a live Airflow context, used to get the execution timestamp

Returns

the temporary staging path

name

Name of this data asset

property output_filename
Returns

an output filename for this data asset, considering the declared storage format and compression codec (if compression is used)

pickedup_files(airflow_context) → List[str]

Using Airflow XCOM based on the given Airflow context with its DAG id, fetch the list of previously discovered files for ingestion.

Then, considering the files have been picked up by the ingestion operator, adjust the path prefix to the picked-up folder and return the list of file paths.

Parameters

airflow_context – a live Airflow context

Returns

list of picked-up file paths

ready_archive_path(airflow_context) → str

Return the versioned archive path for this data asset, based on the Airflow execution date.

Parameters

airflow_context – a live Airflow context

Returns

the versioned archived path for this data asset

property ready_path
Returns

the ready (folder) path for this data asset

abstract rebuild_for_store(airflow_context, **kwargs) → None

Rebuild this data asset for the data store’s ready folder.

Parameters
  • airflow_context – a live Airflow context

  • kwargs – additional keyword arguments to be passed along to the downstream scripts

Returns

None

abstract retrieve_from_store() → Union[pandas.core.frame.DataFrame, pyspark.sql.dataframe.DataFrame]

Retrieve this data asset from the ready layer of the data store.

Returns

A pandas or PySpark dataframe

staging_pickedup_path(airflow_context) → str

Return the pickedup path under staging for this data asset, versioned by the Airflow execution date.

Parameters

airflow_context – a live Airflow context

Returns

the versioned staging/pickedup path for this data asset

property staging_ready_path
Returns

the staging/ready path for this data asset
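
A minimal sketch of working with a data asset's path properties; it assumes a PandasDataAsset named "enrollment" has been declared in the declaration store (the asset name and the resulting paths are hypothetical and depend on the configured data store):

    from airtunnel import PandasDataAsset

    # constructing the asset loads its declaration file from the declaration store
    enrollment = PandasDataAsset(name="enrollment")

    print(enrollment.landing_path)         # ingest/landing folder for new files
    print(enrollment.ingest_archive_path)  # ingest/archive folder
    print(enrollment.staging_ready_path)   # staging/ready folder used while rebuilding
    print(enrollment.ready_path)           # final ready-layer folder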

class airtunnel.data_asset.BaseDataAssetIO

Bases: abc.ABC

Base class for all Airtunnel DataAssetIO.

static prepare_staging_ready_path(asset: airtunnel.data_asset.BaseDataAsset) → None

For a given data asset, prepare the staging/ready path for it, i.e. ensure it exists if this is the first ever run.

Parameters

asset – the data asset for which to prepare the staging/ready path

Returns

None

abstract static read_data_asset(asset: airtunnel.data_asset.BaseDataAsset, source_files: Iterable[str], **reader_kwargs) → Union[pandas.core.frame.DataFrame, pyspark.sql.dataframe.DataFrame]

Reads a data asset.

Parameters
  • asset – the data asset to read

  • source_files – input source file(s) to read

  • reader_kwargs – additional keyword arguments to pass into the reader function

Returns

the read-in data as a dataframe

abstract static retrieve_data_asset(asset: airtunnel.data_asset.BaseDataAsset, **reader_kwargs) → Union[pandas.core.frame.DataFrame, pyspark.sql.dataframe.DataFrame]

Retrieves a data asset from the Airtunnel data store.

Parameters
  • asset – the data asset to retrieve

  • reader_kwargs – additional keyword arguments to pass into the reader function

Returns

the retrieved data as a dataframe

abstract static write_data_asset(asset: airtunnel.data_asset.BaseDataAsset, data: Union[pandas.core.frame.DataFrame, pyspark.sql.DataFrame], **writer_kwargs) → None

Writes a data asset.

Parameters
  • asset – the data asset to write

  • data – the data to write for the data asset

  • writer_kwargs – additional keyword arguments to pass into the writer function

Returns

None

class airtunnel.data_asset.PandasDataAsset(name: str)

Bases: airtunnel.data_asset.BaseDataAsset

Implements a Pandas-enabled Airtunnel data asset.

declarations
name
rebuild_for_store(airflow_context, **kwargs) → None

Rebuild this data asset for the data store’s ready folder (using Pandas).

Parameters
  • airflow_context – a live Airflow context

  • kwargs – additional keyword arguments to be passed along to the downstream scripts

Returns

None

rename_fields_as_declared(data: pandas.core.frame.DataFrame) → pandas.core.frame.DataFrame

Rename the columns as declared for this data asset.

Parameters

data – the input Pandas dataframe to perform the rename on

Returns

the dataframe with the columns renamed

retrieve_from_store(airflow_context=None, consuming_asset: Optional[airtunnel.data_asset.BaseDataAsset] = None) → pandas.core.frame.DataFrame

Retrieve this data asset from the ready layer of the data store (using Pandas).

Parameters
  • airflow_context – a live Airflow context (optional)

  • consuming_asset – the consuming data asset for lineage collection (optional)

Returns

the data asset as a Pandas dataframe
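
Assuming airtunnel's convention that rebuild_for_store() executes a per-asset Python script from the script store, which itself defines a rebuild_for_store(asset, airflow_context) function, a transformation script might look like this sketch (ingested, CSV-declared asset; all names are illustrative):

    from airtunnel import PandasDataAsset, PandasDataAssetIO


    def rebuild_for_store(asset: PandasDataAsset, airflow_context) -> None:
        # read the raw files previously picked up by the ingestion operator
        data = PandasDataAssetIO.read_data_asset(
            asset=asset, source_files=asset.pickedup_files(airflow_context)
        )
        # apply the column renames declared for this asset
        data = asset.rename_fields_as_declared(data)
        # write the result into the asset's staging/ready area
        PandasDataAssetIO.write_data_asset(asset=asset, data=data)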

class airtunnel.data_asset.PandasDataAssetIO

Bases: airtunnel.data_asset.BaseDataAssetIO

IO interface for the Airtunnel PandasDataAsset.

static read_data_asset(asset: airtunnel.data_asset.PandasDataAsset, source_files: Iterable[str], **reader_kwargs) → pandas.core.frame.DataFrame

Reads a PandasDataAsset using Pandas.

Parameters
  • asset – the PandasDataAsset to read

  • source_files – input source file(s) to read

  • reader_kwargs – additional keyword arguments to pass into the reader function

Returns

the read-in data as a dataframe

static retrieve_data_asset(asset: airtunnel.data_asset.PandasDataAsset, **reader_kwargs) → pandas.core.frame.DataFrame

Retrieves a PandasDataAsset from the Airtunnel data store (using Pandas).

Parameters
  • asset – the data asset to retrieve

  • reader_kwargs – additional keyword arguments to pass into the Pandas reader function

Returns

the retrieved data as a Pandas dataframe

static write_data_asset(asset: airtunnel.data_asset.PandasDataAsset, data: pandas.core.frame.DataFrame, **writer_kwargs) → None

Writes a PandasDataAsset using Pandas.

Parameters
  • asset – the data asset to write

  • data – the data to write for the data asset

  • writer_kwargs – additional keyword arguments to pass into the Pandas writer function

Returns

None
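
A short, hedged example of PandasDataAssetIO: the keyword arguments are assumed to be forwarded to the matching Pandas reader/writer (e.g. pandas.read_csv for a CSV-declared asset); the asset name and file path are hypothetical:

    from airtunnel import PandasDataAsset, PandasDataAssetIO

    asset = PandasDataAsset("enrollment")

    # read raw input files; sep/encoding are passed through to the Pandas reader
    raw = PandasDataAssetIO.read_data_asset(
        asset=asset,
        source_files=["/tmp/landing/enrollment/2020-01.csv"],
        sep=";",
        encoding="utf-8",
    )

    # later, fetch the asset back from the ready layer of the data store
    ready = PandasDataAssetIO.retrieve_data_asset(asset)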

class airtunnel.data_asset.PySparkDataAsset(name: str)

Bases: airtunnel.data_asset.BaseDataAsset

A PySpark-enabled Airtunnel data asset.

declarations
name
rebuild_for_store(airflow_context, **kwargs)

Rebuild this data asset for the data store’s ready folder (using PySpark).

Parameters
  • airflow_context – a live Airflow context

  • kwargs – additional keyword arguments to be passed along to the downstream scripts

Returns

None

rename_fields_as_declared(data: pyspark.sql.dataframe.DataFrame) → pyspark.sql.dataframe.DataFrame

Rename the columns as declared for this data asset.

Parameters

data – the input PySpark dataframe to perform the rename on

Returns

the PySpark dataframe with the columns renamed

retrieve_from_store(airflow_context=None, consuming_asset: Optional[airtunnel.data_asset.BaseDataAsset] = None, spark_session: Optional[pyspark.sql.session.SparkSession] = None) → pyspark.sql.dataframe.DataFrame

Retrieve this data asset from the ready layer of the data store (using PySpark).

Parameters
  • airflow_context – a live Airflow context (optional)

  • consuming_asset – the consuming data asset for lineage collection (optional)

  • spark_session – a live Spark session to use (required)

Returns

the data asset as a PySpark dataframe
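
A minimal retrieval sketch for a PySpark data asset; the asset name is hypothetical and a live SparkSession has to be supplied by the caller:

    from pyspark.sql import SparkSession

    from airtunnel import PySparkDataAsset

    spark = SparkSession.builder.appName("airtunnel-example").getOrCreate()

    flights = PySparkDataAsset("flights")
    df = flights.retrieve_from_store(spark_session=spark)
    df.show(10)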

class airtunnel.data_asset.PySparkDataAssetIO

Bases: airtunnel.data_asset.BaseDataAssetIO

IO interface for the Airtunnel PySparkDataAsset.

static read_data_asset(asset: airtunnel.data_asset.PySparkDataAsset, source_files: Iterable[str], spark_session: Optional[pyspark.sql.session.SparkSession] = None, **reader_kwargs) → pyspark.sql.dataframe.DataFrame

Reads a PySparkDataAsset using PySpark.

Parameters
  • asset – the PySparkDataAsset to read

  • source_files – input source file(s) to read

  • spark_session – a live PySpark session to use (required)

  • reader_kwargs – additional keyword arguments to pass into the PySpark reader function

Returns

the read-in data as a dataframe

static retrieve_data_asset(asset: airtunnel.data_asset.PySparkDataAsset, spark_session: Optional[pyspark.sql.session.SparkSession] = None, **reader_kwargs) → pyspark.sql.dataframe.DataFrame

Retrieves a PySparkDataAsset from the Airtunnel data store (using PySpark).

Parameters
  • asset – the data asset to retrieve

  • spark_session – a live PySpark session to use (required)

  • reader_kwargs – additional keyword arguments to pass into the PySpark reader function

Returns

the retrieved data as a PySpark dataframe

static write_data_asset(asset: airtunnel.data_asset.PySparkDataAsset, data: pyspark.sql.dataframe.DataFrame, **writer_kwargs) → None

Writes a PySparkDataAsset using PySpark.

Parameters
  • asset – the data asset to write

  • data – the data to write for the data asset

  • writer_kwargs – additional keyword arguments to pass into the PySpark writer function

Returns

None
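
A hedged sketch of PySparkDataAssetIO, assuming a CSV-declared asset; the asset name and file path are hypothetical, and the reader keyword arguments (here header/inferSchema) are assumed to be forwarded to the PySpark CSV reader:

    from pyspark.sql import SparkSession

    from airtunnel import PySparkDataAsset, PySparkDataAssetIO

    spark = SparkSession.builder.getOrCreate()
    asset = PySparkDataAsset("flights")

    data = PySparkDataAssetIO.read_data_asset(
        asset=asset,
        source_files=["/data/landing/flights/2020-01.csv"],
        spark_session=spark,
        header=True,
        inferSchema=True,
    )

    # persist the (possibly transformed) data to the asset's staging/ready area
    PySparkDataAssetIO.write_data_asset(asset=asset, data=data)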

class airtunnel.data_asset.SQLDataAsset(name: str, sql_hook: airflow.hooks.dbapi_hook.DbApiHook)

Bases: airtunnel.data_asset.BaseDataAsset

A SQL-enabled Airtunnel data asset.

declarations
formatted_sql_script(parameters: Dict, script_type: str = 'dml', dynamic_parameters: Callable = None, airflow_context=None) → str

Get the formatted SQL script for this data asset.

Parameters
  • parameters – dictionary of parameters to inject into the script (i.e. to format the raw string with)

  • script_type – i.e. dml, ddl – translates to a subfolder in the script-store/sql/ folder

  • dynamic_parameters – a callable that is dynamically executed to compute SQL script parameters at runtime

  • airflow_context – a live Airflow context – passed into the function to compute dynamic parameters

Returns

the formatted SQL script for this data asset

get_raw_sql_script(script_type: str = 'dml') → str

Returns the raw SQL script for this data asset.

Parameters

script_type – i.e. dml, ddl – translates to a subfolder in the script-store/sql/ folder

Returns

the raw SQL script as a string

name
rebuild_for_store(airflow_context, parameters: Dict = None, dynamic_parameters: Callable = None, **kwargs)

Rebuild this data asset for the data store’s ready folder (using SQL).

Parameters
  • airflow_context – a live Airflow context

  • parameters – dictionary of parameters to inject into the script (i.e. to format the raw string with)

  • dynamic_parameters – a callable that is dynamically executed to compute SQL script parameters at runtime

  • kwargs – additional keyword arguments to be passed along to the downstream scripts

Returns

None

retrieve_from_store() → pandas.core.frame.DataFrame

Retrieve this data asset from the ready layer of the data store (using SQL).

Returns

the data asset as a Pandas dataframe
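
A hedged example of a SQL data asset, assuming an Airflow connection "my_dwh" exists and that dml scripts for the (hypothetical) asset live in the script store; PostgresHook merely stands in for any DbApiHook:

    from airflow.hooks.postgres_hook import PostgresHook

    from airtunnel import SQLDataAsset

    asset = SQLDataAsset(
        "dim_customer", sql_hook=PostgresHook(postgres_conn_id="my_dwh")
    )

    # inspect the raw and the parameter-formatted DML script
    print(asset.get_raw_sql_script(script_type="dml"))
    print(asset.formatted_sql_script(parameters={"schema": "analytics"}))

    # pull the ready-layer data back as a Pandas dataframe
    df = asset.retrieve_from_store()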

class airtunnel.data_asset.ShellDataAsset(name: str)

Bases: airtunnel.data_asset.BaseDataAsset

A shell DataAsset, acting merely as a container – i.e. for lineage.

declarations
static from_names(names: Iterable[str]) → List[airtunnel.data_asset.ShellDataAsset]

Create several ShellDataAssets from a list of names.

Parameters

names – the names for which to create ShellDataAssets

Returns

list of created ShellDataAssets

name
rebuild_for_store(airflow_context, **kwargs) → None

Unimplemented method of the BaseDataAsset interface – do not use!

retrieve_from_store()

Unimplemented method of the BaseDataAsset interface – do not use!

to_full_data_asset(target_type: type) → Union[airtunnel.data_asset.PySparkDataAsset, airtunnel.data_asset.PandasDataAsset]

Convert this ShellDataAsset to a full Airtunnel data asset of a given type.

Parameters

target_type – the target class to convert this ShellDataAsset to

Returns

the full data asset
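
A small sketch of ShellDataAsset usage; the names are hypothetical and serve only as lightweight placeholders (e.g. lineage endpoints) until converted into full assets:

    from airtunnel import PandasDataAsset
    from airtunnel.data_asset import ShellDataAsset

    # name-only containers, e.g. as lineage endpoints
    shells = ShellDataAsset.from_names(["student", "programme"])

    # promote one of them to a full, declaration-backed data asset when needed
    student = shells[0].to_full_data_asset(target_type=PandasDataAsset)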

airtunnel.data_store module

Module for Airtunnel DataStore abstractions.

class airtunnel.data_store.BaseDataStoreAdapter

Bases: abc.ABC

Base class for all Airtunnel DataStoreAdapters.

abstract static copy(source: str, destination: str, recursive: bool = False) → None

Copy file(s) from source to destination.

abstract static delete(path: str, recursive: bool = False) → None

Delete file(s) at a given path.

abstract static exists(path: str, **kwargs) → bool

Checks whether the given path exists.

abstract static glob(pattern: str, **kwargs) → List[str]

Fetch file list using the provided glob pattern.

abstract static inspect(files: List[str]) → Dict[str, Tuple[datetime.datetime, datetime.datetime, int]]

For a given file list, fetch creation & modification datetimes and byte sizes.

abstract static listdir(path: str, recursive: bool = False, **kwargs) → List[str]

List entries of a given path.

abstract static makedirs(path: str, exist_ok: bool = False, **kwargs) → None

Make directories along the given path.

abstract static modification_times(files: List[str]) → Dict[str, int]

For a given file list, fetch modification times as int Unix timestamps.

abstract static move(source: str, destination: str, recursive: bool = False) → None

Move file(s) from source to destination.

abstract static open(file: str, mode: str, **kwargs) → TextIO

Open a file-handle with the given path & mode.

class airtunnel.data_store.LocalDataStoreAdapter

Bases: airtunnel.data_store.BaseDataStoreAdapter

DataStoreAdapter implementation targeting the local filesystem.

static copy(source: str, destination: str, recursive: bool = False) → None

Copy file(s) from source to destination.

static delete(path: str, recursive: bool = False) → None

Delete file(s) at a given path.

static exists(path: str, **kwargs) → bool

Checks whether the given path exists.

static glob(pattern: str, **kwargs) → List[str]

Fetch file list using the provided glob pattern.

static inspect(files: List[str]) → Dict[str, Tuple[datetime.datetime, datetime.datetime, int]]

For a given file list, fetch creation & modification datetimes and byte sizes.

static listdir(path: str, recursive: bool = False, **kwargs) → List[str]

List entries of a given path.

static makedirs(path: str, exist_ok: bool = False, **kwargs) → None

Make directories along the given path.

static modification_times(files: List[str]) → Dict[str, int]

For a given file list, fetch modification times as int Unix timestamps.

static move(source: str, destination: str, recursive: bool = False) → None

Move file(s) from source to destination.

static open(file: str, mode: str, **kwargs) → TextIO

Open a file-handle with the given path & mode.

airtunnel.data_store.get_configured_data_store_adapter() → airtunnel.data_store.BaseDataStoreAdapter

Gets the configured DataStoreAdapter.
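
A hedged usage sketch of the configured adapter; which concrete adapter is returned (e.g. LocalDataStoreAdapter) depends on the Airtunnel configuration, and the demo path below is hypothetical:

    from airtunnel.data_store import get_configured_data_store_adapter

    adapter = get_configured_data_store_adapter()

    adapter.makedirs("/tmp/airtunnel-demo", exist_ok=True)
    with adapter.open("/tmp/airtunnel-demo/hello.txt", "w") as f:
        f.write("hello")

    print(adapter.listdir("/tmp/airtunnel-demo"))
    print(adapter.exists("/tmp/airtunnel-demo/hello.txt"))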

airtunnel.declaration_store module

Module for Airtunnel Declaration Store abstractions.

class airtunnel.declaration_store.DataAssetDeclaration(data_asset: str)

Bases: object

Abstraction around declarations for an Airtunnel data asset.

property all

Get all declarations.

property archive_ingest

Whether to archive the ingested files.

property archive_ready

Whether to archive the ready layer data of this data asset.

property asset_name

Name of the declared asset.

property extra_declarations

Return extra declarations.

property in_storage_format

The declared input storage format.

property ingest_file_glob

The declared file glob for ingestion.

property is_csv_input

Whether the input for this data asset is CSV.

property is_csv_output

Whether the output for this data asset is CSV.

property is_parquet_input

Whether the input for this data asset is Parquet.

property is_parquet_output

Whether the output for this data asset is Parquet.

property is_xls_input

Whether the input for this data asset is Excel.

property key_columns

Return list of declared key columns.

property out_comp_codec

The declared output compression codec.

property out_storage_format

The declared output storage format.

property run_ddl

Whether to run DDL for this data asset.

property staging_assets

Return list of declared staging assets.

property transform_date_formats

The declared date formats.

property transform_renames

The declared rename transformations.

class airtunnel.declaration_store.DeclarationSchemas

Bases: object

Wrapper to hold all our YAML schemas to validate declaration files.

DERIVED_DATA_ASSET = Schema({'type': And(<class 'str'>, <function DeclarationSchemas.<lambda>>), Optional('load'): Schema({Optional('out_storage_format'): And(<function DeclarationSchemas.<lambda>>), Optional('out_compression_codec'): And(<function DeclarationSchemas.<lambda>>), Optional('archive_ready'): <function DeclarationSchemas.<lambda>>, Optional('run_ddl'): <function DeclarationSchemas.<lambda>>, Optional('key_columns'): Schema([Schema(<class 'str'>)])}), Optional('staging_assets'): Schema([Schema(<class 'str'>)]), Optional('extra'): Schema({<class 'object'>: <class 'object'>})})
INGESTED_DATA_ASSET = Schema({'type': And(<class 'str'>, <function DeclarationSchemas.<lambda>>), 'ingest': Schema({Optional('in_storage_format'): <function DeclarationSchemas.<lambda>>, 'file_input_glob': And(<class 'str'>, <built-in function len>), Optional('archive_ingest'): <function DeclarationSchemas.<lambda>>}), Optional('transformation'): Schema({Optional('in_column_renames'): And(<class 'dict'>, <function DeclarationSchemas.<lambda>>, <function DeclarationSchemas.<lambda>>), Optional('in_date_formats'): And(<class 'dict'>, <function DeclarationSchemas.<lambda>>)}), Optional('load'): Schema({Optional('out_storage_format'): And(<function DeclarationSchemas.<lambda>>), Optional('out_compression_codec'): And(<function DeclarationSchemas.<lambda>>), Optional('archive_ready'): <function DeclarationSchemas.<lambda>>, Optional('run_ddl'): <function DeclarationSchemas.<lambda>>, Optional('key_columns'): Schema([Schema(<class 'str'>)])}), Optional('staging_assets'): Schema([Schema(<class 'str'>)]), Optional('extra'): Schema(<class 'object'>)})
LOAD_SCHEMA = Schema({Optional('out_storage_format'): And(<function DeclarationSchemas.<lambda>>), Optional('out_compression_codec'): And(<function DeclarationSchemas.<lambda>>), Optional('archive_ready'): <function DeclarationSchemas.<lambda>>, Optional('run_ddl'): <function DeclarationSchemas.<lambda>>, Optional('key_columns'): Schema([Schema(<class 'str'>)])})
STAGING_ASSETS = Schema([Schema(<class 'str'>)])
airtunnel.declaration_store.fetch_all_declarations() → Iterable[airtunnel.declaration_store.DataAssetDeclaration]

Fetch declarations for all data assets that exist.
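
A short, hedged sketch of reading declarations; it requires a populated declaration store, the single asset name used below is hypothetical, and some properties only apply to ingested or to derived assets:

    from airtunnel.declaration_store import (
        DataAssetDeclaration,
        fetch_all_declarations,
    )

    # iterate over every declared data asset
    for decl in fetch_all_declarations():
        print(decl.asset_name, decl.out_storage_format, decl.archive_ready)

    # or load one asset's declaration directly
    enrollment = DataAssetDeclaration(data_asset="enrollment")
    print(enrollment.ingest_file_glob, enrollment.key_columns)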

airtunnel.paths module

Module for Airtunnel’s paths, i.e. to the declaration, data and scripts store folders.

Module contents

airtunnel – tame your Airflow!
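
The most commonly used classes are re-exported at the package root, so pipeline code can import them directly from airtunnel:

    from airtunnel import (
        BaseDataAsset,
        BaseDataAssetIO,
        PandasDataAsset,
        PandasDataAssetIO,
        PySparkDataAsset,
        PySparkDataAssetIO,
        SQLDataAsset,
    )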

class airtunnel.BaseDataAsset(name: str)

Bases: object

Base class for all Airtunnel data assets.

declarations

Declarations of this data asset

property discovered_files_xcom_key
Returns

the XCOM key for discovered files for this data asset

property ingest_archive_path
Returns

the ingest/archive path for this data asset

property landing_path
Returns

the ingest/landing path for this data asset

make_ready_temp_path(airflow_context: Dict) → str

Makes a temporary path under ready for staging purposes.

It points to a hidden (prefixed by “.”) folder as the last component, so that this path is ignored by frameworks like Spark.

Parameters

airflow_context – a live Airflow context, used to get the execution timestamp

Returns

the temporary staging path

name

Name of this data asset

property output_filename
Returns

an output filename for this data asset, considering the declared storage format and compression codec (if compression is used)

pickedup_files(airflow_context) → List[str]

Using Airflow XCOM based on the given Airflow context with its DAG id, fetch the list of previously discovered files for ingestion.

Then, considering the files have been picked up by the ingestion operator, adjust the path prefix to the picked-up folder and return the list of file paths.

Parameters

airflow_context – a live Airflow context

Returns

list of picked-up file paths

ready_archive_path(airflow_context) → str

Return the versioned archive path for this data asset, based on the Airflow execution date.

Parameters

airflow_context – a live Airflow context

Returns

the versioned archived path for this data asset

property ready_path
Returns

the ready (folder) path for this data asset

abstract rebuild_for_store(airflow_context, **kwargs) → None

Rebuild this data asset for the data store’s ready folder.

Parameters
  • airflow_context – a live Airflow context

  • kwargs – additional keyword arguments to be passed along to the downstream scripts

Returns

None

abstract retrieve_from_store() → Union[pandas.core.frame.DataFrame, pyspark.sql.dataframe.DataFrame]

Retrieve this data asset from the ready layer of the data store.

Returns

A pandas or PySpark dataframe

staging_pickedup_path(airflow_context) → str

Return the pickedup path under staging for this data asset, versioned by the Airflow execution date.

Parameters

airflow_context – a live Airflow context

Returns

the versioned staging/pickedup path for this data asset

property staging_ready_path
Returns

the staging/ready path for this data asset

class airtunnel.SQLDataAsset(name: str, sql_hook: airflow.hooks.dbapi_hook.DbApiHook)

Bases: airtunnel.data_asset.BaseDataAsset

A SQL-enabled Airtunnel data asset.

declarations
formatted_sql_script(parameters: Dict, script_type: str = 'dml', dynamic_parameters: Callable = None, airflow_context=None) → str

Get the formatted SQL script for this data asset.

Parameters
  • parameters – dictionary of parameters to inject into the script (i.e. to format the raw string with)

  • script_type – i.e. dml, ddl – translates to a subfolder in the script-store/sql/ folder

  • dynamic_parameters – a callable that is dynamically executed to compute SQL script parameters at runtime

  • airflow_context – a live Airflow context – passed into the function to compute dynamic parameters

Returns

the formatted SQL script for this data asset

get_raw_sql_script(script_type: str = 'dml') → str

Returns the raw SQL script for this data asset.

Parameters

script_type – i.e. dml, ddl – translates to a subfolder in the script-store/sql/ folder

Returns

the raw SQL script as a string

name
rebuild_for_store(airflow_context, parameters: Dict = None, dynamic_parameters: Callable = None, **kwargs)

Rebuild this data asset for the data store’s ready folder (using SQL).

Parameters
  • airflow_context – a live Airflow context

  • parameters – dictionary of parameters to inject into the script (i.e. to format the raw string with)

  • dynamic_parameters – a callable that is dynamically executed to compute SQL script parameters at runtime

  • kwargs – additional keyword arguments to be passed along to the downstream scripts

Returns

None

retrieve_from_store() → pandas.core.frame.DataFrame

Retrieve this data asset from the ready layer of the data store (using SQL).

Returns

the data asset as a Pandas dataframe

class airtunnel.PandasDataAsset(name: str)

Bases: airtunnel.data_asset.BaseDataAsset

Implements a Pandas-enabled Airtunnel data asset.

declarations
name
rebuild_for_store(airflow_context, **kwargs) → None

Rebuild this data asset for the data store’s ready folder (using Pandas).

Parameters
  • airflow_context – a live Airflow context

  • kwargs – additional keyword arguments to be passed along to the downstream scripts

Returns

None

rename_fields_as_declared(data: pandas.core.frame.DataFrame) → pandas.core.frame.DataFrame

Rename the columns as declared for this data asset.

Parameters

data – the input Pandas dataframe to perform the rename on

Returns

the dataframe with the columns renamed

retrieve_from_store(airflow_context=None, consuming_asset: Optional[airtunnel.data_asset.BaseDataAsset] = None) → pandas.core.frame.DataFrame

Retrieve this data asset from the ready layer of the data store (using Pandas).

Parameters
  • airflow_context – a live Airflow context (optional)

  • consuming_asset – the consuming data asset for lineage collection (optional)

Returns

the data asset as a Pandas dataframe

class airtunnel.PySparkDataAsset(name: str)

Bases: airtunnel.data_asset.BaseDataAsset

A PySpark-enabled Airtunnel data asset.

declarations
name
rebuild_for_store(airflow_context, **kwargs)

Rebuild this data asset for the data store’s ready folder (using PySpark).

Parameters
  • airflow_context – a live Airflow context

  • kwargs – additional keyword arguments to be passed along to the downstream scripts

Returns

None

rename_fields_as_declared(data: pyspark.sql.dataframe.DataFrame) → pyspark.sql.dataframe.DataFrame

Rename the columns as declared for this data asset.

Parameters

data – the input PySpark dataframe to perform the rename on

Returns

the PySpark dataframe with the columns renamed

retrieve_from_store(airflow_context=None, consuming_asset: Optional[airtunnel.data_asset.BaseDataAsset] = None, spark_session: Optional[pyspark.sql.session.SparkSession] = None) → pyspark.sql.dataframe.DataFrame

Retrieve this data asset from the ready layer of the data store (using PySpark).

Parameters
  • airflow_context – a live Airflow context (optional)

  • consuming_asset – the consuming data asset for lineage collection (optional)

  • spark_session – a live Spark session to use (required)

Returns

the data asset as a PySpark dataframe

class airtunnel.BaseDataAssetIO

Bases: abc.ABC

Base class for all Airtunnel DataAssetIO.

static prepare_staging_ready_path(asset: airtunnel.data_asset.BaseDataAsset) → None

For a given data asset, prepare the staging/ready path for it, i.e. ensure it exists if this is the first ever run.

Parameters

asset – the data asset for which to prepare the staging/ready path

Returns

None

abstract static read_data_asset(asset: airtunnel.data_asset.BaseDataAsset, source_files: Iterable[str], **reader_kwargs) → Union[pandas.core.frame.DataFrame, pyspark.sql.dataframe.DataFrame]

Reads a data asset.

Parameters
  • asset – the data asset to read

  • source_files – input source file(s) to read

  • reader_kwargs – additional keyword arguments to pass into the reader function

Returns

the read-in data as a dataframe

abstract static retrieve_data_asset(asset: airtunnel.data_asset.BaseDataAsset, **reader_kwargs) → Union[pandas.core.frame.DataFrame, pyspark.sql.dataframe.DataFrame]

Retrieves a data asset from the Airtunnel data store.

Parameters
  • asset – the data asset to retrieve

  • reader_kwargs – additional keyword arguments to pass into the reader function

Returns

the retrieved data as a dataframe

abstract static write_data_asset(asset: airtunnel.data_asset.BaseDataAsset, data: Union[pandas.core.frame.DataFrame, pyspark.sql.DataFrame], **writer_kwargs) → None

Writes a data asset.

Parameters
  • asset – the data asset to write

  • data – the data to write for the data asset

  • writer_kwargs – additional keyword arguments to pass into the writer function

Returns

None

class airtunnel.PandasDataAssetIO

Bases: airtunnel.data_asset.BaseDataAssetIO

IO interface for the Airtunnel PandasDataAsset.

static read_data_asset(asset: airtunnel.data_asset.PandasDataAsset, source_files: Iterable[str], **reader_kwargs) → pandas.core.frame.DataFrame

Reads a PandasDataAsset using Pandas.

Parameters
  • asset – the PandasDataAsset to read

  • source_files – input source file(s) to read

  • reader_kwargs – additional keyword arguments to pass into the reader function

Returns

the read-in data as a dataframe

static retrieve_data_asset(asset: airtunnel.data_asset.PandasDataAsset, **reader_kwargs) → pandas.core.frame.DataFrame

Retrieves a PandasDataAsset from the Airtunnel data store (using Pandas).

Parameters
  • asset – the data asset to retrieve

  • reader_kwargs – additional keyword arguments to pass into the Pandas reader function

Returns

the retrieved data as a Pandas dataframe

static write_data_asset(asset: airtunnel.data_asset.PandasDataAsset, data: pandas.core.frame.DataFrame, **writer_kwargs) → None

Writes a PandasDataAsset using Pandas.

Parameters
  • asset – the data asset to write

  • data – the data to write for the data asset

  • writer_kwargs – additional keyword arguments to pass into the Pandas writer function

Returns

None

class airtunnel.PySparkDataAssetIO

Bases: airtunnel.data_asset.BaseDataAssetIO

IO interface for the Airtunnel PySparkDataAsset.

static read_data_asset(asset: airtunnel.data_asset.PySparkDataAsset, source_files: Iterable[str], spark_session: Optional[pyspark.sql.session.SparkSession] = None, **reader_kwargs) → pyspark.sql.dataframe.DataFrame

Reads a PySparkDataAsset using PySpark.

Parameters
  • asset – the PySparkDataAsset to read

  • source_files – input source file(s) to read

  • spark_session – a live PySpark session to use (required)

  • reader_kwargs – additional keyword arguments to pass into the PySpark reader function

Returns

the read-in data as a dataframe

static retrieve_data_asset(asset: airtunnel.data_asset.PySparkDataAsset, spark_session: Optional[pyspark.sql.session.SparkSession] = None, **reader_kwargs) → pyspark.sql.dataframe.DataFrame

Retrieves a PySparkDataAsset from the Airtunnel data store (using PySpark).

Parameters
  • asset – the data asset to retrieve

  • spark_session – a live PySpark session to use (required)

  • reader_kwargs – additional keyword arguments to pass into the PySpark reader function

Returns

the retrieved data as a PySpark dataframe

static write_data_asset(asset: airtunnel.data_asset.PySparkDataAsset, data: pyspark.sql.dataframe.DataFrame, **writer_kwargs) → None

Writes a PySparkDataAsset using PySpark.

Parameters
  • asset – the data asset to write

  • data – the data to write for the data asset

  • writer_kwargs – additional keyword arguments to pass into the PySpark writer function

Returns

None