C++ designalternativ producers/consumers

Trädvy Permalänk
Medlem
Plats
Tomelilla
Registrerad
Okt 2001

C++ designalternativ producers/consumers

Har under en tid arbetat med ett projekt men vill bara försäkra mig om att jag har valt rätt väg (alltid svårt när man inte har någon att brainstorma med).
Använder C++11 med Boost och programmet ser ut som följande idag:

Baserat på "tjänster" som kan beskrivas som flera producers, flera consumers.
Tjänsterna är allt från canbus/i2c/websocket/multicast så datatyper jag behöver skicka mellan varje tjänst har ett brett spann..
Använder i idag boost any samt typeid för att lagra godtycklig datatyp i varje transaktion.
En gemensam cirkulär buffert lagrar alla transaktioner och en "Route" klass ser till att dom hamnar rätt (ibland skall samma transaktion till flera mottagare).
Adressering sker med uuid och detta är även basen för "route reglerna".

Använder en sammordnare(klass) som skapar alla tjänster och även tar hand om start/stop/error signalering.
Denna klassen äger även bufferten och agerar som consumer för att distribuera ut transaktionerna(med hjälp av router klassen) till sitt mål (tjänst).
Tjänsterna skriver till bufferten själva (mutex).
Varje transaktion skickas till sitt mål via Asio vilket gör att jag utnyttjar min threadpool för att inte låsa systemet.
Jag har gjort det enkelt för mig och använder shared_ptr för att lagra varje transaktion i bufferten (verkligen dåligt designval med tanke på overhead).

Det fungerar bra och klarar av att hantera några hundra transaktioner i sekunden utan problem på målplattformen som är raspberry pi (cpu belastning sällan över 15-20%).
Men bara för att det fungerar betyder det inte att det är det bästa sättet.
Så utan att posta några tusen rader kod, finns det några andra sätt som är bättre? något jag inte tänkt på?

Asus Zenith Extreme, TR 1950x@3975MHz, 4x16GB@2933Mhz

If debugging is the process of removing bugs,
then programming must be the process of putting them in.
- Edsger W. Dijkstra

Trädvy Permalänk
Medlem
Plats
Göteborg
Registrerad
Jun 2002

Det ut som en rimlig lösning i mina ögon. Finns där krav på prestanda eller liknande mätbara attribut? Problemkällor är ju som alltid låsning vid synkronisering (du nämnde mutex) av data. Se till att använda en mutex per buffert och inte låsa längre än absolut nödvändigt.

Antar att prestanda är av vikt då det är den mätningen du gjort. Hur skalar systemet under belastning? Skalar det korrekt med olika antal CPU-kärnor?

Så länge systemet uppfyller (ursprungs)kraven så skulle jag anse att det är en bra lösning.

Edit: såg nu att du skriver "bufferten". Om det endast finns en buffert i systemet bör du nog bryta ner det till en per producer/consumer för att undvika onödiga väntetider.

Skickades från m.sweclockers.com

..:: RiJo ::..
Computer: Lenovo X300
Platform: Gentoo

Trädvy Permalänk
Medlem
Plats
Tomelilla
Registrerad
Okt 2001

@RiJo: Tack för input! Skönt att man inte seglat ut i helt vansinniga lösningar

Har sprungit på ett par minor gällande flertrådat och synkronisering så jag hoppas att jag lärt mig något av dom missarna.

Ja prestandan är av vikt och det var där jag lagt mest tid för att få skalningen hyffsat linjär.
Det finns egentligen bara ett krav och det är att inkommande data på en av bussarna måste ovillkorligen generera ett svar inom 50ms annars droppas den anslutningen av servern utanför mitt program. Det löser jag med marginal idag (och gör jag det inte har jag system på plats för att återuppta kommunikationen).
Desto fler kärnor jag kan slänga på programmet desto bättre går det (så länge där finns arbete för alla kärnorna) så skalningen funkar bra.

