Skip to content

Commit 473f72a

Browse files
authored
Initial implementation of the package. (dart-archive/native_synchronization#2)
1 parent 5183e24 commit 473f72a

File tree

15 files changed

+657
-16
lines changed

15 files changed

+657
-16
lines changed

pkgs/native_synchronization/.github/workflows/dart.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ jobs:
3434
run: dart analyze --fatal-infos
3535

3636
test:
37+
needs: analyze
3738
runs-on: ${{ matrix.os }}
3839
strategy:
3940
matrix:

pkgs/native_synchronization/.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,6 @@ build/
88
# Omit committing pubspec.lock for library packages; see
99
# https://dart.dev/guides/libraries/private-files#pubspeclock.
1010
pubspec.lock
11+
12+
# VSCode configuration files
13+
.vscode/
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
## 0.1.0-wip
22

33
- Initial version.
4+
- Expose `Mutex` and `ConditionVariable`
5+
- Implement `Mailbox`.

pkgs/native_synchronization/README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
[![Dart](https://github.com/dart-lang/native_synchronization/actions/workflows/dart.yaml/badge.svg)](https://github.com/dart-lang/native_synchronization/actions/workflows/dart.yaml)
22

3-
Low level synchronization primitives built on dart:ffi.
3+
This package exposes a portable interface for low-level thread
4+
synchronization primitives like `Mutex` and `ConditionVariable`.
45

5-
## TODO: Projects docs
6-
7-
TODO: Add a brief project description here.
6+
It also provides some slightly more high-level synchronization primitives
7+
like `Mailbox` built on top of low-level primitives.
88

99
## Status: experimental
1010

1111
**NOTE**: This package is currently experimental and published under the
1212
[labs.dart.dev](https://dart.dev/dart-team-packages) pub publisher in order to
13-
solicit feedback.
13+
solicit feedback.
1414

1515
For packages in the labs.dart.dev publisher we generally plan to either graduate
1616
the package into a supported publisher (dart.dev, tools.dart.dev) after a period
1717
of feedback and iteration, or discontinue the package. These packages have a
1818
much higher expected rate of API and breaking changes.
1919

2020
Your feedback is valuable and will help us evolve this package. For general
21-
feedback, suggestions, and comments, please file an issue in the
21+
feedback, suggestions, and comments, please file an issue in the
2222
[bug tracker](https://github.com/dart-lang/native_synchronization/issues).
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'dart:ffi';
6+
import 'dart:typed_data';
7+
8+
import 'package:ffi/ffi.dart';
9+
10+
import 'package:native_synchronization/primitives.dart';
11+
import 'package:native_synchronization/sendable.dart';
12+
13+
final class _MailboxRepr extends Struct {
14+
external Pointer<Uint8> buffer;
15+
16+
@Int32()
17+
external int bufferLength;
18+
19+
@Int32()
20+
external int state;
21+
}
22+
23+
class _SendableMailbox {
24+
final int address;
25+
final Sendable<Mutex> mutex;
26+
final Sendable<ConditionVariable> condVar;
27+
28+
_SendableMailbox(
29+
{required this.address, required this.mutex, required this.condVar});
30+
}
31+
32+
/// Mailbox communication primitive.
33+
///
34+
/// This synchronization primitive allows a single producer to send messages
35+
/// to one or more consumers. Producer uses [put] to place a message into
36+
/// a mailbox which consumers can then [take] out.
37+
///
38+
/// [Mailbox] object can not be directly sent to other isolates via a
39+
/// `SendPort`, but it can be converted to a `Sendable<Mailbox>` via
40+
/// `asSendable` getter.
41+
///
42+
/// [Mailbox] object is owned by an isolate which created them.
43+
class Mailbox {
44+
final Pointer<_MailboxRepr> _mailbox;
45+
final Mutex _mutex;
46+
final ConditionVariable _condVar;
47+
48+
static const _stateEmpty = 0;
49+
static const _stateFull = 1;
50+
51+
static final finalizer = Finalizer((Pointer<_MailboxRepr> mailbox) {
52+
calloc.free(mailbox.ref.buffer);
53+
calloc.free(mailbox);
54+
});
55+
56+
Mailbox()
57+
: _mailbox = calloc.allocate(sizeOf<_MailboxRepr>()),
58+
_mutex = Mutex(),
59+
_condVar = ConditionVariable() {
60+
finalizer.attach(this, _mailbox);
61+
}
62+
63+
Mailbox._fromSendable(_SendableMailbox sendable)
64+
: _mailbox = Pointer.fromAddress(sendable.address),
65+
_mutex = sendable.mutex.materialize(),
66+
_condVar = sendable.condVar.materialize();
67+
68+
/// Place a message into the mailbox if has space for it.
69+
///
70+
/// If mailbox already contains a message then [put] will throw.
71+
void put(Uint8List message) {
72+
final buffer = message.isEmpty ? nullptr : _toBuffer(message);
73+
_mutex.runLocked(() {
74+
if (_mailbox.ref.state != _stateEmpty) {
75+
throw StateError('Mailbox is full');
76+
}
77+
78+
_mailbox.ref.state = _stateFull;
79+
_mailbox.ref.buffer = buffer;
80+
_mailbox.ref.bufferLength = message.length;
81+
82+
_condVar.notify();
83+
});
84+
}
85+
86+
/// Take a message from the mailbox.
87+
///
88+
/// If mailbox is empty this will synchronously block until message
89+
/// is available.
90+
Uint8List take() => _mutex.runLocked(() {
91+
while (_mailbox.ref.state != _stateFull) {
92+
_condVar.wait(_mutex);
93+
}
94+
95+
final result = _toList(_mailbox.ref.buffer, _mailbox.ref.bufferLength);
96+
97+
_mailbox.ref.state = _stateEmpty;
98+
_mailbox.ref.buffer = nullptr;
99+
_mailbox.ref.bufferLength = 0;
100+
return result;
101+
});
102+
103+
static final _emptyResponse = Uint8List(0);
104+
105+
static Uint8List _toList(Pointer<Uint8> buffer, int length) {
106+
return length == 0
107+
? _emptyResponse
108+
// We have to ignore sdk_version_since warning due to dartbug.com/53142.
109+
// ignore: sdk_version_since
110+
: buffer.asTypedList(length, finalizer: malloc.nativeFree);
111+
}
112+
113+
static Pointer<Uint8> _toBuffer(Uint8List list) {
114+
final buffer = malloc.allocate<Uint8>(list.length);
115+
buffer.asTypedList(list.length).setRange(0, list.length, list);
116+
return buffer;
117+
}
118+
119+
Sendable<Mailbox> get asSendable => Sendable.wrap(
120+
Mailbox._fromSendable,
121+
_SendableMailbox(
122+
address: _mailbox.address,
123+
mutex: _mutex.asSendable,
124+
condVar: _condVar.asSendable));
125+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
part of 'primitives.dart';
6+
7+
class _PosixMutex extends Mutex {
8+
/// This is maximum value of `sizeof(pthread_mutex_t)` across all supported
9+
/// platforms.
10+
static const _sizeInBytes = 64;
11+
12+
final Pointer<pthread_mutex_t> _impl;
13+
14+
static final _finalizer = Finalizer<Pointer<pthread_mutex_t>>((ptr) {
15+
pthread_mutex_destroy(ptr);
16+
malloc.free(ptr);
17+
});
18+
19+
_PosixMutex()
20+
: _impl = malloc.allocate(_PosixMutex._sizeInBytes),
21+
super._() {
22+
if (pthread_mutex_init(_impl, nullptr) != 0) {
23+
malloc.free(_impl);
24+
throw StateError('Failed to initialize mutex');
25+
}
26+
_finalizer.attach(this, _impl);
27+
}
28+
29+
_PosixMutex.fromAddress(int address)
30+
: _impl = Pointer.fromAddress(address),
31+
super._();
32+
33+
@override
34+
void _lock() {
35+
if (pthread_mutex_lock(_impl) != 0) {
36+
throw StateError('Failed to lock mutex');
37+
}
38+
}
39+
40+
@override
41+
void _unlock() {
42+
if (pthread_mutex_unlock(_impl) != 0) {
43+
throw StateError('Failed to unlock mutex');
44+
}
45+
}
46+
47+
@override
48+
int get _address => _impl.address;
49+
}
50+
51+
class _PosixConditionVariable extends ConditionVariable {
52+
/// This is maximum value of `sizeof(pthread_cond_t)` across all supported
53+
/// platforms.
54+
static const _sizeInBytes = 64;
55+
56+
final Pointer<pthread_cond_t> _impl;
57+
58+
static final _finalizer = Finalizer<Pointer<pthread_cond_t>>((ptr) {
59+
pthread_cond_destroy(ptr);
60+
malloc.free(ptr);
61+
});
62+
63+
_PosixConditionVariable()
64+
: _impl = malloc.allocate(_PosixConditionVariable._sizeInBytes),
65+
super._() {
66+
if (pthread_cond_init(_impl, nullptr) != 0) {
67+
malloc.free(_impl);
68+
throw StateError('Failed to initialize condition variable');
69+
}
70+
_finalizer.attach(this, _impl);
71+
}
72+
73+
_PosixConditionVariable.fromAddress(int address)
74+
: _impl = Pointer.fromAddress(address),
75+
super._();
76+
77+
@override
78+
void notify() {
79+
if (pthread_cond_signal(_impl) != 0) {
80+
throw StateError('Failed to signal condition variable');
81+
}
82+
}
83+
84+
@override
85+
void wait(covariant _PosixMutex mutex) {
86+
if (pthread_cond_wait(_impl, mutex._impl) != 0) {
87+
throw StateError('Failed to wait on a condition variable');
88+
}
89+
}
90+
91+
@override
92+
int get _address => _impl.address;
93+
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
/// This library contains native synchronization primitives such as [Mutex]
6+
/// and [ConditionVariable] implemented on top of low-level primitives
7+
/// provided by the OS.
8+
///
9+
/// See OS specific documentation for more details:
10+
///
11+
/// * POSIX man pages (Linux, Android, Mac OS X and iOS X)
12+
/// * `pthread_mutex_lock` and `pthread_mutex_unlock`,
13+
/// * `pthread_cond_wait` and `pthread_cond_signal`.
14+
/// * Windows
15+
/// * [Slim Reader/Writer (SRW) Locks](https://learn.microsoft.com/en-us/windows/win32/sync/slim-reader-writer--srw--locks),
16+
/// * [Condition Variables](https://learn.microsoft.com/en-us/windows/win32/sync/condition-variables),
17+
library;
18+
19+
import 'dart:ffi';
20+
import 'dart:io';
21+
22+
import 'package:ffi/ffi.dart';
23+
import 'package:native_synchronization/sendable.dart';
24+
25+
import 'package:native_synchronization/src/bindings/pthread.dart';
26+
import 'package:native_synchronization/src/bindings/winapi.dart';
27+
28+
part 'posix.dart';
29+
part 'windows.dart';
30+
31+
/// A *mutex* synchronization primitive.
32+
///
33+
/// Mutex can be used to synchronize access to a native resource shared between
34+
/// multiple threads.
35+
///
36+
/// [Mutex] object can not be directly sent to other isolates via a `SendPort`,
37+
/// but it can be converted to a `Sendable<Mutex>` via `asSendable` getter.
38+
///
39+
/// Mutex objects are owned by an isolate which created them.
40+
sealed class Mutex implements Finalizable {
41+
Mutex._();
42+
43+
factory Mutex() => Platform.isWindows ? _WindowsMutex() : _PosixMutex();
44+
45+
/// Acquire exclusive ownership of this mutex.
46+
///
47+
/// If this mutex is already acquired then an attempt to acquire it
48+
/// blocks the current thread until the mutex is released by the
49+
/// current owner.
50+
///
51+
/// **Warning**: attempting to hold a mutex across asynchronous suspension
52+
/// points will lead to undefined behavior and potentially crashes.
53+
void _lock();
54+
55+
/// Release exclusive ownership of this mutex.
56+
///
57+
/// It is an error to release ownership of the mutex if it was not
58+
/// previously acquired.
59+
void _unlock();
60+
61+
/// Run the given synchronous `action` under a mutex.
62+
///
63+
/// This function takes exclusive ownership of the mutex, executes `action`
64+
/// and then releases the mutex. It returns the value returned by `action`.
65+
///
66+
/// **Warning**: you can't combine `runLocked` with an asynchronous code.
67+
R runLocked<R>(R Function() action) {
68+
_lock();
69+
try {
70+
return action();
71+
} finally {
72+
_unlock();
73+
}
74+
}
75+
76+
Sendable<Mutex> get asSendable => Sendable.wrap(
77+
Platform.isWindows ? _WindowsMutex.fromAddress : _PosixMutex.fromAddress,
78+
_address);
79+
80+
int get _address;
81+
}
82+
83+
/// A *condition variable* synchronization primitive.
84+
///
85+
/// Condition variable can be used to synchronously wait for a condition to
86+
/// occur.
87+
///
88+
/// [ConditionVariable] object can not be directly sent to other isolates via a
89+
/// `SendPort`, but it can be converted to a `Sendable<ConditionVariable>`
90+
/// object via [asSendable] getter.
91+
///
92+
/// [ConditionVariable] objects are owned by an isolate which created them.
93+
sealed class ConditionVariable implements Finalizable {
94+
ConditionVariable._();
95+
96+
factory ConditionVariable() => Platform.isWindows
97+
? _WindowsConditionVariable()
98+
: _PosixConditionVariable();
99+
100+
/// Block and wait until another thread calls [notify].
101+
///
102+
/// `mutex` must be a [Mutex] object exclusively held by the current thread.
103+
/// It will be released and the thread will block until another thread
104+
/// calls [notify].
105+
void wait(Mutex mutex);
106+
107+
/// Wake up at least one thread waiting on this condition variable.
108+
void notify();
109+
110+
Sendable<ConditionVariable> get asSendable => Sendable.wrap(
111+
Platform.isWindows
112+
? _WindowsConditionVariable.fromAddress
113+
: _PosixConditionVariable.fromAddress,
114+
_address);
115+
116+
int get _address;
117+
}

pkgs/native_synchronization/lib/sample.dart

Lines changed: 0 additions & 7 deletions
This file was deleted.

0 commit comments

Comments
 (0)