مهام خلفية عن فاوست، الجزء الأول: مقدمة

مهام خلفية عن فاوست، الجزء الأول: مقدمة

كيف جئت لأعيش هكذا؟

منذ وقت ليس ببعيد، اضطررت إلى العمل على الواجهة الخلفية لمشروع محمل للغاية، حيث كان من الضروري تنظيم التنفيذ المنتظم لعدد كبير من مهام الخلفية مع الحسابات المعقدة وطلبات خدمات الطرف الثالث. المشروع غير متزامن وقبل مجيئي، كان لديه آلية بسيطة لمهام إطلاق cron: حلقة للتحقق من الوقت الحالي وإطلاق مجموعات من coroutines عبر التجميع - تبين أن هذا النهج مقبول حتى كان هناك العشرات والمئات من هذه coroutines ومع ذلك، عندما تجاوز عددهم ألفين، كان علي أن أفكر في تنظيم قائمة انتظار مهام عادية مع وسيط، والعديد من العمال، وما إلى ذلك.

قررت أولاً تجربة الكرفس الذي استخدمته من قبل. نظرًا للطبيعة غير المتزامنة للمشروع، فقد تعمقت في السؤال ورأيت статьюكذلك مشروع، أنشأها مؤلف المقال.

سأقول هذا، المشروع مثير للاهتمام للغاية ويعمل بنجاح كبير في التطبيقات الأخرى لفريقنا، ويقول المؤلف نفسه إنه كان قادرًا على طرحه في الإنتاج باستخدام تجمع غير متزامن. لكن لسوء الحظ، لم يناسبني ذلك حقًا، كما اتضح فيما بعد مشكلة مع الإطلاق الجماعي للمهام (انظر. رأس التجميع). في وقت كتابة هذا التقرير قضية تم إغلاقه بالفعل، ولكن العمل مستمر لمدة شهر. على أي حال، حظا سعيدا للمؤلف وكل التوفيق، حيث أن هناك بالفعل أشياء تعمل على lib... بشكل عام، النقطة في داخلي وتبين أن الأداة كانت رطبة بالنسبة لي. بالإضافة إلى ذلك، تحتوي بعض المهام على 2-3 طلبات http لخدمات مختلفة، لذلك حتى عند تحسين المهام، نقوم بإنشاء 4 آلاف اتصال TCP، كل ساعتين تقريبًا - ليس جيدًا جدًا... أرغب في إنشاء جلسة لنوع واحد من المهمة عند بدء العمال. المزيد عن العدد الكبير من الطلبات عبر aiohttp هنا.

في هذا الصدد، بدأت أبحث альтернативы ووجدته! المبدعين من الكرفس، على وجه التحديد، كما أفهمها اسأل سوليم، تم انشائه فاوست، في الأصل للمشروع robinhood. فاوست مستوحى من Kafka Streams ويعمل مع كافكا كوسيط، كما يستخدم rocksdb لتخزين النتائج من عمل الوكلاء، والأهم أن المكتبة غير متزامنة.

أيضا، يمكنك أن تبحث مقارنة سريعة الكرفس وفاوست من مبدعي الأخير: اختلافاتهم، الاختلافات بين الوسطاء، تنفيذ مهمة أولية. كل شيء بسيط للغاية، ومع ذلك، فإن الميزة الرائعة في فاوست تجذب الانتباه - البيانات المكتوبة لنقلها إلى الموضوع.

ماذا نفعل؟

لذا، في سلسلة قصيرة من المقالات، سأوضح لك كيفية جمع البيانات من مهام الخلفية باستخدام فاوست. سيكون مصدر مشروع المثال الخاص بنا، كما يوحي الاسم، alphavantage.co. سأوضح كيفية كتابة الوكلاء (الحوض، والموضوعات، والأقسام)، وكيفية القيام بالتنفيذ العادي (cron)، وأوامر faust cli الأكثر ملاءمة (مجمّع عند النقر)، والتجميع البسيط، وفي النهاية سنرفق datadog ( العمل خارج الصندوق) وحاول رؤية شيء ما. لتخزين البيانات التي تم جمعها سوف نستخدم mongodb وmotor للاتصال.

ملحوظة: انطلاقًا من الثقة التي تمت بها كتابة النقطة المتعلقة بالمراقبة، أعتقد أن القارئ في نهاية المقالة الأخيرة سيظل يبدو كما يلي:

مهام خلفية عن فاوست، الجزء الأول: مقدمة

متطلبات المشروع

