または、応用テトリソロジーを少し。
新しいものはすべて忘れ去られた古いものです。
エピグラフ。
問題の定式化
現在の PostgreSQL ログ ファイルを AWS クラウドからローカル Linux ホストに定期的にダウンロードする必要があります。 リアルタイムではありませんが、少し遅れがあると言えますか。
ログ ファイルの更新のダウンロード期間は 5 分です。
AWS では、ログ ファイルは XNUMX 時間ごとにローテーションされます。
使用済みツール
ログ ファイルをホストにアップロードするには、AWS API を呼び出す bash スクリプトが使用されます。
パラメータ:
- --db-instance-identifier: AWS のインスタンス名。
- --log-file-name: 現在生成されているログ ファイルの名前
- --max-item: コマンドの出力で返されるアイテムの総数。ダウンロードされたファイルのチャンク サイズ。
- --starting-token: 開始トークン トークン
はい、そして単純に、勤務時間中のトレーニングと多様性のための興味深いタスクです。
問題は日常的にすでに解決されていると思います。 しかし、Google で簡単に検索しても解決策は提案されず、さらに詳しく検索したいという欲求も特にありませんでした。 いずれにせよ、良いトレーニングです。
タスクの形式化
最終的なログ ファイルは、可変長の行のセットです。 ログ ファイルは次のようにグラフで表すことができます。
すでに何かを思い出しましたか? 「テトリス」って何? そして、これが何です。
次のファイルをロードするときに発生する可能性のあるオプションをグラフィカルに表すと (簡単にするために、この場合は行の長さを同じにします)、次のようになります。 標準的なテトリス フィギュア:
1) ファイルは全体がダウンロードされ、最終的なものになります。 チャンク サイズが最終的なファイル サイズより大きくなります。
2) ファイルには続きがあります。 チャンク サイズは最終的なファイル サイズより小さくなります。
3) このファイルは前のファイルの続きであり、継続があります。 チャンク サイズは、最終ファイルの残りのサイズよりも小さくなります。
4) ファイルは前のファイルの続きであり、最終的なものです。 チャンク サイズは、最終ファイルの残りのサイズよりも大きくなります。
タスクは、長方形を組み立てるか、新しいレベルでテトリスをプレイすることです。
問題を解決する過程で生じる問題
1) 2つの部分の紐を接着します
概ね、特に問題はありませんでした。 初期プログラミングコースの標準タスク。
最適な分量
しかし、これはもう少し興味深いものです。
残念ながら、開始チャンク ラベルの後にオフセットを使用する方法はありません。
すでにご存知のとおり、オプション --starting-token は、ページ分割を開始する場所を指定するために使用されます。 このオプションは文字列値を受け取ります。つまり、次のトークン文字列の前にオフセット値を追加しようとしても、オプションはオフセットとして考慮されません。
したがって、何回かに分けて読む必要があります。
大量に読むと、読む回数は最小限になりますが、量は最大になります。
逆に、少しずつ読むと、読む回数は最大になりますが、量は最小限になります。
したがって、トラフィックを削減し、ソリューション全体の美しさを保つために、何らかの解決策を考え出す必要がありましたが、残念なことに、それは少し松葉杖のように見えます。
説明のために、ログ ファイルをダウンロードするプロセスを 2 つの大幅に簡略化したバージョンで考えてみましょう。 どちらの場合も、読み取り回数はポーションのサイズによって異なります。
1) 少しずつ負荷をかける:
2) 大量にロードする:
いつも通り、最適解は真ん中にあります.
ポーションのサイズは最小限ですが、読み取りの過程でサイズを大きくして読み取り回数を減らすことができます。
注目すべきは 読み取り部分の最適なサイズを選択する問題はまだ完全には解決されておらず、より深い調査と分析が必要であることがわかりました。 たぶんもう少し後。
実装の一般的な説明
使用済みサービステーブル
CREATE TABLE endpoint
(
id SERIAL ,
host text
);
TABLE database
(
id SERIAL ,
…
last_aws_log_time text ,
last_aws_nexttoken text ,
aws_max_item_size integer
);
last_aws_log_time — временная метка последнего загруженного лог-файла в формате YYYY-MM-DD-HH24.
last_aws_nexttoken — текстовая метка последней загруженной порции.
aws_max_item_size- эмпирическим путем, подобранный начальный размер порции.
スクリプトの全文
ダウンロード_aws_piece.sh
#!/bin/bash
#########################################################
# download_aws_piece.sh
# downloan piece of log from AWS
# version HABR
let min_item_size=1024
let max_item_size=1048576
let growth_factor=3
let growth_counter=1
let growth_counter_max=3
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:''STARTED'
AWS_LOG_TIME=$1
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:AWS_LOG_TIME='$AWS_LOG_TIME
database_id=$2
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:database_id='$database_id
RESULT_FILE=$3
endpoint=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE_DATABASE -A -t -c "select e.host from endpoint e join database d on e.id = d.endpoint_id where d.id = $database_id "`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:endpoint='$endpoint
db_instance=`echo $endpoint | awk -F"." '{print toupper($1)}'`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:db_instance='$db_instance
LOG_FILE=$RESULT_FILE'.tmp_log'
TMP_FILE=$LOG_FILE'.tmp'
TMP_MIDDLE=$LOG_FILE'.tmp_mid'
TMP_MIDDLE2=$LOG_FILE'.tmp_mid2'
current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:current_aws_log_time='$current_aws_log_time
if [[ $current_aws_log_time != $AWS_LOG_TIME ]];
then
is_new_log='1'
if ! psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - update database set last_aws_log_time .'
exit 1
fi
else
is_new_log='0'
fi
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh:is_new_log='$is_new_log
let last_aws_max_item_size=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select aws_max_item_size from database where id = $database_id "`
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: last_aws_max_item_size='$last_aws_max_item_size
let count=1
if [[ $is_new_log == '1' ]];
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: START DOWNLOADING OF NEW AWS LOG'
if ! aws rds download-db-log-file-portion
--max-items $last_aws_max_item_size
--region REGION
--db-instance-identifier $db_instance
--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 2
fi
else
next_token=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "select last_aws_nexttoken from database where id = $database_id "`
if [[ $next_token == '' ]];
then
next_token='0'
fi
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: CONTINUE DOWNLOADING OF AWS LOG'
if ! aws rds download-db-log-file-portion
--max-items $last_aws_max_item_size
--starting-token $next_token
--region REGION
--db-instance-identifier $db_instance
--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 3
fi
line_count=`cat $LOG_FILE | wc -l`
let lines=$line_count-1
tail -$lines $LOG_FILE > $TMP_MIDDLE
mv -f $TMP_MIDDLE $LOG_FILE
fi
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
grep -v NEXTTOKEN $LOG_FILE > $TMP_FILE
if [[ $next_token == '' ]];
then
cp $TMP_FILE $RESULT_FILE
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: NEXTTOKEN NOT FOUND - FINISH '
rm $LOG_FILE
rm $TMP_FILE
rm $TMP_MIDDLE
rm $TMP_MIDDLE2
exit 0
else
psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
fi
first_str=`tail -1 $TMP_FILE`
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
head -$lines $TMP_FILE > $RESULT_FILE
###############################################
# MAIN CIRCLE
let count=2
while [[ $next_token != '' ]];
do
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: count='$count
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
if ! aws rds download-db-log-file-portion
--max-items $last_aws_max_item_size
--starting-token $next_token
--region REGION
--db-instance-identifier $db_instance
--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 4
fi
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
TMP_FILE=$LOG_FILE'.tmp'
grep -v NEXTTOKEN $LOG_FILE > $TMP_FILE
last_str=`head -1 $TMP_FILE`
if [[ $next_token == '' ]];
then
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_FILE >> $RESULT_FILE
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: NEXTTOKEN NOT FOUND - FINISH '
rm $LOG_FILE
rm $TMP_FILE
rm $TMP_MIDDLE
rm $TMP_MIDDLE2
exit 0
fi
if [[ $next_token != '' ]];
then
let growth_counter=$growth_counter+1
if [[ $growth_counter -gt $growth_counter_max ]];
then
let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
let growth_counter=1
fi
if [[ $last_aws_max_item_size -gt $max_item_size ]];
then
let last_aws_max_item_size=$max_item_size
fi
psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
#############################
#Get middle of file
head -$lines $TMP_FILE > $TMP_MIDDLE
line_count=`cat $TMP_MIDDLE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
cat $TMP_MIDDLE2 >> $RESULT_FILE
first_str=`tail -1 $TMP_FILE`
fi
let count=$count+1
done
#
#################################################################
exit 0
いくつかの説明を含むスクリプトの断片:
スクリプト入力パラメータ:
- YYYY-MM-DD-HH24 形式のログ ファイル名のタイムスタンプ: AWS_LOG_TIME=$1
- データベース ID:database_id=$2
- 収集されたログファイル名:RESULT_FILE=$3
最後にアップロードされたログ ファイルのタイムスタンプを取得します。
current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`
最後にロードされたログ ファイルのタイムスタンプが入力パラメータと一致しない場合、新しいログ ファイルがロードされます。
if [[ $current_aws_log_time != $AWS_LOG_TIME ]];
then
is_new_log='1'
if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
then
echo '***download_aws_piece.sh -FATAL_ERROR - update database set last_aws_log_time .'
exit 1
fi
else
is_new_log='0'
fi
ロードされたファイルから nexttoken ラベルの値を取得します。
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
ダウンロードの終了の兆候は、nexttoken の空の値です。
ループでは、途中でファイルの部分をカウントし、行を連結して部分のサイズを増やします。
メインループ
# MAIN CIRCLE
let count=2
while [[ $next_token != '' ]];
do
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: count='$count
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
if ! aws rds download-db-log-file-portion
--max-items $last_aws_max_item_size
--starting-token $next_token
--region REGION
--db-instance-identifier $db_instance
--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
then
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
exit 4
fi
next_token_str=`cat $LOG_FILE | grep NEXTTOKEN`
next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
TMP_FILE=$LOG_FILE'.tmp'
grep -v NEXTTOKEN $LOG_FILE > $TMP_FILE
last_str=`head -1 $TMP_FILE`
if [[ $next_token == '' ]];
then
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_FILE >> $RESULT_FILE
echo $(date +%Y%m%d%H%M)': download_aws_piece.sh: NEXTTOKEN NOT FOUND - FINISH '
rm $LOG_FILE
rm $TMP_FILE
rm $TMP_MIDDLE
rm $TMP_MIDDLE2
exit 0
fi
if [[ $next_token != '' ]];
then
let growth_counter=$growth_counter+1
if [[ $growth_counter -gt $growth_counter_max ]];
then
let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
let growth_counter=1
fi
if [[ $last_aws_max_item_size -gt $max_item_size ]];
then
let last_aws_max_item_size=$max_item_size
fi
psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
concat_str=$first_str$last_str
echo $concat_str >> $RESULT_FILE
line_count=`cat $TMP_FILE | wc -l`
let lines=$line_count-1
#############################
#Get middle of file
head -$lines $TMP_FILE > $TMP_MIDDLE
line_count=`cat $TMP_MIDDLE | wc -l`
let lines=$line_count-1
tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
cat $TMP_MIDDLE2 >> $RESULT_FILE
first_str=`tail -1 $TMP_FILE`
fi
let count=$count+1
done
次は何?
これで、最初の中間タスク「クラウドからログ ファイルをダウンロードする」は解決されました。 ダウンロードしたログはどうすればよいですか?
まず、ログ ファイルを解析し、そこから実際のリクエストを抽出する必要があります。
作業はそれほど難しくありません。 最も単純な bash スクリプトで十分です。
アップロード_ログ_クエリ.sh
#!/bin/bash
#########################################################
# upload_log_query.sh
# Upload table table from dowloaded aws file
# version HABR
###########################################################
echo 'TIMESTAMP:'$(date +%c)' Upload log_query table '
source_file=$1
echo 'source_file='$source_file
database_id=$2
echo 'database_id='$database_id
beginer=' '
first_line='1'
let "line_count=0"
sql_line=' '
sql_flag=' '
space=' '
cat $source_file | while read line
do
line="$space$line"
if [[ $first_line == "1" ]]; then
beginer=`echo $line | awk -F" " '{ print $1}' `
first_line='0'
fi
current_beginer=`echo $line | awk -F" " '{ print $1}' `
if [[ $current_beginer == $beginer ]]; then
if [[ $sql_flag == '1' ]]; then
sql_flag='0'
log_date=`echo $sql_line | awk -F" " '{ print $1}' `
log_time=`echo $sql_line | awk -F" " '{ print $2}' `
duration=`echo $sql_line | awk -F" " '{ print $5}' `
#replace ' to ''
sql_modline=`echo "$sql_line" | sed 's/'''/''''''/g'`
sql_line=' '
################
#PROCESSING OF THE SQL-SELECT IS HERE
if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d DATABASE -v ON_ERROR_STOP=1 -A -t -c "select log_query('$ip_port',$database_id , '$log_date' , '$log_time' , '$duration' , '$sql_modline' )"
then
echo 'FATAL_ERROR - log_query '
exit 1
fi
################
fi #if [[ $sql_flag == '1' ]]; then
let "line_count=line_count+1"
check=`echo $line | awk -F" " '{ print $8}' `
check_sql=${check^^}
#echo 'check_sql='$check_sql
if [[ $check_sql == 'SELECT' ]]; then
sql_flag='1'
sql_line="$sql_line$line"
ip_port=`echo $sql_line | awk -F":" '{ print $4}' `
fi
else
if [[ $sql_flag == '1' ]]; then
sql_line="$sql_line$line"
fi
fi #if [[ $current_beginer == $beginer ]]; then
done
これで、ログ ファイルから抽出されたクエリを操作できるようになります。
そして、いくつかの有用な可能性があります。
解析されたクエリはどこかに保存する必要があります。 このために、サービス テーブルが使用されます。 ログクエリ
CREATE TABLE log_query
(
id SERIAL ,
queryid bigint ,
query_md5hash text not null ,
database_id integer not null ,
timepoint timestamp without time zone not null,
duration double precision not null ,
query text not null ,
explained_plan text[],
plan_md5hash text ,
explained_plan_wo_costs text[],
plan_hash_value text ,
baseline_id integer ,
ip text ,
port text
);
ALTER TABLE log_query ADD PRIMARY KEY (id);
ALTER TABLE log_query ADD CONSTRAINT queryid_timepoint_unique_key UNIQUE (queryid, timepoint );
ALTER TABLE log_query ADD CONSTRAINT query_md5hash_timepoint_unique_key UNIQUE (query_md5hash, timepoint );
CREATE INDEX log_query_timepoint_idx ON log_query (timepoint);
CREATE INDEX log_query_queryid_idx ON log_query (queryid);
ALTER TABLE log_query ADD CONSTRAINT database_id_fk FOREIGN KEY (database_id) REFERENCES database (id) ON DELETE CASCADE ;
解析されたリクエストは次のように処理されます。 plpgsql 機能 "ログクエリ'。
ログクエリ.sql
--log_query.sql
--verison HABR
CREATE OR REPLACE FUNCTION log_query( ip_port text ,log_database_id integer , log_date text , log_time text , duration text , sql_line text ) RETURNS boolean AS $$
DECLARE
result boolean ;
log_timepoint timestamp without time zone ;
log_duration double precision ;
pos integer ;
log_query text ;
activity_string text ;
log_md5hash text ;
log_explain_plan text[] ;
log_planhash text ;
log_plan_wo_costs text[] ;
database_rec record ;
pg_stat_query text ;
test_log_query text ;
log_query_rec record;
found_flag boolean;
pg_stat_history_rec record ;
port_start integer ;
port_end integer ;
client_ip text ;
client_port text ;
log_queryid bigint ;
log_query_text text ;
pg_stat_query_text text ;
BEGIN
result = TRUE ;
RAISE NOTICE '***log_query';
port_start = position('(' in ip_port);
port_end = position(')' in ip_port);
client_ip = substring( ip_port from 1 for port_start-1 );
client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );
SELECT e.host , d.name , d.owner_pwd
INTO database_rec
FROM database d JOIN endpoint e ON e.id = d.endpoint_id
WHERE d.id = log_database_id ;
log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
log_duration = duration:: double precision;
pos = position ('SELECT' in UPPER(sql_line) );
log_query = substring( sql_line from pos for LENGTH(sql_line));
log_query = regexp_replace(log_query,' +',' ','g');
log_query = regexp_replace(log_query,';+','','g');
log_query = trim(trailing ' ' from log_query);
log_md5hash = md5( log_query::text );
--Explain execution plan--
EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')';
log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
PERFORM dblink_disconnect('LINK1');
--------------------------
BEGIN
INSERT INTO log_query
(
query_md5hash ,
database_id ,
timepoint ,
duration ,
query ,
explained_plan ,
plan_md5hash ,
explained_plan_wo_costs ,
plan_hash_value ,
ip ,
port
)
VALUES
(
log_md5hash ,
log_database_id ,
log_timepoint ,
log_duration ,
log_query ,
log_explain_plan ,
md5(log_explain_plan::text) ,
log_plan_wo_costs ,
md5(log_plan_wo_costs::text),
client_ip ,
client_port
);
activity_string = 'New query has logged '||
' database_id = '|| log_database_id ||
' query_md5hash='||log_md5hash||
' , timepoint = '||to_char(log_timepoint,'YYYYMMDD HH24:MI:SS');
RAISE NOTICE '%',activity_string;
PERFORM pg_log( log_database_id , 'log_query' , activity_string);
EXCEPTION
WHEN unique_violation THEN
RAISE NOTICE '*** unique_violation *** query already has logged';
END;
SELECT queryid
INTO log_queryid
FROM log_query
WHERE query_md5hash = log_md5hash AND
timepoint = log_timepoint;
IF log_queryid IS NOT NULL
THEN
RAISE NOTICE 'log_query with query_md5hash = % and timepoint = % has already has a QUERYID = %',log_md5hash,log_timepoint , log_queryid ;
RETURN result;
END IF;
------------------------------------------------
RAISE NOTICE 'Update queryid';
SELECT *
INTO log_query_rec
FROM log_query
WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
FOR pg_stat_history_rec IN
SELECT
queryid ,
query
FROM
pg_stat_db_queries
WHERE
database_id = log_database_id AND
queryid is not null
LOOP
pg_stat_query = pg_stat_history_rec.query ;
pg_stat_query=regexp_replace(pg_stat_query,'n+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,'t+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,'$.','%','g');
log_query_text = trim(trailing ' ' from log_query_rec.query);
pg_stat_query_text = pg_stat_query;
--SELECT log_query_rec.query like pg_stat_query INTO found_flag ;
IF (log_query_text LIKE pg_stat_query_text) THEN
found_flag = TRUE ;
ELSE
found_flag = FALSE ;
END IF;
IF found_flag THEN
UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
activity_string = ' updated queryid = '||pg_stat_history_rec.queryid||
' for log_query with id = '||log_query_rec.id
;
RAISE NOTICE '%',activity_string;
EXIT ;
END IF ;
END LOOP ;
RETURN result ;
END
$$ LANGUAGE plpgsql;
処理時にはサービステーブルが使用されます pg_stat_db_queriesテーブルからの現在のクエリのスナップショットを含む 。 pg_stat_history (テーブルの使用法についてはここで説明します-
TABLE pg_stat_db_queries
(
database_id integer,
queryid bigint ,
query text ,
max_time double precision
);
TABLE pg_stat_history
(
…
database_id integer ,
…
queryid bigint ,
…
max_time double precision ,
…
);
この関数を使用すると、ログ ファイルからのリクエストを処理するための多くの便利な機能を実装できます。 つまり:
機会 #1 - クエリ実行履歴
パフォーマンス インシデントを開始する場合に非常に役立ちます。 まず、歴史を知りましょう。そして、いつから減速が始まったのでしょうか?
次に、古典に従って、外部の原因を探します。 データベースの負荷が大幅に増加しており、特定のリクエストはそれとは関係がないだけである可能性があります。
log_query テーブルに新しいエントリを追加
port_start = position('(' in ip_port);
port_end = position(')' in ip_port);
client_ip = substring( ip_port from 1 for port_start-1 );
client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );
SELECT e.host , d.name , d.owner_pwd
INTO database_rec
FROM database d JOIN endpoint e ON e.id = d.endpoint_id
WHERE d.id = log_database_id ;
log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
log_duration = to_number(duration,'99999999999999999999D9999999999');
pos = position ('SELECT' in UPPER(sql_line) );
log_query = substring( sql_line from pos for LENGTH(sql_line));
log_query = regexp_replace(log_query,' +',' ','g');
log_query = regexp_replace(log_query,';+','','g');
log_query = trim(trailing ' ' from log_query);
RAISE NOTICE 'log_query=%',log_query ;
log_md5hash = md5( log_query::text );
--Explain execution plan--
EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')';
log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
PERFORM dblink_disconnect('LINK1');
--------------------------
BEGIN
INSERT INTO log_query
(
query_md5hash ,
database_id ,
timepoint ,
duration ,
query ,
explained_plan ,
plan_md5hash ,
explained_plan_wo_costs ,
plan_hash_value ,
ip ,
port
)
VALUES
(
log_md5hash ,
log_database_id ,
log_timepoint ,
log_duration ,
log_query ,
log_explain_plan ,
md5(log_explain_plan::text) ,
log_plan_wo_costs ,
md5(log_plan_wo_costs::text),
client_ip ,
client_port
);
機能 #2 - クエリ実行プランの保存
この時点で、次のような反論や説明のコメントが生じる可能性があります。しかし、すでに自動説明があります」。 はい、その通りですが、実行計画が同じログ ファイルに保存されており、さらに分析するためにそれを保存するためにログ ファイルを解析する必要がある場合、一体どういう意味があるのでしょうか?
ただし、次のものが必要でした。
まず、実行計画を監視データベースのサービス テーブルに保存します。
XNUMX 番目: クエリ実行プランが変更されたことをすぐに確認するために、実行プランを相互に比較できるようにするためです。
特定の実行パラメータを含むリクエストが利用可能です。 EXPLAIN を使用して実行プランを取得して保存することは、基本的なタスクです。
さらに、EXPLAIN (COSTS FALSE) 式を使用すると、プランのフレームワークを取得できます。これは、プランのハッシュ値を取得するために使用され、その後の実行プランの変更履歴の分析に役立ちます。
実行計画テンプレートを取得する
--Explain execution plan--
EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')';
log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
PERFORM dblink_disconnect('LINK1');
機会 #3 - モニタリングのためのクエリ ログの使用
パフォーマンス メトリックはリクエスト テキストではなくその ID に対して設定されるため、ログ ファイルのリクエストを、パフォーマンス メトリクスが設定されているリクエストに関連付ける必要があります。
少なくとも、パフォーマンス インシデントの発生時刻を正確に把握するためです。
したがって、リクエスト ID に対してパフォーマンス インシデントが発生すると、特定のパラメータ値を持つ特定のリクエストへの参照と、リクエストの正確な実行時間と期間が存在します。 ビューのみを使用して指定された情報を取得する pg_stat_statements - それは禁止されています。
クエリの queryid を見つけて、log_query テーブルのエントリを更新します。
SELECT *
INTO log_query_rec
FROM log_query
WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
FOR pg_stat_history_rec IN
SELECT
queryid ,
query
FROM
pg_stat_db_queries
WHERE
database_id = log_database_id AND
queryid is not null
LOOP
pg_stat_query = pg_stat_history_rec.query ;
pg_stat_query=regexp_replace(pg_stat_query,'n+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,'t+',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
pg_stat_query=regexp_replace(pg_stat_query,'$.','%','g');
log_query_text = trim(trailing ' ' from log_query_rec.query);
pg_stat_query_text = pg_stat_query;
--SELECT log_query_rec.query like pg_stat_query INTO found_flag ;
IF (log_query_text LIKE pg_stat_query_text) THEN
found_flag = TRUE ;
ELSE
found_flag = FALSE ;
END IF;
IF found_flag THEN
UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
activity_string = ' updated queryid = '||pg_stat_history_rec.queryid||
' for log_query with id = '||log_query_rec.id
;
RAISE NOTICE '%',activity_string;
EXIT ;
END IF ;
END LOOP ;
後書き
その結果、記載された方法は次の分野で応用できることがわかりました。
もちろん、私の個人的な意見ではありますが、ダウンロードされる部分のサイズを選択および変更するためのアルゴリズムに取り組む必要はあります。 この問題は一般的なケースではまだ解決されていません。 おそらく面白いでしょう。
しかし、それはまったく別の話です...
出所: habr.com