Multitasking die 2.: PostMessage

Für Fragen von Einsteigern und Programmieranfängern...
Antworten
Nimral
Beiträge: 44
Registriert: Mi 10. Jun 2015, 11:33

Multitasking die 2.: PostMessage

Beitrag von Nimral »

Auch bei meinem 2. Anlauf, die Architektur eines (bestehenden & funktionierenden) Messprogramms zu verbessern könnte ich einen Anschubs gebrauchen.

Die Ausgangssituation: am PC (Windows 7/10) bzw. Raspi (Linux ...) hängen x seriell angesteuerte Messgeräte. Manche plappern einfach periodisch Messwerte raus (max 5/sec), andere möchten bitte mit einem Steuerzeichen angefragt werden und spucken dann genau einen Messwert aus. Einige sind etwas intelligenter und können (idR mit dem selben Mechanismus) auch einen Statuswert zurückgeben, z.B. ob sie gerade Inch oder mm oder Celsius oder Kelvin ausspucken, gut, das interessiert nur den Auswerte-Thread, aber wenns schlimme Dinge sind wie z.B. "Low Battery" wird der Anwender alarmiert und muss dann was dagegen tun.

Ich habe für jedes Messgerät einen Abkömmling von TThread geschaffen, der genau dieses Gerät so bedient wie es das braucht, und in dem früher oder später der Messwert zusammen mit einem Timestamp anfällt. Derzeit verwende ich Synchronize um ihn auf der GUI darzustellen, bzw. Abkömmlinge von TEvent. Bei beiden Methoden muss der Mess-Thread bereits zur Compile-Time wissen, welche Auswertethreads sich für seinen Messwert bzw. seine Alarme interessieren, neben der GUI sind das z.B. auch Threads für Logging, Alarmierung oder auch ganz andere Threads welche aus den Messwerten mehrerer Messgeräte einen neuen Pseudo-Messwert berechnen und diesen wiederum wie ein "echtes" Messgerät an GUI und Logging weitergeben.

Das klappt, ist aber wenig flexibel, für jeden geänderten Messaufbau muss ich den Code (geringfügig) umstellen und neu kompilieren und neu verteilen. Lieber wärs mir, ich würde den Aufbau irgendwo als Datenstruktur (Config-File) definieren, und die Threads dynamisch erzeugen und verketten.

In einer idealen allgemeinen Struktur würde jedes Messgerät, wenn es einen Messwert oder einen Alarm zu melden hat, diesen ins System "posten", zusammen mit einem Handle damit man sehen kann wo er her kommt, und x "Listener" Threads würden diese Message empfangen. Wenn alle für diese Message "registrierten" Listener die Message "sehr wahrscheinlich" gesehen und ggf. abgearbeitet hätten, würde "irgendwer" die Message aus dem System killen. Eine garantierte Zustellung ist nicht nötig.

Ich bin jetzt auf Postmessage gestoßen, der kommt dem schon sehr nahe, aber ich sehe dass man als Parameter schon beim Absenden ein Handle für ein Ziel angeben muss. 'fix, wieder muss der Sender seine Empfänger vorab kennen. Ich würde die Message aber lieber als "Broadcast an alle" schicken, möge sie jeder Thread der Messages empfangen kann anschauen und entscheiden, ob er den Wert weiterverarbeiten möchte oder nicht. Vielleicht gibt es ja auch - ich träume mal ungehemmt weiter - eine Möglichkeit, verschiedene Typen von Messages bzw. "Broadcast-Adressen" zu definieren, damit nicht jeder Thread jede Message anschauen muss.

Ist das möglich, und zwar plattformübergreifend Windows & Linux?

Thnx, Armin.

Benutzeravatar
af0815
Lazarusforum e. V.
Beiträge: 4138
Registriert: So 7. Jan 2007, 10:20
OS, Lazarus, FPC: FPC fixes Lazarus fixes per fpcupdeluxe (win,linux,raspi)
CPU-Target: 32Bit (64Bit)
Wohnort: Niederösterreich
Kontaktdaten:

Re: Multitasking die 2.: PostMessage

