Aagaman Luitel

Producer Consumer Problem

Aagaman Luitel | words: 906 | 4 minutes |

I was given a presentation task on this topic and I found it super interesting. Prior to this I had never properly used multithreading in my programs so discovering this was super fun and insightful.

Producer Consumer Problem (Bounded buffer problem)

The concept is simple. There are two kinds of working entities (functions/threads) and one buffer.

  • producer -> produces data
  • consumer -> consumes data
  • buffer -> common shared buffer/data

One simple example: suppose I have a stock analysis program. The producer generates stock values, profit and loss and so on, and puts the generated values in the buffer; the consumer takes those values and draws a real-time graph. That's it! That's what producers and consumers are.

But…

On a multithreaded system, implementing this simple architecture causes a problem. The problem can also happen in a single-threaded program, but there it is hard to get wrong. With multithreading it is not a simple fix… well, if you stick with this article, it is actually quite an easy fix.

What happens in multithreading?

  • [M] producers
  • [N] consumers
  • buffer

There are M producers and N consumers, and each producer and consumer has a thread of its own. Even so, they all act on a single buffer. Even if there are 100 threads, all of them put values in the same buffer.

Now what's the problem with that, you might ask?

Suppose each producer thread produces 1 byte of data, each consumer thread consumes 1 byte of data, and the buffer size is just 10 bytes. What happens if 13 threads try to put a value in the buffer? It will most likely overflow. Similarly, if there is nothing in the buffer and consumers try to consume, they will read garbage values and probably show garbage results or crash.
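To make the two failure cases concrete, here is a minimal single-threaded sketch. `CheckedBuffer`, `try_put`, `try_take` and `rejected_puts` are hypothetical names made up for this illustration; the buffer simply refuses an operation that would overflow or read an empty slot, rather than corrupting memory.

```cpp
#include <array>
#include <cassert>
#include <cstddef>

// Hypothetical 10-byte buffer that reports failure instead of
// writing past the end or reading from an empty buffer.
struct CheckedBuffer {
    std::array<unsigned char, 10> slots{};
    std::size_t count = 0;

    bool try_put(unsigned char value) {
        assert(count <= slots.size());
        if (count == slots.size()) return false; // would overflow
        slots[count++] = value;
        return true;
    }

    bool try_take(unsigned char &out) {
        if (count == 0) return false; // nothing to consume
        out = slots[--count];
        return true;
    }
};

// Counts how many of `attempts` one-byte puts the buffer rejects.
std::size_t rejected_puts(CheckedBuffer &buf, std::size_t attempts) {
    std::size_t rejected = 0;
    for (std::size_t i = 0; i < attempts; ++i) {
        if (!buf.try_put(static_cast<unsigned char>(i))) ++rejected;
    }
    return rejected;
}
```

With 13 puts into an empty `CheckedBuffer`, `rejected_puts` reports 3 rejected values, and `try_take` on an empty buffer returns false. Without such checks those same operations would write or read out of bounds.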

Rules

To solve this we need to make some rules:

  • when the buffer is full, block the producer threads
  • when the buffer is empty, block the consumer threads
  • make access to the buffer mutually exclusive using a mutex

Note: here are some definitions you need to know. Consulting Wikipedia on these topics is recommended.

  • semaphore: a counter that multiple threads use to request access; a thread that asks while the count is zero has to wait
  • mutex: a lock that only one thread can hold at a time, like a unique key

Solution

To solve this we need semaphores and a mutex. More specifically, 2 counting semaphores and 1 mutex are needed:

  • empty semaphore [P]
    • tracks the number of empty slots available in the buffer
    • the [M] producers have to wait when no permits are left
    • ideally its initial count is P = sizeof(buffer)/sizeof(produced item)
  • full semaphore [0]
    • tracks the number of filled slots; it starts at 0 because the buffer starts empty
    • the [N] consumers consume only while the count is at least 1, i.e. the buffer is not empty
  • mutex
    • does not allow more than one thread to access the buffer at a time

Let's take a scenario where:

  • 10 producers each generate 1 byte
  • 10 consumers each consume 1 byte
  • the buffer size is 5 bytes

In this case the empty semaphore starts with 5 permits, since handing out any more than that means there is a chance of buffer overflow. So we create a counting semaphore with an initial count of 5 and call it empty_semaphore. A producer takes a permit from empty_semaphore, which decreases the count, and producers that cannot get a permit wait until one is available again. Similarly, consumers use full_semaphore. It starts at 0 and counts the values currently sitting in the buffer. Producers increment it after inserting to let consumers know it is their turn, which makes sure that consumers never act on an empty buffer. After consuming a value, the consumer increments empty_semaphore, indicating that the slot is free again and one more producer can get access.

Here is a simple C++ program to demonstrate this. Try changing RETURN_SIZE to 1 (meaning the semaphore would give permits to more threads than the buffer has room for, since each producer still inserts two values).

#include <iostream>
#include <mutex>
#include <semaphore>
#include <thread>

#define BUFFER_SIZE 16
#define RETURN_SIZE 2

struct IntStorage {
  int buffer[BUFFER_SIZE];
  int index = 0;
  void insert(int c) {
    buffer[index] = c;
    index++;
  }
  int pop() {
    // index points one past the last stored value, so step back first
    index--;
    return buffer[index];
  }
};

IntStorage buffer;

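// Counting semaphores: a binary semaphore can hold at most 1 permit,
// but here up to BUFFER_SIZE / RETURN_SIZE permits are in flight.
std::counting_semaphore<BUFFER_SIZE / RETURN_SIZE> full_semaphore{0};
std::counting_semaphore<BUFFER_SIZE / RETURN_SIZE> empty_semaphore{BUFFER_SIZE / RETURN_SIZE};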
std::mutex mutex;

// puts 2 values (RETURN_SIZE) in the buffer
void producer(int c) {
  while (true) {
    // empty--
    empty_semaphore.acquire();
    mutex.lock();
    std::cout << "inserted: " << c << '\n';
    buffer.insert(c);
    buffer.insert(c);
    mutex.unlock();
    full_semaphore.release();
  }
}

// pops 2 values (RETURN_SIZE) from the buffer
void consumer() {
  while (true) {
    full_semaphore.acquire();
    mutex.lock();
    std::cout << "released: " << buffer.pop() << '\n';
    buffer.pop();
    mutex.unlock();
    empty_semaphore.release();
  }
}

int main() {
  std::thread producers[32];
  std::thread consumers[32];
  for (int i = 0; i < 32; ++i) {
    producers[i] = std::thread(producer, i);
  }
  for (int i = 0; i < 32; ++i) {
    consumers[i] = std::thread(consumer);
  }
  for (auto &thread : producers) {
    thread.join();
  }
  for (auto &thread : consumers) {
    thread.join();
  }
}

Compile and run

clang++ -std=c++20 pcp.cpp -o pcp
./pcp