Ehi Habr!
Ti piace volare? Io lo adoro, ma durante l'autoisolamento mi sono anche appassionato all'analisi dei dati dei biglietti aerei tramite una fonte molto nota: Aviasales.
Oggi analizzeremo il funzionamento di Amazon Kinesis, creeremo un sistema di streaming con analisi in tempo reale, installeremo il database NoSQL Amazon DynamoDB come principale archivio dati e configureremo le notifiche SMS per i ticket interessanti.
Tutti i dettagli sotto il taglio! Andiamo!

Introduzione
Ad esempio, avremo bisogno di accedere a L'accesso è gratuito e senza restrizioni, è sufficiente registrarsi nella sezione "Sviluppatori" per ottenere il token API per accedere ai dati.
Lo scopo principale di questo articolo è fornire una comprensione generale dell'uso delle informazioni in streaming in AWS; teniamo presente che i dati restituiti dall'API utilizzata non sono strettamente aggiornati e vengono trasmessi da una cache formata in base alle ricerche degli utenti su Aviasales.ru e Jetradar.com nelle ultime 48 ore.
L'agente Kinesis installato sulla macchina del produttore analizzerà automaticamente i dati dei biglietti aerei ricevuti tramite l'API e li trasferirà al flusso richiesto tramite Kinesis Data Analytics. La versione non elaborata di questo flusso verrà scritta direttamente nello storage. L'archiviazione dei dati grezzi implementata in DynamoDB consentirà un'analisi più approfondita dei biglietti tramite strumenti di BI, come AWS Quick Sight.
Prenderemo in considerazione due opzioni per l'implementazione dell'intera infrastruttura:
- Manuale - tramite AWS Management Console;
- Infrastruttura di codice Terraform per ingegneri dell'automazione pigri;
Architettura del sistema sviluppato

