Ці крыху прыкладной тэтрысалогіі.
Усё новае - добра забытае старое.
Эпіграфы.
Пастаноўка задачы
Неабходна перыядычна загружаць бягучы лог-файл PostgreSQL з аблокі AWS на лакальны Linux хост. Не ў рэальным часе, але, скажам так, з невялікай затрымкай.
Перыяд загрузкі абнаўлення лог-файла - 5 хвілін.
Лог-файл, у AWS, ратыруецца кожную гадзіну.
Выкарыстоўваныя інструменты
Для загрузкі лог-файла на хост выкарыстоўваецца bash-скрыпт, які выклікае AWS API
параметры:
- -db-instance-identifier: Імя інстанса ў AWS;
- -log-file-name: імя бягучага сфармаванага лог-файла
- -max-item: Агульная колькасць элементаў, якія вяртаюцца ў выходных дадзеных каманды.Памер порцыі загружанага файла.
- -starting-token: Пазнака пачатковай порцыі
Ды і проста - цікавая задача, для трэніроўкі і разнастайнасці падчас працоўнага часу.
Выкажу здагадку, што задача ў сілу штодзённасці ўжо вырашана. Але хуткі гугл рашэнняў не падказаў, а шукаць больш паглыблена не было асаблівага жадання. У любым выпадку - нядрэнная трэніроўка.
Фармалізацыя задачы
Канчатковы лог-файл уяўляе сабою мноства радкоў зменнай даўжыні. Графічна, лог-файл можна ўявіць, прыкладна так:
Ужо нешта нагадвае? Пры чым тут "тэтрыс"? А вось пры чым.
Калі ўявіць магчымыя варыянты, якія ўзнікаюць пры загрузцы чарговага файла графічна (для прастаты, у дадзеным выпадку, няхай радкі маюць адну даўжыню), атрымаюцца стандартныя фігуры тэтрыса:
1) Файл загружаны цалкам і зьяўляецца канчатковым. Памер порцыі большы за памер канчатковага файла:
2) Файл мае працяг. Памер порцыі меншы за памер канчатковага файла:
3) Файл з'яўляецца працягам папярэдняга файла і мае працяг. Памер порцыі меншы за памер астатку канчатковага файла:
4) Файл з'яўляецца працягам папярэдняга файла і з'яўляецца канчатковым. Памер порцыі большы за памер астатку канчатковага файла:
Задача - сабраць прастакутнік або пагуляць у тэтрыс, на новым узроўні.
Праблемы, якія ўзнікаюць па ходзе рашэння задачы
1) Склеіць радок з 2-х порцый
Увогуле ніякіх асаблівых праблем не ўзнікла. Стандартная задача з пачатковага курса праграмавання.
Аптымальны памер порцыі
А вось гэта, крыху цікавей.
Нажаль, няма магчымасці выкарыстоўваць зрушэнне пасля пазнакі пачатковай порцыі:
Як вам дае здагадку пра option -starting-token выкарыстоўваецца для specificity where to start paginating. Гэтая функцыя String values which would mean that if you try to add on offset value in the Next Token string, option is not be this into consideration as offset.
І таму даводзіцца чытаць кавалкамі-порцыямі.
Калі чытаць вялікімі порцыямі, тая колькасць чытанняў будзе мінімальнай, але аб'ём будзе максімальнай.
Калі чытаць маленькімі порцыямі, то наадварот, колькасць чытанняў будзе максімальнай, але затое аб'ём будзе мінімальным.
Таму, для скарачэння трафіку і для агульнай прыгажосці рашэння, прыйшлося прыдумаць нейкае рашэнне, нажаль, трохі падобнае на мыліцу.
Для ілюстрацыі, разгледзім працэс загрузкі лог-файла ў 2-х моцна спрошчаных варыянтах. Колькасць чытанняў у абодвух выпадках залежыць ад памеру порцыі.
1) Загружаем малымі порцыямі:
2) Загружаем вялікімі порцыямі:
Як звычайна, аптымальнае рашэнне-пасярэдзіне.
Памер порцыі мінімальны, але ў працэсе чытання, памер можна павялічваць, для скарачэння колькасці чытанняў.
Трэба адзначыць, што цалкам задача падбору аптымальнага памеру счытваецца порцыі пакуль не вырашана і патрабуе глыбейшай прапрацоўкі і аналізу. Можа, крыху пазней.
Агульнае апісанне рэалізацыі
Выкарыстоўваныя сэрвісныя табліцы
CREATE TABLE endpoint
(
id SERIAL ,
host text
);
TABLE database
(
id SERIAL ,
…
last_aws_log_time text ,
last_aws_nexttoken text ,
aws_max_item_size integer
);
last_aws_log_time — временная метка последнего загруженного лог-файла в формате YYYY-MM-DD-HH24.
last_aws_nexttoken — текстовая метка последней загруженной порции.
aws_max_item_size- эмпирическим путем, подобранный начальный размер порции.
Поўны тэкст скрыпту
download_aws_piece.sh
#!/bin/bash
#########################################################
# download_aws_piece.sh
# downloan piece of log from AWS
# version HABR
let min_item_size=1024
let max_item_size=1048576
let growth_factor=3
let growth_counter=1
let growth_counter_max=3
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:''STARTED'
AWS_LOG_TIME=$1
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:AWS_LOG_TIME='$AWS_LOG_TIME
database_id=$2
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:database_id='$database_id
RESULT_FILE=$3
endpoint=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE_DATABASE -A -t -c "select e.host from endpoint e join database d on e.id = d.endpoint_id where d.id = $database_id "`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:endpoint='$endpoint
db_instance=`echo $endpoint | awk -F"." '{print toupper($1)}'`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:db_instance='$db_instance
LOG_FILE=$RESULT_FILE'.tmp_log'
TMP_FILE=$LOG_FILE'.tmp'
TMP_MIDDLE=$LOG_FILE'.tmp_mid'
TMP_MIDDLE2=$LOG_FILE'.tmp_mid2'
current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:current_aws_log_time='$current_aws_log_time
if [[ $current_aws_log_time != $AWS_LOG_TIME ]];
then
is_new_log='1'
if ! psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - update database set last_aws_log_time .'
exit 1
fi
else
is_new_log='0'
fi
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:is_new_log='$is_new_log
let last_aws_max_item_size=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select aws_max_item_size from database where id = $database_id "`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: last_aws_max_item_size='$last_aws_max_item_size
let count=1
if [[ $is_new_log == '1' ]];
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: START DOWNLOADING OF NEW AWS LOG'
if ! aws rds download-db-log-file-portion
--max-items $last_aws_max_item_size
--region REGION
--db-instance-identifier $db_instance
--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 2
fi
else
next_token=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "select last_aws_nexttoken from database where id = $database_id "`
if [[ $next_token == '' ]];
then
next_token='0'
fi
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: CONTINUE DOWNLOADING OF AWS LOG'
if ! aws rds download-db-log-file-portion
--max-items $last_aws_max_item_size
--starting-token $next_token
--region REGION
--db-instance-identifier $db_instance
--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 3
fi
line_count=`cat $LOG_FILE | wc -l`
let lines=$line_count-1
tail -$lines $LOG_FILE > $TMP_MIDDLE
mv -f $TMP_MIDDLE $LOG_FILE
fi
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
grep -v NEXTTOKEN $LOG_FILE > $TMP_FILE
if [[ $next_token == '' ]];
then
cp $TMP_FILE $RESULT_FILE
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: NEXTTOKEN NOT FOUND - FINISH '
rm $LOG_FILE
rm $TMP_FILE
rm $TMP_MIDDLE
rm $TMP_MIDDLE2
exit 0
else
psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
fi
first_str=`tail -1 $TMP_FILE`
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
head -$lines $TMP_FILE > $RESULT_FILE
###############################################
# MAIN CIRCLE
let count=2
while [[ $next_token != '' ]];
do
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: count='$count
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
if ! aws rds download-db-log-file-portion
--max-items $last_aws_max_item_size
--starting-token $next_token
--region REGION
--db-instance-identifier $db_instance
--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 4
fi
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
TMP_FILE=$LOG_FILE'.tmp'
grep -v NEXTTOKEN $LOG_FILE > $TMP_FILE
last_str=`head -1 $TMP_FILE`
if [[ $next_token == '' ]];
then
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_FILE >> $RESULT_FILE
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: NEXTTOKEN NOT FOUND - FINISH '
rm $LOG_FILE
rm $TMP_FILE
rm $TMP_MIDDLE
rm $TMP_MIDDLE2
exit 0
fi
if [[ $next_token != '' ]];
then
let growth_counter=$growth_counter+1
if [[ $growth_counter -gt $growth_counter_max ]];
then
let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
let growth_counter=1
fi
if [[ $last_aws_max_item_size -gt $max_item_size ]];
then
let last_aws_max_item_size=$max_item_size
fi
psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
#############################
#Get middle of file
head -$lines $TMP_FILE > $TMP_MIDDLE
line_count=`cat $TMP_MIDDLE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
cat $TMP_MIDDLE2 >> $RESULT_FILE
first_str=`tail -1 $TMP_FILE`
fi
let count=$count+1
done
#
#################################################################
exit 0
Фрагменты скрыпту з некаторымі тлумачэннямі:
Уваходныя параметры скрыпту:
- Часавая пазнака імя лог-файла ў фармаце YYYY-MM-DD-HH24: AWS_LOG_TIME=$1
- ID Базы дадзеных: database_id=$2
- Імя сабранага лог-файла: RESULT_FILE=$3
Атрымаць часовую пазнаку апошняга загружанага лог-файла:
current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`
Калі часавая пазнака апошняга загружанага лог-файла не супадае з уваходным параметрам - загружаецца новы лог-файл:
if [[ $current_aws_log_time != $AWS_LOG_TIME ]];
then
is_new_log='1'
if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
then
echo '***download_aws_piece.sh -FATAL_ERROR - update database set last_aws_log_time .'
exit 1
fi
else
is_new_log='0'
fi
Атрымліваем значэнне пазнакі nexttoken з загружанага файла:
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
Прыкметай канчатка загрузкі служыць пустое значэнне nexttoken.
У цыкле лічым порцыі файла, адначасна, счапляючы радкі і павялічваючы памер порцыі:
Галоўны цыкл
# MAIN CIRCLE
let count=2
while [[ $next_token != '' ]];
do
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: count='$count
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
if ! aws rds download-db-log-file-portion
--max-items $last_aws_max_item_size
--starting-token $next_token
--region REGION
--db-instance-identifier $db_instance
--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 4
fi
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
TMP_FILE=$LOG_FILE'.tmp'
grep -v NEXTTOKEN $LOG_FILE > $TMP_FILE
last_str=`head -1 $TMP_FILE`
if [[ $next_token == '' ]];
then
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_FILE >> $RESULT_FILE
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: NEXTTOKEN NOT FOUND - FINISH '
rm $LOG_FILE
rm $TMP_FILE
rm $TMP_MIDDLE
rm $TMP_MIDDLE2
exit 0
fi
if [[ $next_token != '' ]];
then
let growth_counter=$growth_counter+1
if [[ $growth_counter -gt $growth_counter_max ]];
then
let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
let growth_counter=1
fi
if [[ $last_aws_max_item_size -gt $max_item_size ]];
then
let last_aws_max_item_size=$max_item_size
fi
psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
#############################
#Get middle of file
head -$lines $TMP_FILE > $TMP_MIDDLE
line_count=`cat $TMP_MIDDLE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
cat $TMP_MIDDLE2 >> $RESULT_FILE
first_str=`tail -1 $TMP_FILE`
fi
let count=$count+1
done
Што ж далей?
Такім чынам, першая прамежкавая задача - "загрузіць лог-файл з аблокі" вырашана. Што рабіць з загружаным логам?
Для пачатку неабходна разабраць лог-файл і вылучыць з яго ўласна запыты.
Задача не надта складаная. Найпросты bash-script суцэль спраўляецца.
upload_log_query.sh
#!/bin/bash
#########################################################
# upload_log_query.sh
# Upload table table from dowloaded aws file
# version HABR
###########################################################
echo 'TIMESTAMP:'$(date +%c)' Upload log_query table '
source_file=$1
echo 'source_file='$source_file
database_id=$2
echo 'database_id='$database_id
beginer=' '
first_line='1'
let "line_count=0"
sql_line=' '
sql_flag=' '
space=' '
cat $source_file | while read line
do
line="$space$line"
if [[ $first_line == "1" ]]; then
beginer=`echo $line | awk -F" " '{ print $1}' `
first_line='0'
fi
current_beginer=`echo $line | awk -F" " '{ print $1}' `
if [[ $current_beginer == $beginer ]]; then
if [[ $sql_flag == '1' ]]; then
sql_flag='0'
log_date=`echo $sql_line | awk -F" " '{ print $1}' `
log_time=`echo $sql_line | awk -F" " '{ print $2}' `
duration=`echo $sql_line | awk -F" " '{ print $5}' `
#replace ' to ''
sql_modline=`echo "$sql_line" | sed 's/'''/''''''/g'`
sql_line=' '
################
#PROCESSING OF THE SQL-SELECT IS HERE
if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d DATABASE -v ON_ERROR_STOP=1 -A -t -c "select log_query('$ip_port',$database_id , '$log_date' , '$log_time' , '$duration' , '$sql_modline' )"
then
echo 'FATAL_ERROR - log_query '
exit 1
fi
################
fi #if [[ $sql_flag == '1' ]]; then
let "line_count=line_count+1"
check=`echo $line | awk -F" " '{ print $8}' `
check_sql=${check^^}
#echo 'check_sql='$check_sql
if [[ $check_sql == 'SELECT' ]]; then
sql_flag='1'
sql_line="$sql_line$line"
ip_port=`echo $sql_line | awk -F":" '{ print $4}' `
fi
else
if [[ $sql_flag == '1' ]]; then
sql_line="$sql_line$line"
fi
fi #if [[ $current_beginer == $beginer ]]; then
done
Цяпер з выдзеленым з лог-файла запытам, можна працаваць.
А карысных магчымасцей адкрываецца некалькі.
Разабраныя запыты трэба недзе захоўваць. Для гэтага выкарыстоўваецца сэрвісная табліца log_query
CREATE TABLE log_query
(
id SERIAL ,
queryid bigint ,
query_md5hash text not null ,
database_id integer not null ,
timepoint timestamp without time zone not null,
duration double precision not null ,
query text not null ,
explained_plan text[],
plan_md5hash text ,
explained_plan_wo_costs text[],
plan_hash_value text ,
baseline_id integer ,
ip text ,
port text
);
ALTER TABLE log_query ADD PRIMARY KEY (id);
ALTER TABLE log_query ADD CONSTRAINT queryid_timepoint_unique_key UNIQUE (queryid, timepoint );
ALTER TABLE log_query ADD CONSTRAINT query_md5hash_timepoint_unique_key UNIQUE (query_md5hash, timepoint );
CREATE INDEX log_query_timepoint_idx ON log_query (timepoint);
CREATE INDEX log_query_queryid_idx ON log_query (queryid);
ALTER TABLE log_query ADD CONSTRAINT database_id_fk FOREIGN KEY (database_id) REFERENCES database (id) ON DELETE CASCADE ;
Апрацоўка разабранага запыту ажыццяўляецца ў plpgsql функцыі «log_query.
log_query.sql
--log_query.sql
--verison HABR
CREATE OR REPLACE FUNCTION log_query( ip_port text ,log_database_id integer , log_date text , log_time text , duration text , sql_line text ) RETURNS boolean AS $$
DECLARE
result boolean ;
log_timepoint timestamp without time zone ;
log_duration double precision ;
pos integer ;
log_query text ;
activity_string text ;
log_md5hash text ;
log_explain_plan text[] ;
log_planhash text ;
log_plan_wo_costs text[] ;
database_rec record ;
pg_stat_query text ;
test_log_query text ;
log_query_rec record;
found_flag boolean;
pg_stat_history_rec record ;
port_start integer ;
port_end integer ;
client_ip text ;
client_port text ;
log_queryid bigint ;
log_query_text text ;
pg_stat_query_text text ;
BEGIN
result = TRUE ;
RAISE NOTICE '***log_query';
port_start = position('(' in ip_port);
port_end = position(')' in ip_port);
client_ip = substring( ip_port from 1 for port_start-1 );
client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );
SELECT e.host , d.name , d.owner_pwd
INTO database_rec
FROM database d JOIN endpoint e ON e.id = d.endpoint_id
WHERE d.id = log_database_id ;
log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
log_duration = duration:: double precision;
pos = position ('SELECT' in UPPER(sql_line) );
log_query = substring( sql_line from pos for LENGTH(sql_line));
log_query = regexp_replace(log_query,' +',' ','g');
log_query = regexp_replace(log_query,';+','','g');
log_query = trim(trailing ' ' from log_query);
log_md5hash = md5( log_query::text );
--Explain execution plan--
EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')';
log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
PERFORM dblink_disconnect('LINK1');
--------------------------
BEGIN
INSERT INTO log_query
(
query_md5hash ,
database_id ,
timepoint ,
duration ,
query ,
explained_plan ,
plan_md5hash ,
explained_plan_wo_costs ,
plan_hash_value ,
ip ,
port
)
VALUES
(
log_md5hash ,
log_database_id ,
log_timepoint ,
log_duration ,
log_query ,
log_explain_plan ,
md5(log_explain_plan::text) ,
log_plan_wo_costs ,
md5(log_plan_wo_costs::text),
client_ip ,
client_port
);
activity_string = 'New query has logged '||
' database_id = '|| log_database_id ||
' query_md5hash='||log_md5hash||
' , timepoint = '||to_char(log_timepoint,'YYYYMMDD HH24:MI:SS');
RAISE NOTICE '%',activity_string;
PERFORM pg_log( log_database_id , 'log_query' , activity_string);
EXCEPTION
WHEN unique_violation THEN
RAISE NOTICE '*** unique_violation *** query already has logged';
END;
SELECT queryid
INTO log_queryid
FROM log_query
WHERE query_md5hash = log_md5hash AND
timepoint = log_timepoint;
IF log_queryid IS NOT NULL
THEN
RAISE NOTICE 'log_query with query_md5hash = % and timepoint = % has already has a QUERYID = %',log_md5hash,log_timepoint , log_queryid ;
RETURN result;
END IF;
------------------------------------------------
RAISE NOTICE 'Update queryid';
SELECT *
INTO log_query_rec
FROM log_query
WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
FOR pg_stat_history_rec IN
SELECT
queryid ,
query
FROM
pg_stat_db_queries
WHERE
database_id = log_database_id AND
queryid is not null
LOOP
pg_stat_query = pg_stat_history_rec.query ;
pg_stat_query=regexp_replace(pg_stat_query,'n+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,'t+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,'$.','%','g');
log_query_text = trim(trailing ' ' from log_query_rec.query);
pg_stat_query_text = pg_stat_query;
--SELECT log_query_rec.query like pg_stat_query INTO found_flag ;
IF (log_query_text LIKE pg_stat_query_text) THEN
found_flag = TRUE ;
ELSE
found_flag = FALSE ;
END IF;
IF found_flag THEN
UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
activity_string = ' updated queryid = '||pg_stat_history_rec.queryid||
' for log_query with id = '||log_query_rec.id
;
RAISE NOTICE '%',activity_string;
EXIT ;
END IF ;
END LOOP ;
RETURN result ;
END
$$ LANGUAGE plpgsql;
Пры апрацоўцы выкарыстоўваецца сэрвісная табліца pg_stat_db_queries, якая змяшчае здымак бягучых запытаў з табліцы pg_stat_history (Выкарыстанне табліцы апісана тут.
TABLE pg_stat_db_queries
(
database_id integer,
queryid bigint ,
query text ,
max_time double precision
);
TABLE pg_stat_history
(
…
database_id integer ,
…
queryid bigint ,
…
max_time double precision ,
…
);
Функцыя дазваляе ажыццявіць шэраг карысных магчымасцяў па апрацоўцы запытаў з лог-файла. А менавіта:
Магчымасць №1 - Гісторыя выканання запытаў
Вельмі карысна для пачатку рашэння інцыдэнту прадукцыйнасці. Спачатку азнаёміцца з гісторыяй - а калі пачалося запаволенне?
Затым, па класіцы - пашукаць знешнія прычыны. Можа быць, проста рэзка павялічылася загрузка базы дадзеных і канкрэтны запыт ні пры чым.
Дадаць новы запіс у табліцу log_query
port_start = position('(' in ip_port);
port_end = position(')' in ip_port);
client_ip = substring( ip_port from 1 for port_start-1 );
client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );
SELECT e.host , d.name , d.owner_pwd
INTO database_rec
FROM database d JOIN endpoint e ON e.id = d.endpoint_id
WHERE d.id = log_database_id ;
log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
log_duration = to_number(duration,'99999999999999999999D9999999999');
pos = position ('SELECT' in UPPER(sql_line) );
log_query = substring( sql_line from pos for LENGTH(sql_line));
log_query = regexp_replace(log_query,' +',' ','g');
log_query = regexp_replace(log_query,';+','','g');
log_query = trim(trailing ' ' from log_query);
RAISE NOTICE 'log_query=%',log_query ;
log_md5hash = md5( log_query::text );
--Explain execution plan--
EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')';
log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
PERFORM dblink_disconnect('LINK1');
--------------------------
BEGIN
INSERT INTO log_query
(
query_md5hash ,
database_id ,
timepoint ,
duration ,
query ,
explained_plan ,
plan_md5hash ,
explained_plan_wo_costs ,
plan_hash_value ,
ip ,
port
)
VALUES
(
log_md5hash ,
log_database_id ,
log_timepoint ,
log_duration ,
log_query ,
log_explain_plan ,
md5(log_explain_plan::text) ,
log_plan_wo_costs ,
md5(log_plan_wo_costs::text),
client_ip ,
client_port
);
Магчымасць №2 — Захоўваць планы выканання запытаў
На гэтым месцы можа ўзнікнуць пярэчанне-ўдакладненне-каментар: «Але ж ужо ёсць autoexplain». Ёсць тое ён ёсць, а што толку, калі план выканання захоўваецца ў тым жа лог-файле і для таго, каб яго захаваць для наступнага аналізу, прыйдзецца парсіць лог-файл?
Мне ж трэба было:
па-першае: захоўваць план выканання ў сэрвіснай табліцы базы даных маніторынгу;
па-другое: мець магчымасці параўноўваць планы выканання паміж сабой, каб адразу бачыць, што план выканання запыту змяніўся.
Запыт з канкрэтнымі параметрамі выканання ёсць. Атрымаць і захаваць яго план выканання, выкарыстоўваючы EXPLAIN - задача элементарная.
Больш за тое, выкарыстоўваючы выраз EXPLAIN (COSTS FALSE), можна атрымаць каркас плана, які і будзе выкарыстаны для атрымання hash-значэння плана, што дапаможа пры наступным аналізе гісторыі змены плана выканання.
Атрымаць шаблон плана выканання
--Explain execution plan--
EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')';
log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
PERFORM dblink_disconnect('LINK1');
Магчымасць №3 — Выкарыстанне лога запытаў для маніторынгу
Паколькі метрыкі прадукцыйнасці настроены не на тэкст запыту, а на яго ID, трэба звязваць запыты з лог-файла з запытамі для якіх настроены метрыкі прадукцыйнасці.
Ну хаця б для таго, каб мець дакладны час узнікнення інцыдэнту прадукцыйнасці.
Такім чынам, пры ўзнікненні інцыдэнту прадукцыйнасці для ID запыту, будзе спасылка на канкрэтны запыт з канкрэтнымі значэннямі параметра і дакладным часам выканання і працягласці запыту. Атрымаць дадзеную інфармацыю выкарыстоўваючы толькі прадстаўленне pg_stat_statements - нельга.
Знайсці queryid запыту і абнавіць запіс у табліцы log_query
SELECT *
INTO log_query_rec
FROM log_query
WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
FOR pg_stat_history_rec IN
SELECT
queryid ,
query
FROM
pg_stat_db_queries
WHERE
database_id = log_database_id AND
queryid is not null
LOOP
pg_stat_query = pg_stat_history_rec.query ;
pg_stat_query=regexp_replace(pg_stat_query,'n+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,'t+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,'$.','%','g');
log_query_text = trim(trailing ' ' from log_query_rec.query);
pg_stat_query_text = pg_stat_query;
--SELECT log_query_rec.query like pg_stat_query INTO found_flag ;
IF (log_query_text LIKE pg_stat_query_text) THEN
found_flag = TRUE ;
ELSE
found_flag = FALSE ;
END IF;
IF found_flag THEN
UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
activity_string = ' updated queryid = '||pg_stat_history_rec.queryid||
' for log_query with id = '||log_query_rec.id
;
RAISE NOTICE '%',activity_string;
EXIT ;
END IF ;
END LOOP ;
пасляслоўе
Апісаная методыка ў выніку, знайшла сабе ўжыванне ў
Хоць, вядома, на мой асабісты аўтарскі погляд, трэба будзе яшчэ папрацаваць над алгарытмам выбару і змены памеру загружанай порцыі. Задача пакуль не вырашана ў агульным выпадку. Мусіць, будзе цікава.
Але гэта ўжо зусім іншая гісторыя...
Крыніца: habr.com