Μεταφόρτωση ενός αρχείου καταγραφής PostgreSQL από το σύννεφο AWS

Ή λίγο εφαρμοσμένη τετρασολογία.
Κάθε τι νέο είναι ξεχασμένο παλιό.
Επιγραφές.
Μεταφόρτωση ενός αρχείου καταγραφής PostgreSQL από το σύννεφο AWS

Δήλωση προβλήματος

Είναι απαραίτητο να κάνετε περιοδική λήψη του τρέχοντος αρχείου καταγραφής PostgreSQL από το σύννεφο AWS στον τοπικό κεντρικό υπολογιστή Linux. Όχι σε πραγματικό χρόνο, αλλά, ας πούμε, με μια μικρή καθυστέρηση.
Η περίοδος λήψης της ενημέρωσης του αρχείου καταγραφής είναι 5 λεπτά.
Το αρχείο καταγραφής, στο AWS, περιστρέφεται κάθε ώρα.

Χρησιμοποιημένα εργαλεία

Για να ανεβάσετε το αρχείο καταγραφής στον κεντρικό υπολογιστή, χρησιμοποιείται ένα σενάριο bash που καλεί το API AWS "aws rds download-db-log-file-partion».

Παράμετροι:

  • --db-instance-identifier: Όνομα παρουσίας στο AWS.
  • --log-file-name: όνομα του τρέχοντος αρχείου καταγραφής που δημιουργείται
  • --max-item: Ο συνολικός αριθμός στοιχείων που επιστράφηκαν στην έξοδο της εντολής.Το μέγεθος του κομματιού του ληφθέντος αρχείου.
  • --starting-token: Token έναρξης

Στη συγκεκριμένη περίπτωση, το έργο της λήψης αρχείων καταγραφής προέκυψε κατά τη διάρκεια της εργασίας παρακολούθηση της απόδοσης των ερωτημάτων PostgreSQL.

Ναι, και απλά - μια ενδιαφέρουσα εργασία για εκπαίδευση και ποικιλία κατά τις ώρες εργασίας.
Υποθέτω ότι το πρόβλημα έχει ήδη λυθεί δυνάμει της ρουτίνας. Αλλά ένα γρήγορο Google δεν πρότεινε λύσεις και δεν υπήρχε ιδιαίτερη επιθυμία για αναζήτηση σε μεγαλύτερο βάθος. Σε κάθε περίπτωση, είναι μια καλή προπόνηση.

Επισημοποίηση της εργασίας

Το τελικό αρχείο καταγραφής είναι ένα σύνολο γραμμών μεταβλητού μήκους. Γραφικά, το αρχείο καταγραφής μπορεί να αναπαρασταθεί ως εξής:
Μεταφόρτωση ενός αρχείου καταγραφής PostgreSQL από το σύννεφο AWS

Σας θυμίζει ήδη κάτι; Τι συμβαίνει με το "tetris"; Και να τι.
Εάν αντιπροσωπεύσουμε τις πιθανές επιλογές που προκύπτουν κατά τη γραφική φόρτωση του επόμενου αρχείου (για λόγους απλότητας, σε αυτήν την περίπτωση, αφήστε τις γραμμές να έχουν το ίδιο μήκος), παίρνουμε τυπικά στοιχεία tetris:

1) Το αρχείο κατεβάζεται ολόκληρο και είναι οριστικό. Το μέγεθος του κομματιού είναι μεγαλύτερο από το τελικό μέγεθος αρχείου:
Μεταφόρτωση ενός αρχείου καταγραφής PostgreSQL από το σύννεφο AWS

2) Το αρχείο έχει συνέχεια. Το μέγεθος του κομματιού είναι μικρότερο από το τελικό μέγεθος αρχείου:
Μεταφόρτωση ενός αρχείου καταγραφής PostgreSQL από το σύννεφο AWS

3) Το αρχείο είναι συνέχεια του προηγούμενου αρχείου και έχει συνέχεια. Το μέγεθος του κομματιού είναι μικρότερο από το μέγεθος του υπόλοιπου τελικού αρχείου:
Μεταφόρτωση ενός αρχείου καταγραφής PostgreSQL από το σύννεφο AWS

4) Το αρχείο είναι συνέχεια του προηγούμενου αρχείου και είναι οριστικό. Το μέγεθος του κομματιού είναι μεγαλύτερο από το μέγεθος του υπόλοιπου τελικού αρχείου:
Μεταφόρτωση ενός αρχείου καταγραφής PostgreSQL από το σύννεφο AWS

Το καθήκον είναι να συναρμολογήσετε ένα ορθογώνιο ή να παίξετε Tetris σε ένα νέο επίπεδο.
Μεταφόρτωση ενός αρχείου καταγραφής PostgreSQL από το σύννεφο AWS

Προβλήματα που προκύπτουν κατά την επίλυση του προβλήματος

