DTMF und bestimmte Tonfrequenz erkennen

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
TommiB
User
Beiträge: 41
Registriert: Montag 9. Februar 2015, 18:15

Hallo,
ich versuche gerade von einer am Raspberry angeschlossenen Soundkarte DTMF Töne zu erkennen, sowie einen bestimmten Ton zu erkennen.

Ich habe folgendes heute durch googeln etc. gefunden, bzw. umgebaut:

Code: Alles auswählen


#!/usr/bin/env python
import pyaudio
from numpy import zeros,linspace,short,fromstring,hstack,transpose,log
from scipy import fft
from time import sleep

#Volume Sensitivity, 0.05: Extremely Sensitive, may give false alarms
#             0.1: Probably Ideal volume
#             1: Poorly sensitive, will only go off for relatively loud
SENSITIVITY= 1
# Alarm frequencies (Hz) to detect (Use audacity to record a wave and then do Analyze->Plot Spectrum)
TONE = 1750
#Bandwidth for detection (i.e., detect frequencies within this margin of error of the TONE)
BANDWIDTH = 10
# Show the most intense frequency detected (useful for configuration)
beeplength=6
frequencyoutput=False

#Set up audio sampler - 
NUM_SAMPLES = 2048
SAMPLING_RATE = 48000
pa = pyaudio.PyAudio()
_stream = pa.open(format=pyaudio.paInt16,
                  channels=1, rate=SAMPLING_RATE,
                  input=True,
                  frames_per_buffer=NUM_SAMPLES)

print("1750hz detector working. Press CTRL-C to quit.")

blipcount=0

while True:
    while _stream.get_read_available()< NUM_SAMPLES: sleep(0.01)
    audio_data  = fromstring(_stream.read(
         _stream.get_read_available()), dtype=short)[-NUM_SAMPLES:]
    # Each data point is a signed 16 bit number, so we can normalize by dividing 32*1024
    normalized_data = audio_data / 32768.0
    intensity = abs(fft(normalized_data))[:NUM_SAMPLES/2]
    frequencies = linspace(0.0, float(SAMPLING_RATE)/2, num=NUM_SAMPLES/2)
    if frequencyoutput:
        which = intensity[1:].argmax()+1
        # use quadratic interpolation around the max
        if which != len(intensity)-1:
            y0,y1,y2 = log(intensity[which-1:which+2:])
            x1 = (y2 - y0) * .5 / (2 * y1 - y2 - y0)
            # find the frequency and output it
            thefreq = (which+x1)*SAMPLING_RATE/NUM_SAMPLES
        else:
            thefreq = which*SAMPLING_RATE/NUM_SAMPLES
        print "freq=",thefreq
    if max(intensity[(frequencies < TONE+BANDWIDTH) & (frequencies > TONE-BANDWIDTH )]) > max(intensity[(frequencies < TONE-1000) & (frequencies > TONE-2000)]) + SENSITIVITY:
        if blipcount <= beeplength:
            blipcount+=1
        if blipcount == beeplength:
            print "1750hz"
    else:
        blipcount=0
   
    sleep(0.01)


Das funktioniert soweit... Sobald die 1750hz erkannt werden, wird der Text "1750hz" ausgegeben.

Nun bin ich am überlegen, das so zu erweitern, dass ich auch DTMF auswerten kann...
Ich weiß, dass DTMF aus zwei Tönen bestehen (1209.0,1336.0,1477.0,1633.0,697.0,770.0,852.0,941.0) und ich habe auch diesen Code gefunden:

Code: Alles auswählen


