自動プロビジョニング Yealink T19 + 動的アドレス帳

私がこの会社に入社したとき、すでに IP デバイスのデータベース、アスタリスクの付いたいくつかのサーバー、および FreeBPX 形式のパッチを持っていました。 さらに、アナログ PBX Samsung IDCS500 が並行して動作し、通常は社内の主要な通信システムとなっていましたが、IP テレフォニーは営業部門のみで動作していました。 そして、すべてがこのように調理され続けるはずでしたが、ある晴れた日、全員を IP 電話に移行するという法令が発令され、期限が合意され、機器が購入され、企業を 21 世紀に移行する計画が実行され始めました。
このような状況で最初に心配し始めるのは、どうにか管理する必要がある電話機の数が急速に増えていることであり、次に非常に心配だったのは電話帳です。 最初のもの (ちなみに、FreePBX の最新バージョンから切り取られたもの) について Endpoint Manager が役立つとしたら、この本でいくつかの疑問が生じます。

  • まず、ユーザーの位置/流動性が常に変化する場合、その精度をどのように確保するか?
  • 次に、電話を完全に非個人化する方法です。 連絡先名を毎回入力しないのですか?

この問題は興味深いもので、解決策が見つかるまでにそれほど時間はかかりませんでした。 ここで完全なリストを示し、それから順番に見ていきます。

from scapy.all import sniff
from scapy.layers.inet import IP
import mysql.connector
import ldap
import getpass
import tftpy
import requests
import os
import time
from string import replace

def conn_ldap(login):
    ad = ldap.initialize('ldap://***.local')
    ad.simple_bind_s('voip@***.local', 'password')
    basedn = 'OU=IT,DC=***,DC=LOCAL'
    basedn_user = 'OU=***,OU=***,DC=***,DC=LOCAL'
    scope = ldap.SCOPE_SUBTREE
    filterexp = "(&(sAMAccountName=" + login + ")(ObjectClass=person))"
    filterexp2 = "(&(ObjectClass=organizationUnit))"
    attrlist = ['cn']
    attrlist2 = ['OU']
    search = ad.search_s(basedn, scope, filterexp, attrlist)
    adname = search[0][1]['cn'][0].decode('utf-8')
    if adname == ' ':
        search = ad.search_s(basedn_user, scope, filterexp2, attrlist2)
        for i in range(1, len(search)+1):
            group = search[i][1]['ou'][0]
            basedn_user2 = 'OU='+group+','+basedn_user
            search = ad.search_s(basedn_user2, scope, filterexp, attrlist)
            adname = search[0][1]['cn'][0].decode('utf-8')
            if adname != ' ':
                return adname
        adname = search[0][1]['cn'][0].decode('utf-8')
    ad.unbind_s()
    return adname


def tftp_file_change(config,place,adname,current_account,current_account_password):

    client = tftpy.TftpClient("192.168.0.3", 69)
    client.download('template.cfg', place)
    fileread = open(place, 'r')
    line = fileread.readlines()
    fileread.close()
    line[5] = (('account.1.label = ').encode('utf-8') + adname.encode('utf-8') + 'n')
    line[2] = (('account.1.auth_name = ').encode('utf-8') + current_account.encode('utf-8') + 'n')
    line[3] = (('account.1.display_name = ').encode('utf-8') + current_account.encode('utf-8') + 'n')
    line[6] = (('account.1.password = ').encode('utf-8') + current_account_password[0][0] + 'n')
    filewrite = open(place, 'w')
    for i in line:
      filewrite.write(i)
    filewrite.close()
    print place
    print config
    client.upload(config,place)


def get_phone_inform(ipaddr):
    fileconf = requests.get('http://admin:admin@'+ipaddr+'/servlet?phonecfg=get[&accounts=1]')
    conf = fileconf.text.split('|')
    current_account = conf[2]
    return current_account


def sniff_frame():
    pcapf = sniff(count=1, timeout=70, filter="dst host 192.168.0.3 and port 5060")
    if len(pcapf) == 0:
        exit()
    frame = pcapf[0]
    macaddr = frame.src
    print macaddr[:8]
    if macaddr[:8] != '80:5e:c0':
        exit()
    ipaddr = frame[0][IP].src
    return macaddr, ipaddr


