Source code for octobot_channels.channels.channel

#  Drakkar-Software OctoBot-Channels
#  Copyright (c) Drakkar-Software, All rights reserved.
#
#  This library is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 3.0 of the License, or (at your option) any later version.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library.
from typing import Iterable

from octobot_commons.logging.logging_util import get_logger

from octobot_channels import CHANNEL_WILDCARD
from octobot_channels.channels.channel_instances import ChannelInstances

"""
A Channel is the object that connects one or more producer classes to one or more consumer classes.
It keeps a list of registered consumers (each stored with its filters) to notify every consumer when a producer sends something.
It keeps a list of registered producers to allow producer modification through 'modify'.
To access channels, the 'ChannelInstances' singleton is used to manage channel instances.
"""


class Channel(object):
    """
    Connect producer instance(s) to consumer instance(s).

    The channel keeps a list of registered consumers (each one stored together with its
    selection filters) and a list of registered producers. Producers are paused while no
    consumer is registered and resumed as soon as one is.
    """

    # Channel producer class
    PRODUCER_CLASS = None

    # Channel consumer class
    CONSUMER_CLASS = None

    # Key under which the consumer instance is stored inside its filters dict
    INSTANCE_KEY = "consumer_instance"

    def __init__(self):
        self.logger = get_logger(self.__class__.__name__)

        # Channel subscribed producers list
        self.producers = []

        # Channel subscribed consumers list: one dict per consumer holding its filters
        # plus the consumer itself under INSTANCE_KEY
        self.consumers = []

        # Used to perform global send from non-producer context
        self.internal_producer = None

        # Used to save producers state (paused or not)
        self.is_paused = True

    @classmethod
    def get_name(cls) -> str:
        """
        Default implementation is to return the name of the class without the 'Channel' substring
        :returns the channel name
        """
        return cls.__name__.replace('Channel', '')

    async def new_consumer(self,
                           callback: object = None,
                           consumer_filters: dict = None,
                           internal_consumer: object = None,
                           size: int = 0,
                           filter_size: bool = False,
                           **kwargs) -> CONSUMER_CLASS:
        """
        Create an appropriate consumer instance for this channel and add it to the consumer list
        Should end by calling '_check_producers_state'
        :param callback: method that should be called when consuming the queue
        :param consumer_filters: the consumer filters
        :param internal_consumer: internal consumer instance to use if specified
        :param size: queue size, default 0 (not used by this base implementation)
        :param filter_size: if the consumer wants a filtered flow (not used by this base implementation)
        :return: consumer instance created
        """
        consumer = internal_consumer if internal_consumer else self.CONSUMER_CLASS(callback)
        await self._add_new_consumer_and_run(consumer, consumer_filters)
        await self._check_producers_state()
        return consumer

    async def _add_new_consumer_and_run(self, consumer: CONSUMER_CLASS, consumer_filters: dict, **kwargs) -> None:
        """
        Should be called by 'new_consumer' to add the consumer to self.consumers and call 'consumer.run()'
        :param consumer: the consumer to add
        :param consumer_filters: the consumer selection filters, None is treated as no filter
        :param kwargs: additional params for consumer list
        :return: None
        """
        if consumer_filters is None:
            consumer_filters = {}

        self.add_new_consumer(consumer, consumer_filters)
        await consumer.run()

    def add_new_consumer(self, consumer, consumer_filters) -> None:
        """
        Add a new consumer to consumer list with filters
        :param consumer: the consumer to add
        :param consumer_filters: the consumer selection filters (used by 'get_consumer_from_filters'),
        None or empty means no filter
        :return: None
        """
        # Store a copy: writing INSTANCE_KEY into the caller's dict would leak internal state
        # and, when the same dict is reused for several consumers, make every registration
        # point to the last consumer added.
        registered_filters = dict(consumer_filters) if consumer_filters else {}
        registered_filters[self.INSTANCE_KEY] = consumer
        self.consumers.append(registered_filters)

    def get_consumer_from_filters(self, consumer_filters) -> list:
        """
        Returns the consumer instances matching the given filters
        :param consumer_filters: the expected filters
        :return: the list of matching consumer instances
        """
        return self._filter_consumers(consumer_filters)

    def get_consumers(self) -> list:
        """
        Returns all consumers instance
        Can be overwritten according to the class needs
        :return: the subscribed consumers list
        """
        return [consumer[self.INSTANCE_KEY] for consumer in self.consumers]

    def _filter_consumers(self, consumer_filters) -> list:
        """
        Returns the consumers that match the selection
        Returns all consumer instances if consumer_filter is empty
        :param consumer_filters: listed consumer filters
        :return: the list of the filtered consumers
        """
        return [consumer[self.INSTANCE_KEY]
                for consumer in self.consumers
                if self._check_filters(consumer, consumer_filters)]

    def _check_filters(self, consumer_filters, expected_filters) -> bool:
        """
        Checks if the consumer match the specified filters
        Returns True if expected_filters is empty
        :param consumer_filters: consumer filters
        :param expected_filters: selected filters
        :return: True if the consumer match the selection, else False
        """
        # Every expected key must exist on the consumer; its value matches when either side
        # is the wildcard or when both values are equal. The generator lets all() stop at
        # the first mismatch.
        return all(
            key in consumer_filters
            and (expected == CHANNEL_WILDCARD or consumer_filters[key] in [expected, CHANNEL_WILDCARD])
            for key, expected in expected_filters.items()
        )

    async def remove_consumer(self, consumer: CONSUMER_CLASS, **kwargs) -> None:
        """
        Should be overwritten according to the class needs
        Should end by calling '_check_producers_state' and then 'consumer.stop'
        Does nothing when the consumer is not registered
        :param consumer: consumer instance to remove from consumers list
        :param kwargs: consumers list filter params
        :return: None
        """
        # Build the kept entries first instead of removing while iterating: removing during
        # iteration skips the entry that follows each removal, leaving duplicates behind.
        remaining = [entry for entry in self.consumers if not (consumer == entry[self.INSTANCE_KEY])]
        if len(remaining) == len(self.consumers):
            return

        # Slice assignment keeps the same list object for anything holding a reference to it
        self.consumers[:] = remaining
        await self._check_producers_state()
        await consumer.stop()

    async def _check_producers_state(self) -> None:
        """
        Checks if producers should be paused or resumed after a consumer addition or removal
        :return: None
        """
        has_consumers = bool(self.get_consumers())
        if not has_consumers and not self.is_paused:
            self.is_paused = True
            for producer in self.get_producers():
                await producer.pause()
        elif has_consumers and self.is_paused:
            self.is_paused = False
            for producer in self.get_producers():
                await producer.resume()

    async def register_producer(self, producer, **kwargs) -> None:
        """
        Add the producer to producers list
        Can be overwritten to perform additional action when registering
        Should end by calling 'pause' if self.is_paused
        :param Producer producer: created channel producer to register
        :param kwargs: additional arguments available for overwritten methods
        :return: None
        """
        if producer not in self.producers:
            self.producers.append(producer)

        if self.is_paused:
            await producer.pause()

    def unregister_producer(self, producer, **kwargs) -> None:
        """
        Remove the producer from producers list
        Can be overwritten to perform additional action when unregistering
        :param Producer producer: created channel producer to unregister
        :param kwargs: additional arguments available for overwritten methods
        :return: None
        """
        if producer in self.producers:
            self.producers.remove(producer)

    def get_producers(self, **kwargs) -> Iterable:
        """
        Should be overwritten according to the class needs
        :param kwargs: producers list filter params
        :return: channel producers iterable
        """
        return self.producers

    async def start(self) -> None:
        """
        Call each registered consumers start method
        :return: None
        """
        for consumer in self.get_consumers():
            await consumer.start()

    async def stop(self) -> None:
        """
        Call each registered consumers and producers stop method
        :return: None
        """
        for consumer in self.get_consumers():
            await consumer.stop()

        for producer in self.get_producers():
            await producer.stop()

    def flush(self) -> None:
        """
        Clear the 'channel' attribute of the internal producer (if any) and of every producer
        :return: None
        """
        if self.internal_producer is not None:
            self.internal_producer.channel = None
        for producer in self.get_producers():
            producer.channel = None

    async def run(self) -> None:
        """
        Call each registered consumers run method
        :return: None
        """
        for consumer in self.get_consumers():
            await consumer.run()

    async def modify(self, **kwargs) -> None:
        """
        Call each registered producers modify method
        :param kwargs: arguments forwarded to each producer 'modify'
        :return: None
        """
        for producer in self.get_producers():
            await producer.modify(**kwargs)

    def get_internal_producer(self, **kwargs) -> PRODUCER_CLASS:
        """
        Returns internal producer if exists else creates it
        :param kwargs: arguments for internal producer __init__
        :return: internal producer instance
        :raises TypeError: if PRODUCER_CLASS can't be called with (self, **kwargs),
        which includes the default PRODUCER_CLASS = None
        """
        # 'is None' rather than truthiness: an existing producer that evaluates to False
        # must not be silently replaced.
        if self.internal_producer is None:
            try:
                self.internal_producer = self.PRODUCER_CLASS(self, **kwargs)
            except TypeError as err:
                self.logger.exception("PRODUCER_CLASS not defined")
                # Chain the original error so its message is not lost
                raise TypeError(
                    f"{self.__class__.__name__}: PRODUCER_CLASS is not defined or can't be instantiated"
                ) from err
        return self.internal_producer
def set_chan(chan, name) -> Channel:
    """
    Register a Channel instance in the channels registry
    :param chan: the Channel instance to register
    :param name: name to register the channel under, chan.get_name() is used when it is falsy
    :return: the registered channel instance
    :raises ValueError: if a channel is already registered under this name
    """
    key = name or chan.get_name()
    registry = ChannelInstances.instance().channels
    if key in registry:
        raise ValueError(f"Channel {key} already exists.")
    registry[key] = chan
    return chan
def del_chan(name) -> None:
    """
    Remove a Channel instance from the channels registry
    Does nothing when no channel is registered under this name
    :param name: name of the channel to delete
    :return: None
    """
    # pop with a default already ignores a missing name
    ChannelInstances.instance().channels.pop(name, None)
def get_chan(chan_name: str, **kwargs) -> Channel:
    """
    Look up a channel instance by name in the channels registry
    :param chan_name: the channel name
    :param kwargs: unused by this implementation
    :return: the Channel instance registered under chan_name
    """
    registry = ChannelInstances.instance().channels
    return registry[chan_name]