python - Can someone explain how this attribute self.keep_alive works and being applied in ServiceBusSender? - Stack Overflow

admin2025-05-02  2

I am trying to make ServiceBusSender connection last more than 5 minutes without pinging the Sender.

Basically I am trying to make my ServiceBusSender connection last longer. Right now I am applying pinging technique:

if time.time()-last_ping_time >= 200:
        logging.debug(f"trying to ping service bus {QUEUE_DATA['WorkersName']}")
        trash_message = ServiceBusMessage(b"")
        trash_message.application_properties = {b"SiteEnv":b"trash"}
        sender.send_messages(trash_message)
        last_ping_time=time.time()
        logging.debug("pinged service bus")

But I want to get rid of it and I'm just looking for some attributes I can pass in ServiceBusSender to make connection last for more than 5 minutes. So can someone explain whether passing the keep_alive attribute to ServiceBusSender (will dive in class Configuration with this attribute: self.keep_alive = kwargs.get("keep_alive", 30)) and setting it to, say 1 hour will do any good for solving my problem?

UPD: Throughout the day I understood that keep_alive is a kind of pinging mechanism under the hood, but the issue still remains: can I use it like this and remove mine pinging mechanism so the connection won't drop?

sender=ServiceBusSender._from_connection_string(topic_name="SOME_TOPIC_NAME",
                                                conn_str="SOME_CONNECTION_STRING",
                                                transport_type=TransportType.AmqpOverWebsocket,
                                                keep_alive=200)

UPD 2: keep_alive only affects AMQP and not affecting the TCP...

UPD 3: I've examined logs of my app. As you can see reciever and sender was initiated. Based on the time of logs there is question coming up: why the sender and reciever could be idle for long time before first requests made, but after the first request connection closes after 15 minutes with the ConnectionResetError? How can I make sender and reciever connection last for the whole duration of the program without pinging it with messages?

2025-01-02 22:07:20,523 - DEBUG - got bool_launch_service_bus True so launching service bus
2025-01-02 22:07:20,523 - DEBUG - Launch time: 02/01/2025 22:07:20
2025-01-02 22:07:21,388 - INFO - RECEIVER: initiation was successful
2025-01-02 22:07:21,400 - INFO - SENDER: initiation was successful
2025-01-02 22:34:30,996 - DEBUG - Received request 
2025-01-02 22:34:30,997 - DEBUG - MessageId: id - CorrelationId id: 
2025-01-02 22:34:30,997 - DEBUG - Trying to call for CorrelationId: id
2025-01-02 22:34:30,997 - INFO - executing func()
2025-01-02 22:34:31,204 - DEBUG - Finished with CorrelationId: id time: 0.2070004940032959
2025-01-02 22:34:31,694 - DEBUG - full response creation time for id: 0.6970000267028809
2025-01-02 22:34:32,626 - INFO - finished id
2025-01-02 22:35:30,591 - INFO - RECEIVER: message with id: id complete by distributive service
2025-01-02 22:50:38,572 - DEBUG - Transport read failed: ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None)

I am trying to make ServiceBusSender connection last more than 5 minutes without pinging the Sender.

Basically I am trying to make my ServiceBusSender connection last longer. Right now I am applying pinging technique:

if time.time()-last_ping_time >= 200:
        logging.debug(f"trying to ping service bus {QUEUE_DATA['WorkersName']}")
        trash_message = ServiceBusMessage(b"")
        trash_message.application_properties = {b"SiteEnv":b"trash"}
        sender.send_messages(trash_message)
        last_ping_time=time.time()
        logging.debug("pinged service bus")

But I want to get rid of it and I'm just looking for some attributes I can pass in ServiceBusSender to make connection last for more than 5 minutes. So can someone explain whether passing the keep_alive attribute to ServiceBusSender (will dive in class Configuration with this attribute: self.keep_alive = kwargs.get("keep_alive", 30)) and setting it to, say 1 hour will do any good for solving my problem?

UPD: Throughout the day I understood that keep_alive is a kind of pinging mechanism under the hood, but the issue still remains: can I use it like this and remove mine pinging mechanism so the connection won't drop?

sender=ServiceBusSender._from_connection_string(topic_name="SOME_TOPIC_NAME",
                                                conn_str="SOME_CONNECTION_STRING",
                                                transport_type=TransportType.AmqpOverWebsocket,
                                                keep_alive=200)

UPD 2: keep_alive only affects AMQP and not affecting the TCP...

