I want to run a Spark job that was originally built against Scala 2.12.15 and Spark 3.3.4 with sparkBigQueryConnectorVersion = "0.28.1".
However, I just upgraded the Dataproc Serverless runtime from 1.1 to 1.2, because I need to move from Java 11 to Java 17. As part of that, I also changed the library versions (especially the BigQuery connector) in build.sbt to the following:
val sparkVersion = "3.5.1"
val sparkBigQueryConnectorVersion = "0.36.4"
val sparkAvroVersion = "3.5.1"
val sparkCore = "org.apache.spark" %% "spark-core" % sparkVersion % "provided"
val sparkSql = "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"
val sparkHive = "org.apache.spark" %% "spark-hive" % sparkVersion % "provided"
val sparkBigQueryConnector = "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % sparkBigQueryConnectorVersion
val sparkAvro = "org.apache.spark" %% "spark-avro" % sparkAvroVersion
These match the versions specified in the Dataproc Serverless 1.2 runtime documentation.
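For completeness, this is roughly how I'd expect these vals to be wired into the dependencies (just a sketch of a standard single-module sbt layout, not my exact build.sbt; the assembly/shading setup is omitted):

// build.sbt (sketch) -- the "provided" Spark artifacts are supplied by the
// Dataproc Serverless 1.2 runtime at execution time, only the connector and
// spark-avro end up in the fat jar.
libraryDependencies ++= Seq(
  sparkCore,
  sparkSql,
  sparkHive,
  sparkAvro,
  sparkBigQueryConnector
)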
I run an Airflow DAG with the following batch config:
batch_config = {
    "spark_batch": {  # Use 'spark_batch' for Spark jobs in Dataproc Serverless
        "jar_file_uris": [
            f"{artifact_bucket}/bq-ingestion-{release_id}.jar"
        ],
        "main_class": "de.mobile.dmh.campaign.ingestion.ExecutableCampaignsIngestion",
        "args": ["--startDate", start_date, "--endDate", end_date, "--outputDs", target_ds],
    },
    "environment_config": {  # environment_config is used for serverless execution settings
        "execution_config": {
            "subnetwork_uri": subnetwork_uri,  # Specify subnetwork
            "service_account": service_account,  # Specify service account
        }
    },
    "runtime_config": {  # Specify the runtime version for Dataproc Serverless
        "version": "1.2",
        "properties": {  # Spark properties go here under 'properties'
            "spark.executor.memory": "4g",
            "spark.executor.cores": "4",
        },
    },
}
The job needs to write a partitioned DataFrame to a partitioned BigQuery table. The relevant code:
SparkInit.scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkInit {

  def initSpark(moduleName: String, appName: String, conf: Option[SparkConf]): SparkSession = {
    val spark: SparkSession = SparkSession.builder()
      .appName(s"[$moduleName] $appName")
      .config(conf.getOrElse(defaultConfig()))
      .enableHiveSupport()
      .getOrCreate()
    spark.sparkContext.hadoopConfiguration.set("avro.mapred.ignore.inputs.without.extension", "false")
    spark.sparkContext.hadoopConfiguration.set("hive.metastore.client.socket.timeout", "1500")
    spark
  }

  def initSpark(moduleName: String, appName: String): SparkSession = initSpark(moduleName, appName, None)

  def defaultConfig(): SparkConf = {
    new SparkConf()
      .set("spark.files.overwrite", "true")
      .set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")
      .set("spark.sql.caseSensitive", "false")
      .set("hive.metastore.client.socket.timeout", "1500") // when writing many partitions the metastore fails with the default of 600
      .set("spark.sql.adaptive.optimizeSkewedJoin.enabled", "true")
      .set("viewsEnabled", "true")
  }
}
ExecutableCampaignsIngestion.scala
// SOME VARIABLE DEFINITIONS HERE

private def createSparkConfig(): SparkConf = {
  new SparkConf()
    .set("spark.files.overwrite", "true")
    .set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    .set("spark.sql.caseSensitive", "false")
    .set("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .set("temporaryGcsBucket", temporaryGcsBucket)
    .set("viewsEnabled", "true")
    .set("materializationDataset", materializationDataset)
}

val spark: SparkSession =
  SparkInit.initSpark(AppConfig.moduleName, "ExecutableCampaignsIngestion", Some(createSparkConfig()))

dataFrame
  .withColumn(partitionColumn, to_date(col(partitionColumn)))
  .write
  .format(StorageOptions.BiqQuery)
  .mode(SaveMode.Overwrite)
  .option("createDisposition", "CREATE_IF_NEEDED")
  //.option("intermediateFormat", StorageOptions.Avro)
  //.option("useAvroLogicalTypes", "true")
  .option("partitionField", partitionColumn)
  .save(temporaryTableName)
However, I receive an error:
Caused by: com.google.cloud.spark.bigquery.repackaged.google.cloud.bigquery.BigQueryException: Incompatible table partitioning specification. Expects partitioning specification none, but input partitioning specification is interval(type:day,field:event_date)
I only pasted the relevant code. It's very strange: I checked the BigQuery table's details, and partitioning is already enabled on the column referenced by partitionColumn. If I delete the table and let my job recreate it, the DataFrame is written without any error. But when I run the job again, the same partitioning error comes back. The older Dataproc runtime and library versions worked correctly.
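To make that check reproducible, here is a minimal sketch of how the table's time-partitioning spec can be inspected programmatically with the google-cloud-bigquery Java client (this is not code from my job; the project/dataset/table names are placeholders, and the standalone client artifact is assumed to be on the classpath):

// Sketch only: print the destination table's time-partitioning spec as BigQuery reports it.
import com.google.cloud.bigquery.{BigQueryOptions, StandardTableDefinition, TableId}

object InspectPartitioning {
  def main(args: Array[String]): Unit = {
    val bigquery = BigQueryOptions.getDefaultInstance.getService
    // Placeholder identifiers -- replace with the real project, dataset and table.
    val table = Option(bigquery.getTable(TableId.of("my-project", "my_dataset", "my_table")))
      .getOrElse(sys.error("table not found"))
    // getTimePartitioning returns null when the table is not time-partitioned.
    val partitioning = Option(table.getDefinition[StandardTableDefinition].getTimePartitioning)
    // For a day-partitioned table this prints something like: type=DAY, field=event_date
    println(partitioning.map(tp => s"type=${tp.getType}, field=${tp.getField}").getOrElse("not time-partitioned"))
  }
}

The idea is just to confirm what partitioning spec BigQuery itself reports for the destination table right before the job runs, since the error claims it expects "none".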