mlmodel - Azure Databricks Lakehouse Monitoring error - Stack Overflow

admin2025-04-16  8

All,

I'm trying to setup a lakehouse monitoring process for the WineQuality model that is widely available. While setting up the Serving Endpoint, I enabled "Inference Table" option for which the inference table was created automatically. The columns in the winequality_payload table are as follows:

  1. client_request_id
  2. databricks_request_id
  3. date
  4. timestamp_ms
  5. status_code
  6. execution_time_ms
  7. request
  8. response
  9. sampling_fraction
  10. request_metadata (MAP type)

The request_metadata contains "model_name","endpoint_name" & "model_version".

While configuring the monitor against the inference table, I selected request_metadata as the model_id column but it is erroring out with the below error:

The given `DataMonitorInfo` is invalid for the following reason(s): - For table `dev_tst_mls.winequality_uc.winequality_payload`: The specified `model_id_col` (`request_metadata`) must be a groupable column, but instead it is a MAP type. Please check that all referenced columns exist in the table(s) and have compatible column type.

As you can see, there is no other column that I can pick for the model_id column. Why am I getting this error and what alternatives do I have?

Now, based on Databricks Assistant, I created a view adding the model_name and model_version and used this view to create the monitor. Things go through but the dashboard shows no data at all despite my making several scoring attempts.

I tried troubleshooting and found that the datetime in the window column of the profile_metrics table is way off:

profile_metrics table --> Column window  
start: "+057064-08-22T07:05:00.000Z" 
end: "+057064-08-22T07:10:00.000Z"

As you can see, I tried it today (02/03/2025), but the date is light years ahead!!.

Based on the link, the date column is "The UTC date on which the model serving request was received." and the timestamp_ms column is "The timestamp in epoch milliseconds on when the model serving request was received."

I checked the date column in the inference table and it correctly show 2025-02-03 but the timestamp_ms shows values like 1738620594270 which is "2024-12-31 23:09:54.270"

Am I doing something wrong? Has anyone experienced this before?

Thanks, grajee

All,

I'm trying to setup a lakehouse monitoring process for the WineQuality model that is widely available. While setting up the Serving Endpoint, I enabled "Inference Table" option for which the inference table was created automatically. The columns in the winequality_payload table are as follows:

  1. client_request_id
  2. databricks_request_id
  3. date
  4. timestamp_ms
  5. status_code
  6. execution_time_ms
  7. request
  8. response
  9. sampling_fraction
  10. request_metadata (MAP type)

The request_metadata contains "model_name","endpoint_name" & "model_version".

While configuring the monitor against the inference table, I selected request_metadata as the model_id column but it is erroring out with the below error:

The given `DataMonitorInfo` is invalid for the following reason(s): - For table `dev_tst_mls.winequality_uc.winequality_payload`: The specified `model_id_col` (`request_metadata`) must be a groupable column, but instead it is a MAP type. Please check that all referenced columns exist in the table(s) and have compatible column type.

As you can see, there is no other column that I can pick for the model_id column. Why am I getting this error and what alternatives do I have?

Now, based on Databricks Assistant, I created a view adding the model_name and model_version and used this view to create the monitor. Things go through but the dashboard shows no data at all despite my making several scoring attempts.

I tried troubleshooting and found that the datetime in the window column of the profile_metrics table is way off:

profile_metrics table --> Column window  
start: "+057064-08-22T07:05:00.000Z" 
end: "+057064-08-22T07:10:00.000Z"

As you can see, I tried it today (02/03/2025), but the date is light years ahead!!.

Based on the link, the date column is "The UTC date on which the model serving request was received." and the timestamp_ms column is "The timestamp in epoch milliseconds on when the model serving request was received."

I checked the date column in the inference table and it correctly show 2025-02-03 but the timestamp_ms shows values like 1738620594270 which is "2024-12-31 23:09:54.270"

Am I doing something wrong? Has anyone experienced this before?

