← run

go-04-map-concurrent

1.000
5/5 tests· concurrency
Challenge · difficulty 4/5
# Concurrent ordered map

Implement **`solution.go`** in `package challenge` exporting:

```go
func MapConcurrent(inputs []int, workers int, fn func(int) int) []int
```

Apply `fn` to every element of `inputs` using a pool of **at most `workers`**
goroutines running concurrently, and return the results.

Rules:

- The returned slice has the **same length** as `inputs`, and `result[i]` is
  `fn(inputs[i])` — i.e. results are in the **same order as the inputs**,
  regardless of the order goroutines finish in.
- Work must be distributed across concurrent goroutines (use goroutines plus
  channels and/or `sync.WaitGroup`). No more than `workers` goroutines may be
  processing elements at the same time.
- `workers` may be larger than `len(inputs)`; never start more workers than
  there is work for, and never start fewer than one when there is work.
- If `inputs` is empty (or `nil`), return an empty (non-nil is fine, but it must
  have length 0) slice without starting any work.
- You may assume `workers >= 1` and that `fn` is safe to call concurrently
  (it does not share mutable state).

Examples:

- `MapConcurrent([]int{1, 2, 3}, 2, func(x int) int { return x * x })` → `[]int{1, 4, 9}`
- `MapConcurrent([]int{}, 4, fn)` → `[]int{}`
- `MapConcurrent([]int{5}, 8, func(x int) int { return x + 1 })` → `[]int{6}`
tests/solution_test.go
package challenge

import (
	"reflect"
	"sync"
	"sync/atomic"
	"testing"
)

func TestMapConcurrentMatchesSequential(t *testing.T) {
	fn := func(x int) int { return x*x + 1 }
	cases := []struct {
		name    string
		inputs  []int
		workers int
	}{
		{"empty", []int{}, 4},
		{"nil", nil, 4},
		{"single", []int{7}, 1},
		{"single many workers", []int{7}, 16},
		{"workers one", []int{1, 2, 3, 4, 5}, 1},
		{"workers equal len", []int{1, 2, 3, 4}, 4},
		{"workers gt len", []int{1, 2, 3}, 10},
		{"larger", []int{9, 8, 7, 6, 5, 4, 3, 2, 1, 0, -1, -2}, 3},
		{"negatives", []int{-5, -4, -3, -2, -1}, 2},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			want := make([]int, len(c.inputs))
			for i, v := range c.inputs {
				want[i] = fn(v)
			}
			got := MapConcurrent(c.inputs, c.workers, fn)
			if len(got) != len(c.inputs) {
				t.Fatalf("len = %d, want %d", len(got), len(c.inputs))
			}
			if !reflect.DeepEqual(got, want) {
				t.Errorf("MapConcurrent(%v, %d) = %v, want %v", c.inputs, c.workers, got, want)
			}
		})
	}
}

func TestMapConcurrentOrderPreserved(t *testing.T) {
	// Identity maps input value to output position; any reordering would be caught.
	inputs := make([]int, 200)
	for i := range inputs {
		inputs[i] = i * 3
	}
	got := MapConcurrent(inputs, 7, func(x int) int { return x })
	for i, v := range inputs {
		if got[i] != v {
			t.Fatalf("index %d = %d, want %d (order not preserved)", i, got[i], v)
		}
	}
}

func TestMapConcurrentEmptyReturnsLenZero(t *testing.T) {
	got := MapConcurrent(nil, 4, func(x int) int { return x })
	if len(got) != 0 {
		t.Fatalf("len = %d, want 0", len(got))
	}
}