Componenti utilizzati:
- — i dati restituiti da questa API verranno utilizzati per tutti i lavori successivi;
- — una normale macchina virtuale nel cloud, sulla quale verrà generato il flusso di dati di input:
- — è un'applicazione Java installata localmente sulla macchina che fornisce un modo semplice per raccogliere e inviare dati a Kinesis (Kinesis Data Streams o Kinesis Firehose). L'agente monitora costantemente un set di file in directory specifiche e invia nuovi dati a Kinesis;
- — Uno script Python che invia richieste all'API e inserisce la risposta in una cartella monitorata da Kinesis Agent;
- — un servizio di streaming di dati in tempo reale con ampia scalabilità;
- — un servizio serverless che semplifica l'analisi dei dati in streaming in tempo reale. Amazon Kinesis Data Analytics configura le risorse per l'esecuzione delle applicazioni e si ridimensiona automaticamente per gestire qualsiasi volume di dati in entrata;
- — un servizio che consente di eseguire codice senza dover prenotare e configurare server. Tutta la potenza di calcolo viene automaticamente scalata a ogni chiamata;
- — un database di chiavi-valori e documenti che offre una latenza inferiore a 10 millisecondi a qualsiasi scala. Con DynamoDB, non ci sono server da predisporre, applicare patch o gestire. DynamoDB scala automaticamente le tabelle in base alle risorse disponibili, mantenendo al contempo prestazioni elevate. Non è richiesta alcuna amministrazione di sistema.
- — un servizio di messaggistica Pub/Sub completamente gestito che può essere utilizzato per isolare microservizi, sistemi distribuiti e applicazioni serverless. SNS può essere utilizzato per distribuire informazioni agli utenti finali tramite notifiche push mobili, SMS ed e-mail.
Formazione iniziale
Per emulare il flusso di dati, ho deciso di utilizzare le informazioni sui biglietti aerei restituite dall'API Aviasales. Tra i metodi disponibili, ne prendiamo in considerazione uno piuttosto lungo: "Calendario prezzi per un mese", che restituisce i prezzi per ogni giorno del mese, raggruppati in base al numero di trasferimenti. Se non si specifica il mese di ricerca nella richiesta, verranno restituite le informazioni per il mese successivo a quello corrente.
Quindi, registriamoci e riceviamo il nostro token.
Di seguito un esempio di richiesta:
http://api.travelpayouts.com/v2/prices/month-matrix?currency=rub&origin=LED&destination=HKT&show_to_affiliates=true&token=TOKEN_APIIl metodo sopra descritto per ottenere dati dall'API specificando il token nella richiesta funzionerà, ma preferisco passare il token di accesso tramite l'intestazione, quindi nello script api_caller.py useremo questo metodo.
Esempio di risposta:
{{
"success":true,
"data":[{
"show_to_affiliates":true,
"trip_class":0,
"origin":"LED",
"destination":"HKT",
"depart_date":"2015-10-01",
"return_date":"",
"number_of_changes":1,
"value":29127,
"found_at":"2015-09-24T00:06:12+04:00",
"distance":8015,
"actual":true
}]
}
L'esempio di risposta API qui sopra mostra un biglietto da San Pietroburgo a Phuk... Oh, che senso ha sognare...
Dato che sono di Kazan e Phuket ormai è "solo un sogno per noi", cerchiamo i biglietti da San Pietroburgo a Kazan.
Si presume che tu abbia già un account AWS. Vorrei attirare la tua attenzione sul fatto che Kinesis e l'invio di notifiche tramite SMS non sono inclusi nel piano annuale. Ma nonostante questo, avendo a disposizione un paio di dollari, è possibile costruire il sistema proposto e sperimentarlo. E, naturalmente, non dimenticare di eliminare tutte le risorse quando non sono più necessarie.
Fortunatamente, DynamoDB e le funzioni lambda saranno gratuite a determinate condizioni se rispettiamo i limiti mensili gratuiti. Ad esempio, per DynamoDB: 25 GB di spazio di archiviazione, 25 WCU/RCU e 100 milioni di query. E un milione di chiamate alle funzioni lambda al mese.
Distribuzione manuale del sistema
Impostazione dei flussi di dati Kinesis
Andiamo al servizio Kinesis Data Streams e creiamo due nuovi flussi, uno shard per ciascuno.
Cos'è uno shard?
Uno shard è l'unità di base per il trasferimento dati in un flusso di dati Amazon Kinesis. Uno shard fornisce 1 MB/sec di dati in input e 2 MB/sec di dati in output. Uno shard supporta fino a 1000 record PUT al secondo. Quando si crea un flusso di dati, è necessario specificare il numero di shard desiderati. Ad esempio, è possibile creare un flusso di dati con due shard. Questo flusso di dati fornirà 2 MB/sec di dati in input e 4 MB/sec di dati in output, supportando fino a 2000 record PUT al secondo.
Più shard ci sono nel tuo stream, maggiore è la sua capacità di elaborazione. In linea di principio, è così che gli stream scalano: aggiungendo shard. Ma più shard hai, più alto è il prezzo. Ogni shard costa 1,5 centesimi all'ora e 1.4 centesimi aggiuntivi per ogni milione di operazioni di aggiunta allo stream (unità di payload PUT).
Creiamo un nuovo thread con il nome biglietti_aerei, gli basterà 1 frammento:

Ora creiamo un altro thread denominato flusso speciale:

Impostazione del produttore
Come produttore di dati per l'analisi delle attività, è sufficiente utilizzare una normale istanza EC2. Non è necessario che si tratti di una macchina virtuale potente e costosa: una istanza t2.micro spot andrà benissimo.
Важное замечание: для примера следует использовать image — Amazon Linux AMI 2018.03.0, с ним меньше настроек для быстрого запуска Kinesis Agent.
Andiamo al servizio EC2, creiamo una nuova macchina virtuale, selezioniamo l'AMI richiesta con il tipo t2.micro, che è incluso nel livello gratuito:

Affinché la macchina virtuale appena creata possa interagire con il servizio Kinesis, è necessario assegnarle l'autorizzazione necessaria. Il modo migliore per farlo è assegnare un ruolo IAM. Pertanto, nella schermata "Passaggio 3: Configura dettagli istanza", è necessario selezionare Crea un nuovo ruolo IAM:
Crea ruolo IAM per EC2

Nella finestra che si apre, seleziona che stiamo creando un nuovo ruolo per EC2 e vai alla sezione Autorizzazioni:

Nell'esempio, non è necessario entrare nei dettagli delle impostazioni granulari dei diritti sulle risorse, quindi sceglieremo le policy preconfigurate di Amazon: AmazonKinesisFullAccess e CloudWatchFullAccess.
Diamo a questo ruolo un nome significativo, ad esempio: EC2-KinesisStreams-FullAccess. Il risultato dovrebbe essere quello mostrato nell'immagine seguente:

Dopo aver creato questo nuovo ruolo, non dimenticare di associarlo all'istanza della macchina virtuale che stai creando:

Non modifichiamo nient'altro in questa schermata e passiamo alle finestre successive.
I parametri del disco rigido possono essere lasciati ai valori predefiniti, così come i tag (anche se è buona norma utilizzare i tag, almeno per dare un nome all'istanza e specificare l'ambiente).
Ora siamo alla scheda "Passaggio 6: Configura gruppo di sicurezza", dove è necessario crearne uno nuovo o specificarne uno esistente che consenta di connettersi all'istanza tramite SSH (porta 22). Selezionare "Sorgente" -> "Il mio IP" e avviare l'istanza.

Non appena passa allo stato di esecuzione, puoi provare a connetterti tramite ssh.
Per poter lavorare con Kinesis Agent, dopo aver effettuato correttamente la connessione alla macchina, è necessario immettere i seguenti comandi nel terminale:
sudo yum -y update
sudo yum install -y python36 python36-pip
sudo /usr/bin/pip-3.6 install --upgrade pip
sudo yum install -y aws-kinesis-agent
Creiamo una cartella in cui salvare le risposte API:
sudo mkdir /var/log/airline_ticketsPrima di avviare l'agente, è necessario configurarne la configurazione:
sudo vim /etc/aws-kinesis/agent.jsonIl contenuto del file agent.json dovrebbe apparire così:
{
"cloudwatch.emitMetrics": true,
"kinesis.endpoint": "",
"firehose.endpoint": "",
"flows": [
{
"filePattern": "/var/log/airline_tickets/*log",
"kinesisStream": "airline_tickets",
"partitionKeyOption": "RANDOM",
"dataProcessingOptions": [
{
"optionName": "CSVTOJSON",
"customFieldNames": ["cost","trip_class","show_to_affiliates",
"return_date","origin","number_of_changes","gate","found_at",
"duration","distance","destination","depart_date","actual","record_id"]
}
]
}
]
}
Come si può vedere dal file di configurazione, l'agente monitorerà i file con estensione .log nella directory /var/log/airline_tickets/, li analizzerà e li passerà al flusso airline_tickets.
Riavviamo il servizio e ci assicuriamo che sia avviato e funzioni:
sudo service aws-kinesis-agent restartOra scarichiamo uno script Python che richiederà i dati dall'API:
REPO_PATH=https://raw.githubusercontent.com/igorgorbenko/aviasales_kinesis/master/producer
wget $REPO_PATH/api_caller.py -P /home/ec2-user/
wget $REPO_PATH/requirements.txt -P /home/ec2-user/
sudo chmod a+x /home/ec2-user/api_caller.py
sudo /usr/local/bin/pip3 install -r /home/ec2-user/requirements.txt
Lo script api_caller.py richiede dati ad Aviasales e salva la risposta ricevuta nella directory scansionata dall'agente Kinesis. L'implementazione di questo script è piuttosto standard: esiste una classe TicketsApi, che consente di estrarre l'API in modo asincrono. Passiamo l'intestazione con il token e i parametri di richiesta a questa classe:
class TicketsApi:
"""Api caller class."""
def __init__(self, headers):
"""Init method."""
self.base_url = BASE_URL
self.headers = headers
async def get_data(self, data):
"""Get the data from API query."""
response_json = {}
async with ClientSession(headers=self.headers) as session:
try:
response = await session.get(self.base_url, data=data)
response.raise_for_status()
LOGGER.info('Response status %s: %s',
self.base_url, response.status)
response_json = await response.json()
except HTTPError as http_err:
LOGGER.error('Oops! HTTP error occurred: %s', str(http_err))
except Exception as err:
LOGGER.error('Oops! An error ocurred: %s', str(err))
return response_json
def prepare_request(api_token):
"""Return the headers and query fot the API request."""
headers = {'X-Access-Token': api_token,
'Accept-Encoding': 'gzip'}
data = FormData()
data.add_field('currency', CURRENCY)
data.add_field('origin', ORIGIN)
data.add_field('destination', DESTINATION)
data.add_field('show_to_affiliates', SHOW_TO_AFFILIATES)
data.add_field('trip_duration', TRIP_DURATION)
return headers, data
async def main():
"""Get run the code."""
if len(sys.argv) != 2:
print('Usage: api_caller.py <your_api_token>')
sys.exit(1)
return
api_token = sys.argv[1]
headers, data = prepare_request(api_token)
api = TicketsApi(headers)
response = await api.get_data(data)
if response.get('success', None):
LOGGER.info('API has returned %s items', len(response['data']))
try:
count_rows = log_maker(response)
LOGGER.info('%s rows have been saved into %s',
count_rows,
TARGET_FILE)
except Exception as e:
LOGGER.error('Oops! Request result was not saved to file. %s',
str(e))
else:
LOGGER.error('Oops! API request was unsuccessful %s!', response)
Per testare la correttezza delle impostazioni e la funzionalità dell'agente, eseguiamo un'esecuzione di prova dello script api_caller.py:
sudo ./api_caller.py TOKEN 
E osserviamo il risultato del lavoro nei registri dell'agente e nella scheda Monitoraggio nel flusso di dati airline_tickets:
tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log 

Come puoi vedere, tutto funziona e Kinesis Agent invia correttamente i dati allo stream. Ora configuriamo il consumer.
Impostazione di Kinesis Data Analytics
Passiamo ora al componente centrale dell'intero sistema: creiamo una nuova applicazione in Kinesis Data Analytics denominata kinesis_analytics_airlines_app:

Kinesis Data Analytics consente l'analisi in tempo reale dei dati di Kinesis Streams utilizzando SQL. Si tratta di un servizio completamente scalabile automaticamente (a differenza di Kinesis Streams) che:
- consente di creare nuovi flussi (Output Stream) in base alle richieste ai dati di origine;
- fornisce un flusso di errori verificatisi durante il funzionamento delle applicazioni (Error Stream);
- può rilevare automaticamente lo schema dei dati di input (può essere ridefinito manualmente se necessario).
Non si tratta di un servizio economico: costa 0.11 USD all'ora di lavoro, quindi dovresti usarlo con cautela ed eliminarlo una volta terminato.
Colleghiamo l'applicazione alla sorgente dati:

Selezioniamo il flusso a cui vogliamo connetterci (airline_tickets):

Successivamente, è necessario associare un nuovo ruolo IAM in modo che l'applicazione possa leggere e scrivere dal flusso. Per farlo, è sufficiente lasciare invariato il blocco "Autorizzazioni di accesso":

Ora richiederemo la scoperta dello schema dati nel flusso, per farlo premiamo il pulsante "Scopri schema". Di conseguenza, il ruolo IAM verrà aggiornato (ne verrà creato uno nuovo) e verrà avviata la scoperta dello schema dai dati già arrivati nel flusso:

Ora devi andare all'editor SQL. Cliccando su questo pulsante, apparirà una finestra che ti chiederà se vuoi eseguire l'applicazione: scegli cosa vuoi eseguire:

Nella finestra dell'editor SQL, inserisci la seguente query semplice e fai clic su Salva ed esegui SQL:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ("cost" DOUBLE, "gate" VARCHAR(16));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "cost", "gate"
FROM "SOURCE_SQL_STREAM_001"
WHERE "cost" < 5000
and "gate" = 'Aeroflot';
Nei database relazionali, si lavora con le tabelle utilizzando istruzioni INSERT per aggiungere record e istruzioni SELECT per interrogare i dati. In Amazon Kinesis Data Analytics, si lavora con STREAMS e PUMP, ovvero richieste di inserimento continue che spingono i dati da un flusso dell'applicazione a un altro.
La query SQL sopra cerca i biglietti Aeroflot con un prezzo inferiore a cinquemila rubli. Tutti i record che soddisfano queste condizioni verranno inseriti nel flusso DESTINATION_SQL_STREAM.

Nel blocco Destinazione, seleziona il flusso special_stream e nell'elenco a discesa Nome flusso in-applicazione, seleziona DESTINATION_SQL_STREAM:

Come risultato di tutte le manipolazioni dovresti ottenere qualcosa di simile all'immagine qui sotto:

Crea e iscriviti a un argomento SNS
Andiamo al Simple Notification Service e creiamo un nuovo argomento con il nome Airlines:

Ci iscriviamo a questo argomento, in esso indichiamo il numero di cellulare a cui verranno inviate le notifiche SMS:

Creazione di una tabella in DynamoDB
Per memorizzare i dati grezzi dal flusso airline_tickets, creiamo una tabella in DynamoDB con lo stesso nome. Useremo record_id come chiave primaria:

Creazione di un collettore di funzioni lambda
Creiamo una funzione lambda chiamata Collector, il cui compito è interrogare il flusso airline_tickets e, se vengono trovati nuovi record, inserirli nella tabella DynamoDB. Ovviamente, oltre ai permessi predefiniti, questa funzione lambda deve avere accesso in lettura al flusso di dati Kinesis e accesso in scrittura a DynamoDB.
Crea ruolo IAM per la funzione lambda del collettore
Per prima cosa, creiamo un nuovo ruolo IAM per la lambda denominato Lambda-TicketsProcessingRole:

Per l'esempio di test, le policy preconfigurate AmazonKinesisReadOnlyAccess e AmazonDynamoDBFullAccess sono molto adatte, come mostrato nell'immagine seguente:


Questa lambda dovrebbe essere attivata da un trigger Kinesis quando vengono aggiunti nuovi record al flusso airline_stream, quindi dobbiamo aggiungere un nuovo trigger:


Non resta che inserire il codice e salvare la lambda.
"""Parsing the stream and inserting into the DynamoDB table."""
import base64
import json
import boto3
from decimal import Decimal
DYNAMO_DB = boto3.resource('dynamodb')
TABLE_NAME = 'airline_tickets'
class TicketsParser:
"""Parsing info from the Stream."""
def __init__(self, table_name, records):
"""Init method."""
self.table = DYNAMO_DB.Table(table_name)
self.json_data = TicketsParser.get_json_data(records)
@staticmethod
def get_json_data(records):
"""Return deserialized data from the stream."""
decoded_record_data = ([base64.b64decode(record['kinesis']['data'])
for record in records])
json_data = ([json.loads(decoded_record)
for decoded_record in decoded_record_data])
return json_data
@staticmethod
def get_item_from_json(json_item):
"""Pre-process the json data."""
new_item = {
'record_id': json_item.get('record_id'),
'cost': Decimal(json_item.get('cost')),
'trip_class': json_item.get('trip_class'),
'show_to_affiliates': json_item.get('show_to_affiliates'),
'origin': json_item.get('origin'),
'number_of_changes': int(json_item.get('number_of_changes')),
'gate': json_item.get('gate'),
'found_at': json_item.get('found_at'),
'duration': int(json_item.get('duration')),
'distance': int(json_item.get('distance')),
'destination': json_item.get('destination'),
'depart_date': json_item.get('depart_date'),
'actual': json_item.get('actual')
}
return new_item
def run(self):
"""Batch insert into the table."""
with self.table.batch_writer() as batch_writer:
for item in self.json_data:
dynamodb_item = TicketsParser.get_item_from_json(item)
batch_writer.put_item(dynamodb_item)
print('Has been added ', len(self.json_data), 'items')
def lambda_handler(event, context):
"""Parse the stream and insert into the DynamoDB table."""
print('Got event:', event)
parser = TicketsParser(TABLE_NAME, event['Records'])
parser.run()
Creazione di una funzione lambda di notifica
La seconda funzione lambda, che monitorerà il secondo flusso (special_stream) e invierà una notifica a SNS, viene creata in modo simile. Pertanto, questa lambda deve avere accesso alla lettura da Kinesis e inviare messaggi all'argomento SNS specificato, che verranno poi inviati dal servizio SNS a tutti gli abbonati a questo argomento (e-mail, SMS, ecc.).
Creazione di un ruolo IAM
Per prima cosa, creiamo il ruolo IAM Lambda-KinesisAlarm per questa lambda, quindi assegniamo questo ruolo alla lambda alarm_notifier creata:


Questa lambda dovrebbe funzionare su un trigger quando nuovi record entrano nel flusso special_stream, quindi è necessario configurare il trigger nello stesso modo in cui abbiamo fatto per la lambda Collector.
Per semplificare la configurazione di questa lambda, introdurremo una nuova variabile d'ambiente, TOPIC_ARN, in cui inseriremo gli ANR (Amazon Recourse Names) dell'argomento Airlines:

E inseriamo il codice lambda, non è affatto complicato:
import boto3
import base64
import os
SNS_CLIENT = boto3.client('sns')
TOPIC_ARN = os.environ['TOPIC_ARN']
def lambda_handler(event, context):
try:
SNS_CLIENT.publish(TopicArn=TOPIC_ARN,
Message='Hi! I have found an interesting stuff!',
Subject='Airline tickets alarm')
print('Alarm message has been successfully delivered')
except Exception as err:
print('Delivery failure', str(err))
Sembra che la configurazione manuale del sistema sia completata. Non resta che testare e accertarsi di aver impostato tutto correttamente.
Distribuisci dal codice Terraform
Preparazione necessaria
— uno strumento open source molto pratico per la distribuzione di infrastrutture da codice. Ha una propria sintassi, facile da apprendere, e molti esempi su come e cosa distribuire. L'editor Atom o Visual Studio Code offrono molti plugin utili che semplificano l'utilizzo di Terraform.
Puoi scaricare la distribuzione Un'analisi dettagliata di tutte le funzionalità di Terraform esula dallo scopo di questo articolo, pertanto ci limiteremo ai punti principali.
Come correre
Il codice completo del progetto è qui Clona il repository per te stesso. Prima di iniziare, assicurati di aver installato e configurato AWS CLI, perché Terraform cercherà le credenziali nel file ~/.aws/credentials.
Una buona pratica è quella di eseguire il comando plan prima di distribuire l'intera infrastruttura per vedere cosa Terraform creerà per noi nel cloud:
terraform.exe planTi verrà chiesto di inserire un numero di telefono a cui verranno inviate le notifiche. L'inserimento è facoltativo in questa fase.

Dopo aver analizzato il piano di lavoro del programma, possiamo iniziare a creare risorse:
terraform.exe applyDopo aver inviato questo comando, ti verrà nuovamente chiesto di inserire il tuo numero di telefono; digita "Sì" quando verrà visualizzata la domanda sull'effettiva esecuzione delle azioni. Questo ti consentirà di avviare l'intera infrastruttura, eseguire tutte le impostazioni EC2 necessarie, distribuire le funzioni lambda, ecc.
Dopo aver creato correttamente tutte le risorse tramite il codice Terraform, è necessario accedere ai dettagli dell'applicazione Kinesis Analytics (purtroppo non ho trovato come farlo direttamente dal codice).
Avviare l'applicazione:

Dopodiché, è necessario impostare esplicitamente il nome del flusso nell'applicazione selezionandolo dall'elenco a discesa:


Ora tutto è pronto per funzionare.
Test dell'applicazione
Indipendentemente da come hai distribuito il sistema, manualmente o tramite il codice Terraform, funzionerà sempre allo stesso modo.
Andiamo tramite SSH alla macchina virtuale EC2 dove è installato Kinesis Agent ed eseguiamo lo script api_caller.py
sudo ./api_caller.py TOKENNon resta che attendere un SMS al tuo numero:

SMS - il messaggio arriva sul telefono in circa 1 minuto:

Resta da vedere se i record vengono salvati nel database DynamoDB per un'analisi più approfondita. La tabella airline_tickets contiene dati più o meno come questi:

conclusione
Nel corso del lavoro svolto, è stato sviluppato un sistema di elaborazione dati online basato su Amazon Kinesis. Sono state valutate le opzioni per l'utilizzo di Kinesis Agent in combinazione con Kinesis Data Streams e l'analisi in tempo reale di Kinesis Analytics tramite comandi SQL, nonché l'interazione di Amazon Kinesis con altri servizi AWS.
Abbiamo implementato il sistema sopra descritto in due modi: un metodo manuale piuttosto lungo e un metodo rapido basato sul codice Terraform.
L'intero codice sorgente del progetto è disponibile , ti consiglio di leggerlo.
Sono felice di discutere l'articolo, in attesa dei vostri commenti. Spero in critiche costruttive.
Vi auguro successo!
Fonte: habr.com
