PySpark error converting Pandas float Series to Arrow Array (string)


I've been asked to debug some code which pulls data from an API, converts it from JSON to Pandas, Pandas to Spark, and then writes to a table. The following error occurs when it tries to convert from Pandas to Spark:
Exception thrown when converting pandas.Series (float64) with name 'latitude' to Arrow Array (string).
This leads to a loss of data in the table to which it writes: the API is known to contain ~30,000 records, but the end table only has ~11,000 written to it. I believe this is because process_and_write_page() fails midway through a page and moves on to the next one.
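For context, here's a minimal illustration (values are made up) of the type mismatch the error message describes: the API returns the coordinates as numbers, so pandas infers float64 for those columns, while the Spark schema below declares them as StringType, leaving Arrow to attempt a float64-to-string cast:

import pandas as pd

sample = pd.DataFrame([{"id": "1", "latitude": 51.5, "longitude": -0.1}])
print(sample.dtypes)
# id            object
# latitude     float64
# longitude    float64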

Code is included below - unfortunately I've had to anonymise the API details, but hopefully this gives enough information to diagnose the problem.

import requests, json, time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType

import pandas as pd
from tqdm import tqdm

# Initialize Spark session with Arrow optimization and schema auto-merge enabled
spark = SparkSession.builder.appName("app") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true") \
    .getOrCreate()

# API details
base_api_url = "https://an/api/url"
api_key = "api12345"

# Headers for the API request
headers = {
    "subscriptionKey": api_key,
    "userAgent": "aUserAgent"
}

# Create a session
session = requests.Session()
session.headers.update(headers)

# Function to fetch a page of IDs
def fetch_ids(page=1, per_page=100, get_total=False):
    try:
        response = session.get(f"{base_api_url}?page={page}&perPage={per_page}", timeout=30)
        response.raise_for_status()
        data = response.json()
        if get_total:
            return data['totalPages']
        ids = [customer['id'] for customer in data['customers']]
        next_page_uri = data.get('nextPageUri')
        return ids, next_page_uri
    except requests.exceptions.RequestException as e:
        print(f"Failed to fetch data for page {page}: {e}")
        return [], None

# Function to fetch detailed information for a single customer with retry 
def fetch_details(id, retries=5):
    for attempt in range(retries):
        try:
            response = session.get(f"{base_api_url}/{id}", timeout=30)
            response.raise_for_status()
            data = response.json()
 
            basic_data = {
                "id": data.get("id"),
                "type": data.get("type"),
                "name": data.get("name"),
                "latitude": data.get("latitude"),
                "longitude": data.get("longitude"),
                "alsoKnownAs": data.get("alsoKnownAs")
            }
            return (basic_data)

        except requests.exceptions.RequestException as e:
            if hasattr(response, 'status_code') and response.status_code == 429:
                time.sleep(1.33 ** attempt)
            else:
                print(f"Request exception for customer {id}: {e}")
                break
    return None

# Function to process and write a page of customer data
def process_and_write_page(ids):
    basic_data_list = []
    inspection_areas_list = []
    
    with ThreadPoolExecutor(max_workers=10) as executor:
        future_to_id = {executor.submit(fetch_details, id): id for id in ids}
        for future in as_completed(future_to_id):
            id = future_to_id[future]
            try:
                basic_data = future.result()
                if basic_data:
                    basic_data_list.append(basic_data)
                    
            except Exception as e:
                print(f"Exception fetching details for {id}: {e}")

    basic_schema = StructType([
        StructField("id", StringType(), True),
        StructField("type", StringType(), True),
        StructField("name", StringType(), True),
        StructField("latitude", StringType(), True),
        StructField("longitude", StringType(), True),
        StructField("alsoKnownAs", StringType(), True)
    ])   

    basic_df = pd.DataFrame(basic_data_list)

    if not basic_df.empty:
        # Problem seems to occur here, with createDataFrame()
        basic_spark_df = spark.createDataFrame(basic_df, schema=basic_schema)
        basic_spark_df.write.mode("append").option("mergeSchema", "true").format("delta").saveAsTable("api_test")

total_pages = fetch_ids(get_total=True)
display(total_pages) 
# 292 pages - at 100 records per page, should return ~29,200 records (the API contains 29,126)
# (Only ~11,000 records in table)

# Fetch and process data page by page
for p in tqdm(range(1, total_pages+1), desc='Fetching pages', unit='page'):
    ids, next_page_uri = fetch_ids(page=p)
    if not ids or not next_page_uri:
        display("Invalid ids or next_page_uri value")
        break
    # Directly process and write page data
    process_and_write_page(ids)

I have tried changing the latitude and longitude fields in basic_schema to FloatType; however, this gives a different error:
AnalysisException: [DELTA_FAILED_TO_MERGE_FIELDS] Failed to merge fields 'latitude' and 'latitude'
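My suspicion (untested) is that the api_test table already exists from earlier runs with latitude and longitude stored as strings, so an append with float columns can't be merged. The existing column types can be checked with:

spark.table("api_test").printSchema()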

asked Feb 3 at 16:43 by CrowsNose
  • Is it possible to test a sample dataset rather than trying with the entire ~30,000 rows? It mostly looks like a data issue; my suggestion would be to narrow it down to a sample dataset with fewer rows and then try debugging it. – samhita, Feb 3 at 16:57
  • Agree with @samhita. You can try .astype(str) on the latitude (and probably longitude) column before converting to Spark. Also, could you update your post with the output of print(basic_df)? (or print(basic_df.head())) – Paulo Marques, Feb 3 at 17:26
  • Thank you @samhita and @Paulo Marques - pleased to say that .astype(str) has solved the problem. I don't know why I didn't think to do the conversion in pandas earlier. – CrowsNose, Feb 5 at 10:16

1 Answer


With thanks to @Paulo Marques for the suggestion: I converted the latitude and longitude fields to str type while the data were still in a pandas DataFrame.

The block that converts to Spark and writes to the table now looks like this:

basic_df = pd.DataFrame(basic_data_list)
basic_df.latitude = basic_df.latitude.astype(str)
basic_df.longitude = basic_df.longitude.astype(str)

if not basic_df.empty:
    basic_spark_df = spark.createDataFrame(basic_df, schema=basic_schema)
    basic_spark_df.write.mode("append").option("mergeSchema", "true").format("delta").saveAsTable("api_test")

And all the data now pull through without any errors.
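If keeping the coordinates numeric were preferable, an alternative sketch would be to drop and recreate api_test (its existing string-typed latitude/longitude columns are what trigger the DELTA_FAILED_TO_MERGE_FIELDS error) and declare the columns as DoubleType - assuming, of course, that recreating the table is acceptable:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Run once, before the page loop, so the table is recreated with numeric columns
spark.sql("DROP TABLE IF EXISTS api_test")

numeric_schema = StructType([
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("name", StringType(), True),
    StructField("latitude", DoubleType(), True),    # pandas float64 maps to Spark DoubleType
    StructField("longitude", DoubleType(), True),
    StructField("alsoKnownAs", StringType(), True)
])

# Inside process_and_write_page(), unchanged apart from the schema:
basic_spark_df = spark.createDataFrame(basic_df, schema=numeric_schema)
basic_spark_df.write.mode("append").format("delta").saveAsTable("api_test")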
