funboost_task.py 1.2 KB

1234567891011121314151617181920212223
  1. import time
  2. from funboost import boost, BrokerEnum, run_forever, funboost_aps_scheduler
  3. from Ansjer.config import LOGGER
  4. from Model.models import Device_Info
  5. @boost("task_queue_name1", qps=0.5, broker_kind=BrokerEnum.REDIS_ACK_ABLE) # 入参包括20种,运行控制方式非常多,想得到的控制都会有。
  6. def task_fun(x, y):
  7. print(f'{x} + {y} = {x + y}')
  8. LOGGER.info('funboost代码调试,时间为:{}'.format(time.time()))
  9. time.sleep(3) # 框架会自动并发绕开这个阻塞,无论函数内部随机耗时多久都能自动调节并发达到每秒运行 5 次 这个 task_fun 函数的目的。
  10. return True
  11. if __name__ == "__main__":
  12. # for i in range(2):
  13. # task_fun.pub(dict(x=i, y=i * 2))
  14. # task_fun.push(i, y=i * 2) # 发布者发布任务
  15. funboost_aps_scheduler.add_push_job(task_fun, 'cron', day_of_week='*', hour=9, minute=30, second=00,
  16. kwargs={"x": 5, "y": 6}) # 每隔3秒发布一次任务,自然就能每隔3秒消费一次任务了。
  17. funboost_aps_scheduler.start()
  18. task_fun.consume() # 消费者启动循环调度并发消费任务
  19. run_forever()