Sensorflow: Benutzerdefiniertes Laden von Daten + asynchrone Berechnung [geschlossen]

Dies ist eine Anleitung, die meines Erachtens von TF-Beispielen übersehen wird.

Aufgabe

samples für jede Klasse werden in einem separaten Verzeichnis angegeben, und daher sind Labels indirekt (d. h. durch Verzeichnis) entkoppelte Last und Berechnungen in TF

Jedes einzelne Bit könnte gefunden werden, aber ich denke, dass es für TF-Anfänger (wie mich selbst) viel Zeit spart, wenn Sie alle an einem Ort haben.

Lässt angehen 1. in meinem Fall sind es zwei Sätze von Bildern:

# all filenames for .jpg in dir 
#  - list of fnames
#  - list of labels 
def path_fnames(f_path, label, ext = ['.jpg', '.jpeg']):
    f_n = [f_path+'/'+f for f in sorted(os.listdir(f_path)) if os.path.splitext(f)[1].lower() in ext]
    f_l = [label] * len(f_n)
    return f_n, f_l
#     
def dense_to_one_hot(labels_dense, num_classes=10, dtype=np.float32):
    """Convert class labels from scalars to one-hot vectors."""
    num_labels     = labels_dense.shape[0]
    index_offset   = np.arange(num_labels) * num_classes
    labels_one_hot = np.zeros((num_labels, num_classes),dtype=dtype)
    labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
    return labels_one_hot

data_dir = '/mnt/dataset/'
dir_1   = '/class_1'
dir_2   = '/class_2'

# --- get filenames for data ---
dpath = [data_dir+dir_1, data_dir+dir_2]

f_n1, f_l1 = path_fnames(dpath[0], 0)
f_n2, f_l2 = path_fnames(dpath[1], 1)

# --- create one-hot labels ---
ohl    = dense_to_one_hot(np.asarray(f_l1+f_l2), num_classes=2, dtype = np.float32)
fnames = f_n1+f_n2;               # one-hot labels created in this sequence

Jetzt haben wir alle Dateinamen und One-Hot-Labels vorinstalliert.

ehen wir zum 2.

Es basiert aufWie man Daten mit einer benutzerdefinierten Python-Funktion in tensorflow @ vorabru. Kurz gesagt hat es:

custom Image-Reader (durch Ihren ersetzen)Warteschlange fnl_q mit [Dateinamenbezeichnung], die vom Leser verwendet wird, um @ einzugebWarteschlange proc_q mit [Musteretikett], das für die Feed-Verarbeitung verwendet wird some_opthread, der read_op ausführt, um [sample label] zu erhalten, und enqueue_op, um pair in @ zu setz proc_q. Thread wird von tf.Coordinator @ gesteue some_op die zuerst Daten von @ erhalt proc_q von dequeue_many () und Rest der Berechnung (kann auch in einem separaten Thread abgelegt werden).

Anmerkungen

feature_read_op und label_read_op sind zwei separate Operationen.Ich benutze sleep (), um den Betrieb zu verlangsamen und zu kontrollieren - nur zu TestzweckenIch habe "Fütterung" und "Berechnung" Teile getrennt - im realen Fall führen Sie sie nur parallel
print 'TF version:', tf.__version__
# --- params ----
im_s       = [30, 30, 1]   # target image size
BATCH_SIZE = 16

# image reader 
# - fnl_queue: queue with [fn l] pairs 
# Notes 
# - to resize:  image_tensor = tf.image.resize_image_with_crop_or_pad(image_tensor, HEIGHT, WIDTH)
# - how about image preprocessing?
def img_reader_jpg(fnl_queue, ch = 3, keep = False):
    fn, label = fnl_queue.dequeue()

    if keep:
        fnl_queue.enqueue([fn, label])

    img_bytes = tf.read_file(fn)
    img_u8    = tf.image.decode_jpeg(img_bytes, channels=ch) 
    img_f32   = tf.cast(img_u8, tf.float32)/256.0  
    #img_4     = tf.expand_dims(img_f32,0)
    return img_f32, label