1) Κολλήστε ένα κορδόνι 2 μερίδων

Μεταφόρτωση ενός αρχείου καταγραφής PostgreSQL από το σύννεφο AWS
Γενικά δεν υπήρχαν ιδιαίτερα προβλήματα. Μια τυπική εργασία από το αρχικό μάθημα προγραμματισμού.

Βέλτιστο μέγεθος σερβιρίσματος

Αλλά αυτό είναι λίγο πιο ενδιαφέρον.
Δυστυχώς, δεν υπάρχει τρόπος να χρησιμοποιήσετε μια μετατόπιση μετά την ετικέτα τεμαχίου έναρξης:

Όπως ήδη γνωρίζετε, η επιλογή --starting-token χρησιμοποιείται για να καθορίσετε πού θα ξεκινήσει η σελιδοποίηση. Αυτή η επιλογή λαμβάνει τιμές συμβολοσειράς που θα σήμαινε ότι εάν προσπαθήσετε να προσθέσετε μια τιμή μετατόπισης μπροστά από τη συμβολοσειρά Next Token, η επιλογή δεν θα ληφθεί υπόψη ως μετατόπιση.

Και έτσι, πρέπει να διαβάζεις σε κομμάτια-μερίδες.
Εάν διαβάζετε σε μεγάλες μερίδες, τότε ο αριθμός των αναγνώσεων θα είναι ελάχιστος, αλλά η ένταση θα είναι μέγιστη.
Εάν διαβάζετε σε μικρές μερίδες, τότε αντίθετα, ο αριθμός των αναγνώσεων θα είναι μέγιστος, αλλά ο όγκος θα είναι ελάχιστος.
Επομένως, για να μειώσω την κίνηση και για τη συνολική ομορφιά της λύσης, έπρεπε να βρω κάποιο είδος λύσης, που, δυστυχώς, μοιάζει λίγο με δεκανίκι.

Για παράδειγμα, εξετάστε τη διαδικασία λήψης ενός αρχείου καταγραφής σε 2 πολύ απλοποιημένες εκδόσεις. Ο αριθμός των αναγνώσεων και στις δύο περιπτώσεις εξαρτάται από το μέγεθος της μερίδας.

1) Φόρτωση σε μικρές μερίδες:
Μεταφόρτωση ενός αρχείου καταγραφής PostgreSQL από το σύννεφο AWS

2) Φόρτωση σε μεγάλες μερίδες:
Μεταφόρτωση ενός αρχείου καταγραφής PostgreSQL από το σύννεφο AWS

Ως συνήθως, η βέλτιστη λύση βρίσκεται στη μέση.
Το μέγεθος της μερίδας είναι ελάχιστο, αλλά κατά τη διαδικασία ανάγνωσης, το μέγεθος μπορεί να αυξηθεί για να μειωθεί ο αριθμός των αναγνώσεων.

Πρέπει να σημειωθεί ότι το πρόβλημα της επιλογής του βέλτιστου μεγέθους του τμήματος ανάγνωσης δεν έχει ακόμη λυθεί πλήρως και απαιτεί βαθύτερη μελέτη και ανάλυση. Ίσως λίγο αργότερα.

Γενική περιγραφή της υλοποίησης

Μεταχειρισμένα τραπέζια σέρβις

CREATE TABLE endpoint
(
id SERIAL ,
host text 
);

TABLE database
(
id SERIAL , 
…
last_aws_log_time text ,
last_aws_nexttoken text ,
aws_max_item_size integer 
);
last_aws_log_time — временная метка последнего загруженного лог-файла в формате YYYY-MM-DD-HH24.
last_aws_nexttoken — текстовая метка последней загруженной порции.
aws_max_item_size- эмпирическим путем, подобранный начальный размер порции.

Το πλήρες κείμενο του σεναρίου

download_aws_piece.sh