'''
A python implementation of the Goertzel algorithm to decode DTMF tones.
The wave file is split into bins and each bin is analyzed
for all the DTMF frequencies. The method run() will return a numeric
representation of the DTMF tone.
'''
import wave
import struct
import math
class pygoertzel_dtmf:
    def __init__(self, samplerate):
        self.samplerate = samplerate
        self.goertzel_freq = [1209.0,1336.0,1477.0,1633.0,697.0,770.0,852.0,941.0]
        self.s_prev = {}
        self.s_prev2 = {}
        self.totalpower = {}
        self.N = {}
        self.coeff = {}
        # create goertzel parameters for each frequency so that
        # all the frequencies are analyzed in parallel
        for k in self.goertzel_freq:
            self.s_prev[k] = 0.0
            self.s_prev2[k] = 0.0
            self.totalpower[k] = 0.0
            self.N[k] = 0.0
            normalizedfreq = k / self.samplerate
            self.coeff[k] = 2.0*math.cos(2.0 * math.pi * normalizedfreq)
    def __get_number(self, freqs):
        hi = [1209.0,1336.0,1477.0,1633.0]
        lo = [697.0,770.0,852.0,941.0]
        # get hi freq
        hifreq = 0.0
        hifreq_v = 0.0
        for f in hi:
            if freqs[f]>hifreq_v:
                hifreq_v = freqs[f]
                hifreq = f
        # get lo freq
        lofreq = 0.0
        lofreq_v = 0.0
        for f in lo:
            if freqs[f]>lofreq_v:
                lofreq_v = freqs[f]
                lofreq = f
        if lofreq==697.0:
            if hifreq==1209.0:
                return "1"
            elif hifreq==1336.0:
                return "2"
            elif hifreq==1477.0:
                return "3"
            elif hifreq==1633.0:
                return "A"
        elif lofreq==770.0:
            if hifreq==1209.0:
                return "4"
            elif hifreq==1336.0:
                return "5"
            elif hifreq==1477.0:
                return "6"
            elif hifreq==1633.0:
                return "B"
        elif lofreq==852.0:
            if hifreq==1209.0:
                return "7"
            elif hifreq==1336.0:
                return "8"
            elif hifreq==1477.0:
                return "9"
            elif hifreq==1633.0:
                return "C"
        elif lofreq==941.0:
            if hifreq==1209.0:
                return "*"
            elif hifreq==1336.0:
                return "0"
            elif hifreq==1477.0:
                return "#"
            elif hifreq==1633.0:
                return "D"
    def run(self, sample):
        freqs = {}
        for freq in self.goertzel_freq:
            s = sample + (self.coeff[freq] * self.s_prev[freq]) - self.s_prev2[freq]
            self.s_prev2[freq] = self.s_prev[freq]
            self.s_prev[freq] = s
            self.N[freq]+=1
            power = (self.s_prev2[freq]*self.s_prev2[freq]) + (self.s_prev[freq]*self.s_prev[freq]) - (self.coeff[freq]*self.s_prev[freq]*self.s_prev2[freq])
            self.totalpower[freq]+=sample*sample
            if (self.totalpower[freq] == 0):
                self.totalpower[freq] = 1
            freqs[freq] = power / self.totalpower[freq] / self.N[freq]
        return self.__get_number(freqs)
if __name__ == '__main__':
    # load wav file
    wav = wave.open('/home/michael/Downloads/dtmf.wav', 'r')
    (nchannels, sampwidth, framerate, nframes, comptype, compname) = wav.getparams()
    frames = wav.readframes(nframes * nchannels)
    # convert wave file to array of integers
    frames = struct.unpack_from("%dH" % nframes * nchannels, frames)
    # if stereo get left/right
    if nchannels == 2:
        left = [frames[i] for i in range(0,len(frames),2)]
        right = [frames[i] for i in range(1,len(frames),2)]
    else:
        left = frames
        right = left
    binsize = 400
    # Split the bin in 4 to average out errors due to noise
    binsize_split = 4
    prevvalue = ""
    prevcounter = 0
    for i in range(0,len(left)-binsize,binsize/binsize_split):
        goertzel = pygoertzel_dtmf(framerate)
        for j in left[i:i+binsize]:
            value = goertzel.run(j)
        if value==prevvalue:
            prevcounter+=1
            if prevcounter==10:
                print value
        else:
            prevcounter=0
            prevvalue=value
Beim letzten Code wird eine Wave-Datei nach DTMF durchsucht...

Wie bekomme ich das so umgebaut, dass es DTMF von der Soundkarte erkennt?

Gruß Tommi
TommiB
User
Beiträge: 41
Registriert: Montag 9. Februar 2015, 18:15

Kann mir keiner Helfen?
Sirius3
User
Beiträge: 17749
Registriert: Sonntag 21. Oktober 2012, 17:20

@TommiB: hast Du versucht, die Codes zu verstehen? Wie werden die Töne im ersten Fall gespeichert, wie im zweiten?

Das Programm sieht übrigens ziemlich schlimm aus.

Code: Alles auswählen

'''
A python implementation of the Goertzel algorithm to decode DTMF tones.
The wave file is split into bins and each bin is analyzed
for all the DTMF frequencies.
'''
import wave
import struct
import math

