Tải nhật ký PostgreSQL lên từ đám mây AWS

Hoặc một chút ứng dụng tứ phân học.
Mọi thứ mới đều bị lãng quên cũ.
Văn bia.
Tải nhật ký PostgreSQL lên từ đám mây AWS

Báo cáo sự cố

Cần phải tải xuống định kỳ tệp nhật ký PostgreSQL hiện tại từ đám mây AWS về máy chủ Linux cục bộ. Không phải trong thời gian thực, nhưng, chúng ta có thể nói, với một chút chậm trễ.
Thời gian tải xuống bản cập nhật tệp nhật ký là 5 phút.
Tệp nhật ký, trong AWS, được xoay vòng mỗi giờ.

Dụng cụ đã qua sử dụng

Để tải tệp nhật ký lên máy chủ, tập lệnh bash được sử dụng gọi API AWS "aws rds tải xuống-db-log-tệp-phần'.

Trả lời:

  • --db-instance-identifier: Tên phiên bản trong AWS;
  • --log-file-name: tên của tệp nhật ký hiện được tạo
  • --max-item: Tổng số mục được trả về trong đầu ra của lệnh.Kích thước khối của tệp đã tải xuống.
  • --starting-token: Mã thông báo bắt đầu

Trong trường hợp cụ thể này, nhiệm vụ tải nhật ký phát sinh trong quá trình làm việc trên giám sát hiệu suất của các truy vấn PostgreSQL.

Vâng, và đơn giản - một nhiệm vụ thú vị để đào tạo và đa dạng trong giờ làm việc.
Tôi cho rằng vấn đề đã được giải quyết nhờ thói quen. Nhưng Google nhanh chóng không đề xuất giải pháp và không có mong muốn cụ thể nào để tìm kiếm sâu hơn. Trong mọi trường hợp, đó là một tập luyện tốt.

Chính thức hóa nhiệm vụ

Tệp nhật ký cuối cùng là một tập hợp các dòng có độ dài thay đổi. Về mặt đồ họa, tệp nhật ký có thể được biểu diễn như sau:
Tải nhật ký PostgreSQL lên từ đám mây AWS

Nó đã nhắc nhở bạn về một cái gì đó? "tetris" là sao? Và đây là những gì.
Nếu chúng tôi đại diện cho các tùy chọn có thể phát sinh khi tải tệp tiếp theo bằng đồ họa (để đơn giản, trong trường hợp này, hãy để các dòng có cùng độ dài), chúng tôi sẽ nhận được số liệu tetris tiêu chuẩn:

1) Toàn bộ tệp được tải xuống và là tệp cuối cùng. Kích thước khối lớn hơn kích thước tệp cuối cùng:
Tải nhật ký PostgreSQL lên từ đám mây AWS

2) Các tập tin có một sự tiếp tục. Kích thước khối nhỏ hơn kích thước tệp cuối cùng:
Tải nhật ký PostgreSQL lên từ đám mây AWS

3) Tệp này là phần tiếp theo của tệp trước đó và có phần tiếp theo. Kích thước khối nhỏ hơn kích thước của phần còn lại của tệp cuối cùng:
Tải nhật ký PostgreSQL lên từ đám mây AWS

4) Tệp này là phần tiếp theo của tệp trước đó và là tệp cuối cùng. Kích thước khối lớn hơn kích thước của phần còn lại của tệp cuối cùng:
Tải nhật ký PostgreSQL lên từ đám mây AWS

Nhiệm vụ là lắp ráp một hình chữ nhật hoặc chơi Tetris ở một cấp độ mới.
Tải nhật ký PostgreSQL lên từ đám mây AWS

Các vấn đề phát sinh trong quá trình giải quyết vấn đề

1) Dán một chuỗi gồm 2 phần

Tải nhật ký PostgreSQL lên từ đám mây AWS
Nói chung, không có vấn đề cụ thể. Một nhiệm vụ tiêu chuẩn từ khóa học lập trình ban đầu.

Kích thước phục vụ tối ưu

Nhưng điều này thú vị hơn một chút.
Thật không may, không có cách nào để sử dụng phần bù sau nhãn đoạn bắt đầu:

