Build your message queue poller from scratch
When building a nearline data pipeline, a common pattern is creating a poller that continuously polls from a message queue. Although there are lots of available tools/frameworks out there, it would still be fun to write one yourself!
Assume you are tasked with writing a poller that will keep polling a Kafka topic or AWS SQS and then apply a certain operation to each message — how would you do it?
The first idea would be just doing a while loop and calling the poll-message API like below
def apply(message):
    """Your processing logic for a single message goes here."""
    ...  # your apply code


def poll():
    """Naive synchronous poller: processes one message at a time."""
    while True:
        messages = client.poll(timeout=1.0)
        for msg in messages:
            # Forward the polled message; the original called apply() with
            # no arguments, so the handler could never see the payload.
            apply(msg)
But what are the problems here? First, this is a synchronous function, which means we are processing messages one by one. And the poller cannot move on to the next batch of messages until all the apply() calls are done. In a real-world example, apply could be a time-consuming job and our poll will simply get stuck there for hours!
What can we do about that? Maybe we can create a new thread for each job?
def poll():
    """Fire-and-forget poller: spawns one new thread per message."""
    while True:
        messages = client.poll(timeout=1.0)
        for msg in messages:
            # args=( ) in the original never forwarded the message; the
            # worker thread needs the payload it is supposed to process.
            t = threading.Thread(target=apply, args=(msg,))
            t.start()
This definitely helps us speed things up, but we may create more threads than we want. Assume your ingestion rate is 1k/s — are we really OK with forking 1k worker threads per second!?
A more elegant way of doing this is by using an executor service with a fixed-size thread pool
executor = ThreadPoolExecutor(max_workers=max_threads)


def poll():
    """Bounded-concurrency poller: hand each message to the fixed pool."""
    while True:
        batch = client.poll(timeout=1.0)
        for message in batch:
            executor.submit(apply, message)
Aha, this seems much better! But what if the apply method fails? We want to make our poll method robust, so how about we provide some default try/except handling? This basically means our poll will call the default implementation of _process, which will call the apply method in your handler
executor = ThreadPoolExecutor(max_workers=max_threads)


# Handler Interface
class Handler(ABC):
    @abstractmethod
    def apply(self, message):
        """Process a single polled message."""
        pass


# Example Hello World Handler
class HelloWorldHandler(Handler):
    # Must be named `apply` to satisfy the Handler ABC; the original defined
    # process_message(), leaving the abstract method unimplemented, so the
    # class could never be instantiated.
    def apply(self, message):
        print(f"Processing message: {message}")
        print("Hello World!")


def _process(handler, message):
    """Default wrapper: invoke the handler and log (not raise) any failure."""
    try:
        # Forward the message; the original called handler.apply() with no
        # argument, which would raise TypeError on every message.
        handler.apply(message)
    except Exception as e:
        # The original log line referenced an undefined `attempt` variable,
        # so the except branch itself raised NameError.
        logger.warning(f"Processing failed: {str(e)}")


def poll(handler):
    """Poll forever, dispatching each message to the shared executor."""
    while True:
        messages = client.poll(timeout=1.0)
        for msg in messages:
            # Matches _process(handler, message); the original signature took
            # `self` too, so this submit() call would have dropped the message.
            executor.submit(_process, handler, msg)
The above code is a little bit messy; maybe we should turn poll into a Poller class. If we refactor our code a little bit, this is what we will have
# Handler Interface
class Handler(ABC):
    @abstractmethod
    def apply(self, message):
        """Process a single polled message."""
        pass


class Poller:
    """Polls a topic and hands each message to a fixed-size thread pool."""

    def __init__(
        self,
        topic,
        max_threads,
        kafka_client,
        handler
    ):
        self.topic = topic
        self.client = kafka_client
        self.handler = handler
        self.client.subscribe([self.topic])
        self.executor = ThreadPoolExecutor(max_workers=max_threads)

    def _process(self, message):
        """Invoke the handler, logging (not raising) any failure."""
        try:
            # Forward the message; the original called self.handler.apply()
            # with no argument, which raises TypeError on every message.
            self.handler.apply(message)
        except Exception as e:
            # The original log line referenced an undefined `attempt` name,
            # so the except branch itself raised NameError.
            logger.warning(f"Processing failed: {str(e)}")

    def poll(self):
        """Poll forever, dispatching each batch to the executor."""
        while True:
            messages = self.client.poll(timeout=1.0)
            for msg in messages:
                self.executor.submit(self._process, msg)
Is there anything we can do better here? How about we add something called a retry policy? For now, in our _process method, we just do a simple try/except block. Let's add another interface called RetryPolicy and implement a simple linear retry
class RetryPolicy(ABC):
    """Strategy interface: map a 1-based attempt number to a wait time."""

    @abstractmethod
    def get_delay(self, attempt):
        """Calculate retry delay in seconds"""
        pass


class LinearRetryPolicy(RetryPolicy):
    """Delays grow linearly: base_delay, 2*base_delay, 3*base_delay, ..."""

    def __init__(self, base_delay):
        if base_delay <= 0:
            raise ValueError("Delay must be positive")
        self.base_delay = base_delay

    def get_delay(self, attempt):
        # attempt is 1-based, so the first retry waits exactly base_delay.
        return attempt * self.base_delay