#!/bin/bash
#########################################################
# download_aws_piece.sh
# downloan piece of log from AWS
# version HABR
 let min_item_size=1024
 let max_item_size=1048576
 let growth_factor=3
 let growth_counter=1
 let growth_counter_max=3

 echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:''STARTED'
 
 AWS_LOG_TIME=$1
 echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:AWS_LOG_TIME='$AWS_LOG_TIME
  
 database_id=$2
 echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:database_id='$database_id
 RESULT_FILE=$3 
  
 endpoint=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE_DATABASE -A -t -c "select e.host from endpoint e join database d on e.id = d.endpoint_id where d.id = $database_id "`
 echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:endpoint='$endpoint
  
 db_instance=`echo $endpoint | awk -F"." '{print toupper($1)}'`
 
 echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:db_instance='$db_instance

 LOG_FILE=$RESULT_FILE'.tmp_log'
 TMP_FILE=$LOG_FILE'.tmp'
 TMP_MIDDLE=$LOG_FILE'.tmp_mid'  
 TMP_MIDDLE2=$LOG_FILE'.tmp_mid2'  
  
 current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`

 echo $(date +%Y%m%d%H%M)':      download_aws_piece.sh:current_aws_log_time='$current_aws_log_time
  
  if [[ $current_aws_log_time != $AWS_LOG_TIME  ]];
  then
    is_new_log='1'
	if ! psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
	then
	  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - update database set last_aws_log_time .'
	  exit 1
	fi
  else
    is_new_log='0'
  fi
  
  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:is_new_log='$is_new_log
  
  let last_aws_max_item_size=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select aws_max_item_size from database where id = $database_id "`
  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: last_aws_max_item_size='$last_aws_max_item_size
  
  let count=1
  if [[ $is_new_log == '1' ]];
  then    
	echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: START DOWNLOADING OF NEW AWS LOG'
	if ! aws rds download-db-log-file-portion 
		--max-items $last_aws_max_item_size 
		--region REGION 
		--db-instance-identifier  $db_instance 
		--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
	then
		echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
		exit 2
	fi  	
  else
    next_token=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "select last_aws_nexttoken from database where id = $database_id "`
	
	if [[ $next_token == '' ]];
	then
	  next_token='0'	  
	fi
	
	echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: CONTINUE DOWNLOADING OF AWS LOG'
	if ! aws rds download-db-log-file-portion 
	    --max-items $last_aws_max_item_size 
		--starting-token $next_token 
		--region REGION 
		--db-instance-identifier  $db_instance 
		--log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
	then
		echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
		exit 3
	fi       
	
	line_count=`cat  $LOG_FILE | wc -l`
	let lines=$line_count-1
	  
	tail -$lines $LOG_FILE > $TMP_MIDDLE 
	mv -f $TMP_MIDDLE $LOG_FILE
  fi
  
  next_token_str=`cat $LOG_FILE | grep NEXTTOKEN` 
  next_token=`echo $next_token_str | awk -F" " '{ print $2}' `
  
  grep -v NEXTTOKEN $LOG_FILE  > $TMP_FILE 
  
  if [[ $next_token == '' ]];
  then
	  cp $TMP_FILE $RESULT_FILE
	  
	  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:  NEXTTOKEN NOT FOUND - FINISH '
	  rm $LOG_FILE 
	  rm $TMP_FILE
	  rm $TMP_MIDDLE
          rm $TMP_MIDDLE2	  
	  exit 0  
  else
	psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
  fi
  
  first_str=`tail -1 $TMP_FILE`
  
  line_count=`cat  $TMP_FILE | wc -l`
  let lines=$line_count-1    
  
  head -$lines $TMP_FILE  > $RESULT_FILE

###############################################
# MAIN CIRCLE
  let count=2
  while [[ $next_token != '' ]];
  do 
    echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: count='$count
	
	echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
	if ! aws rds download-db-log-file-portion 
             --max-items $last_aws_max_item_size 
             --starting-token $next_token 
             --region REGION 
             --db-instance-identifier  $db_instance 
             --log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
	then
		echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
		exit 4
	fi

	next_token_str=`cat $LOG_FILE | grep NEXTTOKEN` 
	next_token=`echo $next_token_str | awk -F" " '{ print $2}' `

	TMP_FILE=$LOG_FILE'.tmp'
	grep -v NEXTTOKEN $LOG_FILE  > $TMP_FILE  
	
	last_str=`head -1 $TMP_FILE`
  
    if [[ $next_token == '' ]];
	then
	  concat_str=$first_str$last_str
	  	  
	  echo $concat_str >> $RESULT_FILE
		 
	  line_count=`cat  $TMP_FILE | wc -l`
	  let lines=$line_count-1
	  
	  tail -$lines $TMP_FILE >> $RESULT_FILE
	  
	  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:  NEXTTOKEN NOT FOUND - FINISH '
	  rm $LOG_FILE 
	  rm $TMP_FILE
	  rm $TMP_MIDDLE
          rm $TMP_MIDDLE2	  
	  exit 0  
	fi
	
    if [[ $next_token != '' ]];
	then
		let growth_counter=$growth_counter+1
		if [[ $growth_counter -gt $growth_counter_max ]];
		then
			let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
			let growth_counter=1
		fi
	
		if [[ $last_aws_max_item_size -gt $max_item_size ]]; 
		then
			let last_aws_max_item_size=$max_item_size
		fi 

	  psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
	  
	  concat_str=$first_str$last_str
	  	  
	  echo $concat_str >> $RESULT_FILE
		 
	  line_count=`cat  $TMP_FILE | wc -l`
	  let lines=$line_count-1
	  
	  #############################
	  #Get middle of file
	  head -$lines $TMP_FILE > $TMP_MIDDLE
	  
	  line_count=`cat  $TMP_MIDDLE | wc -l`
	  let lines=$line_count-1
	  tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
	  
	  cat $TMP_MIDDLE2 >> $RESULT_FILE	  
	  
	  first_str=`tail -1 $TMP_FILE`	  
	fi
	  
    let count=$count+1

  done
#
#################################################################

exit 0  

Αποσπάσματα σεναρίου με μερικές επεξηγήσεις:

Παράμετροι εισαγωγής σεναρίου:

  • Χρονική σήμανση του ονόματος αρχείου καταγραφής σε μορφή ΕΕΕΕ-ΜΜ-ΗΗ-ΩΗ24: AWS_LOG_TIME=$1
  • Αναγνωριστικό βάσης δεδομένων: database_id=$2
  • Όνομα αρχείου καταγραφής συλλογής: RESULT_FILE=3$

Λάβετε τη χρονική σήμανση του τελευταίου αρχείου καταγραφής που μεταφορτώθηκε:

current_aws_log_time=`psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -c "select last_aws_log_time from database where id = $database_id "`

Εάν η χρονική σήμανση του τελευταίου φορτωμένου αρχείου καταγραφής δεν ταιριάζει με την παράμετρο εισόδου, φορτώνεται ένα νέο αρχείο καταγραφής:

if [[ $current_aws_log_time != $AWS_LOG_TIME  ]];
  then
    is_new_log='1'
	if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -v ON_ERROR_STOP=1 -A -t -c "update database set last_aws_log_time = '$AWS_LOG_TIME' where id = $database_id "
	then
	  echo '***download_aws_piece.sh -FATAL_ERROR - update database set last_aws_log_time .'
	  exit 1
	fi
  else
    is_new_log='0'
  fi

Λαμβάνουμε την τιμή της ετικέτας του επόμενου συμβολικού από το φορτωμένο αρχείο:

  next_token_str=`cat $LOG_FILE | grep NEXTTOKEN` 
  next_token=`echo $next_token_str | awk -F" " '{ print $2}' `

Το σύμβολο του τέλους της λήψης είναι η κενή τιμή του nexttoken.

Σε έναν βρόχο, μετράμε τμήματα του αρχείου, στην πορεία, συνενώνοντας γραμμές και αυξάνοντας το μέγεθος του τμήματος:
Κύριος βρόχος

# MAIN CIRCLE
  let count=2
  while [[ $next_token != '' ]];
  do 
    echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: count='$count
	
	echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: START DOWNLOADING OF AWS LOG'
	if ! aws rds download-db-log-file-portion 
     --max-items $last_aws_max_item_size 
	 --starting-token $next_token 
     --region REGION 
     --db-instance-identifier  $db_instance 
     --log-file-name error/postgresql.log.$AWS_LOG_TIME > $LOG_FILE
	then
		echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh: FATAL_ERROR - Could not get log from AWS .'
		exit 4
	fi

	next_token_str=`cat $LOG_FILE | grep NEXTTOKEN` 
	next_token=`echo $next_token_str | awk -F" " '{ print $2}' `

	TMP_FILE=$LOG_FILE'.tmp'
	grep -v NEXTTOKEN $LOG_FILE  > $TMP_FILE  
	
	last_str=`head -1 $TMP_FILE`
  
    if [[ $next_token == '' ]];
	then
	  concat_str=$first_str$last_str
	  	  
	  echo $concat_str >> $RESULT_FILE
		 
	  line_count=`cat  $TMP_FILE | wc -l`
	  let lines=$line_count-1
	  
	  tail -$lines $TMP_FILE >> $RESULT_FILE
	  
	  echo $(date +%Y%m%d%H%M)':    download_aws_piece.sh:  NEXTTOKEN NOT FOUND - FINISH '
	  rm $LOG_FILE 
	  rm $TMP_FILE
	  rm $TMP_MIDDLE
         rm $TMP_MIDDLE2	  
	  exit 0  
	fi
	
    if [[ $next_token != '' ]];
	then
		let growth_counter=$growth_counter+1
		if [[ $growth_counter -gt $growth_counter_max ]];
		then
			let last_aws_max_item_size=$last_aws_max_item_size*$growth_factor
			let growth_counter=1
		fi
	
		if [[ $last_aws_max_item_size -gt $max_item_size ]]; 
		then
			let last_aws_max_item_size=$max_item_size
		fi 

	  psql -h MONITOR_ENDPOINT.rds.amazonaws.com -U USER -d MONITOR_DATABASE -A -t -q -c "update database set last_aws_nexttoken = '$next_token' where id = $database_id "
	  
	  concat_str=$first_str$last_str
	  	  
	  echo $concat_str >> $RESULT_FILE
		 
	  line_count=`cat  $TMP_FILE | wc -l`
	  let lines=$line_count-1
	  
	  #############################
	  #Get middle of file
	  head -$lines $TMP_FILE > $TMP_MIDDLE
	  
	  line_count=`cat  $TMP_MIDDLE | wc -l`
	  let lines=$line_count-1
	  tail -$lines $TMP_MIDDLE > $TMP_MIDDLE2
	  
	  cat $TMP_MIDDLE2 >> $RESULT_FILE	  
	  
	  first_str=`tail -1 $TMP_FILE`	  
	fi
	  
    let count=$count+1

  done

Τι έπεται?

Έτσι, η πρώτη ενδιάμεση εργασία - "λήψη του αρχείου καταγραφής από το σύννεφο" λύθηκε. Τι να κάνετε με το ληφθέν αρχείο καταγραφής;
Πρώτα πρέπει να αναλύσετε το αρχείο καταγραφής και να εξαγάγετε τα πραγματικά αιτήματα από αυτό.
Το έργο δεν είναι πολύ δύσκολο. Το πιο απλό bash-script κάνει μια χαρά.
upload_log_query.sh

#!/bin/bash
#########################################################
# upload_log_query.sh
# Upload table table from dowloaded aws file 
# version HABR
###########################################################  
echo 'TIMESTAMP:'$(date +%c)' Upload log_query table '
source_file=$1
echo 'source_file='$source_file
database_id=$2
echo 'database_id='$database_id

beginer=' '
first_line='1'
let "line_count=0"
sql_line=' '
sql_flag=' '    
space=' '
cat $source_file | while read line
do
  line="$space$line"

  if [[ $first_line == "1" ]]; then
    beginer=`echo $line | awk -F" " '{ print $1}' `
    first_line='0'
  fi

  current_beginer=`echo $line | awk -F" " '{ print $1}' `

  if [[ $current_beginer == $beginer ]]; then
    if [[ $sql_flag == '1' ]]; then
     sql_flag='0' 
     log_date=`echo $sql_line | awk -F" " '{ print $1}' `
     log_time=`echo $sql_line | awk -F" " '{ print $2}' `
     duration=`echo $sql_line | awk -F" " '{ print $5}' `

     #replace ' to ''
     sql_modline=`echo "$sql_line" | sed 's/'''/''''''/g'`
     sql_line=' '

	 ################
	 #PROCESSING OF THE SQL-SELECT IS HERE
     if ! psql -h ENDPOINT.rds.amazonaws.com -U USER -d DATABASE -v ON_ERROR_STOP=1 -A -t -c "select log_query('$ip_port',$database_id , '$log_date' , '$log_time' , '$duration' , '$sql_modline' )" 
     then
        echo 'FATAL_ERROR - log_query '
        exit 1
     fi
	 ################

    fi #if [[ $sql_flag == '1' ]]; then

    let "line_count=line_count+1"

    check=`echo $line | awk -F" " '{ print $8}' `
    check_sql=${check^^}    

    #echo 'check_sql='$check_sql
    
    if [[ $check_sql == 'SELECT' ]]; then
     sql_flag='1'    
     sql_line="$sql_line$line"
	 ip_port=`echo $sql_line | awk -F":" '{ print $4}' `
    fi
  else       

    if [[ $sql_flag == '1' ]]; then
      sql_line="$sql_line$line"
    fi   
    
  fi #if [[ $current_beginer == $beginer ]]; then

