All,
I'm trying to set up a Lakehouse Monitoring process for the widely available WineQuality model. While setting up the serving endpoint, I enabled the "Inference Table" option, and the inference table was created automatically. The winequality_payload table has the standard inference-table columns, including request_metadata.
The request_metadata contains "model_name","endpoint_name" & "model_version".
While configuring the monitor against the inference table, I selected request_metadata as the model ID column, but it errors out with the message below:
The given `DataMonitorInfo` is invalid for the following reason(s): - For table `dev_tst_mls.winequality_uc.winequality_payload`: The specified `model_id_col` (`request_metadata`) must be a groupable column, but instead it is a MAP type. Please check that all referenced columns exist in the table(s) and have compatible column type.
As you can see, there is no other column that I can pick for the model_id column. Why am I getting this error and what alternatives do I have?
Next, based on a Databricks Assistant suggestion, I created a view that adds model_name and model_version as separate columns and used this view to create the monitor. The monitor is created successfully, but the dashboard shows no data at all despite my making several scoring attempts.
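For reference, the view was along these lines. This is a sketch, not my exact definition; the catalog/schema names come from the error message above, and the map-extraction syntax is standard Spark SQL:

```python
# Sketch of the flattening view (exact definition reconstructed, view name is made up).
create_view_sql = """
CREATE OR REPLACE VIEW dev_tst_mls.winequality_uc.winequality_payload_flat AS
SELECT
  *,
  request_metadata['model_name']    AS model_name,
  request_metadata['model_version'] AS model_version
FROM dev_tst_mls.winequality_uc.winequality_payload
"""
# spark.sql(create_view_sql)  # executed inside Databricks
```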
I tried troubleshooting and found that the datetime in the window column of the profile_metrics table is way off:
profile_metrics table --> Column window
start: "+057064-08-22T07:05:00.000Z"
end: "+057064-08-22T07:10:00.000Z"
As you can see, I tried it today (02/03/2025), but the window is tens of thousands of years in the future!
Based on the link, the date column is "The UTC date on which the model serving request was received." and the timestamp_ms column is "The timestamp in epoch milliseconds on when the model serving request was received."
I checked the date column in the inference table and it correctly shows 2025-02-03, but the timestamp_ms column shows values like 1738620594270, which is 2025-02-03 22:09:54.270 UTC.
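Converting that timestamp_ms value in plain Python suggests the raw value itself is fine as milliseconds, and that treating it as seconds reproduces the far-future year seen in the window column:

```python
from datetime import datetime, timezone

ts_ms = 1738620594270  # sample value from the timestamp_ms column

# Interpreted as milliseconds since the epoch (the documented meaning):
print(datetime.fromtimestamp(ts_ms // 1000, tz=timezone.utc))
# 2025-02-03 22:09:54+00:00

# Interpreted as *seconds* since the epoch, the value lands around year 57064,
# which matches the "+057064-08-22" in the profile_metrics window column:
print(1970 + ts_ms // 31_556_952)  # 31,556,952 s ~= one year
# 57064
```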
Am I doing something wrong? Has anyone experienced this before?
Thanks, grajee
First, you need to create another Delta table (not a view) with the properties below:
.property("delta.enableChangeDataFeed", "true")
.property("delta.columnMapping.mode", "name")
.property("delta.minReaderVersion", "2")
.property("delta.minWriterVersion", "5")
Then, from the payload inference table, compute your custom metrics and the model ID, and append the result to the table created above.
Here is a sample of creating the model ID column, which is a combination of the model name and version:
from pyspark.sql import functions as F

df = df.withColumn(
    "__db_model_id",
    F.concat(
        F.col("request_metadata").getItem("model_name"),
        F.lit("_"),
        F.col("request_metadata").getItem("model_version"),
    ),
)
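For a request_metadata map shaped like the one described in the question, this produces IDs like the following (the metadata values here are made up for illustration):

```python
# Hypothetical request_metadata contents; real values come from the payload table.
meta = {"model_name": "winequality_model", "endpoint_name": "winequality", "model_version": "2"}

# Same name_version scheme as the F.concat(...) expression above.
model_id = f"{meta['model_name']}_{meta['model_version']}"
print(model_id)  # winequality_model_2
```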
For more information, refer to this notebook.
There, a new table is created, the payload inference table data is extracted, and it is transformed with the required columns and metrics.
If you don't want any other metrics, just the model ID column alongside the current columns, then use the code below.
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType

def transform(df: DataFrame) -> DataFrame:
    # Convert the timestamp milliseconds to TimestampType for downstream processing.
    requests_timestamped = (df
        .withColumn("__db_timestamp", (F.col("timestamp_ms") / 1000).cast(TimestampType()))
        .drop("timestamp_ms"))
    # Convert the model name and version columns into a model identifier column.
    requests_identified = requests_timestamped.withColumn(
        "__db_model_id",
        F.concat(
            F.col("request_metadata").getItem("model_name"),
            F.lit("_"),
            F.col("request_metadata").getItem("model_version")
        )
    )
    return requests_identified
from delta.tables import DeltaTable

def create_processed_table_if_not_exists(table_name, requests_with_metrics):
    (DeltaTable.createIfNotExists(spark)
        .tableName(table_name)
        .addColumns(requests_with_metrics.schema)
        .property("delta.enableChangeDataFeed", "true")
        .property("delta.columnMapping.mode", "name")
        .property("delta.minReaderVersion", "2")
        .property("delta.minWriterVersion", "5")
        .execute())
requests_raw = spark.readStream.table(payload_table_name)
requests_processed = transform(requests_raw)
# Drop columns that we don't need for monitoring analysis.
requests_processed = requests_processed.drop("date", "status_code", "sampling_fraction", "client_request_id", "databricks_request_id")
# Persist the requests stream, with a defined checkpoint path for this table.
create_processed_table_if_not_exists(processed_table_name, requests_processed)
(requests_processed.writeStream
    .trigger(availableNow=True)
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_location)
    .toTable(processed_table_name)
    .awaitTermination())
# Display the table (with requests and text evaluation metrics) that will be monitored.
display(spark.table(processed_table_name))
Next, the transformed dataframe is written to the newly created table; then create the monitor on that table with the appropriate columns (for example, __db_timestamp as the timestamp column and __db_model_id as the model ID column).
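As a sketch of that last step, assuming the `databricks-lakehouse-monitoring` client is available; the prediction column name and the output schema here are placeholders you would replace with the ones in your processed table:

```python
from databricks import lakehouse_monitoring as lm

# Sketch only: column names other than __db_timestamp / __db_model_id are placeholders.
lm.create_monitor(
    table_name=processed_table_name,
    profile_type=lm.InferenceLog(
        problem_type="regression",      # wine quality is a regression problem
        prediction_col="prediction",    # placeholder: your prediction column
        timestamp_col="__db_timestamp",
        model_id_col="__db_model_id",
        granularities=["5 minutes"],
    ),
    output_schema_name="dev_tst_mls.winequality_uc",  # placeholder schema
)
```

This requires a Databricks workspace to run, so treat it as monitor configuration to adapt rather than copy.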
Build the model ID from request_metadata and use that column. – JayashankarGS, commented Feb 5 at 3:47