After adding the retry policy, we need to update how we implement our _process method
def _process_with_retry(self, message):
    """Call the handler, retrying per the retry policy up to max_attempts."""
    attempt = 0
    while attempt < self.max_attempts:
        attempt += 1
        try:
            succeeded = self.handler.process_message(message)
            if succeeded:
                return
            # A False return counts as a failure so it is retried too.
            raise RuntimeError("Handler reported failure")
        except Exception as e:
            logger.warning(f"Attempt {attempt} failed: {str(e)}")
            if attempt == self.max_attempts:
                break
            time.sleep(self.retry_policy.get_delay(attempt))
But what if, after all the retries, we still can't process the message? In this case we can either skip it or add the message to a DLQ (Dead Letter Queue).
This will be what our final version may look like with a DLQ! If you have read this far, I hope you enjoyed this article — let me know your thoughts on building pollers for queues!
import time
import logging
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

# Configure logging
logger = logging.getLogger(__name__)


# Handler Interface
class Handler(ABC):
    @abstractmethod
    def process_message(self, message):
        """Process a message and return a success status (True/False)"""
        pass


# Example Hello World Handler
class HelloWorldHandler(Handler):
    def process_message(self, message):
        try:
            # Main processing logic
            print(f"Processing message: {message}")
            # Post-processing action
            print("Hello World!")
            return True
        except Exception as e:
            logger.error(f"Processing failed: {e}")
            return False


# Retry Policy Implementations
class RetryPolicy(ABC):
    """Strategy interface mapping a 1-based attempt number to a delay."""

    @abstractmethod
    def get_delay(self, attempt):
        """Calculate retry delay in seconds"""
        pass


class LinearRetryPolicy(RetryPolicy):
    """Delay grows linearly: base_delay, 2*base_delay, 3*base_delay, ..."""

    def __init__(self, base_delay):
        if base_delay <= 0:
            raise ValueError("Delay must be positive")
        self.base_delay = base_delay

    def get_delay(self, attempt):
        return self.base_delay * attempt


class ExponentialRetryPolicy(RetryPolicy):
    """Delay grows geometrically: base_delay * factor**(attempt - 1)."""

    def __init__(self, base_delay, factor=2):
        # Validate like LinearRetryPolicy does; the original silently
        # accepted zero or negative delays here.
        if base_delay <= 0:
            raise ValueError("Delay must be positive")
        self.base_delay = base_delay
        self.factor = factor

    def get_delay(self, attempt):
        return self.base_delay * (self.factor ** (attempt - 1))


# Kafka Poller Implementation
class Poller:
    """Polls a topic, dispatches messages to a bounded thread pool, retries
    failures per the retry policy, and dead-letters permanent failures."""

    def __init__(
        self,
        topic,
        max_threads,
        kafka_client,
        handler,
        retry_policy,
        max_attempts=3,
        dead_letter_queue=None
    ):
        self.topic = topic
        self.client = kafka_client
        self.handler = handler
        self.retry_policy = retry_policy
        self.max_attempts = max_attempts
        self.dead_letter_queue = dead_letter_queue
        self._shutdown = False
        self.client.subscribe([self.topic])
        self.executor = ThreadPoolExecutor(max_workers=max_threads)

    def run(self):
        """Main polling loop"""
        logger.info(f"Starting poller for topic: {self.topic}")
        while not self._shutdown:
            try:
                messages = self.client.poll(timeout=1.0)
                for msg in messages:
                    self.executor.submit(self._process_with_retry, msg)
            except Exception as e:
                # Reconstructed log string (it was broken across a line wrap
                # in the original source).
                logger.error(f"Polling error: {e}")

    def _process_with_retry(self, message):
        """Retry logic wrapper"""
        for attempt in range(1, self.max_attempts + 1):
            try:
                if self.handler.process_message(message):
                    return
                # A False return counts as a failure so it is retried too.
                raise RuntimeError("Handler reported failure")
            except Exception as e:
                logger.warning(f"Attempt {attempt} failed: {str(e)}")
                if attempt == self.max_attempts:
                    self._handle_failure(message)
                    break
                time.sleep(self.retry_policy.get_delay(attempt))

    def _handle_failure(self, message):
        """Dead letter queue handling"""
        if self.dead_letter_queue:
            try:
                self.dead_letter_queue.put(message)
                logger.info("Message sent to dead letter queue")
            except Exception as e:
                logger.error(f"Dead letter queue error: {e}")
        else:
            logger.warning("Permanently failed message discarded")

    def shutdown(self):
        """Graceful shutdown"""
        logger.info("Shutting down poller...")
        self._shutdown = True
        # wait=True lets in-flight handler calls finish; the original used
        # wait=False, which abandons running work and is not graceful.
        self.executor.shutdown(wait=True)


# Example Usage
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Configuration
    # NOTE(review): KafkaClient is never defined or imported in this file --
    # it must come from a real client library (e.g. confluent-kafka's
    # Consumer); confirm before running.
    poller = Poller(
        topic="demo_topic",
        max_threads=2,
        kafka_client=KafkaClient(),
        handler=HelloWorldHandler(),
        retry_policy=ExponentialRetryPolicy(base_delay=0.5),
        dead_letter_queue=Queue()
    )

    try:
        poller.run()
    except KeyboardInterrupt:
        poller.shutdown()
Cheers!
Yijie :)