class GoertzelDTFM:
    CODES = {
        (697.0, 1209.0): "1",
        (697.0, 1336.0): "2",
        (697.0, 1477.0): "3",
        (697.0, 1633.0): "A",
        (770.0, 1209.0): "4",
        (770.0, 1336.0): "5",
        (770.0, 1477.0): "6",
        (770.0, 1633.0): "B",
        (852.0, 1209.0): "7",
        (852.0, 1336.0): "8",
        (852.0, 1477.0): "9",
        (852.0, 1633.0): "C",
        (941.0, 1209.0): "*",
        (941.0, 1336.0): "0",
        (941.0, 1477.0): "#",
        (941.0, 1633.0): "D",
    }
    LOW_FREQ = set(a for a,b in CODES)
    HIGH_FREQ = set(b for a,b in CODES)
    ALL_FREQ = LOW_FREQ | HIGH_FREQ

    def __init__(self, samplerate):
        # create goertzel parameters for each frequency so that
        # all the frequencies are analyzed in parallel
        self.prev = dict.fromkeys(self.ALL_FREQ, 0)
        self.prev2 = dict.fromkeys(self.ALL_FREQ, 0)
        self.coeff = {
            k: 2.0 * math.cos(2.0 * math.pi * k / samplerate)
            for k in self.ALL_FREQ
        }

    def run(self, samples):
        for sample in samples:
            for freq in self.ALL_FREQ:
                prev, prev2 = self.prev[freq], self.prev2[freq]
                self.prev2[freq] = prev
                self.prev[freq] = sample + (self.coeff[freq] * prev) - prev2
    
    def get_number(self):
        power = {}
        for freq in self.ALL_FREQ:
            prev, prev2 = self.prev[freq], self.prev2[freq]
            power[freq] = prev2**2 + prev**2 - self.coeff[freq]*prev*prev2
        high_freq = max(self.HIGH_FREQ, key=power.get)
        low_freq = max(self.LOW_FREQ, key=power.get)
        return self.CODES[low_freq, high_freq]

def read_wav(filename):
    wav = wave.open(filename, 'r')
    (nchannels, sampwidth, framerate, nframes, comptype, compname) = wav.getparams()
    frames = wav.readframes(nframes * nchannels)
    # convert wave file to array of integers
    frames = struct.unpack_from("%dH" % nframes * nchannels, frames)
    # if stereo get left/right
    if nchannels == 2:
        left, right = frames[0::2], frame[1::2]
    else:
        left = right = frames
    return left, right, framerate

def main():
    left, right, framerate = read_wav('/home/michael/Downloads/dtmf.wav')
    binsize = 400
    # Split the bin in 4 to average out errors due to noise
    binsize_split = 4
    prevvalue = None
    prevcounter = 0
    for i in range(0,len(left)-binsize,binsize/binsize_split):
        goertzel = GoertzelDTFM(framerate)
        goertzel.run(left[i:i+binsize])
        value = goertzel.get_number()
        if value == prevvalue:
            prevcounter += 1
            if prevcounter == 10:
                print value
        else:
            prevcounter = 0
            prevvalue = value

if __name__ == '__main__':
    main()
TommiB
User
Beiträge: 41
Registriert: Montag 9. Februar 2015, 18:15

Hallo Sirius3,

wo wird da was gespeichert?
Wie die Töne verarbeitet wird? Nee, das verstehe ich nicht...

Beim ersten Code wird eine Textausgabe von der Soundkarte getätigt, wenn ein 1750hz Ton erkannt wird.

Beim 2. Code werden auch die DTMF-Töne nach der Erkennung (hier muss es mindestens 10 x erkannt werden) ausgegeben.

Ich habe das übrigens jetzt probiert. Für ein 10 Sek. Wave-File braucht das Script 90 Sekunden.
Das ist definitiv für "on fly" nicht geeignet.

Ich habe mittlerweile die Befürchtung, dass das mit Python "on fly" nicht geht...
__deets__
User
Beiträge: 14539
Registriert: Mittwoch 14. Oktober 2015, 14:29

Gehen tut das schon, weil man ja nur eine FFT mit Numpy machen muss, und dann nach den entsprechenden Frequenzen filtern. Und weil numpy das so effizient macht wie es irgend geht.

Aber man kann halt nicht zwei beliebige Stücke Code gegeneinander hauen & hoffen, das es geht.
Benutzeravatar
snafu
User
Beiträge: 6740
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

TommiB hat geschrieben:Ich habe mittlerweile die Befürchtung, dass das mit Python "on fly" nicht geht...
Mit der Standardbibliothek und dem "üblichen" CPython-Compiler klappt es wahrscheinlich nicht in akzeptabler Zeit. Es gibt jedoch alternative Python-Compiler sowie Bibliotheken, die genau für solche Fälle ausgelegt sind. Lies dich am besten Mal in das Thema Scientific Computing im Verbindung mit Python ein und befasse dich mit den entsprechenden Möglichkeiten, die sich da sicherlich auftun.
Sirius3
User
Beiträge: 17749
Registriert: Sonntag 21. Oktober 2012, 17:20

