Apache NIFI - ภาพรวมโดยย่อของโอกาสในทางปฏิบัติ

การแนะนำ

มันบังเอิญว่า ณ สถานที่ทำงานปัจจุบันของฉัน ฉันต้องทำความคุ้นเคยกับเทคโนโลยีนี้ ฉันจะเริ่มต้นด้วยพื้นหลังเล็กน้อย ในการประชุมครั้งถัดไป ทีมงานของเราได้รับแจ้งว่าเราจำเป็นต้องสร้างการบูรณาการด้วย ระบบที่รู้จัก. จากการผสานรวม หมายความว่าระบบที่รู้จักกันดีนี้จะส่งคำขอถึงเราผ่าน HTTP ไปยังอุปกรณ์ปลายทางเฉพาะ และที่น่าแปลกก็คือเราจะส่งคำตอบกลับในรูปแบบของข้อความ SOAP ทุกอย่างดูเรียบง่ายและไม่สำคัญ จากนี้เป็นไปตามที่คุณต้องการ...

งาน

สร้าง 3 บริการ ประการแรกคือบริการอัพเดตฐานข้อมูล บริการนี้เมื่อข้อมูลใหม่มาถึงจากระบบของบุคคลที่สาม จะอัปเดตข้อมูลในฐานข้อมูลและสร้างไฟล์ในรูปแบบ CSV เพื่อถ่ายโอนไปยังระบบถัดไป จุดสิ้นสุดของบริการที่สองเรียกว่า - FTP Transport Service ซึ่งรับไฟล์ที่ถ่ายโอน ตรวจสอบความถูกต้อง และวางไว้ในที่จัดเก็บไฟล์ผ่าน FTP บริการที่สามคือบริการถ่ายโอนข้อมูลผู้บริโภค ทำงานแบบอะซิงโครนัสกับสองบริการแรก ได้รับการร้องขอจากระบบภายนอกของบุคคลที่สามให้รับไฟล์ที่กล่าวถึงข้างต้น รับไฟล์ตอบกลับที่พร้อมใช้งาน แก้ไข (อัปเดตฟิลด์ id, คำอธิบาย, linkToFile) และส่งการตอบกลับในรูปแบบของข้อความ SOAP นั่นคือภาพรวมมีดังนี้: สองบริการแรกเริ่มทำงานเฉพาะเมื่อมีข้อมูลสำหรับการอัปเดตมาถึงเท่านั้น บริการที่สามทำงานอย่างต่อเนื่องเนื่องจากมีผู้บริโภคข้อมูลจำนวนมาก คำขอข้อมูลประมาณ 1000 รายการต่อนาที บริการต่างๆ มีให้ใช้งานอย่างต่อเนื่องและอินสแตนซ์ตั้งอยู่ในสภาพแวดล้อมที่แตกต่างกัน เช่น การทดสอบ การสาธิต ก่อนการผลิต และการผลิต ด้านล่างนี้คือแผนภาพแสดงวิธีการทำงานของบริการเหล่านี้ ฉันขอชี้แจงทันทีว่ารายละเอียดบางอย่างได้รับการทำให้ง่ายขึ้นเพื่อหลีกเลี่ยงความซับซ้อนที่ไม่จำเป็น

Apache NIFI - ภาพรวมโดยย่อของโอกาสในทางปฏิบัติ

เจาะลึกทางเทคนิค

เมื่อวางแผนการแก้ปัญหา ขั้นแรกเราตัดสินใจสร้างแอปพลิเคชันใน Java โดยใช้ Spring Framework, Nginx Balancer, ฐานข้อมูล Postgres และสิ่งทางเทคนิคอื่นๆ ที่ไม่ใช่ทางเทคนิค เนื่องจากเวลาในการพัฒนาโซลูชันด้านเทคนิคทำให้เราสามารถพิจารณาแนวทางอื่นในการแก้ปัญหานี้ได้ เราจึงจ้องมองไปที่เทคโนโลยี Apache NIFI ซึ่งเป็นแฟชั่นในบางวงการ ฉันจะบอกทันทีว่าเทคโนโลยีนี้ทำให้เราสังเกตเห็นบริการทั้ง 3 นี้ บทความนี้จะอธิบายการพัฒนาบริการขนส่งไฟล์และบริการถ่ายโอนข้อมูลไปยังผู้บริโภค แต่หากบทความมีประโยชน์ ผมจะเขียนเกี่ยวกับบริการอัปเดตข้อมูลในฐานข้อมูล

นี่คืออะไร

