octobot_channels package

Submodules

octobot_channels.constants module

octobot_channels.consumer module

class octobot_channels.consumer.Consumer(callback: object, size: int = 0, filter_size: bool = False)[source]

Bases: object

async consume()[source]

Should be overwritten with a self.queue.get() in a while loop :return: None

async consume_ends() → None[source]

Should be overwritten to handle consumption ends :return: None

consume_task = None

Should be used as the perform while loop condition while(self.should_stop):

create_task() → None[source]

Creates a new asyncio task that contains start() execution :return: None

async perform(kwargs) → None[source]

Should be overwritten to handle queue data :param kwargs: queue get content :return: None

async run() → None[source]
  • Initialize the consumer

  • Start the consumer main task

Returns

None

should_stop = None

TODO Filter consumer performing task by waiting for a specified queue size

async start() → None[source]

Should be implemented for consumer’s non-triggered tasks :return: None

async stop() → None[source]

Stops non-triggered tasks management :return: None

class octobot_channels.consumer.InternalConsumer[source]

Bases: octobot_channels.consumer.Consumer

async internal_callback(**kwargs)[source]
class octobot_channels.consumer.SupervisedConsumer(callback: object, size: int = 0, filter_size: bool = False)[source]

Bases: octobot_channels.consumer.Consumer

async consume_ends() → None[source]

Should be overwritten to handle consumption ends :return: None

octobot_channels.producer module

class octobot_channels.producer.Producer(channel)[source]

Bases: object

channel = None

Should only be used with .cancel()

create_task() → None[source]

Creates a new asyncio task that contains start() execution :return: None

async modify(**kwargs) → None[source]

Should be implemented when producer can be modified during perform() :return: None

async pause() → None[source]

Called when the channel runs out of consumer :return: None

async perform(**kwargs) → None[source]

Should implement producer’s non-triggered tasks Can be use to force producer to perform tasks :return: None

produce_task = None

Should be used as the perform while loop condition while(self.should_stop):

async push(**kwargs) → None[source]

Push notification that new data should be sent implementation When nothing should be done on data : self.send() :return: None

async resume() → None[source]

Called when the channel is no longer out of consumer :return: None

async run() → None[source]

Start the producer main task Should call ‘self.channel.register_producer’ :return: None

async send(data, **kwargs) → None[source]

Send to each consumer data though its queue :param data: data to be put into consumers queues :param kwargs: :return: None

should_stop = None

Should be used to know if the producer is already started

async start() → None[source]

Should be implemented for producer’s non-triggered tasks :return: None

async stop()[source]

Stops non-triggered tasks management :return: None

async wait_for_processing() → None[source]

Should be used only with SupervisedConsumers It will wait until all consumers have notified that their consume() method have ended :return: None

Module contents