airtunnel package¶
Subpackages¶
Submodules¶
airtunnel.data_asset module¶
Module for Airtunnel data asset abstractions and data asset io abstractions.
-
class airtunnel.data_asset.BaseDataAsset(name: str)¶
Bases: object
Base class for all Airtunnel data assets.
-
declarations¶ Declarations of this data asset
-
property
discovered_files_xcom_key¶ - Returns
the XCOM key for discovered files for this data asset
-
property
ingest_archive_path¶ - Returns
the ingest/archive path for this data asset
-
property
landing_path¶ - Returns
the ingest/landing path for this data asset
-
make_ready_temp_path(airflow_context: Dict) → str¶ Makes a temporary path under ready for staging purposes.
It points to a hidden (prefixed by “.”) folder as the last component, so that this path is ignored by frameworks like Spark.
- Parameters
airflow_context – a live Airflow context, used to get the exec timestamp
- Returns
the temporary staging path
-
name¶ Name of this data asset
-
property
output_filename¶ - Returns
the output filename for this data asset, considering the declared storage format and compression codec (if compression is used)
-
pickedup_files(airflow_context) → List[str]¶ Using Airflow XCOM based on the given Airflow context with its DAG id, fetch the list of previously discovered files for ingestion.
Then, considering the files have been picked-up by the ingestion operator, adjust the path prefix for the picked-up folder and return the list of file paths.
- Parameters
airflow_context – a live Airflow context
- Returns
list of picked-up file paths
-
ready_archive_path(airflow_context) → str¶ Return the versioned archive path for this data asset, based on the Airflow execution date.
- Parameters
airflow_context – a live Airflow context
- Returns
the versioned archived path for this data asset
-
property
ready_path¶ - Returns
the ready (folder) path for this data asset
-
abstract
rebuild_for_store(airflow_context, **kwargs) → None¶ Rebuild this data asset for the data store’s ready folder.
- Parameters
airflow_context – a live Airflow context
kwargs – additional keyword arguments to be passed along the downstream scripts
- Returns
None
-
abstract
retrieve_from_store() → Union[pandas.core.frame.DataFrame, pyspark.sql.dataframe.DataFrame]¶ Retrieve this data asset from the ready layer of the data store.
- Returns
A pandas or PySpark dataframe
-
staging_pickedup_path(airflow_context) → str¶ Return the pickedup path under staging for this data asset, versioned by the Airflow execution date.
- Parameters
airflow_context – a live Airflow context
- Returns
the versioned staging/pickedup path for this data asset
-
property
staging_ready_path¶ - Returns
the staging/ready path for this data asset
-
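For orientation, a minimal sketch of inspecting these path properties on a concrete subclass; the asset name "student" is illustrative and must exist in the declaration store, and the exact paths depend on the configured data store layout:

    from airtunnel import PandasDataAsset

    student = PandasDataAsset(name="student")  # loads the asset's declarations
    print(student.landing_path)               # ingest/landing path for this asset
    print(student.ingest_archive_path)        # ingest/archive path
    print(student.ready_path)                 # ready (folder) path
    print(student.discovered_files_xcom_key)  # XCOM key used for discovered files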
-
class airtunnel.data_asset.BaseDataAssetIO¶
Bases: abc.ABC
Base class for all Airtunnel DataAssetIO.
-
static
prepare_staging_ready_path(asset: airtunnel.data_asset.BaseDataAsset) → None¶ For a given data asset, prepare the staging/ready path for it, i.e. ensure it exists if this is the first ever run.
- Parameters
asset – the data asset for which to prepare the staging/ready path
- Returns
None
-
abstract static
read_data_asset(asset: airtunnel.data_asset.BaseDataAsset, source_files: Iterable[str], **reader_kwargs) → Union[pandas.core.frame.DataFrame, pyspark.sql.dataframe.DataFrame]¶ Reads a data asset.
- Parameters
asset – the data asset to read
source_files – input source file(s) to read
reader_kwargs – additional keyword arguments to pass into the reader function
- Returns
the data read in as a dataframe
-
abstract static
retrieve_data_asset(asset: airtunnel.data_asset.BaseDataAsset, **reader_kwargs) → Union[pandas.core.frame.DataFrame, pyspark.sql.dataframe.DataFrame]¶ Retrieves a data asset from the Airtunnel data store.
- Parameters
asset – the data asset to retrieve
reader_kwargs – additional keyword arguments to pass into the reader function
- Returns
the retrieved data as a dataframe
-
abstract static
write_data_asset(asset: airtunnel.data_asset.BaseDataAsset, data: Union[pandas.core.frame.DataFrame, pyspark.sql.DataFrame], **writer_kwargs) → None¶ Writes a data asset.
- Parameters
asset – the data asset to write
data – data to write for the data asset
writer_kwargs – additional keyword arguments to pass into the writer function
- Returns
None
-
-
class airtunnel.data_asset.PandasDataAsset(name: str)¶
Bases: airtunnel.data_asset.BaseDataAsset
Implements a Pandas enabled Airtunnel data asset.
-
declarations¶
-
name¶
-
rebuild_for_store(airflow_context, **kwargs) → None¶ Rebuild this data asset for the data store’s ready folder (using Pandas).
- Parameters
airflow_context – a live Airflow context
kwargs – additional keyword arguments to be passed along the downstream scripts
- Returns
None
-
rename_fields_as_declared(data: pandas.core.frame.DataFrame) → pandas.core.frame.DataFrame¶ Rename the columns as declared for this data asset.
- Parameters
data – the input Pandas dataframe to perform the rename on
- Returns
the dataframe with the columns renamed
-
retrieve_from_store(airflow_context=None, consuming_asset: Optional[airtunnel.data_asset.BaseDataAsset] = None) → pandas.core.frame.DataFrame¶ Retrieve this data asset from the ready layer of the data store (using Pandas).
- Parameters
airflow_context – a live Airflow context (optional)
consuming_asset – the consuming data asset for lineage collection (optional)
- Returns
the data asset as a Pandas dataframe
-
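As a hedged sketch, a rebuild script for a PandasDataAsset might read the files picked up for ingestion, apply the declared renames and write the result via PandasDataAssetIO; the function name mirrors the abstract rebuild_for_store hook, but the exact script contract depends on your project setup:

    from airtunnel import PandasDataAsset, PandasDataAssetIO

    def rebuild_for_store(asset: PandasDataAsset, airflow_context):
        # read all files previously discovered and picked up for this asset
        data = PandasDataAssetIO.read_data_asset(
            asset=asset, source_files=asset.pickedup_files(airflow_context)
        )
        # apply the column renames declared for this asset
        data = asset.rename_fields_as_declared(data)
        # write the transformed data to the asset's staging/ready path
        PandasDataAssetIO.write_data_asset(asset=asset, data=data)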
-
class airtunnel.data_asset.PandasDataAssetIO¶
Bases: airtunnel.data_asset.BaseDataAssetIO
IO interface for the Airtunnel PandasDataAsset.
-
static
read_data_asset(asset: airtunnel.data_asset.PandasDataAsset, source_files: Iterable[str], **reader_kwargs) → pandas.core.frame.DataFrame¶ Reads a PandasDataAsset using Pandas.
- Parameters
asset – the PandasDataAsset to read
source_files – input source file(s) to read
reader_kwargs – additional keyword arguments to pass into the reader function
- Returns
the data read in as a dataframe
-
static
retrieve_data_asset(asset: airtunnel.data_asset.PandasDataAsset, **reader_kwargs) → pandas.core.frame.DataFrame¶ Retrieves a PandasDataAsset from the Airtunnel data store (using Pandas).
- Parameters
asset – the data asset to retrieve
reader_kwargs – additional keyword arguments to pass into the Pandas reader function
- Returns
the retrieved data as a Pandas dataframe
-
static
write_data_asset(asset: airtunnel.data_asset.PandasDataAsset, data: pandas.core.frame.DataFrame, **writer_kwargs) → None¶ Writes a PandasDataAsset using Pandas.
- Parameters
asset – the data asset to write
data – data to write for the data asset
writer_kwargs – additional keyword arguments to pass into the Pandas writer function
- Returns
None
-
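A small usage sketch, assuming a declared asset named "programme" with CSV input so that reader_kwargs are forwarded to the Pandas CSV reader (asset name, file path and kwargs are illustrative):

    from airtunnel import PandasDataAsset, PandasDataAssetIO

    programme = PandasDataAsset(name="programme")
    # extra kwargs are passed through to the Pandas reader, e.g. a non-default separator
    df = PandasDataAssetIO.read_data_asset(
        asset=programme, source_files=["/tmp/programme_2020.csv"], sep=";"
    )
    # later, pull the asset back out of the data store's ready layer
    df_ready = PandasDataAssetIO.retrieve_data_asset(asset=programme)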
-
class airtunnel.data_asset.PySparkDataAsset(name: str)¶
Bases: airtunnel.data_asset.BaseDataAsset
A PySpark enabled Airtunnel data asset.
-
declarations¶
-
name¶
-
rebuild_for_store(airflow_context, **kwargs)¶ Rebuild this data asset for the data store’s ready folder (using PySpark).
- Parameters
airflow_context – a live Airflow context
kwargs – additional keyword arguments to be passed along the downstream scripts
- Returns
None
-
rename_fields_as_declared(data: pyspark.sql.dataframe.DataFrame) → pyspark.sql.dataframe.DataFrame¶ Rename the columns as declared for this data asset.
- Parameters
data – the input PySpark dataframe to perform the rename on
- Returns
the PySpark dataframe with the columns renamed
-
retrieve_from_store(airflow_context=None, consuming_asset: Optional[airtunnel.data_asset.BaseDataAsset] = None, spark_session: Optional[pyspark.sql.session.SparkSession] = None) → pyspark.sql.dataframe.DataFrame¶ Retrieve this data asset from the ready layer of the data store (using PySpark).
- Parameters
airflow_context – a live Airflow context (optional)
consuming_asset – the consuming data asset for lineage collection (optional)
spark_session – a live Spark session to use (required)
- Returns
the data asset as a PySpark dataframe
-
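A brief sketch of retrieving a PySpark-backed asset; the asset name "flight_delays" is illustrative and a live SparkSession is required:

    from pyspark.sql import SparkSession
    from airtunnel import PySparkDataAsset

    spark = SparkSession.builder.appName("airtunnel-example").getOrCreate()
    delays = PySparkDataAsset(name="flight_delays")
    df = delays.retrieve_from_store(spark_session=spark)
    df.show(10)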
-
class airtunnel.data_asset.PySparkDataAssetIO¶
Bases: airtunnel.data_asset.BaseDataAssetIO
IO interface for the Airtunnel PySparkDataAsset.
-
static
read_data_asset(asset: airtunnel.data_asset.PySparkDataAsset, source_files: Iterable[str], spark_session: Optional[pyspark.sql.session.SparkSession] = None, **reader_kwargs) → pyspark.sql.dataframe.DataFrame¶ Reads a PySparkDataAsset using PySpark.
- Parameters
asset – the PySparkDataAsset to read
source_files – input source file(s) to read
spark_session – a live PySpark session to use (required)
reader_kwargs – additional keyword arguments to pass into the PySpark reader function
- Returns
the data read in as a dataframe
-
static
retrieve_data_asset(asset: airtunnel.data_asset.PySparkDataAsset, spark_session: Optional[pyspark.sql.session.SparkSession] = None, **reader_kwargs) → pyspark.sql.dataframe.DataFrame¶ Retrieves a PySparkDataAsset from the Airtunnel data store (using PySpark).
- Parameters
asset – the data asset to retrieve
spark_session – a live PySpark session to use (required)
reader_kwargs – additional keyword arguments to pass into the PySpark reader function
- Returns
the retrieved data as a PySpark dataframe
-
static
write_data_asset(asset: airtunnel.data_asset.PySparkDataAsset, data: pyspark.sql.dataframe.DataFrame, **writer_kwargs) → None¶ Writes a PySparkDataAsset using PySpark.
- Parameters
asset – the data asset to write
data – data to write for the data asset
writer_kwargs – additional keyword arguments to pass into the PySpark writer function
- Returns
None
-
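Correspondingly, a hedged sketch of the PySpark IO interface, assuming CSV input so that reader_kwargs are forwarded to the Spark CSV reader (asset name and path are illustrative):

    from pyspark.sql import SparkSession
    from airtunnel import PySparkDataAsset, PySparkDataAssetIO

    spark = SparkSession.builder.getOrCreate()
    asset = PySparkDataAsset(name="flight_delays")
    df = PySparkDataAssetIO.read_data_asset(
        asset=asset,
        source_files=["/data/landing/flight_delays/2020-01.csv"],
        spark_session=spark,
        header=True,  # forwarded to the Spark reader
    )
    # write the dataframe for this asset
    PySparkDataAssetIO.write_data_asset(asset=asset, data=df)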
-
class airtunnel.data_asset.SQLDataAsset(name: str, sql_hook: airflow.hooks.dbapi_hook.DbApiHook)¶
Bases: airtunnel.data_asset.BaseDataAsset
A SQL enabled Airtunnel data asset.
-
declarations¶
-
formatted_sql_script(parameters: Dict, script_type: str = 'dml', dynamic_parameters: Callable = None, airflow_context=None) → str¶ Get the formatted SQL script for this data asset.
- Parameters
parameters – dictionary of parameters to inject into the script (i.e. to format the raw string with)
script_type – i.e. dml, ddl – translates to a subfolder in the script-store/sql/ folder
dynamic_parameters – a callable that is dynamically executed to compute SQL script parameters at runtime
airflow_context – a live Airflow context – passed into the function to compute dynamic parameters
- Returns
the formatted SQL script for this data asset
-
get_raw_sql_script(script_type: str = 'dml') → str¶ Returns the raw SQL script for this data asset.
- Parameters
script_type – i.e. dml, ddl – translates to a subfolder in the script-store/sql/ folder
- Returns
the raw SQL script as a string
-
name¶
-
rebuild_for_store(airflow_context, parameters: Dict = None, dynamic_parameters: Callable = None, **kwargs)¶ Rebuild this data asset for the data store’s ready folder (using SQL).
- Parameters
airflow_context – a live Airflow context
parameters – dictionary of parameters to inject into the script (i.e. to format the raw string with)
dynamic_parameters – a callable that is dynamically executed to compute SQL script parameters at runtime
kwargs – additional keyword arguments to be passed along the downstream scripts
- Returns
None
-
retrieve_from_store() → pandas.core.frame.DataFrame¶ Retrieve this data asset from the ready layer of the data store (using SQL).
- Returns
the data asset as a Pandas dataframe
-
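A minimal sketch of a SQL-backed asset, assuming an Airflow PostgresHook (any DbApiHook works), a declared asset named "dim_student" and a matching DML script in the script store (all names illustrative):

    from airflow.hooks.postgres_hook import PostgresHook
    from airtunnel import SQLDataAsset

    def rebuild_for_store(airflow_context, **kwargs):
        hook = PostgresHook(postgres_conn_id="dwh")  # illustrative connection id
        dim_student = SQLDataAsset(name="dim_student", sql_hook=hook)
        # inspect the raw and the parameter-formatted DML script
        raw_sql = dim_student.get_raw_sql_script(script_type="dml")
        sql = dim_student.formatted_sql_script(
            parameters={"schema": "analytics"}, airflow_context=airflow_context
        )
        # run the rebuild, which executes the asset's SQL through the given hook
        dim_student.rebuild_for_store(airflow_context, parameters={"schema": "analytics"})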
-
class airtunnel.data_asset.ShellDataAsset(name: str)¶
Bases: airtunnel.data_asset.BaseDataAsset
A shell DataAsset, acting merely as a container - i.e. for lineage.
-
declarations¶
-
static
from_names(names: Iterable[str]) → List[airtunnel.data_asset.ShellDataAsset]¶ Create several ShellDataAssets from a list of names.
- Parameters
names – the names for which to create ShellDataAssets
- Returns
list of created ShellDataAssets
-
name¶
-
rebuild_for_store(airflow_context, **kwargs) → None¶ Unimplemented method of the BaseDataAsset interface – do not use!
-
retrieve_from_store()¶ Unimplemented method of the BaseDataAsset interface – do not use!
-
to_full_data_asset(target_type: type) → Union[airtunnel.data_asset.PySparkDataAsset, airtunnel.data_asset.PandasDataAsset]¶ Convert this ShellDataAsset to a full Airtunnel data asset of a given type.
- Parameters
target_type – the target class to convert this ShellDataAsset to
- Returns
the full data asset
-
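A short sketch of ShellDataAsset as a lightweight lineage container; the asset names are illustrative:

    from airtunnel import PandasDataAsset
    from airtunnel.data_asset import ShellDataAsset

    shells = ShellDataAsset.from_names(["student", "programme", "enrollment"])
    # upgrade one of the shells into a fully functional Pandas data asset
    student = shells[0].to_full_data_asset(target_type=PandasDataAsset)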
airtunnel.data_store module¶
Module for Airtunnel DataStore abstractions.
-
class airtunnel.data_store.BaseDataStoreAdapter¶
Bases: abc.ABC
Base class for all Airtunnel DataStoreAdapters.
-
abstract static
copy(source: str, destination: str, recursive: bool = False) → None¶ Copy file(s) from source to destination.
-
abstract static
delete(path: str, recursive: bool = False) → None¶ Delete file(s) at a given path.
-
abstract static
exists(path: str, **kwargs) → bool¶ Checks whether the given path exists.
-
abstract static
glob(pattern: str, **kwargs) → List[str]¶ Fetch file list using the provided glob pattern.
-
abstract static
inspect(files: List[str]) → Dict[str, Tuple[datetime.datetime, datetime.datetime, int]]¶ For a given file list, fetch creation & modification datetimes and byte size.
-
abstract static
listdir(path: str, recursive: bool = False, **kwargs) → List[str]¶ List entries of a given path.
-
abstract static
makedirs(path: str, exist_ok: bool = False, **kwargs) → None¶ Make directories along the given path.
-
abstract static
modification_times(files: List[str]) → Dict[str, int]¶ For a given file list, fetch modification times as int Unix timestamps.
-
abstract static
move(source: str, destination: str, recursive: bool = False) → None¶ Move file(s) from source to destination.
-
abstract static
open(file: str, mode: str, **kwargs) → TextIO¶ Open a file-handle with the given path & mode.
-
-
class airtunnel.data_store.LocalDataStoreAdapter¶
Bases: airtunnel.data_store.BaseDataStoreAdapter
DataStoreAdapter implementation targeting the local filesystem.
-
static
copy(source: str, destination: str, recursive: bool = False) → None¶ Copy file(s) from source to destination.
-
static
delete(path: str, recursive: bool = False) → None¶ Delete file(s) at a given path.
-
static
exists(path: str, **kwargs) → bool¶ Checks whether the given path exists.
-
static
glob(pattern: str, **kwargs) → List[str]¶ Fetch file list using the provided glob pattern.
-
abstract static
inspect(files: List[str]) → Dict[str, Tuple[datetime.datetime, datetime.datetime, int]]¶ For a given file list, fetch creation & modification datetimes and byte size.
-
static
listdir(path: str, recursive: bool = False, **kwargs) → List[str]¶ List entries of a given path.
-
static
makedirs(path: str, exist_ok: bool = False, **kwargs) → None¶ Make directories along the given path.
-
static
modification_times(files: List[str]) → Dict[str, int]¶ For a given file list, fetch modification times as int Unix timestamps.
-
static
move(source: str, destination: str, recursive: bool = False) → None¶ Move file(s) from source to destination.
-
static
open(file: str, mode: str, **kwargs) → TextIO¶ Open a file-handle with the given path & mode.
-
-
airtunnel.data_store.get_configured_data_store_adapter() → airtunnel.data_store.BaseDataStoreAdapter¶ Gets the configured DataStoreAdapter.
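A small sketch of working with the configured adapter; the path and glob pattern are illustrative:

    from airtunnel.data_store import get_configured_data_store_adapter

    adapter = get_configured_data_store_adapter()
    # list CSV files in a landing folder and inspect them
    csv_files = adapter.glob("/data/ingest/landing/student/*.csv")
    if csv_files and adapter.exists(csv_files[0]):
        # maps each path to (creation datetime, modification datetime, byte size)
        info = adapter.inspect(csv_files)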
airtunnel.declaration_store module¶
Module for Airtunnel Declaration Store abstractions.
-
class airtunnel.declaration_store.DataAssetDeclaration(data_asset: str)¶
Bases: object
Abstraction around declarations for an Airtunnel data asset.
-
property
all¶ Get all declarations
-
property
archive_ingest¶ Whether to archive the ingested files.
-
property
archive_ready¶ Whether to archive the ready layer data of this data asset.
-
property
asset_name¶ Name of the declared asset.
-
property
extra_declarations¶ Return extra declarations.
-
property
in_storage_format¶ The declared input storage format.
-
property
ingest_file_glob¶ The declared file glob for ingestion.
-
property
is_csv_input¶ Whether the input for this data asset is CSV.
-
property
is_csv_output¶ Whether the output for this data asset is CSV.
-
property
is_parquet_input¶ Whether the input for this data asset is Parquet.
-
property
is_parquet_output¶ Whether the output for this data asset is Parquet.
-
property
is_xls_input¶ Whether the input for this data asset is Excel.
-
property
key_columns¶ Return list of declared key columns.
-
property
out_comp_codec¶ The declared output compression codec.
-
property
out_storage_format¶ The declared output storage format.
-
property
run_ddl¶ Whether to run DDL for this data asset.
-
property
staging_assets¶ Return list of declared staging assets.
-
property
transform_date_formats¶ The declared date formats.
-
property
transform_renames¶ The declared rename transformations.
-
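A brief sketch of inspecting an asset's declarations; the asset name "student" is illustrative and must exist in the declaration store:

    from airtunnel.declaration_store import DataAssetDeclaration

    decl = DataAssetDeclaration(data_asset="student")
    if decl.is_csv_input:
        print(decl.ingest_file_glob, decl.in_storage_format)
    print(decl.key_columns, decl.out_storage_format, decl.archive_ready)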
-
class airtunnel.declaration_store.DeclarationSchemas¶
Bases: object
Wrapper to hold all our YAML schemas to validate declaration files.
-
DERIVED_DATA_ASSET= Schema({'type': And(<class 'str'>, <function DeclarationSchemas.<lambda>>), Optional('load'): Schema({Optional('out_storage_format'): And(<function DeclarationSchemas.<lambda>>), Optional('out_compression_codec'): And(<function DeclarationSchemas.<lambda>>), Optional('archive_ready'): <function DeclarationSchemas.<lambda>>, Optional('run_ddl'): <function DeclarationSchemas.<lambda>>, Optional('key_columns'): Schema([Schema(<class 'str'>)])}), Optional('staging_assets'): Schema([Schema(<class 'str'>)]), Optional('extra'): Schema({<class 'object'>: <class 'object'>})})¶
-
INGESTED_DATA_ASSET= Schema({'type': And(<class 'str'>, <function DeclarationSchemas.<lambda>>), 'ingest': Schema({Optional('in_storage_format'): <function DeclarationSchemas.<lambda>>, 'file_input_glob': And(<class 'str'>, <built-in function len>), Optional('archive_ingest'): <function DeclarationSchemas.<lambda>>}), Optional('transformation'): Schema({Optional('in_column_renames'): And(<class 'dict'>, <function DeclarationSchemas.<lambda>>, <function DeclarationSchemas.<lambda>>), Optional('in_date_formats'): And(<class 'dict'>, <function DeclarationSchemas.<lambda>>)}), Optional('load'): Schema({Optional('out_storage_format'): And(<function DeclarationSchemas.<lambda>>), Optional('out_compression_codec'): And(<function DeclarationSchemas.<lambda>>), Optional('archive_ready'): <function DeclarationSchemas.<lambda>>, Optional('run_ddl'): <function DeclarationSchemas.<lambda>>, Optional('key_columns'): Schema([Schema(<class 'str'>)])}), Optional('staging_assets'): Schema([Schema(<class 'str'>)]), Optional('extra'): Schema(<class 'object'>)})¶
-
LOAD_SCHEMA= Schema({Optional('out_storage_format'): And(<function DeclarationSchemas.<lambda>>), Optional('out_compression_codec'): And(<function DeclarationSchemas.<lambda>>), Optional('archive_ready'): <function DeclarationSchemas.<lambda>>, Optional('run_ddl'): <function DeclarationSchemas.<lambda>>, Optional('key_columns'): Schema([Schema(<class 'str'>)])})¶
-
STAGING_ASSETS= Schema([Schema(<class 'str'>)])¶
-
-
airtunnel.declaration_store.fetch_all_declarations() → Iterable[airtunnel.declaration_store.DataAssetDeclaration]¶ Fetch declarations for all data assets that exist.
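For example, iterating over every declared asset (a hedged sketch):

    from airtunnel.declaration_store import fetch_all_declarations

    for decl in fetch_all_declarations():
        print(decl.asset_name, decl.out_storage_format)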
airtunnel.paths module¶
Module for Airtunnel’s paths, i.e. to the declaration, data and scripts store folders.
Module contents¶
airtunnel – tame your Airflow!
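The package namespace re-exports the core classes documented below, so imports can stay short; a usage sketch:

    from airtunnel import (
        BaseDataAsset,
        PandasDataAsset,
        PandasDataAssetIO,
        PySparkDataAsset,
        PySparkDataAssetIO,
        SQLDataAsset,
    )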
-
class airtunnel.BaseDataAsset(name: str)¶
Bases: object
Base class for all Airtunnel data assets.
-
declarations¶ Declarations of this data asset
-
property
discovered_files_xcom_key¶ - Returns
the XCOM key for discovered files for this data asset
-
property
ingest_archive_path¶ - Returns
the ingest/archive path for this data asset
-
property
landing_path¶ - Returns
the ingest/landing path for this data asset
-
make_ready_temp_path(airflow_context: Dict) → str¶ Makes a temporary path under ready for staging purposes.
It points to a hidden (prefixed by “.”) folder as the last component, so that this path is ignored by frameworks like Spark.
- Parameters
airflow_context – a live Airflow context, used to get the exec timestamp
- Returns
the temporary staging path
-
name¶ Name of this data asset
-
property
output_filename¶ - Returns
the output filename for this data asset, considering the declared storage format and compression codec (if compression is used)
-
pickedup_files(airflow_context) → List[str]¶ Using Airflow XCOM based on the given Airflow context with its DAG id, fetch the list of previously discovered files for ingestion.
Then, considering the files have been picked-up by the ingestion operator, adjust the path prefix for the picked-up folder and return the list of file paths.
- Parameters
airflow_context – a live Airflow context
- Returns
list of picked-up file paths
-
ready_archive_path(airflow_context) → str¶ Return the versioned archive path for this data asset, based on the Airflow execution date.
- Parameters
airflow_context – a live Airflow context
- Returns
the versioned archived path for this data asset
-
property
ready_path¶ - Returns
the ready (folder) path for this data asset
-
abstract
rebuild_for_store(airflow_context, **kwargs) → None¶ Rebuild this data asset for the data store’s ready folder.
- Parameters
airflow_context – a live Airflow context
kwargs – additional keyword arguments to be passed along the downstream scripts
- Returns
None
-
abstract
retrieve_from_store() → Union[pandas.core.frame.DataFrame, pyspark.sql.dataframe.DataFrame]¶ Retrieve this data asset from the ready layer of the data store.
- Returns
A pandas or PySpark dataframe
-
staging_pickedup_path(airflow_context) → str¶ Return the pickedup path under staging for this data asset, versioned by the Airflow execution date.
- Parameters
airflow_context – a live Airflow context
- Returns
the versioned staging/pickedup path for this data asset
-
property
staging_ready_path¶ - Returns
the staging/ready path for this data asset
-
-
class airtunnel.SQLDataAsset(name: str, sql_hook: airflow.hooks.dbapi_hook.DbApiHook)¶
Bases: airtunnel.data_asset.BaseDataAsset
A SQL enabled Airtunnel data asset.
-
declarations¶
-
formatted_sql_script(parameters: Dict, script_type: str = 'dml', dynamic_parameters: Callable = None, airflow_context=None) → str¶ Get the formatted SQL script for this data asset.
- Parameters
parameters – dictionary of parameters to inject into the script (i.e. to format the raw string with)
script_type – i.e. dml, ddl – translates to a subfolder in the script-store/sql/ folder
dynamic_parameters – a callable that is dynamically executed to compute SQL script parameters at runtime
airflow_context – a live Airflow context – passed into the function to compute dynamic parameters
- Returns
the formatted SQL script for this data asset
-
get_raw_sql_script(script_type: str = 'dml') → str¶ Returns the raw SQL script for this data asset.
- Parameters
script_type – i.e. dml, ddl – translates to a subfolder in the script-store/sql/ folder
- Returns
the raw SQL script as a string
-
name¶
-
rebuild_for_store(airflow_context, parameters: Dict = None, dynamic_parameters: Callable = None, **kwargs)¶ Rebuild this data asset for the data store’s ready folder (using SQL).
- Parameters
airflow_context – a live Airflow context
parameters – dictionary of parameters to inject into the script (i.e. to format the raw string with)
dynamic_parameters – a callable that is dynamically executed to compute SQL script parameters at runtime
kwargs – additional keyword arguments to be passed along the downstream scripts
- Returns
None
-
retrieve_from_store() → pandas.core.frame.DataFrame¶ Retrieve this data asset from the ready layer of the data store (using SQL).
- Returns
the data asset as a Pandas dataframe
-
-
class airtunnel.PandasDataAsset(name: str)¶
Bases: airtunnel.data_asset.BaseDataAsset
Implements a Pandas enabled Airtunnel data asset.
-
declarations¶
-
name¶
-
rebuild_for_store(airflow_context, **kwargs) → None¶ Rebuild this data asset for the data store’s ready folder (using Pandas).
- Parameters
airflow_context – a live Airflow context
kwargs – additional keyword arguments to be passed along the downstream scripts
- Returns
None
-
rename_fields_as_declared(data: pandas.core.frame.DataFrame) → pandas.core.frame.DataFrame¶ Rename the columns as declared for this data asset.
- Parameters
data – the input Pandas dataframe to perform the rename on
- Returns
the dataframe with the columns renamed
-
retrieve_from_store(airflow_context=None, consuming_asset: Optional[airtunnel.data_asset.BaseDataAsset] = None) → pandas.core.frame.DataFrame¶ Retrieve this data asset from the ready layer of the data store (using Pandas).
- Parameters
airflow_context – a live Airflow context (optional)
consuming_asset – the consuming data asset for lineage collection (optional)
- Returns
the data asset as a Pandas dataframe
-
-
class airtunnel.PySparkDataAsset(name: str)¶
Bases: airtunnel.data_asset.BaseDataAsset
A PySpark enabled Airtunnel data asset.
-
declarations¶
-
name¶
-
rebuild_for_store(airflow_context, **kwargs)¶ Rebuild this data asset for the data store’s ready folder (using PySpark).
- Parameters
airflow_context – a live Airflow context
kwargs – additional keyword arguments to be passed along the downstream scripts
- Returns
None
-
rename_fields_as_declared(data: pyspark.sql.dataframe.DataFrame) → pyspark.sql.dataframe.DataFrame¶ Rename the columns as declared for this data asset.
- Parameters
data – the input PySpark dataframe to perform the rename on
- Returns
the PySpark dataframe with the columns renamed
-
retrieve_from_store(airflow_context=None, consuming_asset: Optional[airtunnel.data_asset.BaseDataAsset] = None, spark_session: Optional[pyspark.sql.session.SparkSession] = None) → pyspark.sql.dataframe.DataFrame¶ Retrieve this data asset from the ready layer of the data store (using PySpark).
- Parameters
airflow_context – a live Airflow context (optional)
consuming_asset – the consuming data asset for lineage collection (optional)
spark_session – a live Spark session to use (required)
- Returns
the data asset as a PySpark dataframe
-
-
class airtunnel.BaseDataAssetIO¶
Bases: abc.ABC
Base class for all Airtunnel DataAssetIO.
-
static
prepare_staging_ready_path(asset: airtunnel.data_asset.BaseDataAsset) → None¶ For a given data asset, prepare the staging/ready path for it, i.e. ensure it exists if this is the first ever run.
- Parameters
asset – the data asset for which to prepare the staging/ready path
- Returns
None
-
abstract static
read_data_asset(asset: airtunnel.data_asset.BaseDataAsset, source_files: Iterable[str], **reader_kwargs) → Union[pandas.core.frame.DataFrame, pyspark.sql.dataframe.DataFrame]¶ Reads a data asset.
- Parameters
asset – the data asset to read
source_files – input source file(s) to read
reader_kwargs – additional keyword arguments to pass into the reader function
- Returns
the data read in as a dataframe
-
abstract static
retrieve_data_asset(asset: airtunnel.data_asset.BaseDataAsset, **reader_kwargs) → Union[pandas.core.frame.DataFrame, pyspark.sql.dataframe.DataFrame]¶ Retrieves a data asset from the Airtunnel data store.
- Parameters
asset – the data asset to retrieve
reader_kwargs – additional keyword arguments to pass into the reader function
- Returns
the retrieved data as a dataframe
-
abstract static
write_data_asset(asset: airtunnel.data_asset.BaseDataAsset, data: Union[pandas.core.frame.DataFrame, pyspark.sql.DataFrame], **writer_kwargs) → None¶ Writes a data asset.
- Parameters
asset – the data asset to write
data – data to write for the data asset
writer_kwargs – additional keyword arguments to pass into the writer function
- Returns
None
-
-
class airtunnel.PandasDataAssetIO¶
Bases: airtunnel.data_asset.BaseDataAssetIO
IO interface for the Airtunnel PandasDataAsset.
-
static
read_data_asset(asset: airtunnel.data_asset.PandasDataAsset, source_files: Iterable[str], **reader_kwargs) → pandas.core.frame.DataFrame¶ Reads a PandasDataAsset using Pandas.
- Parameters
asset – the PandasDataAsset to read
source_files – input source file(s) to read
reader_kwargs – additional keyword arguments to pass into the reader function
- Returns
the data read in as a dataframe
-
static
retrieve_data_asset(asset: airtunnel.data_asset.PandasDataAsset, **reader_kwargs) → pandas.core.frame.DataFrame¶ Retrieves a PandasDataAsset from the Airtunnel data store (using Pandas).
- Parameters
asset – the data asset to retrieve
reader_kwargs – additional keyword arguments to pass into the Pandas reader function
- Returns
the retrieved data as a Pandas dataframe
-
static
write_data_asset(asset: airtunnel.data_asset.PandasDataAsset, data: pandas.core.frame.DataFrame, **writer_kwargs) → None¶ Writes a PandasDataAsset using Pandas.
- Parameters
asset – the data asset to write
data – data to write for the data asset
writer_kwargs – additional keyword arguments to pass into the Pandas writer function
- Returns
None
-
-
class airtunnel.PySparkDataAssetIO¶
Bases: airtunnel.data_asset.BaseDataAssetIO
IO interface for the Airtunnel PySparkDataAsset.
-
static
read_data_asset(asset: airtunnel.data_asset.PySparkDataAsset, source_files: Iterable[str], spark_session: Optional[pyspark.sql.session.SparkSession] = None, **reader_kwargs) → pyspark.sql.dataframe.DataFrame¶ Reads a PySparkDataAsset using PySpark.
- Parameters
asset – the PySparkDataAsset to read
source_files – input source file(s) to read
spark_session – a live PySpark session to use (required)
reader_kwargs – additional keyword arguments to pass into the PySpark reader function
- Returns
the data read in as a dataframe
-
static
retrieve_data_asset(asset: airtunnel.data_asset.PySparkDataAsset, spark_session: Optional[pyspark.sql.session.SparkSession] = None, **reader_kwargs) → pyspark.sql.dataframe.DataFrame¶ Retrieves a PySparkDataAsset from the Airtunnel data store (using PySpark).
- Parameters
asset – the data asset to retrieve
spark_session – a live PySpark session to use (required)
reader_kwargs – additional keyword arguments to pass into the PySpark reader function
- Returns
the retrieved data as a PySpark dataframe
-
static
write_data_asset(asset: airtunnel.data_asset.PySparkDataAsset, data: pyspark.sql.dataframe.DataFrame, **writer_kwargs) → None¶ Writes a PySparkDataAsset using PySpark.
- Parameters
asset – the data asset to write
data – data to write for the data asset
writer_kwargs – additional keyword arguments to pass into the PySpark writer function
- Returns
None
-