Versuche, eine konstante Bytegeschwindigkeit zu simulieren. Verwechslung mit time.sleep ergibt
Ich verwende Windows 7 auf meinem Computer (dem Player) und Linux (Debian) auf meinem College-Computer (dem Streamer), den ich mit ssh steuere.
Ich habe versucht, eine konstante Bytegeschwindigkeit eines Mikrofons durch Lesen einer Wave-Datei zu simulieren, als würde jemand sprechen. Das Problem war, dass die Bytegeschwindigkeit unter dem Ziel lag.
Wählen Sie eine Geschwindigkeit von 32 KB / s und eine Aufnahmedauer von 0,020 Sekunden.
Ich habe das simulierte Mikrofon mit time.sleep implementiert, um jeden Datenblock alle 0,020 Sekunden zu erhalten. Aber die Rate war um 27KB / s, nicht 32KB / s
Ich habe mich entschlossen zu testen, wie genau time.sleep auf dem Linux-Rechner ist, indem ich @ gelesen habdiese Frag.
Ich habe 2 Arten von Tests gemacht. 1) beschäftigt schlafen 2) normal schlafen
Im Durchschnitt ergibt sich aus den Beispielen, die ich erhalten habe, dass die Schlafauflösung des Linux-Computers 4 ms beträgt. Unter Windows ist es kleiner / gleich 1ms.
FrageWas kann möglicherweise die Auflösung des Ruhezustands auf dem Linux-Computer einschränken? (Unter Linux) Warum hat der beschäftigte Schlaf dieselbe Auflösung wie time.sleep?Wie kann ich erfolgreich simulieren, dass ein Mikrofon eine Wave-Datei liest?Codimport time
def busy_sleep(t):
s=time.time()
while time.time() - s < t:
pass
e=time.time()
return e-s
def normal_sleep(t):
s=time.time()
time.sleep(t)
e=time.time()
return e-s
def test(fun):
f = lambda x: sum(fun(x) for d in range(10))/10
print("0.100:{}".format(f(0.100)))
print("0.050:{}".format(f(0.050)))
print("0.025:{}".format(f(0.025)))
print("0.010:{}".format(f(0.010)))
print("0.009:{}".format(f(0.010)))
print("0.008:{}".format(f(0.008)))
print("0.007:{}".format(f(0.007)))
print("0.006:{}".format(f(0.006)))
print("0.005:{}".format(f(0.005)))
print("0.004:{}".format(f(0.004)))
print("0.003:{}".format(f(0.003)))
print("0.002:{}".format(f(0.002)))
print("0.001:{}".format(f(0.001)))
if __name__=="__main__":
print("Testing busy_sleep:")
test(busy_sleep)
print("Testing normal_sleep:")
test(normal_sleep)
Ergebniss"""
Debian
Testing busy_sleep:
0.100:0.10223722934722901
0.050:0.051996989250183104
0.025:0.027996940612792967
0.020:0.02207831859588623
0.010:0.011997451782226562
0.009:0.011997222900390625
0.008:0.009998440742492676
0.007:0.007997279167175292
0.006:0.0079974365234375
0.005:0.007997465133666993
0.004:0.005918483734130859
0.003:0.003997836112976074
0.002:0.0039977550506591795
0.001:0.003997611999511719
Testing normal_sleep:
0.100:0.1020797061920166
0.050:0.051999988555908205
0.025:0.028000001907348634
0.020:0.02192000865936279
0.010:0.011999979019165039
0.009:0.012000055313110351
0.008:0.010639991760253906
0.007:0.008000001907348633
0.006:0.00799997329711914
0.005:0.008000059127807617
0.004:0.006159958839416504
0.003:0.004000000953674317
0.002:0.00399998664855957
0.001:0.004000091552734375
$ uname -a
Linux 3.2.0-4-amd64 #1 SMP Debian 3.2.57-3+deb7u2 x86_64 GNU/Linux
"""
"""
Windows 7
Testing busy_sleep:
0.100:0.10000572204589844
0.050:0.05000288486480713
0.025:0.0250014066696167
0.010:0.010500597953796388
0.009:0.010500597953796388
0.008:0.008000493049621582
0.007:0.00740041732788086
0.006:0.006400299072265625
0.005:0.005400300025939942
0.004:0.004700303077697754
0.003:0.003200197219848633
0.002:0.002700185775756836
0.001:0.0016000032424926759
Testing normal_sleep:
0.100:0.10000579357147217
0.050:0.0500028133392334
0.025:0.02500150203704834
0.010:0.01000049114227295
0.009:0.0100006103515625
0.008:0.008000493049621582
0.007:0.007000398635864257
0.006:0.006000304222106934
0.005:0.00500030517578125
0.004:0.0040001869201660155
0.003:0.0030002117156982424
0.002:0.0020000934600830078
0.001:0.0010000944137573243
"""
Real codeimport os import wave import sys import io import time
FORMAT = 8 #get_format_from_width(2)
NCHANNELS = 1
FRAMERATE = 16000 # samples per second
SAMPWIDTH = 2 # bytes in a sample
BYTE_RATE = FRAMERATE*SAMPWIDTH
CHUNK_DURATION = 0.020
CHUNK_BYTES = int(CHUNK_DURATION*BYTE_RATE)
class StreamSimulator:
def __init__(self):
wf = wave.open("Kalimba.wav","rb")
buf = io.BytesIO()
buf.write(wf.readframes(wf.getnframes()))
wf.close()
buf.seek(0)
self.buf = buf
self.step = time.time()
def delay(self):
#delay
delta = time.time() - self.step
self.step=time.time()
delay = CHUNK_DURATION - delta
if delay > 0.001:
time.sleep(delay)
def read(self):
buf = self.buf
data = buf.read(CHUNK_BYTES)
if len(data) == 0:
buf.seek(0)
data = buf.read(CHUNK_BYTES)
self.delay()
return data
def close(self):
self.buf.close()
class DynamicPainter:
def __init__(self):
self.l=0
def paint(self,obj):
str1=str(obj)
l1=len(str1)
bs="\b"*self.l
clean=" "*self.l
total = bs+clean+bs+str1
sys.stdout.write(total)
sys.stdout.flush()
self.l=l1
if __name__=="__main__":
painter = DynamicPainter()
stream = StreamSimulator()
produced = 0
how_many = 0
painted = time.time()
while True:
while time.time()-painted < 1:
d = stream.read()
produced += len(d)
how_many += 1
producing_speed = int(produced/(time.time()-painted))
painter.paint("Producing speed: {} how many: {}".format(producing_speed,how_many))
produced=0
how_many=0
painted = time.time()
BearbeiteChanged "Real Code", Zeitmaß einschließlich Schlafenszeit hinzugefügt.
Aber jetzt habe ich die DOUBLE Byte Rate:Producing speed: 63996 how many: 100
Das hat mich so sehr verwirrt. Ich habe es mit verschiedenen Byteraten versucht und es wird jedes Mal das Doppelte.
Danke an @ J.F.Sebastianund sein Code, Ich habe gelernt, dass:
Es ist besser, eine Frist als Zeitreferenz zu verwenden, als jede Schleife auf eine neue Referenz zu erstellenMit einem Stichtag wird die Ungenauigkeit der Zeit "amortisiert". Dabei wird die gewünschte Bitrate ein wenig verschoben, was jedoch zu einem korrekten (und wesentlich konstanteren) Durchschnitt führt.Sie müssen time.time () nur einmal verwenden, was weniger Ungenauigkeiten bei der Berechnung bedeutet.ls Ergebnis erhalte ich manchmal konstante 32000 B / s, die auf 31999 und sehr selten auf 31745 oszilliere
Jetzt kann ich die Musik ohne Verzögerung oder Jitter hören!
def read(self):
buf = self.buf
data = buf.read(CHUNK_BYTES)
if len(data) == 0:
buf.seek(0)
data = buf.read(CHUNK_BYTES)
self.deadline += CHUNK_DURATION
delay = self.deadline - time.time()
if delay > 0:
time.sleep(delay)
return data