I've been asked to debug some code which pulls data from an API, converts it from JSON to Pandas, Pandas to Spark, and then writes to a table. The following error occurs when it tries to convert from Pandas to Spark:
Exception thrown when converting pandas.Series (float64) with name 'latitude' to Arrow Array (string).
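A minimal sketch of what I believe is happening (with made-up coordinate values): a float64 pandas column being paired with a StringType schema field while Arrow is enabled.
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
spark = SparkSession.builder.appName("repro") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()
# Made-up coordinates: a float64 column against a StringType schema field
pdf = pd.DataFrame({"latitude": [51.5074, 52.4862, None]})
schema = StructType([StructField("latitude", StringType(), True)])
# With Arrow enabled, the float64 -> Arrow string conversion is attempted here,
# which I believe is what triggers the exception above
sdf = spark.createDataFrame(pdf, schema=schema)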
The failed conversion leads to a loss of data in the table being written to: the API is known to contain ~30,000 records, but only ~11,000 of them make it into the end table. I believe this is because process_and_write_page() fails midway through a page and the loop simply moves on to the next one.
The code is included below. Unfortunately I've had to anonymise the API details, but hopefully this gives enough information to diagnose the problem.
import requests, json, time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType
import pandas as pd
from tqdm import tqdm
# Initialize Spark session with Arrow optimization and schema auto-merge enabled
spark = SparkSession.builder.appName("app") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
    .getOrCreate()
# API details
base_api_url = "https://an/api/url"
api_key = "api12345"
# Headers for the API request
headers = {
    "subscriptionKey": api_key,
    "userAgent": "aUserAgent"
}
# Create a session
session = requests.Session()
session.headers.update(headers)
# Function to fetch a page of IDs
def fetch_ids(page=1, per_page=100, get_total=False):
    try:
        response = session.get(f"{base_api_url}?page={page}&perPage={per_page}", timeout=30)
        response.raise_for_status()
        data = response.json()
        if get_total:
            return data['totalPages']
        ids = [customer['id'] for customer in data['customers']]
        next_page_uri = data.get('nextPageUri')
        return ids, next_page_uri
    except requests.exceptions.RequestException as e:
        print(f"Failed to fetch data for page {page}: {e}")
        return [], None
# Function to fetch detailed information for a single customer with retry
def fetch_details(id, retries=5):
    for attempt in range(retries):
        try:
            response = session.get(f"{base_api_url}/{id}", timeout=30)
            response.raise_for_status()
            data = response.json()
            basic_data = {
                "id": data.get("id"),
                "type": data.get("type"),
                "name": data.get("name"),
                "latitude": data.get("latitude"),
                "longitude": data.get("longitude"),
                "alsoKnownAs": data.get("alsoKnownAs")
            }
            return basic_data
        except requests.exceptions.RequestException as e:
            if hasattr(response, 'status_code') and response.status_code == 429:
                time.sleep(1.33 ** attempt)
            else:
                print(f"Request exception for customer {id}: {e}")
                break
    return None
# Function to process and write a page of customer data
def process_and_write_page(ids):
    basic_data_list = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        future_to_id = {executor.submit(fetch_details, id): id for id in ids}
        for future in as_completed(future_to_id):
            id = future_to_id[future]
            try:
                basic_data = future.result()
                if basic_data:
                    basic_data_list.append(basic_data)
            except Exception as e:
                print(f"Exception fetching details for {id}: {e}")
    basic_schema = StructType([
        StructField("id", StringType(), True),
        StructField("type", StringType(), True),
        StructField("name", StringType(), True),
        StructField("latitude", StringType(), True),
        StructField("longitude", StringType(), True),
        StructField("alsoKnownAs", StringType(), True)
    ])
    basic_df = pd.DataFrame(basic_data_list)
    if not basic_df.empty:
        # Problem seems to occur here, with createDataFrame()
        basic_spark_df = spark.createDataFrame(basic_df, schema=basic_schema)
        basic_spark_df.write.mode("append").option("mergeSchema", "true").format("delta").saveAsTable("api_test")
total_pages = fetch_ids(get_total=True)
display(total_pages)
# 292 pages - at 100 records per page, should return ~29,200 records (API contains 29,126)
# (Only ~11,000 records in table)
# Fetch and process data page by page
for p in tqdm(range(1, total_pages+1), desc='Fetching pages', unit='page'):
    ids, next_page_uri = fetch_ids(page=p)
    if not ids or not next_page_uri:
        display("Invalid ids or next_page_uri value")
        break
    # Directly process and write page data
    process_and_write_page(ids)
I have tried changing the latitude and longitude elements in basic_schema to FloatType; however, this gives a different error:
AnalysisException: [DELTA_FAILED_TO_MERGE_FIELDS] Failed to merge fields 'latitude' and 'latitude'
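I suspect this second error is a knock-on effect of the first: the ~11,000 rows that did make it into api_test were written with latitude and longitude as strings, so Delta presumably cannot merge that existing string column with an incoming float one, even with autoMerge/mergeSchema enabled. A quick way to check the existing column types (a sketch, assuming the partially written table is still in place):
spark.table("api_test").printSchema()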
With thanks to @Paulo Marques for the suggestion: I converted the latitude and longitude fields to str type while the data were still in a pandas DataFrame.
The block that converts to Spark and writes to the table now looks like this:
basic_df = pd.DataFrame(basic_data_list)
basic_df.latitude = basic_df.latitude.astype(str)
basic_df.longitude = basic_df.longitude.astype(str)
if not basic_df.empty:
    basic_spark_df = spark.createDataFrame(basic_df, schema=basic_schema)
    basic_spark_df.write.mode("append").option("mergeSchema", "true").format("delta").saveAsTable("api_test")
And all the data now pull through without any errors.
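One caveat worth noting (it may not matter for this API): .astype(str) turns missing values into the literal string 'nan', so if a customer can come back without coordinates you may want to keep those as real nulls instead, along the lines of this sketch:
for col in ("latitude", "longitude"):
    notna = basic_df[col].notna()
    basic_df[col] = basic_df[col].astype(str).where(notna, None)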
Comments:
Try .astype(str) on the latitude (and probably longitude) column before converting to Spark. Also, could you update your post with the output of print(basic_df)? (or print(basic_df.head())) – Paulo Marques, Feb 3 at 17:26
.astype(str) has solved the problem. I don't know why I didn't think to do the conversion in pandas earlier. – CrowsNose, Feb 5 at 10:16