@TommiB: mit den entsprechenden Paketen ist das mit Python kein Problem. Ich habe mal die Klasse wegoptimiert, weil die wirklich nichts sinnvolles beiträgt:

Code: Alles auswählen

'''
A python implementation of the Goertzel algorithm to decode DTMF tones.
The wave file is split into bins and each bin is analyzed
for all the DTMF frequencies.
'''
import wave
import numpy
from numba import jit

CODES = {
    (697.0, 1209.0): "1",
    (697.0, 1336.0): "2",
    (697.0, 1477.0): "3",
    (697.0, 1633.0): "A",
    (770.0, 1209.0): "4",
    (770.0, 1336.0): "5",
    (770.0, 1477.0): "6",
    (770.0, 1633.0): "B",
    (852.0, 1209.0): "7",
    (852.0, 1336.0): "8",
    (852.0, 1477.0): "9",
    (852.0, 1633.0): "C",
    (941.0, 1209.0): "*",
    (941.0, 1336.0): "0",
    (941.0, 1477.0): "#",
    (941.0, 1633.0): "D",
}
LOW_FREQ = list(set(a for a,b in CODES))
HIGH_FREQ = list(set(b for a,b in CODES))
ALL_FREQ = numpy.array(LOW_FREQ + HIGH_FREQ)

@jit(nopython=True)
def calc_power(samples, samplerate):
    coeff = 2.0 * numpy.cos(2.0 * numpy.pi * ALL_FREQ / samplerate)
    prev = numpy.zeros_like(coeff)
    prev2 = numpy.zeros_like(coeff)
    for sample in samples:
        prev, prev2 = sample + prev*coeff - prev2, prev
    return prev2**2 + prev**2 - coeff*prev*prev2

def goertzel(samples, samplerate):
    power = calc_power(samples, samplerate)
    high_freq = HIGH_FREQ[power[len(LOW_FREQ):].argmax()]
    low_freq = LOW_FREQ[power[:len(LOW_FREQ)].argmax()]
    return CODES[low_freq, high_freq]
 
def read_wav(filename):
    wav = wave.open(filename, 'r')
    (nchannels, sampwidth, framerate, nframes, comptype, compname) = wav.getparams()
    frames = wav.readframes(nframes * nchannels)
    # convert wave file to array of integers
    frames = numpy.fromstring(frames, dtype=numpy.uint16)
    # if stereo get left/right
    if nchannels == 2:
        left, right = frames[0::2], frame[1::2]
    else:
        left = right = frames
    return left, right, framerate
 
def main():
    left, right, framerate = read_wav('/home/michael/Downloads/dtmf.wav')
    binsize = 400
    # Split the bin in 4 to average out errors due to noise
    binsize_split = 4
    prevvalue = None
    prevcounter = 0
    for i in range(0,len(left)-binsize,binsize/binsize_split):
        value = goertzel(left[i:i+binsize], framerate)
        if value == prevvalue:
            prevcounter += 1
            if prevcounter == 10:
                print value
        else:
            prevcounter = 0
            prevvalue = value
 
if __name__ == '__main__':
    main()
TommiB
User
Beiträge: 41
Registriert: Montag 9. Februar 2015, 18:15

Hallo Sirius3,

danke für die Mühe, die Klasse wegzuoptimieren.

Ich habe mal versucht, Deinen Code zu starten...

Dort wird ja numba importiert...
Habe jetzt mehrere Stunden versucht das hier zu installieren...

git clone https://github.com/numba/numba.git
cd numba
python setup.py install

Beim python Setup.py install läuft die Installroutine durch, aber macht zwischen drin einige Fehler...
Beim Ausführen Deines Scriptes gibt es dann auch sofort einen Abbruch "From numba Import jit"

Habe jetzt mehrere Seiten gegoogelt...

:-) Nächstes Jahr ist auch noch Zeit.... Dann geht es damit weiter..

Gruß Tommi
Sirius3
User
Beiträge: 17749
Registriert: Sonntag 21. Oktober 2012, 17:20

@TommiB: ach, das soll auf einem Raspberry laufen, da schein es nicht so einfach zu sein, numba zum Laufen zu bringen. Wenn ich mal die Tage Zeit habe, teste ich das mal.
Antworten