#  load [feature, label] and enqueue to processing queue
# - sess:             tf session 
# - sess:             tf Coordinator
# - [fr_op, lr_op ]:  feature_read_op label_read_op
# - enqueue_op:       [f l] pairs enqueue op
def load_and_enqueue(sess, coord, feature_read_op, label_read_op , enqueue_op):
    i = 0
    while not coord.should_stop():
        # for testing purpose
        time.sleep(0.1)                     
        #print 'load_and_enqueue i=',i
        #i = i +1

        feature, label = sess.run([feature_read_op, label_read_op ])

        feed_dict = {feature_input: feature,
                     label_input  : label}

        sess.run(enqueue_op, feed_dict=feed_dict)


# --- TF part ---

# filenames and labels are pre-loaded
fv = tf.constant(fnames)
lv = tf.constant(ohl)

#fnl_q    = tf.FIFOQueue(len(fnames), [tf.string, tf.float32])
fnl_q    = tf.RandomShuffleQueue(len(fnames), 0, [tf.string, tf.float32])
do_enq = fnl_q.enqueue_many([fv, lv])

# reading_op: feature_read_op label_read_op 
feature_read_op, label_read_op = img_reader_jpg(fnl_q, ch = im_s[2])

# samples queue
f_s = im_s
l_s = 2
feature_input = tf.placeholder(tf.float32, shape=f_s, name='feature_input')
label_input   = tf.placeholder(tf.float32, shape=l_s, name='label_input')

#proc_q     = tf.RandomShuffleQueue(len(fnames), 0, [tf.float32, tf.float32], shapes=[f_s, l_s])
proc_q     = tf.FIFOQueue(len(fnames), [tf.float32, tf.float32], shapes=[f_s, l_s])
enqueue_op = proc_q.enqueue([feature_input, label_input])

# test: 
# - some op
img_batch, lab_batch = proc_q.dequeue_many(BATCH_SIZE)
some_op   = [img_batch, lab_batch]

# service ops
init_op   = tf.initialize_all_variables()

# let run stuff
with tf.Session() as sess:

    sess.run(init_op)
    sess.run(do_enq)

    print "fnl_q.size:", fnl_q.size().eval()
    print "proc_q.size:", proc_q.size().eval()

    # --- test thread stuff ---
    #  - fill proc_q
    coord = tf.train.Coordinator()
    t = threading.Thread(target=load_and_enqueue, args = (sess, coord, feature_read_op, label_read_op , enqueue_op))
    t.start()

    time.sleep(2.1)

    coord.request_stop()
    coord.join([t])

    print "fnl_q.size:", fnl_q.size().eval()
    print "proc_q.size:", proc_q.size().eval()

    #  - process a bit 
    ss = sess.run(some_op)
    print 'ss[0].shape', ss[0].shape 
    print ' ss[1]:\n', ss[1]

    print "fnl_q.size:", fnl_q.size().eval()
    print "proc_q.size:", proc_q.size().eval() 

print 'ok'

Typische Ausgabe:

TF version: 0.6.0

fnl_q.size: 1225
proc_q.size: 0

fnl_q.size: 1204
proc_q.size: 21

ss[0].shape (16, 30, 30, 1)
 ss[1]:
[[ 0.  1.]
 [ 1.  0.]
 [ 1.  0.]
 [ 0.  1.]
 [ 0.  1.]
 [ 1.  0.]
 [ 1.  0.]
 [ 0.  1.]
 [ 1.  0.]
 [ 0.  1.]
 [ 0.  1.]
 [ 1.  0.]
 [ 1.  0.]
 [ 0.  1.]
 [ 1.  0.]
 [ 0.  1.]]

fnl_q.size: 1204
proc_q.size: 5

ok  

All as expected

Stapel von Paaren [Musteretikett] werden erstellt Paare werden gemischt

Wenden Sie TF nur so an, wie es durch Ersetzen von @ verwendet werden sol some_op:)

