Permalänk

c++, producer consumer + trådar

Hej.

Blandad topic men ett specifikt problem jag inte lyckas med att se igenom. Någon här som kan komma med bra tips?

Förutsättning: Har implementerat en logger (lugn ingen keylogger ) med hjälp av klassisk producer consumer metodik. En kö, ett trådlås och 2 trådar, en som matar, en som äter.

Problem: Jag misstänker att följande scenario uppkommer. Att under tiden som consumer tråden håller låset, kommer flera anrop till producertråden. Låset i sig fungerar och släpper bara in en i taget. Kön i sig garanterar FIFO. MEN det finns inget i denna implementation som garanterar FIFO när låset släpper.

Exempel.
Flera trådar vill skriva i producertråden samtidigt. Ponera att låset är låst. T1 vill skriva ETT, T2 vill skriva TVÅ; T3 vill skriva TRE o.s.v.. Om alla dessa står och väntar på att trådlåset släpps av consumertråden, så finns det ingen prioritet vem som kommer först in när låset släpper. Även om trådarna kommer i ordning T1, T2, T3 fram till låset, så kan det komma ut TRE, ETT, TVÅ i loggfilen.
---------

Rörigt förklarat jag vet, ursäkta för det. Är det någon som har tips på hur man kan attackera detta problem? Ber inte om kod utan om hur jag ska tänka. Jag har vandrat vilse i tankarna och flyttar bara runt problemet så att säga.

Permalänk
Hedersmedlem

Vill man verkligen ställa sådana krav? Man vet ju ändå inte hur trådarna körs; kanske kommer T1 på att den vill skriva ETT men blir sedan avbruten av T2 som kommer fram till TVÅ och lyckas rapportera detta innan T1 åter får kontrollen och rapporterar? Man kan ju naturligtvis synkronisera tidigare och mera, men då tappar man sannolikt en stor del av poängen med trådarna...

Permalänk

Det är helt klart en valid fråga. Har tyvärr inget bra svar.
Scenariot jag ser framför mig är att t.ex. 3 trådar väntar på att få skriva, men att de har kommit fram till låset i en ordning som borde behållas i loggningen.
Jag håller absolut med att problemet är ickeexisterande om man skulle hoppa över detta med trådar, målet är dock att loggningen inte skall belasta den sekvens som skapar meddelandena så att säga.

Men jag skall ta och göra lite expriment och mätningar och se hur stor inpackt det är egentligen. Kanske får tänka om.

Tack för svaret.

Permalänk
Medlem

Hur väljer programmet vilken av producertrådarna som får tillgång när låset släpper? Finns det någon styrning alls av det? Vilken sorts lås används?

Finns det möjlighet att få se hur låset hanteras i din kod?

Visa signatur

Desktop: | Win10 | InWin 303 | ASUS TUF X570 | AMD Ryzen 5 3600 | Noctua NH-U12S (PP) | Intel 600p 256GB | Gigabyte GTX 670 | 32GB DDR4 2400Mhz | Corsair RM650x | 3x 1080 Screens |
Datacenter: | 1x Physical | 1x Virtual |
Laptop: | 2x |

Dell Certified Technician

Permalänk
Datavetare

Håller helt med Elgot, du bör nog fundera igenom om det inte är så att du tänkt lite fel i designen om det finns ett sådant krav.

Det sagt, det finns helt klart fall när du vill upprätthålla en strikt FIFO-ordning kring ett lås. Vanligaste anledningen är att man vill garantera att ingen tråd får vänta väldigt länge eller kanske till och med för evigt.

Realtid OS har därför alltid semaphorer eller liknande där man kan specificera att trådar/taskar ska släppas vidare i samma ordning som de försökte ta semaphoren. "Vanliga" OS som Windows/Linux har typiskt inte någon sådan garanti på sina motsvarigheter, i fallet Linux så råkar s.k. spinlocks faktiskt vara sådana att de garanterar FIFO-ordning på x86 då man kunde göra detta utan att det kostade prestanda.

I C++11 har man lagt till en rad funktioner i standardbiblioteket för att hantera atomära variabler, lås, mutex, trådar etc. Så skrev ihop en lösning bara för att känna lite mer på C++11 funktionerna, är ett "hack" men den garanterar i alla fall att trådar kommer producera arbete i den ordning som de kör första raden inuti produce(), en rad som körs _innan_ man försöker ta låset som skyddar arbetskön.

#include <atomic> #include <condition_variable> #include <cstdint> #include <iostream> #include <memory> #include <mutex> #include <queue> #include <thread> #include <vector> // work_t represent some work entity that can be produced/consumed typedef std::string work_t; // Ticket to ensure FIFO ordering of waiting tasks typedef uint32_t ticket_t; // Next free ticket, can be taken without holding 'mtx' std::atomic<ticket_t> ticket; // Mutex protecting the state of the consumer-producer queue std::mutex mtx; // work queue std::queue<std::shared_ptr<work_t> > work_q; // Ticket number that is next to be servered ticket_t now_serving; // Condition that is signaled when the next ticket is up for service std::condition_variable cnd; void produce(std::shared_ptr<work_t> work) { // Fetch a ticket before grabbing the mutex ticket_t my_ticket = ticket.fetch_add(1); std::unique_lock<std::mutex> lock(mtx); // Wait until it is my turn, this line will check if the predicate // holds, if it does not hold then _atomically_ release the mutex // and wait for the condition variable to be signaled at which // point the check is re-done. The mutex is again locked when // running the statment following this statement. cnd.wait(lock, [=]() { return my_ticket == now_serving; }); // Queue the work work_q.push(work); // I'm done, can now serve the next in line now_serving++; // Must notify all waiting tasks, only the one with the right // ticket will proceed. cnd.notify_all(); } std::shared_ptr<work_t> consume() { std::unique_lock<std::mutex> lock(mtx); // Block until there is something to be consumed cnd.wait(lock, []() { return !work_q.empty(); }); auto work = work_q.front(); // Why does not pop() return the element in C++??? work_q.pop(); // Needed if this thread cut the line before the next producer cnd.notify_all(); return work; } std::shared_ptr<work_t> make_work(std::string s) { return std::make_shared<work_t>(s); } int main() { std::shared_ptr<work_t> work; std::vector<std::thread> threads; std::vector<std::string> prod{"A", "B", "C", "D"}; for (auto p: prod) threads.emplace_back([=]() { produce(make_work(p)); }); for (int i = prod.size(); i > 0; i--) std::cout << *consume() << std::endl; for (auto &t: threads) t.join(); }