NIFI เป็นสถาปัตยกรรมแบบกระจายสำหรับการโหลดและการประมวลผลข้อมูลแบบขนานที่รวดเร็ว ปลั๊กอินจำนวนมากสำหรับแหล่งที่มาและการแปลง การกำหนดเวอร์ชันของการกำหนดค่า และอื่นๆ อีกมากมาย โบนัสที่ดีคือมันใช้งานง่ายมาก กระบวนการเล็กๆ น้อยๆ เช่น getFile, sendHttpRequest และอื่นๆ สามารถแสดงเป็นสี่เหลี่ยมได้ แต่ละช่องแสดงถึงกระบวนการ ซึ่งการโต้ตอบดังกล่าวสามารถดูได้ในรูปด้านล่าง มีการเขียนเอกสารรายละเอียดเพิ่มเติมเกี่ยวกับการโต้ตอบการตั้งค่ากระบวนการ ที่นี่ สำหรับผู้ที่พูดภาษารัสเซีย - ที่นี่. เอกสารประกอบอธิบายวิธีการแตกแพ็กและรัน NIFI ได้อย่างสมบูรณ์แบบ รวมถึงวิธีสร้างกระบวนการหรือที่เรียกว่า Squares
แนวคิดในการเขียนบทความเกิดขึ้นหลังจากการค้นหาและจัดโครงสร้างข้อมูลที่ได้รับมาอย่างมีสติ รวมถึงความปรารถนาที่จะทำให้ชีวิตง่ายขึ้นเล็กน้อยสำหรับนักพัฒนาในอนาคต

ตัวอย่าง

ตัวอย่างของการพิจารณาปฏิสัมพันธ์ระหว่างช่องสี่เหลี่ยมระหว่างกัน รูปแบบทั่วไปค่อนข้างง่าย: เราได้รับคำขอ HTTP (ตามทฤษฎีแล้ว โดยมีไฟล์อยู่ในเนื้อหาของคำขอ เพื่อแสดงให้เห็นถึงความสามารถของ NIFI ในตัวอย่างนี้ คำขอจะเริ่มต้นกระบวนการรับไฟล์จากที่จัดเก็บไฟล์ในเครื่อง ) จากนั้นเราจะตอบกลับว่าได้รับคำขอแล้ว ควบคู่ไปกับกระบวนการรับไฟล์จาก FH และกระบวนการย้ายไฟล์ผ่าน FTP ไปยัง FH เป็นเรื่องที่ควรค่าแก่การชี้แจงว่ากระบวนการโต้ตอบกันผ่านสิ่งที่เรียกว่า flowFile นี่คือเอนทิตีฐานใน NIFI ที่เก็บแอตทริบิวต์และเนื้อหา เนื้อหาคือข้อมูลที่แสดงโดยไฟล์สตรีม กล่าวคือ หากคุณได้รับไฟล์จากสแควร์หนึ่งและถ่ายโอนไปยังอีกสแควร์หนึ่ง เนื้อหานั้นก็จะเป็นไฟล์ของคุณ

Apache NIFI - ภาพรวมโดยย่อของโอกาสในทางปฏิบัติ

ดังที่คุณเห็นในภาพนี้แสดงกระบวนการทั่วไป HandleHttpRequest - ยอมรับคำขอ,แทนที่ข้อความ - สร้างเนื้อหาการตอบกลับ, HandleHttpResponse - ส่งการตอบกลับ FetchFile - รับไฟล์จากที่เก็บไฟล์ ถ่ายโอนไปยัง Square PutSftp - วางไฟล์นี้บน FTP ตามที่อยู่ที่ระบุ ตอนนี้เพิ่มเติมเกี่ยวกับกระบวนการนี้

ในกรณีนี้ คำขอคือจุดเริ่มต้นของทุกสิ่ง ลองดูที่พารามิเตอร์การกำหนดค่า

Apache NIFI - ภาพรวมโดยย่อของโอกาสในทางปฏิบัติ

ทุกอย่างที่นี่ค่อนข้างไม่สำคัญยกเว้น StandardHttpContextMap ซึ่งเป็นบริการประเภทหนึ่งที่ให้คุณส่งและรับคำขอได้ คุณสามารถดูรายละเอียดเพิ่มเติมและแม้กระทั่งตัวอย่างได้ - ที่นี่

