یا تھوڑا سا اطلاق شدہ tetrisology.
ہر نئی چیز اچھی طرح پرانی بھول جاتی ہے۔
ایپی گرافس۔
مسئلہ کی تشکیل
آپ کو وقتاً فوقتاً موجودہ PostgreSQL لاگ فائل کو AWS کلاؤڈ سے اپنے مقامی لینکس ہوسٹ پر ڈاؤن لوڈ کرنے کی ضرورت ہوتی ہے۔ اصل وقت میں نہیں، لیکن، تھوڑی تاخیر کے ساتھ، ہم کہتے ہیں۔
لاگ فائل اپ ڈیٹ ڈاؤن لوڈ کی مدت 5 منٹ ہے۔
AWS میں لاگ فائل کو ہر گھنٹے بعد گھمایا جاتا ہے۔
استعمال شدہ اوزار
لاگ فائل کو میزبان پر ڈاؤن لوڈ کرنے کے لیے، ایک bash اسکرپٹ استعمال کیا جاتا ہے جو AWS API کو کال کرتا ہے۔
پیرامیٹرز:
- —db-instance-identifier: AWS مثال کا نام؛
- --log-file-name: فی الحال تیار کردہ لاگ فائل کا نام
- --max-item: کمانڈ آؤٹ پٹ میں لوٹائی گئی اشیاء کی کل تعداد۔ڈاؤن لوڈ کی گئی فائل کا حصہ سائز۔
- --starting-token: شروع کرنے والا ٹوکن
اور یہ آسان ہے - کام کے اوقات میں تربیت اور مختلف قسم کے لیے ایک دلچسپ کام۔
میں فرض کروں گا کہ روزمرہ کی زندگی کی وجہ سے مسئلہ پہلے ہی حل ہو چکا ہے۔ لیکن ایک تیز گوگل نے کوئی حل تجویز نہیں کیا، اور مجھے زیادہ گہرائی میں تلاش کرنے کی زیادہ خواہش نہیں تھی۔ کسی بھی طرح سے، یہ ایک اچھی ورزش ہے۔
کام کی رسمی شکل
حتمی لاگ فائل متغیر لمبائی کی کئی لائنوں پر مشتمل ہے۔ گرافک طور پر، لاگ فائل کی نمائندگی کچھ اس طرح کی جا سکتی ہے:
کیا یہ آپ کو پہلے ہی کچھ یاد دلاتا ہے؟ ٹیٹریس کا اس سے کیا تعلق ہے؟ اور یہاں یہ ہے کہ اس کا اس کے ساتھ کیا تعلق ہے۔
اگر ہم ان ممکنہ اختیارات کا تصور کریں جو اگلی فائل کو گرافی طور پر لوڈ کرتے وقت پیدا ہوتے ہیں (سادگی کے لیے، اس معاملے میں، لائنوں کی لمبائی ایک ہی ہو)، ہمیں ملتا ہے۔ معیاری Tetris ٹکڑے ٹکڑے:
1) فائل مکمل طور پر ڈاؤن لوڈ کی گئی ہے اور حتمی ہے۔ حصہ کا سائز حتمی فائل کے سائز سے بڑا ہے:
2) فائل جاری ہے۔ حصہ کا سائز حتمی فائل کے سائز سے چھوٹا ہے:
3) فائل پچھلی فائل کا تسلسل ہے اور اس کا تسلسل ہے۔ حصہ کا سائز فائنل فائل کے بقیہ سائز سے چھوٹا ہے:
4) فائل پچھلی فائل کا تسلسل ہے اور آخری فائل ہے۔ حصہ کا سائز فائنل فائل کے بقیہ سائز سے بڑا ہے:
کام ایک مستطیل کو جمع کرنا یا Tetris کو ایک نئی سطح پر کھیلنا ہے۔
کسی مسئلے کو حل کرتے وقت پیدا ہونے والے مسائل
1) 2 ٹکڑوں کی ایک لائن چپکائیں۔
عام طور پر، کوئی خاص مسائل نہیں تھے. ابتدائی پروگرامنگ کورس سے ایک معیاری مسئلہ۔
بہترین سرونگ سائز
لیکن یہ کچھ زیادہ ہی دلچسپ ہے۔
بدقسمتی سے، اسٹارٹ پورشن لیبل کے بعد آفسیٹ استعمال کرنے کا کوئی طریقہ نہیں ہے:
جیسا کہ آپ پہلے سے ہی آپشن جانتے ہیں — starting-token یہ بتانے کے لیے استعمال کیا جاتا ہے کہ صفحہ بندی کہاں سے شروع کی جائے۔ یہ آپشن سٹرنگ ویلیوز لیتا ہے جس کا مطلب یہ ہوگا کہ اگر آپ نیکسٹ ٹوکن اسٹرنگ کے سامنے آفسیٹ ویلیو شامل کرنے کی کوشش کرتے ہیں، تو آپشن کو آفسیٹ کے طور پر غور نہیں کیا جائے گا۔
اور اس لیے، آپ کو اسے ٹکڑوں میں پڑھنا ہوگا۔
اگر آپ بڑے حصوں میں پڑھتے ہیں، تو پڑھنے کی تعداد کم سے کم ہوگی، لیکن حجم زیادہ سے زیادہ ہوگا۔
اگر آپ چھوٹے حصوں میں پڑھتے ہیں، تو اس کے برعکس، پڑھنے کی تعداد زیادہ سے زیادہ ہوگی، لیکن حجم کم سے کم ہوگا۔
اس لیے، ٹریفک کو کم کرنے اور حل کی مجموعی خوبصورتی کے لیے، مجھے ایک ایسا حل نکالنا پڑا، جو بدقسمتی سے، تھوڑا سا بیساکھی کی طرح لگتا ہے۔
مثال کے لیے، آئیے لاگ فائل کو 2 انتہائی آسان ورژن میں ڈاؤن لوڈ کرنے کے عمل پر غور کریں۔ دونوں صورتوں میں پڑھنے کی تعداد حصہ کے سائز پر منحصر ہے۔
1) چھوٹے حصوں میں لوڈ کریں:
2) بڑے حصوں میں لوڈ کریں:
ہمیشہ کی طرح، بہترین حل درمیان میں ہے۔.
پیش کرنے کا سائز کم سے کم ہے، لیکن پڑھنے کے عمل کے دوران، پڑھنے کی تعداد کو کم کرنے کے لیے سائز میں اضافہ کیا جا سکتا ہے۔
یہ نوٹ کرنا چاہئے کہ پڑھنے کے قابل حصے کے زیادہ سے زیادہ سائز کے انتخاب کا مسئلہ ابھی تک حل نہیں ہوا ہے اور اس کے لیے مزید گہرائی سے مطالعہ اور تجزیہ کی ضرورت ہے۔ شاید تھوڑی دیر بعد۔
نفاذ کی عمومی وضاحت
خدمت کی میزیں استعمال کی گئیں۔
CREATE TABLE endpoint
(
id SERIAL ,
host text
);
TABLE database
(
id SERIAL ,
…
last_aws_log_time text ,
last_aws_nexttoken text ,
aws_max_item_size integer
);
last_aws_log_time — временная метка последнего загруженного лог-файла в формате YYYY-MM-DD-HH24.
last_aws_nexttoken — текстовая метка последней загруженной порции.
aws_max_item_size- эмпирическим путем, подобранный начальный размер порции.
مکمل اسکرپٹ متن
ڈاؤن لوڈ_aws_piece.sh
#!/bin/bash
#########################################################
# download_aws_piece.sh
# downloan piece of log from AWS
# version HABR
let min_item_size=1024
let max_item_size=1048576
let growth_factor=3
let growth_counter=1
let growth_counter_max=3
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:''STARTED'
AWS_LOG_TIME=$1
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:AWS_LOG_TIME='$AWS_LOG_TIME
database_id=$2
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:database_id='$database_id
RESULT_FILE=$3
endpoint=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE_DATABASE -A -t -c "select e.host from endpoint e join database d on e.id = d.endpoint_id where d.id = $database_id "`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:endpoint='$endpoint
db_instance=`echo $endpoint | awk -F"." '{print toupper($1)}'`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:db_instance='$db_instance
LOG_FILE=$RESULT_FILE'.tmp_log'
TMP_FILE=$LOG_FILE'.tmp'
TMP_MIDDLE=$LOG_FILE'.tmp_mid'
TMP_MIDDLE2=$LOG_FILE'.tmp_mid2'
current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:current_aws_log_time='$current_aws_log_time
if [[ $current_aws_log_time != $AWS_LOG_TIME ]];
then
is_new_log='1'
if ! psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - update database set last_aws_log_time .'
exit 1
fi
else
is_new_log='0'
fi
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:is_new_log='$is_new_log
let last_aws_max_item_size=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select aws_max_item_size from database where id = $database_id "`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: last_aws_max_item_size='$last_aws_max_item_size
let count=1
if [[ $is_new_log == '1' ]];
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: START DOWNLOADING OF NEW AWS LOG'
if ! aws rds download-db-log-file-portion
--max-items $last_aws_max_item_size
--region REGION
--db-instance-identifier $db_instance
--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 2
fi
else
next_token=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "select last_aws_nexttoken from database where id = $database_id "`
if [[ $next_token == '' ]];
then
next_token='0'
fi
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: CONTINUE DOWNLOADING OF AWS LOG'
if ! aws rds download-db-log-file-portion
--max-items $last_aws_max_item_size
--starting-token $next_token
--region REGION
--db-instance-identifier $db_instance
--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 3
fi
line_count=`cat $LOG_FILE | wc -l`
let lines=$line_count-1
tail -$lines $LOG_FILE > $TMP_MIDDLE
mv -f $TMP_MIDDLE $LOG_FILE
fi
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
grep -v NEXTTOKEN $LOG_FILE > $TMP_FILE
if [[ $next_token == '' ]];
then
cp $TMP_FILE $RESULT_FILE
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: NEXTTOKEN NOT FOUND - FINISH '
rm $LOG_FILE
rm $TMP_FILE
rm $TMP_MIDDLE
rm $TMP_MIDDLE2
exit 0
else
psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
fi
first_str=`tail -1 $TMP_FILE`
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
head -$lines $TMP_FILE > $RESULT_FILE
###############################################
# MAIN CIRCLE
let count=2
while [[ $next_token != '' ]];
do
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: count='$count
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
if ! aws rds download-db-log-file-portion
--max-items $last_aws_max_item_size
--starting-token $next_token
--region REGION
--db-instance-identifier $db_instance
--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 4
fi
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
TMP_FILE=$LOG_FILE'.tmp'
grep -v NEXTTOKEN $LOG_FILE > $TMP_FILE
last_str=`head -1 $TMP_FILE`
if [[ $next_token == '' ]];
then
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_FILE >> $RESULT_FILE
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: NEXTTOKEN NOT FOUND - FINISH '
rm $LOG_FILE
rm $TMP_FILE
rm $TMP_MIDDLE
rm $TMP_MIDDLE2
exit 0
fi
if [[ $next_token != '' ]];
then
let growth_counter=$growth_counter+1
if [[ $growth_counter -gt $growth_counter_max ]];
then
let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
let growth_counter=1
fi
if [[ $last_aws_max_item_size -gt $max_item_size ]];
then
let last_aws_max_item_size=$max_item_size
fi
psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
#############################
#Get middle of file
head -$lines $TMP_FILE > $TMP_MIDDLE
line_count=`cat $TMP_MIDDLE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
cat $TMP_MIDDLE2 >> $RESULT_FILE
first_str=`tail -1 $TMP_FILE`
fi
let count=$count+1
done
#
#################################################################
exit 0
اسکرپٹ کے ٹکڑے کچھ وضاحتوں کے ساتھ:
اسکرپٹ ان پٹ پیرامیٹرز:
- لاگ فائل کے نام کا ٹائم اسٹیمپ YYYY-MM-DD-HH24 فارمیٹ میں: AWS_LOG_TIME=$1
- ڈیٹا بیس ID: database_id=$2
- جمع شدہ لاگ فائل کا نام: RESULT_FILE=$3
آخری بھری ہوئی لاگ فائل کا ٹائم اسٹیمپ حاصل کریں:
current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`
اگر آخری بھری ہوئی لاگ فائل کا ٹائم اسٹیمپ ان پٹ پیرامیٹر سے مماثل نہیں ہے، تو ایک نئی لاگ فائل لوڈ کی جاتی ہے:
if [[ $current_aws_log_time != $AWS_LOG_TIME ]];
then
is_new_log='1'
if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
then
echo '***download_aws_piece.sh -FATAL_ERROR - update database set last_aws_log_time .'
exit 1
fi
else
is_new_log='0'
fi
ہمیں ڈاؤن لوڈ کردہ فائل سے نیکسٹ ٹوکن لیبل کی قدر ملتی ہے:
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
ایک خالی نیکسٹ ٹوکن ویلیو ڈاؤن لوڈ کے اختتام کی علامت کے طور پر کام کرتی ہے۔
ایک لوپ میں، ہم فائل کے حصوں کو گنتے ہیں، راستے میں لائنوں کو جوڑتے ہیں اور حصے کا سائز بڑھاتے ہیں:
مین لوپ
# MAIN CIRCLE
let count=2
while [[ $next_token != '' ]];
do
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: count='$count
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
if ! aws rds download-db-log-file-portion
--max-items $last_aws_max_item_size
--starting-token $next_token
--region REGION
--db-instance-identifier $db_instance
--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 4
fi
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
TMP_FILE=$LOG_FILE'.tmp'
grep -v NEXTTOKEN $LOG_FILE > $TMP_FILE
last_str=`head -1 $TMP_FILE`
if [[ $next_token == '' ]];
then
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_FILE >> $RESULT_FILE
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: NEXTTOKEN NOT FOUND - FINISH '
rm $LOG_FILE
rm $TMP_FILE
rm $TMP_MIDDLE
rm $TMP_MIDDLE2
exit 0
fi
if [[ $next_token != '' ]];
then
let growth_counter=$growth_counter+1
if [[ $growth_counter -gt $growth_counter_max ]];
then
let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
let growth_counter=1
fi
if [[ $last_aws_max_item_size -gt $max_item_size ]];
then
let last_aws_max_item_size=$max_item_size
fi
psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
#############################
#Get middle of file
head -$lines $TMP_FILE > $TMP_MIDDLE
line_count=`cat $TMP_MIDDLE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
cat $TMP_MIDDLE2 >> $RESULT_FILE
first_str=`tail -1 $TMP_FILE`
fi
let count=$count+1
done
اس کے بعد کیا ہے؟
لہذا، پہلا درمیانی کام - "کلاؤڈ سے لاگ فائل ڈاؤن لوڈ کریں" حل ہو گیا ہے۔ ڈاؤن لوڈ کردہ لاگ کا کیا کرنا ہے؟
سب سے پہلے، آپ کو لاگ فائل کو پارس کرنے اور اس سے اصل درخواستیں نکالنے کی ضرورت ہے۔
کام بہت مشکل نہیں ہے۔ سب سے آسان باش اسکرپٹ بہت اچھی طرح سے کام کرتا ہے۔
upload_log_query.sh
#!/bin/bash
#########################################################
# upload_log_query.sh
# Upload table table from dowloaded aws file
# version HABR
###########################################################
echo 'TIMESTAMP:'$(date +%c)' Upload log_query table '
source_file=$1
echo 'source_file='$source_file
database_id=$2
echo 'database_id='$database_id
beginer=' '
first_line='1'
let "line_count=0"
sql_line=' '
sql_flag=' '
space=' '
cat $source_file | while read line
do
line="$space$line"
if [[ $first_line == "1" ]]; then
beginer=`echo $line | awk -F" " '{ print $1}' `
first_line='0'
fi
current_beginer=`echo $line | awk -F" " '{ print $1}' `
if [[ $current_beginer == $beginer ]]; then
if [[ $sql_flag == '1' ]]; then
sql_flag='0'
log_date=`echo $sql_line | awk -F" " '{ print $1}' `
log_time=`echo $sql_line | awk -F" " '{ print $2}' `
duration=`echo $sql_line | awk -F" " '{ print $5}' `
#replace ' to ''
sql_modline=`echo "$sql_line" | sed 's/'''/''''''/g'`
sql_line=' '
################
#PROCESSING OF THE SQL-SELECT IS HERE
if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d DATABASE -v ON_ERROR_STOP=1 -A -t -c "select log_query('$ip_port',$database_id , '$log_date' , '$log_time' , '$duration' , '$sql_modline' )"
then
echo 'FATAL_ERROR - log_query '
exit 1
fi
################
fi #if [[ $sql_flag == '1' ]]; then
let "line_count=line_count+1"
check=`echo $line | awk -F" " '{ print $8}' `
check_sql=${check^^}
#echo 'check_sql='$check_sql
if [[ $check_sql == 'SELECT' ]]; then
sql_flag='1'
sql_line="$sql_line$line"
ip_port=`echo $sql_line | awk -F":" '{ print $4}' `
fi
else
if [[ $sql_flag == '1' ]]; then
sql_line="$sql_line$line"
fi
fi #if [[ $current_beginer == $beginer ]]; then
done
اب آپ لاگ فائل سے منتخب کردہ درخواست کے ساتھ کام کر سکتے ہیں۔
اور کئی مفید مواقع کھلتے ہیں۔
تجزیہ شدہ سوالات کو کہیں ذخیرہ کرنے کی ضرورت ہے۔ اس کے لیے سروس ٹیبل استعمال کیا جاتا ہے۔ log_query
CREATE TABLE log_query
(
id SERIAL ,
queryid bigint ,
query_md5hash text not null ,
database_id integer not null ,
timepoint timestamp without time zone not null,
duration double precision not null ,
query text not null ,
explained_plan text[],
plan_md5hash text ,
explained_plan_wo_costs text[],
plan_hash_value text ,
baseline_id integer ,
ip text ,
port text
);
ALTER TABLE log_query ADD PRIMARY KEY (id);
ALTER TABLE log_query ADD CONSTRAINT queryid_timepoint_unique_key UNIQUE (queryid, timepoint );
ALTER TABLE log_query ADD CONSTRAINT query_md5hash_timepoint_unique_key UNIQUE (query_md5hash, timepoint );
CREATE INDEX log_query_timepoint_idx ON log_query (timepoint);
CREATE INDEX log_query_queryid_idx ON log_query (queryid);
ALTER TABLE log_query ADD CONSTRAINT database_id_fk FOREIGN KEY (database_id) REFERENCES database (id) ON DELETE CASCADE ;
تجزیہ کردہ درخواست پر کارروائی کی جاتی ہے۔ plpgsql افعال "log_query'.
log_query.sql
--log_query.sql
--verison HABR
CREATE OR REPLACE FUNCTION log_query( ip_port text ,log_database_id integer , log_date text , log_time text , duration text , sql_line text ) RETURNS boolean AS $$
DECLARE
result boolean ;
log_timepoint timestamp without time zone ;
log_duration double precision ;
pos integer ;
log_query text ;
activity_string text ;
log_md5hash text ;
log_explain_plan text[] ;
log_planhash text ;
log_plan_wo_costs text[] ;
database_rec record ;
pg_stat_query text ;
test_log_query text ;
log_query_rec record;
found_flag boolean;
pg_stat_history_rec record ;
port_start integer ;
port_end integer ;
client_ip text ;
client_port text ;
log_queryid bigint ;
log_query_text text ;
pg_stat_query_text text ;
BEGIN
result = TRUE ;
RAISE NOTICE '***log_query';
port_start = position('(' in ip_port);
port_end = position(')' in ip_port);
client_ip = substring( ip_port from 1 for port_start-1 );
client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );
SELECT e.host , d.name , d.owner_pwd
INTO database_rec
FROM database d JOIN endpoint e ON e.id = d.endpoint_id
WHERE d.id = log_database_id ;
log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
log_duration = duration:: double precision;
pos = position ('SELECT' in UPPER(sql_line) );
log_query = substring( sql_line from pos for LENGTH(sql_line));
log_query = regexp_replace(log_query,' +',' ','g');
log_query = regexp_replace(log_query,';+','','g');
log_query = trim(trailing ' ' from log_query);
log_md5hash = md5( log_query::text );
--Explain execution plan--
EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')';
log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
PERFORM dblink_disconnect('LINK1');
--------------------------
BEGIN
INSERT INTO log_query
(
query_md5hash ,
database_id ,
timepoint ,
duration ,
query ,
explained_plan ,
plan_md5hash ,
explained_plan_wo_costs ,
plan_hash_value ,
ip ,
port
)
VALUES
(
log_md5hash ,
log_database_id ,
log_timepoint ,
log_duration ,
log_query ,
log_explain_plan ,
md5(log_explain_plan::text) ,
log_plan_wo_costs ,
md5(log_plan_wo_costs::text),
client_ip ,
client_port
);
activity_string = 'New query has logged '||
' database_id = '|| log_database_id ||
' query_md5hash='||log_md5hash||
' , timepoint = '||to_char(log_timepoint,'YYYYMMDD HH24:MI:SS');
RAISE NOTICE '%',activity_string;
PERFORM pg_log( log_database_id , 'log_query' , activity_string);
EXCEPTION
WHEN unique_violation THEN
RAISE NOTICE '*** unique_violation *** query already has logged';
END;
SELECT queryid
INTO log_queryid
FROM log_query
WHERE query_md5hash = log_md5hash AND
timepoint = log_timepoint;
IF log_queryid IS NOT NULL
THEN
RAISE NOTICE 'log_query with query_md5hash = % and timepoint = % has already has a QUERYID = %',log_md5hash,log_timepoint , log_queryid ;
RETURN result;
END IF;
------------------------------------------------
RAISE NOTICE 'Update queryid';
SELECT *
INTO log_query_rec
FROM log_query
WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
FOR pg_stat_history_rec IN
SELECT
queryid ,
query
FROM
pg_stat_db_queries
WHERE
database_id = log_database_id AND
queryid is not null
LOOP
pg_stat_query = pg_stat_history_rec.query ;
pg_stat_query=regexp_replace(pg_stat_query,'n+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,'t+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,'$.','%','g');
log_query_text = trim(trailing ' ' from log_query_rec.query);
pg_stat_query_text = pg_stat_query;
--SELECT log_query_rec.query like pg_stat_query INTO found_flag ;
IF (log_query_text LIKE pg_stat_query_text) THEN
found_flag = TRUE ;
ELSE
found_flag = FALSE ;
END IF;
IF found_flag THEN
UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
activity_string = ' updated queryid = '||pg_stat_history_rec.queryid||
' for log_query with id = '||log_query_rec.id
;
RAISE NOTICE '%',activity_string;
EXIT ;
END IF ;
END LOOP ;
RETURN result ;
END
$$ LANGUAGE plpgsql;
ایک سروس ٹیبل پروسیسنگ کے دوران استعمال کیا جاتا ہے pg_stat_db_queriesٹیبل سے موجودہ سوالات کے اسنیپ شاٹ پر مشتمل ہے۔ pg_stat_history (جدول کا استعمال یہاں بیان کیا گیا ہے۔
TABLE pg_stat_db_queries
(
database_id integer,
queryid bigint ,
query text ,
max_time double precision
);
TABLE pg_stat_history
(
…
database_id integer ,
…
queryid bigint ,
…
max_time double precision ,
…
);
فنکشن آپ کو لاگ فائل سے درخواستوں پر کارروائی کرنے کے لیے متعدد مفید صلاحیتوں کو نافذ کرنے کی اجازت دیتا ہے۔ یعنی:
موقع #1 - استفسار پر عمل درآمد کی تاریخ
کارکردگی کے واقعے کو حل کرنا شروع کرنے کے لیے بہت مفید ہے۔ سب سے پہلے، تاریخ سے واقف کرو - سست روی کب شروع ہوئی؟
پھر، کلاسیکی کے مطابق، بیرونی وجوہات کو تلاش کریں. ہوسکتا ہے کہ ڈیٹا بیس کا بوجھ تیزی سے بڑھ گیا ہو اور مخصوص درخواست کا اس سے کوئی تعلق نہیں ہے۔
log_query ٹیبل میں ایک نیا اندراج شامل کریں۔
port_start = position('(' in ip_port);
port_end = position(')' in ip_port);
client_ip = substring( ip_port from 1 for port_start-1 );
client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );
SELECT e.host , d.name , d.owner_pwd
INTO database_rec
FROM database d JOIN endpoint e ON e.id = d.endpoint_id
WHERE d.id = log_database_id ;
log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
log_duration = to_number(duration,'99999999999999999999D9999999999');
pos = position ('SELECT' in UPPER(sql_line) );
log_query = substring( sql_line from pos for LENGTH(sql_line));
log_query = regexp_replace(log_query,' +',' ','g');
log_query = regexp_replace(log_query,';+','','g');
log_query = trim(trailing ' ' from log_query);
RAISE NOTICE 'log_query=%',log_query ;
log_md5hash = md5( log_query::text );
--Explain execution plan--
EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')';
log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
PERFORM dblink_disconnect('LINK1');
--------------------------
BEGIN
INSERT INTO log_query
(
query_md5hash ,
database_id ,
timepoint ,
duration ,
query ,
explained_plan ,
plan_md5hash ,
explained_plan_wo_costs ,
plan_hash_value ,
ip ,
port
)
VALUES
(
log_md5hash ,
log_database_id ,
log_timepoint ,
log_duration ,
log_query ,
log_explain_plan ,
md5(log_explain_plan::text) ,
log_plan_wo_costs ,
md5(log_plan_wo_costs::text),
client_ip ,
client_port
);
امکان #2 - استفسار پر عمل درآمد کے منصوبوں کو محفوظ کریں۔
اس مقام پر ایک اعتراض-وضاحت-تبصرہ پیدا ہو سکتا ہے:لیکن پہلے سے ہی خودکار وضاحت موجود ہے۔" ہاں، یہ موجود ہے، لیکن کیا فائدہ ہے اگر عمل درآمد کا منصوبہ ایک ہی لاگ فائل میں محفوظ ہے اور اسے مزید تجزیہ کے لیے محفوظ کرنے کے لیے، آپ کو لاگ فائل کو پارس کرنا پڑے گا؟
مجھے جس چیز کی ضرورت تھی وہ یہ تھی:
پہلا: مانیٹرنگ ڈیٹا بیس کے سروس ٹیبل میں عملدرآمد کے منصوبے کو محفوظ کریں۔
دوم: ایک دوسرے کے ساتھ عملدرآمد کے منصوبوں کا موازنہ کرنے کے قابل ہونا تاکہ فوری طور پر یہ دیکھا جا سکے کہ استفسار پر عمل درآمد کا منصوبہ تبدیل ہو گیا ہے۔
عمل درآمد کے مخصوص پیرامیٹرز کے ساتھ ایک درخواست ہے۔ EXPLAIN کا استعمال کرتے ہوئے اس کے نفاذ کے منصوبے کو حاصل کرنا اور محفوظ کرنا ایک ابتدائی کام ہے۔
مزید برآں، EXPLAIN (COSTS FALSE) اظہار کا استعمال کرتے ہوئے، آپ پلان کا ایک ڈھانچہ حاصل کر سکتے ہیں، جسے پلان کی ہیش ویلیو حاصل کرنے کے لیے استعمال کیا جائے گا، جس سے عملدرآمد پلان میں تبدیلیوں کی تاریخ کے بعد کے تجزیے میں مدد ملے گی۔
عملدرآمد پلان ٹیمپلیٹ حاصل کریں۔
--Explain execution plan--
EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')';
log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
PERFORM dblink_disconnect('LINK1');
امکان #3 - نگرانی کے لیے استفسار لاگ کا استعمال
چونکہ کارکردگی کی پیمائشیں درخواست کے متن پر نہیں بلکہ اس کی ID پر ترتیب دی گئی ہیں، آپ کو لاگ فائل سے درخواستوں کو ان درخواستوں کے ساتھ منسلک کرنے کی ضرورت ہے جن کے لیے کارکردگی میٹرکس کو ترتیب دیا گیا ہے۔
ٹھیک ہے، کم از کم کارکردگی کے واقعے کے ہونے کا صحیح وقت حاصل کرنے کے لیے۔
اس طرح، جب درخواست کی ID کے لیے کارکردگی کا کوئی واقعہ پیش آتا ہے، تو مخصوص پیرامیٹر ویلیوز کے ساتھ ایک مخصوص درخواست کا لنک اور درخواست کے عمل درآمد کا صحیح وقت اور دورانیہ ہوگا۔ صرف منظر کا استعمال کرتے ہوئے یہ معلومات حاصل کریں۔ pg_stat_statements - یہ حرام ہے.
درخواست کا استفسار تلاش کریں اور log_query ٹیبل میں اندراج کو اپ ڈیٹ کریں۔
SELECT *
INTO log_query_rec
FROM log_query
WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
FOR pg_stat_history_rec IN
SELECT
queryid ,
query
FROM
pg_stat_db_queries
WHERE
database_id = log_database_id AND
queryid is not null
LOOP
pg_stat_query = pg_stat_history_rec.query ;
pg_stat_query=regexp_replace(pg_stat_query,'n+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,'t+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,'$.','%','g');
log_query_text = trim(trailing ' ' from log_query_rec.query);
pg_stat_query_text = pg_stat_query;
--SELECT log_query_rec.query like pg_stat_query INTO found_flag ;
IF (log_query_text LIKE pg_stat_query_text) THEN
found_flag = TRUE ;
ELSE
found_flag = FALSE ;
END IF;
IF found_flag THEN
UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
activity_string = ' updated queryid = '||pg_stat_history_rec.queryid||
' for log_query with id = '||log_query_rec.id
;
RAISE NOTICE '%',activity_string;
EXIT ;
END IF ;
END LOOP ;
بعد میں
بیان کردہ تکنیک نے آخر کار اس میں اطلاق پایا
اگرچہ، یقیناً، میری ذاتی رائے میں، ڈاؤن لوڈ کیے گئے حصے کے سائز کو منتخب کرنے اور تبدیل کرنے کے لیے الگورتھم پر مزید کام کرنے کی ضرورت ہوگی۔ عام صورت میں مسئلہ ابھی تک حل نہیں ہوا ہے۔ یہ شاید دلچسپ ہوگا۔
لیکن یہ بالکل مختلف کہانی ہے...
ماخذ: www.habr.com