Or a bit of applied Tetrisology.
Everything new is well-forgotten old.
Epigraphs.
Problem statement
The current PostgreSQL log file has to be downloaded periodically from the AWS cloud to a local Linux host. Not in real time but, let's say, with a small delay.
The log file update is downloaded every 5 minutes.
In AWS, the log file is rotated every hour.
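Because the file is rotated hourly, each 5-minute run has to know which hourly file is current. A minimal sketch of building that hour stamp follows; it assumes the files are named by UTC hour in the YYYY-MM-DD-HH24 format used later in the article, and the helper name aws_log_stamp is hypothetical.

```shell
#!/bin/bash
# Sketch: build the hourly suffix of the log file name.
# Assumption: files are named error/postgresql.log.YYYY-MM-DD-HH by UTC hour.

# aws_log_stamp [EPOCH_SECONDS]
# Prints the hour stamp for the given Unix time (default: now).
# Relies on GNU date's "-d @SECONDS" syntax, as found on typical Linux hosts.
aws_log_stamp() {
    local epoch="${1:-$(date +%s)}"
    date -u -d "@${epoch}" +%Y-%m-%d-%H
}

# Example: the stamp for the current hour and the resulting log file name.
AWS_LOG_TIME=$(aws_log_stamp)
echo "error/postgresql.log.${AWS_LOG_TIME}"
```

The stamp can then be passed to the download script as its first argument.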
Tools used
To download the log file to the host, a bash script is used that calls the AWS API "aws rds download-db-log-file-portion".
Parameters:
- --db-instance-identifier: the name of the instance in AWS;
- --log-file-name: the name of the currently generated log file;
- --max-items: the total number of items returned in the command's output, that is, the portion size of the downloaded file;
- --starting-token: the starting token.
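To see how the parameters above fit together, here is a small sketch that only assembles the command line and prints it, without calling AWS. The function name build_download_cmd, the instance name MYDB and the sample values are hypothetical; REGION is a placeholder, as in the article's script.

```shell
#!/bin/bash
# Sketch: assemble (but do not run) one download call.
# REGION and the instance name are placeholders.

# build_download_cmd INSTANCE LOG_TIME MAX_ITEMS [STARTING_TOKEN]
# Prints the command line; the token is added only when one is given.
build_download_cmd() {
    local instance="$1" log_time="$2" max_items="$3" token="${4:-}"
    local cmd="aws rds download-db-log-file-portion"
    cmd="$cmd --region REGION"
    cmd="$cmd --db-instance-identifier $instance"
    cmd="$cmd --log-file-name error/postgresql.log.$log_time"
    cmd="$cmd --max-items $max_items"
    if [ -n "$token" ]; then
        cmd="$cmd --starting-token $token"
    fi
    echo "$cmd"
}

# First read of a new file, then a continuation from a saved token.
build_download_cmd MYDB 2019-11-05-12 1024
build_download_cmd MYDB 2019-11-05-12 1024 2048
```

The first call corresponds to starting a new log file, the second to continuing from a previously saved token.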
And besides, it is simply an interesting task for practice and variety during working hours.
I assume the problem has long since been solved as a matter of routine. But a quick Google search turned up no solutions, and I had no particular desire to dig deeper. In any case, it is good practice.
Formalizing the task
The final log file is a set of lines of variable length. Graphically, the log file can be represented like this:
Does this remind you of anything yet? What does Tetris have to do with it? Here is what.
If we draw the possible variants that arise when the next file is downloaded (for simplicity, let all lines have the same length), we get the standard Tetris pieces:
1) The file is downloaded in full and is final. The portion size is larger than the size of the final file:
2) The file has a continuation. The portion size is smaller than the size of the final file:
3) The file is a continuation of the previous file and itself has a continuation. The portion size is smaller than the remainder of the final file:
4) The file is a continuation of the previous file and is final. The portion size is larger than the remainder of the final file:
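The four cases can be told apart by two facts: whether the portion starts a new log file, and whether a next token came back with it. A minimal sketch of that decision, with the hypothetical helper classify_portion:

```shell
#!/bin/bash
# Sketch: map (is this a new log?, is there a next token?) to cases 1-4.

# classify_portion IS_NEW_LOG NEXT_TOKEN
# IS_NEW_LOG is 1 for the first read of a file, 0 for a continuation.
# NEXT_TOKEN is empty when the portion reached the end of the file.
classify_portion() {
    local is_new_log="$1" next_token="${2:-}"
    if [ "$is_new_log" = "1" ]; then
        if [ -z "$next_token" ]; then
            echo "1: whole file, final"
        else
            echo "2: start of file, has continuation"
        fi
    else
        if [ -n "$next_token" ]; then
            echo "3: continuation, has continuation"
        else
            echo "4: continuation, final"
        fi
    fi
}

classify_portion 1 ""
classify_portion 0 2048
```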
The task is to assemble a rectangle, or to play Tetris at a new level.
Problems that arise while solving the task
1) Gluing a line together from 2 portions
In general, there were no particular problems. It is a standard task from an introductory programming course.
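As an illustration of the gluing step, the sketch below joins two portion files where the last line of the first was cut off and continues as the first line of the second. The function name glue_two and the sample files are hypothetical; the article's script does the same work inline with head and tail.

```shell
#!/bin/bash
# Sketch: glue two portions whose boundary falls in the middle of a line.

# glue_two PREV_PORTION NEXT_PORTION
# Prints both portions as one text, joining the last line of the first
# portion with the first line of the second.
glue_two() {
    local prev="$1" next="$2"
    # All lines of the first portion except the last (cut) one.
    sed '$d' "$prev"
    # The cut line: tail of the first portion plus head of the second.
    printf '%s%s\n' "$(tail -n 1 "$prev")" "$(head -n 1 "$next")"
    # The rest of the second portion.
    tail -n +2 "$next"
}

# Example with two small temporary files.
demo_dir=$(mktemp -d)
printf 'line one\nline tw' > "$demo_dir/part1"
printf 'o\nline three\n' > "$demo_dir/part2"
glue_two "$demo_dir/part1" "$demo_dir/part2"
rm -rf "$demo_dir"
```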
2) Optimal portion size
But this is a bit more interesting.
Unfortunately, there is no way to use an offset after the starting portion token:
As you already know, the --starting-token option is used to specify where to start paginating. This option takes String values, which means that if you try to put an offset value in front of the NextToken string, it will not be treated as an offset.
So you have to read in portions.
If you read in large portions, the number of reads will be minimal, but the volume will be maximal.
If you read in small portions, then, on the contrary, the number of reads will be maximal, but the volume will be minimal.
Therefore, to reduce traffic and for the overall beauty of the solution, I had to come up with a solution which, unfortunately, looks a bit like a crutch.
To illustrate, let's look at the process of downloading a log file in 2 greatly simplified versions. In both cases the number of reads depends on the portion size.
1) Downloading in small portions:
2) Downloading in large portions:
As usual, the optimal solution is in the middle.
The portion size starts out minimal, but it can be increased during reading to reduce the number of reads.
It should be noted that the problem of choosing the optimal size of the portion being read is not yet fully solved and requires deeper study and analysis. Maybe a bit later.
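The growth rule can be sketched as a small function: keep the size for a few reads, then multiply it, never exceeding an upper bound. The constants below are the ones set at the top of the article's script; the function name next_item_size is hypothetical.

```shell
#!/bin/bash
# Sketch of the portion-size growth rule.
max_item_size=1048576
growth_factor=3
growth_counter_max=3

# next_item_size CURRENT_SIZE GROWTH_COUNTER
# Prints "NEW_SIZE NEW_COUNTER" for the next read.
next_item_size() {
    local size="$1" counter="$2"
    counter=$((counter + 1))
    # After growth_counter_max reads at one size, grow the portion.
    if [ "$counter" -gt "$growth_counter_max" ]; then
        size=$((size * growth_factor))
        counter=1
    fi
    # Never exceed the upper bound.
    if [ "$size" -gt "$max_item_size" ]; then
        size=$max_item_size
    fi
    echo "$size $counter"
}

next_item_size 1024 1
next_item_size 1024 3
```

With these constants the size stays at 1024 while the counter runs up to 3, then triples, until it is capped at 1048576.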
General description of the implementation
Service tables used
CREATE TABLE endpoint
(
id SERIAL ,
host text
);
TABLE database
(
id SERIAL ,
…
last_aws_log_time text ,
last_aws_nexttoken text ,
aws_max_item_size integer
);
last_aws_log_time: the timestamp of the last downloaded log file, in YYYY-MM-DD-HH24 format.
last_aws_nexttoken: the text token of the last downloaded portion.
aws_max_item_size: the initial portion size, chosen empirically.
Full text of the script
download_aws_piece.sh
#!/bin/bash
#########################################################
# download_aws_piece.sh
# download piece of log from AWS
# version HABR
let min_item_size=1024
let max_item_size=1048576
let growth_factor=3
let growth_counter=1
let growth_counter_max=3
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:''STARTED'
AWS_LOG_TIME=$1
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:AWS_LOG_TIME='$AWS_LOG_TIME
database_id=$2
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:database_id='$database_id
RESULT_FILE=$3
endpoint=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select e.host from endpoint e join database d on e.id = d.endpoint_id where d.id = $database_id "`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:endpoint='$endpoint
db_instance=`echo $endpoint | awk -F"." '{print toupper($1)}'`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:db_instance='$db_instance
LOG_FILE=$RESULT_FILE'.tmp_log'
TMP_FILE=$LOG_FILE'.tmp'
TMP_MIDDLE=$LOG_FILE'.tmp_mid'
TMP_MIDDLE2=$LOG_FILE'.tmp_mid2'
current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:current_aws_log_time='$current_aws_log_time
if [[ $current_aws_log_time != $AWS_LOG_TIME ]];
then
is_new_log='1'
if ! psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - update database set last_aws_log_time .'
exit 1
fi
else
is_new_log='0'
fi
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:is_new_log='$is_new_log
let last_aws_max_item_size=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select aws_max_item_size from database where id = $database_id "`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: last_aws_max_item_size='$last_aws_max_item_size
let count=1
if [[ $is_new_log == '1' ]];
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: START DOWNLOADING OF NEW AWS LOG'
if ! aws rds download-db-log-file-portion \
    --max-items $last_aws_max_item_size \
    --region REGION \
    --db-instance-identifier $db_instance \
    --log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 2
fi
else
next_token=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "select last_aws_nexttoken from database where id = $database_id "`
if [[ $next_token == '' ]];
then
next_token='0'
fi
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: CONTINUE DOWNLOADING OF AWS LOG'
if ! aws rds download-db-log-file-portion \
    --max-items $last_aws_max_item_size \
    --starting-token $next_token \
    --region REGION \
    --db-instance-identifier $db_instance \
    --log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 3
fi
line_count=`cat $LOG_FILE | wc -l`
let lines=$line_count-1
tail -$lines $LOG_FILE > $TMP_MIDDLE
mv -f $TMP_MIDDLE $LOG_FILE
fi
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
grep -v NEXTTOKEN $LOG_FILE > $TMP_FILE
if [[ $next_token == '' ]];
then
cp $TMP_FILE $RESULT_FILE
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: NEXTTOKEN NOT FOUND - FINISH '
# -f: some temporary files may not exist on this path
rm -f $LOG_FILE $TMP_FILE $TMP_MIDDLE $TMP_MIDDLE2
exit 0
else
psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
fi
first_str=`tail -1 $TMP_FILE`
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
head -$lines $TMP_FILE > $RESULT_FILE
###############################################
# MAIN LOOP
let count=2
while [[ $next_token != '' ]];
do
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: count='$count
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
if ! aws rds download-db-log-file-portion \
    --max-items $last_aws_max_item_size \
    --starting-token $next_token \
    --region REGION \
    --db-instance-identifier $db_instance \
    --log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 4
fi
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
TMP_FILE=$LOG_FILE'.tmp'
grep -v NEXTTOKEN $LOG_FILE > $TMP_FILE
last_str=`head -1 $TMP_FILE`
if [[ $next_token == '' ]];
then
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_FILE >> $RESULT_FILE
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: NEXTTOKEN NOT FOUND - FINISH '
# -f: some temporary files may not exist on this path
rm -f $LOG_FILE $TMP_FILE $TMP_MIDDLE $TMP_MIDDLE2
exit 0
fi
if [[ $next_token != '' ]];
then
let growth_counter=$growth_counter+1
if [[ $growth_counter -gt $growth_counter_max ]];
then
let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
let growth_counter=1
fi
if [[ $last_aws_max_item_size -gt $max_item_size ]];
then
let last_aws_max_item_size=$max_item_size
fi
psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
#############################
#Get middle of file
head -$lines $TMP_FILE > $TMP_MIDDLE
line_count=`cat $TMP_MIDDLE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
cat $TMP_MIDDLE2 >> $RESULT_FILE
first_str=`tail -1 $TMP_FILE`
fi
let count=$count+1
done
#
#################################################################
exit 0
Script fragments with some explanations:
Script input parameters:
- Timestamp of the log file name in YYYY-MM-DD-HH24 format: AWS_LOG_TIME=$1
- Database ID: database_id=$2
- Name of the assembled log file: RESULT_FILE=$3
Get the timestamp of the last downloaded log file:
current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`
If the timestamp of the last downloaded log file does not match the input parameter, a new log file is downloaded:
if [[ $current_aws_log_time != $AWS_LOG_TIME ]];
then
is_new_log='1'
if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
then
echo '***download_aws_piece.sh -FATAL_ERROR - update database set last_aws_log_time .'
exit 1
fi
else
is_new_log='0'
fi
Get the value of the next token from the downloaded file:
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
An empty next token value is the sign that the download is finished.
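A slightly stricter variant of the token handling can be sketched with awk. It assumes, as the script does, that the CLI output contains a line whose first field is NEXTTOKEN followed by the token value; the function names are hypothetical. Matching on the first field only avoids dropping log lines that merely mention the word NEXTTOKEN, which grep -v would remove.

```shell
#!/bin/bash
# Sketch: read and strip the NEXTTOKEN line of a downloaded portion.
# Assumption: the portion file ends with a line "NEXTTOKEN <value>".

# extract_next_token PORTION_FILE
# Prints the token value, or nothing when the file has no token line.
extract_next_token() {
    awk '$1 == "NEXTTOKEN" { print $2; exit }' "$1"
}

# strip_next_token PORTION_FILE
# Prints the portion without the token line.
strip_next_token() {
    awk '$1 != "NEXTTOKEN"' "$1"
}

# Example with a small temporary portion file.
demo_file=$(mktemp)
printf 'log line a\nlog line b\nNEXTTOKEN\t2048\n' > "$demo_file"
echo "token: $(extract_next_token "$demo_file")"
strip_next_token "$demo_file"
rm -f "$demo_file"
```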
In the loop we read the portions of the file, gluing lines together along the way and increasing the portion size:
Main loop
# MAIN LOOP
let count=2
while [[ $next_token != '' ]];
do
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: count='$count
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
if ! aws rds download-db-log-file-portion \
    --max-items $last_aws_max_item_size \
    --starting-token $next_token \
    --region REGION \
    --db-instance-identifier $db_instance \
    --log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 4
fi
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
TMP_FILE=$LOG_FILE'.tmp'
grep -v NEXTTOKEN $LOG_FILE > $TMP_FILE
last_str=`head -1 $TMP_FILE`
if [[ $next_token == '' ]];
then
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_FILE >> $RESULT_FILE
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: NEXTTOKEN NOT FOUND - FINISH '
# -f: some temporary files may not exist on this path
rm -f $LOG_FILE $TMP_FILE $TMP_MIDDLE $TMP_MIDDLE2
exit 0
fi
if [[ $next_token != '' ]];
then
let growth_counter=$growth_counter+1
if [[ $growth_counter -gt $growth_counter_max ]];
then
let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
let growth_counter=1
fi
if [[ $last_aws_max_item_size -gt $max_item_size ]];
then
let last_aws_max_item_size=$max_item_size
fi
psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
#############################
#Get middle of file
head -$lines $TMP_FILE > $TMP_MIDDLE
line_count=`cat $TMP_MIDDLE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
cat $TMP_MIDDLE2 >> $RESULT_FILE
first_str=`tail -1 $TMP_FILE`
fi
let count=$count+1
done
What's next?
So, the first intermediate task, "download the log file from the cloud", is solved. What should be done with the downloaded log?
First you need to parse the log file and extract the actual queries from it.
The task is not very difficult. The simplest bash script handles it just fine.
upload_log_query.sh
#!/bin/bash
#########################################################
# upload_log_query.sh
# Upload log_query table from downloaded AWS file
# version HABR
###########################################################
echo 'TIMESTAMP:'$(date +%c)' Upload log_query table '
source_file=$1
echo 'source_file='$source_file
database_id=$2
echo 'database_id='$database_id
beginer=' '
first_line='1'
let "line_count=0"
sql_line=' '
sql_flag=' '
space=' '
cat $source_file | while read line
do
line="$space$line"
if [[ $first_line == "1" ]]; then
beginer=`echo $line | awk -F" " '{ print $1}' `
first_line='0'
fi
current_beginer=`echo $line | awk -F" " '{ print $1}' `
if [[ $current_beginer == $beginer ]]; then
if [[ $sql_flag == '1' ]]; then
sql_flag='0'
log_date=`echo $sql_line | awk -F" " '{ print $1}' `
log_time=`echo $sql_line | awk -F" " '{ print $2}' `
duration=`echo $sql_line | awk -F" " '{ print $5}' `
#replace ' to ''
sql_modline=`echo "$sql_line" | sed "s/'/''/g"`
sql_line=' '
################
#PROCESSING OF THE SQL-SELECT IS HERE
if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d DATABASE -v ON_ERROR_STOP=1 -A -t -c "select log_query('$ip_port',$database_id , '$log_date' , '$log_time' , '$duration' , '$sql_modline' )"
then
echo 'FATAL_ERROR - log_query '
exit 1
fi
################
fi #if [[ $sql_flag == '1' ]]; then
let "line_count=line_count+1"
check=`echo $line | awk -F" " '{ print $8}' `
check_sql=${check^^}
#echo 'check_sql='$check_sql
if [[ $check_sql == 'SELECT' ]]; then
sql_flag='1'
sql_line="$sql_line$line"
ip_port=`echo $sql_line | awk -F":" '{ print $4}' `
fi
else
if [[ $sql_flag == '1' ]]; then
sql_line="$sql_line$line"
fi
fi #if [[ $current_beginer == $beginer ]]; then
done
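The quote-doubling step in the script above is easy to get wrong because of shell quoting, so here it is in isolation. This is only a sketch: the helper name sql_quote is hypothetical, and doubling single quotes covers only the simple case of embedding text in a SQL string literal.

```shell
#!/bin/bash
# Sketch: double every single quote so the text can be placed
# between single quotes in a SQL string literal.

# sql_quote TEXT
sql_quote() {
    # The sed script is in double quotes so that ' needs no escaping.
    printf '%s\n' "$1" | sed "s/'/''/g"
}

sql_quote "select * from t where name = 'abc'"
```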
Now you can work with the query extracted from the log file.
And several useful possibilities open up.
Parsed queries need to be stored somewhere. The service table log_query is used for this.
CREATE TABLE log_query
(
id SERIAL ,
queryid bigint ,
query_md5hash text not null ,
database_id integer not null ,
timepoint timestamp without time zone not null,
duration double precision not null ,
query text not null ,
explained_plan text[],
plan_md5hash text ,
explained_plan_wo_costs text[],
plan_hash_value text ,
baseline_id integer ,
ip text ,
port text
);
ALTER TABLE log_query ADD PRIMARY KEY (id);
ALTER TABLE log_query ADD CONSTRAINT queryid_timepoint_unique_key UNIQUE (queryid, timepoint );
ALTER TABLE log_query ADD CONSTRAINT query_md5hash_timepoint_unique_key UNIQUE (query_md5hash, timepoint );
CREATE INDEX log_query_timepoint_idx ON log_query (timepoint);
CREATE INDEX log_query_queryid_idx ON log_query (queryid);
ALTER TABLE log_query ADD CONSTRAINT database_id_fk FOREIGN KEY (database_id) REFERENCES database (id) ON DELETE CASCADE ;
The parsed query is processed by the plpgsql function "log_query".
log_query.sql
--log_query.sql
--version HABR
CREATE OR REPLACE FUNCTION log_query( ip_port text ,log_database_id integer , log_date text , log_time text , duration text , sql_line text ) RETURNS boolean AS $$
DECLARE
result boolean ;
log_timepoint timestamp without time zone ;
log_duration double precision ;
pos integer ;
log_query text ;
activity_string text ;
log_md5hash text ;
log_explain_plan text[] ;
log_planhash text ;
log_plan_wo_costs text[] ;
database_rec record ;
pg_stat_query text ;
test_log_query text ;
log_query_rec record;
found_flag boolean;
pg_stat_history_rec record ;
port_start integer ;
port_end integer ;
client_ip text ;
client_port text ;
log_queryid bigint ;
log_query_text text ;
pg_stat_query_text text ;
BEGIN
result = TRUE ;
RAISE NOTICE '***log_query';
port_start = position('(' in ip_port);
port_end = position(')' in ip_port);
client_ip = substring( ip_port from 1 for port_start-1 );
client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );
SELECT e.host , d.name , d.owner_pwd
INTO database_rec
FROM database d JOIN endpoint e ON e.id = d.endpoint_id
WHERE d.id = log_database_id ;
log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
log_duration = duration:: double precision;
pos = position ('SELECT' in UPPER(sql_line) );
log_query = substring( sql_line from pos for LENGTH(sql_line));
log_query = regexp_replace(log_query,' +',' ','g');
log_query = regexp_replace(log_query,';+','','g');
log_query = trim(trailing ' ' from log_query);
log_md5hash = md5( log_query::text );
--Explain execution plan--
EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')';
log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
PERFORM dblink_disconnect('LINK1');
--------------------------
BEGIN
INSERT INTO log_query
(
query_md5hash ,
database_id ,
timepoint ,
duration ,
query ,
explained_plan ,
plan_md5hash ,
explained_plan_wo_costs ,
plan_hash_value ,
ip ,
port
)
VALUES
(
log_md5hash ,
log_database_id ,
log_timepoint ,
log_duration ,
log_query ,
log_explain_plan ,
md5(log_explain_plan::text) ,
log_plan_wo_costs ,
md5(log_plan_wo_costs::text),
client_ip ,
client_port
);
activity_string = 'New query has logged '||
' database_id = '|| log_database_id ||
' query_md5hash='||log_md5hash||
' , timepoint = '||to_char(log_timepoint,'YYYYMMDD HH24:MI:SS');
RAISE NOTICE '%',activity_string;
PERFORM pg_log( log_database_id , 'log_query' , activity_string);
EXCEPTION
WHEN unique_violation THEN
RAISE NOTICE '*** unique_violation *** query already has logged';
END;
SELECT queryid
INTO log_queryid
FROM log_query
WHERE query_md5hash = log_md5hash AND
timepoint = log_timepoint;
IF log_queryid IS NOT NULL
THEN
RAISE NOTICE 'log_query with query_md5hash = % and timepoint = % has already has a QUERYID = %',log_md5hash,log_timepoint , log_queryid ;
RETURN result;
END IF;
------------------------------------------------
RAISE NOTICE 'Update queryid';
SELECT *
INTO log_query_rec
FROM log_query
WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
FOR pg_stat_history_rec IN
SELECT
queryid ,
query
FROM
pg_stat_db_queries
WHERE
database_id = log_database_id AND
queryid is not null
LOOP
pg_stat_query = pg_stat_history_rec.query ;
-- collapse newlines, tabs and repeated spaces; turn $1, $2, ... into the LIKE wildcard %
pg_stat_query=regexp_replace(pg_stat_query,E'\\n+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,E'\\t+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,E'\\$[0-9]+','%','g');
log_query_text = trim(trailing ' ' from log_query_rec.query);
pg_stat_query_text = pg_stat_query;
--SELECT log_query_rec.query like pg_stat_query INTO found_flag ;
IF (log_query_text LIKE pg_stat_query_text) THEN
found_flag = TRUE ;
ELSE
found_flag = FALSE ;
END IF;
IF found_flag THEN
UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
activity_string = ' updated queryid = '||pg_stat_history_rec.queryid||
' for log_query with id = '||log_query_rec.id
;
RAISE NOTICE '%',activity_string;
EXIT ;
END IF ;
END LOOP ;
RETURN result ;
END
$$ LANGUAGE plpgsql;
Processing uses the service table pg_stat_db_queries, which contains a snapshot of the current queries from the pg_stat_history table (the use of that table is described here -
TABLE pg_stat_db_queries
(
database_id integer,
queryid bigint ,
query text ,
max_time double precision
);
TABLE pg_stat_history
(
…
database_id integer ,
…
queryid bigint ,
…
max_time double precision ,
…
);
The function makes it possible to implement a number of useful capabilities for processing queries from the log file. Namely:
Feature #1: Query execution history
Very useful when starting to investigate a performance incident. First, get acquainted with the history: when did the slowdown begin?
Then, following the classics, look for external causes. Perhaps the load on the database simply rose sharply and the specific query has nothing to do with it.
Add a new record to the log_query table
port_start = position('(' in ip_port);
port_end = position(')' in ip_port);
client_ip = substring( ip_port from 1 for port_start-1 );
client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );
SELECT e.host , d.name , d.owner_pwd
INTO database_rec
FROM database d JOIN endpoint e ON e.id = d.endpoint_id
WHERE d.id = log_database_id ;
log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
log_duration = to_number(duration,'99999999999999999999D9999999999');
pos = position ('SELECT' in UPPER(sql_line) );
log_query = substring( sql_line from pos for LENGTH(sql_line));
log_query = regexp_replace(log_query,' +',' ','g');
log_query = regexp_replace(log_query,';+','','g');
log_query = trim(trailing ' ' from log_query);
RAISE NOTICE 'log_query=%',log_query ;
log_md5hash = md5( log_query::text );
--Explain execution plan--
EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')';
log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
PERFORM dblink_disconnect('LINK1');
--------------------------
BEGIN
INSERT INTO log_query
(
query_md5hash ,
database_id ,
timepoint ,
duration ,
query ,
explained_plan ,
plan_md5hash ,
explained_plan_wo_costs ,
plan_hash_value ,
ip ,
port
)
VALUES
(
log_md5hash ,
log_database_id ,
log_timepoint ,
log_duration ,
log_query ,
log_explain_plan ,
md5(log_explain_plan::text) ,
log_plan_wo_costs ,
md5(log_plan_wo_costs::text),
client_ip ,
client_port
);
Feature #2: Saving query execution plans
At this point an objection, clarification or comment may arise: "But auto_explain already exists." Yes, it does, but what is the point if the execution plan is stored in the same log file, and to save it for further analysis you have to parse the log file anyway?
What I needed, however, was:
first: to store the execution plan in a service table of the monitoring database;
second: to be able to compare execution plans with each other, in order to see at once that a query's execution plan has changed.
A query with specific execution parameters is available. Getting and saving its execution plan with EXPLAIN is an elementary task.
Moreover, with the EXPLAIN (COSTS FALSE) expression you can get the skeleton of the plan, which is used to obtain the plan's hash value; this helps in the later analysis of how the execution plan changed over time.
Get the execution plan template
--Explain execution plan--
EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')';
log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
PERFORM dblink_disconnect('LINK1');
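To illustrate why the hash is taken from the plan without costs, the sketch below hashes two plan texts that differ only in their cost estimates. Note the swapped-in technique: the function above asks the server for EXPLAIN (COSTS FALSE), whereas this sketch strips the cost annotations on the client with sed, purely for demonstration. The helper name plan_skeleton_hash and the sample plans are hypothetical.

```shell
#!/bin/bash
# Sketch: a plan hash that ignores cost estimates.
# Client-side stripping stands in for EXPLAIN (COSTS FALSE) here.

# plan_skeleton_hash PLAN_TEXT
# Removes "(cost=... rows=... width=...)" annotations and prints the md5
# of what remains.
plan_skeleton_hash() {
    printf '%s\n' "$1" \
        | sed -E 's/ +\(cost=[^)]*\)//g' \
        | md5sum \
        | awk '{ print $1 }'
}

plan_a='Seq Scan on t  (cost=0.00..35.50 rows=2550 width=4)'
plan_b='Seq Scan on t  (cost=0.00..99.00 rows=9000 width=4)'

# Same skeleton, different estimates: the two hashes are equal.
plan_skeleton_hash "$plan_a"
plan_skeleton_hash "$plan_b"
```

A changed hash therefore signals a change in the plan's shape rather than a shift in the estimates.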
Feature #3: Using the query log for monitoring
Since performance metrics are configured not for the query text but for its ID, queries from the log file have to be linked to the queries for which performance metrics are configured.
Well, at least in order to know the exact time at which a performance incident occurred.
Thus, when a performance incident occurs for a query ID, there will be a reference to a specific query with specific parameter values and the exact execution time and duration of the query. This information cannot be obtained from the pg_stat_statements view alone.
Find the queryid of the query and update the record in the log_query table
SELECT *
INTO log_query_rec
FROM log_query
WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
FOR pg_stat_history_rec IN
SELECT
queryid ,
query
FROM
pg_stat_db_queries
WHERE
database_id = log_database_id AND
queryid is not null
LOOP
pg_stat_query = pg_stat_history_rec.query ;
-- collapse newlines, tabs and repeated spaces; turn $1, $2, ... into the LIKE wildcard %
pg_stat_query=regexp_replace(pg_stat_query,E'\\n+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,E'\\t+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,E'\\$[0-9]+','%','g');
log_query_text = trim(trailing ' ' from log_query_rec.query);
pg_stat_query_text = pg_stat_query;
--SELECT log_query_rec.query like pg_stat_query INTO found_flag ;
IF (log_query_text LIKE pg_stat_query_text) THEN
found_flag = TRUE ;
ELSE
found_flag = FALSE ;
END IF;
IF found_flag THEN
UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
activity_string = ' updated queryid = '||pg_stat_history_rec.queryid||
' for log_query with id = '||log_query_rec.id
;
RAISE NOTICE '%',activity_string;
EXIT ;
END IF ;
END LOOP ;
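The matching idea in the loop above, turning the $1, $2, ... placeholders of the normalized text into wildcards and testing the logged query against the result, can be tried out in the shell. This sketch uses shell patterns in place of SQL LIKE, so it is an analogy rather than the function's own mechanism; the helper name matches_normalized is hypothetical, and queries containing glob characters such as * or ? would need escaping first.

```shell
#!/bin/bash
# Sketch: match a logged query against normalized query text.
# Shell "*" plays the role that "%" plays for LIKE.

# matches_normalized LOGGED_QUERY NORMALIZED_QUERY
# Returns 0 when the logged query fits the normalized text, where each
# $1, $2, ... placeholder may stand for any literal.
matches_normalized() {
    local logged="$1" normalized="$2" pattern
    # Turn placeholders into the shell wildcard "*".
    pattern=$(printf '%s\n' "$normalized" | sed -E 's/\$[0-9]+/*/g')
    case "$logged" in
        $pattern) return 0 ;;
        *)        return 1 ;;
    esac
}

if matches_normalized "select name from t where id = 42" \
                      'select name from t where id = $1'; then
    echo "match"
fi
```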
Afterword
In the end, the described technique found a use.
Although, of course, in my personal opinion, the algorithm for choosing and changing the size of the downloaded portion still needs work. In the general case the problem is not yet solved. It will probably be interesting.
But that is a completely different story ...
Source: www.habr.com