Như bạn đã biết tùy chọn --starting-token được sử dụng để chỉ định nơi bắt đầu phân trang. Tùy chọn này nhận các giá trị Chuỗi có nghĩa là nếu bạn cố gắng thêm một giá trị bù vào trước chuỗi Mã thông báo tiếp theo, tùy chọn sẽ không được coi là giá trị bù.

Và vì vậy, bạn phải đọc theo từng phần.
Nếu bạn đọc với số lượng lớn, thì số lần đọc sẽ là tối thiểu nhưng âm lượng sẽ là tối đa.
Nếu bạn đọc từng phần nhỏ, thì ngược lại, số lần đọc sẽ nhiều nhất nhưng âm lượng sẽ ít nhất.
Do đó, để giảm lưu lượng truy cập và vì vẻ đẹp tổng thể của giải pháp, tôi đã phải nghĩ ra một số giải pháp, thật không may, trông hơi giống một cái nạng.

Để minh họa, hãy xem xét quá trình tải xuống tệp nhật ký trong 2 phiên bản được đơn giản hóa rất nhiều. Số lượng bài đọc trong cả hai trường hợp phụ thuộc vào kích thước phần.

1) Nạp từng phần nhỏ:
Tải nhật ký PostgreSQL lên từ đám mây AWS

2) Tải theo khẩu phần lớn:
Tải nhật ký PostgreSQL lên từ đám mây AWS

Như thường lệ, giải pháp tối ưu nằm ở giữa.
Kích thước phần là tối thiểu, nhưng trong quá trình đọc, kích thước có thể tăng lên để giảm số lần đọc.

Cần lưu ý rằng vấn đề chọn kích thước tối ưu của phần đọc vẫn chưa được giải quyết hoàn toàn và cần được nghiên cứu và phân tích sâu hơn. Có lẽ một chút sau đó.

Mô tả chung về việc thực hiện

Bảng dịch vụ đã qua sử dụng

CREATE TABLE endpoint
(
id SERIAL ,
host text 
);

TABLE database
(
id SERIAL , 
…
last_aws_log_time text ,
last_aws_nexttoken text ,
aws_max_item_size integer 
);
last_aws_log_time — временная метка последнего загруженного лог-файла в формате YYYY-MM-DD-HH24.
last_aws_nexttoken — текстовая метка последней загруженной порции.
aws_max_item_size- эмпирическим путем, подобранный начальный размер порции.

Toàn văn kịch bản

tải về_aws_ Piece.sh