func TestMapConcurrentRespectsWorkerLimit(t *testing.T) {
	inputs := make([]int, 100)
	for i := range inputs {
		inputs[i] = i
	}
	const limit = 4

	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	var active int64
	var maxActive int64
	var arrived int

	// fn blocks until either `limit` callers are concurrently in flight or no more
	// callers can arrive. This forces real concurrency while bounding it at `limit`:
	// if the implementation ran fewer than `limit` workers it would deadlock-stall,
	// and any run above `limit` would be recorded in maxActive.
	fn := func(x int) int {
		cur := atomic.AddInt64(&active, 1)
		for {
			m := atomic.LoadInt64(&maxActive)
			if cur <= m || atomic.CompareAndSwapInt64(&maxActive, m, cur) {
				break
			}
		}
		mu.Lock()
		arrived++
		if arrived >= limit {
			cond.Broadcast()
		}
		for arrived < limit {
			cond.Wait()
		}
		mu.Unlock()
		atomic.AddInt64(&active, -1)
		return x * 2
	}

	got := MapConcurrent(inputs, limit, fn)
	if m := atomic.LoadInt64(&maxActive); m > limit {
		t.Fatalf("observed %d concurrent workers, limit was %d", m, limit)
	}
	if atomic.LoadInt64(&maxActive) < 2 {
		t.Fatalf("no real concurrency observed (maxActive=%d)", maxActive)
	}
	for i := range inputs {
		if got[i] != inputs[i]*2 {
			t.Fatalf("index %d = %d, want %d", i, got[i], inputs[i]*2)
		}
	}
}

func TestMapConcurrentCallsFnOncePerElement(t *testing.T) {
	inputs := []int{10, 20, 30, 40, 50, 60}
	var calls int64
	got := MapConcurrent(inputs, 3, func(x int) int {
		atomic.AddInt64(&calls, 1)
		return x + 1
	})
	if calls != int64(len(inputs)) {
		t.Fatalf("fn called %d times, want %d", calls, len(inputs))
	}
	for i, v := range inputs {
		if got[i] != v+1 {
			t.Errorf("index %d = %d, want %d", i, got[i], v+1)
		}
	}
}
System prompt
You are an expert programmer. Solve the task exactly as specified. Output your solution as fenced code blocks using the required file name(s) and the exact function/type signatures requested. Prefer correctness; do not include prose outside code unless asked.
Proposed solution
```go
package challenge

import "sync"

func MapConcurrent(inputs []int, workers int, fn func(int) int) []int {
	n := len(inputs)
	if n == 0 {
		return []int{}
	}

	result := make([]int, n)
	work := make(chan struct {
		index int
		value int
	}, n)

	var wg sync.WaitGroup
	sem := make(chan struct{}, workers)

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range work {
				sem <- struct{}{}
				result[item.index] = fn(item.value)
				<-sem
			}
		}()
	}

	for i, val := range inputs {
		work <- struct {
			index int
			value int
		}{index: i, value: val}
	}
	close(work)

	wg.Wait()

	return result
}
```

Test output (stdout)

