یا کمی تتریسولوژی کاربردی.
هر چیز جدید به خوبی فراموش شده قدیمی است.
کتیبه ها.
بیانیه مشکل
شما باید به صورت دوره ای فایل لاگ PostgreSQL فعلی را از ابر AWS به میزبان لینوکس محلی خود دانلود کنید. نه در زمان واقعی، اما، فرض کنید، با کمی تاخیر.
مدت زمان دانلود آپدیت فایل لاگ 5 دقیقه است.
فایل لاگ در AWS هر ساعت یکبار چرخانده می شود.
ابزارهای مورد استفاده
برای دانلود فایل log در هاست، از یک اسکریپت bash استفاده می شود که AWS API را فراخوانی می کند.
پارامترهای:
- —db-instance-identifier: نام نمونه AWS.
- --log-file-name: نام فایل لاگ ایجاد شده در حال حاضر
- --max-item: تعداد کل موارد برگردانده شده در خروجی فرمان.اندازه بخش فایل دانلود شده
- --starting-token: نشانه شروع
و این ساده است - یک کار جالب برای آموزش و تنوع در طول ساعات کاری.
من فرض می کنم که مشکل قبلاً به دلیل زندگی روزمره حل شده است. اما یک گوگل سریع هیچ راه حلی پیشنهاد نکرد و من تمایل زیادی به جستجوی عمیق تر نداشتم. در هر صورت، تمرین خوبی است.
رسمی شدن کار
فایل لاگ نهایی از خطوط زیادی با طول متغیر تشکیل شده است. از نظر گرافیکی، فایل log را می توان چیزی شبیه به این نشان داد:
آیا قبلاً چیزی را به شما یادآوری می کند؟ تتریس چه ربطی به آن دارد؟ و در اینجا این است که چه ارتباطی با آن دارد.
اگر گزینه های ممکنی را که هنگام بارگذاری فایل بعدی به صورت گرافیکی به وجود می آیند تصور کنیم (برای سادگی، در این مورد، اجازه دهید خطوط یکسان باشند)، به دست می آوریم. قطعات تتریس استاندارد:
1) فایل به طور کامل دانلود شده و نهایی است. اندازه بخش بزرگتر از اندازه فایل نهایی است:
2) پرونده ادامه دارد. اندازه قطعه کوچکتر از اندازه فایل نهایی است:
3) فایل ادامه فایل قبلی است و دارای ادامه است. اندازه تیکه کوچکتر از اندازه باقیمانده فایل نهایی است:
4) فایل ادامه فایل قبلی و نهایی است. اندازه تکه بزرگتر از اندازه باقیمانده فایل نهایی است:
وظیفه این است که یک مستطیل جمع کنید یا تتریس را در یک سطح جدید بازی کنید.
مشکلاتی که هنگام حل یک مشکل به وجود می آیند
1) یک خط 2 تکه را چسب بزنید
در کل مشکل خاصی نداشت. یک مشکل استاندارد از یک دوره برنامه نویسی اولیه.
اندازه سرویس بهینه
اما این کمی جالب تر است.
متأسفانه، هیچ راهی برای استفاده از افست پس از برچسب قسمت شروع وجود ندارد:
همانطور که قبلاً می دانید گزینه - starting-token برای تعیین محل شروع صفحه بندی استفاده می شود. این گزینه مقادیر String را می گیرد که به این معنی است که اگر سعی کنید یک مقدار افست در مقابل رشته Next Token اضافه کنید، این گزینه به عنوان یک افست در نظر گرفته نمی شود.
و بنابراین، شما باید آن را به صورت تکه تکه بخوانید.
اگر در قسمتهای زیاد مطالعه کنید، تعداد خواندهها حداقل، اما حجم حداکثر خواهد بود.
اگر در بخش های کوچک مطالعه کنید، برعکس، تعداد خواندن ها حداکثر خواهد بود، اما حجم آن حداقل خواهد بود.
بنابراین، برای کاهش ترافیک و برای زیبایی کلی راه حل، مجبور شدم راه حلی ارائه دهم که متاسفانه کمی شبیه عصا است.
برای مثال، اجازه دهید روند دانلود یک فایل لاگ را در 2 نسخه بسیار ساده شده در نظر بگیریم. تعداد خواندن در هر دو مورد بستگی به اندازه قسمت دارد.
1) بار در قسمت های کوچک:
2) بار در قسمت های بزرگ:
طبق معمول، راه حل بهینه در وسط است.
اندازه سرو حداقل است، اما در طول فرآیند خواندن، می توان اندازه را افزایش داد تا تعداد دفعات مطالعه کاهش یابد.
باید توجه داشت که مشکل انتخاب اندازه بهینه قسمت خوانا هنوز حل نشده است و نیاز به مطالعه و تحلیل عمیق تری دارد. شاید کمی بعد.
شرح کلی پیاده سازی
میزهای سرویس استفاده شده
CREATE TABLE endpoint
(
id SERIAL ,
host text
);
TABLE database
(
id SERIAL ,
…
last_aws_log_time text ,
last_aws_nexttoken text ,
aws_max_item_size integer
);
last_aws_log_time — временная метка последнего загруженного лог-файла в формате YYYY-MM-DD-HH24.
last_aws_nexttoken — текстовая метка последней загруженной порции.
aws_max_item_size- эмпирическим путем, подобранный начальный размер порции.
متن کامل اسکریپت
download_aws_piece.sh
#!/bin/bash
#########################################################
# download_aws_piece.sh
# downloan piece of log from AWS
# version HABR
let min_item_size=1024
let max_item_size=1048576
let growth_factor=3
let growth_counter=1
let growth_counter_max=3
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:''STARTED'
AWS_LOG_TIME=$1
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:AWS_LOG_TIME='$AWS_LOG_TIME
database_id=$2
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:database_id='$database_id
RESULT_FILE=$3
endpoint=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE_DATABASE -A -t -c "select e.host from endpoint e join database d on e.id = d.endpoint_id where d.id = $database_id "`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:endpoint='$endpoint
db_instance=`echo $endpoint | awk -F"." '{print toupper($1)}'`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:db_instance='$db_instance
LOG_FILE=$RESULT_FILE'.tmp_log'
TMP_FILE=$LOG_FILE'.tmp'
TMP_MIDDLE=$LOG_FILE'.tmp_mid'
TMP_MIDDLE2=$LOG_FILE'.tmp_mid2'
current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:current_aws_log_time='$current_aws_log_time
if [[ $current_aws_log_time != $AWS_LOG_TIME ]];
then
is_new_log='1'
if ! psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - update database set last_aws_log_time .'
exit 1
fi
else
is_new_log='0'
fi
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:is_new_log='$is_new_log
let last_aws_max_item_size=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select aws_max_item_size from database where id = $database_id "`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: last_aws_max_item_size='$last_aws_max_item_size
let count=1
if [[ $is_new_log == '1' ]];
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: START DOWNLOADING OF NEW AWS LOG'
if ! aws rds download-db-log-file-portion
--max-items $last_aws_max_item_size
--region REGION
--db-instance-identifier $db_instance
--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 2
fi
else
next_token=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "select last_aws_nexttoken from database where id = $database_id "`
if [[ $next_token == '' ]];
then
next_token='0'
fi
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: CONTINUE DOWNLOADING OF AWS LOG'
if ! aws rds download-db-log-file-portion
--max-items $last_aws_max_item_size
--starting-token $next_token
--region REGION
--db-instance-identifier $db_instance
--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 3
fi
line_count=`cat $LOG_FILE | wc -l`
let lines=$line_count-1
tail -$lines $LOG_FILE > $TMP_MIDDLE
mv -f $TMP_MIDDLE $LOG_FILE
fi
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
grep -v NEXTTOKEN $LOG_FILE > $TMP_FILE
if [[ $next_token == '' ]];
then
cp $TMP_FILE $RESULT_FILE
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: NEXTTOKEN NOT FOUND - FINISH '
rm $LOG_FILE
rm $TMP_FILE
rm $TMP_MIDDLE
rm $TMP_MIDDLE2
exit 0
else
psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
fi
first_str=`tail -1 $TMP_FILE`
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
head -$lines $TMP_FILE > $RESULT_FILE
###############################################
# MAIN CIRCLE
let count=2
while [[ $next_token != '' ]];
do
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: count='$count
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
if ! aws rds download-db-log-file-portion
--max-items $last_aws_max_item_size
--starting-token $next_token
--region REGION
--db-instance-identifier $db_instance
--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 4
fi
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
TMP_FILE=$LOG_FILE'.tmp'
grep -v NEXTTOKEN $LOG_FILE > $TMP_FILE
last_str=`head -1 $TMP_FILE`
if [[ $next_token == '' ]];
then
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_FILE >> $RESULT_FILE
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: NEXTTOKEN NOT FOUND - FINISH '
rm $LOG_FILE
rm $TMP_FILE
rm $TMP_MIDDLE
rm $TMP_MIDDLE2
exit 0
fi
if [[ $next_token != '' ]];
then
let growth_counter=$growth_counter+1
if [[ $growth_counter -gt $growth_counter_max ]];
then
let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
let growth_counter=1
fi
if [[ $last_aws_max_item_size -gt $max_item_size ]];
then
let last_aws_max_item_size=$max_item_size
fi
psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
#############################
#Get middle of file
head -$lines $TMP_FILE > $TMP_MIDDLE
line_count=`cat $TMP_MIDDLE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
cat $TMP_MIDDLE2 >> $RESULT_FILE
first_str=`tail -1 $TMP_FILE`
fi
let count=$count+1
done
#
#################################################################
exit 0
قطعات اسکریپت با چند توضیح:
پارامترهای ورودی اسکریپت:
- مهر زمانی نام فایل گزارش در قالب YYYY-MM-DD-HH24: AWS_LOG_TIME=$1
- شناسه پایگاه داده: database_id=$2
- نام فایل گزارش جمع آوری شده: RESULT_FILE=3$
مهر زمانی آخرین فایل گزارش بارگیری شده را دریافت کنید:
current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`
اگر مهر زمانی آخرین فایل گزارش بارگیری شده با پارامتر ورودی مطابقت نداشته باشد، یک فایل گزارش جدید بارگیری می شود:
if [[ $current_aws_log_time != $AWS_LOG_TIME ]];
then
is_new_log='1'
if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
then
echo '***download_aws_piece.sh -FATAL_ERROR - update database set last_aws_log_time .'
exit 1
fi
else
is_new_log='0'
fi
مقدار برچسب nexttoken را از فایل دانلود شده دریافت می کنیم:
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
یک مقدار nexttoken خالی به عنوان نشانه ای از پایان دانلود عمل می کند.
در یک حلقه، بخشهایی از فایل را میشماریم، خطوط را در طول مسیر به هم متصل میکنیم و اندازه آن را افزایش میدهیم:
حلقه اصلی
# MAIN CIRCLE
let count=2
while [[ $next_token != '' ]];
do
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: count='$count
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
if ! aws rds download-db-log-file-portion
--max-items $last_aws_max_item_size
--starting-token $next_token
--region REGION
--db-instance-identifier $db_instance
--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 4
fi
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
TMP_FILE=$LOG_FILE'.tmp'
grep -v NEXTTOKEN $LOG_FILE > $TMP_FILE
last_str=`head -1 $TMP_FILE`
if [[ $next_token == '' ]];
then
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_FILE >> $RESULT_FILE
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: NEXTTOKEN NOT FOUND - FINISH '
rm $LOG_FILE
rm $TMP_FILE
rm $TMP_MIDDLE
rm $TMP_MIDDLE2
exit 0
fi
if [[ $next_token != '' ]];
then
let growth_counter=$growth_counter+1
if [[ $growth_counter -gt $growth_counter_max ]];
then
let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
let growth_counter=1
fi
if [[ $last_aws_max_item_size -gt $max_item_size ]];
then
let last_aws_max_item_size=$max_item_size
fi
psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
#############################
#Get middle of file
head -$lines $TMP_FILE > $TMP_MIDDLE
line_count=`cat $TMP_MIDDLE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
cat $TMP_MIDDLE2 >> $RESULT_FILE
first_str=`tail -1 $TMP_FILE`
fi
let count=$count+1
done
بعدش چی؟
بنابراین، اولین کار میانی - "دانلود یک فایل گزارش از ابر" حل شده است. با گزارش دانلود شده چه باید کرد؟
ابتدا باید فایل log را تجزیه و درخواست های واقعی را از آن استخراج کنید.
کار خیلی سخت نیست. ساده ترین اسکریپت bash این کار را به خوبی انجام می دهد.
upload_log_query.sh
#!/bin/bash
#########################################################
# upload_log_query.sh
# Upload table table from dowloaded aws file
# version HABR
###########################################################
echo 'TIMESTAMP:'$(date +%c)' Upload log_query table '
source_file=$1
echo 'source_file='$source_file
database_id=$2
echo 'database_id='$database_id
beginer=' '
first_line='1'
let "line_count=0"
sql_line=' '
sql_flag=' '
space=' '
cat $source_file | while read line
do
line="$space$line"
if [[ $first_line == "1" ]]; then
beginer=`echo $line | awk -F" " '{ print $1}' `
first_line='0'
fi
current_beginer=`echo $line | awk -F" " '{ print $1}' `
if [[ $current_beginer == $beginer ]]; then
if [[ $sql_flag == '1' ]]; then
sql_flag='0'
log_date=`echo $sql_line | awk -F" " '{ print $1}' `
log_time=`echo $sql_line | awk -F" " '{ print $2}' `
duration=`echo $sql_line | awk -F" " '{ print $5}' `
#replace ' to ''
sql_modline=`echo "$sql_line" | sed 's/'''/''''''/g'`
sql_line=' '
################
#PROCESSING OF THE SQL-SELECT IS HERE
if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d DATABASE -v ON_ERROR_STOP=1 -A -t -c "select log_query('$ip_port',$database_id , '$log_date' , '$log_time' , '$duration' , '$sql_modline' )"
then
echo 'FATAL_ERROR - log_query '
exit 1
fi
################
fi #if [[ $sql_flag == '1' ]]; then
let "line_count=line_count+1"
check=`echo $line | awk -F" " '{ print $8}' `
check_sql=${check^^}
#echo 'check_sql='$check_sql
if [[ $check_sql == 'SELECT' ]]; then
sql_flag='1'
sql_line="$sql_line$line"
ip_port=`echo $sql_line | awk -F":" '{ print $4}' `
fi
else
if [[ $sql_flag == '1' ]]; then
sql_line="$sql_line$line"
fi
fi #if [[ $current_beginer == $beginer ]]; then
done
اکنون می توانید با درخواست انتخاب شده از فایل log کار کنید.
و چندین فرصت مفید باز می شود.
پرس و جوهای تجزیه شده باید در جایی ذخیره شوند. برای این کار از میز سرویس استفاده می شود log_query
CREATE TABLE log_query
(
id SERIAL ,
queryid bigint ,
query_md5hash text not null ,
database_id integer not null ,
timepoint timestamp without time zone not null,
duration double precision not null ,
query text not null ,
explained_plan text[],
plan_md5hash text ,
explained_plan_wo_costs text[],
plan_hash_value text ,
baseline_id integer ,
ip text ,
port text
);
ALTER TABLE log_query ADD PRIMARY KEY (id);
ALTER TABLE log_query ADD CONSTRAINT queryid_timepoint_unique_key UNIQUE (queryid, timepoint );
ALTER TABLE log_query ADD CONSTRAINT query_md5hash_timepoint_unique_key UNIQUE (query_md5hash, timepoint );
CREATE INDEX log_query_timepoint_idx ON log_query (timepoint);
CREATE INDEX log_query_queryid_idx ON log_query (queryid);
ALTER TABLE log_query ADD CONSTRAINT database_id_fk FOREIGN KEY (database_id) REFERENCES database (id) ON DELETE CASCADE ;
درخواست تجزیه شده در پردازش می شود plpgsql کارکرد "log_query'.
log_query.sql
--log_query.sql
--verison HABR
CREATE OR REPLACE FUNCTION log_query( ip_port text ,log_database_id integer , log_date text , log_time text , duration text , sql_line text ) RETURNS boolean AS $$
DECLARE
result boolean ;
log_timepoint timestamp without time zone ;
log_duration double precision ;
pos integer ;
log_query text ;
activity_string text ;
log_md5hash text ;
log_explain_plan text[] ;
log_planhash text ;
log_plan_wo_costs text[] ;
database_rec record ;
pg_stat_query text ;
test_log_query text ;
log_query_rec record;
found_flag boolean;
pg_stat_history_rec record ;
port_start integer ;
port_end integer ;
client_ip text ;
client_port text ;
log_queryid bigint ;
log_query_text text ;
pg_stat_query_text text ;
BEGIN
result = TRUE ;
RAISE NOTICE '***log_query';
port_start = position('(' in ip_port);
port_end = position(')' in ip_port);
client_ip = substring( ip_port from 1 for port_start-1 );
client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );
SELECT e.host , d.name , d.owner_pwd
INTO database_rec
FROM database d JOIN endpoint e ON e.id = d.endpoint_id
WHERE d.id = log_database_id ;
log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
log_duration = duration:: double precision;
pos = position ('SELECT' in UPPER(sql_line) );
log_query = substring( sql_line from pos for LENGTH(sql_line));
log_query = regexp_replace(log_query,' +',' ','g');
log_query = regexp_replace(log_query,';+','','g');
log_query = trim(trailing ' ' from log_query);
log_md5hash = md5( log_query::text );
--Explain execution plan--
EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')';
log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
PERFORM dblink_disconnect('LINK1');
--------------------------
BEGIN
INSERT INTO log_query
(
query_md5hash ,
database_id ,
timepoint ,
duration ,
query ,
explained_plan ,
plan_md5hash ,
explained_plan_wo_costs ,
plan_hash_value ,
ip ,
port
)
VALUES
(
log_md5hash ,
log_database_id ,
log_timepoint ,
log_duration ,
log_query ,
log_explain_plan ,
md5(log_explain_plan::text) ,
log_plan_wo_costs ,
md5(log_plan_wo_costs::text),
client_ip ,
client_port
);
activity_string = 'New query has logged '||
' database_id = '|| log_database_id ||
' query_md5hash='||log_md5hash||
' , timepoint = '||to_char(log_timepoint,'YYYYMMDD HH24:MI:SS');
RAISE NOTICE '%',activity_string;
PERFORM pg_log( log_database_id , 'log_query' , activity_string);
EXCEPTION
WHEN unique_violation THEN
RAISE NOTICE '*** unique_violation *** query already has logged';
END;
SELECT queryid
INTO log_queryid
FROM log_query
WHERE query_md5hash = log_md5hash AND
timepoint = log_timepoint;
IF log_queryid IS NOT NULL
THEN
RAISE NOTICE 'log_query with query_md5hash = % and timepoint = % has already has a QUERYID = %',log_md5hash,log_timepoint , log_queryid ;
RETURN result;
END IF;
------------------------------------------------
RAISE NOTICE 'Update queryid';
SELECT *
INTO log_query_rec
FROM log_query
WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
FOR pg_stat_history_rec IN
SELECT
queryid ,
query
FROM
pg_stat_db_queries
WHERE
database_id = log_database_id AND
queryid is not null
LOOP
pg_stat_query = pg_stat_history_rec.query ;
pg_stat_query=regexp_replace(pg_stat_query,'n+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,'t+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,'$.','%','g');
log_query_text = trim(trailing ' ' from log_query_rec.query);
pg_stat_query_text = pg_stat_query;
--SELECT log_query_rec.query like pg_stat_query INTO found_flag ;
IF (log_query_text LIKE pg_stat_query_text) THEN
found_flag = TRUE ;
ELSE
found_flag = FALSE ;
END IF;
IF found_flag THEN
UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
activity_string = ' updated queryid = '||pg_stat_history_rec.queryid||
' for log_query with id = '||log_query_rec.id
;
RAISE NOTICE '%',activity_string;
EXIT ;
END IF ;
END LOOP ;
RETURN result ;
END
$$ LANGUAGE plpgsql;
در طول پردازش از یک جدول خدمات استفاده می شود pg_stat_db_queries، حاوی یک عکس فوری از پرس و جوهای فعلی از جدول است pg_stat_history (استفاده از جدول در اینجا توضیح داده شده است
TABLE pg_stat_db_queries
(
database_id integer,
queryid bigint ,
query text ,
max_time double precision
);
TABLE pg_stat_history
(
…
database_id integer ,
…
queryid bigint ,
…
max_time double precision ,
…
);
این تابع به شما امکان می دهد تعدادی از قابلیت های مفید برای پردازش درخواست ها از یک فایل گزارش را پیاده سازی کنید. برای مثال:
فرصت شماره 1 - تاریخچه اجرای پرس و جو
برای شروع حل یک حادثه عملکرد بسیار مفید است. ابتدا با تاریخچه آشنا شوید - کاهش سرعت از چه زمانی آغاز شد؟
سپس، با توجه به کلاسیک ها، به دنبال دلایل خارجی باشید. شاید بارگذاری پایگاه داده به سادگی به شدت افزایش یافته باشد و درخواست خاص ربطی به آن نداشته باشد.
یک ورودی جدید به جدول log_query اضافه کنید
port_start = position('(' in ip_port);
port_end = position(')' in ip_port);
client_ip = substring( ip_port from 1 for port_start-1 );
client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );
SELECT e.host , d.name , d.owner_pwd
INTO database_rec
FROM database d JOIN endpoint e ON e.id = d.endpoint_id
WHERE d.id = log_database_id ;
log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
log_duration = to_number(duration,'99999999999999999999D9999999999');
pos = position ('SELECT' in UPPER(sql_line) );
log_query = substring( sql_line from pos for LENGTH(sql_line));
log_query = regexp_replace(log_query,' +',' ','g');
log_query = regexp_replace(log_query,';+','','g');
log_query = trim(trailing ' ' from log_query);
RAISE NOTICE 'log_query=%',log_query ;
log_md5hash = md5( log_query::text );
--Explain execution plan--
EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')';
log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
PERFORM dblink_disconnect('LINK1');
--------------------------
BEGIN
INSERT INTO log_query
(
query_md5hash ,
database_id ,
timepoint ,
duration ,
query ,
explained_plan ,
plan_md5hash ,
explained_plan_wo_costs ,
plan_hash_value ,
ip ,
port
)
VALUES
(
log_md5hash ,
log_database_id ,
log_timepoint ,
log_duration ,
log_query ,
log_explain_plan ,
md5(log_explain_plan::text) ,
log_plan_wo_costs ,
md5(log_plan_wo_costs::text),
client_ip ,
client_port
);
امکان شماره 2 - برنامه های اجرای پرس و جو را ذخیره کنید
در این مرحله ممکن است یک اعتراض-توضیح-نظر مطرح شود:اما در حال حاضر توضیح خودکار وجود دارد" بله وجود دارد، اما اگر برنامه اجرایی در همان فایل لاگ ذخیره شود و برای ذخیره آن برای تجزیه و تحلیل بیشتر، باید فایل لاگ را تجزیه و تحلیل کنید، چه فایده ای دارد؟
چیزی که نیاز داشتم این بود:
اول: برنامه اجرا را در جدول سرویس پایگاه داده نظارت ذخیره کنید.
ثانیاً: بتوانید برنامه های اجرایی را با یکدیگر مقایسه کنید تا بلافاصله متوجه شوید که طرح اجرای پرس و جو تغییر کرده است.
درخواستی با پارامترهای اجرایی خاص وجود دارد. به دست آوردن و ذخیره برنامه اجرای آن با استفاده از EXPLAIN یک کار ابتدایی است.
علاوه بر این، با استفاده از عبارت EXPLAIN (COSTS FALSE) می توانید یک اسکلت از طرح را بدست آورید که برای به دست آوردن مقدار هش طرح استفاده می شود که به تجزیه و تحلیل بعدی تاریخچه تغییرات در طرح اجرا کمک می کند.
دریافت الگوی طرح اجرایی
--Explain execution plan--
EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')';
log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
PERFORM dblink_disconnect('LINK1');
امکان شماره 3 - استفاده از گزارش پرس و جو برای نظارت
از آنجایی که معیارهای عملکرد نه بر روی متن درخواست، بلکه در شناسه آن پیکربندی میشوند، باید درخواستهای فایل گزارش را با درخواستهایی که معیارهای عملکرد برای آنها پیکربندی شدهاند مرتبط کنید.
خوب، حداقل برای اینکه زمان دقیق وقوع یک حادثه اجرا را داشته باشیم.
به این ترتیب، هنگامی که یک حادثه عملکرد برای شناسه درخواست رخ می دهد، پیوندی به یک درخواست خاص با مقادیر پارامتر خاص و زمان دقیق اجرا و مدت زمان درخواست وجود خواهد داشت. این اطلاعات را فقط با استفاده از نمای دریافت کنید pg_stat_statements - ممنوع است.
queryid درخواست را پیدا کنید و ورودی را در جدول log_query به روز کنید
SELECT *
INTO log_query_rec
FROM log_query
WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
FOR pg_stat_history_rec IN
SELECT
queryid ,
query
FROM
pg_stat_db_queries
WHERE
database_id = log_database_id AND
queryid is not null
LOOP
pg_stat_query = pg_stat_history_rec.query ;
pg_stat_query=regexp_replace(pg_stat_query,'n+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,'t+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,'$.','%','g');
log_query_text = trim(trailing ' ' from log_query_rec.query);
pg_stat_query_text = pg_stat_query;
--SELECT log_query_rec.query like pg_stat_query INTO found_flag ;
IF (log_query_text LIKE pg_stat_query_text) THEN
found_flag = TRUE ;
ELSE
found_flag = FALSE ;
END IF;
IF found_flag THEN
UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
activity_string = ' updated queryid = '||pg_stat_history_rec.queryid||
' for log_query with id = '||log_query_rec.id
;
RAISE NOTICE '%',activity_string;
EXIT ;
END IF ;
END LOOP ;
پس از کلمه
تکنیک توصیف شده در نهایت کاربرد پیدا کرد
اگرچه البته به نظر شخصی من باید روی الگوریتم انتخاب و تغییر اندازه قسمت دانلود شده بیشتر کار کرد. مشکل در حالت کلی هنوز حل نشده است. احتمالا جالب خواهد بود.
اما این یک داستان کاملا متفاوت است ...
منبع: www.habr.com