Source code for idm_broker.apps

from urllib.parse import urljoin

import collections
import kombu
from dirtyfields import DirtyFieldsMixin
from django.apps import AppConfig
from django.conf import settings
from django.db import connection
from django.db.models.signals import pre_delete, post_save
from rest_framework.renderers import JSONRenderer, BaseRenderer
from rest_framework.serializers import BaseSerializer

from kombu import Connection
from kombu.pools import connections


class _FakeRequest(object):
    @classmethod
    def build_absolute_uri(cls, url):
        return urljoin(settings.API_BASE, url)

    GET = {}


[docs]class IDMBrokerConfig(AppConfig): name = 'idm_broker' _notification_registry = collections.defaultdict(list) _related_notification_registry = collections.defaultdict(list) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.broker = connections[Connection(hostname=settings.BROKER_HOSTNAME, ssl=settings.BROKER_SSL, virtual_host=settings.BROKER_VHOST, userid=settings.BROKER_USERNAME, password=settings.BROKER_PASSWORD, transport=settings.BROKER_TRANSPORT)] self.broker_prefix = settings.BROKER_PREFIX
[docs] def ready(self): post_save.connect(self._instance_changed) pre_delete.connect(self._instance_deleted)
[docs] def register_notification(self, *, serializer: BaseSerializer, exchange, model=None, renderer: BaseRenderer=JSONRenderer): if model is None: model = serializer.Meta.model if not isinstance(exchange, kombu.Exchange): exchange = kombu.Exchange(settings.BROKER_PREFIX + exchange, 'topic', durable=True) with self.broker.acquire(block=True) as conn: exchange(conn).declare() self._notification_registry[model].append((serializer, renderer, exchange))
[docs] def register_notifications(self, registrations): for registration in registrations: self.register_notification(**registration)
def _publish_change(self, sender, instance, **kwargs): needs_publish = instance._needs_publish instance._needs_publish = set() for serializer, renderer, exchange in self._notification_registry[sender]: if 'created' in needs_publish and 'deleted' in needs_publish: return elif not needs_publish: return elif 'deleted' in needs_publish: publish_type = 'deleted' elif 'created' in needs_publish: publish_type = 'created' else: publish_type = 'changed' serializer = serializer(context={'request': _FakeRequest()}) renderer = renderer() # type: BaseRenderer with self.broker.acquire(block=True) as conn: exchange = exchange(conn) representation = serializer.to_representation(instance) exchange.publish(exchange.Message(renderer.render(representation), content_type=renderer.media_type), routing_key='{}.{}.{}'.format(representation.get('@type') or type(instance).__name__, publish_type, instance.pk)) def _needs_publish(self, instance, publish_type): sender = type(instance) if sender not in self._notification_registry: raise AssertionError("Unexpected sender") # pragma: no cover try: instance._needs_publish.add(publish_type) except AttributeError: instance._needs_publish = {publish_type} connection.on_commit(lambda: self._publish_change(sender, instance)) def _instance_changed(self, sender, instance, created, force=False, **kwargs): if sender in self._notification_registry: publish_type = 'created' if created else 'changed' if not force and not created and isinstance(instance, DirtyFieldsMixin) and not instance.is_dirty(): return # If we can see which fields have changed, then if all of those have auto_now=True, then we don't need to # publish anything if not force and not created and isinstance(instance, DirtyFieldsMixin): for field in instance.get_dirty_fields(): if not getattr(instance._meta.get_field(field), 'auto_now', False): break else: return self._needs_publish(instance, publish_type) for accessor in self._related_notification_registry[sender]: related = accessor(instance) if related: self._instance_changed(sender=type(related), instance=related, created=False, force=True) def _instance_deleted(self, sender, instance, **kwargs): if sender in self._notification_registry: self._needs_publish(instance, 'deleted') for accessor in self._related_notification_registry[sender]: related = accessor(instance) if related: self._instance_changed(sender=type(related), instance=related, created=False, force=True)