airtunnel package¶
Subpackages¶
Submodules¶
airtunnel.data_asset module¶
Module for Airtunnel data asset and data asset IO abstractions.
-
class airtunnel.data_asset.BaseDataAsset(name: str)¶
Bases: object
Base class for all Airtunnel data assets.
-
declarations
¶ Declarations of this data asset
-
property
discovered_files_xcom_key
¶ - Returns
the XCOM key for discovered files for this data asset
-
property
ingest_archive_path
¶ - Returns
the ingest/archive path for this data asset
-
property
landing_path
¶ - Returns
the ingest/landing path for this data asset
-
make_ready_temp_path
(airflow_context: Dict) → str¶ Makes a temporary path under ready for staging purposes.
It points to a hidden (prefixed by “.”) folder as the last component, so that this path is ignored by frameworks like Spark.
- Parameters
airflow_context – a live Airflow context, used to get the exec timestamp
- Returns
the temporary staging path
-
name
¶ Name of this data asset
-
property
output_filename
¶ - Returns
an output filename for this data asset, considering the declared storage format and compression codec (if compression is used)
-
pickedup_files
(airflow_context) → List[str]¶ Using Airflow XCOM based on the given Airflow context with its DAG id, fetch the list of previously discovered files for ingestion.
Then, considering the files have been picked-up by the ingestion operator, adjust the path prefix for the picked-up folder and return the list of file paths.
- Parameters
airflow_context – a live Airflow context
- Returns
list of picked-up file paths
-
ready_archive_path
(airflow_context) → str¶ Return the versioned archive path for this data asset, based on the Airflow execution date.
- Parameters
airflow_context – a live Airflow context
- Returns
the versioned archived path for this data asset
-
property
ready_path
¶ - Returns
the ready (folder) path for this data asset
-
abstract
rebuild_for_store
(airflow_context, **kwargs) → None¶ Rebuild this data asset for the data store’s ready folder.
- Parameters
airflow_context – a live Airflow context
kwargs – additional keyword arguments to be passed along the downstream scripts
- Returns
None
-
abstract
retrieve_from_store
() → Union[pandas.core.frame.DataFrame, pyspark.sql.dataframe.DataFrame]¶ Retrieve this data asset from the ready layer of the data store.
- Returns
A pandas or PySpark dataframe
-
staging_pickedup_path
(airflow_context) → str¶ Return the pickedup path under staging for this data asset, versioned by the Airflow execution date.
- Parameters
airflow_context – a live Airflow context
- Returns
the versioned staging/pickedup path for this data asset
-
property
staging_ready_path
¶ - Returns
the staging/ready path for this data asset
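Taken together, the path properties suggest a data-store layout along the following lines; the folder names are inferred from the property names and docstrings above, so the actual configured layout may differ:

```text
data-store/<asset name>/
    ingest/
        landing/      <- landing_path
        archive/      <- ingest_archive_path
    staging/
        pickedup/     <- staging_pickedup_path (versioned by execution date)
        ready/        <- staging_ready_path
    ready/            <- ready_path
    archive/          <- ready_archive_path (versioned by execution date)
```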
-
-
class airtunnel.data_asset.BaseDataAssetIO¶
Bases: abc.ABC
Base class for all Airtunnel DataAssetIO.
-
static
prepare_staging_ready_path
(asset: airtunnel.data_asset.BaseDataAsset) → None¶ For a given data asset, prepare the staging/ready path for it, i.e. ensure it exists if this is the first ever run.
- Parameters
asset – the data asset for which to prepare the staging/ready path
- Returns
None
-
abstract static
read_data_asset
(asset: airtunnel.data_asset.BaseDataAsset, source_files: Iterable[str], **reader_kwargs) → Union[pandas.core.frame.DataFrame, pyspark.sql.dataframe.DataFrame]¶ Reads a data asset.
- Parameters
asset – the data asset to read
source_files – input source file(s) to read
reader_kwargs – additional keyword arguments to pass into the reader function
- Returns
the read-in data as a dataframe
-
abstract static
retrieve_data_asset
(asset: airtunnel.data_asset.BaseDataAsset, **reader_kwargs) → Union[pandas.core.frame.DataFrame, pyspark.sql.dataframe.DataFrame]¶ Retrieves a data asset from the Airtunnel data store.
- Parameters
asset – the data asset to retrieve
reader_kwargs – additional keyword arguments to pass into the reader function
- Returns
the retrieved data as a dataframe
-
abstract static
write_data_asset
(asset: airtunnel.data_asset.BaseDataAsset, data: Union[pandas.core.frame.DataFrame, pyspark.sql.DataFrame], **writer_kwargs) → None¶ Writes a data asset.
- Parameters
asset – the data asset to write
data – the data to write for the data asset
writer_kwargs – additional keyword arguments to pass into the writer function
- Returns
None
-
-
class airtunnel.data_asset.PandasDataAsset(name: str)¶
Bases: airtunnel.data_asset.BaseDataAsset
Implements a Pandas-enabled Airtunnel data asset.
-
declarations
¶
-
name
¶
-
rebuild_for_store
(airflow_context, **kwargs) → None¶ Rebuild this data asset for the data store’s ready folder (using Pandas).
- Parameters
airflow_context – a live Airflow context
kwargs – additional keyword arguments to be passed along the downstream scripts
- Returns
None
-
rename_fields_as_declared
(data: pandas.core.frame.DataFrame) → pandas.core.frame.DataFrame¶ Rename the columns as declared for this data asset.
- Parameters
data – the input Pandas dataframe to perform the rename on
- Returns
the dataframe with the columns renamed
-
retrieve_from_store
(airflow_context=None, consuming_asset: Optional[airtunnel.data_asset.BaseDataAsset] = None) → pandas.core.frame.DataFrame¶ Retrieve this data asset from the ready layer of the data store (using Pandas).
- Parameters
airflow_context – a live Airflow context (optional)
consuming_asset – the consuming data asset for lineage collection (optional)
- Returns
the data asset as a Pandas dataframe
-
-
class airtunnel.data_asset.PandasDataAssetIO¶
Bases: airtunnel.data_asset.BaseDataAssetIO
IO interface for the Airtunnel PandasDataAsset.
-
static
read_data_asset
(asset: airtunnel.data_asset.PandasDataAsset, source_files: Iterable[str], **reader_kwargs) → pandas.core.frame.DataFrame¶ Reads a PandasDataAsset using Pandas.
- Parameters
asset – the PandasDataAsset to read
source_files – input source file(s) to read
reader_kwargs – additional keyword arguments to pass into the reader function
- Returns
the read-in data as a dataframe
-
static
retrieve_data_asset
(asset: airtunnel.data_asset.PandasDataAsset, **reader_kwargs) → pandas.core.frame.DataFrame¶ Retrieves a PandasDataAsset from the Airtunnel data store (using Pandas).
- Parameters
asset – the data asset to retrieve
reader_kwargs – additional keyword arguments to pass into the Pandas reader function
- Returns
the retrieved data as a Pandas dataframe
-
static
write_data_asset
(asset: airtunnel.data_asset.PandasDataAsset, data: pandas.core.frame.DataFrame, **writer_kwargs) → None¶ Writes a PandasDataAsset using Pandas.
- Parameters
asset – the data asset to write
data – the data to write for the data asset
writer_kwargs – additional keyword arguments to pass into the Pandas writer function
- Returns
None
-
-
class airtunnel.data_asset.PySparkDataAsset(name: str)¶
Bases: airtunnel.data_asset.BaseDataAsset
A PySpark-enabled Airtunnel data asset.
-
declarations
¶
-
name
¶
-
rebuild_for_store
(airflow_context, **kwargs)¶ Rebuild this data asset for the data store’s ready folder (using PySpark).
- Parameters
airflow_context – a live Airflow context
kwargs – additional keyword arguments to be passed along the downstream scripts
- Returns
None
-
rename_fields_as_declared
(data: pyspark.sql.dataframe.DataFrame) → pyspark.sql.dataframe.DataFrame¶ Rename the columns as declared for this data asset.
- Parameters
data – the input PySpark dataframe to perform the rename on
- Returns
the PySpark dataframe with the columns renamed
-
retrieve_from_store
(airflow_context=None, consuming_asset: Optional[airtunnel.data_asset.BaseDataAsset] = None, spark_session: Optional[pyspark.sql.session.SparkSession] = None) → pyspark.sql.dataframe.DataFrame¶ Retrieve this data asset from the ready layer of the data store (using PySpark).
- Parameters
airflow_context – a live Airflow context (optional)
consuming_asset – the consuming data asset for lineage collection (optional)
spark_session – a live Spark session to use (required)
- Returns
the data asset as a PySpark dataframe
-
-
class airtunnel.data_asset.PySparkDataAssetIO¶
Bases: airtunnel.data_asset.BaseDataAssetIO
IO interface for the Airtunnel PySparkDataAsset.
-
static
read_data_asset
(asset: airtunnel.data_asset.PySparkDataAsset, source_files: Iterable[str], spark_session: Optional[pyspark.sql.session.SparkSession] = None, **reader_kwargs) → pyspark.sql.dataframe.DataFrame¶ Reads a PySparkDataAsset using PySpark.
- Parameters
asset – the PySparkDataAsset to read
source_files – input source file(s) to read
spark_session – a live PySpark session to use (required)
reader_kwargs – additional keyword arguments to pass into the PySpark reader function
- Returns
the read-in data as a dataframe
-
static
retrieve_data_asset
(asset: airtunnel.data_asset.PySparkDataAsset, spark_session: Optional[pyspark.sql.session.SparkSession] = None, **reader_kwargs) → pyspark.sql.dataframe.DataFrame¶ Retrieves a PySparkDataAsset from the Airtunnel data store (using PySpark).
- Parameters
asset – the data asset to retrieve
spark_session – a live PySpark session to use (required)
reader_kwargs – additional keyword arguments to pass into the PySpark reader function
- Returns
the retrieved data as a PySpark dataframe
-
static
write_data_asset
(asset: airtunnel.data_asset.PySparkDataAsset, data: pyspark.sql.dataframe.DataFrame, **writer_kwargs) → None¶ Writes a PySparkDataAsset using PySpark.
- Parameters
asset – the data asset to write
data – the data to write for the data asset
writer_kwargs – additional keyword arguments to pass into the PySpark writer function
- Returns
None
-
-
class airtunnel.data_asset.SQLDataAsset(name: str, sql_hook: airflow.hooks.dbapi_hook.DbApiHook)¶
Bases: airtunnel.data_asset.BaseDataAsset
A SQL-enabled Airtunnel data asset.
-
declarations
¶
-
formatted_sql_script
(parameters: Dict, script_type: str = 'dml', dynamic_parameters: Callable = None, airflow_context=None) → str¶ Get the formatted SQL script for this data asset.
- Parameters
parameters – dictionary of parameters to inject into the script (i.e. to format the raw string with)
script_type – i.e. dml, ddl – translates to a subfolder in the script-store/sql/ folder
dynamic_parameters – a callable that is dynamically executed to compute SQL script parameters at runtime
airflow_context – a live Airflow context – passed into the function to compute dynamic parameters
- Returns
the formatted SQL script for this data asset
-
get_raw_sql_script
(script_type: str = 'dml') → str¶ Returns the raw SQL script for this data asset.
- Parameters
script_type – i.e. dml, ddl – translates to a subfolder in the script-store/sql/ folder
- Returns
the raw SQL script as a string
-
name
¶
-
rebuild_for_store
(airflow_context, parameters: Dict = None, dynamic_parameters: Callable = None, **kwargs)¶ Rebuild this data asset for the data store’s ready folder (using SQL).
- Parameters
airflow_context – a live Airflow context
parameters – dictionary of parameters to inject into the script (i.e. to format the raw string with)
dynamic_parameters – a callable that is dynamically executed to compute SQL script parameters at runtime
kwargs – additional keyword arguments to be passed along the downstream scripts
- Returns
None
-
retrieve_from_store
() → pandas.core.frame.DataFrame¶ Retrieve this data asset from the ready layer of the data store (using SQL).
- Returns
the data asset as a Pandas dataframe
-
-
class airtunnel.data_asset.ShellDataAsset(name: str)¶
Bases: airtunnel.data_asset.BaseDataAsset
A shell DataAsset, acting merely as a container, e.g. for lineage.
-
declarations
¶
-
static
from_names
(names: Iterable[str]) → List[airtunnel.data_asset.ShellDataAsset]¶ Create several ShellDataAssets from a list of names.
- Parameters
names – the names for which to create ShellDataAssets
- Returns
list of created ShellDataAssets
-
name
¶
-
rebuild_for_store
(airflow_context, **kwargs) → None¶ Unimplemented method of the BaseDataAsset interface – do not use!
-
retrieve_from_store
()¶ Unimplemented method of the BaseDataAsset interface – do not use!
-
to_full_data_asset
(target_type: type) → Union[airtunnel.data_asset.PySparkDataAsset, airtunnel.data_asset.PandasDataAsset]¶ Convert this ShellDataAsset to a full Airtunnel data asset of a given type.
- Parameters
target_type – the target class to convert this ShellDataAsset to
- Returns
the full data asset
-
airtunnel.data_store module¶
Module for Airtunnel DataStore abstractions.
-
class airtunnel.data_store.BaseDataStoreAdapter¶
Bases: abc.ABC
Base class for all Airtunnel DataStoreAdapters.
-
abstract static
copy
(source: str, destination: str, recursive: bool = False) → None¶ Copy file(s) from source to destination.
-
abstract static
delete
(path: str, recursive: bool = False) → None¶ Delete file(s) at a given path.
-
abstract static
exists
(path: str, **kwargs) → bool¶ Checks whether the given path exists.
-
abstract static
glob
(pattern: str, **kwargs) → List[str]¶ Fetch file list using the provided glob pattern.
-
abstract static
inspect
(files: List[str]) → Dict[str, Tuple[datetime.datetime, datetime.datetime, int]]¶ For a given file list, fetch create & modification datetime and byte size.
-
abstract static
listdir
(path: str, recursive: bool = False, **kwargs) → List[str]¶ List entries of a given path.
-
abstract static
makedirs
(path: str, exist_ok: bool = False, **kwargs) → None¶ Make directories along the given path.
-
abstract static
modification_times
(files: List[str]) → Dict[str, int]¶ For a given file list, fetch modification times as int Unix timestamps.
-
abstract static
move
(source: str, destination: str, recursive: bool = False) → None¶ Move file(s) from source to destination.
-
abstract static
open
(file: str, mode: str, **kwargs) → TextIO¶ Open a file-handle with the given path & mode.
-
-
class airtunnel.data_store.LocalDataStoreAdapter¶
Bases: airtunnel.data_store.BaseDataStoreAdapter
DataStoreAdapter implementation targeting the local filesystem.
-
static
copy
(source: str, destination: str, recursive: bool = False) → None¶ Copy file(s) from source to destination.
-
static
delete
(path: str, recursive: bool = False) → None¶ Delete file(s) at a given path.
-
static
exists
(path: str, **kwargs) → bool¶ Checks whether the given path exists.
-
static
glob
(pattern: str, **kwargs) → List[str]¶ Fetch file list using the provided glob pattern.
-
static
inspect
(files: List[str]) → Dict[str, Tuple[datetime.datetime, datetime.datetime, int]]¶ For a given file list, fetch create & modification datetime and byte size.
-
static
listdir
(path: str, recursive: bool = False, **kwargs) → List[str]¶ List entries of a given path.
-
static
makedirs
(path: str, exist_ok: bool = False, **kwargs) → None¶ Make directories along the given path.
-
static
modification_times
(files: List[str]) → Dict[str, int]¶ For a given file list, fetch modification times as int Unix timestamps.
-
static
move
(source: str, destination: str, recursive: bool = False) → None¶ Move file(s) from source to destination.
-
static
open
(file: str, mode: str, **kwargs) → TextIO¶ Open a file-handle with the given path & mode.
-
-
airtunnel.data_store.get_configured_data_store_adapter() → airtunnel.data_store.BaseDataStoreAdapter¶
Gets the configured DataStoreAdapter.
airtunnel.declaration_store module¶
Module for Airtunnel Declaration Store abstractions.
-
class airtunnel.declaration_store.DataAssetDeclaration(data_asset: str)¶
Bases: object
Abstraction around declarations for an Airtunnel data asset.
-
property
all
¶ Get all declarations
-
property
archive_ingest
¶ Whether to archive the ingested files.
-
property
archive_ready
¶ Whether to archive the ready layer data of this data asset.
-
property
asset_name
¶ Name of the declared asset.
-
property
extra_declarations
¶ Return extra declarations.
-
property
in_storage_format
¶ The declared input storage format.
-
property
ingest_file_glob
¶ The declared file glob for ingestion.
-
property
is_csv_input
¶ Whether the input for this data asset is CSV.
-
property
is_csv_output
¶ Whether the output for this data asset is CSV.
-
property
is_parquet_input
¶ Whether the input for this data asset is Parquet.
-
property
is_parquet_output
¶ Whether the output for this data asset is Parquet.
-
property
is_xls_input
¶ Whether the input for this data asset is Excel.
-
property
key_columns
¶ Return list of declared key columns.
-
property
out_comp_codec
¶ The declared output compression codec.
-
property
out_storage_format
¶ The declared output storage format.
-
property
run_ddl
¶ Whether to run DDL for this data asset.
-
property
staging_assets
¶ Return list of declared staging assets.
-
property
transform_date_formats
¶ The declared date formats.
-
property
transform_renames
¶ The declared rename transformations.
-
-
class airtunnel.declaration_store.DeclarationSchemas¶
Bases: object
Wrapper to hold all our YAML schemas to validate declaration files.
-
DERIVED_DATA_ASSET
= Schema({'type': And(<class 'str'>, <function DeclarationSchemas.<lambda>>), Optional('load'): Schema({Optional('out_storage_format'): And(<function DeclarationSchemas.<lambda>>), Optional('out_compression_codec'): And(<function DeclarationSchemas.<lambda>>), Optional('archive_ready'): <function DeclarationSchemas.<lambda>>, Optional('run_ddl'): <function DeclarationSchemas.<lambda>>, Optional('key_columns'): Schema([Schema(<class 'str'>)])}), Optional('staging_assets'): Schema([Schema(<class 'str'>)]), Optional('extra'): Schema({<class 'object'>: <class 'object'>})})¶
-
INGESTED_DATA_ASSET
= Schema({'type': And(<class 'str'>, <function DeclarationSchemas.<lambda>>), 'ingest': Schema({Optional('in_storage_format'): <function DeclarationSchemas.<lambda>>, 'file_input_glob': And(<class 'str'>, <built-in function len>), Optional('archive_ingest'): <function DeclarationSchemas.<lambda>>}), Optional('transformation'): Schema({Optional('in_column_renames'): And(<class 'dict'>, <function DeclarationSchemas.<lambda>>, <function DeclarationSchemas.<lambda>>), Optional('in_date_formats'): And(<class 'dict'>, <function DeclarationSchemas.<lambda>>)}), Optional('load'): Schema({Optional('out_storage_format'): And(<function DeclarationSchemas.<lambda>>), Optional('out_compression_codec'): And(<function DeclarationSchemas.<lambda>>), Optional('archive_ready'): <function DeclarationSchemas.<lambda>>, Optional('run_ddl'): <function DeclarationSchemas.<lambda>>, Optional('key_columns'): Schema([Schema(<class 'str'>)])}), Optional('staging_assets'): Schema([Schema(<class 'str'>)]), Optional('extra'): Schema(<class 'object'>)})¶
-
LOAD_SCHEMA
= Schema({Optional('out_storage_format'): And(<function DeclarationSchemas.<lambda>>), Optional('out_compression_codec'): And(<function DeclarationSchemas.<lambda>>), Optional('archive_ready'): <function DeclarationSchemas.<lambda>>, Optional('run_ddl'): <function DeclarationSchemas.<lambda>>, Optional('key_columns'): Schema([Schema(<class 'str'>)])})¶
-
STAGING_ASSETS
= Schema([Schema(<class 'str'>)])¶
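Reading the schema objects above back into plain data: a declaration file for an ingested data asset parses to roughly the structure below. The key names are taken from the schema reprs; the concrete values, including the "type" value, are illustrative assumptions (the schemas only show opaque lambda checks for them):

```python
# Structure an INGESTED_DATA_ASSET declaration validates, expressed as the dict
# the YAML file parses to; all values are illustrative assumptions.
declaration = {
    "type": "ingested",  # assumed value; the schema only shows a str check
    "ingest": {
        "in_storage_format": "csv",
        "file_input_glob": "sales_*.csv",
        "archive_ingest": True,
    },
    "transformation": {
        "in_column_renames": {"Sale ID": "sale_id"},
        "in_date_formats": {"sale_date": "%Y-%m-%d"},
    },
    "load": {
        "out_storage_format": "parquet",
        "out_compression_codec": "gzip",
        "archive_ready": True,
        "run_ddl": False,
        "key_columns": ["sale_id"],
    },
    "staging_assets": ["sales_raw"],
}
```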
-
-
airtunnel.declaration_store.fetch_all_declarations() → Iterable[airtunnel.declaration_store.DataAssetDeclaration]¶
Fetch declarations for all data assets that exist.
airtunnel.paths module¶
Module for Airtunnel’s paths, i.e. to the declaration, data and scripts store folders.
Module contents¶
airtunnel – tame your Airflow!
-
class airtunnel.BaseDataAsset(name: str)¶
Bases: object
Base class for all Airtunnel data assets.
-
declarations
¶ Declarations of this data asset
-
property
discovered_files_xcom_key
¶ - Returns
the XCOM key for discovered files for this data asset
-
property
ingest_archive_path
¶ - Returns
the ingest/archive path for this data asset
-
property
landing_path
¶ - Returns
the ingest/landing path for this data asset
-
make_ready_temp_path
(airflow_context: Dict) → str¶ Makes a temporary path under ready for staging purposes.
It points to a hidden (prefixed by “.”) folder as the last component, so that this path is ignored by frameworks like Spark.
- Parameters
airflow_context – a live Airflow context, used to get the exec timestamp
- Returns
the temporary staging path
-
name
¶ Name of this data asset
-
property
output_filename
¶ - Returns
an output filename for this data asset, considering the declared storage format and compression codec (if compression is used)
-
pickedup_files
(airflow_context) → List[str]¶ Using Airflow XCOM based on the given Airflow context with its DAG id, fetch the list of previously discovered files for ingestion.
Then, considering the files have been picked-up by the ingestion operator, adjust the path prefix for the picked-up folder and return the list of file paths.
- Parameters
airflow_context – a live Airflow context
- Returns
list of picked-up file paths
-
ready_archive_path
(airflow_context) → str¶ Return the versioned archive path for this data asset, based on the Airflow execution date.
- Parameters
airflow_context – a live Airflow context
- Returns
the versioned archived path for this data asset
-
property
ready_path
¶ - Returns
the ready (folder) path for this data asset
-
abstract
rebuild_for_store
(airflow_context, **kwargs) → None¶ Rebuild this data asset for the data store’s ready folder.
- Parameters
airflow_context – a live Airflow context
kwargs – additional keyword arguments to be passed along the downstream scripts
- Returns
None
-
abstract
retrieve_from_store
() → Union[pandas.core.frame.DataFrame, pyspark.sql.dataframe.DataFrame]¶ Retrieve this data asset from the ready layer of the data store.
- Returns
A pandas or PySpark dataframe
-
staging_pickedup_path
(airflow_context) → str¶ Return the pickedup path under staging for this data asset, versioned by the Airflow execution date.
- Parameters
airflow_context – a live Airflow context
- Returns
the versioned staging/pickedup path for this data asset
-
property
staging_ready_path
¶ - Returns
the staging/ready path for this data asset
-
-
class airtunnel.SQLDataAsset(name: str, sql_hook: airflow.hooks.dbapi_hook.DbApiHook)¶
Bases: airtunnel.data_asset.BaseDataAsset
A SQL-enabled Airtunnel data asset.
-
declarations
¶
-
formatted_sql_script
(parameters: Dict, script_type: str = 'dml', dynamic_parameters: Callable = None, airflow_context=None) → str¶ Get the formatted SQL script for this data asset.
- Parameters
parameters – dictionary of parameters to inject into the script (i.e. to format the raw string with)
script_type – i.e. dml, ddl – translates to a subfolder in the script-store/sql/ folder
dynamic_parameters – a callable that is dynamically executed to compute SQL script parameters at runtime
airflow_context – a live Airflow context – passed into the function to compute dynamic parameters
- Returns
the formatted SQL script for this data asset
-
get_raw_sql_script
(script_type: str = 'dml') → str¶ Returns the raw SQL script for this data asset.
- Parameters
script_type – i.e. dml, ddl – translates to a subfolder in the script-store/sql/ folder
- Returns
the raw SQL script as a string
-
name
¶
-
rebuild_for_store
(airflow_context, parameters: Dict = None, dynamic_parameters: Callable = None, **kwargs)¶ Rebuild this data asset for the data store’s ready folder (using SQL).
- Parameters
airflow_context – a live Airflow context
parameters – dictionary of parameters to inject into the script (i.e. to format the raw string with)
dynamic_parameters – a callable that is dynamically executed to compute SQL script parameters at runtime
kwargs – additional keyword arguments to be passed along the downstream scripts
- Returns
None
-
retrieve_from_store
() → pandas.core.frame.DataFrame¶ Retrieve this data asset from the ready layer of the data store (using SQL).
- Returns
the data asset as a Pandas dataframe
-
-
class airtunnel.PandasDataAsset(name: str)¶
Bases: airtunnel.data_asset.BaseDataAsset
Implements a Pandas-enabled Airtunnel data asset.
-
declarations
¶
-
name
¶
-
rebuild_for_store
(airflow_context, **kwargs) → None¶ Rebuild this data asset for the data store’s ready folder (using Pandas).
- Parameters
airflow_context – a live Airflow context
kwargs – additional keyword arguments to be passed along the downstream scripts
- Returns
None
-
rename_fields_as_declared
(data: pandas.core.frame.DataFrame) → pandas.core.frame.DataFrame¶ Rename the columns as declared for this data asset.
- Parameters
data – the input Pandas dataframe to perform the rename on
- Returns
the dataframe with the columns renamed
-
retrieve_from_store
(airflow_context=None, consuming_asset: Optional[airtunnel.data_asset.BaseDataAsset] = None) → pandas.core.frame.DataFrame¶ Retrieve this data asset from the ready layer of the data store (using Pandas).
- Parameters
airflow_context – a live Airflow context (optional)
consuming_asset – the consuming data asset for lineage collection (optional)
- Returns
the data asset as a Pandas dataframe
-
-
class airtunnel.PySparkDataAsset(name: str)¶
Bases: airtunnel.data_asset.BaseDataAsset
A PySpark-enabled Airtunnel data asset.
-
declarations
¶
-
name
¶
-
rebuild_for_store
(airflow_context, **kwargs)¶ Rebuild this data asset for the data store’s ready folder (using PySpark).
- Parameters
airflow_context – a live Airflow context
kwargs – additional keyword arguments to be passed along the downstream scripts
- Returns
None
-
rename_fields_as_declared
(data: pyspark.sql.dataframe.DataFrame) → pyspark.sql.dataframe.DataFrame¶ Rename the columns as declared for this data asset.
- Parameters
data – the input PySpark dataframe to perform the rename on
- Returns
the PySpark dataframe with the columns renamed
-
retrieve_from_store
(airflow_context=None, consuming_asset: Optional[airtunnel.data_asset.BaseDataAsset] = None, spark_session: Optional[pyspark.sql.session.SparkSession] = None) → pyspark.sql.dataframe.DataFrame¶ Retrieve this data asset from the ready layer of the data store (using PySpark).
- Parameters
airflow_context – a live Airflow context (optional)
consuming_asset – the consuming data asset for lineage collection (optional)
spark_session – a live Spark session to use (required)
- Returns
the data asset as a PySpark dataframe
-
-
class airtunnel.BaseDataAssetIO¶
Bases: abc.ABC
Base class for all Airtunnel DataAssetIO.
-
static
prepare_staging_ready_path
(asset: airtunnel.data_asset.BaseDataAsset) → None¶ For a given data asset, prepare the staging/ready path for it, i.e. ensure it exists if this is the first ever run.
- Parameters
asset – the data asset for which to prepare the staging/ready path
- Returns
None
-
abstract static
read_data_asset
(asset: airtunnel.data_asset.BaseDataAsset, source_files: Iterable[str], **reader_kwargs) → Union[pandas.core.frame.DataFrame, pyspark.sql.dataframe.DataFrame]¶ Reads a data asset.
- Parameters
asset – the data asset to read
source_files – input source file(s) to read
reader_kwargs – additional keyword arguments to pass into the reader function
- Returns
the read-in data as a dataframe
-
abstract static
retrieve_data_asset
(asset: airtunnel.data_asset.BaseDataAsset, **reader_kwargs) → Union[pandas.core.frame.DataFrame, pyspark.sql.dataframe.DataFrame]¶ Retrieves a data asset from the Airtunnel data store.
- Parameters
asset – the data asset to retrieve
reader_kwargs – additional keyword arguments to pass into the reader function
- Returns
the retrieved data as a dataframe
-
abstract static
write_data_asset
(asset: airtunnel.data_asset.BaseDataAsset, data: Union[pandas.core.frame.DataFrame, pyspark.sql.DataFrame], **writer_kwargs) → None¶ Writes a data asset.
- Parameters
asset – the data asset to write
data – the data to write for the data asset
writer_kwargs – additional keyword arguments to pass into the writer function
- Returns
None
-
-
class airtunnel.PandasDataAssetIO¶
Bases: airtunnel.data_asset.BaseDataAssetIO
IO interface for the Airtunnel PandasDataAsset.
-
static
read_data_asset
(asset: airtunnel.data_asset.PandasDataAsset, source_files: Iterable[str], **reader_kwargs) → pandas.core.frame.DataFrame¶ Reads a PandasDataAsset using Pandas.
- Parameters
asset – the PandasDataAsset to read
source_files – input source file(s) to read
reader_kwargs – additional keyword arguments to pass into the reader function
- Returns
the read-in data as a dataframe
-
static
retrieve_data_asset
(asset: airtunnel.data_asset.PandasDataAsset, **reader_kwargs) → pandas.core.frame.DataFrame¶ Retrieves a PandasDataAsset from the Airtunnel data store (using Pandas).
- Parameters
asset – the data asset to retrieve
reader_kwargs – additional keyword arguments to pass into the Pandas reader function
- Returns
the retrieved data as a Pandas dataframe
-
static
write_data_asset
(asset: airtunnel.data_asset.PandasDataAsset, data: pandas.core.frame.DataFrame, **writer_kwargs) → None¶ Writes a PandasDataAsset using Pandas.
- Parameters
asset – the data asset to write
data – the data to write for the data asset
writer_kwargs – additional keyword arguments to pass into the Pandas writer function
- Returns
None
-
-
class airtunnel.PySparkDataAssetIO¶
Bases: airtunnel.data_asset.BaseDataAssetIO
IO interface for the Airtunnel PySparkDataAsset.
-
static
read_data_asset
(asset: airtunnel.data_asset.PySparkDataAsset, source_files: Iterable[str], spark_session: Optional[pyspark.sql.session.SparkSession] = None, **reader_kwargs) → pyspark.sql.dataframe.DataFrame¶ Reads a PySparkDataAsset using PySpark.
- Parameters
asset – the PySparkDataAsset to read
source_files – input source file(s) to read
spark_session – a live PySpark session to use (required)
reader_kwargs – additional keyword arguments to pass into the PySpark reader function
- Returns
the read-in data as a dataframe
-
static
retrieve_data_asset
(asset: airtunnel.data_asset.PySparkDataAsset, spark_session: Optional[pyspark.sql.session.SparkSession] = None, **reader_kwargs) → pyspark.sql.dataframe.DataFrame¶ Retrieves a PySparkDataAsset from the Airtunnel data store (using PySpark).
- Parameters
asset – the data asset to retrieve
spark_session – a live PySpark session to use (required)
reader_kwargs – additional keyword arguments to pass into the PySpark reader function
- Returns
the retrieved data as a PySpark dataframe
-
static
write_data_asset
(asset: airtunnel.data_asset.PySparkDataAsset, data: pyspark.sql.dataframe.DataFrame, **writer_kwargs) → None¶ Writes a PySparkDataAsset using PySpark.
- Parameters
asset – the data asset to write
data – the data to write for the data asset
writer_kwargs – additional keyword arguments to pass into the PySpark writer function
- Returns
None