Beitrag von af0815 »

hat jetzt nicht unbedingt direkt mit dem Thema zu tun. Aber bei Deiner Beschreibung muss ich an MQTT denken. Damit ist die Plattform egal. MQTT von bkeevil habe ich unter Windows, Linux 64 und RasPi am laufen.
Blöd kann man ruhig sein, nur zu Helfen muss man sich wissen (oder nachsehen in LazInfos/LazSnippets).

mschnell
Beiträge: 3408
Registriert: Mo 11. Sep 2006, 10:24
OS, Lazarus, FPC: svn (Window32, Linux x64, Linux ARM (QNAP) (cross+nativ)
CPU-Target: X32 / X64 / ARMv5
Wohnort: Krefeld

Re: Multitasking die 2.: PostMessage

Beitrag von mschnell »

Die Message Sachen sind ein "Relikt" aus Windows.
Da fpc aber auf vielen Betriebssystemen läuft, braucht man es nicht. FPC enthält entsprechende "allgemeine" Funktionalitäten im "Sprachumfang".
"Message", "Procedure message", etc verwendet 1:1 die Windows API, wenn das Programm auf Windows läuft und in der Library realisierte Workalikes, wenn das Programm auf einem anderen Betriebbssystem läuft.
Deshalb würde ich Messsage etc nur verwenden , wenn ich weiß dass das Programm auf Windows laufen wird und spezielle Windows Funktionalität genutzt werden soll.
-Michael
Zuletzt geändert von mschnell am Mo 27. Jan 2020, 11:06, insgesamt 1-mal geändert.

mschnell
Beiträge: 3408
Registriert: Mo 11. Sep 2006, 10:24
OS, Lazarus, FPC: svn (Window32, Linux x64, Linux ARM (QNAP) (cross+nativ)
CPU-Target: X32 / X64 / ARMv5
Wohnort: Krefeld

Re: Multitasking die 2.: PostMessage

Beitrag von mschnell »

Die Frage hat mit Messages (aber auch mit Critical Section) nichts zu tun.

Aber siehe -> https://www.lazarusforum.de/viewtopic.php?f=55&t=12691 -> von mschnell » 23/1/2020, 09:58

Du kannst natürlich eine allgemeine TMessThread Klasse definieren, die durch Properties gesteuert jedes beliebige Messgerät behandeln kann und die Daten mit der dort beschriebenen Methodik an den Mainthread sendet.

Dann kannst du die Konfigurations-Datei abarbeiten und für jedes Messgerät anhand der dort gespeicherten Spezifikationen eine Instanz von TMessThread (im Stop Zustand) kreieren und mit den entsprechenden Properties und Callbacks versorgen. Am Ende der Initialisierungsphase werden dann alle Instanzen gestartet.
-Michael

Nimral
Beiträge: 44
Registriert: Mi 10. Jun 2015, 11:33

Re: Multitasking die 2.: PostMessage

Beitrag von Nimral »

mschnell hat geschrieben:Die Frage hat mit Messages (aber auch mit Critical Section) nichts zu tun.

Aber siehe -> https://www.lazarusforum.de/viewtopic.php?f=55&t=12691 -> von mschnell » 23/1/2020, 09:58

Du kannst natürlich eine allgemeine TMessThread Klasse definieren, die durch Properties gesteuert jedes beliebige Messgerät behandeln kann und die Daten mit der dort beschriebenen Methodik an den Mainthread sendet.

Dann kannst du die Konfigurations-Datei abarbeiten und für jedes Messgerät anhand der dort gespeicherten Spezifikationen eine Instanz von TMessThread (im Stop Zustand) kreieren und mit den entsprechenden Properties und Callbacks versorgen. Am Ende der Initialisierungsphase werden dann alle Instanzen gestartet.
-Michael


Genau so habe ich es ja schon mal überlegt, und inzwischen ist auch der Code genau so. Das dynamische Erzeugen der Threads für die seriellen Geräte ist gut machbar.

Die Synchronisation der Sender- und Empfänger-Threads stellt sich aber immer mehr als ein ganz übles Thema heraus. Die TRTLEvent Klasse habe ich inzwischen als völlig unbrauchbar (für meine Anwendung) abgehakt.

Im Moment habe ich meine Hoffnungen in die TEventObject Klasse gesetzt, deprecated hin oder her. Immerhin schien sie laut der reichlich mehrdeutigen Dok durchaus Grund zur Hoffnung zu geben: wenn die eine Hälfte der Doku richtig wäre, würde es möglich sein, mehreren Thread-Objekten den gleichen Namen (Parameter "name") zu geben und sie würden wie ein einzelnes Thread-Objekt funktionieren. Jeder Thread, der den "Name" kennt, könnte das Event triggern, und der Text der Doku legt sogar nahe, dass man mehrere wartende Threads bedienen kann, indem man das Event "irgendwie" am Ende der Event-Loop selber von Hand rücksetzt. Ein Traum, wenns denn wahr wäre, genau das will ich haben. Die andere Hälfte der Dok ist dagegen etwas vorsichtiger und will TEvent als eine Objektinstanz mit "global scope" sehen.

Im Übrigen glaube ich immer noch, dass - wenn auch TEventObject sich als Zeitvernichtung herausstellt, Messages mein Lösungsweg sind. Da die LCL auf den für mich interessanten Plattformen unterstützt wird und ihrerseits ja mit einer Message-Queue hantiert muss es irgendwie möglich sein, sich in sie einzuhängen, und ein paar lausige Zahlen- und String-Werte zu übermitteln...

Armin.

Warf
Beiträge: 1421
Registriert: Di 23. Sep 2014, 17:46
OS, Lazarus, FPC: MacOS | Win 10 | Linux
CPU-Target: x86_64
Wohnort: Aachen

Re: Multitasking die 2.: PostMessage

Beitrag von Warf »

Ich glaub du denkst generell ein bisschen zu kompliziert. Threads kommunizieren zu lassen ist grundsätzlich recht einfach, du brauchst einfach einen geteilten speicherbereich den du mit Critical Sections sicherst

Du kannst z.B. mit einer Queue arbeiten, die Objekte speichern kann (sieht vielleicht viel aus, ist aber viel Spoiler code, die eigentliche funktionalität ist richtig simpel):

Code: Alles auswählen

program Project1;
 
{$mode objfpc}{$H+}
 
uses
  {$IFDEF UNIX}cthreads,{$ENDIF}
  Classes, contnrs, sysutils;
 
type
 
  { TLockedObject }
 
  generic TLockedObject<T> = class
  private
    FObject: T;
    FFreeObject: Boolean;
    FCS: TRTLCriticalSection;
  public
    constructor Create(const Obj: T; const FreeObject: Boolean = true);
    destructor Destroy; override;
    function Lock: T;
    procedure Unlock;
  end;         
 
{ TLockedQueue }
 
TLockedQueue = class(specialize TLockedObject<TObjectQueue>)
public
  constructor Create;
  // ermöglicht schnellzugriff damit man nicht immer .lock, ..., .unlock machen
  function Push(AObject: TObject): TObject;
  function Pop: TObject;
  function Peek: TObject;
end;     
 
{ TMessage }
 
generic TMessage<T> = class
private
  FData: T;
  FSender: TObject;
public
  constructor Create(const AData: T; const ASender: TObject);
  property Data: T read FData write FData;
  property Sender: TObject read FSender write FSender;
end;
 
{ TStringMessage }
 
TStringMessage = class(specialize TMessage<String>)
public
  constructor Create(AData: String; const ASender: TObject);
end;
 
{ TIntegerMessage }
 
TIntegerMessage = class(specialize TMessage<Integer>);
 
{ TStringMessage }
 
constructor TStringMessage.Create(AData: String; const ASender: TObject);
begin
  // Sicherstellen das genau 1 referenz auf den String zeigt, nämlich unsere
  SetLength(AData, Length(AData));
  inherited Create(AData, ASender);
end;
 
{ TMessage }
 
constructor TMessage.Create(const AData: T; const ASender: TObject);
begin
  FData := AData;
  FSender := ASender;
end;
 
{ TLockedQueue }
 
constructor TLockedQueue.Create;
begin
  inherited Create(TObjectQueue.Create, True);
end;
 
function TLockedQueue.Push(AObject: TObject): TObject;
begin
  try
    Result := self.Lock.Push(AObject);
  finally
    Self.Unlock;
  end;
end;
 
function TLockedQueue.Pop: TObject;
begin
  try
    Result := self.Lock.Pop;
  finally
    Self.Unlock;
  end;
end;
 
function TLockedQueue.Peek: TObject;
begin
  try
    Result := self.Lock.Peek;
  finally
    Self.Unlock;
  end;
end;
 
{ TLockedObject }
 
constructor TLockedObject.Create(const Obj: T; const FreeObject: Boolean);
begin
  InitCriticalSection(FCS);
  FFreeObject:=FreeObject;
  FObject := Obj;
end;
 
destructor TLockedObject.Destroy;
begin
  if FFreeObject then FObject.Free;
  DoneCriticalsection(FCS);
  inherited Destroy;
end;
 
function TLockedObject.Lock: T;
begin
  EnterCriticalsection(FCS);
  Result := FObject;
end;
 
procedure TLockedObject.Unlock;
begin
  LeaveCriticalsection(FCS);
end;
 
// ...
 
 
type
 
  { TWorker }
 
  TWorker = class(TThread)
  private
    FMessageQueue: TLockedQueue;
  protected
    procedure Execute; override;
  public
    constructor Create(const MessageQueue: TLockedQueue; CreateSuspended: Boolean;
      const StackSize: SizeUInt = DefaultStackSize );
  end;
 
{ TWorker }
 
procedure TWorker.Execute;
var
  i: Integer;
begin
  for i:=0 to 100 do
  begin
    FMessageQueue.Push(TIntegerMessage.Create(i, Self));
    sleep(100);
  end;
end;
 
constructor TWorker.Create(const MessageQueue: TLockedQueue;
  CreateSuspended: Boolean; const StackSize: SizeUInt);
begin
  FMessageQueue := MessageQueue;
  // Inherited nach dem setzen der Queue aufrufen, denn das kann den Thread bereits starten
  inherited Create(CreateSuspended, StackSize);
end;
 
var t1, t2: TWorker;
  MessageQueue: TLockedQueue;
  M: TIntegerMessage;
  Q: TObjectQueue;
begin
  MessageQueue := TLockedQueue.Create;
  t1 := TWorker.Create(MessageQueue, False);
  t2 := TWorker.Create(MessageQueue, False);
 
  while not t1.Finished and not t2.Finished do
  begin                                     
    M := MessageQueue.Pop as TIntegerMessage;
    if Assigned(M) then // Wenn überhaupt ne message drin war
    begin
      try
        Write('Message from ');
        if M.Sender = t1 then
          Write('Thread1')
        else
          Write('Thread2');
        WriteLn(': ', M.Data.ToString);
      finally
        M.Free;
      end;
    end;
    sleep(50);
  end;
  // die restlichen nachrichten auslesen
  Q := MessageQueue.Lock;
  try
    while Q.Count > 0 do
    begin
      M := Q.Pop as TIntegerMessage;
      try
        Write('Message from ');
        if M.Sender = t1 then
          Write('Thread1')
        else
          Write('Thread2');
        WriteLn(': ', M.Data.ToString);
      finally
 
      end;
    end;
  finally
    MessageQueue.Unlock;
    MessageQueue.Free;
  end;
  t1.Free;
  t2.Free;
end.
 


Wenn du mehrere Listener hast kannst du einfach einen Dispatcher Thread hinzufügen, der Messages von Nachrichten an einen bestimmten thread dispatcht (also ein Thread bekommt alle nachrichten und kümmert sich nur darum die an den richtigen thread weiterzuleiten). Oder du hast eine Liste von Message Queues an die das gesendet wird. Die Möglichkeiten sind mehr oder weniger grenzenlos. Du musst lediglich dafür sorgen das am anfang einmal gelockt wird, und am ende geunlockt wird. Ansonsten kannst du jede Datenstruktur benutzen

Nimral
Beiträge: 44
Registriert: Mi 10. Jun 2015, 11:33

Re: Multitasking die 2.: PostMessage

Beitrag von Nimral »

Warf hat geschrieben:Ich glaub du denkst generell ein bisschen zu kompliziert. Threads kommunizieren zu lassen ist grundsätzlich recht einfach


Hi Warf,

herzlichen Dank für die Demo, "generische Listen" mit einer generischen Klasse kombinieren ist auch so eine Detailfertigkeit die ich mal erwerben wollte, und Du löst sehr ausführlich einige Probleme, die ich im Moment nicht habe, aber vermutlich in Zukunft: die threadsichere Übergabe eines beliebigen Wertes in eine Queue als Puffer. Das wäre großartig z.B. als Basis einsetzbar, wenn ich einmal eine garantierte Zustellung der Werte erreichen wollte, und auch sonst fallen mir gleich ein Haufen Anwendungsmöglichkeiten ein.

Das Problem an dem ich von Anfang an nage ist allerdings nicht die Werteübergabe, sondern das Übergeben der Werte (bzw. eines Signals dass einer da wäre) genau dann (also eventgetriggert und nicht gepollt) wenn ein neuer Wert eingelaufen ist. Abgeholt werden soll von einen anderen Thread, nennnen wir ihn "Consumer", der erst zur Laufzeit erzeugt wurde, und von dem es auch mehrere geben kann. Dein Sample würde eine vielleicht mal wichtige EIgenschaft beitragen, einen Pufferspeicher zwischen Sender und Empfänger, aber ihm fehlt genau mein Knackpunkt: Du hast einen schönen Worker, Du hast einen schönen (Stack)Speicher, aber keinen "Consumer", der (1) durch einen neuen Wert aufgeweckt wird und der (2) dynamisch in Deinen Stack eingeklinkt wurde. Da komm ich wieder zur RTLEvent Klasse, mit ihren bereits bekannten Einschränkungen, und damit habe ich mich wieder einmal erfolgreich im Kreis gedreht und kann mir in den A**** beißen.

Ich kenne sowas in anderen Systemen als "Publish and Subscribe" Architektur. Realisierbar mit einem "onNewObject" Event in der Stack Klasse, der eine dynamische Liste von Methodenzeigern abarbeitet, wo sich die "Consumer" eingetragen haben (garantierte Zustellung), oder der alternativ ein Event triggert auf den einige oder alle oder kein Consumer warten die dann aufgeweckt werden und sich den letzten Wert aus dem Stack abholen (keine garantierte Zustellung).

Heute mache ich mir ein Sample mit dem TEventObject Ansatz, mal sehen wie weit ich damit komme, bis ich mir (eventuell wieder) auf den Daumen haue und den Ansatz als "macht mehr Probleme, und bietet keine Lösung" zur Seite lege.

HG aus Bayern,

Armin.

Socke
Lazarusforum e. V.
Beiträge: 2733
Registriert: Di 22. Jul 2008, 19:27
OS, Lazarus, FPC: Lazarus: SVN; FPC: svn; Win 10/Linux/Raspbian/openSUSE
CPU-Target: 32bit x86 armhf
Wohnort: Köln
Kontaktdaten:

Re: Multitasking die 2.: PostMessage

Beitrag von Socke »

Wenn du jedes Sample an jeden Thread zustellen willst, schlage ich vor:
Jeder Consumerthread hat eine eigene, threadsichere, von außen zugreifbare Eventqueue sowie ein RTL-Signal.
Ein Dispatcherthread sammelt die Samples ein und stellt eine Kopie in die Eventqueues der Consumerthreads.
Beim Hinzufügen eines Elements in die Eventqueue wird ein RTL-Signal an den Thread gesendet, sodass dieser dann das neue Event abarbeiten kann; die Sample-Objekte können - da kopiert - nach Abarbeitung lokal freigegeben werden.

Das benachrichtigen der Consumerthreads kannst du dir gerne bei meinem Future-Manager in der Methode TFutureManager.AddFuture() abschauen. Wie so ein Consumerthread aussehen könnte, siehst du in Methode TFutureManager.TWorkerThread.Execute; Anstatt der Zeile TheFuture.ThreadCalculate; implementierst du die Logik zur Ausführung deiner verschiedenen Events.
MfG Socke
Ein Gedicht braucht keinen Reim//Ich pack’ hier trotzdem einen rein

Warf
Beiträge: 1421
Registriert: Di 23. Sep 2014, 17:46
OS, Lazarus, FPC: MacOS | Win 10 | Linux
CPU-Target: x86_64
Wohnort: Aachen

Re: Multitasking die 2.: PostMessage

Beitrag von Warf »

Muss der Consumer unbedingt ein Thread sein?
Ansonsten würde ich es zu nem eigenen Programm machen, das du über TProcess starten kannst. Das hat den Vorteil das du Signale verwenden kannst. Damit kannst du polling ganz einfach vermeiden. Die Daten kannst du dann über einen IPC clienten rüberschicken und dann mit dem Signal praktisch eine Nachricht senden das was angekommen ist.

Mit Posix Threads geht das tatsächlich auch für Threads, aber ist halt unter Windows nicht so einfach (oder überhaupt möglich) zu benutzen.

Aber warum kann dein Consumer nicht pollen? Ich mein ich seh das so, für gewöhnlich gibt es für so einen Consumer 2 Möglichkeiten: 1. er hat grade nix zu tun, dann kommt das Signal rein, und er fängt an was zu machen 2. er hat grade was zu tun, wenn er dann eine nachricht bekommen würde, kann er ja nicht einfach ewig im signal handler bleiben (signalhandler können nicht gestackt werden, also muss man so schnell wie möglich wieder raus), muss also die eingehnde nachricht irgendwo zwischen speichern, dann da weiter machen wo er unterbrochen wurde, und irgendwann die zwischengespeicherte Nachricht abarbeiten.
In beiden fällen gibt es keinen Unterschied zu einem Poll Loop.
Wenn du unbedingt zeitnah abbarbeiten müsstest, und dein Consumer grade beschäftigt ist, kannst du das ganz einfach lösen, mehr consumer threads, einfach einen Thread-Pool vorhalten, sodass falls ein consumer grade keine neue anfrage bei zeiten abbarbeiten kann, du immer einen freien thread hast der übernehmen kann.

mschnell
Beiträge: 3408
Registriert: Mo 11. Sep 2006, 10:24
OS, Lazarus, FPC: svn (Window32, Linux x64, Linux ARM (QNAP) (cross+nativ)
CPU-Target: X32 / X64 / ARMv5
Wohnort: Krefeld

Re: Multitasking die 2.: PostMessage

Beitrag von mschnell »

Nimral hat geschrieben:Das Problem an dem ich von Anfang an nage ist allerdings nicht die Werteübergabe, sondern das Übergeben der Werte


Häää ???
-Michael

mschnell
Beiträge: 3408
Registriert: Mo 11. Sep 2006, 10:24
OS, Lazarus, FPC: svn (Window32, Linux x64, Linux ARM (QNAP) (cross+nativ)
CPU-Target: X32 / X64 / ARMv5
Wohnort: Krefeld

Re: Multitasking die 2.: PostMessage

Beitrag von mschnell »

Übrigens: Wenn Du mehrere Instanzen derselben Thread-Klasse hast, kann es ein Problem sein, innerhalb des Thread-Codes festzustellen, welcher Thread gerade bearbeitet wird (da der Code ja für mehrere Threads zuständig ist, z.B. wenn ein Callback im Code ankommt, weiß man u.U. nicht woher er kommt und wozu er gehört.
Dafür gibt es die "ThreadVar" Variablen. Alle anderen Variablen sind unabhängig davon, in welchem Thread der Code gerade läutf. Threadvars werden für jeden Thread separat angelegt. Du kannst sie dann in der Initialisierungsphase vorbelegen und später zeigen sie den gerade laufenden Thread an.
(P.S.: TheradVars sind dynamisch, der Self-pointer zum TThread-Objekt ist statisch, das kann einen Unterschied machen.)
-Michael

Antworten