I am facing an issue with the Azure Databricks and Kafka integration. I would like to set up a readStream, but I am receiving the following error:
java.util.concurrent.ExecutionException: kafkashaded.org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
Authentication should succeed: the Azure Databricks workspace and the Confluent Kafka service are in the same resource group. The code is the following:
df = spark \
    .readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', '...:...') \
    .option('subscribe', 'transaction_stream') \
    .option('group.id', 'dbx-streaming') \
    .option('startingOffsets', 'earliest') \
    .option('kafka.security.protocol', 'SASL_PLAINTEXT') \
    .option('kafka.sasl.mechanism', 'PLAIN') \
    .option('kafka.sasl.jaas.config',
            """kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="..." password="...";""") \
    .option('kafka.request.timeout.ms', '10000') \
    .option("kafka.enable.idempotence", "false") \
    .load()
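For comparison, here is a minimal sketch of the same source options, assuming the Confluent service is Confluent Cloud, which accepts only TLS-encrypted SASL connections (SASL_SSL rather than SASL_PLAINTEXT). The helper name build_kafka_options and the placeholder values are hypothetical. Two other differences from the code above: a plain group.id is not a Spark Kafka source option (Spark generates a unique consumer group per query; kafka.group.id exists but should be used with caution), and enable.idempotence is a producer setting, so both are left out here.

```python
# Minimal sketch, ASSUMING a Confluent Cloud cluster that requires SASL_SSL + PLAIN.
# build_kafka_options is a hypothetical helper, not a Spark or Databricks API.

# Databricks ships a shaded Kafka client, hence the "kafkashaded." class prefix.
JAAS_TEMPLATE = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
    'required username="{user}" password="{password}";'
)


def build_kafka_options(bootstrap_servers, topic, api_key, api_secret):
    """Return options for spark.readStream.format("kafka").options(**opts)."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": "earliest",
        # Assumption: the broker listener expects TLS, so SASL runs over SSL.
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": JAAS_TEMPLATE.format(user=api_key, password=api_secret),
        "kafka.request.timeout.ms": "10000",
    }


if __name__ == "__main__":
    # Placeholder values only; substitute the real bootstrap server and API key pair.
    opts = build_kafka_options("HOST:9092", "transaction_stream", "KEY", "SECRET")
    for key, value in opts.items():
        print(f"{key} = {value}")
```

In a notebook, the dict would then be passed as df = spark.readStream.format("kafka").options(**opts).load(). Keeping the options in a dict makes it easy to compare variants (for example SASL_PLAINTEXT vs SASL_SSL) without rewriting the whole chain.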
Does anyone have an idea what causes this? So far I have tried experimenting with SSL/HTTPS connections.
Thanks in advance!
I also tried adding .option("kafka.ssl.endpoint.identification.algorithm", "https") and the code from the docs, which runs into an authentication error.
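"Timed out waiting for a node assignment" often means the client never completed a metadata exchange with any broker, either because the address is not reachable from the cluster or because of a protocol mismatch. A quick way to separate the two is to check plain TCP reachability from the driver. This is a minimal sketch: can_reach is a hypothetical helper, and it tests only the network path, not TLS or SASL.

```python
# Minimal sketch: check raw TCP reachability of one "host:port" bootstrap entry.
# can_reach is a hypothetical helper; it does NOT test TLS or SASL authentication.
import socket


def can_reach(bootstrap, timeout=5.0):
    """Return True if a TCP connection to 'host:port' succeeds within timeout seconds."""
    host, _, port = bootstrap.rpartition(":")
    try:
        # create_connection resolves the host and opens a TCP socket.
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections and timeouts.
        return False
```

Run it in a notebook cell with the same value used for kafka.bootstrap.servers. If it returns False, the problem is networking (DNS, firewall, private endpoint) rather than the Kafka options; if it returns True, the security protocol and credentials are the next thing to check.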
I solved the issue by installing the following libraries on the cluster.