#!/bin/bash
#########################################################
# download_aws_piece.sh
# downloan piece of log from AWS
# version HABR
 let min_item_size=1024
 let max_item_size=1048576
 let growth_factor=3
 let growth_counter=1
 let growth_counter_max=3

 echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:''STARTED'
 
 AWS_LOG_TIME=$1
 echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:AWS_LOG_TIME='$AWS_LOG_TIME
  
 database_id=$2
 echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:database_id='$database_id
 RESULT_FILE=$3 
  
 endpoint=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE_DATABASE -A -t -c "select e.host from endpoint e join database d on e.id = d.endpoint_id where d.id = $database_id "`
 echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:endpoint='$endpoint
  
 db_instance=`echo $endpoint | awk -F"." '{print toupper($1)}'`
 
 echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:db_instance='$db_instance

 LOG_FILE=$RESULT_FILE'.tmp_log'
 TMP_FILE=$LOG_FILE'.tmp'
 TMP_MIDDLE=$LOG_FILE'.tmp_mid'  
 TMP_MIDDLE2=$LOG_FILE'.tmp_mid2'  
  
 current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`

 echo $(date +%Y%m%d%H%M)':      download_aws_piece.sh:current_aws_log_time='$current_aws_log_time
  
  if [[ $current_aws_log_time != $AWS_LOG_TIME  ]];
  then
    is_new_log='1'
	if ! psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
	then
	  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - update database set last_aws_log_time .'
	  exit 1
	fi
  else
    is_new_log='0'
  fi
  
  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:is_new_log='$is_new_log
  
  let last_aws_max_item_size=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select aws_max_item_size from database where id = $database_id "`
  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: last_aws_max_item_size='$last_aws_max_item_size
  
  let count=1
  if [[ $is_new_log == '1' ]];
  then    
	echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: START DOWNLOADING OF NEW AWS LOG'
	if ! aws rds download-db-log-file-portion 
		--max-items $last_aws_max_item_size 
		--region REGION 
		--db-instance-identifier  $db_instance 
		--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
	then
		echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
		exit 2
	fi  	
  else
    next_token=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "select last_aws_nexttoken from database where id = $database_id "`
	
	if [[ $next_token == '' ]];
	then
	  next_token='0'	  
	fi
	
	echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: CONTINUE DOWNLOADING OF AWS LOG'
	if ! aws rds download-db-log-file-portion 
	    --max-items $last_aws_max_item_size 
		--starting-token $next_token 
		--region REGION 
		--db-instance-identifier  $db_instance 
		--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
	then
		echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
		exit 3
	fi       
	
	line_count=`cat  $LOG_FILE | wc -l`
	let lines=$line_count-1
	  
	tail -$lines $LOG_FILE > $TMP_MIDDLE 
	mv -f $TMP_MIDDLE $LOG_FILE
  fi
  
  next_token_str=`cat $LOG_FILE | grep NEXTTOKEN` 
  next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
  
  grep -v NEXTTOKEN $LOG_FILE  > $TMP_FILE 
  
  if [[ $next_token == '' ]];
  then
	  cp $TMP_FILE $RESULT_FILE
	  
	  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:  NEXTTOKEN NOT FOUND - FINISH '
	  rm $LOG_FILE 
	  rm $TMP_FILE
	  rm $TMP_MIDDLE
          rm $TMP_MIDDLE2	  
	  exit 0  
  else
	psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
  fi
  
  first_str=`tail -1 $TMP_FILE`
  
  line_count=`cat  $TMP_FILE | wc -l`
  let lines=$line_count-1    
  
  head -$lines $TMP_FILE  > $RESULT_FILE

###############################################
# MAIN CIRCLE
  let count=2
  while [[ $next_token != '' ]];
  do 
    echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: count='$count
	
	echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
	if ! aws rds download-db-log-file-portion 
             --max-items $last_aws_max_item_size 
             --starting-token $next_token 
             --region REGION 
             --db-instance-identifier  $db_instance 
             --log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
	then
		echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
		exit 4
	fi

	next_token_str=`cat $LOG_FILE | grep NEXTTOKEN` 
	next_token=`echo $next_token_str | awk -F" " '{ print $2}' `

	TMP_FILE=$LOG_FILE'.tmp'
	grep -v NEXTTOKEN $LOG_FILE  > $TMP_FILE  
	
	last_str=`head -1 $TMP_FILE`
  
    if [[ $next_token == '' ]];
	then
	  concat_str=$first_str$last_str
	  	  
	  echo $concat_str >> $RESULT_FILE
		 
	  line_count=`cat  $TMP_FILE | wc -l`
	  let lines=$line_count-1
	  
	  tail -$lines $TMP_FILE >> $RESULT_FILE
	  
	  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:  NEXTTOKEN NOT FOUND - FINISH '
	  rm $LOG_FILE 
	  rm $TMP_FILE
	  rm $TMP_MIDDLE
          rm $TMP_MIDDLE2	  
	  exit 0  
	fi
	
    if [[ $next_token != '' ]];
	then
		let growth_counter=$growth_counter+1
		if [[ $growth_counter -gt $growth_counter_max ]];
		then
			let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
			let growth_counter=1
		fi
	
		if [[ $last_aws_max_item_size -gt $max_item_size ]]; 
		then
			let last_aws_max_item_size=$max_item_size
		fi 

	  psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
	  
	  concat_str=$first_str$last_str
	  	  
	  echo $concat_str >> $RESULT_FILE
		 
	  line_count=`cat  $TMP_FILE | wc -l`
	  let lines=$line_count-1
	  
	  #############################
	  #Get middle of file
	  head -$lines $TMP_FILE > $TMP_MIDDLE
	  
	  line_count=`cat  $TMP_MIDDLE | wc -l`
	  let lines=$line_count-1
	  tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
	  
	  cat $TMP_MIDDLE2 >> $RESULT_FILE	  
	  
	  first_str=`tail -1 $TMP_FILE`	  
	fi
	  
    let count=$count+1

  done
#
#################################################################

exit 0  

Các đoạn script với một số giải thích:

Thông số đầu vào tập lệnh:

  • Dấu thời gian của tên tệp nhật ký ở định dạng YYYY-MM-DD-HH24: AWS_LOG_TIME=$1
  • ID cơ sở dữ liệu: database_id=$2
  • Tên tệp nhật ký đã thu thập: RESULT_FILE=$3

Lấy dấu thời gian của tệp nhật ký được tải lên lần cuối:

current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`

