object RDDMetricStreamProcessor extends RDDMetricProcessor with Logging
Type Members
-
type
AccType = (Long, AccumulatedErrors)
Type in which metric errors are collected: links errors to the streaming window that contains the record which produced them.
- Definition Classes
- RDDMetricStreamProcessor → RDDMetricProcessor
-
type
MicroBatchState = (Long, Map[Long, GroupedCalculators], Option[Checkpoint], Int)
Type alias for micro-batch state, used in streaming applications only. The state includes the following:
- Maximum observed event timestamp
- Map of grouped calculators for each window in the stream
- Updated state of the checkpoint (if a checkpoint is provided)
- Number of late records that were skipped
- Note
A window is identified by its start time as unix epoch (in seconds).
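The shape of this state can be pictured with a small sketch. GroupedCalculators and Checkpoint are replaced here by hypothetical placeholder types, and the windowStartOf and observe helpers are illustrative only: they assume fixed-size windows aligned to the epoch, which the documentation above does not state.

```scala
object MicroBatchStateSketch {
  // Hypothetical stand-ins for the library's GroupedCalculators and Checkpoint types.
  type GroupedCalculators = Map[Seq[String], Seq[String]]
  final case class Checkpoint(id: String)

  // Same shape as the documented alias:
  // (max event timestamp, calculators per window, checkpoint state, skipped late records)
  type MicroBatchState = (Long, Map[Long, GroupedCalculators], Option[Checkpoint], Int)

  // Assumption: fixed-size windows aligned to the epoch; the key is the window start in seconds.
  def windowStartOf(eventTs: Long, windowSizeSec: Long): Long =
    eventTs - (eventTs % windowSizeSec)

  val empty: MicroBatchState = (0L, Map.empty, None, 0)

  // Registers an event timestamp: tracks the maximum and makes sure its window has an entry.
  def observe(state: MicroBatchState, eventTs: Long, windowSizeSec: Long): MicroBatchState = {
    val (maxTs, windows, checkpoint, late) = state
    val key = windowStartOf(eventTs, windowSizeSec)
    val updated =
      if (windows.contains(key)) windows
      else windows + (key -> Map.empty[Seq[String], Seq[String]])
    (math.max(maxTs, eventTs), updated, checkpoint, late)
  }
}
```

Keying the inner map by window start keeps calculators of different windows separate while a single maximum timestamp is tracked for the whole stream.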
Value Members
-
final
def
!=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
final
def
##(): Int
- Definition Classes
- AnyRef → Any
-
final
def
==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
final
def
asInstanceOf[T0]: T0
- Definition Classes
- Any
-
def
buildResults(groupedCalculators: GroupedCalculators, metricErrors: Map[String, MetricErrors], sourceId: String, sourceKeys: Seq[String]): MetricResults
Builds results out of metric calculator results and metric errors. A TopN metric yields N results (the requested number of top values); therefore, for this metric a sequence of results is collected with metric names of the format TOP_N_i, where i is the number of the top value. For all other metrics there is always a single result, which corresponds to the metric name.
- groupedCalculators
Processed grouped metric calculators
- metricErrors
Processed metric errors
- sourceId
Id of the source for which metrics are being calculated
- sourceKeys
Source key fields
- returns
Map(metricId -> all metric calculator results). (Some of the metric calculators yield multiple results)
- Attributes
- protected
- Definition Classes
- RDDMetricProcessor
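The naming rule for TopN results can be illustrated with a minimal sketch. The Result case class and both helper functions are hypothetical simplifications, and numbering from 1 is an assumption, since the description only says that i is the number of the top value.

```scala
object TopNNamingSketch {
  // Hypothetical, simplified result record; the library's result type is richer.
  final case class Result(metricId: String, resultName: String, value: String)

  // A TopN metric yields one result per requested top value: TOP_N_1, TOP_N_2, ...
  // Assumption: numbering starts at 1.
  def topNResults(metricId: String, topValues: Seq[String]): Seq[Result] =
    topValues.zipWithIndex.map { case (v, i) => Result(metricId, s"TOP_N_${i + 1}", v) }

  // Any other metric yields a single result named after the metric itself.
  def singleResult(metricId: String, metricName: String, value: String): Seq[Result] =
    Seq(Result(metricId, metricName, value))
}
```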
-
def
clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws( ... ) @native()
-
final
def
eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
def
equals(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
-
def
finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws( classOf[java.lang.Throwable] )
-
def
getAndRegisterErrorAccumulator(implicit spark: SparkSession): CollectionAccumulator[AccType]
Creates a Spark collection accumulator of the required type to collect metric errors and registers it.
- spark
Implicit Spark Session object
- returns
Registered metric errors accumulator
- Attributes
- protected
- Definition Classes
- RDDMetricProcessor
-
final
def
getClass(): Class[_]
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
-
def
getColumnIndexMap(df: DataFrame): Map[String, Int]
Builds a map of column name -> column index for the given dataframe.
- df
Spark Dataframe
- returns
Map(column name -> column index)
- Attributes
- protected
- Definition Classes
- BasicMetricProcessor
-
def
getColumnNamesMap(df: DataFrame): Map[Int, String]
Builds a map of column index -> column name for the given dataframe.
- df
Spark Dataframe
- returns
Map(column index -> column name)
- Attributes
- protected
- Definition Classes
- BasicMetricProcessor
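Both lookups are inverses of each other and can be sketched in a few lines. To keep the example runnable without Spark, a plain sequence of column names stands in for what df.columns would provide; the function names are hypothetical and this is not necessarily how the library builds the maps.

```scala
object ColumnMapsSketch {
  // Stand-in for df.columns, so the sketch runs without a Spark dependency.
  def columnIndexMap(columns: Seq[String]): Map[String, Int] =
    columns.zipWithIndex.toMap

  // The inverse lookup: position -> column name.
  def columnNamesMap(columns: Seq[String]): Map[Int, String] =
    columns.zipWithIndex.map(_.swap).toMap
}
```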
-
def
getColumnTypes(df: DataFrame): Map[String, DataType]
Builds map of column name to column data type.
- df
Spark Dataframe
- returns
Map(column name -> column datatype)
- Attributes
- protected
- Definition Classes
- BasicMetricProcessor
-
def
getErrorsFromGroupedCalculators(gc: GroupedCalculators, row: Row, columnIndexes: Map[String, Int], columnNames: Map[Int, String], sourceKeyIds: Seq[Int])(implicit dumpSize: Int): Seq[AccumulatedErrors]
Retrieves errors and failures from grouped calculators, maps them to row data and builds a sequence of metric errors that will be added to the error accumulator.
- gc
Map of grouped calculators
- row
Row for which grouped calculators were just updated.
- columnIndexes
Map of column names to their indices
- columnNames
Map of column indices to their names.
- sourceKeyIds
Column indices that correspond to source key fields
- dumpSize
Implicit value of the maximum number of metric failures (or errors) to be collected.
- returns
Sequence of metric errors to be accumulated.
- Attributes
- protected
- Definition Classes
- RDDMetricProcessor
-
def
getGroupedCalculators(groupedMetrics: GroupedMetrics): GroupedCalculators
Builds the collection of grouped calculators.
The main idea is as follows: suppose you want to obtain multiple quantiles for a specific column. Creating a new t-digest instance for each metric is inefficient. To avoid that, all metrics are first mapped to their calculator classes and then grouped by identical calculator instances (taking advantage of the fact that all calculators are case classes). As a result, only unique calculators are initialized.
Example: "FIRST_QUANTILE" for column "A" with parameter "accuracyError=0.0001" requires an instance of TDigestMetricCalculator. "MEDIAN_VALUE" for column "A" with the same parameter "accuracyError=0.0001" also requires an instance of TDigestMetricCalculator. With this approach the instance is the same, and it returns results like Map(("MEDIAN_VALUE:..."->result1),("FIRST_QUANTILE:..."->result2),...).
- groupedMetrics
Collection of source metrics grouped by their list of columns.
- returns
Collection of grouped metric calculators: Map( Seq(columns) -> Seq( (MetricCalculator, Seq(SourceMetric)) ) )
- Attributes
- protected
- Definition Classes
- RDDMetricProcessor
-
def
getGroupedMetrics(metrics: Seq[RegularMetric], allColumns: Seq[String])(implicit caseSensitive: Boolean): Map[Seq[String], Seq[RegularMetric]]
Groups regular metrics by their sequence of columns. The * is expanded to the actual list of columns prior to grouping metrics.
- metrics
Sequence of regular metrics for current source
- allColumns
Sequence of all source columns (used to expand *)
- caseSensitive
Implicit flag defining whether column names are case sensitive or not.
- returns
Map of sequence of columns to sequence of regular metrics that refer to these columns.
- Definition Classes
- RDDMetricProcessor
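The grouping done by getGroupedMetrics, together with the calculator deduplication described for getGroupedCalculators, can be sketched as follows. Metric and TDigestCalculator are hypothetical, heavily simplified stand-ins for the library's RegularMetric and TDigestMetricCalculator, and lower-casing column names is only an assumed way to model case-insensitive matching.

```scala
object GroupingSketch {
  // Hypothetical, simplified metric; the library's RegularMetric carries more fields.
  final case class Metric(id: String, name: String, columns: Seq[String], accuracyError: Double)

  // Hypothetical calculator; being a case class, equal parameters mean equal instances.
  final case class TDigestCalculator(accuracyError: Double)

  // Expands "*" to all source columns, then groups metrics by their column sequence.
  // Assumption: case-insensitive matching is modelled by lower-casing column names.
  def groupMetrics(
      metrics: Seq[Metric],
      allColumns: Seq[String],
      caseSensitive: Boolean
  ): Map[Seq[String], Seq[Metric]] = {
    def norm(c: String): String = if (caseSensitive) c else c.toLowerCase
    metrics.groupBy { m =>
      val expanded = if (m.columns == Seq("*")) allColumns else m.columns
      expanded.map(norm)
    }
  }

  // Maps each metric to its calculator and groups by equal calculators,
  // so metrics with the same parameters share a single calculator.
  def groupCalculators(
      grouped: Map[Seq[String], Seq[Metric]]
  ): Map[Seq[String], Seq[(TDigestCalculator, Seq[Metric])]] =
    grouped.map { case (cols, ms) =>
      cols -> ms.groupBy(m => TDigestCalculator(m.accuracyError)).toSeq
    }
}
```

With FIRST_QUANTILE and MEDIAN_VALUE both defined on column A with the same accuracyError, the second step produces a single calculator entry that serves both metrics.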
-
def
hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native()
-
def
initLogger(lvl: Level): Unit
Initialises logger:
- gets log4j properties with following priority: resources directory -> working directory -> default settings
- updates root logger level with verbosity level defined at application start
- reconfigures Logger
- lvl
Root logger level defined at application start
- Definition Classes
- Logging
-
final
def
isInstanceOf[T0]: Boolean
- Definition Classes
- Any
-
lazy val
log: Logger
- Definition Classes
- Logging
- Annotations
- @transient()
-
def
mergeGroupCalculators(l: GroupedCalculators, r: GroupedCalculators): GroupedCalculators
Merges two maps of grouped calculators.
- l
First map of grouped calculators
- r
Second map of grouped calculators
- returns
Merged map of grouped calculators.
- Attributes
- protected
- Definition Classes
- RDDMetricProcessor
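A merge of this kind is typically what allows partial states computed on separate chunks of data to be combined. The sketch below shows one way such a merge could look; CountCalculator and the one-calculator-per-group Grouped type are hypothetical simplifications of the library's GroupedCalculators.

```scala
object MergeSketch {
  // Hypothetical mergeable calculator; real calculators keep richer state.
  final case class CountCalculator(count: Long) {
    def merge(other: CountCalculator): CountCalculator =
      CountCalculator(count + other.count)
  }

  // Simplified stand-in: one calculator per column group.
  type Grouped = Map[Seq[String], CountCalculator]

  // Keys present in both maps are merged; keys present in only one map are kept as-is.
  def mergeGrouped(l: Grouped, r: Grouped): Grouped =
    r.foldLeft(l) { case (acc, (cols, calc)) =>
      acc.updated(cols, acc.get(cols).map(_.merge(calc)).getOrElse(calc))
    }
}
```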
-
final
def
ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
-
final
def
notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
-
final
def
notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native()
-
def
processMetricErrors(errors: Seq[AccumulatedErrors])(implicit dumpSize: Int): Map[String, MetricErrors]
Processes accumulated metric errors and builds a map of metricId to all metric errors with the corresponding row data.
- errors
Sequence of accumulated metric errors
- dumpSize
Implicit value of maximum number of metric failures (or errors) to be collected (per metric). Used to prevent OOM errors.
- returns
Map(metricId -> all metric errors)
- Attributes
- protected
- Definition Classes
- RDDMetricProcessor
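The role of dumpSize as a per-metric cap can be shown with a short sketch. ErrorRow is a hypothetical, flattened stand-in for the library's AccumulatedErrors and MetricErrors types, and keeping the first dumpSize entries (rather than some other selection) is an assumption.

```scala
object ErrorDumpSketch {
  // Hypothetical, simplified error record: metric id, message, offending row values.
  final case class ErrorRow(metricId: String, message: String, rowData: Seq[String])

  // Groups errors by metric id and keeps at most dumpSize entries per metric,
  // which bounds the memory used no matter how many rows failed.
  def collectErrors(errors: Seq[ErrorRow])(implicit dumpSize: Int): Map[String, Seq[ErrorRow]] =
    errors.groupBy(_.metricId).map { case (id, es) => id -> es.take(dumpSize) }
}
```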
-
def
processRegularMetrics(streamId: String, streamKeys: Seq[String], streamMetrics: Seq[RegularMetric], streamCheckpoint: Option[Checkpoint])(implicit spark: SparkSession, buffer: ProcessorBuffer, streamConf: StreamConfig, dumpSize: Int, caseSensitive: Boolean): (DataFrame, Long) ⇒ Unit
Processes all regular metrics for a micro-batch of the given stream.
- streamId
ID of the stream whose micro-batch is processed.
- streamKeys
Stream source key fields.
- streamMetrics
Sequence of metrics defined for the given stream source.
- spark
Implicit Spark Session object.
- buffer
Implicit processor buffer.
- streamConf
Streaming application settings
- dumpSize
Implicit value of the maximum number of metric failures (or errors) to be collected (per metric and per partition)
- caseSensitive
Implicit flag defining whether column names are case sensitive or not.
- returns
Does not return results but updates processor buffer instead.
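The method returns a function rather than a result: configuration is captured once, and the returned (DataFrame, Long) ⇒ Unit is then invoked per micro-batch. That shape matches the function accepted by Spark's DataStreamWriter.foreachBatch, which suggests (the documentation here does not confirm it) that the result is meant to be passed there. A Spark-free sketch of the pattern, with a hypothetical Batch type and Buffer class standing in for DataFrame and ProcessorBuffer:

```scala
import scala.collection.mutable

object BatchHandlerSketch {
  // Hypothetical stand-ins: a batch is a sequence of rows, and the buffer only
  // counts processed rows per stream.
  type Batch = Seq[Map[String, Any]]
  final class Buffer {
    val rowsPerStream: mutable.Map[String, Long] = mutable.Map.empty
  }

  // Settings are captured once; the returned function runs for every micro-batch
  // and records its outcome in the buffer instead of returning it.
  def processStream(streamId: String)(implicit buffer: Buffer): (Batch, Long) => Unit =
    (batch, batchId) => {
      val seen = buffer.rowsPerStream.getOrElse(streamId, 0L)
      buffer.rowsPerStream.update(streamId, seen + batch.size)
    }
}
```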
-
def
processWindowResults(gc: GroupedCalculators, errors: Seq[AccumulatedErrors], windowStart: String, streamId: String, streamKeys: Seq[String])(implicit dumpSize: Int): Result[MetricResults]
Processes calculator results and accumulated errors once the stream window state is finalized.
- gc
Final map of grouped metric calculators for stream window
- errors
Final sequence of metric errors accumulated for stream window
- windowStart
Window start datetime (string in format 'yyyy-MM-dd HH:mm:ss')
- streamId
Id of the stream source for which results are processed.
- streamKeys
Stream source key fields.
- dumpSize
Implicit value of maximum number of metric failures (or errors) to be collected (per metric). Used to prevent OOM errors.
- returns
Map of metricId to a sequence of metric results for this metricId (some metrics yield multiple results).
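Windows are keyed by their start as unix epoch seconds, while windowStart here is a formatted string. A conversion between the two might look like the sketch below; the render helper is hypothetical, and rendering in UTC is an assumption, since the time zone is not stated.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object WindowStartSketch {
  // Pattern taken from the parameter description; UTC is an assumption.
  private val fmt =
    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC)

  // Renders a window start given as epoch seconds into the documented string format.
  def render(windowStartEpochSec: Long): String =
    fmt.format(Instant.ofEpochSecond(windowStartEpochSec))
}
```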
-
final
def
synchronized[T0](arg0: ⇒ T0): T0
- Definition Classes
- AnyRef
-
def
toString(): String
- Definition Classes
- AnyRef → Any
-
def
updateGroupedCalculators(gc: GroupedCalculators, row: Row, columnIndexes: Map[String, Int]): GroupedCalculators
Updates grouped calculators for the given row.
- gc
Map of grouped calculators
- row
Data row to process
- columnIndexes
Map of column names to their indices.
- returns
Updated map of grouped calculators
- Attributes
- protected
- Definition Classes
- RDDMetricProcessor
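The per-row update can be sketched as a pure function that looks up the values of each column group through columnIndexes and feeds them to the group's calculator. NonNullCounter, the one-calculator-per-group Grouped type and the IndexedSeq used in place of Spark's Row are all hypothetical simplifications.

```scala
object UpdateSketch {
  // Hypothetical calculator that counts rows where all watched columns are non-null.
  final case class NonNullCounter(count: Long) {
    def increment(values: Seq[Any]): NonNullCounter =
      if (values.forall(_ != null)) NonNullCounter(count + 1) else this
  }

  // Simplified stand-in: one calculator per column group.
  type Grouped = Map[Seq[String], NonNullCounter]

  // A row is modelled as an indexed sequence of values (stand-in for Spark's Row).
  def update(gc: Grouped, row: IndexedSeq[Any], columnIndexes: Map[String, Int]): Grouped =
    gc.map { case (cols, calc) =>
      cols -> calc.increment(cols.map(c => row(columnIndexes(c))))
    }
}
```

Returning a new map instead of mutating the input keeps the update easy to fold over a sequence of rows.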
-
final
def
wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... )
-
final
def
wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws( ... ) @native()