Thanks, grajee

Share Improve this question edited Feb 4 at 3:08 Gopinath Rajee asked Feb 4 at 1:32 Gopinath RajeeGopinath Rajee 4507 silver badges22 bronze badges 4
  • the error clearly says it should be groupable column and not the maptype. you created another column, which concatenation of the content in request_metadata and use that column – JayashankarGS Commented Feb 5 at 3:47
  • Not sure how to explain. The inference table and the lahehouse monitoring are all created by Databricks itself (Mosaic AI?) Could it not create a column for modelId alone and create the dashboard. In any case, I created a view based off of the table with ModelName as the ModelID and created the dashboard. This is not helping either. One thing about using the view to create the dashboard is that CDF (change data feed) cannot be applied against a view. Lot of the examples that you see on the internet are pretty much DQ against tables (data assets) and not against inference tables. – Gopinath Rajee Commented Feb 6 at 1:23
  • are you able to create new column in winequality_payload table. – JayashankarGS Commented Feb 6 at 6:19
  • I'm not sure since it is autocrated if I pick inference table option while creating the serving endpoint. – Gopinath Rajee Commented Feb 6 at 7:04
Add a comment  | 

1 Answer 1

Reset to default 0

First, you need to create another delta table not the view with below properties.

.property("delta.enableChangeDataFeed", "true")
.property("delta.columnMapping.mode", "name") \
.property("delta.minReaderVersion", "2") \
.property("delta.minWriterVersion", "5")

And then with payload inference table you create custom metrics, model id and append to above created table.

sample of creating model id column which is combination of model name and version.

df.withColumn(
        "__db_model_id",
        F.concat(
            F.col("request_metadata").getItem("model_name"),
            F.lit("_"),
            F.col("request_metadata").getItem("model_version")
        )

For more information refer this notebook .

Here, new table is created, extracted the payload inference table data and transformed with required columns and metrics.

If you don't want any other metrics just the model id column with current columns then use below code.

def transform(df: DataFrame) -> DataFrame:
    # Convert the timestamp milliseconds to TimestampType for downstream processing.
    requests_timestamped = (df
        .withColumn("__db_timestamp", (F.col("timestamp_ms") / 1000))
        .drop("timestamp_ms"))

    # Convert the model name and version columns into a model identifier column.
    requests_identified = requests_timestamped.withColumn(
        "__db_model_id",
        F.concat(
            F.col("request_metadata").getItem("model_name"),
            F.lit("_"),
            F.col("request_metadata").getItem("model_version")
        )
    )
   return requests_identified

def create_processed_table_if_not_exists(table_name, requests_with_metrics):
    (DeltaTable.createIfNotExists(spark)
        .tableName(table_name)
        .addColumns(requests_with_metrics.schema)
        .property("delta.enableChangeDataFeed", "true")
        .property("delta.columnMapping.mode", "name") \
        .property("delta.minReaderVersion", "2") \
        .property("delta.minWriterVersion", "5")
        .execute())

requests_raw = spark.readStream.table(payload_table_name)
requests_processed = transform(requests_raw)

# Drop columns that we don't need for monitoring analysis.
requests_processed = requests_processed.drop("date", "status_code", "sampling_fraction", "client_request_id", "databricks_request_id")

# Persist the requests stream, with a defined checkpoint path for this table.
create_processed_table_if_not_exists(processed_table_name, requests_processed)
(requests_processed.writeStream
                      .trigger(availableNow=True)
                      .format("delta")
                      .outputMode("append")
                      .option("checkpointLocation", checkpoint_location)
                      .toTable(processed_table_name).awaitTermination())

# Display the table (with requests and text evaluation metrics) that will be monitored.
display(spark.table(processed_table_name))

Next, transformed dataframe is written to newly created table, then create monitor with appropriate columns.

转载请注明原文地址:http://www.anycun.com/QandA/1744743188a86983.html