Nếu dấu thời gian của tệp nhật ký được tải lần cuối không khớp với tham số đầu vào, tệp nhật ký mới sẽ được tải:

if [[ $current_aws_log_time != $AWS_LOG_TIME  ]];
  then
    is_new_log='1'
	if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
	then
	  echo '***download_aws_piece.sh -FATAL_ERROR - update database set last_aws_log_time .'
	  exit 1
	fi
  else
    is_new_log='0'
  fi

Chúng tôi nhận được giá trị của nhãn nexttoken từ tệp đã tải:

  next_token_str=`cat $LOG_FILE | grep NEXTTOKEN` 
  next_token=`echo $next_token_str | awk -F" " '{ print $2}' `

Dấu hiệu kết thúc quá trình tải xuống là giá trị rỗng của nexttoken.

Trong một vòng lặp, chúng tôi đếm các phần của tệp, trên đường đi, nối các dòng và tăng kích thước của phần:
Vòng lặp chính

# MAIN CIRCLE
  let count=2
  while [[ $next_token != '' ]];
  do 
    echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: count='$count
	
	echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
	if ! aws rds download-db-log-file-portion 
     --max-items $last_aws_max_item_size 
	 --starting-token $next_token 
     --region REGION 
     --db-instance-identifier  $db_instance 
     --log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
	then
		echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
		exit 4
	fi

	next_token_str=`cat $LOG_FILE | grep NEXTTOKEN` 
	next_token=`echo $next_token_str | awk -F" " '{ print $2}' `

	TMP_FILE=$LOG_FILE'.tmp'
	grep -v NEXTTOKEN $LOG_FILE  > $TMP_FILE  
	
	last_str=`head -1 $TMP_FILE`
  
    if [[ $next_token == '' ]];
	then
	  concat_str=$first_str$last_str
	  	  
	  echo $concat_str >> $RESULT_FILE
		 
	  line_count=`cat  $TMP_FILE | wc -l`
	  let lines=$line_count-1
	  
	  tail -$lines $TMP_FILE >> $RESULT_FILE
	  
	  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:  NEXTTOKEN NOT FOUND - FINISH '
	  rm $LOG_FILE 
	  rm $TMP_FILE
	  rm $TMP_MIDDLE
         rm $TMP_MIDDLE2	  
	  exit 0  
	fi
	
    if [[ $next_token != '' ]];
	then
		let growth_counter=$growth_counter+1
		if [[ $growth_counter -gt $growth_counter_max ]];
		then
			let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
			let growth_counter=1
		fi
	
		if [[ $last_aws_max_item_size -gt $max_item_size ]]; 
		then
			let last_aws_max_item_size=$max_item_size
		fi 

	  psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
	  
	  concat_str=$first_str$last_str
	  	  
	  echo $concat_str >> $RESULT_FILE
		 
	  line_count=`cat  $TMP_FILE | wc -l`
	  let lines=$line_count-1
	  
	  #############################
	  #Get middle of file
	  head -$lines $TMP_FILE > $TMP_MIDDLE
	  
	  line_count=`cat  $TMP_MIDDLE | wc -l`
	  let lines=$line_count-1
	  tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
	  
	  cat $TMP_MIDDLE2 >> $RESULT_FILE	  
	  
	  first_str=`tail -1 $TMP_FILE`	  
	fi
	  
    let count=$count+1

  done

Cái gì tiếp theo?

