Metric Calculator Engines Overview and Benchmarking
Checkita DataQuality contains various data quality metrics, all of which are computed during a single pass over the data. Still, there are some metrics that are based on exact cardinality evaluation. Computing these metrics requires either storing a large state in memory (the unique records) or shuffling the data. The first approach can cause OOM errors, while the second one violates the statement that the data is read only once.
Checkita has two engines for metric computation: the RDD-based engine and the DF-based engine. The RDD-based engine uses the first approach to calculate cardinality-based metrics: metric calculators store a collection of all unique records in memory. The DF-based engine, on the other hand, groups and shuffles the data and computes cardinality during the subsequent reduce stage.
There are some additional points that need to be considered when choosing the appropriate engine to run a data quality job:
- The RDD-based engine provides more flexibility when it is required to work closely with row data. The downside of such flexibility is that accessing row elements as Java objects yields significant serialization/deserialization overhead.
- The DF-based engine, on the other hand, operates on Spark's UnsafeRow format, which is stored off-heap and, in addition, benefits from Tungsten whole-stage code generation. The problem here is that the intermediate state of a calculator cannot be accessed or operated on.
Summing up the statements above, we recommend using the DF-based engine for batch applications. Streaming applications, however, support only the RDD-based engine, because the calculator states have to be merged between processing windows outside of the Spark runtime.
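To make the cardinality trade-off concrete, here is a small PySpark illustration (not Checkita code; it assumes a DataFrame ytd like the one read in the PySpark job further below): an exact distinct count forces Spark to shuffle the grouped values, while a HyperLogLog-based approximation keeps only a small fixed-size sketch per task and therefore fits into the same single pass as the other metrics.
from pyspark.sql.functions import approx_count_distinct, countDistinct
# Exact cardinality: requires grouping/shuffling the column values.
# Approximate cardinality (HyperLogLog++): small fixed-size state per task,
# so it can be folded into the single read pass together with other metrics.
ytd.select(
    countDistinct('trip_distance').alias('exact_distinct'),
    approx_count_distinct('trip_distance').alias('approx_distinct')
).show()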
Benchmarking
The benchmark tests below are separated into two parts:
- evaluate all single-pass metrics (except cardinality-based ones)
- evaluate cardinality-based metrics
Along with running Checkita DQ jobs using either the RDD-based or the DF-based engine, the same metrics were computed using a PySpark job in order to assess how well Checkita performs metric computation compared to raw Spark code.
About Dataset
The New York Taxi dataset was chosen for processing as it contains a massive amount of data collected since 2009.
For the DQ job runs, the data for the year 2009 was selected from the following sources:
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-01.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-02.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-03.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-04.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-05.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-06.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-07.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-08.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-09.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-10.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-11.parquet
https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2009-12.parquet
Datasets are stored in Parquet format and are read as is during DQ job processing.
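The schema listing below can be reproduced by simply reading the folder and printing the schema:
# spark = <initiate spark session>
ytd = spark.read.parquet('<path to folder with NY Taxi dataset>')
ytd.printSchema()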
All parquet files have the same schema:
root
|-- vendor_name: string (nullable = true)
|-- Trip_Pickup_DateTime: string (nullable = true)
|-- Trip_Dropoff_DateTime: string (nullable = true)
|-- Passenger_Count: long (nullable = true)
|-- Trip_Distance: double (nullable = true)
|-- Start_Lon: double (nullable = true)
|-- Start_Lat: double (nullable = true)
|-- Rate_Code: double (nullable = true)
|-- store_and_forward: double (nullable = true)
|-- End_Lon: double (nullable = true)
|-- End_Lat: double (nullable = true)
|-- Payment_Type: string (nullable = true)
|-- Fare_Amt: double (nullable = true)
|-- surcharge: double (nullable = true)
|-- mta_tax: double (nullable = true)
|-- Tip_Amt: double (nullable = true)
|-- Tolls_Amt: double (nullable = true)
|-- Total_Amt: double (nullable = true)
Spark Configuration
All jobs in these tests were run using the same Spark application configuration with the following parameters:
"spark.driver.cores" = 2
"spark.driver.memory" = "4G"
"spark.executor.memory" = "12G"
"spark.executor.cores" = "4"
"spark.executor.instances" = "3"
"spark.shuffle.service.enabled" = "true"
"spark.sql.shuffle.partitions" = "12"
"spark.default.parallelism" = "12"
"spark.dynamicAllocation.enabled" = "false"
"spark.sql.orc.enabled" = "true"
"spark.sql.parquet.compression.codec" = "snappy"
"spark.sql.autoBroadcastJoinThreshold" = -1
"spark.sql.caseSensitive" = "false"
"spark.sql.legacy.timeParserPolicy" = "CORRECTED"
IMPORTANT: For the cardinality-based metric tests the parallelism was increased to 1000:
"spark.sql.shuffle.partitions" = "1000"
"spark.default.parallelism" = "1000"
PySpark Job Run
First of all, we computed the data quality metrics directly using the PySpark API and measured the time that the job took.
Below is the code for computing single-pass metrics:
from pyspark.sql.types import StringType, IntegerType, DoubleType
from pyspark.sql.functions import col, lit, when, avg, sum as Fsum, min as Fmin, max as Fmax, datediff, \
length, split, covar_pop, covar_samp, percentile_approx, levenshtein, stddev_pop, count, to_timestamp, upper
# spark = <initiate spark session>
ytd = spark.read.parquet('<path to folder with NY Taxi dataset>')
ytd.withColumn(
"null_pmt_typ", when(col('payment_type').isNull(), lit(1)).otherwise(lit(0))
).withColumn(
"empty_pmt_typ", when(col('payment_type') == lit(''), lit(1)).otherwise(lit(0))
).withColumn(
"null_vendor", when(col('vendor_name').isNull(), lit(1)).otherwise(lit(0))
).withColumn(
"date_1", to_timestamp(col('trip_pickup_datetime'), "yyyy-MM-dd HH-mm-ss")
).withColumn(
"date_2", to_timestamp(col('trip_dropoff_datetime'), "yyyy-MM-dd HH-mm-ss")
).withColumn(
"date_fmt_1", when(col('date_1').isNull(), lit(0)).otherwise(lit(1))
).withColumn(
"date_fmt_2", when(col('date_2').isNull(), lit(0)).otherwise(lit(1))
).withColumn(
"precision", length(col('fare_amt').cast(StringType())) - lit(1)
).withColumn(
"scale", length(split(col('fare_amt').cast(StringType()), '\.').getItem(1))
).withColumn(
"num_fmt_crit", (col('precision') <= lit(3)) & (col('scale') <= lit(2))
).withColumn(
"num_to_cast", split(col('trip_pickup_datetime'), '-').getItem(0)
).select(
count(lit(1)).alias('row_count'),
Fsum(col('null_pmt_typ')).alias('null_vals'),
Fsum(col('empty_pmt_typ')).alias('empty_vals'),
(lit(1) - Fsum(col('null_pmt_typ') + col('null_vendor')) / lit(2) / count(lit(1))).alias('completeness'),
Fmin(length(col('payment_type'))).alias('min_str'),
Fmax(length(col('payment_type'))).alias('max_str'),
avg(length(col('payment_type'))).alias('avg_str'),
Fsum(when(length(col('payment_type').cast(StringType())) == lit(4), lit(1)).otherwise(lit(0))).alias('str_len'),
Fsum(when(col('vendor_name').isin("VTS", "DDS", "CMT"), lit(1)).otherwise(lit(0))).alias('str_in_dmn'),
Fsum(when(~col('payment_type').isin("Credit", "Cash"), lit(1)).otherwise(lit(0))).alias('str_out_dmn'),
Fsum(when(col('payment_type') == lit("No Charge"), lit(1)).otherwise(lit(0))).alias('str_vals'),
Fsum(when(col('tip_amt').rlike(r"^\d\d+\..+$"), lit(1)).otherwise(lit(0))).alias('rgx_match'),
Fsum(when(~col('end_lat').rlike(r"^40\..*$"), lit(1)).otherwise(lit(0))).alias('rgx_mismatch'),
Fsum(col('date_fmt_1') + col('date_fmt_2')).alias('fmt_date'),
Fsum(when(col('num_fmt_crit'), lit(1)).otherwise(lit(0))).alias('fmt_number'),
Fmin(col('total_amt')).alias('min_num'),
Fmax(col('total_amt')).alias('max_num'),
Fsum(col('total_amt')).alias('sum_num'),
avg(col('total_amt')).alias('avg_num'),
stddev_pop(col('total_amt')).alias('std_num'),
Fsum(when(col('num_to_cast').cast(DoubleType()).isNull(), lit(0)).otherwise(lit(1))).alias('cast_num'),
Fsum(when(col('passenger_count').isin(1, 2, 3), lit(1)).otherwise(lit(0))).alias('num_in_dmn'),
Fsum(when(~col('passenger_count').isin(1, 2, 3, 4, 5), lit(1)).otherwise(lit(0))).alias('num_out_dmn'),
Fsum(when(col('trip_distance') < lit(1.0), lit(1)).otherwise(lit(0))).alias('num_less_than'),
Fsum(when(col('trip_distance') > lit(30.0), lit(1)).otherwise(lit(0))).alias('num_greater_than'),
Fsum(when(
(col('trip_distance') >= lit(5.0)) & (col('trip_distance') <= lit(10.0)), lit(1)
).otherwise(lit(0))).alias('num_btwn'),
Fsum(when(
(col('trip_distance') <= lit(1.0)) | (col('trip_distance') >= lit(10.0)), lit(1)
).otherwise(lit(0))).alias('num_not_btwn'),
Fsum(when(col('tolls_amt') == lit(0.0), lit(1)).otherwise(lit(0))).alias('num_vals'),
percentile_approx(col('fare_amt'), 0.5, 1000).alias('median_val'),
percentile_approx(col('fare_amt'), 0.25, 1000).alias('first_quant_val'),
percentile_approx(col('fare_amt'), 0.75, 1000).alias('third_quant_val'),
percentile_approx(col('fare_amt'), 0.9, 1000).alias('get_quant_val'),
Fsum(when(col('start_lon') == col('end_lon'), lit(1)).otherwise(lit(0))).alias('col_eq'),
Fsum(when(datediff(col('date_1'), col('date_2')) < lit(1), lit(1)).otherwise(lit(0))).alias('date_dist'),
Fsum(when(
levenshtein(upper(col('start_lat').cast(StringType())), upper(col('end_lat').cast(StringType()))) < lit(3), lit(1)
).otherwise(lit(0))).alias('lvnst_dist'),
(covar_pop(col('trip_distance'), col('fare_amt')) * count(lit(1))).alias('comoment'),
covar_pop(col('trip_distance'), col('fare_amt')).alias('covariance'),
covar_samp(col('trip_distance'), col('fare_amt')).alias('covarianceBessel')
).show(truncate=False, vertical=True)
PySpark Single-Pass Metrics Test Results
Summary:
Measurement | Value |
---|---|
Overall Job Execution Time | 6.3 min |
Total Number of Tasks | 46 |
Total time across all tasks | 52 min |
Total input size | 5.3 GiB |
Total records | 170896055 |
Shuffle size | 1467.6 KiB |
Shuffle records | 46 |
Statistics for the longest tasks (tasks that actually read all 12 dataset parquet files):
Attempt | Status | Executor ID | Duration | GC Time | Input Size / Records | Shuffle Write Size / Records |
---|---|---|---|---|---|---|
0 | SUCCESS | 1 | 6.3 min | 12 s | 498.3 MiB / 15604551 | 127.8 KiB / 1 |
0 | SUCCESS | 1 | 5.0 min | 11 s | 445.9 MiB / 14092413 | 119.5 KiB / 1 |
0 | SUCCESS | 1 | 5.0 min | 11 s | 452.3 MiB / 14275339 | 124.1 KiB / 1 |
0 | SUCCESS | 1 | 4.8 min | 11 s | 420.4 MiB / 13380122 | 117.1 KiB / 1 |
0 | SUCCESS | 2 | 4.0 min | 8 s | 469.1 MiB / 14796313 | 123.6 KiB / 1 |
0 | SUCCESS | 2 | 4.0 min | 8 s | 461.3 MiB / 14583404 | 124.1 KiB / 1 |
0 | SUCCESS | 2 | 3.9 min | 8 s | 456.9 MiB / 14387371 | 118.7 KiB / 1 |
0 | SUCCESS | 3 | 3.8 min | 9 s | 452.8 MiB / 14294783 | 117.5 KiB / 1 |
0 | SUCCESS | 2 | 3.8 min | 8 s | 434.7 MiB / 13686520 | 122.4 KiB / 1 |
0 | SUCCESS | 3 | 3.7 min | 8 s | 449.2 MiB / 14184249 | 123.7 KiB / 1 |
0 | SUCCESS | 3 | 3.7 min | 8 s | 444.3 MiB / 13984887 | 123.1 KiB / 1 |
0 | SUCCESS | 3 | 3.6 min | 8 s | 431.4 MiB / 13626103 | 122.5 KiB / 1 |
PySpark Cardinality Metrics Test Results
Distinct Values Metric
The first test for cardinality-based metrics is to calculate the number of distinct values. Below is the PySpark code to calculate the number of distinct trip distances:
dist_vals = ytd.groupBy('trip_distance').agg(lit(1)).count()
print(dist_vals)
Summary:
Measurement | Value |
---|---|
Overall Job Execution Time | 58 sec |
Total Number of Tasks | 996 |
Total time across all tasks | 1.1 min |
Total input size | 228.3 MiB |
Total records | 170896055 |
Shuffle size | 1015.7 KiB |
Shuffle records | 47860 |
Statistics for the mapping tasks (tasks that actually scanned all 12 dataset parquet files). Note that scanning parquet files for distinct column values is extremely fast!
Attempt | Status | Executor ID | Duration | GC Time | Input Size / Records | Shuffle Write Size / Records |
---|---|---|---|---|---|---|
0 | SUCCESS | 2 | 1 s | | 18.9 MiB / 15604551 | 83.5 KiB / 3882 |
0 | SUCCESS | 1 | 1 s | 19.0 ms | 18.1 MiB / 14092413 | 102.8 KiB / 5716 |
0 | SUCCESS | 1 | 1 s | | 18.1 MiB / 14583404 | 83.6 KiB / 3862 |
0 | SUCCESS | 1 | 1 s | 18.0 ms | 18.0 MiB / 14796313 | 82.8 KiB / 3817 |
0 | SUCCESS | 3 | 1 s | | 17.6 MiB / 14294783 | 82.6 KiB / 3799 |
0 | SUCCESS | 3 | 1 s | | 17.6 MiB / 14184249 | 82.6 KiB / 3816 |
0 | SUCCESS | 3 | 1 s | | 17.3 MiB / 14275339 | 82.9 KiB / 3826 |
0 | SUCCESS | 3 | 1 s | | 17.1 MiB / 14387371 | 82.2 KiB / 3797 |
0 | SUCCESS | 2 | 1 s | | 17.0 MiB / 13984887 | 83.2 KiB / 3841 |
0 | SUCCESS | 1 | 1 s | | 17.0 MiB / 13686520 | 83.6 KiB / 3879 |
0 | SUCCESS | 2 | 1 s | | 16.8 MiB / 13626103 | 82.9 KiB / 3834 |
0 | SUCCESS | 3 | 1 s | | 16.1 MiB / 13380122 | 83.0 KiB / 3791 |
Duplicate Values Metric
Let's calculate duplicated trips. These are trips for which the following columns match:
- vendor_name
- trip_pickup_datetime
- trip_dropoff_datetime
- start_lon
- start_lat
- end_lon
- end_lat
The PySpark code to calculate the number of duplicated trips is as follows:
dupl_vals = ytd.groupBy(
'vendor_name',
'trip_pickup_datetime',
'trip_dropoff_datetime',
'start_lon',
'start_lat',
'end_lon',
'end_lat'
).agg(
(count(lit(1)) - lit(1)).alias('cnt')
).select(Fsum(col('cnt'))).collect()[0][0]
print(dupl_vals)
Summary: Note that the shuffle operations spilled a lot of data to both memory and disk!
Measurement | Value |
---|---|
Overall Job Execution Time | 2.5 min |
Total Number of Tasks | 996 |
Total time across all tasks | 15 min |
Total input size | 4.6 GiB |
Total records | 170896055 |
Shuffle size | 7.9 GiB |
Shuffle records | 170861620 |
Spill Memory | 27.4 GiB |
Spill Disk | 6.8 GiB |
Statistics for the mapping tasks (tasks that actually scanned all 12 dataset parquet files).
Attempt | Status | Executor ID | Duration | GC Time | Input Size / Records | Shuffle Write Size / Records | Spill (Memory) | Spill (Disk) |
---|---|---|---|---|---|---|---|---|
0 | SUCCESS | 2 | 1.9 min | 12 s | 405.4 MiB / 14796313 | 672.8 MiB / 14793946 | 3.9 GiB | 1 GiB |
0 | SUCCESS | 2 | 1.9 min | 12 s | 393.4 MiB / 14387371 | 655.1 MiB / 14386632 | 3.9 GiB | 1 GiB |
0 | SUCCESS | 2 | 1.8 min | 11 s | 387.8 MiB / 14184249 | 647.2 MiB / 14181699 | 2.6 GiB | 564.6 MiB |
0 | SUCCESS | 2 | 1.7 min | 12 s | 372.1 MiB / 13626103 | 622.2 MiB / 13624190 | 3.7 GiB | 1002.1 MiB |
0 | SUCCESS | 3 | 1.3 min | 4 s | 397.2 MiB / 14583404 | 661.9 MiB / 14575564 | 4.4 GiB | 1.2 GiB |
0 | SUCCESS | 3 | 1.2 min | 5 s | 375.2 MiB / 13686520 | 624.4 MiB / 13684841 | 3.8 GiB | 997.5 MiB |
0 | SUCCESS | 3 | 1.2 min | 3 s | 390.0 MiB / 14275339 | 648.4 MiB / 14270095 | 2.6 GiB | 565 MiB |
0 | SUCCESS | 3 | 1.1 min | 3 s | 383.1 MiB / 14092413 | 641.1 MiB / 14090159 | 2.5 GiB | 559.5 MiB |
0 | SUCCESS | 1 | 40 s | 3 s | 429.6 MiB / 15604551 | 808.8 MiB / 15599852 | ||
0 | SUCCESS | 1 | 36 s | 3 s | 390.2 MiB / 14294783 | 740.1 MiB / 14293543 | ||
0 | SUCCESS | 1 | 34 s | 1 s | 383.3 MiB / 13984887 | 724.5 MiB / 13982756 | ||
0 | SUCCESS | 1 | 33 s | 0.8 s | 362.4 MiB / 13380122 | 692.6 MiB / 13378343 |
Checkita RDD-Engine Job Run
Checkita RDD-Engine Single-Pass Metrics Test Results
Summary:
Measurement | Value |
---|---|
Overall Job Execution Time | 23 min |
Total Number of Tasks | 46 |
Total time across all tasks | 3.7 hour |
Total input size | 5.3 GiB |
Total records | 170896055 |
Shuffle size | 2.1 MiB |
Shuffle records | 46 |
Statistics for the longest tasks (tasks that actually read all 12 dataset parquet files):
Attempt | Status | Executor ID | Duration | GC Time | Input Size / Records | Shuffle Write Size / Records |
---|---|---|---|---|---|---|
0 | SUCCESS | 1 | 23 min | 49 s | 456.2 MiB / 14275339 | 129.4 KiB / 1 |
0 | SUCCESS | 1 | 22 min | 49 s | 448.0 MiB / 14092413 | 179.4 KiB / 1 |
0 | SUCCESS | 1 | 22 min | 49 s | 446.7 MiB / 13984887 | 138.4 KiB / 1 |
0 | SUCCESS | 1 | 22 min | 48 s | 422.9 MiB / 13380122 | 156.5 KiB / 1 |
0 | SUCCESS | 2 | 18 min | 27 s | 503.1 MiB / 15604551 | 139.6 KiB / 1 |
0 | SUCCESS | 2 | 17 min | 27 s | 472.4 MiB / 14796313 | 131.4 KiB / 1 |
0 | SUCCESS | 2 | 17 min | 26 s | 456.0 MiB / 14294783 | 159.3 KiB / 1 |
0 | SUCCESS | 3 | 17 min | 23 s | 465.0 MiB / 14583404 | 133.3 KiB / 1 |
0 | SUCCESS | 3 | 16 min | 23 s | 460.2 MiB / 14387371 | 166.2 KiB / 1 |
0 | SUCCESS | 3 | 16 min | 23 s | 451.7 MiB / 14184249 | 135.0 KiB / 1 |
0 | SUCCESS | 2 | 16 min | 26 s | 437.0 MiB / 13686520 | 139.9 KiB / 1 |
0 | SUCCESS | 3 | 16 min | 23 s | 433.6 MiB / 13626103 | 137.6 KiB / 1 |
Checkita RDD-Engine Cardinality Metrics Test Results
Distinct Values Metric
Summary:
Measurement | Value |
---|---|
Overall Job Execution Time | 2.1 min |
Total Number of Tasks | 996 |
Total time across all tasks | 14 min |
Total input size | 5.3 GiB |
Total records | 170896055 |
Shuffle size | 1621.7 KiB |
Shuffle records | 996 |
Statistics for the longest tasks (tasks that actually read all 12 dataset parquet files):
Attempt | Status | Executor ID | Duration | GC Time | Input Size / Records | Shuffle Write Size / Records |
---|---|---|---|---|---|---|
0 | SUCCESS | 1 | 1.5 min | 3 s | 503.1 MiB / 15604551 | 20.6 KiB / 1 |
0 | SUCCESS | 1 | 1.4 min | 3 s | 465 MiB / 14583404 | 20.5 KiB / 1 |
0 | SUCCESS | 1 | 1.3 min | 3 s | 446.7 MiB / 13984887 | 20.4 KiB / 1 |
0 | SUCCESS | 1 | 1.3 min | 3 s | 448 MiB / 14092413 | 31.2 KiB / 1 |
0 | SUCCESS | 3 | 58 s | 2 s | 472.4 MiB / 14796313 | 20.1 KiB / 1 |
0 | SUCCESS | 3 | 57 s | 2 s | 456 MiB / 14294783 | 20.1 KiB / 1 |
0 | SUCCESS | 3 | 55 s | 1.0 s | 460.2 MiB / 14387371 | 20.1 KiB / 1 |
0 | SUCCESS | 2 | 54 s | 1 s | 437 MiB / 13686520 | 20.5 KiB / 1 |
0 | SUCCESS | 2 | 53 s | 0.7 s | 451.7 MiB / 14184249 | 20.1 KiB / 1 |
0 | SUCCESS | 2 | 53 s | 0.6 s | 456.2 MiB / 14275339 | 20.2 KiB / 1 |
0 | SUCCESS | 2 | 53 s | 1 s | 433.6 MiB / 13626103 | 20.3 KiB / 1 |
0 | SUCCESS | 3 | 46 s | 0.4 s | 422.9 MiB / 13380122 | 20.3 KiB / 1 |
Duplicate Values Metric
The calculation failed with an OOM error. The reason is that the RDD metric calculator for duplicate values must store all unique values within its state, i.e. within a simple Java collection. When the number of unique records is high (in our case about 170 million records), the executors fail with an OOM error.
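A rough back-of-the-envelope estimate (assumed object sizes, for illustration only) shows why this state cannot fit into the 12G executors used in these tests:
# Assumption: each unique 7-column tuple, kept on the heap as Java objects inside
# a hash collection, takes on the order of 100 bytes (string headers, boxing, collection overhead).
unique_records = 170_861_620   # number of unique trip tuples, see the shuffle records above
bytes_per_record = 100         # illustrative assumption, not a measurement
state_gib = unique_records * bytes_per_record / 1024 ** 3
print(f"~{state_gib:.1f} GiB of calculator state")  # ~15.9 GiB vs. 12 GiB of executor memory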
Checkita DF-Engine Job Run
Checkita DF-Engine Single-Pass Metrics Test Results
Summary:
Measurement | Value |
---|---|
Overall Job Execution Time | 4.1 min |
Total Number of Tasks | 46 |
Total time across all tasks | 46 min |
Total input size | 5.3 GiB |
Total records | 170896055 |
Shuffle size | 4.5 MiB |
Shuffle records | 46 |
Statistics for the longest tasks (tasks that actually read all 12 dataset parquet files):
Attempt | Status | Executor ID | Duration | GC Time | Input Size / Records | Shuffle Write Size / Records |
---|---|---|---|---|---|---|
0 | SUCCESS | 2 | 4.0 min | 4 s | 469.1 MiB / 14796313 | 358.9 KiB / 1 |
0 | SUCCESS | 3 | 4.0 min | 3 s | 498.3 MiB / 15604551 | 401.7 KiB / 1 |
0 | SUCCESS | 3 | 3.9 min | 2 s | 456.9 MiB / 14387371 | 381.0 KiB / 1 |
0 | SUCCESS | 2 | 3.9 min | 4 s | 461.3 MiB / 14583404 | 359.1 KiB / 1 |
0 | SUCCESS | 2 | 3.9 min | 4 s | 449.2 MiB / 14184249 | 359.4 KiB / 1 |
0 | SUCCESS | 1 | 3.8 min | 4 s | 445.9 MiB / 14092413 | 415.1 KiB / 1 |
0 | SUCCESS | 2 | 3.8 min | 4 s | 452.8 MiB / 14294783 | 375.9 KiB / 1 |
0 | SUCCESS | 1 | 3.8 min | 4 s | 444.3 MiB / 13984887 | 390.0 KiB / 1 |
0 | SUCCESS | 1 | 3.7 min | 4 s | 452.3 MiB / 14275339 | 396.9 KiB / 1 |
0 | SUCCESS | 3 | 3.6 min | 2 s | 431.4 MiB / 13626103 | 358.4 KiB / 1 |
0 | SUCCESS | 1 | 3.5 min | 3 s | 420.4 MiB / 13380122 | 391.2 KiB / 1 |
0 | SUCCESS | 3 | 3.4 min | 2 s | 434.7 MiB / 13686520 | 361.4 KiB / 1 |
Checkita DF-Engine Cardinality Metrics Test Results
Distinct Values Metric
Summary:
Measurement | Value |
---|---|
Overall Job Execution Time | 1.2 min |
Total Number of Tasks | 996 |
Total time across all tasks | 4.0 min |
Total input size | 908.2 MiB |
Total records | 170896055 |
Shuffle size | 199.0 MiB |
Shuffle records | 47860 |
Statistics for the longest tasks (tasks that actually read all 12 dataset parquet files):
Attempt | Status | Executor ID | Duration | GC Time | Input Size / Records | Shuffle Write Size / Records |
---|---|---|---|---|---|---|
0 | SUCCESS | 3 | 20 s | 3 s | 81.0 MiB / 15604551 | 17.1 MiB / 3882 |
0 | SUCCESS | 2 | 15 s | 1 s | 74.7 MiB / 14387371 | 16.5 MiB / 3797 |
0 | SUCCESS | 1 | 15 s | 1 s | 74.9 MiB / 14294783 | 16.3 MiB / 3799 |
0 | SUCCESS | 3 | 14 s | 1 s | 77.3 MiB / 14796313 | 16.8 MiB / 3817 |
0 | SUCCESS | 2 | 14 s | 0.5 s | 74.0 MiB / 14275339 | 16.7 MiB / 3826 |
0 | SUCCESS | 3 | 13 s | 1 s | 74.4 MiB / 14092413 | 16.9 MiB / 5716 |
0 | SUCCESS | 3 | 13 s | 1 s | 72.4 MiB / 13984887 | 16.7 MiB / 3841 |
0 | SUCCESS | 2 | 12 s | 0.5 s | 69.3 MiB / 13380122 | 15.6 MiB / 3791 |
0 | SUCCESS | 3 | 12 s | 1.0 s | 73.8 MiB / 14184249 | 16.5 MiB / 3816 |
0 | SUCCESS | 1 | 12 s | 0.7 s | 71.2 MiB / 13686520 | 16.7 MiB / 3879 |
0 | SUCCESS | 1 | 12 s | 0.2 s | 75.9 MiB / 14583404 | 16.8 MiB / 3862 |
0 | SUCCESS | 1 | 12 s | 0.4 s | 70.7 MiB / 13626103 | 16.4 MiB / 3834 |
Duplicate Values Metric
Summary: Note that the shuffle operations spilled a lot of data to both memory and disk!
Measurement | Value |
---|---|
Overall Job Execution Time | 3.3 min |
Total Number of Tasks | 996 |
Total time across all tasks | 28 min |
Total input size | 4.6 GiB |
Total records | 170896055 |
Shuffle size | 15.8 GiB |
Shuffle records | 170861620 |
Spill Memory | 85.6 GiB |
Spill Disk | 19.6 GiB |
Statistics for the mapping tasks (tasks that actually scanned all 12 dataset parquet files).
Attempt | Status | Executor ID | Duration | GC Time | Input Size / Records | Shuffle Write Size / Records | Spill (Memory) | Spill (Disk) |
---|---|---|---|---|---|---|---|---|
0 | SUCCESS | 1 | 2.6 min | 9 s | 390.2 MiB / 14294783 | 1.3 GiB / 14293543 | 7.7 GiB | 1.8 GiB |
0 | SUCCESS | 1 | 2.5 min | 6 s | 383.1 MiB / 14092413 | 1.3 GiB / 14090159 | 6.3 GiB | 1.4 GiB |
0 | SUCCESS | 1 | 2.5 min | 8 s | 372.1 MiB / 13626103 | 1.3 GiB / 13624190 | 6.0 GiB | 1.4 GiB |
0 | SUCCESS | 1 | 2.4 min | 9 s | 375.2 MiB / 13686520 | 1.3 GiB / 13684841 | 7.6 GiB | 1.8 GiB |
0 | SUCCESS | 2 | 2.3 min | 7 s | 390.0 MiB / 14275339 | 1.3 GiB / 14270095 | 6.4 GiB | 1.4 GiB |
0 | SUCCESS | 2 | 2.2 min | 8 s | 383.3 MiB / 13984887 | 1.3 GiB / 13982756 | 6.3 GiB | 1.4 GiB |
0 | SUCCESS | 3 | 2.2 min | 7 s | 429.6 MiB / 15604551 | 1.4 GiB / 15599852 | 8.0 GiB | 1.8 GiB |
0 | SUCCESS | 2 | 2.2 min | 8 s | 387.8 MiB / 14184249 | 1.3 GiB / 14181699 | 8.3 GiB | 1.9 GiB |
0 | SUCCESS | 2 | 2.2 min | 9 s | 405.4 MiB / 14796313 | 1.4 GiB / 14793946 | 7.8 GiB | 1.8 GiB |
0 | SUCCESS | 3 | 2.1 min | 7 s | 397.2 MiB / 14583404 | 1.3 GiB / 14575564 | 8.4 GiB | 1.9 GiB |
0 | SUCCESS | 3 | 2.1 min | 7 s | 393.4 MiB / 14387371 | 1.3 GiB / 14386632 | 8.2 GiB | 1.9 GiB |
0 | SUCCESS | 3 | 1.9 min | 3 s | 362.4 MiB / 13380122 | 1.2 GiB / 13378343 | 4.7 GiB | 1015.8 MiB |
Overall Summary
Below are some general conclusions that can be drawn from the benchmark test results.
Single-Pass Metrics
The average input per task corresponds to a single parquet file:
- Average task input size: ~455 MiB.
- Average task input records: 14241338.
Test Case | Job Time | All Tasks Time | Avg Task Time | Avg GC Time | Avg Task Shuffle |
---|---|---|---|---|---|
PySpark Run | 6.3 min | 52 min | 4.3 min | 9.2 sec | 122.0 KiB |
RDD-Engine Run | 23 min | 3.7 hour | 18.5 min | 32.8 sec | 145.5 KiB |
DF-Engine Run | 4.1 min | 46 min | 3.8 min | 3.3 sec | 379.1 KiB |
Thus, the DF-Engine provides roughly 5 times better performance for single-pass metrics compared to the RDD-Engine. This is because RDD metric calculators require data deserialization from the DataFrame UnsafeRow format into JVM types, while DF metric calculators run purely on the Spark DF-API without any serialization losses.
It can also be noted that the Checkita DF-engine shuffled more data than the PySpark run. This is caused by the fact that DF metric calculators also collect row data for metric increment errors.
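To illustrate the serialization point, here is a hedged PySpark sketch (not Checkita internals). The same aggregation written against the DF-API stays inside Tungsten's UnsafeRow representation, while the RDD-style version has to materialize every row as an object before user code can touch it; in PySpark the effect is amplified further by Python pickling, but the direction is the same for Scala-side RDD calculators.
from pyspark.sql.functions import sum as Fsum

# DF-API: aggregation runs on UnsafeRow data with whole-stage code generation.
df_total = ytd.select(Fsum('Total_Amt')).collect()[0][0]

# RDD-style: every UnsafeRow is first deserialized into a Row object,
# which is the overhead the RDD-based engine pays for its flexibility.
rdd_total = ytd.rdd.map(lambda row: row['Total_Amt'] or 0.0).sum()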
Cardinality-Based Metrics
Distinct Values Metric Calculation
Note that the computation of distinct values via the PySpark code written with the Spark DF-API, as well as via the Checkita DF-Engine, took advantage of Parquet format specifics: the distinct values were obtained from the metadata of the corresponding column chunks rather than by reading all data line by line. The Checkita RDD-based engine, on the other hand, read and processed all data records.
Test Case | Job Time | All Tasks Time | Avg Task Time | Avg GC Time | Avg Task Shuffle | Avg. Task Input Size |
---|---|---|---|---|---|---|
PySpark Run | 58 sec | 1.1 min | 1 sec | 3.1 ms | 84.6 KiB | 17.5 MiB |
RDD-Engine Run | 2.1 min | 14 min | 63 sec | 1.7 sec | 21.2 KiB | 454.4 MiB |
DF-Engine Run | 1.2 min | 4.0 min | 13.7 sec | 0.9 sec | 16.6 MiB | 74.1 MiB |
Thus, according to the test above, the Checkita DF-engine provided roughly 2 times better performance for distinct values calculation compared to the RDD-engine. In addition, the DF-engine can take advantage of columnar data formats, while the RDD-engine will process all records of the dataset.
It should also be noted that the difference in performance between the DF-engine and the RDD-engine will grow as the cardinality of the column values grows. For the test above we intentionally chose a column with low cardinality to prevent possible OOM errors in the RDD-based distinct values metric calculator.
In the test above the PySpark code ran faster than the Checkita DF-engine because we used the simplest way to calculate the number of distinct values for a particular column, while the DF metric calculator has more general and complex logic and, in addition, collects metric increment errors.
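One way to see the column-pruned read described above is to inspect the physical plan of the distinct-values query; for parquet sources the FileScan node should list only the trip distance column in its ReadSchema:
from pyspark.sql.functions import lit

# The FileScan parquet node of the printed plan should show a ReadSchema
# containing only the trip distance column, i.e. only its column chunks are read.
ytd.groupBy('trip_distance').agg(lit(1)).explain()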
Duplicate Values Metric Calculation
For the calculation of duplicate values we chose a tuple of columns with high cardinality (~170 million unique records). The RDD-based duplicate values metric calculator could not store such an amount of unique records within its state and, therefore, the application crashed with an OOM error.
Test Case | Job Time | All Tasks Time | Avg Task Time | Avg GC Time | Avg Task Shuffle | Avg. Task Input Size | Avg. Task Spill (Mem) | Avg. Task Spill (Disk) |
---|---|---|---|---|---|---|---|---|
PySpark Run | 2.5 min | 15 min | 1.2 min | 5.8 sec | 678.3 MiB | 389.1 MiB | 2.3 GiB | 580.5 MiB |
RDD-Engine Run | OOM ERROR | --- | --- | --- | --- | --- | --- | --- |
DF-Engine Run | 3.3 min | 28 min | 2.3 min | 7.3 sec | 1.3 GiB | 389.1 MiB | 7.1 GiB | 1.6 GiB |
Note that the PySpark application performed better than the Checkita DF-Engine for the same reasons as given above for the distinct values calculation.
Conclusions
From the tests above it can be concluded that the new Checkita DF-engine provides significantly better performance compared to the RDD-engine. We strongly recommend using the DF-engine for all batch applications.
However, DF metric calculators operate completely within the Spark runtime. Currently, it is only possible to retrieve the final metric result after the data source is processed. RDD metric calculators, on the other hand, are more flexible and can be managed outside of the Spark runtime. For that reason our streaming applications support only the RDD-engine for now, as we need to merge calculators from different micro-batches outside the Spark runtime.
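To illustrate why mergeable state matters here, below is a purely conceptual Python sketch (not the actual Checkita calculator interface): an RDD-style calculator exposes its state so that partial results from different micro-batches can be merged outside the Spark runtime.
from dataclasses import dataclass

@dataclass
class AvgState:
    # Conceptual mergeable calculator state for an average-style metric.
    count: int = 0
    total: float = 0.0

    def increment(self, value: float) -> "AvgState":
        # Applied per record inside a processing window.
        return AvgState(self.count + 1, self.total + value)

    def merge(self, other: "AvgState") -> "AvgState":
        # Applied between processing windows / micro-batches, outside the Spark runtime.
        return AvgState(self.count + other.count, self.total + other.total)

    def result(self) -> float:
        return self.total / self.count if self.count else float("nan")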
There are some plans to enhance DF metric calculators to make them support streaming applications as well. :)
DQ Job Configuration files
Single-pass metrics
SRC = "ytd"
jobConfig: {
jobId: "yellow_trip_data"
jobDescription: "Data Quality job for NY Taxi Data"
sources: {
file: [
{
id: ${SRC},
kind: "parquet",
path: "/datalake/data/workspace/mlcib/ruadlu/yellow_trip_data"
keyFields: ["vendor_name", "trip_pickup_datetime"]
}
]
}
virtualSources: [
{
id: ${SRC}"_num_to_cast",
kind: "select"
parentSources: [${SRC}]
expr: [
"vendor_name",
"trip_pickup_datetime"
"split(trip_pickup_datetime, '-' )[0] as num_to_cast"
]
keyFields: ["vendor_name", "trip_pickup_datetime"]
}
]
metrics: {
regular: {
rowCount: [{id: ${SRC}"_row_cnt", source: ${SRC}}]
approximateDistinctValues: [{
id: ${SRC}"_apprx_dist_vals", source: ${SRC},
params: {accuracyError: 0.001}, columns: ["trip_distance"]
}]
nullValues: [{id: ${SRC}"_null_vals", source: ${SRC}, columns: ["payment_type"]}]
emptyValues: [{id: ${SRC}"_empty_vals", source: ${SRC}, columns: ["payment_type"]}]
completeness: [{id: ${SRC}"_completeness", source: ${SRC}, columns: ["vendor_name", "payment_type"]}]
minString: [{id: ${SRC}"_min_str", source: ${SRC}, columns: ["payment_type"]}]
maxString: [{id: ${SRC}"_max_str", source: ${SRC}, columns: ["payment_type"]}]
avgString: [{id: ${SRC}"_avg_str", source: ${SRC}, columns: ["payment_type"]}]
stringLength: [{
id: ${SRC}"_str_len", source: ${SRC}, columns: ["payment_type"],
params: {length: 4, compareRule: "eq"}
}]
stringInDomain: [{
id: ${SRC}"_str_in_dmn", source: ${SRC}, columns: ["vendor_name"],
params: {domain: ["VTS", "DDS", "CMT"]}
}]
stringOutDomain: [{
id: ${SRC}"_str_out_dmn", source: ${SRC}, columns: ["payment_type"],
params: {domain: ["Credit", "Cash"]}
}]
stringValues: [{
id: ${SRC}"_str_vals", source: ${SRC}, columns: ["payment_type"],
params: {compareValue: "No Charge"}
}]
regexMatch: [{
id: ${SRC}"_rgx_match", source: ${SRC}, columns: ["tip_amt"],
params: {regex: """^\d\d+\..+$"""}
}]
regexMismatch: [{
id: ${SRC}"_rgx_mismatch", source: ${SRC}, columns: ["end_lat"],
params: {regex: """^40\..*$"""}
}]
formattedDate: [{
id: ${SRC}"_fmt_date", source: ${SRC},
columns: ["trip_pickup_datetime", "trip_dropoff_datetime"],
params: {dateFormat: "yyyy-MM-dd HH-mm-ss"}
}]
formattedNumber: [{
id: ${SRC}"_fmt_number", source: ${SRC}, columns: ["fare_amt"],
params: {precision: 3, scale: 2, compareRule: "inbound"}
}]
minNumber: [{id: ${SRC}"_min_num", source: ${SRC}, columns: ["total_amt"]}]
maxNumber: [{id: ${SRC}"_max_num", source: ${SRC}, columns: ["total_amt"]}]
sumNumber: [{id: ${SRC}"_sum_num", source: ${SRC}, columns: ["total_amt"]}]
avgNumber: [{id: ${SRC}"_avg_num", source: ${SRC}, columns: ["total_amt"]}]
stdNumber: [{id: ${SRC}"_std_num", source: ${SRC}, columns: ["total_amt"]}]
castedNumber: [{id: ${SRC}"_cast_num", source: ${SRC}"_num_to_cast", columns: ["num_to_cast"]}]
numberInDomain: [{
id: ${SRC}"_num_in_dmn", source: ${SRC}, columns: ["passenger_count"],
params: {domain: [1, 2, 3]}
}]
numberOutDomain: [{
id: ${SRC}"_num_out_dmn", source: ${SRC}, columns: ["passenger_count"],
params: {domain: [1, 2, 3, 4, 5]}
}]
numberLessThan: [{
id: ${SRC}"_num_less_than", source: ${SRC}, columns: ["trip_distance"],
params: {compareValue: 1.0, includeBound: false}
}]
numberGreaterThan: [{
id: ${SRC}"_num_greater_than", source: ${SRC}, columns: ["trip_distance"],
params: {compareValue: 30.0, includeBound: false}
}]
numberBetween: [{
id: ${SRC}"_num_btwn", source: ${SRC}, columns: ["trip_distance"],
params: {lowerCompareValue: 5.0, upperCompareValue: 10.0, includeBound: true}
}]
numberNotBetween: [{
id: ${SRC}"_num_not_btwn", source: ${SRC}, columns: ["trip_distance"],
params: {lowerCompareValue: 1.0, upperCompareValue: 30.0, includeBound: true}
}]
numberValues: [{
id: ${SRC}"_num_vals", source: ${SRC}, columns: ["tolls_amt"],
params: {compareValue: 0.0}
}]
medianValue: [{
id: ${SRC}"_median_val", source: ${SRC}, columns: ["fare_amt"],
params: {accuracyError: 0.001}
}]
firstQuantile: [{
id: ${SRC}"_first_quant_val", source: ${SRC}, columns: ["fare_amt"],
params: {accuracyError: 0.001}
}]
thirdQuantile: [{
id: ${SRC}"_third_quant_val", source: ${SRC}, columns: ["fare_amt"],
params: {accuracyError: 0.001}
}]
getQuantile: [{
id: ${SRC}"_get_quant_val", source: ${SRC}, columns: ["fare_amt"],
params: {accuracyError: 0.001, target: 0.9}
}]
getPercentile: [{
id: ${SRC}"_get_percent_val", source: ${SRC}, columns: ["fare_amt"],
params: {accuracyError: 0.001, target: 50.0}
}]
columnEq: [{id: ${SRC}"_col_eq", source: ${SRC}, columns: ["start_lon", "end_lon"]}]
dayDistance: [{
id: ${SRC}"_date_dist", source: ${SRC},
columns: ["trip_pickup_datetime", "trip_dropoff_datetime"],
params: {threshold: 1, dateFormat: "yyyy-MM-dd HH-mm-ss"}
}]
levenshteinDistance: [{
id: ${SRC}"_lvnst_dist", source: ${SRC}, columns: ["start_lat", "end_lat"],
params: {threshold: 3, normalize: false}
}]
coMoment: [{id: ${SRC}"_comoment", source: ${SRC}, columns: ["trip_distance", "fare_amt"]}]
covariance: [{id: ${SRC}"_covariance", source: ${SRC}, columns: ["trip_distance", "fare_amt"]}]
covarianceBessel: [{id: ${SRC}"_covariance_bessel", source: ${SRC}, columns: ["trip_distance", "fare_amt"]}]
}
}
checks: {
snapshot: {
equalTo: [
{id: ${SRC}"_row_cnt_chk", metric: ${SRC}"_row_cnt", threshold: 170896055}
]
}
}
}
Distinct values metric
SRC = "ytd"
jobConfig: {
jobId: "yellow_trip_data"
jobDescription: "Data Quality job for NY Taxi Data"
sources: {
file: [
{
id: ${SRC},
kind: "parquet",
path: "/datalake/data/workspace/mlcib/ruadlu/yellow_trip_data"
keyFields: ["vendor_name", "trip_pickup_datetime"]
}
]
}
metrics: {
regular: {
distinctValues: [{id: ${SRC}"_dist_vals", source: ${SRC}, columns: ["trip_distance"]}]
}
}
checks: {
snapshot: {
equalTo: [
{id: ${SRC}"_dist_vals_chk", metric: ${SRC}"_dist_vals", threshold: 6876}
]
}
}
}
Duplicate values metric
SRC = "ytd"
jobConfig: {
jobId: "yellow_trip_data"
jobDescription: "Data Quality job for NY Taxi Data"
sources: {
file: [
{
id: ${SRC},
kind: "parquet",
path: "/datalake/data/workspace/mlcib/ruadlu/yellow_trip_data"
keyFields: ["vendor_name", "trip_pickup_datetime"]
}
]
}
metrics: {
regular: {
duplicateValues: [{
id: ${SRC}"_dupl_vals", source: ${SRC},
columns: [
"vendor_name", "trip_pickup_datetime", "trip_dropoff_datetime",
"start_lon", "start_lat", "end_lon", "end_lat"
]
}]
}
}
checks: {
snapshot: {
equalTo: [
{id: ${SRC}"_dupl_vals_chk", metric: ${SRC}"_dupl_vals", threshold: 34435}
]
}
}
}