Utmaningen nu är att optimera tiden varje consumer lägger på att manipulera data och generera eventuella nya transaktioner.
Speciellt att generera data i realtid för användaren via websocket är det som äter upp mest cputid.
Det är som vanligt ett problem när man ska ta massa användarinput i form av strängar och försöka göra något snabbt och smidigt...

Svar på edit: Problemet är att varje service (nästan) alltid är både en consumer/producer så hade blivit en jäkla massa buffertar.
Däremot har tjänsterna som tar extern data en egen mottagningsbuffert just för att inte bli låsta om där blir kö.
Som det är idag så läser jag ju transaktionen från bufferten och sen lägger ut den tillsammans med sin tjänst i exekveringskön (boost asio) så även om det tar en stund att behandla datan så är bufferten fri.

auto transaction = this->m_queue->dequeue(); this->m_asio_strand.dispatch(boost::bind(&Service::process_transaction, service.get(), transaction));

Får väl kanske ta en ny funderare på hur jag löser den biten..
Eventuellt skulle man bara använda signalering att varje tjänst har ett meddelande i bufferten (och buffra dessa signaler) och låta den samordnande klassen läsa varje buffert individuellt istället.
Fördelen då är att jag kan prioritera de tjänsterna som absolut inte får vänta.

Asus Zenith Extreme, TR 1950x@3975MHz, 4x16GB@2933Mhz

If debugging is the process of removing bugs,
then programming must be the process of putting them in.
- Edsger W. Dijkstra

Trädvy Permalänk
Datavetare
Plats
Stockholm
Registrerad
Jun 2011

shared_ptr<> är en rejäl flaskhals om det som delas potentiellt kan användas av flera trådar, det förutsatt att du kör på en RPi med fler än en CPU-kärna (annars kvittar det). Detta då räknaren som används kommer orsaka något som kallas "cache-line bouncing".

Hur mycket data rör det sig om per transaktion? Om mängden är mindre än några få kB är det alltid snabbare att göra en kopia i lägen där flera trådar måste ha tillgång till informationen. Då blir ägarfrågan och därmed livslängden för objektet trivialt (krävs ingen synkronisering).

boost::asio gör absolut det enklast tänkbara i sin trådpool, det är en lista med jobb som skyddas av en mutex. Enkelt och korrekt, men lär knappast skala speciellt bra förbi två CPU-kärnor. Hittade också en bugg i boost scoped_lock.hpp, lär i praktiken aldrig trigga på x86 men kan mycket väl ställa till det på ARM

void unlock() { if (locked_) { mutex_.unlock(); locked_ = false; } }

Medlemmen locked_ tilldelas här utan synkronisering, finns därmed överhuvudtaget ingen garanti när eller ens om andra trådar kommer se denna tilldelning. Tilldelningen måste ske innan mutex_.unlock() då detta anrop är en s.k. release-barrier för den data som skyddas.

Ett enkelt att förstå race här är att tråd A kör mutex_.unlock() men inte nästa rad. Tråd B kör sedan

void lock() { if (!locked_) { mutex_.lock(); locked_ = true; } }

Uttrycket !locked_ är i det här läget falskt (locked_ är fortfarande true) så tråd B kör vidare i tron om att den håller låset fast det är A som gör det och den är precis på väg att släppa...

En annan väldigt kritisk fråga för att avgöra flaskhalsarna i designern är: ungefär hur lång tid tar en transaktion i genomsnitt och hur lång tid tar de i värsta fall?

Om arbetet som ska utföras per transaktion är relativt stort så fungerar det rätt OK att använda trådpooler och skydda saker mer mutexar.

Om jobbet per transaktion är väldigt litet och antal transaktioner är väldigt hög kommer skalbarheten över CPU-kärnor bli noll eller till och med negativ med en så pass simpel lösning. I det läget lär du ta till rätt annorlunda designer, boost::asio är då inte rätt metod utan du måste jobba direkt mot Linux epoll och hantera logiken så att transaktioner nästan aldrig byter tråd (svårt på Pi som helt saknar HW-stöd för att distribuera t.ex. nätverkstrafik över flera CPU-kärnor).

