Data Quality Results Storage
To use all features of the framework, a results storage must be set up. Checkita can use various RDBMS as a results storage; Hive is also supported, and even simple file storage can be used.
The full list of supported storage types is as follows:
- PostgreSQL (v.9.3 and higher) - the recommended database for the results storage
- Oracle
- MySQL
- Microsoft SQL Server
- SQLite
- H2
- Hive
- File (directory in the local file system or a remote one (HDFS, S3))
Checkita framework supports results storage schema evolution: Flyway is run under the hood to perform schema migrations. Therefore, if one of the supported RDBMS is chosen for the results storage, it can be set up during the first run of a Data Quality job by providing the `-m` application argument on startup. For more details on how to run Data Quality applications, refer to the Submitting Data Quality Application chapter.
IMPORTANT: Flyway migrations usually run either in an empty database/schema or in one that was initialized with Flyway. In the Checkita framework it is also possible to run migrations in a non-empty database/schema. In this case it is up to the user to ensure that there are no conflicting table names in the database/schema.
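After a successful migration, the applied schema versions can be inspected directly in the results database. A minimal sketch, assuming Checkita keeps Flyway's default history table name (`flyway_schema_history`):

```sql
-- Flyway records every applied migration in its history table
-- (default name flyway_schema_history; assuming Checkita does not override it):
SELECT version, description, installed_on, success
FROM flyway_schema_history
ORDER BY installed_rank;
```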
If the File type of storage is used, then it is only required to provide a path to a directory/bucket where results will be stored. Results are stored as parquet files with the same schema as for the RDBMS storage. No schema evolution mechanisms are provided for the File type of storage. Therefore, if the results schemas evolve later, it is up to the user to update existing results to the new structure.
IMPORTANT: No partitioning is used when storing results as parquet files. Every job reads the entire results history and overwrites it, appending the new results. Therefore, using the File type of storage is not recommended for production use.
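Since the results are plain parquet files, they can still be handy for ad-hoc inspection, e.g. directly from Spark SQL. A minimal sketch (the storage path and the per-result-type subdirectory layout below are assumptions; adjust them to your actual setup):

```sql
-- Register file-storage results as a temporary view for ad-hoc queries
-- (hypothetical path; the schema matches the RDBMS one described below):
CREATE TEMPORARY VIEW dq_results_regular
USING parquet
OPTIONS (path 'hdfs:///dq/results/results_metric_regular');

SELECT job_id, metric_id, result, reference_date
FROM dq_results_regular
WHERE job_id = 'my_job';
```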
For the Hive type of storage, the schema evolution mechanisms are also not available. Therefore, it is up to the user to create the corresponding Hive tables. DDL scripts from the Hive Storage Setup Scripts chapter below can be used for that.
IMPORTANT: Results Hive tables must be partitioned by `job_id`. Job ID is chosen as the partition column to support faster results fetching during computation of trend checks (used for anomaly detection in data). The Hive type of results storage works faster than the File one, since only the partition for the current `job_id` is read and overwritten. Nevertheless, this type of storage is also not recommended for production use where a large number of jobs will be run.
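The partitioning by `job_id` means that the historical reads performed by trend checks touch only a single partition. For illustration, a query of this shape benefits from partition pruning (the schema name `dq` and all identifiers are hypothetical; table names follow the setup scripts below):

```sql
-- Fetching a metric's history for trend checks scans only one partition,
-- since the table is partitioned by job_id:
SELECT reference_date, result
FROM dq.results_metric_regular
WHERE job_id = 'my_job'              -- partition pruning happens here
  AND metric_id = 'row_count_metric'
ORDER BY reference_date;
```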
Results Types and Schemas
The following types of results are written to the storage:
- regular metrics results
- composed metrics results
- trend metrics results
- metric error results
- load checks results
- checks results
- job state
Schemas for all results types are given below.
Primary keys denote how unique records are tracked: in general, results for the same Data Quality job run for the same reference date are overwritten. The history of multiple attempts of the same job for the same reference date is not stored. This is done so that trend metrics and trend checks work correctly, as their computation requires reading historical results from the Data Quality storage. Therefore, there must be only one set of results per Data Quality job and given reference date.
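In an RDBMS storage, this overwrite semantics can be pictured as a delete-then-insert per job and reference date. This is a sketch only; the actual mechanics inside Checkita may differ:

```sql
-- Re-running a job for the same reference date replaces the previous
-- attempt's results instead of appending to them (illustrative sketch):
DELETE FROM results_metric_regular
WHERE job_id = 'my_job'
  AND reference_date = TIMESTAMP '2024-01-01 00:00:00';

INSERT INTO results_metric_regular
    (job_id, metric_id, metric_name, source_id, result,
     reference_date, execution_date)
VALUES
    ('my_job', 'row_count_metric', 'ROW_COUNT', '[my_source]', 12345.0,
     TIMESTAMP '2024-01-01 00:00:00', CURRENT_TIMESTAMP);
```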
Regular Metrics Results Schema
- Primary key: `(job_id, metric_id, metric_name, reference_date)`; `source_id` and `column_names` contain string representations of lists in format `'[val1,val2,val3]'`; `params` is a JSON string.
Column Name | Column Type | Constraint |
---|---|---|
job_id | STRING | NOT NULL |
metric_id | STRING | NOT NULL |
metric_name | STRING | NOT NULL |
description | STRING | |
metadata | STRING | |
source_id | STRING | NOT NULL |
column_names | STRING | |
params | STRING | |
result | DOUBLE | NOT NULL |
additional_result | STRING | |
reference_date | TIMESTAMP | NOT NULL |
execution_date | TIMESTAMP | NOT NULL |
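For reference, an equivalent PostgreSQL DDL for this schema could look as follows. This is a sketch derived from the table above; when Flyway migrations are used, the actual table is created automatically and may differ in details:

```sql
-- Sketch of the regular metrics results table in PostgreSQL
-- (Flyway migrations create the real one):
CREATE TABLE results_metric_regular (
    job_id            TEXT             NOT NULL,
    metric_id         TEXT             NOT NULL,
    metric_name       TEXT             NOT NULL,
    description       TEXT,
    metadata          TEXT,
    source_id         TEXT             NOT NULL,
    column_names      TEXT,
    params            TEXT,
    result            DOUBLE PRECISION NOT NULL,
    additional_result TEXT,
    reference_date    TIMESTAMP        NOT NULL,
    execution_date    TIMESTAMP        NOT NULL,
    PRIMARY KEY (job_id, metric_id, metric_name, reference_date)
);
```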
Composed Metrics Results Schema
- Primary key: `(job_id, metric_id, reference_date)`; `source_id` contains a string representation of a list in format `'[val1,val2,val3]'`.
Column Name | Column Type | Constraint |
---|---|---|
job_id | STRING | NOT NULL |
metric_id | STRING | NOT NULL |
metric_name | STRING | NOT NULL |
description | STRING | |
metadata | STRING | |
source_id | STRING | NOT NULL |
formula | STRING | NOT NULL |
result | DOUBLE | NOT NULL |
additional_result | STRING | |
reference_date | TIMESTAMP | NOT NULL |
execution_date | TIMESTAMP | NOT NULL |
Trend Metrics Results Schema
- Primary key: `(job_id, metric_id, reference_date)`; `source_id` contains a string representation of a list in format `'[val1,val2,val3]'`; `params` is a JSON string.
Column Name | Column Type | Constraint |
---|---|---|
job_id | STRING | NOT NULL |
metric_id | STRING | NOT NULL |
metric_name | STRING | NOT NULL |
description | STRING | |
metadata | STRING | |
source_id | STRING | NOT NULL |
params | STRING | |
result | DOUBLE | NOT NULL |
additional_result | STRING | |
reference_date | TIMESTAMP | NOT NULL |
execution_date | TIMESTAMP | NOT NULL |
Metric Error Results Schema
- Primary key: `(job_id, error_hash, reference_date)`; `source_id`, `source_key_fields`, `metric_columns` and `row_data` are JSON strings. `error_hash` is an MD5 hash string computed from the values of the `metric_id`, `status`, `message` and `row_data` columns.
NOTE: The error hash is computed using the raw value of the `row_data` field, even if it is encrypted later.
Column Name | Column Type | Constraint |
---|---|---|
job_id | STRING | NOT NULL |
metric_id | STRING | NOT NULL |
source_id | STRING | |
source_key_fields | STRING | |
metric_columns | STRING | |
status | STRING | NOT NULL |
message | STRING | NOT NULL |
row_data | STRING | NOT NULL |
error_hash | STRING | NOT NULL |
reference_date | TIMESTAMP | NOT NULL |
execution_date | TIMESTAMP | NOT NULL |
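The hash definition above can be illustrated with a one-line SQL expression. The exact concatenation rule (separators, ordering) used internally is an assumption here; Checkita computes the hash itself before writing:

```sql
-- Illustration of the error_hash idea: MD5 over the raw values of
-- metric_id, status, message and row_data (separator handling is assumed):
SELECT md5(concat(metric_id, status, message, row_data)) AS error_hash
FROM results_metric_error;
```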
Load Checks Results Schema
- Primary key: `(job_id, check_id, reference_date)`; `source_id` contains a string representation of a list in format `'[val1,val2,val3]'`.
Column Name | Column Type | Constraint |
---|---|---|
job_id | STRING | NOT NULL |
check_id | STRING | NOT NULL |
check_name | STRING | NOT NULL |
description | STRING | |
metadata | STRING | |
source_id | STRING | NOT NULL |
expected | STRING | NOT NULL |
status | STRING | NOT NULL |
message | STRING | |
is_critical | BOOLEAN | NOT NULL |
reference_date | TIMESTAMP | NOT NULL |
execution_date | TIMESTAMP | NOT NULL |
Checks Results Schema
- Primary key: `(job_id, check_id, reference_date)`; `source_id` contains a string representation of a list in format `'[val1,val2,val3]'`. `compared_metric` contains a sequence of metrics to compare with, in format `[val1,val2]`. Usually this is an empty list or a single metric, but expression checks can use multiple metrics in their formulas; these are all stored in this field, except for the very first metric, which is considered the `base_metric`.
Column Name | Column Type | Constraint |
---|---|---|
job_id | STRING | NOT NULL |
check_id | STRING | NOT NULL |
check_name | STRING | NOT NULL |
description | STRING | |
metadata | STRING | |
source_id | STRING | NOT NULL |
base_metric | STRING | NOT NULL |
compared_metric | STRING | |
compared_threshold | DOUBLE | |
lower_bound | DOUBLE | |
upper_bound | DOUBLE | |
status | STRING | NOT NULL |
message | STRING | |
is_critical | BOOLEAN | NOT NULL |
reference_date | TIMESTAMP | NOT NULL |
execution_date | TIMESTAMP | NOT NULL |
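A typical query against this table is fetching failed critical checks for a given run. A sketch (the status literal is illustrative; consult the framework documentation for the exact status values):

```sql
-- List failed critical checks for one job run:
SELECT check_id, status, message
FROM results_check
WHERE job_id = 'my_job'
  AND reference_date = TIMESTAMP '2024-01-01 00:00:00'
  AND status <> 'Success'   -- assumed status literal
  AND is_critical;
```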
Job State Schema
- Primary key: `(job_id, reference_date)`; `version_info` and `config` are JSON strings.
Column Name | Column Type | Constraint |
---|---|---|
job_id | STRING | NOT NULL |
config | STRING | NOT NULL |
version_info | STRING | |
reference_date | TIMESTAMP | NOT NULL |
execution_date | TIMESTAMP | NOT NULL |
Hive Storage Setup Scripts
Below is a HiveQL script that can be used to set up Hive results storage:
```sql
-- REPLACE <schema_name> and <schema_dir> with the actual name and path:
set hivevar:schema_name=<schema_name>;
set hivevar:schema_dir=<schema_dir>;
CREATE SCHEMA IF NOT EXISTS ${schema_name};
DROP TABLE IF EXISTS ${schema_name}.results_metric_regular;
CREATE EXTERNAL TABLE ${schema_name}.results_metric_regular
(
metric_id STRING COMMENT '',
metric_name STRING COMMENT '',
description STRING COMMENT '',
metadata STRING COMMENT '',
source_id STRING COMMENT '',
column_names STRING COMMENT '',
params STRING COMMENT '',
result DOUBLE COMMENT '',
additional_result STRING COMMENT '',
reference_date TIMESTAMP COMMENT '',
execution_date TIMESTAMP COMMENT ''
)
COMMENT 'Data Quality Regular Metrics Results'
PARTITIONED BY (job_id STRING)
STORED AS PARQUET
LOCATION '${schema_dir}/results_metric_regular';
DROP TABLE IF EXISTS ${schema_name}.results_metric_composed;
CREATE EXTERNAL TABLE ${schema_name}.results_metric_composed
(
metric_id STRING COMMENT '',
metric_name STRING COMMENT '',
description STRING COMMENT '',
metadata STRING COMMENT '',
source_id STRING COMMENT '',
formula STRING COMMENT '',
result DOUBLE COMMENT '',
additional_result STRING COMMENT '',
reference_date TIMESTAMP COMMENT '',
execution_date TIMESTAMP COMMENT ''
)
COMMENT 'Data Quality Composed Metrics Results'
PARTITIONED BY (job_id STRING)
STORED AS PARQUET
LOCATION '${schema_dir}/results_metric_composed';
DROP TABLE IF EXISTS ${schema_name}.results_metric_trend;
CREATE EXTERNAL TABLE ${schema_name}.results_metric_trend
(
metric_id STRING COMMENT '',
metric_name STRING COMMENT '',
description STRING COMMENT '',
metadata STRING COMMENT '',
source_id STRING COMMENT '',
params STRING COMMENT '',
result DOUBLE COMMENT '',
additional_result STRING COMMENT '',
reference_date TIMESTAMP COMMENT '',
execution_date TIMESTAMP COMMENT ''
)
COMMENT 'Data Quality Trend Metrics Results'
PARTITIONED BY (job_id STRING)
STORED AS PARQUET
LOCATION '${schema_dir}/results_metric_trend';
DROP TABLE IF EXISTS ${schema_name}.results_metric_error;
CREATE EXTERNAL TABLE ${schema_name}.results_metric_error
(
metric_id STRING COMMENT '',
source_id STRING COMMENT '',
source_key_fields STRING COMMENT '',
metric_columns STRING COMMENT '',
status STRING COMMENT '',
message STRING COMMENT '',
row_data STRING COMMENT '',
error_hash STRING COMMENT '',
reference_date TIMESTAMP COMMENT '',
execution_date TIMESTAMP COMMENT ''
)
COMMENT 'Data Quality Metrics Error Results'
PARTITIONED BY (job_id STRING)
STORED AS PARQUET
LOCATION '${schema_dir}/results_metric_error';
DROP TABLE IF EXISTS ${schema_name}.results_check_load;
CREATE EXTERNAL TABLE ${schema_name}.results_check_load
(
check_id STRING COMMENT '',
check_name STRING COMMENT '',
description STRING COMMENT '',
metadata STRING COMMENT '',
source_id STRING COMMENT '',
expected STRING COMMENT '',
status STRING COMMENT '',
message STRING COMMENT '',
is_critical BOOLEAN COMMENT '',
reference_date TIMESTAMP COMMENT '',
execution_date TIMESTAMP COMMENT ''
)
COMMENT 'Data Quality Load Checks Results'
PARTITIONED BY (job_id STRING)
STORED AS PARQUET
LOCATION '${schema_dir}/results_check_load';
DROP TABLE IF EXISTS ${schema_name}.results_check;
CREATE EXTERNAL TABLE ${schema_name}.results_check
(
check_id STRING COMMENT '',
check_name STRING COMMENT '',
description STRING COMMENT '',
metadata STRING COMMENT '',
source_id STRING COMMENT '',
base_metric STRING COMMENT '',
compared_metric STRING COMMENT '',
compared_threshold DOUBLE COMMENT '',
lower_bound DOUBLE COMMENT '',
upper_bound DOUBLE COMMENT '',
status STRING COMMENT '',
message STRING COMMENT '',
is_critical BOOLEAN COMMENT '',
reference_date TIMESTAMP COMMENT '',
execution_date TIMESTAMP COMMENT ''
)
COMMENT 'Data Quality Checks Results'
PARTITIONED BY (job_id STRING)
STORED AS PARQUET
LOCATION '${schema_dir}/results_check';
DROP TABLE IF EXISTS ${schema_name}.job_state;
CREATE EXTERNAL TABLE ${schema_name}.job_state
(
config STRING COMMENT '',
version_info STRING COMMENT '',
reference_date TIMESTAMP COMMENT '',
execution_date TIMESTAMP COMMENT ''
)
COMMENT 'Data Quality Job State'
PARTITIONED BY (job_id STRING)
STORED AS PARQUET
LOCATION '${schema_dir}/job_state';
```
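If result files ever land in the table locations outside of the regular write path, the new `job_id` partitions may need to be registered in the metastore manually, e.g. with the standard Hive command below (whether this is necessary depends on how the tables are populated):

```sql
-- Register partitions written directly to the table location:
MSCK REPAIR TABLE ${schema_name}.results_metric_regular;
```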