:"2026-07-02T04:34:29.9825459+02:00","Action":"run","Package":"challenge","Test":"TestMapConcurrentMatchesSequential/negatives"}
{"Time":"2026-07-02T04:34:29.98254745+02:00","Action":"output","Package":"challenge","Test":"TestMapConcurrentMatchesSequential/negatives","Output":"=== RUN   TestMapConcurrentMatchesSequential/negatives\n"}
{"Time":"2026-07-02T04:34:29.98261551+02:00","Action":"output","Package":"challenge","Test":"TestMapConcurrentMatchesSequential/negatives","Output":"--- PASS: TestMapConcurrentMatchesSequential/negatives (0.00s)\n"}
{"Time":"2026-07-02T04:34:29.98262111+02:00","Action":"pass","Package":"challenge","Test":"TestMapConcurrentMatchesSequential/negatives","Elapsed":0}
{"Time":"2026-07-02T04:34:29.98262368+02:00","Action":"output","Package":"challenge","Test":"TestMapConcurrentMatchesSequential","Output":"--- PASS: TestMapConcurrentMatchesSequential (0.00s)\n"}
{"Time":"2026-07-02T04:34:29.98262556+02:00","Action":"pass","Package":"challenge","Test":"TestMapConcurrentMatchesSequential","Elapsed":0}
{"Time":"2026-07-02T04:34:29.98262784+02:00","Action":"run","Package":"challenge","Test":"TestMapConcurrentOrderPreserved"}
{"Time":"2026-07-02T04:34:29.98262923+02:00","Action":"output","Package":"challenge","Test":"TestMapConcurrentOrderPreserved","Output":"=== RUN   TestMapConcurrentOrderPreserved\n"}
{"Time":"2026-07-02T04:34:29.98266731+02:00","Action":"output","Package":"challenge","Test":"TestMapConcurrentOrderPreserved","Output":"--- PASS: TestMapConcurrentOrderPreserved (0.00s)\n"}
{"Time":"2026-07-02T04:34:29.98266961+02:00","Action":"pass","Package":"challenge","Test":"TestMapConcurrentOrderPreserved","Elapsed":0}
{"Time":"2026-07-02T04:34:29.98267141+02:00","Action":"run","Package":"challenge","Test":"TestMapConcurrentEmptyReturnsLenZero"}
{"Time":"2026-07-02T04:34:29.98267295+02:00","Action":"output","Package":"challenge","Test":"TestMapConcurrentEmptyReturnsLenZero","Output":"=== RUN   TestMapConcurrentEmptyReturnsLenZero\n"}
{"Time":"2026-07-02T04:34:29.98267529+02:00","Action":"output","Package":"challenge","Test":"TestMapConcurrentEmptyReturnsLenZero","Output":"--- PASS: TestMapConcurrentEmptyReturnsLenZero (0.00s)\n"}
{"Time":"2026-07-02T04:34:29.98267713+02:00","Action":"pass","Package":"challenge","Test":"TestMapConcurrentEmptyReturnsLenZero","Elapsed":0}
{"Time":"2026-07-02T04:34:29.9826789+02:00","Action":"run","Package":"challenge","Test":"TestMapConcurrentRespectsWorkerLimit"}
{"Time":"2026-07-02T04:34:29.98268078+02:00","Action":"output","Package":"challenge","Test":"TestMapConcurrentRespectsWorkerLimit","Output":"=== RUN   TestMapConcurrentRespectsWorkerLimit\n"}
{"Time":"2026-07-02T04:34:29.98269504+02:00","Action":"output","Package":"challenge","Test":"TestMapConcurrentRespectsWorkerLimit","Output":"--- PASS: TestMapConcurrentRespectsWorkerLimit (0.00s)\n"}
{"Time":"2026-07-02T04:34:29.9826969+02:00","Action":"pass","Package":"challenge","Test":"TestMapConcurrentRespectsWorkerLimit","Elapsed":0}
{"Time":"2026-07-02T04:34:29.98269862+02:00","Action":"run","Package":"challenge","Test":"TestMapConcurrentCallsFnOncePerElement"}
{"Time":"2026-07-02T04:34:29.98270013+02:00","Action":"output","Package":"challenge","Test":"TestMapConcurrentCallsFnOncePerElement","Output":"=== RUN   TestMapConcurrentCallsFnOncePerElement\n"}
{"Time":"2026-07-02T04:34:29.98270694+02:00","Action":"output","Package":"challenge","Test":"TestMapConcurrentCallsFnOncePerElement","Output":"--- PASS: TestMapConcurrentCallsFnOncePerElement (0.00s)\n"}
{"Time":"2026-07-02T04:34:29.98270897+02:00","Action":"pass","Package":"challenge","Test":"TestMapConcurrentCallsFnOncePerElement","Elapsed":0}
{"Time":"2026-07-02T04:34:29.98271142+02:00","Action":"output","Package":"challenge","Output":"PASS\n"}
{"Time":"2026-07-02T04:34:29.982782349+02:00","Action":"output","Package":"challenge","Output":"ok  \tchallenge\t0.001s\n"}
{"Time":"2026-07-02T04:34:29.982795179+02:00","Action":"pass","Package":"challenge","Elapsed":0.001}
Reasoning (chain-of-thought)
The task is to implement a concurrent map function in Go that:
1. Takes a slice of integers, number of workers, and a function
2. Applies the function to each element concurrently
3. Returns results in the same order as inputs
4. Uses at most `workers` goroutines at a time
5. Handles edge cases (empty input, workers > len(inputs))