Care About Your Craft: Why spend your life developing software unless you care about doing it well? - The Pragmatic Programmer

Trädvy Permalänk
Medlem
Plats
Tomelilla
Registrerad
Okt 2001

@Yoshman: Jag valde shared_ptr just för enkelheten men jag vet, precis som du säger, att det inte är rätt väg att gå.
Tanken är att gå över till rena kopior istället då det handlar om max 10Kb per transaktion i absolut värsta fall.

Första projektet med asio så är mycket möjligt att jag missförstått det helt och hållet när det gäller distribution över flera kärnor..

boost::thread_group threadGroup; boost::asio::io_service asio; boost::asio::signal_set signals(asio, SIGINT, SIGTERM, SIGQUIT); signals.async_wait(boost::bind(&boost::asio::io_service::stop, &asio)); std::auto_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(asio)); for (int i = 0; i < thread_count; ++i) { threadGroup.create_thread([&]() { asio.run(); }); }

Tack för tipset angående scoped_lock då det är det mutex:et jag oftast använder!

Transaktioner tar allt mellan 2-3ms till >100ms beroende på vad som ska göras (på rpi).
Allt ifrån att sätta/läsa lite bitar till att logga saker till fil.
Alternativet är att differentiera hanteringen av transaktioner då jag alltid vet vilka tjänster som kommer att generera långsamma exekveringar. Frågan är om det ens är värt jobbet?

Är verkligen guld att få input från andra så bara ös på (om ni orkar)!

Asus Zenith Extreme, TR 1950x@3975MHz, 4x16GB@2933Mhz

If debugging is the process of removing bugs,
then programming must be the process of putting them in.
- Edsger W. Dijkstra

Trädvy Permalänk
Datavetare
Plats
Stockholm
Registrerad
Jun 2011
Skrivet av the_weird:

Transaktioner tar allt mellan 2-3ms till >100ms beroende på vad som ska göras (på rpi).
Allt ifrån att sätta/läsa lite bitar till att logga saker till fil.
Alternativet är att differentiera hanteringen av transaktioner då jag alltid vet vilka tjänster som kommer att generera långsamma exekveringar. Frågan är om det ens är värt jobbet?

Finns ju ett väldigt enkelt sätt att förbättra latensen och till viss del även förbättra skalbarhet över CPU-kärnor om du vet vilka transaktioner som tar lång tid.

Om det är en RPi2/3 så har du fyra kärnor. Vad du då kan göra är att skapa en boost::asio::io_service som specifikt hanterar de transaktionerna som tar lång tid, lämpligen ger du två till fyra trådar till poolen som hanterar denna io_service.

Sedan skapar du en separat boost::asio::io_service instans som drivs av en separat trådpool. Denna tar hand om de riktigt korta och latenskritiska jobben. Här tror jag du klarar dig nog med en tråd. Ingen anledningen att ha väldigt många trådar då jobben avklaras väldigt fort här, är en seriell del för att samordna delar och den kommer äta allt mer cykler ju fler trådar du stoppar in utan att göra något vettigt jobb.

Sist har du en tråd eller kanske två trådar (men tror det räcker med en, kan till och med var mer effektivt) vars enda uppgift är att vänta på inkommande händelser, klassificera dessa om det handlar om en långkörare eller ett latenskritiskt jobb. Beroende på klassificering postas jobbet till motsvarande boost::asio::io_service.

Anledningen till att använda separata boost::asio::io_service är dels för att långkörare inte ska blocka de latenskritiska jobben, men också för att det är separata jobbköer som skyddas av separata mutex:ar. Detta ökar skalbarheten, lär ändå inte vara linjärt till fyra kärnor men borde ge en OK skalning.

Care About Your Craft: Why spend your life developing software unless you care about doing it well? - The Pragmatic Programmer

Trädvy Permalänk
Medlem
Plats
Tomelilla
Registrerad
Okt 2001

@Yoshman: Perfekt!