Dold text

Noter att det inte finns någon garanti för att utskriften alltid blir A,B,C,D. Ordningen blir den ordning som producerartrådarna råkar anropa produce() vilket visar på att värdet i detta fall för strikt FIFO-ordning i just produce() inte har något större värde i sig.

Edit: men i.o.f.s. så garanterar detta att ingen tråd får vänta för evigt på att köa sitt arbete...

Visa signatur

Care About Your Craft: Why spend your life developing software unless you care about doing it well? - The Pragmatic Programmer

Permalänk
Hedersmedlem

Man bör väl också försöka se till att ingen behöver hålla låset särskilt länge. Kanske kan också läsartråden ha en liten buffert som till exempel kan sorteras efter tid innan man skriver till fil eller vad som nu är tanken?

Permalänk
Medlem
Skrivet av Elgot:

Man bör väl också försöka se till att ingen behöver hålla låset särskilt länge. Kanske kan också läsartråden ha en liten buffert som till exempel kan sorteras efter tid innan man skriver till fil eller vad som nu är tanken?

Nu kanske jag bara är ute och seglar för c++ kan jag extremt lite om,,
Men kan man inte ha sorterings bufferten innan värdena kommer in i FIFO tabellen?
Ibland vill man tappa värden från olika positioner i en FIFO, tex om man samlar värden på en sträcka och vill veta värdet vid 50m, 75, samt i slutet på kön.

Permalänk
Medlem

Beror helt på hur du implementerat dina tråd/lås klasser ju. Mina tråd/lås klasser fungerar på så sätt att när en tråd kommer till ett lås som är upptaget så lägger den till sig själv i en kö för det låset och sen sover, och när en tråd släpper ett lås så väcker den den första tråden i kön om där finns någon tråd i kön. Detta garanterar ju att den tråden som först försöker få tag i låset är den tråden som får det först när det blir tillgängligt.

Visa signatur

Intel i7-7700k @ 4.9Ghz - Noctua NH-U12P SE2 - MSI GTX 1070 Armor OC - AsRock Z270 Extreme4 - G.Skill Ripjaws V DDR4 3200MHz CL16 2x8GB - Corsair RM750x 750W - Samsung 970 EVO 500GB - Acer Predator X34 - Silverstone RV02-E - Asus Xonar Essence STX II 7.1 - Mionix Naos 8200 - Corsair Gaming MM400 - Das Keyboard 4 Ultimate MX Brown - Beyerdynamic DT990 Pro 250 Ohm - Antlion ModMic 4.0 Unidirectional

Permalänk
Hedersmedlem
Skrivet av Stenull:

Nu kanske jag bara är ute och seglar för c++ kan jag extremt lite om,,
Men kan man inte ha sorterings bufferten innan värdena kommer in i FIFO tabellen?
Ibland vill man tappa värden från olika positioner i en FIFO, tex om man samlar värden på en sträcka och vill veta värdet vid 50m, 75, samt i slutet på kön.

Om man kan tänka sig att hantera sortering på något annat sätt behöver man ju egentligen inte ha ett fifo längre, men kanske kan det ändå vara smidigt för att inte i onödan komplicera (och därmed göra den långsammare) funktionen som skriver till loggen.

Permalänk

Tack för alla svar och funderingar.

Lösningen idag konkret bygger på en c++ standard queue.
Funktioner, som kan köras parallellt, har följande loggningsförfarande:
- loggningsmeddelande kommer till en funktion.
- denna låser en CCriticalSection, skapar ett objekt som tar meddelandet och tidsstämplar det och lägger till källinformation och stoppar det i kön.
- CCriticalSection släpps.

På andra sidan snurrar en tråd som:
- låser CCriticalSection.
- om det finns meddelanden i kön, plockar av dessa, alltså dequeue på objectreferenserna (vilket torde gå väldigt snabbt)
- släpper CCriticalSection och i sin egen värld skriver datat till fil
- en viss sleep
- repeat

Som Elgot skriver bör jag fundera på / mäta, om trådad lösning är det bästa/behövs. Kanske ställer till mer oreda (bevisligen) än avhjälper.
Jag kanske skjuter mygg med kanoner så att säga.

Så absolut det är detta med semaphorer jag får kika på. Det viktiga är att den som vill posta först till kön får göra så.

Tack igen.

Permalänk
Datavetare
Skrivet av wahlfridsson:

Så absolut det är detta med semaphorer jag får kika på. Det viktiga är att den som vill posta först till kön får göra så.

Om det är målet så är en av de enklaste och mest välkända sättet att använda någon form av "biljettbaserad" teknik, oftast så man implementerar FIFO-mutex/semaphores.

Rent generellt så tar varje tråd en "biljett" via en "atomic fetch&add" instruktion och väntar sedan på sin tur innan man kliver in i den skyddade regionen.

Visa signatur

Care About Your Craft: Why spend your life developing software unless you care about doing it well? - The Pragmatic Programmer