UPD 3: I've examined logs of my app. As you can see reciever and sender was initiated. Based on the time of logs there is question coming up: why the sender and reciever could be idle for long time before first requests made, but after the first request connection closes after 15 minutes with the ConnectionResetError? How can I make sender and reciever connection last for the whole duration of the program without pinging it with messages?

2025-01-02 22:07:20,523 - DEBUG - got bool_launch_service_bus True so launching service bus
2025-01-02 22:07:20,523 - DEBUG - Launch time: 02/01/2025 22:07:20
2025-01-02 22:07:21,388 - INFO - RECEIVER: initiation was successful
2025-01-02 22:07:21,400 - INFO - SENDER: initiation was successful
2025-01-02 22:34:30,996 - DEBUG - Received request 
2025-01-02 22:34:30,997 - DEBUG - MessageId: id - CorrelationId id: 
2025-01-02 22:34:30,997 - DEBUG - Trying to call for CorrelationId: id
2025-01-02 22:34:30,997 - INFO - executing func()
2025-01-02 22:34:31,204 - DEBUG - Finished with CorrelationId: id time: 0.2070004940032959
2025-01-02 22:34:31,694 - DEBUG - full response creation time for id: 0.6970000267028809
2025-01-02 22:34:32,626 - INFO - finished id
2025-01-02 22:35:30,591 - INFO - RECEIVER: message with id: id complete by distributive service
2025-01-02 22:50:38,572 - DEBUG - Transport read failed: ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None)
Share Improve this question edited Jan 2 at 21:18 Ivan asked Jan 2 at 12:10 IvanIvan 11 bronze badge 1
  • A Keep-Alive mechanism in messaging systems ensures that connections remain active, preventing timeouts and disconnections due to inactivity – Sampath Commented Jan 3 at 9:41
Add a comment  | 

1 Answer 1

Reset to default 0

A Keep-Alive mechanism in messaging systems ensures that connections remain active, preventing timeouts and disconnections due to inactivity.

Set keep_alive for AMQP and long-lived TCP keep-alive mechanisms to prevent premature connection resets.

For connection Pooling and Reusability reuse a single ServiceBusSender instance across your application to avoid frequent reconnections.

Azure Service Bus enforces a 10-minute idle timeout on AMQP links to conserve resources.

Refer this doc for more about idle timeout .

To reslove it keep the connection active by sending or receiving messages periodically and use the AutoCompleteMessages or ReceiveMode.PeekLock to maintain activity in receivers.Use features like Auto Lock Renewal to extend locks and keep the session alive.

Refer to this doc for Keep alive failure

import time
import logging
from azure.servicebus import ServiceBusSender, ServiceBusMessage, TransportType
from azure.core.exceptions import AzureError
logging.basicConfig(level=logging.DEBUG)

def create_sender(connection_string, topic_name, keep_alive=200):
    """
    Creates and returns a ServiceBusSender instance with AMQP keep-alive.
    """
    try:
        sender = ServiceBusSender._from_connection_string(
            topic_name=topic_name,
            conn_str=connection_string,
            transport_type=TransportType.AmqpOverWebsocket, 
            keep_alive=keep_alive  
        )
        logging.info(f"ServiceBusSender created successfully for topic: {topic_name}")
        return sender
    except AzureError as e:
        logging.error(f"Error creating ServiceBusSender: {e}")
        raise


def send_message(sender, message_body, application_properties=None):
    """
    Sends a message to Azure Service Bus. Used here to send a message or to ping the connection.
    """
    try:
        message = ServiceBusMessage(
            message_body,
            application_properties=application_properties
        )
        sender.send_messages(message)
        logging.info(f"Message sent: {message_body}")
    except AzureError as e:
        logging.error(f"Error sending message: {e}")


def main():
    connection_string = "connection string"
    topic_name = "ravi"
    keep_alive = 200  
    sender = create_sender(connection_string, topic_name, keep_alive)
    
    last_ping_time = time.time()

    while True:
        try
            if time.time() - last_ping_time >= 200:
                logging.debug("Sending keep-alive ping...")
                send_message(sender, b"")
                last_ping_time = time.time()  
            send_message(sender, b"Real message payload", application_properties={b"SiteEnv": b"production"})
            time.sleep(10)

        except ConnectionResetError as e:
            logging.error(f"ConnectionResetError: {e}. Recreating sender...")
            sender = create_sender(****connection_string, topic_name, keep_alive) 

        except AzureError as e:
            logging.error(f"Azure error occurred: {e}")
            break 

if __name__ == "__main__":
    main()

Output

转载请注明原文地址:http://www.anycun.com/QandA/1746121359a91966.html