done

Τώρα μπορείτε να εργαστείτε με το ερώτημα που εξάγεται από το αρχείο καταγραφής.

Και υπάρχουν πολλές χρήσιμες δυνατότητες.

Τα αναλυμένα ερωτήματα πρέπει να αποθηκευτούν κάπου. Για αυτό, χρησιμοποιείται ένας πίνακας σέρβις. log_query

CREATE TABLE log_query
(
   id SERIAL ,
   queryid bigint ,
   query_md5hash text not null ,
   database_id integer not null ,  
   timepoint timestamp without time zone not null,
   duration double precision not null ,
   query text not null ,
   explained_plan text[],
   plan_md5hash text  , 
   explained_plan_wo_costs text[],
   plan_hash_value text  ,
   baseline_id integer ,
   ip text ,
   port text 
);
ALTER TABLE log_query ADD PRIMARY KEY (id);
ALTER TABLE log_query ADD CONSTRAINT queryid_timepoint_unique_key UNIQUE (queryid, timepoint );
ALTER TABLE log_query ADD CONSTRAINT query_md5hash_timepoint_unique_key UNIQUE (query_md5hash, timepoint );

CREATE INDEX log_query_timepoint_idx ON log_query (timepoint);
CREATE INDEX log_query_queryid_idx ON log_query (queryid);
ALTER TABLE log_query ADD CONSTRAINT database_id_fk FOREIGN KEY (database_id) REFERENCES database (id) ON DELETE CASCADE ;