Går hand i hand med vad jag tänkte mig.
Fördelen är att alla latenskrav jag har på mig sker i tjänster som har väldigt lite jobb att göra (rena kommunikationsenheter).
Handlar iprincip bara om att populera egna datatyper med lite maskning/skiftning och sen skicka vidare (eller ta emot och skicka ut).

Det som sen tar "lång" tid har förvisso också ett latenskrav men inte alls lika strikt utan där räcker
det med att det "upplevs som realtid" vilket ger mig ganska goda marginaler.

Bara att sätta mig vid ritbordet nu då och se vad jag kan få ihop! Tackar hjärtligt för hjälpen!

Asus Zenith Extreme, TR 1950x@3975MHz, 4x16GB@2933Mhz

If debugging is the process of removing bugs,
then programming must be the process of putting them in.
- Edsger W. Dijkstra

Trädvy Permalänk
Medlem
Registrerad
Sep 2013
Skrivet av Yoshman:

Hittade också en bugg i boost scoped_lock.hpp, lär i praktiken aldrig trigga på x86 men kan mycket väl ställa till det på ARM

void unlock() { if (locked_) { mutex_.unlock(); locked_ = false; } }

Medlemmen locked_ tilldelas här utan synkronisering, finns därmed överhuvudtaget ingen garanti när eller ens om andra trådar kommer se denna tilldelning. Tilldelningen måste ske innan mutex_.unlock() då detta anrop är en s.k. release-barrier för den data som skyddas.

Ett enkelt att förstå race här är att tråd A kör mutex_.unlock() men inte nästa rad. Tråd B kör sedan

void lock() { if (!locked_) { mutex_.lock(); locked_ = true; } }

Uttrycket !locked_ är i det här läget falskt (locked_ är fortfarande true) så tråd B kör vidare i tron om att den håller låset fast det är A som gör det och den är precis på väg att släppa...

Hej!
scoped_lock implementeringen ser inte fel ut tycker jag. Man använder ju denna i ett "scope", dvs som en temporär mekanism för att låsa själva mutex och inte lock i sig självt. Det enda "locked_" används för här är för att filtrera bort anrop från samma tråd, tex vid rekursiva anrop. Så scoped_lock körs alltid i en tråd, alltså i ditt exempel så har man en lock i varje tråd.

Trädvy Permalänk
Datavetare
Plats
Stockholm
Registrerad
Jun 2011
Skrivet av ludde.andersson:

Hej!
scoped_lock implementeringen ser inte fel ut tycker jag. Man använder ju denna i ett "scope", dvs som en temporär mekanism för att låsa själva mutex och inte lock i sig självt. Det enda "locked_" används för här är för att filtrera bort anrop från samma tråd, tex vid rekursiva anrop. Så scoped_lock körs alltid i en tråd, alltså i ditt exempel så har man en lock i varje tråd.

Sant!

När jag ögnade igenom koden för att se hur boost::asio::io_service fungerade tittade jag bara igenom scoped_lock som hastigast då det används i io_service. Har sett just buggen att man vill göra rekursiva mutex:ar med optimeringen att inte anropa lock/unlock m.h.a. en flagga som håller redan på om låset redan är taget, en sådan optimering måste se till att access av flaggan är korrekt synkroniserad.

I detta fall är scoped_lock endast tänkt att erbjuda RAII för en mutex i ett specifikt block. Dock nämner inte dokumentationen något om att scoped_lock måste hållas inom en specifik tråd och scoped_lock har "move semantics" så fullt möjligt att flytta över ägarskapet till ett nytt block som då kan vara en closure och därmed köras på en ny tråd (vilket har fler buggar då en mutex, till skillnad från semaforer, måste släppas av samma tråd som tog den). Men å andra sådan långt ifrån ett unikt sätt som C++ gör det möjligt att skjuta bort fötterna

Detta är inte är kritik mot C++, det är ett designval i C++ att alltid välja den bättre presterande vägen än den idiotsäkra vägen, ett val som gör C++ till ett av väldigt få praktiska alternativ i flera lägen.

Care About Your Craft: Why spend your life developing software unless you care about doing it well? - The Pragmatic Programmer