Multithreading mit Threadpool - Warte auf letzten lesenden Thread

  • VB.NET

Es gibt 51 Antworten in diesem Thema. Der letzte Beitrag () ist von StGo.

    Multithreading mit Threadpool - Warte auf letzten lesenden Thread

    Hallo zusammen,

    ich habe eine Frage. Ich habe mir einen Code geschrieben mit dem ich parallel Excel Dateien auslesen kann. Diese Werte schreibe ich in ein Array und will dieses später in eine Excel Datei schreiben. Ich habe leider keine Idee wie ich am Ende des Threadpools auf den letzten noch lesenden Thread warte.

    Hier mein Code

    Quellcode

    1. Imports System.Threading
    2. Imports Microsoft.Office.Interop.Excel
    3. Imports Microsoft.VisualBasic.FileIO
    4. Imports Microsoft.VisualBasic.FileIO.FileSystem
    5. Module Module1
    6. '##################################################################################################################################################
    7. 'Alle Anpassungen der Variablen hier:
    8. Dim Dateipfad As String = "\\Eigene Dokumente\xls Test Dateien"
    9. Dim DateipfadAusgabe As String = "\\Eigene Dokumente\Test_File.xls"
    10. Dim DokumentenfeldAuslesen As String = "B1"
    11. Dim WorksheetNr As Integer = 1
    12. Dim Stackgroesse As Integer = 100
    13. '##################################################################################################################################################
    14. 'Deklaration Excel Mappe
    15. Dim oExcel As Microsoft.Office.Interop.Excel.Application
    16. Dim oBook As Microsoft.Office.Interop.Excel.Workbook
    17. Dim oSheet As Microsoft.Office.Interop.Excel.Worksheet
    18. 'Initialisiert eine neue Instanz der Semaphore-Klasse, gibt die maximale Anzahl gleichzeitiger Einträge an und reserviert optional einige Einträge.
    19. 'Semaphore(initialCount, maximumCount)
    20. 'initialCount: Die anfängliche Anzahl von Anforderungen für das Semaphor, die gleichzeitig gewährt werden können (Int32).
    21. 'maximumCount: Die maximale Anzahl von Anforderungen für das Semaphor, die gleichzeitig gewährt werden können (Int32).
    22. Dim sem As New Semaphore(1, 1)
    23. 'Stack erstellen
    24. Dim meinStack As New Stack(Stackgroesse)
    25. 'Variablen Deklaration incl. Dateipfad
    26. Dim Dokumentennummer As Integer
    27. Dim DateiCounter As System.Collections.ObjectModel.ReadOnlyCollection(Of String) = My.Computer.FileSystem.GetFiles(Dateipfad)
    28. Dim meinArray(DateiCounter.Count - 1, 1)
    29. Dim AnzahlThreads As Integer = 0
    30. 'Hautroutine
    31. 'Verzeichnis wird durchsucht und Dateipfade werden aufden Stack geschrieben
    32. Sub Main()
    33. For Each File As String In System.IO.Directory.GetFiles(Dateipfad)
    34. meinStack.Push(File)
    35. Next
    36. Console.WriteLine("Stack ist mit {0} Dokumentenpfaden gefüllt", meinStack.Count)
    37. 'Verbrauchroutine starten
    38. Verbraucher()
    39. End Sub
    40. 'In Verbraucher Sub Thread-Pool erzeugen
    41. Sub Verbraucher()
    42. While meinStack.Count <> 0
    43. ThreadPool.QueueUserWorkItem(AddressOf LeseDateiAus, AnzahlThreads)
    44. End While
    45. oExcel = New Microsoft.Office.Interop.Excel.Application
    46. oExcel.Visible = False
    47. oBook = oExcel.Workbooks.Add
    48. oSheet = oBook.Worksheets(1)
    49. oSheet.Range("A1").Value = "Dokumentennummer"
    50. oSheet.Range("B1").Value = "Pfad der Datei"
    51. oSheet.Range("A2").Resize(DateiCounter, 2).Value = meinArray
    52. oBook.SaveAs(DateipfadAusgabe)
    53. oBook.Close()
    54. oExcel.Quit()
    55. End Sub
    56. 'Threadsichere Sub
    57. Sub LeseDateiAus(ByVal state As Object)
    58. Console.WriteLine("Thread {0} tritt in das Semaphore ein", Thread.CurrentThread.ManagedThreadId)
    59. sem.WaitOne()
    60. Try
    61. oExcel = New Microsoft.Office.Interop.Excel.Application
    62. oExcel.Visible = False
    63. If meinStack.Count = 0 Then Exit Try
    64. oBook = oExcel.Workbooks.Open(meinStack.Pop, , True)
    65. oSheet = oBook.Worksheets(WorksheetNr)
    66. Dokumentennummer = oSheet.Range(DokumentenfeldAuslesen).Value
    67. oBook.Close()
    68. oExcel.Quit()
    69. Console.WriteLine(Dokumentennummer)
    70. meinArray(meinStack.Count, 0) = Dokumentennummer
    71. meinArray(meinStack.Count, 1) = meinStack.Pop
    72. Finally
    73. sem.Release()
    74. End Try
    75. Console.WriteLine("Thread {0} tritt aus dem Semaphore aus", Thread.CurrentThread.ManagedThreadId)
    76. End Sub
    77. End Module
    @StGo
    Willkommen im Forum. :thumbup:
    1. mach Dir eine EinForm-Anwendung, keine Console.
    2. mach Dir ein Flag-Array mit sovielen Elementen, wie Threads laufen.
    3. Lass jeden Event bei Beendigung ein Event senden.
      Im Eventhandler wird das kommunizierende Flag geändert.
    4. Überprüfe bei jedem Event, ob alle Flags geändert sind. Wenn ja, war es der letzte Thread.
    Falls Du diesen Code kopierst, achte auf die C&P-Bremse.
    Jede einzelne Zeile Deines Programms, die Du nicht explizit getestet hast, ist falsch :!:
    Ein guter .NET-Snippetkonverter (der ist verfügbar).
    Programmierfragen über PN / Konversation werden ignoriert!
    Danke für die schnelle Antwort. Aber ich kann im Threadpool doch nicht wissen wieviele Threads laufen. Das wird doch vom Scheduling bestimmt oder irre ich mich. Davon mal abgesehen ist eine Windows Forms Anwendung keine Option. Meine Vorgabe ist eine Konolenanwendung. Warum kann ich nicht beantworten.

    Gruß

    StGo schrieb:

    Aber ich kann im Threadpool doch nicht wissen wieviele Threads laufen.

    Brauchst du auch nicht. Du musst das Array(oder die Liste) nur mit so vielen Flags austatten, wie du Threads erstellst. Und beim Eventhandling eben einen weiteren Eintrag auf True setzen und prüfen, ob alle schon True sind.

    PS: Welches Framework nutzt du? Wenn du 4.5 nutzt kannst du(wenn erlaubt) auch mit Tasks arbeiten. Geht glaube ich auch schon mit 4.0, aber ohne Async/Await, sondern mit Task.WaitAll(...)(was sich wohl sowieso anbieten würde)...

    StGo schrieb:

    Aber ich kann im Threadpool doch nicht wissen
    Soeben hast Du gelernt, dass solch völlig unwichtige Information doch nicht ganz unwichtig ist und beim nächsten Problem unbedingt in den Eröffnungspost gehört. Die Lösung hat Dir @Higlav gegeben.
    Falls der erste Thread fertig ist, bevor der zweite anfängt, mach Dir noch ein Zusatzflag, das erst dann die beendeten Threads abgefragt werden, wenn der letzte Thread gestartet wurde.
    Sonst bist Du fertig, bevor Du angefangen hast.
    Falls Du diesen Code kopierst, achte auf die C&P-Bremse.
    Jede einzelne Zeile Deines Programms, die Du nicht explizit getestet hast, ist falsch :!:
    Ein guter .NET-Snippetkonverter (der ist verfügbar).
    Programmierfragen über PN / Konversation werden ignoriert!
    Ich frage mich gerade ob ich auf dem Schlauch stehe!?

    Ich weiss nicht wieviele Threads der Threadpool erzeugt ergo kann sie auch nicht in eine Liste packen und mit einem Flag versehen. ImSemaphore gebe ich nur an wieviele Startthreads es gibt und wie die Max Anzahl im Semaphore ist.

    Ich suche eien Art "Thrad Join" oder "WaitAll". Meine Hoffnung lag daran das sich jemand besser mit dem Thema auskennt als ich und eventuell eine erklärung parat hat. Was auch der Grund dafür ist das ich kein weiteres Wissen zum Threadpool im ersten Post habe.

    Gruß

    PS: .NET 4.0.30319
    Ich gebe zu, mit Semaphores oder Mutexes habe ich bis jetzt noch nicht gearbeitet(war auch nie nötig, genausowenig wie Threadsynchronisierung), aber bei .NET 4.0 funktioniert untenstehender Code:

    VB.NET-Quellcode

    1. Dim t1 = Task.Factory.StartNew(Function() "TestString")
    2. Dim t2 = Task.Factory.StartNew(Sub() Console.WriteLine("Hällöü"))
    3. Task.WaitAll(t1, t2)
    4. Console.WriteLine(t1.Result)

    Ich weiss nicht, wie, oder ob du das einbauen kannst, das wäre dein Part.

    StGo schrieb:

    ergo
    solltest Du fragen, wie solch geacht wird. X(
    In Deinem MainWindow / MainThread legst Du Dir eine Private MyThreadList As New List(Of IRGENDWAS) an. Struktue mit Thread-Kennung oder so.
    Irgendwo in Deinem Programm wird einer Deiner Threads aus dem MainWindow / MainThread heraus gestartet.
    Exakt da befüllst Du diese Liste mit einem speziell für diesen Thread generierten Item:

    VB.NET-Quellcode

    1. MyThreadList.Add(New (IRGENDWAS(paramX))
    und feddich.
    Falls Du diesen Code kopierst, achte auf die C&P-Bremse.
    Jede einzelne Zeile Deines Programms, die Du nicht explizit getestet hast, ist falsch :!:
    Ein guter .NET-Snippetkonverter (der ist verfügbar).
    Programmierfragen über PN / Konversation werden ignoriert!
    Es geht einfacher. Beispiel:

    VB.NET-Quellcode

    1. Dim threadCount As Int32
    2. Dim are As AutoResetEvent
    3. Dim lock As New Object
    4. Sub StartMyThreads()
    5. threadCount = 5
    6. are = New AutoResetEvent(False)
    7. For i = 1 To threadCount
    8. Console.WriteLine("Starting Thread {0}", i)
    9. ThreadPool.QueueUserWorkItem(AddressOf Work, i)
    10. Next
    11. While True
    12. SyncLock lock
    13. If threadCount = 0 Then Exit While
    14. End SyncLock
    15. Console.WriteLine("Waiting for {0} remaining threads", threadCount)
    16. are.WaitOne()
    17. End While
    18. Console.WriteLine("All threads ended")
    19. End Sub
    20. Sub Work(state As Object)
    21. 'Do work
    22. Thread.Sleep(2000)
    23. SyncLock lock
    24. threadCount -= 1
    25. Console.WriteLine("Release from Thread {0}", CInt(state))
    26. are.Set()
    27. End SyncLock
    28. End Sub

    Die Bedingung in Zeile 16 prüft die Anzahl der Threads, die gestartet wurden. Falls das an verschiedenen Stellen passiert, muss die Zählvariable VOR dem Aufruf an QueueUserWorkItem() in SyncLock erhöht werden. Diese Vorgehensweise ist aber nicht empfehlenswert - die Anzahl sollte im Voraus bekannt sein.

    Edit: In deinem Fall ist threadCount = meinStack.Count. Wenn dieser Wert jedoch nicht fest ist, hast du ein erstklassiges Producer-Consumer-Problem, vgl. en.wikipedia.org/wiki/Producer-consumer_problem
    Gruß
    hal2000

    Dieser Beitrag wurde bereits 2 mal editiert, zuletzt von „hal2000“ ()

    @hal2000null
    interessante Lösung. Was mich gerade etwas verunsichert ist das du das Producer Consumer Problem ansprichts. Eigentlich ist der Sinn meines Programms besteht darin das Dokumente (bzw. deren Pfade) auf dem Stack nicht doppelt ausgelesen werden. Demnach ist der Stack die geteilte Resscource die Gesperrt ist. Den Threadpool nutze ich um so viel Prozesse wie möglich und so wenig wie nötig zu beanspruchen.

    Meine Idee Stammt aus dem Open Book von der Galilio Press:
    openbook.galileocomputing.de/v…585f3701860b60859e305def1


    Ressourcenbegrenzung mit Semaphore
    Wenn eine Ressource nicht exklusiv gesperrt werden soll, sondern nur ihre
    Nutzung nicht unbeschränkt sein soll, kommt die Klasse Semaphore (Signalmast) ins Spiel. In einem internen Zähler
    hält ein solches Objekt die Anzahl Threads fest, die es beanspruchen.


    Ich dachte das man an diesen Zähler herankommt und ihn abfragen kann.

    @Higlav
    Ja soetwas meinte ich. Nur bekomme ich das mit dem Semaphore nicht hin.

    StGo schrieb:

    [...] Producer Consumer Problem ansprichts. Eigentlich ist der Sinn meines Programms besteht darin das Dokumente (bzw. deren Pfade) auf dem Stack nicht doppelt ausgelesen werden.
    Das eine hat mit dem anderen nichts zutun. Wenn der Producer eine Datei nur einmal zum Verbrauchen in die Queue einstellt, wird sie auch nur einmal bearbeitet. Das PC-Problem zielt auf die Synchronisierung ab, damit a) nicht aus einer leeren Queue gelesen wird (Producer zu langsam / Consumer zu schnell) und b) die Queue nicht überläuft (Producer zu schnell / Consumer zu langsam). Die Daten in der Queue sind unerheblich - in der Theorie werden die nur als "Schlüssel" bezeichnet (einfach eine ID auf ein beliebiges Objekt).

    StGo schrieb:

    Ich dachte das man an diesen Zähler herankommt und ihn abfragen kann.
    Kann man nicht, und das ist Absicht. Der interne Zähler speichert nur die Anzahl an Anfragen, die vom Semaphor angefordert wurden. Darüber wird ermittelt, wann das Semaphor blockiert.
    Gruß
    hal2000
    @ErfinderDesRades

    Dein Link ist interessant. Leider kann ich nicht behaupten das ich den so ohne weiteres komplett verstehe. Mich würde aber interessieren wie ich mein Grundproblem besser lösen kann wenn meine Lösung veraltet ist.

    Kurze Beschreibung:
    1. Ich lese ein verzeichnis aus um speichere die Dateipfade in einem Stack.
    2. Ich starte mehrere Threads (Threadpool) und lasse sie per wechselseitigem Ausschluss auf den Stack zugreifen wo sie einen Pfad auslesen und das Dokument öffnen. Damit Kollisionen vermieden werden sperre ich den Stack und lasse immer nur einen Thread den Stack auslesen
    3. Wenn alle Informationen ausgelesen sind un din einem Array gespeichert sind trage ich diese Informationen in einem Neuen Dokument zusammen.

    Ich finde aber auch keine passende Literatur zum Nachschlagen. Meine Idee stammt aus einer Vorlesung zur parallel Programmierung.

    Danke

    Gruß

    StGo schrieb:

    Ich starte mehrere Threads (Threadpool) und lasse sie per wechselseitigem Ausschluss auf den Stack zugreifen wo sie einen Pfad auslesen und das Dokument öffnen.
    Das ist glaub zwecklos.
    Die Festplatte hat nur einen Lesekopf, und es gibt ühaupt keinen Gewinn, wenn mehrere Threads gleichzeitig den ansteuern.
    @StGo
    Der Link von EDR bedeutet in Kurzform (ist übrigens tatsächlich die elegantere Lösung):

    VB.NET-Quellcode

    1. Sub StartMyThreads()
    2. Dim threadCount As Int32 = 5
    3. Dim asyncResults As New List(Of IAsyncResult)
    4. 'Zeiger auf Work erstellen
    5. Dim callDelegate As New Action(AddressOf Work)
    6. 'mehrfach asynchron aufrufen (BeginInvoke verwendet den Threadpool)
    7. For i = 0 To threadCount
    8. asyncResults.Add(callDelegate.BeginInvoke(Nothing, Nothing))
    9. Next
    10. 'EndInvoke blockiert nur, wenn der Thread in Work noch nicht fertig ist
    11. For Each ar In asyncResults
    12. callDelegate.EndInvoke(ar)
    13. Next
    14. End Sub
    15. Sub Work()
    16. 'DoWork
    17. End Sub

    Falls Work Parameter hat, musst du einen passenden Action(Of T...)-Delegaten verwenden. Wenn es noch einen Rückgabewert gibt, benötigst du stattdessen Func(Of T). Die Rückgabewerte kannst du bei EndInvoke() einsammeln, wie es im Link beschrieben wird.
    Gruß
    hal2000

    Das ist glaub zwecklos.
    Die Festplatte hat nur einen Lesekopf, und es gibt ühaupt keinen Gewinn, wenn mehrere Threads gleichzeitig den ansteuern.


    Auch wenn ich diese tatsache nicht bedacht habe ist es nur ein Prototyp den ich schreibe. Der später die Files aus verschiedenen Verzeichnissen auf verschiedenen Servern ausliest und eine Übersicht schreibt. Das Threading soll später die Ladezeiten sinnvoll nutzen.

    @hal2000
    Der Code sieht gut aus. Morgenfrüh nach dem kaffee setzte ich mich damit auseinander.

    Danke dür die Antworten.

    StGo schrieb:

    ist es nur ein Prototyp den ich schreibe.
    Auch den sollte man anständig proggen.
    Falls Du diesen Code kopierst, achte auf die C&P-Bremse.
    Jede einzelne Zeile Deines Programms, die Du nicht explizit getestet hast, ist falsch :!:
    Ein guter .NET-Snippetkonverter (der ist verfügbar).
    Programmierfragen über PN / Konversation werden ignoriert!