O una mica de tetrisologia aplicada.
Tot el nou està ben oblidat i vell.
Epígrafs.
Declaració de problemes
Cal descarregar periòdicament el fitxer de registre de PostgreSQL actual des del núvol AWS a l'amfitrió de Linux local. No en temps real, però, diguem-ne, amb un lleuger retard.
El període de descàrrega de l'actualització del fitxer de registre és de 5 minuts.
El fitxer de registre, a AWS, es gira cada hora.
Eines usades
Per carregar el fitxer de registre a l'amfitrió, s'utilitza un script bash que crida a l'API d'AWS "
Paràmetres:
- --db-instance-identifier: nom de la instància a AWS;
- --log-file-name: nom del fitxer de registre generat actualment
- --max-item: el nombre total d'elements retornats a la sortida de l'ordre.La mida del fragment del fitxer baixat.
- --starting-token: testimoni d'inici
Sí, i simplement: una tasca interessant per a la formació i la varietat durant les hores de treball.
Suposo que el problema ja s'ha resolt en virtut de la rutina. Però un Google ràpid no va suggerir solucions, i no hi havia cap voluntat particular de cercar amb més profunditat. En qualsevol cas, és un bon entrenament.
Formalització de la tasca
El fitxer de registre final és un conjunt de línies de longitud variable. Gràficament, el fitxer de registre es pot representar així:
Ja et recorda alguna cosa? Què passa amb "tetris"? I aquí hi ha què.
Si representem gràficament les possibles opcions que sorgeixen en carregar el següent fitxer (per senzillesa, en aquest cas, que les línies tinguin la mateixa longitud), obtenim xifres de tetris estàndard:
1) El fitxer es descarrega sencer i és definitiu. La mida del fragment és més gran que la mida final del fitxer:
2) El fitxer té una continuació. La mida del tros és més petita que la mida final del fitxer:
3) El fitxer és una continuació del fitxer anterior i té una continuació. La mida del fragment és menor que la mida de la resta del fitxer final:
4) L'expedient és una continuació de l'expedient anterior i és definitiu. La mida del fragment és més gran que la mida de la resta del fitxer final:
La tasca és muntar un rectangle o jugar a Tetris a un nou nivell.
Problemes que sorgeixen en el curs de la resolució del problema
1) Enganxeu una corda de 2 porcions
En general, no hi va haver cap problema particular. Una tasca estàndard del curs inicial de programació.
Mida òptima de la porció
Però això és una mica més interessant.
Malauradament, no hi ha manera d'utilitzar un desplaçament després de l'etiqueta del fragment inicial:
Com ja sabeu, l'opció --starting-token s'utilitza per especificar on començar a paginar. Aquesta opció pren valors de cadena, la qual cosa significa que si intenteu afegir un valor de compensació davant de la cadena Next Token, l'opció no es tindrà en compte com a compensació.
I per tant, heu de llegir en trossos-porcions.
Si llegiu en grans porcions, el nombre de lectures serà mínim, però el volum serà màxim.
Si llegiu en petites porcions, al contrari, el nombre de lectures serà màxim, però el volum serà mínim.
Per tant, per tal de reduir el trànsit i la bellesa general de la solució, vaig haver d'aconseguir algun tipus de solució que, malauradament, sembla una mica una crossa.
Per il·lustrar-ho, considerem el procés de descàrrega d'un fitxer de registre en 2 versions molt simplificades. El nombre de lectures en ambdós casos depèn de la mida de la porció.
1) Carregueu en petites porcions:
2) Càrrega en grans porcions:
Com és habitual, la solució òptima es troba al mig.
La mida de la porció és mínima, però en el procés de lectura, la mida es pot augmentar per reduir el nombre de lectures.
Cal assenyalar que el problema de seleccionar la mida òptima de la part de lectura encara no s'ha resolt completament i requereix un estudi i anàlisi més profunds. Potser una mica més tard.
Descripció general de la implementació
Taules de servei usades
CREATE TABLE endpoint
(
id SERIAL ,
host text
);
TABLE database
(
id SERIAL ,
…
last_aws_log_time text ,
last_aws_nexttoken text ,
aws_max_item_size integer
);
last_aws_log_time — временная метка последнего загруженного лог-файла в формате YYYY-MM-DD-HH24.
last_aws_nexttoken — текстовая метка последней загруженной порции.
aws_max_item_size- эмпирическим путем, подобранный начальный размер порции.
Text complet del guió
descarregar_aws_piece.sh
#!/bin/bash
#########################################################
# download_aws_piece.sh
# downloan piece of log from AWS
# version HABR
let min_item_size=1024
let max_item_size=1048576
let growth_factor=3
let growth_counter=1
let growth_counter_max=3
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:''STARTED'
AWS_LOG_TIME=$1
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:AWS_LOG_TIME='$AWS_LOG_TIME
database_id=$2
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:database_id='$database_id
RESULT_FILE=$3
endpoint=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE_DATABASE -A -t -c "select e.host from endpoint e join database d on e.id = d.endpoint_id where d.id = $database_id "`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:endpoint='$endpoint
db_instance=`echo $endpoint | awk -F"." '{print toupper($1)}'`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:db_instance='$db_instance
LOG_FILE=$RESULT_FILE'.tmp_log'
TMP_FILE=$LOG_FILE'.tmp'
TMP_MIDDLE=$LOG_FILE'.tmp_mid'
TMP_MIDDLE2=$LOG_FILE'.tmp_mid2'
current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:current_aws_log_time='$current_aws_log_time
if [[ $current_aws_log_time != $AWS_LOG_TIME ]];
then
is_new_log='1'
if ! psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - update database set last_aws_log_time .'
exit 1
fi
else
is_new_log='0'
fi
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:is_new_log='$is_new_log
let last_aws_max_item_size=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select aws_max_item_size from database where id = $database_id "`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: last_aws_max_item_size='$last_aws_max_item_size
let count=1
if [[ $is_new_log == '1' ]];
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: START DOWNLOADING OF NEW AWS LOG'
if ! aws rds download-db-log-file-portion
--max-items $last_aws_max_item_size
--region REGION
--db-instance-identifier $db_instance
--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 2
fi
else
next_token=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "select last_aws_nexttoken from database where id = $database_id "`
if [[ $next_token == '' ]];
then
next_token='0'
fi
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: CONTINUE DOWNLOADING OF AWS LOG'
if ! aws rds download-db-log-file-portion
--max-items $last_aws_max_item_size
--starting-token $next_token
--region REGION
--db-instance-identifier $db_instance
--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 3
fi
line_count=`cat $LOG_FILE | wc -l`
let lines=$line_count-1
tail -$lines $LOG_FILE > $TMP_MIDDLE
mv -f $TMP_MIDDLE $LOG_FILE
fi
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
grep -v NEXTTOKEN $LOG_FILE > $TMP_FILE
if [[ $next_token == '' ]];
then
cp $TMP_FILE $RESULT_FILE
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: NEXTTOKEN NOT FOUND - FINISH '
rm $LOG_FILE
rm $TMP_FILE
rm $TMP_MIDDLE
rm $TMP_MIDDLE2
exit 0
else
psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
fi
first_str=`tail -1 $TMP_FILE`
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
head -$lines $TMP_FILE > $RESULT_FILE
###############################################
# MAIN CIRCLE
let count=2
while [[ $next_token != '' ]];
do
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: count='$count
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
if ! aws rds download-db-log-file-portion
--max-items $last_aws_max_item_size
--starting-token $next_token
--region REGION
--db-instance-identifier $db_instance
--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 4
fi
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
TMP_FILE=$LOG_FILE'.tmp'
grep -v NEXTTOKEN $LOG_FILE > $TMP_FILE
last_str=`head -1 $TMP_FILE`
if [[ $next_token == '' ]];
then
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_FILE >> $RESULT_FILE
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: NEXTTOKEN NOT FOUND - FINISH '
rm $LOG_FILE
rm $TMP_FILE
rm $TMP_MIDDLE
rm $TMP_MIDDLE2
exit 0
fi
if [[ $next_token != '' ]];
then
let growth_counter=$growth_counter+1
if [[ $growth_counter -gt $growth_counter_max ]];
then
let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
let growth_counter=1
fi
if [[ $last_aws_max_item_size -gt $max_item_size ]];
then
let last_aws_max_item_size=$max_item_size
fi
psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
#############################
#Get middle of file
head -$lines $TMP_FILE > $TMP_MIDDLE
line_count=`cat $TMP_MIDDLE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
cat $TMP_MIDDLE2 >> $RESULT_FILE
first_str=`tail -1 $TMP_FILE`
fi
let count=$count+1
done
#
#################################################################
exit 0
Fragments de guió amb algunes explicacions:
Paràmetres d'entrada de l'script:
- Marca de temps del nom del fitxer de registre en format AAAA-MM-DD-HH24: AWS_LOG_TIME=$1
- ID de la base de dades: database_id=$2
- Nom del fitxer de registre recollit: RESULT_FILE=$3
Obteniu la marca de temps de l'últim fitxer de registre penjat:
current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`
Si la marca de temps de l'últim fitxer de registre carregat no coincideix amb el paràmetre d'entrada, es carregarà un fitxer de registre nou:
if [[ $current_aws_log_time != $AWS_LOG_TIME ]];
then
is_new_log='1'
if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
then
echo '***download_aws_piece.sh -FATAL_ERROR - update database set last_aws_log_time .'
exit 1
fi
else
is_new_log='0'
fi
Obtenim el valor de l'etiqueta nexttoken del fitxer carregat:
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
El signe del final de la descàrrega és el valor buit de nexttoken.
En un bucle, comptem porcions del fitxer, al llarg del camí, concatenant línies i augmentant la mida de la porció:
Bucle principal
# MAIN CIRCLE
let count=2
while [[ $next_token != '' ]];
do
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: count='$count
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
if ! aws rds download-db-log-file-portion
--max-items $last_aws_max_item_size
--starting-token $next_token
--region REGION
--db-instance-identifier $db_instance
--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 4
fi
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
TMP_FILE=$LOG_FILE'.tmp'
grep -v NEXTTOKEN $LOG_FILE > $TMP_FILE
last_str=`head -1 $TMP_FILE`
if [[ $next_token == '' ]];
then
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_FILE >> $RESULT_FILE
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: NEXTTOKEN NOT FOUND - FINISH '
rm $LOG_FILE
rm $TMP_FILE
rm $TMP_MIDDLE
rm $TMP_MIDDLE2
exit 0
fi
if [[ $next_token != '' ]];
then
let growth_counter=$growth_counter+1
if [[ $growth_counter -gt $growth_counter_max ]];
then
let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
let growth_counter=1
fi
if [[ $last_aws_max_item_size -gt $max_item_size ]];
then
let last_aws_max_item_size=$max_item_size
fi
psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
#############################
#Get middle of file
head -$lines $TMP_FILE > $TMP_MIDDLE
line_count=`cat $TMP_MIDDLE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
cat $TMP_MIDDLE2 >> $RESULT_FILE
first_str=`tail -1 $TMP_FILE`
fi
let count=$count+1
done
Que segueix?
Per tant, s'ha resolt la primera tasca intermèdia: "baixar el fitxer de registre del núvol". Què fer amb el registre descarregat?
Primer heu d'analitzar el fitxer de registre i extreure'n les sol·licituds reals.
La tasca no és gaire difícil. El bash-script més senzill funciona bé.
upload_log_query.sh
#!/bin/bash
#########################################################
# upload_log_query.sh
# Upload table table from dowloaded aws file
# version HABR
###########################################################
echo 'TIMESTAMP:'$(date +%c)' Upload log_query table '
source_file=$1
echo 'source_file='$source_file
database_id=$2
echo 'database_id='$database_id
beginer=' '
first_line='1'
let "line_count=0"
sql_line=' '
sql_flag=' '
space=' '
cat $source_file | while read line
do
line="$space$line"
if [[ $first_line == "1" ]]; then
beginer=`echo $line | awk -F" " '{ print $1}' `
first_line='0'
fi
current_beginer=`echo $line | awk -F" " '{ print $1}' `
if [[ $current_beginer == $beginer ]]; then
if [[ $sql_flag == '1' ]]; then
sql_flag='0'
log_date=`echo $sql_line | awk -F" " '{ print $1}' `
log_time=`echo $sql_line | awk -F" " '{ print $2}' `
duration=`echo $sql_line | awk -F" " '{ print $5}' `
#replace ' to ''
sql_modline=`echo "$sql_line" | sed 's/'''/''''''/g'`
sql_line=' '
################
#PROCESSING OF THE SQL-SELECT IS HERE
if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d DATABASE -v ON_ERROR_STOP=1 -A -t -c "select log_query('$ip_port',$database_id , '$log_date' , '$log_time' , '$duration' , '$sql_modline' )"
then
echo 'FATAL_ERROR - log_query '
exit 1
fi
################
fi #if [[ $sql_flag == '1' ]]; then
let "line_count=line_count+1"
check=`echo $line | awk -F" " '{ print $8}' `
check_sql=${check^^}
#echo 'check_sql='$check_sql
if [[ $check_sql == 'SELECT' ]]; then
sql_flag='1'
sql_line="$sql_line$line"
ip_port=`echo $sql_line | awk -F":" '{ print $4}' `
fi
else
if [[ $sql_flag == '1' ]]; then
sql_line="$sql_line$line"
fi
fi #if [[ $current_beginer == $beginer ]]; then
done
Ara podeu treballar amb la consulta extreta del fitxer de registre.
I hi ha diverses possibilitats útils.
Les consultes analitzades s'han d'emmagatzemar en algun lloc. Per a això, s'utilitza una taula de serveis. consulta_log
CREATE TABLE log_query
(
id SERIAL ,
queryid bigint ,
query_md5hash text not null ,
database_id integer not null ,
timepoint timestamp without time zone not null,
duration double precision not null ,
query text not null ,
explained_plan text[],
plan_md5hash text ,
explained_plan_wo_costs text[],
plan_hash_value text ,
baseline_id integer ,
ip text ,
port text
);
ALTER TABLE log_query ADD PRIMARY KEY (id);
ALTER TABLE log_query ADD CONSTRAINT queryid_timepoint_unique_key UNIQUE (queryid, timepoint );
ALTER TABLE log_query ADD CONSTRAINT query_md5hash_timepoint_unique_key UNIQUE (query_md5hash, timepoint );
CREATE INDEX log_query_timepoint_idx ON log_query (timepoint);
CREATE INDEX log_query_queryid_idx ON log_query (queryid);
ALTER TABLE log_query ADD CONSTRAINT database_id_fk FOREIGN KEY (database_id) REFERENCES database (id) ON DELETE CASCADE ;
La sol·licitud analitzada es processa a plpgsql funcions "consulta_log».
log_query.sql
--log_query.sql
--verison HABR
CREATE OR REPLACE FUNCTION log_query( ip_port text ,log_database_id integer , log_date text , log_time text , duration text , sql_line text ) RETURNS boolean AS $$
DECLARE
result boolean ;
log_timepoint timestamp without time zone ;
log_duration double precision ;
pos integer ;
log_query text ;
activity_string text ;
log_md5hash text ;
log_explain_plan text[] ;
log_planhash text ;
log_plan_wo_costs text[] ;
database_rec record ;
pg_stat_query text ;
test_log_query text ;
log_query_rec record;
found_flag boolean;
pg_stat_history_rec record ;
port_start integer ;
port_end integer ;
client_ip text ;
client_port text ;
log_queryid bigint ;
log_query_text text ;
pg_stat_query_text text ;
BEGIN
result = TRUE ;
RAISE NOTICE '***log_query';
port_start = position('(' in ip_port);
port_end = position(')' in ip_port);
client_ip = substring( ip_port from 1 for port_start-1 );
client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );
SELECT e.host , d.name , d.owner_pwd
INTO database_rec
FROM database d JOIN endpoint e ON e.id = d.endpoint_id
WHERE d.id = log_database_id ;
log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
log_duration = duration:: double precision;
pos = position ('SELECT' in UPPER(sql_line) );
log_query = substring( sql_line from pos for LENGTH(sql_line));
log_query = regexp_replace(log_query,' +',' ','g');
log_query = regexp_replace(log_query,';+','','g');
log_query = trim(trailing ' ' from log_query);
log_md5hash = md5( log_query::text );
--Explain execution plan--
EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')';
log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
PERFORM dblink_disconnect('LINK1');
--------------------------
BEGIN
INSERT INTO log_query
(
query_md5hash ,
database_id ,
timepoint ,
duration ,
query ,
explained_plan ,
plan_md5hash ,
explained_plan_wo_costs ,
plan_hash_value ,
ip ,
port
)
VALUES
(
log_md5hash ,
log_database_id ,
log_timepoint ,
log_duration ,
log_query ,
log_explain_plan ,
md5(log_explain_plan::text) ,
log_plan_wo_costs ,
md5(log_plan_wo_costs::text),
client_ip ,
client_port
);
activity_string = 'New query has logged '||
' database_id = '|| log_database_id ||
' query_md5hash='||log_md5hash||
' , timepoint = '||to_char(log_timepoint,'YYYYMMDD HH24:MI:SS');
RAISE NOTICE '%',activity_string;
PERFORM pg_log( log_database_id , 'log_query' , activity_string);
EXCEPTION
WHEN unique_violation THEN
RAISE NOTICE '*** unique_violation *** query already has logged';
END;
SELECT queryid
INTO log_queryid
FROM log_query
WHERE query_md5hash = log_md5hash AND
timepoint = log_timepoint;
IF log_queryid IS NOT NULL
THEN
RAISE NOTICE 'log_query with query_md5hash = % and timepoint = % has already has a QUERYID = %',log_md5hash,log_timepoint , log_queryid ;
RETURN result;
END IF;
------------------------------------------------
RAISE NOTICE 'Update queryid';
SELECT *
INTO log_query_rec
FROM log_query
WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
FOR pg_stat_history_rec IN
SELECT
queryid ,
query
FROM
pg_stat_db_queries
WHERE
database_id = log_database_id AND
queryid is not null
LOOP
pg_stat_query = pg_stat_history_rec.query ;
pg_stat_query=regexp_replace(pg_stat_query,'n+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,'t+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,'$.','%','g');
log_query_text = trim(trailing ' ' from log_query_rec.query);
pg_stat_query_text = pg_stat_query;
--SELECT log_query_rec.query like pg_stat_query INTO found_flag ;
IF (log_query_text LIKE pg_stat_query_text) THEN
found_flag = TRUE ;
ELSE
found_flag = FALSE ;
END IF;
IF found_flag THEN
UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
activity_string = ' updated queryid = '||pg_stat_history_rec.queryid||
' for log_query with id = '||log_query_rec.id
;
RAISE NOTICE '%',activity_string;
EXIT ;
END IF ;
END LOOP ;
RETURN result ;
END
$$ LANGUAGE plpgsql;
Quan es processa, s'utilitza la taula de serveis pg_stat_db_queriesA que conté una instantània de les consultes actuals de la taula pg_stat_history (L'ús de la taula es descriu aquí −
TABLE pg_stat_db_queries
(
database_id integer,
queryid bigint ,
query text ,
max_time double precision
);
TABLE pg_stat_history
(
…
database_id integer ,
…
queryid bigint ,
…
max_time double precision ,
…
);
La funció us permet implementar una sèrie de funcions útils per processar les sol·licituds d'un fitxer de registre. És a dir:
Oportunitat núm. 1 - Historial d'execució de consultes
Molt útil per iniciar un incident de rendiment. Primer, familiaritzeu-vos amb la història, i quan va començar la desacceleració?
Després, segons els clàssics, busqueu causes externes. Pot ser que la càrrega de la base de dades hagi augmentat dràsticament i la sol·licitud específica no hi tingui res a veure.
Afegeix una nova entrada a la taula log_query
port_start = position('(' in ip_port);
port_end = position(')' in ip_port);
client_ip = substring( ip_port from 1 for port_start-1 );
client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );
SELECT e.host , d.name , d.owner_pwd
INTO database_rec
FROM database d JOIN endpoint e ON e.id = d.endpoint_id
WHERE d.id = log_database_id ;
log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
log_duration = to_number(duration,'99999999999999999999D9999999999');
pos = position ('SELECT' in UPPER(sql_line) );
log_query = substring( sql_line from pos for LENGTH(sql_line));
log_query = regexp_replace(log_query,' +',' ','g');
log_query = regexp_replace(log_query,';+','','g');
log_query = trim(trailing ' ' from log_query);
RAISE NOTICE 'log_query=%',log_query ;
log_md5hash = md5( log_query::text );
--Explain execution plan--
EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')';
log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
PERFORM dblink_disconnect('LINK1');
--------------------------
BEGIN
INSERT INTO log_query
(
query_md5hash ,
database_id ,
timepoint ,
duration ,
query ,
explained_plan ,
plan_md5hash ,
explained_plan_wo_costs ,
plan_hash_value ,
ip ,
port
)
VALUES
(
log_md5hash ,
log_database_id ,
log_timepoint ,
log_duration ,
log_query ,
log_explain_plan ,
md5(log_explain_plan::text) ,
log_plan_wo_costs ,
md5(log_plan_wo_costs::text),
client_ip ,
client_port
);
Característica núm. 2: desar els plans d'execució de consultes
En aquest punt, pot sorgir una objecció-aclariment-comentari: "Però ja hi ha autoexplicació". Sí, ho és, però quin sentit té si el pla d'execució s'emmagatzema al mateix fitxer de registre i per desar-lo per a una anàlisi posterior, heu d'analitzar el fitxer de registre?
Tanmateix, necessitava:
primer: emmagatzemar el pla d'execució a la taula de serveis de la base de dades de seguiment;
segon: poder comparar els plans d'execució entre si per veure immediatament que el pla d'execució de la consulta ha canviat.
Hi ha disponible una sol·licitud amb paràmetres d'execució específics. Obtenir i emmagatzemar el seu pla d'execució mitjançant EXPLAIN és una tasca elemental.
A més, utilitzant l'expressió EXPLAIN (COSTS FALSE), podeu obtenir el marc del pla, que s'utilitzarà per obtenir el valor hash del pla, que us ajudarà en l'anàlisi posterior de l'historial de canvis del pla d'execució.
Obteniu una plantilla de pla d'execució
--Explain execution plan--
EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')';
log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
PERFORM dblink_disconnect('LINK1');
Oportunitat #3: Ús del registre de consultes per a la supervisió
Com que les mètriques de rendiment no es configuren per al text de la sol·licitud, sinó per al seu identificador, cal que associïu les sol·licituds del fitxer de registre amb les sol·licituds per a les quals es configuren les mètriques de rendiment.
Bé, almenys per tenir l'hora exacta d'ocurrència d'un incident d'actuació.
Així, quan es produeix un incident de rendiment per a un ID de sol·licitud, hi haurà un enllaç a una sol·licitud específica amb valors de paràmetres específics i el temps exacte d'execució i la durada de la sol·licitud. Obteniu la informació proporcionada utilitzant només la vista pg_stat_statements - està prohibit.
Cerqueu el queryid de la consulta i actualitzeu l'entrada a la taula log_query
SELECT *
INTO log_query_rec
FROM log_query
WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
FOR pg_stat_history_rec IN
SELECT
queryid ,
query
FROM
pg_stat_db_queries
WHERE
database_id = log_database_id AND
queryid is not null
LOOP
pg_stat_query = pg_stat_history_rec.query ;
pg_stat_query=regexp_replace(pg_stat_query,'n+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,'t+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,'$.','%','g');
log_query_text = trim(trailing ' ' from log_query_rec.query);
pg_stat_query_text = pg_stat_query;
--SELECT log_query_rec.query like pg_stat_query INTO found_flag ;
IF (log_query_text LIKE pg_stat_query_text) THEN
found_flag = TRUE ;
ELSE
found_flag = FALSE ;
END IF;
IF found_flag THEN
UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
activity_string = ' updated queryid = '||pg_stat_history_rec.queryid||
' for log_query with id = '||log_query_rec.id
;
RAISE NOTICE '%',activity_string;
EXIT ;
END IF ;
END LOOP ;
Paraula posterior
Com a resultat, el mètode descrit ha trobat la seva aplicació a
Encara que, per descomptat, al meu parer personal, encara caldrà treballar l'algorisme per seleccionar i canviar la mida de la part descarregada. El problema encara no s'ha resolt en el cas general. Probablement serà interessant.
Però aquesta és una història completament diferent...
Font: www.habr.com