# Drakkar-Software OctoBot-Channels
# Copyright (c) Drakkar-Software, All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library.
import asyncio
from asyncio import Queue, CancelledError
from octobot_commons.logging.logging_util import get_logger
"""
A channel Consumer
"""
[docs]class Consumer:
def __init__(self, callback: object, size: int = 0, filter_size: bool = False):
self.logger = get_logger(self.__class__.__name__)
# Consumer data queue. It contains producer's work (received through Producer.send()).
self.queue = Queue(maxsize=size)
# Method to be called when performing task is done
self.callback = callback
# Should only be used with .cancel()
self.consume_task = None
"""
Should be used as the perform while loop condition
while(self.should_stop):
...
"""
self.should_stop = False
"""
TODO
Filter consumer performing task by waiting for a specified queue size
"""
self.filter_size = filter_size
[docs] async def consume(self):
"""
Should be overwritten with a self.queue.get() in a while loop
:return: None
"""
while not self.should_stop:
try:
await self.perform(await self.queue.get())
except CancelledError:
self.logger.warning("Cancelled task")
except Exception as e:
self.logger.exception(e, True, f"Exception when calling callback on {self}: {e}")
finally:
await self.consume_ends()
[docs] async def consume_ends(self) -> None:
"""
Should be overwritten to handle consumption ends
:return: None
"""
pass
[docs] async def start(self) -> None:
"""
Should be implemented for consumer's non-triggered tasks
:return: None
"""
self.should_stop = False
[docs] async def stop(self) -> None:
"""
Stops non-triggered tasks management
:return: None
"""
self.should_stop = True
if self.consume_task:
self.consume_task.cancel()
[docs] def create_task(self) -> None:
"""
Creates a new asyncio task that contains start() execution
:return: None
"""
self.consume_task = asyncio.create_task(self.consume())
[docs] async def run(self) -> None:
"""
- Initialize the consumer
- Start the consumer main task
:return: None
"""
await self.start()
self.create_task()
def __str__(self):
return f"{self.__class__.__name__} with callback: {self.callback.__name__}"
[docs]class InternalConsumer(Consumer):
def __init__(self):
super().__init__(None)
self.callback = self.internal_callback
[docs] async def internal_callback(self, **kwargs):
raise NotImplementedError("internal_callback is not implemented")
[docs]class SupervisedConsumer(Consumer):
[docs] async def consume_ends(self) -> None:
try:
self.queue.task_done()
except ValueError: # when task_done() is called when the Exception was CancelledError
pass