ต่อไป มาดูพารามิเตอร์การกำหนดค่าแทนที่ข้อความของสี่เหลี่ยมจัตุรัส ควรให้ความสนใจกับ ReplacementValue - นี่คือสิ่งที่จะส่งคืนให้กับผู้ใช้ในรูปแบบของการตอบกลับ ในการตั้งค่า คุณสามารถปรับระดับการบันทึกได้ คุณสามารถดูบันทึก {where you unpacked nifi}/nifi-1.9.2/logs นอกจากนี้ยังมีพารามิเตอร์ความล้มเหลว/ความสำเร็จ - ขึ้นอยู่กับพารามิเตอร์เหล่านี้ คุณสามารถควบคุมกระบวนการโดยรวมได้ . นั่นคือในกรณีของการประมวลผลข้อความที่ประสบความสำเร็จ กระบวนการในการส่งการตอบกลับไปยังผู้ใช้จะถูกเรียก และในอีกกรณีหนึ่ง เราก็จะบันทึกกระบวนการที่ไม่สำเร็จ

Apache NIFI - ภาพรวมโดยย่อของโอกาสในทางปฏิบัติ

ไม่มีอะไรน่าสนใจเป็นพิเศษในคุณสมบัติ HandleHttpResponse ยกเว้นสถานะเมื่อมีการสร้างการตอบสนองสำเร็จ

Apache NIFI - ภาพรวมโดยย่อของโอกาสในทางปฏิบัติ

เราได้แยกคำขอและการตอบกลับแล้ว - ต่อไปรับไฟล์และวางไว้บนเซิร์ฟเวอร์ FTP กัน FetchFile - รับไฟล์ตามเส้นทางที่ระบุในการตั้งค่าและส่งผ่านไปยังกระบวนการถัดไป

Apache NIFI - ภาพรวมโดยย่อของโอกาสในทางปฏิบัติ

จากนั้นสี่เหลี่ยม PutSftp - วางไฟล์ไว้ในที่จัดเก็บไฟล์ เราสามารถดูพารามิเตอร์การกำหนดค่าด้านล่าง

Apache NIFI - ภาพรวมโดยย่อของโอกาสในทางปฏิบัติ

ควรให้ความสนใจกับความจริงที่ว่าแต่ละสแควร์เป็นกระบวนการแยกกันที่ต้องเปิดตัว เราดูตัวอย่างที่ง่ายที่สุดที่ไม่ต้องมีการปรับแต่งที่ซับซ้อนใดๆ ต่อไป เราจะดูกระบวนการที่ซับซ้อนขึ้นอีกเล็กน้อย โดยเราจะเขียนรายละเอียดเล็กน้อยลงในร่อง

ตัวอย่างที่ซับซ้อนมากขึ้น

บริการถ่ายโอนข้อมูลไปยังผู้บริโภคมีความซับซ้อนมากขึ้นเล็กน้อยเนื่องจากกระบวนการแก้ไขข้อความ SOAP กระบวนการทั่วไปแสดงในรูปด้านล่าง

Apache NIFI - ภาพรวมโดยย่อของโอกาสในทางปฏิบัติ

แนวคิดนี้ไม่ซับซ้อนเป็นพิเศษ: เราได้รับคำขอจากผู้บริโภคว่าเขาต้องการข้อมูล ส่งการตอบกลับว่าเขาได้รับข้อความ เริ่มกระบวนการรับไฟล์ตอบกลับ จากนั้นแก้ไขด้วยตรรกะบางอย่าง จากนั้น ถ่ายโอนไฟล์ไปยังผู้บริโภคในรูปแบบของข้อความ SOAP ไปยังเซิร์ฟเวอร์

ฉันคิดว่าไม่จำเป็นต้องอธิบายสี่เหลี่ยมเหล่านั้นที่เราเห็นด้านบนอีกครั้ง - มาดูช่องใหม่กันดีกว่า หากคุณต้องการแก้ไขไฟล์ใดๆ และช่องสี่เหลี่ยมประเภทแทนที่ข้อความธรรมดาไม่เหมาะ คุณจะต้องเขียนสคริปต์ของคุณเอง ซึ่งสามารถทำได้โดยใช้ตาราง ExecuteGroogyScript การตั้งค่าแสดงไว้ด้านล่าง

Apache NIFI - ภาพรวมโดยย่อของโอกาสในทางปฏิบัติ

