Source code for idm_broker.consumer

import logging
import threading

import celery.app
from django.apps import apps
from django.conf import settings
from django.db import connection
from kombu.mixins import ConsumerMixin
from kombu import Message


logger = logging.getLogger(__name__)


[docs]class BrokerTaskConsumer(ConsumerMixin): """ A kombu :py:class:`kombu.mixins.ConsumerMixin` that consumes from queues declared in ``settings.IDM_BROKER['CONSUMERS']``, and dispatches each message received to one or more configured celery tasks. """ ready = None # type: threading.Event def __init__(self): super().__init__() self.ready = threading.Event() def __call__(self): idm_broker_config = apps.get_app_config('idm_broker') with idm_broker_config.broker.acquire(block=True) as conn: self.connection = conn self.run()
[docs] def run(self, _tokens=1, **kwargs): try: super().run(_tokens=_tokens, **kwargs) finally: connection.close()
[docs] def get_consumers(self, Consumer, channel): consumers = [] for c in settings.IDM_BROKER['CONSUMERS']: callbacks = [self.get_callback(name) for name in c['tasks']] consumers.append(Consumer(queues=c['queues'], accept=c.get('accept'), callbacks=callbacks, auto_declare=True)) return consumers
[docs] def on_consume_ready(self, connection, channel, consumers, **kwargs): super().on_consume_ready(connection, channel, consumers, **kwargs) self.ready.set()
@classmethod
[docs] def get_callback(cls, task_name): def f(body, message: Message): celery_app = celery.app.default_app # type: celery.app.base.App try: celery_app.send_task(task_name, kwargs={ 'body': body, 'delivery_info': message.delivery_info, 'content_type': message.content_type, 'properties': message.properties, 'headers': message.headers, }) except Exception: # pragma: nocover message.reject() logger.exception("Couldn't send task for '%s' (%s, %s)", task_name, message.delivery_tag, message.delivery_info['routing_key'], extra={ 'body': body, 'delivery_info': message.delivery_info, }) else: message.ack() logger.debug("Sent task for '%s' (%s, %s)", task_name, message.delivery_tag, message.delivery_info['routing_key'], extra={ 'body': body, 'delivery_info': message.delivery_info, }) return f