Vì vậy, nhiệm vụ trung gian đầu tiên - "tải xuống tệp nhật ký từ đám mây" đã được giải quyết. Phải làm gì với nhật ký đã tải xuống?
Trước tiên, bạn cần phân tích cú pháp tệp nhật ký và trích xuất các yêu cầu thực tế từ nó.
Nhiệm vụ không phải là rất khó khăn. Tập lệnh bash đơn giản nhất hoạt động tốt.
upload_log_query.sh

#!/bin/bash
#########################################################
# upload_log_query.sh
# Upload table table from dowloaded aws file 
# version HABR
###########################################################  
echo 'TIMESTAMP:'$(date +%c)' Upload log_query table '
source_file=$1
echo 'source_file='$source_file
database_id=$2
echo 'database_id='$database_id

beginer=' '
first_line='1'
let "line_count=0"
sql_line=' '
sql_flag=' '    
space=' '
cat $source_file | while read line
do
  line="$space$line"

  if [[ $first_line == "1" ]]; then
    beginer=`echo $line | awk -F" " '{ print $1}' `
    first_line='0'
  fi

  current_beginer=`echo $line | awk -F" " '{ print $1}' `

  if [[ $current_beginer == $beginer ]]; then
    if [[ $sql_flag == '1' ]]; then
     sql_flag='0' 
     log_date=`echo $sql_line | awk -F" " '{ print $1}' `
     log_time=`echo $sql_line | awk -F" " '{ print $2}' `
     duration=`echo $sql_line | awk -F" " '{ print $5}' `

     #replace ' to ''
     sql_modline=`echo "$sql_line" | sed 's/'''/''''''/g'`
     sql_line=' '

	 ################
	 #PROCESSING OF THE SQL-SELECT IS HERE
     if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d DATABASE -v ON_ERROR_STOP=1 -A -t -c "select log_query('$ip_port',$database_id , '$log_date' , '$log_time' , '$duration' , '$sql_modline' )" 
     then
        echo 'FATAL_ERROR - log_query '
        exit 1
     fi
	 ################

    fi #if [[ $sql_flag == '1' ]]; then

    let "line_count=line_count+1"

    check=`echo $line | awk -F" " '{ print $8}' `
    check_sql=${check^^}    

    #echo 'check_sql='$check_sql
    
    if [[ $check_sql == 'SELECT' ]]; then
     sql_flag='1'    
     sql_line="$sql_line$line"
	 ip_port=`echo $sql_line | awk -F":" '{ print $4}' `
    fi
  else       

    if [[ $sql_flag == '1' ]]; then
      sql_line="$sql_line$line"
    fi   
    
  fi #if [[ $current_beginer == $beginer ]]; then

done

Bây giờ bạn có thể làm việc với truy vấn được trích xuất từ ​​tệp nhật ký.

Và có một số khả năng hữu ích.

Các truy vấn được phân tích cú pháp phải được lưu trữ ở đâu đó. Đối với điều này, một bảng dịch vụ được sử dụng. log_query

CREATE TABLE log_query
(
   id SERIAL ,
   queryid bigint ,
   query_md5hash text not null ,
   database_id integer not null ,  
   timepoint timestamp without time zone not null,
   duration double precision not null ,
   query text not null ,
   explained_plan text[],
   plan_md5hash text  , 
   explained_plan_wo_costs text[],
   plan_hash_value text  ,
   baseline_id integer ,
   ip text ,
   port text 
);
ALTER TABLE log_query ADD PRIMARY KEY (id);
ALTER TABLE log_query ADD CONSTRAINT queryid_timepoint_unique_key UNIQUE (queryid, timepoint );
ALTER TABLE log_query ADD CONSTRAINT query_md5hash_timepoint_unique_key UNIQUE (query_md5hash, timepoint );

CREATE INDEX log_query_timepoint_idx ON log_query (timepoint);
CREATE INDEX log_query_queryid_idx ON log_query (queryid);
ALTER TABLE log_query ADD CONSTRAINT database_id_fk FOREIGN KEY (database_id) REFERENCES database (id) ON DELETE CASCADE ;