Το αναλυμένο αίτημα υποβάλλεται σε επεξεργασία στο plpgsql λειτουργίες"log_query».
log_query.sql

--log_query.sql
--verison HABR
CREATE OR REPLACE FUNCTION log_query( ip_port text ,log_database_id integer , log_date text , log_time text , duration text , sql_line text   ) RETURNS boolean AS $$
DECLARE
  result boolean ;
  log_timepoint timestamp without time zone ;
  log_duration double precision ; 
  pos integer ;
  log_query text ;
  activity_string text ;
  log_md5hash text ;
  log_explain_plan text[] ;
  
  log_planhash text ;
  log_plan_wo_costs text[] ; 
  
  database_rec record ;
  
  pg_stat_query text ; 
  test_log_query text ;
  log_query_rec record;
  found_flag boolean;
  
  pg_stat_history_rec record ;
  port_start integer ;
  port_end integer ;
  client_ip text ;
  client_port text ;
  log_queryid bigint ;
  log_query_text text ;
  pg_stat_query_text text ; 
BEGIN
  result = TRUE ;

  RAISE NOTICE '***log_query';
  
  port_start = position('(' in ip_port);
  port_end = position(')' in ip_port);
  client_ip = substring( ip_port from 1 for port_start-1 );
  client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );

  SELECT e.host , d.name , d.owner_pwd 
  INTO database_rec
  FROM database d JOIN endpoint e ON e.id = d.endpoint_id
  WHERE d.id = log_database_id ;
  
  log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
  log_duration = duration:: double precision; 

  
  pos = position ('SELECT' in UPPER(sql_line) );
  log_query = substring( sql_line from pos for LENGTH(sql_line));
  log_query = regexp_replace(log_query,' +',' ','g');
  log_query = regexp_replace(log_query,';+','','g');
  log_query = trim(trailing ' ' from log_query);
 

  log_md5hash = md5( log_query::text );
  
  --Explain execution plan--
  EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')'; 
  
  log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
  log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
    
  PERFORM dblink_disconnect('LINK1');
  --------------------------
  BEGIN
	INSERT INTO log_query
	(
		query_md5hash ,
		database_id , 
		timepoint ,
		duration ,
		query ,
		explained_plan ,
		plan_md5hash , 
		explained_plan_wo_costs , 
		plan_hash_value , 
		ip , 
		port
	) 
	VALUES 
	(
		log_md5hash ,
		log_database_id , 
		log_timepoint , 
		log_duration , 
		log_query ,
		log_explain_plan , 
		md5(log_explain_plan::text) ,
		log_plan_wo_costs , 
		md5(log_plan_wo_costs::text),
		client_ip , 
		client_port		
	);
	activity_string = 	'New query has logged '||
						' database_id = '|| log_database_id ||
						' query_md5hash='||log_md5hash||
						' , timepoint = '||to_char(log_timepoint,'YYYYMMDD HH24:MI:SS');
					
	RAISE NOTICE '%',activity_string;					
					 
	PERFORM pg_log( log_database_id , 'log_query' , activity_string);  

	EXCEPTION
	  WHEN unique_violation THEN
		RAISE NOTICE '*** unique_violation *** query already has logged';
	END;

	SELECT 	queryid
	INTO   	log_queryid
	FROM 	log_query 
	WHERE 	query_md5hash = log_md5hash AND
			timepoint = log_timepoint;

	IF log_queryid IS NOT NULL 
	THEN 
	  RAISE NOTICE 'log_query with query_md5hash = % and timepoint = % has already has a QUERYID = %',log_md5hash,log_timepoint , log_queryid ;
	  RETURN result;
	END IF;
	
	------------------------------------------------
	RAISE NOTICE 'Update queryid';	
	
	SELECT * 
	INTO log_query_rec
	FROM log_query
	WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ; 
	
	log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
	
	FOR pg_stat_history_rec IN
	 SELECT 
         queryid ,
	  query 
	 FROM 
         pg_stat_db_queries 
     WHERE  
      database_id = log_database_id AND
       queryid is not null 
	LOOP
	  pg_stat_query = pg_stat_history_rec.query ; 
	  pg_stat_query=regexp_replace(pg_stat_query,'n+',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,'t+',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,'$.','%','g');
	
	  log_query_text = trim(trailing ' ' from log_query_rec.query);
	  pg_stat_query_text = pg_stat_query; 
	
	  
	  --SELECT log_query_rec.query like pg_stat_query INTO found_flag ; 
	  IF (log_query_text LIKE pg_stat_query_text) THEN
		found_flag = TRUE ;
	  ELSE
		found_flag = FALSE ;
	  END IF;	  
	  
	  
	  IF found_flag THEN
	    
		UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
		activity_string = 	' updated queryid = '||pg_stat_history_rec.queryid||
		                    ' for log_query with id = '||log_query_rec.id               
		   				    ;						
	    RAISE NOTICE '%',activity_string;	
		EXIT ;
	  END IF ;
	  
	END LOOP ;
	
  RETURN result ;