def conn_mysql(query,fquery,macaddr,qwery2):
    connect = mysql.connector.connect(host='192.168.0.3', database='voip', user='voip_wr', password='***')
    cursor = connect.cursor()
    cursor.execute(fquery)
    state = cursor.fetchall()
    state = bool(state[0][0])
    if state == True:
        cursor.execute(qwery2)
        connect.commit()
        connect.close()
    else:
        cursor.execute(query)
        connect.commit()
        connect.close()


def check_account(current_account):
    connect = mysql.connector.connect(host='192.168.0.3', database='asterisk', user='voip_wr', password='***')
    cursor = connect.cursor()
    qwery = 'select data from sip where id=' + current_account + ' and keyword="secret";'
    cursor.execute(qwery)
    password = cursor.fetchall()
    if password == ' ':
        exit()
    else:
        return password


if __name__ == '__main__':
    macaddr, ipaddr = sniff_frame()
    current_account = get_phone_inform(ipaddr)
    current_account_password = check_account(current_account)
    macaddr = macaddr.replace(':', '')
    ipaddr = ipaddr.decode('utf-8')
    adname = conn_ldap(getpass.getuser())
    query = 'INSERT INTO station (mac, ip, name, number) VALUES (' + '"' + macaddr + '",' + '"' + ipaddr + '",' + '"' + adname + '",' + '"' + get_phone_inform(ipaddr) + '"' + ')'
    qwery2 = 'UPDATE station SET ip=' + '"' + ipaddr + '"' + ', name=' + '"' + adname + '"' + ', number=' + '"' + get_phone_inform(ipaddr) + '"' + ' WHERE mac=' + '"' + macaddr + '"'
    fquery = 'SELECT EXISTS(SELECT mac FROM voip.station WHERE mac=' + '"' + macaddr + '")'
    query = query.encode('utf-8')
    fquery = fquery.encode('utf-8')
    config = macaddr + '.cfg'
    place = os.path.expanduser("~") + "" + "AppDataLocal" + config
    conn_mysql(query,fquery,macaddr,qwery2)
    tftp_file_change(config,place,adname,current_account,current_account_password)
    requests.get('http://admin:admin@'+ipaddr+'/cgi-bin/ConfigManApp.com?key=AutoP')
    requests.get('http://admin:admin@'+ipaddr+'/cgi-bin/ConfigManApp.com?key=Reboot')

Yealink T19 はゲートウェイとして機能できないため、プログラムはユーザーのコンピュータ上で実行され、コンピュータが電話経由でネットワークに接続されている限り機能します。

まず、接続されているかどうかを理解する必要があります。 そして私たちの電話が持っているMacとIP。

def sniff_frame():
    pcapf = sniff(count=1, timeout=70, filter="dst host 192.168.0.3 and port 5060")
    if len(pcapf) == 0:
        exit()
    frame = pcapf[0]
    macaddr = frame.src
    print macaddr[:8]
    if macaddr[:8] != '80:5e:c0':
        exit()
    ipaddr = frame[0][IP].src
    return macaddr, ipaddr

ここでは、scapy フレームワークの sniff 関数を使用します。その助けを借りて、所定の udp パケットを受信し、70 秒待って、何もキャッチできない場合は終了します。

count=1, timeout=70, filter="dst host 192.168.0.3 and port 5060"

次に、デバイスが実際に Yealink であることを確認し、必要な値 (ip および mac) を返します。

特別なリクエストを使用して、電話で現在の口座を調べます。 これを行うには、現在の設定が電話機からダウンロードされ、解析されます。

def get_phone_inform(ipaddr):
    fileconf = requests.get('http://admin:admin@'+ipaddr+'/servlet?phonecfg=get[&accounts=1]')
    conf = fileconf.text.split('|')
    current_account = conf[2]
    return current_account

このアカウントのパスワードを調べてください。 これを行うには、asterisk.sip テーブルとその中のデータ フィールドを参照します。

def check_account(current_account):
    connect = mysql.connector.connect(host='192.168.0.3', database='asterisk', user='voip_wr', password='***')
    cursor = connect.cursor()
    qwery = 'select data from sip where id=' + current_account + ' and keyword="secret";'
    cursor.execute(qwery)
    password = cursor.fetchall()
    if password == ' ':
        exit()
    else:
        return password

