airtunnel.metadata package

Submodules

airtunnel.metadata.adapter module

Module defining Airtunnel metadata adapters.

class airtunnel.metadata.adapter.BaseHookFactory

Bases: object

Base class to define the interface we expect for a custom hook factory.

abstract static make_hook() → Optional[airflow.hooks.base_hook.BaseHook]

Creates and returns an Airflow hook that should be used to log metadata.
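The extension point is a single static method. Below is a minimal sketch of a custom hook factory; `BaseHook` here is a stand-in for `airflow.hooks.base_hook.BaseHook` so the example is self-contained, and the connection ID `metadata_db` is a hypothetical name. In a real project you would subclass `airtunnel.metadata.adapter.BaseHookFactory` and return a concrete Airflow hook bound to your metadata database connection.

```python
# Sketch of a custom hook factory. `BaseHook` is a stand-in for
# airflow.hooks.base_hook.BaseHook; "metadata_db" is a hypothetical conn ID.
from typing import Optional


class BaseHook:  # stand-in so the sketch runs without Airflow installed
    def __init__(self, conn_id: str):
        self.conn_id = conn_id


class MetadataDBHookFactory:
    """Factory returning the hook a metadata adapter should log with."""

    @staticmethod
    def make_hook() -> Optional[BaseHook]:
        # Return a hook bound to a dedicated metadata connection:
        return BaseHook(conn_id="metadata_db")


hook = MetadataDBHookFactory.make_hook()
print(hook.conn_id)  # → metadata_db
```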

class airtunnel.metadata.adapter.BaseMetaAdapter

Bases: abc.ABC

Base class for all Airtunnel MetaAdapters.

abstract read_inspected_files(for_asset: airtunnel.data_asset.BaseDataAsset, dag_id: str, dag_exec_date: datetime.datetime) → List[airtunnel.metadata.entities.IngestedFileMetadata]

Read existing metadata of previously inspected files.

Parameters
  • for_asset – the data asset for which to retrieve the metadata

  • dag_id – the Airflow DAG ID for which to retrieve the metadata

  • dag_exec_date – the Airflow DAG execution date for which to retrieve the metadata

Returns

list of metadata entities

abstract read_lineage(for_target: airtunnel.data_asset.BaseDataAsset, dag_id: Optional[str] = None, dag_exec_date: Optional[datetime.datetime] = None) → List[Tuple[airtunnel.metadata.entities.Lineage, int]]

Read previously logged lineage metadata.

Parameters
  • for_target – the data asset (here, the data target) for which to retrieve lineage information

  • dag_id – an optional DAG ID to restrict the lineage search to

  • dag_exec_date – an optional DAG execution datetime to restrict the lineage search to

Returns

the lineage information captured as a list of tuples, in which the first element is a lineage metadata entity and the second is an integer giving the upstream level in the lineage chain

abstract read_load_status(for_asset: airtunnel.data_asset.BaseDataAsset) → airtunnel.metadata.entities.LoadStatus

Read previously logged load status for a data asset.

Parameters

for_asset – data asset for which to retrieve the load status information

Returns

the load status metadata entity

abstract write_generic_metadata(for_asset: airtunnel.data_asset.BaseDataAsset, payload: Dict) → None

Log generic metadata in JSON format.

Parameters
  • for_asset – the data asset for which to log the metadata

  • payload – the generic metadata to log; it needs to be serializable as JSON

Returns

None

abstract write_inspected_files(discovered_files: Iterable[airtunnel.metadata.entities.IngestedFileMetadata]) → None

Log metadata of discovered files of the ingestion process.

Parameters

discovered_files – iterable of metadata entities for all discovered files

Returns

None

abstract write_lineage(lineage: airtunnel.metadata.entities.Lineage) → None

Log lineage information.

Parameters

lineage – the lineage entity to log.

Returns

None

abstract write_load_status(load_status: airtunnel.metadata.entities.LoadStatus) → None

Log load status metadata.

Parameters

load_status – the load status entity to log.

Returns

None

class airtunnel.metadata.adapter.DefaultSQLHookFactory

Bases: airtunnel.metadata.adapter.BaseHookFactory

Default (dummy) hook factory used with the SQLMetaAdapter if nothing custom is defined.

static make_hook() → Optional[airflow.hooks.base_hook.BaseHook]

This implementation does not create a hook but returns None, so that SQLMetaAdapter falls back to creating a SQLAlchemy engine from Airflow's configured SQLAlchemy connection string. We do not attempt to create a hook in this scenario because we cannot know for certain which database backend is used with a given Airflow setup, and therefore we cannot safely choose and instantiate a hook of the right type.

class airtunnel.metadata.adapter.SQLMetaAdapter(sql_hook: airflow.hooks.dbapi_hook.DbApiHook = None)

Bases: airtunnel.metadata.adapter.BaseMetaAdapter

BaseMetaAdapter implementation using SQLAlchemy, hence compatible with the relational databases SQLAlchemy supports. The database connection used for metadata logging (i.e. the SQLAlchemy engine) can be customized through an Airflow DbApiHook that is either passed explicitly to the constructor or configured using the Airtunnel configuration parameter meta_adapter_hook_factory.

If neither of these two define a DbApiHook to use, SQLMetaAdapter will use Airflow’s backend database by leveraging the configured SQLAlchemy connection string and creating the engine based on it.
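As a sketch of the second option, and assuming Airtunnel reads its configuration from an [airtunnel] section of airflow.cfg, the hook factory could be wired up like this (the module path is hypothetical):

```ini
[airtunnel]
# Hypothetical module path - point this at your own BaseHookFactory subclass:
meta_adapter_hook_factory = my_project.metadata.MetadataDBHookFactory
```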

FN_DAG_ID = 'dag_id'
FN_DATA_ASSET = 'data_asset'
FN_DATA_ASSET_SRC = 'data_asset_source'
FN_DATA_ASSET_TRG = 'data_asset_target'
FN_EXEC_DATE = 'dag_exec_datetime'
FN_FILE_CREATE_TIME = 'createtime'
FN_FILE_MOD_TIME = 'modtime'
FN_FILE_PATH = 'path'
FN_FILE_SIZE = 'size'
FN_LOAD_TIME = 'load_time'
FN_METADATA_JSON = 'metadata_json'
FN_METADATA_TIME = 'metadata_time'
FN_METADATA_TYPE = 'metadata_type'
FN_TASK_ID = 'task_id'
TABLES = ('airtunnel_load_status', 'airtunnel_lineage', 'airtunnel_metadata', 'airtunnel_load_status_hist', 'airtunnel_ingested_files')
TN_GENERIC_METADATA = 'airtunnel_metadata'
TN_INFILE_METADATA = 'airtunnel_ingested_files'
TN_LINEAGE = 'airtunnel_lineage'
TN_LOAD_STATUS = 'airtunnel_load_status'
TN_LOAD_STATUS_HIST = 'airtunnel_load_status_hist'
read_inspected_files(for_asset: airtunnel.data_asset.BaseDataAsset, dag_id: str, dag_exec_date: datetime.datetime) → List[airtunnel.metadata.entities.IngestedFileMetadata]

Read existing metadata of previously inspected files.

Parameters
  • for_asset – the data asset for which to retrieve the metadata

  • dag_id – the Airflow DAG ID for which to retrieve the metadata

  • dag_exec_date – the Airflow DAG execution date for which to retrieve the metadata

Returns

list of metadata entities

read_lineage(for_target: airtunnel.data_asset.BaseDataAsset, dag_id: Optional[str] = None, dag_exec_date: Optional[datetime.datetime] = None) → List[Tuple[airtunnel.metadata.entities.Lineage, int]]

Read previously logged lineage metadata.

Parameters
  • for_target – the data asset (here, the data target) for which to retrieve lineage information

  • dag_id – an optional DAG ID to restrict the lineage search to

  • dag_exec_date – an optional DAG execution datetime to restrict the lineage search to

Returns

the lineage information captured as a list of tuples, in which the first element is a lineage metadata entity and the second is an integer giving the upstream level in the lineage chain

read_load_status(for_asset: airtunnel.data_asset.BaseDataAsset) → airtunnel.metadata.entities.LoadStatus

Read previously logged load status for a data asset.

Parameters

for_asset – data asset for which to retrieve the load status information

Returns

the load status metadata entity

write_generic_metadata(for_asset: airtunnel.data_asset.BaseDataAsset, payload: Dict)

Log generic metadata in JSON format.

Parameters
  • for_asset – the data asset for which to log the metadata

  • payload – the generic metadata to log; it needs to be serializable as JSON

Returns

None

write_inspected_files(discovered_files: Iterable[airtunnel.metadata.entities.IngestedFileMetadata]) → None

Log metadata of discovered files of the ingestion process.

Parameters

discovered_files – iterable of metadata entities for all discovered files

Returns

None

write_lineage(lineage: airtunnel.metadata.entities.Lineage)

Log lineage information.

Parameters

lineage – the lineage entity to log.

Returns

None

write_load_status(load_status: airtunnel.metadata.entities.LoadStatus)

Log load status metadata.

Parameters

load_status – the load status entity to log.

Returns

None

airtunnel.metadata.adapter.get_configured_meta_adapter() → airtunnel.metadata.adapter.BaseMetaAdapter

Gets the configured (or default) BaseMetaAdapter to use for metadata operations.

It can be defined using the config key meta_adapter_class with a value that points to a class implementing airtunnel.metadata.adapter.BaseMetaAdapter. This class will then be retrieved and used as the metadata adapter in Airtunnel’s Airflow operators.

Returns

the configured (or default) BaseMetaAdapter
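A custom adapter could be configured as follows, assuming Airtunnel reads its keys from an [airtunnel] section of airflow.cfg (the module path shown is hypothetical):

```ini
[airtunnel]
# Hypothetical module path - point this at your own BaseMetaAdapter subclass:
meta_adapter_class = my_project.metadata.MyMetaAdapter
```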

airtunnel.metadata.adapter.get_configured_meta_adapter_hook() → Optional[airflow.hooks.base_hook.BaseHook]

Gets the configured hook to use for metadata operations.

It can be defined using the config key meta_adapter_hook_factory with a value that points to a class implementing airtunnel.metadata.adapter.BaseHookFactory. This class will then be retrieved and the method make_hook() will be called to get the hook.

If nothing is defined, airtunnel.metadata.adapter.DefaultSQLHookFactory is used, which returns None for a hook – letting the metadata adapter implementation create its own default hook/connection.

Returns

a BaseHook to use for metadata operations, or None if not defined

airtunnel.metadata.entities module

Module defining Airtunnel metadata entities.

class airtunnel.metadata.entities.IngestedFileMetadata(for_asset: airtunnel.data_asset.BaseDataAsset, filepath: str, filesize: int, file_mod_time: datetime.datetime, file_create_time: datetime.datetime, dag_id: str, dag_exec_date: datetime.datetime, task_id: str)

Bases: object

Metadata entity that holds information on a single file ingested for an Airtunnel data asset.

property dag_exec_date

The Airflow DAG execution datetime that ingested the file.

property dag_id

The Airflow DAG ID that ingested the file.

property file_create_time

The datetime of creation of the ingested file.

property file_mod_time

The datetime of last modification of the ingested file.

property filepath

The path of the ingested file.

property filesize

The size in bytes of the ingested file.

property for_asset

The Airtunnel data asset for which the file was ingested.

property task_id

The Airflow task ID that ingested the file.
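The per-file facts this entity carries can be gathered with os.stat. The sketch below uses a stand-in dataclass (`FileFacts`) instead of the real entity so it runs without airtunnel; the real IngestedFileMetadata additionally carries the data asset and the DAG/task context.

```python
# Sketch of collecting the per-file facts IngestedFileMetadata carries.
# `FileFacts` is a hypothetical stand-in for the real entity.
import os
import tempfile
from dataclasses import dataclass
from datetime import datetime


@dataclass
class FileFacts:
    filepath: str
    filesize: int
    file_mod_time: datetime
    file_create_time: datetime


def inspect_file(path: str) -> FileFacts:
    st = os.stat(path)
    return FileFacts(
        filepath=path,
        filesize=st.st_size,
        file_mod_time=datetime.fromtimestamp(st.st_mtime),
        # note: st_ctime is metadata-change time on POSIX, creation time on Windows
        file_create_time=datetime.fromtimestamp(st.st_ctime),
    )


with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"hello")
facts = inspect_file(f.name)
print(facts.filesize)  # → 5
os.remove(f.name)
```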

class airtunnel.metadata.entities.Lineage(data_sources: Iterable[airtunnel.data_asset.BaseDataAsset], data_target: airtunnel.data_asset.BaseDataAsset, dag_id: Optional[str] = None, dag_exec_date: Optional[datetime.datetime] = None, task_id: Optional[str] = None)

Bases: object

Metadata entity that holds lineage information between Airtunnel data assets. It consists of a single Airtunnel data asset – the data target – and one or more of its data sources.

Optionally an Airflow DAG ID, task ID and DAG execution datetime can be given to further specify the lineage context.

property dag_exec_date

The DAG execution datetime when this lineage occurred.

property dag_id

The DAG ID where this lineage occurred.

property data_sources

An iterable of data sources as part of this lineage entity.

property data_target

The single data target as part of this lineage entity.

static lineage_from_sql_script(script_file_relative_path: str = None) → Iterable[airtunnel.metadata.entities.Lineage]

Extract the lineage information from a SQL script.

Parameters

script_file_relative_path – the relative path to the SQL script from the Airtunnel SQL scripts folder

Returns

iterable with Lineage entities

static lineage_from_sql_statement(statement: str, known_data_assets: Optional[Iterable[str]] = None) → airtunnel.metadata.entities.Lineage

Extract the lineage metadata from a (simple) SQL statement.

Parameters
  • statement – the string with the SQL statement

  • known_data_assets – iterable of known data asset names to restrict the search space for lineage sources and targets; if not given, all known data assets will be fetched from the declaration store

Returns

the lineage entity with ShellDataAssets as data source(s) and data target and without additional context (DAG ID, task ID, DAG execution datetime)
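To illustrate the kind of extraction this method performs, here is a deliberately simplified, regex-based sketch that pulls the INSERT target and the FROM/JOIN sources out of a simple statement. This is not airtunnel's actual parser, and `naive_sql_lineage` is a hypothetical helper.

```python
# Simplified illustration of SQL lineage extraction: find the INSERT target
# and the FROM/JOIN sources of a simple statement. NOT airtunnel's parser.
import re
from typing import List, Optional, Tuple


def naive_sql_lineage(statement: str) -> Tuple[Optional[str], List[str]]:
    target = re.search(r"INSERT\s+INTO\s+(\w+)", statement, re.IGNORECASE)
    sources = re.findall(r"(?:FROM|JOIN)\s+(\w+)", statement, re.IGNORECASE)
    return (target.group(1) if target else None, sources)


stmt = """
INSERT INTO student_enrollment
SELECT s.name, p.title
FROM student s
JOIN programme p ON s.programme_id = p.id
"""
print(naive_sql_lineage(stmt))  # → ('student_enrollment', ['student', 'programme'])
```

A real parser must additionally handle subqueries, CTEs, quoting, and schema-qualified names, which is why restricting the search to known data asset names helps.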

property task_id

The task ID where this lineage occurred.

class airtunnel.metadata.entities.LoadStatus(for_asset: airtunnel.data_asset.BaseDataAsset, load_time: Optional[datetime.datetime] = None, dag_id: Optional[str] = None, task_id: Optional[str] = None, dag_exec_date: Optional[datetime.datetime] = None)

Bases: object

Defines the load status metadata information of an Airtunnel DataAsset.

property dag_exec_date

The Airflow DAG execution datetime of this LoadStatus.

property dag_id

The Airflow DAG ID of this LoadStatus.

is_within(frame: datetime.timedelta) → bool

Check whether this LoadStatus entity falls within a given time frame, e.g. whether the load happened within the last 6 hours.

Parameters

frame – the time frame (timedelta) to use for the check, from the current datetime as the reference point

Returns

True if the load status is within the time frame, False otherwise
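The underlying check can be sketched as follows; this is stand-in logic using only the standard library, not airtunnel's implementation:

```python
# Sketch of the check is_within performs: is the recorded load time no older
# than `frame`, measured back from the current datetime?
from datetime import datetime, timedelta


def is_within(load_time: datetime, frame: timedelta) -> bool:
    return datetime.now() - load_time <= frame


recent = datetime.now() - timedelta(hours=1)
print(is_within(recent, timedelta(hours=6)))    # → True
print(is_within(recent, timedelta(minutes=5)))  # → False
```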

property load_time

The load time of this LoadStatus.

property task_id

The Airflow task ID of this LoadStatus.

Module contents

Package containing Airtunnel’s extensions to the Airflow metadata model.