Yêu cầu đã phân tích cú pháp được xử lý trong plpgsql chức năng "log_query'.
log_query.sql

--log_query.sql
--verison HABR
CREATE OR REPLACE FUNCTION log_query( ip_port text ,log_database_id integer , log_date text , log_time text , duration text , sql_line text   ) RETURNS boolean AS $$
DECLARE
  result boolean ;
  log_timepoint timestamp without time zone ;
  log_duration double precision ; 
  pos integer ;
  log_query text ;
  activity_string text ;
  log_md5hash text ;
  log_explain_plan text[] ;
  
  log_planhash text ;
  log_plan_wo_costs text[] ; 
  
  database_rec record ;
  
  pg_stat_query text ; 
  test_log_query text ;
  log_query_rec record;
  found_flag boolean;
  
  pg_stat_history_rec record ;
  port_start integer ;
  port_end integer ;
  client_ip text ;
  client_port text ;
  log_queryid bigint ;
  log_query_text text ;
  pg_stat_query_text text ; 
BEGIN
  result = TRUE ;

  RAISE NOTICE '***log_query';
  
  port_start = position('(' in ip_port);
  port_end = position(')' in ip_port);
  client_ip = substring( ip_port from 1 for port_start-1 );
  client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );

  SELECT e.host , d.name , d.owner_pwd 
  INTO database_rec
  FROM database d JOIN endpoint e ON e.id = d.endpoint_id
  WHERE d.id = log_database_id ;
  
  log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
  log_duration = duration:: double precision; 

  
  pos = position ('SELECT' in UPPER(sql_line) );
  log_query = substring( sql_line from pos for LENGTH(sql_line));
  log_query = regexp_replace(log_query,' +',' ','g');
  log_query = regexp_replace(log_query,';+','','g');
  log_query = trim(trailing ' ' from log_query);
 

  log_md5hash = md5( log_query::text );
  
  --Explain execution plan--
  EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')'; 
  
  log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
  log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
    
  PERFORM dblink_disconnect('LINK1');
  --------------------------
  BEGIN
	INSERT INTO log_query
	(
		query_md5hash ,
		database_id , 
		timepoint ,
		duration ,
		query ,
		explained_plan ,
		plan_md5hash , 
		explained_plan_wo_costs , 
		plan_hash_value , 
		ip , 
		port
	) 
	VALUES 
	(
		log_md5hash ,
		log_database_id , 
		log_timepoint , 
		log_duration , 
		log_query ,
		log_explain_plan , 
		md5(log_explain_plan::text) ,
		log_plan_wo_costs , 
		md5(log_plan_wo_costs::text),
		client_ip , 
		client_port		
	);
	activity_string = 	'New query has logged '||
						' database_id = '|| log_database_id ||
						' query_md5hash='||log_md5hash||
						' , timepoint = '||to_char(log_timepoint,'YYYYMMDD HH24:MI:SS');
					
	RAISE NOTICE '%',activity_string;					
					 
	PERFORM pg_log( log_database_id , 'log_query' , activity_string);  

	EXCEPTION
	  WHEN unique_violation THEN
		RAISE NOTICE '*** unique_violation *** query already has logged';
	END;

	SELECT 	queryid
	INTO   	log_queryid
	FROM 	log_query 
	WHERE 	query_md5hash = log_md5hash AND
			timepoint = log_timepoint;

	IF log_queryid IS NOT NULL 
	THEN 
	  RAISE NOTICE 'log_query with query_md5hash = % and timepoint = % has already has a QUERYID = %',log_md5hash,log_timepoint , log_queryid ;
	  RETURN result;
	END IF;
	
	------------------------------------------------
	RAISE NOTICE 'Update queryid';	
	
	SELECT * 
	INTO log_query_rec
	FROM log_query
	WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ; 
	
	log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
	
	FOR pg_stat_history_rec IN
	 SELECT 
         queryid ,
	  query 
	 FROM 
         pg_stat_db_queries 
     WHERE  
      database_id = log_database_id AND
       queryid is not null 
	LOOP
	  pg_stat_query = pg_stat_history_rec.query ; 
	  pg_stat_query=regexp_replace(pg_stat_query,'n+',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,'t+',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,'$.','%','g');
	
	  log_query_text = trim(trailing ' ' from log_query_rec.query);
	  pg_stat_query_text = pg_stat_query; 
	
	  
	  --SELECT log_query_rec.query like pg_stat_query INTO found_flag ; 
	  IF (log_query_text LIKE pg_stat_query_text) THEN
		found_flag = TRUE ;
	  ELSE
		found_flag = FALSE ;
	  END IF;	  
	  
	  
	  IF found_flag THEN
	    
		UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
		activity_string = 	' updated queryid = '||pg_stat_history_rec.queryid||
		                    ' for log_query with id = '||log_query_rec.id               
		   				    ;						
	    RAISE NOTICE '%',activity_string;	
		EXIT ;
	  END IF ;
	  
	END LOOP ;
	
  RETURN result ;