Key requirements:
- Results must be in the same order as inputs
- At most `workers` goroutines running concurrently
- Handle empty/nil inputs
- Workers >= 1 when there's work

Approach:
1. Create a result slice of the same length as inputs
2. Use a worker pool pattern with channels
3. Use a semaphore (channel with workers capacity) to limit concurrent workers
4. Use a WaitGroup to track when all work is done
5. Send each input to a work channel, receive results and place them in the correct position

Let me think about the implementation:

```go
func MapConcurrent(inputs []int, workers int, fn func(int) int) []int {
    n := len(inputs)
    if n == 0 {
        return []int{}
    }
    
    result := make([]int, n)
    work := make(chan int, n)
    done := make(chan struct{}, n)
    
    // Semaphore to limit concurrent workers
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for val := range work {
                result[val] = fn(val)
                done <- struct{}{}
            }
        }()
    }
    
    // Send all work
    for i, val := range inputs {
        work <- val
    }
    close(work)
    
    // Wait for all workers to finish
    wg.Wait()
    
    return result
}
```

Wait, there's an issue with this approach. The problem is that I'm using `result[val] = fn(val)` but `val` is the input value, not the index. I need to track the index.

Let me reconsider:

```go
func MapConcurrent(inputs []int, workers int, fn func(int) int) []int {
    n := len(inputs)
    if n == 0 {
        return []int{}
    }
    
    result := make([]int, n)
    work := make(chan int, n)
    done := make(chan struct{}, n)
    
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for val := range work {
                // We need to know the index, not just the value
                // So we should send (index, value) pairs
            }
        }()
    }
}
```

I need to send both the index and value to preserve order. Let me use a struct or send two values.

Actually, I can send the index along with the value:

```go
type workItem struct {
    index int
    value int
}
```

Or I can use two channels, but that's more complex. Let me use a struct:

```go
func MapConcurrent(inputs []int, workers int, fn func(int) int) []int {
    n := len(inputs)
    if n == 0 {
        return []int{}
    }
    
    result := make([]int, n)
    work := make(chan workItem, n)
    done := make(chan struct{}, n)
    
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for item := range work {
                result[item.index] = fn(item.value)
                done <- struct{}{}
            }
        }()
    }
    
    for i, val := range inputs {
        work <- workItem{index: i, value: val}
    }
    close(work)
    
    wg.Wait()
    
    return result
}
```

I'm tracking the index to ensure results are placed in the correct position. The semaphore pattern limits concurrent workers, and the WaitGroup ensures all goroutines complete before returning.

The key is maintaining order while processing concurrently. By sending index-value pairs, I can place results in the correct slice position regardless of goroutine completion order.

I'll use a work channel with a semaphore to control concurrent processing. The semaphore ensures we don't exceed the worker limit, and the WaitGroup tracks goroutine completion.

The implementation handles edge cases like empty inputs and ensures results are returned in the original input order. Each worker processes items from the work channel, applying the function and storing results at the correct index.

The semaphore pattern with a buffered channel limits concurrent workers, preventing resource exhaustion while maintaining parallel processing efficiency.

I'll create a work channel to distribute tasks, ensuring each goroutine processes items sequentially. The semaphore controls the number of active workers, preventing overwhelming the system.

The implementation uses a WaitGroup to track goroutine completion, ensuring all work finishes before returning results. By sending work items with their original indices, I can preserve the input order in the final output slice.

The key is balancing concurrent processing with maintaining result order, using a controlled worker pool to process inputs efficiently.