END
$$ LANGUAGE plpgsql;

Κατά την επεξεργασία, χρησιμοποιείται ο πίνακας σέρβις pg_stat_db_queriesA που περιέχει ένα στιγμιότυπο των τρεχόντων ερωτημάτων από τον πίνακα pg_stat_history (Η χρήση του πίνακα περιγράφεται εδώ − Παρακολούθηση απόδοσης ερωτημάτων PostgreSQL. Μέρος 1 - αναφορά)

TABLE pg_stat_db_queries
(
   database_id integer,  
   queryid bigint ,  
   query text , 
   max_time double precision 
);

TABLE pg_stat_history 
(
…
database_id integer ,
…
queryid bigint ,
…
max_time double precision	 , 	
…
);

Η λειτουργία σάς επιτρέπει να εφαρμόσετε μια σειρά από χρήσιμες λειτουργίες για την επεξεργασία αιτημάτων από ένα αρχείο καταγραφής. Και συγκεκριμένα:

Ευκαιρία #1 - Ιστορικό εκτέλεσης ερωτήματος

Πολύ χρήσιμο για την έναρξη ενός περιστατικού απόδοσης. Πρώτα, εξοικειωθείτε με την ιστορία - και πότε ξεκίνησε η επιβράδυνση;
Στη συνέχεια, σύμφωνα με τους κλασικούς, αναζητήστε εξωτερικές αιτίες. Μπορεί απλώς να έχει αυξηθεί δραματικά ο φόρτος της βάσης δεδομένων και το συγκεκριμένο αίτημα να μην έχει καμία σχέση με αυτό.
Προσθήκη νέας καταχώρισης στον πίνακα log_query

  port_start = position('(' in ip_port);
  port_end = position(')' in ip_port);
  client_ip = substring( ip_port from 1 for port_start-1 );
  client_port = substring( ip_port from port_start+1 for port_end-port_start-1 );

  SELECT e.host , d.name , d.owner_pwd 
  INTO database_rec
  FROM database d JOIN endpoint e ON e.id = d.endpoint_id
  WHERE d.id = log_database_id ;
  
  log_timepoint = to_timestamp(log_date||' '||log_time,'YYYY-MM-DD HH24-MI-SS');
  log_duration = to_number(duration,'99999999999999999999D9999999999'); 

  
  pos = position ('SELECT' in UPPER(sql_line) );
  log_query = substring( sql_line from pos for LENGTH(sql_line));
  log_query = regexp_replace(log_query,' +',' ','g');
  log_query = regexp_replace(log_query,';+','','g');
  log_query = trim(trailing ' ' from log_query);
 
  RAISE NOTICE 'log_query=%',log_query ;   

  log_md5hash = md5( log_query::text );
  
  --Explain execution plan--
  EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')'; 
  
  log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
  log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
    
  PERFORM dblink_disconnect('LINK1');
  --------------------------
  BEGIN
	INSERT INTO log_query
	(
		query_md5hash ,
		database_id , 
		timepoint ,
		duration ,
		query ,
		explained_plan ,
		plan_md5hash , 
		explained_plan_wo_costs , 
		plan_hash_value , 
		ip , 
		port
	) 
	VALUES 
	(
		log_md5hash ,
		log_database_id , 
		log_timepoint , 
		log_duration , 
		log_query ,
		log_explain_plan , 
		md5(log_explain_plan::text) ,
		log_plan_wo_costs , 
		md5(log_plan_wo_costs::text),
		client_ip , 
		client_port		
	);