END
$$ LANGUAGE plpgsql;

Khi xử lý, bảng dịch vụ được sử dụng pg_stat_db_queriesA chứa ảnh chụp nhanh các truy vấn hiện tại từ bảng pg_stat_history (Việc sử dụng bảng được mô tả ở đây - Giám sát hiệu suất của các truy vấn PostgreSQL. Phần 1 - báo cáo)

TABLE pg_stat_db_queries
(
   database_id integer,  
   queryid bigint ,  
   query text , 
   max_time double precision 
);

TABLE pg_stat_history 
(
…
database_id integer ,
…
queryid bigint ,
…
max_time double precision	 , 	
…
);

Chức năng này cho phép bạn triển khai một số tính năng hữu ích để xử lý các yêu cầu từ tệp nhật ký. Cụ thể là:

Cơ hội #1 - Lịch sử thực hiện truy vấn

Rất hữu ích để bắt đầu một sự cố hiệu suất. Đầu tiên, hãy làm quen với lịch sử - và khi nào thì sự chậm lại bắt đầu?
Sau đó, theo kinh điển, tìm kiếm các nguyên nhân bên ngoài. Có thể là tải cơ sở dữ liệu đã tăng lên đáng kể và yêu cầu cụ thể không liên quan gì đến nó.
Thêm mục mới vào bảng log_query

  port_start = position('(' in ip_port);
  port_end = position(')' in ip_port);
  client_ip = substring( ip_port from 1 for port_start-1 );
  client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );

  SELECT e.host , d.name , d.owner_pwd 
  INTO database_rec
  FROM database d JOIN endpoint e ON e.id = d.endpoint_id
  WHERE d.id = log_database_id ;
  
  log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
  log_duration = to_number(duration,'99999999999999999999D9999999999'); 

  
  pos = position ('SELECT' in UPPER(sql_line) );
  log_query = substring( sql_line from pos for LENGTH(sql_line));
  log_query = regexp_replace(log_query,' +',' ','g');
  log_query = regexp_replace(log_query,';+','','g');
  log_query = trim(trailing ' ' from log_query);
 
  RAISE NOTICE 'log_query=%',log_query ;   

  log_md5hash = md5( log_query::text );
  
  --Explain execution plan--
  EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')'; 
  
  log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
  log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
    
  PERFORM dblink_disconnect('LINK1');
  --------------------------
  BEGIN
	INSERT INTO log_query
	(
		query_md5hash ,
		database_id , 
		timepoint ,
		duration ,
		query ,
		explained_plan ,
		plan_md5hash , 
		explained_plan_wo_costs , 
		plan_hash_value , 
		ip , 
		port
	) 
	VALUES 
	(
		log_md5hash ,
		log_database_id , 
		log_timepoint , 
		log_duration , 
		log_query ,
		log_explain_plan , 
		md5(log_explain_plan::text) ,
		log_plan_wo_costs , 
		md5(log_plan_wo_costs::text),
		client_ip , 
		client_port		
	);

Tính năng #2 - Lưu các kế hoạch thực hiện truy vấn

Lúc này, có thể nảy sinh phản đối-làm rõ-bình luận: “Nhưng đã có tự động giải thích“. Đúng vậy, nhưng vấn đề là gì nếu kế hoạch thực hiện được lưu trữ trong cùng một tệp nhật ký và để lưu nó để phân tích thêm, bạn phải phân tích cú pháp tệp nhật ký?

