I am trying to read data from RDS Postgres via PySpark 3.3 on AWS Glue 5.0 using the command below:
df = (
    self.config.spark_details.spark.read.format("jdbc")
    .option(
        "url",
        f"jdbc:postgresql://{self.postgres_host}:{self.postgres_port}/{self.postgres_database}",
    )
    .option("driver", "org.postgresql.Driver")
    .option("user", self.postgres_username)
    .option("password", self.postgres_password)
    .option("query", query)
    .load()
)
Now, I want to write this data to S3. For that, I tried the snippet below:
final_df.write.partitionBy("year","month","day").mode("append").parquet(s3_path)
But this gave me an executor heartbeat error. Later, after debugging, I learned that this could be because the data was not repartitioned, so I added repartition(10000) while writing the data (shown just below). This seemed to work, but the job didn't finish even after 5 hours. There was no error, so I had to stop the job.
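For reference, the write with repartition(10000) added looked roughly like this (illustrative; otherwise identical to the snippet above):
final_df.repartition(10000).write.partitionBy("year","month","day").mode("append").parquet(s3_path)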
After debugging further, I discovered that when the data is loaded from the DB, it is loaded into a single partition. So, no matter how many executors I add, they are of no use.
There are no transformations. I am just supposed to read and write the data in a partitioned way. The data size would be less than 100 GB. I am using Glue 5.0 with 3 workers (12 DPUs) of the G.4X worker type.
The same code worked for 38M records but caused problems for 59M records. I also went through the driver and executor logs and saw something like the screenshots below.
(driver and executor log screenshots omitted)
I am failing to understand: since the data is loaded into just one partition, how can I see multiple partitions being processed in the logs? What am I missing here? Any hints would be appreciated. Even now, the job has been running for 2 hours and hasn't finished.
TIA
By default, the JDBC reader makes Spark read everything with a single query into a single partition, because Spark doesn't know how to distribute the data from your query across the executors.
You need to set the following options on the JDBC reader:
.option("partitionColumn", "id") -> the column spark is going to divide()<br>
.option("lowerBound", "1") -> the lowest possible value<br>
.option("upperBound", "1000") -> the highest possible value<br>
.option("numPartitions", "10") -> number of partitions you want
Now Spark knows how to split your query across the executors. If you don't have a column to partition on, you can create one with ROW_NUMBER() in a subquery, for example.
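A rough sketch of a full partitioned read; jdbcUrl, my_table, the id column, and the bounds are placeholders for your own connection and schema:
df = (
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("driver", "org.postgresql.Driver")
    .option("user", user)
    .option("password", password)
    .option("dbtable", "my_table")
    # Spark issues numPartitions range queries on the id column between the bounds
    .option("partitionColumn", "id")
    .option("lowerBound", "1")
    .option("upperBound", "1000")
    .option("numPartitions", "10")
    .load()
)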
Edit:
Since you are passing a query parameter to the DataFrameReader API, the config above will not work, because Spark will treat it as a standalone query.
There are two workarounds if you want to keep a custom query:
1. Call df.repartition(n) after the read, so the data is split across executors before the write (sketched just below).
2. Move the query into the dbtable option, which the rest of this answer covers.
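A minimal sketch of option 1 (the partition count 200 is illustrative; note that the read itself still happens in a single task, only the work after it runs in parallel):
df = (
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("driver", "org.postgresql.Driver")
    .option("user", user)
    .option("password", password)
    .option("query", query)
    .load()
)
# Split the single JDBC partition so the downstream S3 write can use all executors
df = df.repartition(200)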
When we attempt to configure both the query and partitionColumn options simultaneously, Spark throws the following error:
Options query and partitionColumn cannot be specified together. Please define the query using the dbtable option instead and make sure to qualify the partition columns using the supplied subquery alias to resolve any ambiguity. Example:
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "(select c1, c2 from t1) as subq")
.option("partitionColumn", "c1")
.option("lowerBound", "1")
.option("upperBound", "100")
.option("numPartitions", "3")
.load()
This error gives us a crucial hint: the dbtable option can be used to define a query that generates the column required for data partitioning. By embedding the query we want to use directly into the dbtable option, Spark can process the data in parallel across multiple partitions.
Another example:
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "(select date, cast(day_of_month as int) from dates) as subq")
.option("partitionColumn", "day_of_month")
.option("lowerBound", "1")
.option("upperBound", "31")
.option("numPartitions", "16")
.load()
This configuration works perfectly fine, enabling Spark to read the data concurrently using 16 partitions. Then, you can write while partitioning by whichever criteria you require.
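For example, a sketch of the write back to S3, with the path and partition columns as placeholders mirroring the ones in the question:
df.write.partitionBy("year", "month", "day").mode("append").parquet("s3://my-bucket/my-prefix/")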