Δυνατότητα #2 - Αποθήκευση σχεδίων εκτέλεσης ερωτήματος

Στο σημείο αυτό μπορεί να προκύψει ένσταση-διευκρίνιση-σχόλιο: «Αλλά υπάρχει ήδη αυτόματη εξήγηση". Ναι, είναι, αλλά ποιο είναι το νόημα εάν το σχέδιο εκτέλεσης αποθηκεύεται στο ίδιο αρχείο καταγραφής και για να το αποθηκεύσετε για περαιτέρω ανάλυση, πρέπει να αναλύσετε το αρχείο καταγραφής;

Ωστόσο, χρειαζόμουν:
πρώτον: αποθηκεύστε το σχέδιο εκτέλεσης στον πίνακα υπηρεσιών της βάσης δεδομένων παρακολούθησης.
δεύτερον: να μπορείς να συγκρίνεις σχέδια εκτέλεσης μεταξύ τους για να δεις αμέσως ότι το σχέδιο εκτέλεσης ερωτήματος έχει αλλάξει.

Είναι διαθέσιμο ένα αίτημα με συγκεκριμένες παραμέτρους εκτέλεσης. Η λήψη και η αποθήκευση του σχεδίου εκτέλεσής του χρησιμοποιώντας το EXPLAIN είναι μια στοιχειώδης εργασία.
Επιπλέον, χρησιμοποιώντας την έκφραση EXPLAIN (COSTS FALSE), μπορείτε να λάβετε το πλαίσιο του σχεδίου, το οποίο θα χρησιμοποιηθεί για την απόκτηση της τιμής κατακερματισμού του σχεδίου, το οποίο θα βοηθήσει στην επακόλουθη ανάλυση του ιστορικού αλλαγών του σχεδίου εκτέλεσης.
Λάβετε ένα πρότυπο σχεδίου εκτέλεσης

  --Explain execution plan--
  EXECUTE 'SELECT dblink_connect(''LINK1'',''host='||database_rec.host||' dbname='||database_rec.name||' user=DATABASE password='||database_rec.owner_pwd||' '')'; 
  
  log_explain_plan = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN '||log_query ) AS t (plan text) );
  log_plan_wo_costs = ARRAY ( SELECT * FROM dblink('LINK1', 'EXPLAIN ( COSTS FALSE ) '||log_query ) AS t (plan text) );
    
  PERFORM dblink_disconnect('LINK1');