Und eine Frage: ein beobachtetes Problem Problem - falls ich @ benuttf.FIFOQueue für Dateinamen undtf.RandomShuffleQueue für Samples - Mischen findet nicht statt. Anders herum (wie oben beschrieben) mischt es sich jedoch perfekt.

Jedes Problem mit dem Mischen für
tf.RandomShuffleQueue(len(fnames), 0, [tf.float32, tf.float32], shapes=[f_s, l_s]) ?

ADD: Die Version mit zwei Threads:

one für das Auffüllen / Aktualisieren / Ändern der Dateinamen-Warteschlangesecond zum Füllen der Proben in die Verarbeitungswarteschlange.

Auch korrekte Methode zum Stoppen von Threads hinzugefügt.

def load_and_enqueue(sess, coord, feature_read_op, label_read_op , enqueue_op):
    try:
        while not coord.should_stop():
            feature, label = sess.run([feature_read_op, label_read_op ])
            feed_dict = {feature_input: feature,
                         label_input  : label}
            sess.run(enqueue_op, feed_dict=feed_dict)
    except Exception as e:
        return


# periodically check the state of fnl queue and if needed refill it
#  - enqueue_op: 'refill' file-name_label queue 
def enqueue_fnl(sess, coord, fnl_q, enqueue_op):
    try:
        while not coord.should_stop():
            time.sleep(0.5)
            s = sess.run(fnl_q.size())
            if  s < (9*BATCH_SIZE) :
                sess.run(enqueue_op)
    except Exception as e:
        return


#  -- ops for feed part --

# filenames and labels are pre-loaded
fv = tf.constant(fnames)
lv = tf.constant(ohl)

# read op
fnl_q      = tf.RandomShuffleQueue(len(fnames)*2, 0, [tf.string, tf.float32], name = 'fnl_q')  # add some margin for re-fill to fit
do_fnl_enq = fnl_q.enqueue_many([fv, lv])
feature_read_op, label_read_op = img_reader_jpg(fnl_q, ch = IMG_SIZE[2])

# samples queue
feature_input = tf.placeholder(tf.float32, shape=IMG_SIZE, name='feature_input')
label_input   = tf.placeholder(tf.float32, shape=LAB_SIZE, name='label_input')
proc_q        = tf.FIFOQueue(len(fnames)*3, [tf.float32, tf.float32], shapes=[IMG_SIZE, LAB_SIZE], name = 'fe_la_q') 
enqueue_op    = proc_q.enqueue([feature_input, label_input])

# -- ops for trainind end eval
img_batch, lab_batch = proc_q.dequeue_many(BATCH_SIZE)

... here is your model

loss       = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits, lab_ph))
optimizer  = tf.train.AdamOptimizer(1e-4).minimize(loss)

with tf.Session() as sess:

    coord = tf.train.Coordinator()
    t_le  = threading.Thread(target=load_and_enqueue, args = (sess, coord, feature_read_op, label_read_op , enqueue_op) , name = 'load_and_enqueue')
    t_re  = threading.Thread(target=enqueue_fnl, args = (sess, coord, fnl_q, do_fnl_enq), name = 'enqueue_fnl')  # re-enq thread i.e. refiling filename queue 
    t_le.start()
    t_re.start()

    try:
    # training
    for step in xrange(823):
        # some proc
        img_v, lab_v = sess.run([img_batch, lab_batch])
        feed_dict = { img_ph   : img_v,
              lab_ph   : lab_v,
              keep_prob: 0.7}
        _, loss_v = sess.run([optimizer, loss], feed_dict = feed_dict)

    except Exception as e:
    print 'Training: Exception:', e


    # stop threads 
    coord.request_stop()                                     # ask to stop
    sess.run(fnl_q.close(cancel_pending_enqueues=True))      # tell proc_q don't wait for enque anymore
    sess.run(proc_q.close(cancel_pending_enqueues=True))     # tell proc_q don't wait for enque anymore
    coord.join([t_le, t_re], stop_grace_period_secs=8)       

Antworten auf die Frage(0)

Ihre Antwort auf die Frage