Jadi, jadi, bahagian kedua. Seperti yang ditulis sebelum ini, di dalamnya kita akan melakukan perkara berikut:
Mari kita tulis klien kecil untuk alphavantage pada aiohttp dengan permintaan untuk titik akhir yang kita perlukan.
Mari cipta ejen yang akan mengumpul data mengenai sekuriti dan maklumat meta mengenainya.
Tetapi, inilah yang akan kami lakukan untuk projek itu sendiri, dan dari segi penyelidikan yang pantas, kami akan belajar cara menulis ejen yang memproses acara aliran dari kafka, serta cara menulis arahan (pembungkus klik), dalam kes kami - untuk mesej tolak manual ke topik yang sedang dipantau oleh ejen.
Latihan
Pelanggan AlphaVantage
Mula-mula, mari tulis klien aiohttp kecil untuk permintaan alphavantage.
API AlphaVantage direka dengan agak ringkas dan cantik, jadi saya memutuskan untuk membuat semua permintaan melalui kaedah tersebut construct_query di mana pula terdapat panggilan http.
Saya bawa semua bidang ke snake_case untuk kemudahan.
Nah, hiasan logger.catch untuk keluaran jejak balik yang cantik dan bermaklumat.
PS Jangan lupa untuk menambah token alphavantage secara setempat ke config.yml, atau mengeksport pembolehubah persekitaran HORTON_SERVICE_APIKEY. Kami menerima token di sini.
kelas CRUD
Kami akan mempunyai koleksi sekuriti untuk menyimpan maklumat meta tentang sekuriti.
Buat masa ini kami akan mempunyai penciptaan aplikasi yang paling mudah, sedikit kemudian kami akan mengembangkannya, bagaimanapun, untuk tidak membuat anda menunggu, di sini rujukan kepada App-class. Saya juga menasihati anda untuk melihat kelas tetapan, kerana ia bertanggungjawab untuk kebanyakan tetapan.
Bahagian utama
Ejen untuk mengumpul dan mengekalkan senarai sekuriti
Jadi, mula-mula kita dapatkan objek aplikasi faust - ia agak mudah. Seterusnya, kami secara eksplisit mengisytiharkan topik untuk ejen kami... Di sini adalah wajar untuk menyebut apa itu, apakah parameter dalaman dan cara ini boleh diatur secara berbeza.
Topik dalam kafka, jika kita ingin mengetahui definisi yang tepat, lebih baik membaca dimatikan. dokumen, atau anda boleh membaca ringkasan pada Habré dalam bahasa Rusia, di mana segala-galanya juga dicerminkan dengan tepat :)
Parameter dalaman, diterangkan dengan baik dalam dokumen faust, membolehkan kami mengkonfigurasi topik secara langsung dalam kod, sudah tentu, ini bermakna parameter yang disediakan oleh pembangun faust, contohnya: pengekalan, dasar pengekalan (secara lalai padam, tetapi anda boleh menetapkan padat), bilangan partition setiap topik (skoruntuk melakukan, sebagai contoh, kurang daripada kepentingan global aplikasi faust).
Secara umum, ejen boleh mencipta topik terurus dengan nilai global, namun, saya suka mengisytiharkan semuanya secara eksplisit. Selain itu, beberapa parameter (contohnya, bilangan partition atau dasar pengekalan) topik dalam iklan ejen tidak boleh dikonfigurasikan.
Inilah rupanya tanpa menentukan topik secara manual:
Nah, sekarang mari kita terangkan apa yang akan dilakukan oleh ejen kami :)
app = get_app()
collect_securities_topic = app.topic("collect_securities", internal=True)
@app.agent(collect_securities_topic)
async def collect_securities(stream: StreamT[None]) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for _ in stream:
logger.info("Start collect securities")
client = AlphaVantageClient(session, API_KEY)
securities = await client.get_securities()
for security in securities:
await SecurityCRUD.update_one(
{"symbol": security["symbol"], "exchange": security["exchange"]}, security, upsert=True
)
yield True
Jadi, pada permulaan ejen, kami membuka sesi aiohttp untuk permintaan melalui pelanggan kami. Oleh itu, apabila memulakan pekerja, apabila ejen kami dilancarkan, sesi akan segera dibuka - satu, untuk sepanjang masa pekerja sedang berjalan (atau beberapa, jika anda menukar parameter serentak daripada ejen dengan unit lalai).
Seterusnya, kami mengikuti aliran (kami meletakkan mesej dalam _, memandangkan kami, dalam ejen ini, tidak mengambil berat tentang kandungan) mesej daripada topik kami, jika ia wujud pada offset semasa, jika tidak, kitaran kami akan menunggu ketibaan mereka. Nah, di dalam gelung kami, kami log penerimaan mesej, dapatkan senarai sekuriti aktif (get_securities kembali aktif secara lalai, lihat kod pelanggan) dan simpan ke pangkalan data, menyemak sama ada terdapat keselamatan dengan ticker yang sama dan pertukaran dalam pangkalan data , jika ada, maka ia (kertas itu) hanya akan dikemas kini.
Mari lancarkan ciptaan kami!
> docker-compose up -d
... Запуск контейнеров ...
> faust -A horton.agents worker --without-web -l info
Ciri-ciri PS komponen web Saya tidak akan menganggap faust dalam artikel, jadi kami menetapkan bendera yang sesuai.
Dalam arahan pelancaran kami, kami memberitahu faust tempat untuk mencari objek aplikasi dan perkara yang perlu dilakukan dengannya (melancarkan pekerja) dengan tahap output log maklumat. Kami mendapat output berikut:
Mari lihat set partition. Seperti yang kita dapat lihat, topik telah dibuat dengan nama yang kami tetapkan dalam kod, bilangan lalai partition (8, diambil daripada topic_partitions - parameter objek aplikasi), kerana kami tidak menentukan nilai individu untuk topik kami (melalui partition). Ejen yang dilancarkan dalam pekerja diberikan semua 8 partition, kerana ia adalah satu-satunya, tetapi ini akan dibincangkan dengan lebih terperinci dalam bahagian tentang pengelompokan.
Nah, sekarang kita boleh pergi ke tetingkap terminal lain dan menghantar mesej kosong ke topik kita:
PS menggunakan @ kami menunjukkan bahawa kami menghantar mesej kepada topik bernama "collect_securities".
Dalam kes ini, mesej pergi ke partition 6 - anda boleh menyemak ini dengan pergi ke kafdrop on localhost:9000
Pergi ke tetingkap terminal bersama pekerja kami, kami akan melihat mesej gembira yang dihantar menggunakan loguru:
2020-09-23 00:26:37.304 | INFO | horton.agents:collect_securities:40 - Start collect securities
Kami juga boleh melihat mongo (menggunakan Robo3T atau Studio3T) dan melihat bahawa sekuriti berada dalam pangkalan data:
Saya bukan jutawan, dan oleh itu kami berpuas hati dengan pilihan tontonan pertama.
Kegembiraan dan kegembiraan - ejen pertama sudah bersedia :)
Ejen sedia, hidup ejen baru!
Ya, tuan-tuan, kami hanya menutup 1/3 daripada laluan yang disediakan oleh artikel ini, tetapi jangan berkecil hati, kerana sekarang ia akan menjadi lebih mudah.
Jadi sekarang kami memerlukan ejen yang mengumpul maklumat meta dan memasukkannya ke dalam dokumen pengumpulan:
collect_security_overview_topic = app.topic("collect_security_overview", internal=True)
@app.agent(collect_security_overview_topic)
async def collect_security_overview(
stream: StreamT[?],
) -> AsyncIterable[bool]:
async with aiohttp.ClientSession() as session:
async for event in stream:
...
Memandangkan ejen ini akan memproses maklumat tentang keselamatan tertentu, kami perlu menunjukkan ticker (simbol) keselamatan ini dalam mesej. Untuk tujuan ini dalam faus ada Rekod — kelas yang mengisytiharkan skema mesej dalam topik ejen.
Dalam kes ini, mari pergi ke records.pydan terangkan rupa mesej untuk topik ini:
import faust
class CollectSecurityOverview(faust.Record):
symbol: str
exchange: str
Seperti yang anda duga, faust menggunakan anotasi jenis python untuk menerangkan skema mesej, itulah sebabnya versi minimum yang disokong oleh perpustakaan ialah 3.6.
Mari kembali kepada ejen, tetapkan jenis dan tambahnya:
Seperti yang anda lihat, kami menghantar parameter baharu dengan skema kepada kaedah permulaan topik - value_type. Selanjutnya, semuanya mengikut skema yang sama, jadi saya tidak nampak apa-apa gunanya memikirkan perkara lain.
Nah, sentuhan terakhir ialah menambah panggilan kepada ejen pengumpulan maklumat meta untuk collect_securitites:
Kami menggunakan skema yang diumumkan sebelum ini untuk mesej. Dalam kes ini, saya menggunakan kaedah .cast kerana kita tidak perlu menunggu keputusan daripada ejen, tetapi perlu dinyatakan bahawa cara-cara hantar mesej kepada topik:
cast - tidak menyekat kerana tidak mengharapkan hasil. Anda tidak boleh menghantar hasil ke topik lain sebagai mesej.
hantar - tidak menyekat kerana tidak mengharapkan hasil. Anda boleh menentukan ejen dalam topik yang hasilnya akan dihantar.
bertanya - menunggu keputusan. Anda boleh menentukan ejen dalam topik yang hasilnya akan dihantar.
Jadi, itu sahaja dengan ejen untuk hari ini!
Pasukan impian
Perkara terakhir yang saya janjikan untuk menulis dalam bahagian ini ialah arahan. Seperti yang dinyatakan sebelum ini, arahan dalam faust adalah pembungkus di sekitar klik. Sebenarnya, faust hanya melampirkan arahan tersuai kami pada antara mukanya apabila menentukan kekunci -A
Selepas ejen diumumkan masuk agents.py menambah fungsi dengan penghias app.commandmemanggil kaedah membuang у collect_securitites:
@app.command()
async def start_collect_securities():
"""Collect securities and overview."""
await collect_securities.cast()
Oleh itu, jika kami memanggil senarai arahan, arahan baharu kami akan berada di dalamnya:
> faust -A horton.agents --help
....
Commands:
agents List agents.
clean-versions Delete old version directories.
completion Output shell completion to be evaluated by the...
livecheck Manage LiveCheck instances.
model Show model detail.
models List all available models as a tabulated list.
reset Delete local table state.
send Send message to agent/topic.
start-collect-securities Collect securities and overview.
tables List available tables.
worker Start worker instance for given app.
Kita boleh menggunakannya seperti orang lain, jadi mari kita mulakan semula pekerja faust dan mulakan koleksi sekuriti yang lengkap:
> faust -A horton.agents start-collect-securities
Apa yang akan berlaku seterusnya?
Dalam bahagian seterusnya, menggunakan baki ejen sebagai contoh, kami akan mempertimbangkan mekanisme tenggelam untuk mencari keterlaluan dalam harga penutupan dagangan untuk tahun ini dan pelancaran cron ejen.
Itu sahaja untuk hari ini! Terima kasih untuk membaca :)