Ευκαιρία #3 - Χρήση του αρχείου καταγραφής ερωτημάτων για παρακολούθηση

Εφόσον οι μετρήσεις απόδοσης δεν έχουν διαμορφωθεί για το κείμενο αιτήματος, αλλά για το αναγνωριστικό του, πρέπει να συσχετίσετε αιτήματα από το αρχείο καταγραφής με αιτήματα για τα οποία έχουν διαμορφωθεί οι μετρήσεις απόδοσης.
Λοιπόν, τουλάχιστον για να έχουμε τον ακριβή χρόνο εμφάνισης ενός περιστατικού παράστασης.

Έτσι, όταν συμβαίνει ένα περιστατικό απόδοσης για ένα αναγνωριστικό αιτήματος, θα υπάρχει ένας σύνδεσμος προς ένα συγκεκριμένο αίτημα με συγκεκριμένες τιμές παραμέτρων και τον ακριβή χρόνο εκτέλεσης και τη διάρκεια του αιτήματος. Λάβετε τις δεδομένες πληροφορίες χρησιμοποιώντας μόνο την προβολή pg_stat_statements - ειναι ΑΠΑΓΟΡΕΥΜΕΝΟ.
Βρείτε το ερώτημα του ερωτήματος και ενημερώστε την καταχώρηση στον πίνακα log_query

SELECT * 
	INTO log_query_rec
	FROM log_query
	WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ; 
	
	log_query_rec.query=regexp_replace(log_query_rec.query,';+','','g');
	
	FOR pg_stat_history_rec IN
	 SELECT 
      queryid ,
	  query 
	 FROM 
       pg_stat_db_queries 
     WHERE  
	   database_id = log_database_id AND
       queryid is not null 
	LOOP
	  pg_stat_query = pg_stat_history_rec.query ; 
	  pg_stat_query=regexp_replace(pg_stat_query,'n+',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,'t+',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,' +',' ','g');
	  pg_stat_query=regexp_replace(pg_stat_query,'$.','%','g');
	
	  log_query_text = trim(trailing ' ' from log_query_rec.query);
	  pg_stat_query_text = pg_stat_query; 
	  
	  --SELECT log_query_rec.query like pg_stat_query INTO found_flag ; 
	  IF (log_query_text LIKE pg_stat_query_text) THEN
		found_flag = TRUE ;
	  ELSE
		found_flag = FALSE ;
	  END IF;	  
	  
	  
	  IF found_flag THEN
	    
		UPDATE log_query SET queryid = pg_stat_history_rec.queryid WHERE query_md5hash = log_md5hash AND timepoint = log_timepoint ;
		activity_string = 	' updated queryid = '||pg_stat_history_rec.queryid||
		                    ' for log_query with id = '||log_query_rec.id		                    
		   				    ;						
					
	    RAISE NOTICE '%',activity_string;	
		EXIT ;
	  END IF ;
	  
	END LOOP ;

Επίλογος

Ως αποτέλεσμα, η περιγραφόμενη μέθοδος βρήκε την εφαρμογή της σε αναπτύχθηκε σύστημα παρακολούθησης της απόδοσης των ερωτημάτων PostgreSQL, επιτρέποντάς σας να έχετε περισσότερες πληροφορίες για ανάλυση κατά την επίλυση αναδυόμενων περιστατικών απόδοσης ερωτημάτων.

Αν και, φυσικά, κατά την προσωπική μου άποψη, θα πρέπει να εργαστείτε στον αλγόριθμο για την επιλογή και την αλλαγή του μεγέθους του τμήματος που έχετε λάβει. Το πρόβλημα δεν έχει ακόμη λυθεί στη γενική περίπτωση. Μάλλον θα έχει ενδιαφέρον.

Αλλά αυτή είναι μια εντελώς διαφορετική ιστορία...

Πηγή: www.habr.com

Προσθέστε ένα σχόλιο