มีสองตัวเลือกสำหรับการโหลดสคริปต์ลงในช่องสี่เหลี่ยมนี้ ประการแรกคือการดาวน์โหลดไฟล์ด้วยสคริปต์ ประการที่สองคือการแทรกสคริปต์ลงใน scriptBody เท่าที่ฉันรู้ ตารางExecuteScriptรองรับหลายภาษา - หนึ่งในนั้นคือ Groovy ฉันจะทำให้นักพัฒนา Java ผิดหวัง - คุณไม่สามารถเขียนสคริปต์ใน Java ในช่องสี่เหลี่ยมดังกล่าวได้ สำหรับผู้ที่ต้องการจริงๆ คุณต้องสร้างสแควร์ที่คุณกำหนดเองและเพิ่มลงในระบบ NIFI การดำเนินการทั้งหมดนี้มาพร้อมกับการเต้นรำที่ยาวนานกับแทมบูรีนซึ่งเราจะไม่พูดถึงในบทความนี้ ฉันเลือกภาษาที่ไพเราะ ด้านล่างนี้เป็นสคริปต์ทดสอบที่เพียงแค่อัพเดต ID ในข้อความ SOAP แบบเพิ่มหน่วย เป็นสิ่งสำคัญที่จะต้องทราบ คุณนำไฟล์จาก flowFile และอัปเดต อย่าลืมว่าคุณต้องนำไฟล์กลับไปที่นั่นและอัปเดต เป็นที่น่าสังเกตว่าไม่ได้รวมห้องสมุดทั้งหมดไว้ด้วย อาจเกิดขึ้นได้ว่าคุณยังคงต้องนำเข้า libs อันใดอันหนึ่ง ข้อเสียอีกประการหนึ่งคือสคริปต์ในสแควร์นี้ค่อนข้างจะแก้ไขจุดบกพร่องได้ยาก มีวิธีเชื่อมต่อกับ NIFI JVM และเริ่มกระบวนการดีบัก โดยส่วนตัวแล้ว ฉันเปิดตัวแอปพลิเคชันในเครื่องและจำลองการรับไฟล์จากเซสชัน ฉันยังทำการดีบักในเครื่องด้วย ข้อผิดพลาดที่ปรากฏขึ้นเมื่อโหลดสคริปต์นั้นค่อนข้างง่ายสำหรับ Google และ NIFI เขียนลงในบันทึกเอง

import org.apache.commons.io.IOUtils
import groovy.xml.XmlUtil
import java.nio.charset.*
import groovy.xml.StreamingMarkupBuilder

def flowFile = session.get()
if (!flowFile) return
try {
    flowFile = session.write(flowFile, { inputStream, outputStream ->
        String result = IOUtils.toString(inputStream, "UTF-8");
        def recordIn = new XmlSlurper().parseText(result)
        def element = recordIn.depthFirst().find {
            it.name() == 'id'
        }

        def newId = Integer.parseInt(element.toString()) + 1
        def recordOut = new XmlSlurper().parseText(result)
        recordOut.Body.ClientMessage.RequestMessage.RequestContent.content.MessagePrimaryContent.ResponseBody.id = newId

        def res = new StreamingMarkupBuilder().bind { mkp.yield recordOut }.toString()
        outputStream.write(res.getBytes(StandardCharsets.UTF_8))
} as StreamCallback)
     session.transfer(flowFile, REL_SUCCESS)
}
catch(Exception e) {
    log.error("Error during processing of validate.groovy", e)
    session.transfer(flowFile, REL_FAILURE)
}

จริงๆ แล้ว นี่คือจุดที่การปรับแต่งสแควร์สิ้นสุดลง จากนั้นไฟล์ที่อัปเดตจะถูกถ่ายโอนไปยังสแควร์ซึ่งมีหน้าที่ส่งไฟล์ไปยังเซิร์ฟเวอร์ ด้านล่างนี้คือการตั้งค่าสำหรับสแควร์นี้

Apache NIFI - ภาพรวมโดยย่อของโอกาสในทางปฏิบัติ

เราอธิบายวิธีการส่งข้อความ SOAP เราเขียนที่ไหน ถัดไปคุณต้องระบุว่านี่คือ SOAP

Apache NIFI - ภาพรวมโดยย่อของโอกาสในทางปฏิบัติ

เพิ่มคุณสมบัติหลายอย่าง เช่น โฮสต์และการกระทำ (soapAction) เราบันทึกและตรวจสอบ คุณสามารถดูรายละเอียดเพิ่มเติมเกี่ยวกับวิธีการส่งคำขอ SOAP ที่นี่

เราดูตัวเลือกต่างๆ มากมายสำหรับการใช้กระบวนการ NIFI พวกเขาโต้ตอบกันอย่างไรและประโยชน์ที่แท้จริงของพวกเขาคืออะไร? ตัวอย่างที่พิจารณาเป็นเพียงตัวอย่างทดสอบและแตกต่างเล็กน้อยจากสิ่งที่เกิดขึ้นจริงในการต่อสู้ ฉันหวังว่าบทความนี้จะมีประโยชน์เล็กน้อยสำหรับนักพัฒนา ขอขอบคุณสำหรับความสนใจของคุณ. หากคุณมีคำถามใด ๆ เขียน ฉันจะพยายามตอบ

ที่มา: will.com

เพิ่มความคิดเห็น