نظرًا لحقيقة أنني وعدت بالفعل، فلنضع قائمة صغيرة بما يجب أن تكون الخدمة قادرة على القيام به:

  1. تحميل الأوراق المالية ولمحة عامة عنها (بما في ذلك الأرباح والخسائر، الميزانية العمومية، التدفق النقدي - للعام الماضي) - بانتظام
  2. قم بتحميل البيانات التاريخية (لكل سنة تداول، ابحث عن القيم المتطرفة لسعر إغلاق التداول) - بانتظام
  3. قم بتحميل أحدث بيانات التداول - بانتظام
  4. قم بتحميل قائمة مخصصة من المؤشرات لكل ورقة مالية - بانتظام

وكما هو متوقع، نختار اسمًا للمشروع من الصفر: هورتون

نحن نجهز البنية التحتية

العنوان قوي بالتأكيد، ومع ذلك، كل ما عليك فعله هو كتابة تكوين صغير لـ docker-compose باستخدام kafka (وzookeeper - في حاوية واحدة)، وkafdrop (إذا أردنا إلقاء نظرة على الرسائل في المواضيع)، وmongodb. نحن نحصل [docker-compose.yml](https://github.com/Egnod/horton/blob/562fa5ec14df952cd74760acf76e141707d2ef58/docker-compose.yml) بالصيغة التالية:

version: '3'

services:
  db:
    container_name: horton-mongodb-local
    image: mongo:4.2-bionic
    command: mongod --port 20017
    restart: always
    ports:
      - 20017:20017
    environment:
      - MONGO_INITDB_DATABASE=horton
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=admin_password

  kafka-service:
    container_name: horton-kafka-local
    image: obsidiandynamics/kafka
    restart: always
    ports:
      - "2181:2181"
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-service:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

  kafdrop:
    container_name: horton-kafdrop-local
    image: 'obsidiandynamics/kafdrop:latest'
    restart: always
    ports:
      - '9000:9000'
    environment:
      KAFKA_BROKERCONNECT: kafka-service:29092
    depends_on:
      - kafka-service

لا يوجد شيء معقد هنا على الإطلاق. تم الإعلان عن مستمعين لكافكا: أحدهما (داخلي) للاستخدام داخل الشبكة المركبة، والثاني (خارجي) للطلبات من الخارج، فأرسلوها إلى الخارج. 2181 - ميناء حارس حديقة الحيوان. والباقي أعتقد أنه واضح.

إعداد الهيكل العظمي للمشروع

في الإصدار الأساسي، يجب أن يبدو هيكل مشروعنا كما يلي:

horton
├── docker-compose.yml
└── horton
    ├── agents.py *
    ├── alphavantage.py *
    ├── app.py *
    ├── config.py
    ├── database
    │   ├── connect.py
    │   ├── cruds
    │   │   ├── base.py
    │   │   ├── __init__.py
    │   │   └── security.py *
    │   └── __init__.py
    ├── __init__.py
    ├── records.py *
    └── tasks.py *

*كل ما لاحظته لم نتطرق إليها بعد، بل نقوم فقط بإنشاء ملفات فارغة.**

لقد أنشأنا الهيكل. الآن دعونا نضيف التبعيات اللازمة، ونكتب التكوين ونتصل بـ mongodb. لن أقدم النص الكامل للملفات الموجودة في المقالة، حتى لا أتأخر، ولكن سأقدم روابط للإصدارات اللازمة.

لنبدأ بالتبعيات والفوقية حول المشروع - pyproject.toml

بعد ذلك، نبدأ في تثبيت التبعيات وإنشاء Virtualenv (أو يمكنك إنشاء مجلد venv بنفسك وتنشيط البيئة):

pip3 install poetry (если ещё не установлено)
poetry install

الآن دعونا ننشئ config.yml - أوراق الاعتماد وأين يطرق. يمكنك وضع البيانات الخاصة بـ alphavantage هناك على الفور. حسنا، دعونا ننتقل إلى config.py - استخراج البيانات الخاصة بالتطبيق من التكوين الخاص بنا. نعم، أعترف أنني استخدمت ليب الخاص بي - سيتري.

عند الاتصال بـ Mongo، كل شيء بسيط للغاية. أعلن فئة العميل للاتصال و الطبقة الأساسية بالنسبة إلى الخامات، لتسهيل إجراء الاستعلامات حول المجموعات.

ماذا سيحدث بعد ذلك؟

المقالة ليست طويلة جدًا، لأنني هنا أتحدث فقط عن التحفيز والتحضير، فلا تلومني - أعدك أن الجزء التالي سيكون به أكشن ورسومات.

لذلك، في هذا الجزء التالي للغاية نحن:

  1. لنكتب عميلًا صغيرًا لـ alphavantage على aiohttp مع طلبات لنقاط النهاية التي نحتاجها.
  2. لنقم بإنشاء وكيل يقوم بجمع البيانات عن الأوراق المالية والأسعار التاريخية لها.

رمز المشروع

رمز لهذا الجزء

المصدر: www.habr.com

إضافة تعليق