さて、最終段階では、関数を通じて取得した sAMAccountName を使用して ldap AD に接続します。 getpass.getuser() 現在のユーザーの cn を取得します (通常、これにはユーザーのフルネームが含まれます)。

def conn_ldap(login):
    ad = ldap.initialize('ldap://***.local')
    ad.simple_bind_s('voip@***.local', 'password')
    basedn = 'OU=***,DC=***,DC=LOCAL'
    basedn_user = 'OU=***,OU=***,DC=***,DC=LOCAL'
    scope = ldap.SCOPE_SUBTREE
    filterexp = "(&(sAMAccountName=" + login + ")(ObjectClass=person))"
    filterexp2 = "(&(ObjectClass=organizationUnit))"
    attrlist = ['cn']
    attrlist2 = ['OU']
    search = ad.search_s(basedn, scope, filterexp, attrlist)
    adname = search[0][1]['cn'][0].decode('utf-8')
    if adname == ' ':
        search = ad.search_s(basedn_user, scope, filterexp2, attrlist2)
        for i in range(1, len(search)+1):
            group = search[i][1]['ou'][0]
            basedn_user2 = 'OU='+group+','+basedn_user
            search = ad.search_s(basedn_user2, scope, filterexp, attrlist)
            adname = search[0][1]['cn'][0].decode('utf-8')
            if adname != ' ':
                return adname
        adname = search[0][1]['cn'][0].decode('utf-8')
    ad.unbind_s()
    return adname

データベース内に事前に作成したテーブル (私はそこでテーブルを作成しました) に接続し、学んだことすべて、つまり ip、ma​​c、ユーザー名を入力します。

def conn_mysql(query,fquery,macaddr,qwery2):
    connect = mysql.connector.connect(host='192.168.0.3', database='voip', user='voip_wr', password='***')
    cursor = connect.cursor()
    cursor.execute(fquery)
    state = cursor.fetchall()
    state = bool(state[0][0])
    if state == True:
        cursor.execute(qwery2)
        connect.commit()
        connect.close()
    else:
        cursor.execute(query)
        connect.commit()
        connect.close()

すでに動的アドレス帳を作成しているので、ここでやめてもいいのではないかと思われるかもしれませんが、ここではさらに進んでデバイスの自動プロビジョニングを追加しました。

これを行うには、事前設定された tftp サーバーからテンプレート設定をダウンロードし、変更を加えて mac.cfg として保存します。 つまり、Yealink には XNUMX 種類の設定があり、XNUMX つはグローバル、もう XNUMX つは特定の電話に適用され、mac_phone.cfg の形式である必要があります。

ファイルにすべての変更を加えて tftp サーバに保存し直した後、電話機にコマンドを与えて、デバイスをプロビジョニングして再起動します。

def tftp_file_change(config,place,adname,current_account,current_account_password):

    client = tftpy.TftpClient("192.168.0.3", 69)
    client.download('template.cfg', place)
    fileread = open(place, 'r')
    line = fileread.readlines()
    fileread.close()
    line[5] = (('account.1.label = ').encode('utf-8') + adname.encode('utf-8') + 'n')
    line[2] = (('account.1.auth_name = ').encode('utf-8') + current_account.encode('utf-8') + 'n')
    line[3] = (('account.1.display_name = ').encode('utf-8') + current_account.encode('utf-8') + 'n')
    line[6] = (('account.1.password = ').encode('utf-8') + current_account_password[0][0] + 'n')
    filewrite = open(place, 'w')
    for i in line:
      filewrite.write(i)
    filewrite.close()
    print place
    print config
    client.upload(config,place)

requests.get('http://admin:admin@'+ipaddr+'/cgi-bin/ConfigManApp.com?key=AutoP')
requests.get('http://admin:admin@'+ipaddr+'/cgi-bin/ConfigManApp.com?key=Reboot')

デバイスを再起動すると、電話画面にフルネームが表示され、データベース形式で常に正しく入力されたアドレス帳が表示されます。その後、コンテンツを動的に表示するために XML と少しの PHP を追加するだけです。 このような例はたくさんあり、YEALINK 自体にもあります。

PS: 拡張性を高めるために、メイン設定 (変数) を別のファイルに移動できます。

出所: habr.com

コメントを追加します