octobot_channels package
Subpackages
Submodules
octobot_channels.constants module
octobot_channels.consumer module
-
class octobot_channels.consumer.Consumer(callback: object, size: int = 0, filter_size: bool = False)
Bases: object
-
async consume()
Should be overridden with a self.queue.get() call in a while loop.
Returns: None
-
consume_task = None
Should be used as the perform while loop condition:
while(self.should_stop): …
-
create_task() → None
Creates a new asyncio task that runs start().
Returns: None
-
async perform(kwargs) → None
Should be overridden to handle queue data.
Parameters: kwargs, the content returned by the queue get call
Returns: None
-
should_stop = None
TODO: filter the consumer's performing task by waiting for a specified queue size.
-
async
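The Consumer entries above describe a queue-driven loop: consume() reads from self.queue and hands each item to perform(). The following is a minimal sketch of that pattern, not the library's implementation. SketchConsumer and demo are hypothetical names; the queue attribute is inferred from the consume() description; the loop condition is assumed to be `while not self.should_stop`; and create_task() schedules consume() directly because start() is not documented in this section.

```python
import asyncio


class SketchConsumer:
    """Hypothetical stand-in that mirrors the documented Consumer interface."""

    def __init__(self, callback, size: int = 0):
        # Assumed attribute: the consume() description refers to self.queue.
        self.queue = asyncio.Queue(maxsize=size)
        self.callback = callback
        self.should_stop = False
        self.consume_task = None

    async def consume(self) -> None:
        # Read items with self.queue.get() in a while loop.
        while not self.should_stop:
            data = await self.queue.get()
            await self.perform(data)
            self.queue.task_done()

    async def perform(self, kwargs) -> None:
        # Handle one queue item by forwarding its content to the callback.
        await self.callback(**kwargs)

    def create_task(self) -> None:
        # Schedule the consume loop on the running event loop.
        self.consume_task = asyncio.create_task(self.consume())


async def demo() -> list:
    received = []

    async def on_data(**kwargs):
        received.append(kwargs)

    consumer = SketchConsumer(on_data)
    consumer.create_task()
    await consumer.queue.put({"symbol": "BTC/USDT", "price": 1.0})
    await consumer.queue.put({"symbol": "ETH/USDT", "price": 2.0})
    await consumer.queue.join()  # wait until both items were handled

    # Stop the loop; cancel() wakes the task blocked on queue.get().
    consumer.should_stop = True
    consumer.consume_task.cancel()
    try:
        await consumer.consume_task
    except asyncio.CancelledError:
        pass
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keeping the task handle in consume_task is what makes the cancel() call possible once the loop is blocked waiting for data.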
octobot_channels.producer module
-
class octobot_channels.producer.Producer(channel)
Bases: object
-
channel = None
Should only be used with .cancel()
-
create_task() → None
Creates a new asyncio task that runs start().
Returns: None
-
async modify(**kwargs) → None
Should be implemented when the producer can be modified during perform().
Returns: None
-
async perform(**kwargs) → None
Should implement the producer's non-triggered tasks. Can be used to force the producer to perform tasks.
Returns: None
-
produce_task = None
Should be used as the perform while loop condition:
while(self.should_stop): …
-
async push(**kwargs) → None
Notification that new data should be sent; the implementation decides how to handle it. When nothing needs to be done to the data, call self.send().
Returns: None
-
async run() → None
Starts the producer's main task. Should call self.channel.register_producer.
Returns: None
-
async send(data, **kwargs) → None
Sends data to each consumer through its queue.
Parameters: data, the data to be put into the consumers' queues; kwargs
Returns: None
-
should_stop = None
Should be used to know if the producer is already started
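The push() and send() entries above describe a fan-out: push() is notified of new data and, when no processing is needed, forwards it to send(), which puts it into every consumer's queue. The following is a minimal sketch of that flow under stated assumptions, not the library's implementation. SketchChannel, SketchProducer, demo_producer and the consumer_queues list are hypothetical; how the real channel exposes its consumers is not documented in this section.

```python
import asyncio


class SketchChannel:
    """Hypothetical channel that only holds the consumers' queues."""

    def __init__(self):
        # Assumed structure, not taken from the documented API.
        self.consumer_queues = []


class SketchProducer:
    """Hypothetical stand-in that mirrors the documented Producer interface."""

    def __init__(self, channel):
        self.channel = channel
        self.should_stop = False
        self.produce_task = None

    async def send(self, data, **kwargs) -> None:
        # Put the same data into every consumer's queue.
        for queue in self.channel.consumer_queues:
            await queue.put(data)

    async def push(self, **kwargs) -> None:
        # Nothing to transform here, so forward the data straight to send().
        await self.send(kwargs)


async def demo_producer() -> list:
    channel = SketchChannel()
    channel.consumer_queues = [asyncio.Queue(), asyncio.Queue()]
    producer = SketchProducer(channel)
    await producer.push(symbol="BTC/USDT", price=1.0)
    # Each consumer queue should now hold one copy of the data.
    return [queue.get_nowait() for queue in channel.consumer_queues]


if __name__ == "__main__":
    print(asyncio.run(demo_producer()))
```

A subclass that needs to filter or enrich the data would do so in push() before calling send(), which keeps send() a plain broadcast.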
-