Tuy nhiên, tôi cần:
đầu tiên: lưu trữ kế hoạch thực hiện trong bảng dịch vụ của cơ sở dữ liệu giám sát;
thứ hai: để có thể so sánh các kế hoạch thực hiện với nhau để thấy ngay rằng kế hoạch thực hiện truy vấn đã thay đổi.

Có sẵn một yêu cầu với các thông số thực thi cụ thể. Nhận và lưu trữ kế hoạch thực hiện của nó bằng cách sử dụng EXPLAIN là một nhiệm vụ cơ bản.
Ngoài ra, bằng cách sử dụng biểu thức GIẢI THÍCH (COSTS FALSE), bạn có thể nhận được khung của kế hoạch, khung này sẽ được sử dụng để lấy giá trị băm của kế hoạch, điều này sẽ giúp phân tích tiếp theo về lịch sử thay đổi của kế hoạch thực hiện.
Nhận mẫu kế hoạch thực hiện

  --Explain execution plan--
  EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')'; 
  
  log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
  log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
    
  PERFORM dblink_disconnect('LINK1');

Cơ hội #3 - Sử dụng Nhật ký truy vấn để theo dõi

Vì các chỉ số hiệu suất không được định cấu hình cho văn bản yêu cầu mà cho ID của nó, nên bạn cần liên kết các yêu cầu từ tệp nhật ký với các yêu cầu có chỉ số hiệu suất được định cấu hình.
Chà, ít nhất là để có thời gian chính xác xảy ra sự cố hiệu suất.

Do đó, khi xảy ra sự cố hiệu suất đối với ID yêu cầu, sẽ có một liên kết đến một yêu cầu cụ thể với các giá trị tham số cụ thể và thời gian thực hiện chính xác cũng như thời lượng của yêu cầu. Nhận thông tin đã cho chỉ bằng chế độ xem pg_stat_statements - điều đó bị cấm.
Tìm queryid của truy vấn và cập nhật mục trong bảng log_query

SELECT * 
	INTO log_query_rec
	FROM log_query
	WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ; 
	
	log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
	
	FOR pg_stat_history_rec IN
	 SELECT 
      queryid ,
	  query 
	 FROM 
       pg_stat_db_queries 
     WHERE  
	   database_id = log_database_id AND
       queryid is not null 
	LOOP
	  pg_stat_query = pg_stat_history_rec.query ; 
	  pg_stat_query=regexp_replace(pg_stat_query,'n+',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,'t+',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,'$.','%','g');
	
	  log_query_text = trim(trailing ' ' from log_query_rec.query);
	  pg_stat_query_text = pg_stat_query; 
	  
	  --SELECT log_query_rec.query like pg_stat_query INTO found_flag ; 
	  IF (log_query_text LIKE pg_stat_query_text) THEN
		found_flag = TRUE ;
	  ELSE
		found_flag = FALSE ;
	  END IF;	  
	  
	  
	  IF found_flag THEN
	    
		UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
		activity_string = 	' updated queryid = '||pg_stat_history_rec.queryid||
		                    ' for log_query with id = '||log_query_rec.id		                    
		   				    ;						
					
	    RAISE NOTICE '%',activity_string;	
		EXIT ;
	  END IF ;
	  
	END LOOP ;

bạt

Kết quả là, phương pháp được mô tả đã tìm thấy ứng dụng của nó trong hệ thống đã phát triển để theo dõi hiệu suất của các truy vấn PostgreSQL, cho phép bạn có thêm thông tin để phân tích khi giải quyết các sự cố hiệu suất truy vấn mới phát sinh.

Tất nhiên, mặc dù, theo ý kiến ​​​​cá nhân của tôi, vẫn cần phải làm việc với thuật toán để chọn và thay đổi kích thước của phần đã tải xuống. Vấn đề vẫn chưa được giải quyết trong trường hợp tổng quát. Nó có lẽ sẽ rất thú vị.

Nhưng đó lại là một câu chuyện hoàn toàn khác...

Nguồn: www.habr.com

Thêm một lời nhận xét