diff --git a/.github/workflows/native_synchronization.yml b/.github/workflows/native_synchronization.yml new file mode 100644 index 00000000..49051065 --- /dev/null +++ b/.github/workflows/native_synchronization.yml @@ -0,0 +1,66 @@ +name: package:native_synchronization +permissions: read-all + +on: + pull_request: + branches: [ main ] + paths: + - '.github/workflows/native_synchronization.yml' + - 'pkgs/native_synchronization/**' + push: + branches: [ main ] + paths: + - '.github/workflows/native_synchronization.yml' + - 'pkgs/native_synchronization/**' + schedule: + - cron: '0 0 * * 0' # weekly + +jobs: + analyze: + runs-on: ${{ matrix.os }} + defaults: + run: + working-directory: pkgs/native_synchronization + strategy: + matrix: + os: [ubuntu-latest] + sdk: [dev, stable] + + steps: + # These are the latest versions of the github actions; dependabot will + # send PRs to keep these up-to-date. + - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 + - uses: dart-lang/setup-dart@0a8a0fc875eb934c15d08629302413c671d3f672 + with: + sdk: ${{ matrix.sdk }} + + - name: Install dependencies + run: dart pub get + + - name: Verify formatting + run: dart format --output=none --set-exit-if-changed . + + - name: Analyze project source + run: dart analyze --fatal-infos + + test: + needs: analyze + runs-on: ${{ matrix.os }} + defaults: + run: + working-directory: pkgs/native_synchronization + strategy: + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + sdk: [dev, stable] + steps: + - uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 + - uses: dart-lang/setup-dart@0a8a0fc875eb934c15d08629302413c671d3f672 + with: + sdk: ${{ matrix.sdk }} + + - name: Install dependencies + run: dart pub get + + - name: Run tests + run: dart test diff --git a/pkgs/native_synchronization/.gitignore b/pkgs/native_synchronization/.gitignore new file mode 100644 index 00000000..52160c45 --- /dev/null +++ b/pkgs/native_synchronization/.gitignore @@ -0,0 +1,13 @@ +# Files and directories created by pub. +.dart_tool/ +.packages + +# Conventional directory for build outputs. +build/ + +# Omit committing pubspec.lock for library packages; see +# https://dart.dev/guides/libraries/private-files#pubspeclock. +pubspec.lock + +# VSCode configuration files +.vscode/ diff --git a/pkgs/native_synchronization/CHANGELOG.md b/pkgs/native_synchronization/CHANGELOG.md new file mode 100644 index 00000000..7a52326a --- /dev/null +++ b/pkgs/native_synchronization/CHANGELOG.md @@ -0,0 +1,13 @@ +## 0.3.0 + +- Add a closed state to `Mailbox`. + +## 0.2.0 + +- Lower SDK lower bound to 3.0.0. + +## 0.1.0 + +- Initial version. +- Expose `Mutex` and `ConditionVariable` +- Implement `Mailbox`. diff --git a/pkgs/native_synchronization/LICENSE b/pkgs/native_synchronization/LICENSE new file mode 100644 index 00000000..4fd5739c --- /dev/null +++ b/pkgs/native_synchronization/LICENSE @@ -0,0 +1,27 @@ +Copyright 2023, the Dart project authors. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + * Neither the name of Google LLC nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/pkgs/native_synchronization/README.md b/pkgs/native_synchronization/README.md new file mode 100644 index 00000000..0c43bf57 --- /dev/null +++ b/pkgs/native_synchronization/README.md @@ -0,0 +1,24 @@ +[![package:native_synchronization](https://github.com/dart-lang/labs/actions/workflows/native_synchronization.yml/badge.svg)](https://github.com/dart-lang/labs/actions/workflows/native_synchronization.yml) +[![pub package](https://img.shields.io/pub/v/native_synchronization.svg)](https://pub.dev/packages/native_synchronization) +[![package publisher](https://img.shields.io/pub/publisher/native_synchronization.svg)](https://pub.dev/packages/native_synchronization/publisher) + +This package exposes a portable interface for low-level thread +synchronization primitives like `Mutex` and `ConditionVariable`. + +It also provides some slightly more high-level synchronization primitives +like `Mailbox` built on top of low-level primitives. + +## Status: experimental + +**NOTE**: This package is currently experimental and published under the +[labs.dart.dev](https://dart.dev/dart-team-packages) pub publisher in order to +solicit feedback. + +For packages in the labs.dart.dev publisher we generally plan to either graduate +the package into a supported publisher (dart.dev, tools.dart.dev) after a period +of feedback and iteration, or discontinue the package. These packages have a +much higher expected rate of API and breaking changes. + +Your feedback is valuable and will help us evolve this package. For general +feedback, suggestions, and comments, please file an issue in the +[bug tracker](https://github.com/dart-lang/native_synchronization/issues). diff --git a/pkgs/native_synchronization/analysis_options.yaml b/pkgs/native_synchronization/analysis_options.yaml new file mode 100644 index 00000000..d978f811 --- /dev/null +++ b/pkgs/native_synchronization/analysis_options.yaml @@ -0,0 +1 @@ +include: package:dart_flutter_team_lints/analysis_options.yaml diff --git a/pkgs/native_synchronization/lib/mailbox.dart b/pkgs/native_synchronization/lib/mailbox.dart new file mode 100644 index 00000000..57cb9975 --- /dev/null +++ b/pkgs/native_synchronization/lib/mailbox.dart @@ -0,0 +1,158 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:ffi'; +import 'dart:typed_data'; + +import 'package:ffi/ffi.dart'; + +import 'package:native_synchronization/primitives.dart'; +import 'package:native_synchronization/sendable.dart'; + +final class _MailboxRepr extends Struct { + external Pointer buffer; + + @Int32() + external int bufferLength; + + @Int32() + external int state; +} + +class _SendableMailbox { + final int address; + final Sendable mutex; + final Sendable condVar; + + _SendableMailbox( + {required this.address, required this.mutex, required this.condVar}); +} + +/// Mailbox communication primitive. +/// +/// This synchronization primitive allows a single producer to send messages +/// to one or more consumers. Producer uses [put] to place a message into +/// a mailbox which consumers can then [take] out. +/// +/// [Mailbox] object can not be directly sent to other isolates via a +/// `SendPort`, but it can be converted to a `Sendable` via +/// `asSendable` getter. +/// +/// [Mailbox] object is owned by an isolate which created them. +class Mailbox { + final Pointer<_MailboxRepr> _mailbox; + final Mutex _mutex; + final ConditionVariable _condVar; + + static const _stateEmpty = 0; + static const _stateFull = 1; + static const _stateClosed = 2; + + static final finalizer = Finalizer((Pointer<_MailboxRepr> mailbox) { + calloc.free(mailbox.ref.buffer); + calloc.free(mailbox); + }); + + Mailbox() + : _mailbox = calloc.allocate(sizeOf<_MailboxRepr>()), + _mutex = Mutex(), + _condVar = ConditionVariable() { + finalizer.attach(this, _mailbox); + } + + Mailbox._fromSendable(_SendableMailbox sendable) + : _mailbox = Pointer.fromAddress(sendable.address), + _mutex = sendable.mutex.materialize(), + _condVar = sendable.condVar.materialize(); + + /// Place a message into the mailbox if has space for it. + /// + /// If mailbox already contains a message or mailbox is closed then [put] will + /// throw [StateError]. + void put(Uint8List message) { + final buffer = message.isEmpty ? nullptr : _toBuffer(message); + _mutex.runLocked(() { + if (_mailbox.ref.state != _stateEmpty) { + throw StateError('Mailbox is closed or full'); + } + + _mailbox.ref.state = _stateFull; + _mailbox.ref.buffer = buffer; + _mailbox.ref.bufferLength = message.length; + + _condVar.notify(); + }); + } + + /// Close a mailbox. + /// + /// If mailbox already contains a message then [close] will drop the message. + void close() => _mutex.runLocked(() { + if (_mailbox.ref.state == _stateFull && _mailbox.ref.bufferLength > 0) { + malloc.free(_mailbox.ref.buffer); + } + + _mailbox.ref.state = _stateClosed; + _mailbox.ref.buffer = nullptr; + _mailbox.ref.bufferLength = 0; + + _condVar.notify(); + }); + + /// Take a message from the mailbox. + /// + /// If mailbox is empty then [take] will synchronously block until message + /// is available or mailbox is closed. If mailbox is closed then [take] will + /// throw [StateError]. + Uint8List take() => _mutex.runLocked(() { + while (_mailbox.ref.state == _stateEmpty) { + _condVar.wait(_mutex); + } + + if (_mailbox.ref.state == _stateClosed) { + throw StateError('Mailbox is closed'); + } + + final result = _toList(_mailbox.ref.buffer, _mailbox.ref.bufferLength); + + _mailbox.ref.state = _stateEmpty; + _mailbox.ref.buffer = nullptr; + _mailbox.ref.bufferLength = 0; + return result; + }); + + static final _emptyResponse = Uint8List(0); + + static Uint8List _toList(Pointer buffer, int length) { + if (length == 0) { + return _emptyResponse; + } + + // TODO: remove feature detection once 3.1 becomes stable. + // ignore: omit_local_variable_types + final Uint8List Function(int) asTypedList = buffer.asTypedList; + if (asTypedList is Uint8List Function(int, + {Pointer finalizer})) { + return asTypedList(length, finalizer: malloc.nativeFree); + } + + final result = Uint8List(length); + result.setRange(0, length, buffer.asTypedList(length)); + malloc.free(buffer); + return result; + } + + static Pointer _toBuffer(Uint8List list) { + final buffer = malloc.allocate(list.length); + buffer.asTypedList(list.length).setRange(0, list.length, list); + return buffer; + } + + Sendable get asSendable => Sendable.wrap( + Mailbox._fromSendable, + _SendableMailbox( + address: _mailbox.address, + mutex: _mutex.asSendable, + condVar: _condVar.asSendable)); +} diff --git a/pkgs/native_synchronization/lib/posix.dart b/pkgs/native_synchronization/lib/posix.dart new file mode 100644 index 00000000..b59d7ae0 --- /dev/null +++ b/pkgs/native_synchronization/lib/posix.dart @@ -0,0 +1,93 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +part of 'primitives.dart'; + +class _PosixMutex extends Mutex { + /// This is maximum value of `sizeof(pthread_mutex_t)` across all supported + /// platforms. + static const _sizeInBytes = 64; + + final Pointer _impl; + + static final _finalizer = Finalizer>((ptr) { + pthread_mutex_destroy(ptr); + malloc.free(ptr); + }); + + _PosixMutex() + : _impl = malloc.allocate(_PosixMutex._sizeInBytes), + super._() { + if (pthread_mutex_init(_impl, nullptr) != 0) { + malloc.free(_impl); + throw StateError('Failed to initialize mutex'); + } + _finalizer.attach(this, _impl); + } + + _PosixMutex.fromAddress(int address) + : _impl = Pointer.fromAddress(address), + super._(); + + @override + void _lock() { + if (pthread_mutex_lock(_impl) != 0) { + throw StateError('Failed to lock mutex'); + } + } + + @override + void _unlock() { + if (pthread_mutex_unlock(_impl) != 0) { + throw StateError('Failed to unlock mutex'); + } + } + + @override + int get _address => _impl.address; +} + +class _PosixConditionVariable extends ConditionVariable { + /// This is maximum value of `sizeof(pthread_cond_t)` across all supported + /// platforms. + static const _sizeInBytes = 64; + + final Pointer _impl; + + static final _finalizer = Finalizer>((ptr) { + pthread_cond_destroy(ptr); + malloc.free(ptr); + }); + + _PosixConditionVariable() + : _impl = malloc.allocate(_PosixConditionVariable._sizeInBytes), + super._() { + if (pthread_cond_init(_impl, nullptr) != 0) { + malloc.free(_impl); + throw StateError('Failed to initialize condition variable'); + } + _finalizer.attach(this, _impl); + } + + _PosixConditionVariable.fromAddress(int address) + : _impl = Pointer.fromAddress(address), + super._(); + + @override + void notify() { + if (pthread_cond_signal(_impl) != 0) { + throw StateError('Failed to signal condition variable'); + } + } + + @override + void wait(covariant _PosixMutex mutex) { + if (pthread_cond_wait(_impl, mutex._impl) != 0) { + throw StateError('Failed to wait on a condition variable'); + } + } + + @override + int get _address => _impl.address; +} diff --git a/pkgs/native_synchronization/lib/primitives.dart b/pkgs/native_synchronization/lib/primitives.dart new file mode 100644 index 00000000..c792dd96 --- /dev/null +++ b/pkgs/native_synchronization/lib/primitives.dart @@ -0,0 +1,117 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +/// This library contains native synchronization primitives such as [Mutex] +/// and [ConditionVariable] implemented on top of low-level primitives +/// provided by the OS. +/// +/// See OS specific documentation for more details: +/// +/// * POSIX man pages (Linux, Android, Mac OS X and iOS X) +/// * `pthread_mutex_lock` and `pthread_mutex_unlock`, +/// * `pthread_cond_wait` and `pthread_cond_signal`. +/// * Windows +/// * [Slim Reader/Writer (SRW) Locks](https://learn.microsoft.com/en-us/windows/win32/sync/slim-reader-writer--srw--locks), +/// * [Condition Variables](https://learn.microsoft.com/en-us/windows/win32/sync/condition-variables), +library; + +import 'dart:ffi'; +import 'dart:io'; + +import 'package:ffi/ffi.dart'; +import 'package:native_synchronization/sendable.dart'; + +import 'package:native_synchronization/src/bindings/pthread.dart'; +import 'package:native_synchronization/src/bindings/winapi.dart'; + +part 'posix.dart'; +part 'windows.dart'; + +/// A *mutex* synchronization primitive. +/// +/// Mutex can be used to synchronize access to a native resource shared between +/// multiple threads. +/// +/// [Mutex] object can not be directly sent to other isolates via a `SendPort`, +/// but it can be converted to a `Sendable` via `asSendable` getter. +/// +/// Mutex objects are owned by an isolate which created them. +sealed class Mutex implements Finalizable { + Mutex._(); + + factory Mutex() => Platform.isWindows ? _WindowsMutex() : _PosixMutex(); + + /// Acquire exclusive ownership of this mutex. + /// + /// If this mutex is already acquired then an attempt to acquire it + /// blocks the current thread until the mutex is released by the + /// current owner. + /// + /// **Warning**: attempting to hold a mutex across asynchronous suspension + /// points will lead to undefined behavior and potentially crashes. + void _lock(); + + /// Release exclusive ownership of this mutex. + /// + /// It is an error to release ownership of the mutex if it was not + /// previously acquired. + void _unlock(); + + /// Run the given synchronous `action` under a mutex. + /// + /// This function takes exclusive ownership of the mutex, executes `action` + /// and then releases the mutex. It returns the value returned by `action`. + /// + /// **Warning**: you can't combine `runLocked` with an asynchronous code. + R runLocked(R Function() action) { + _lock(); + try { + return action(); + } finally { + _unlock(); + } + } + + Sendable get asSendable => Sendable.wrap( + Platform.isWindows ? _WindowsMutex.fromAddress : _PosixMutex.fromAddress, + _address); + + int get _address; +} + +/// A *condition variable* synchronization primitive. +/// +/// Condition variable can be used to synchronously wait for a condition to +/// occur. +/// +/// [ConditionVariable] object can not be directly sent to other isolates via a +/// `SendPort`, but it can be converted to a `Sendable` +/// object via [asSendable] getter. +/// +/// [ConditionVariable] objects are owned by an isolate which created them. +sealed class ConditionVariable implements Finalizable { + ConditionVariable._(); + + factory ConditionVariable() => Platform.isWindows + ? _WindowsConditionVariable() + : _PosixConditionVariable(); + + /// Block and wait until another thread calls [notify]. + /// + /// `mutex` must be a [Mutex] object exclusively held by the current thread. + /// It will be released and the thread will block until another thread + /// calls [notify]. + void wait(Mutex mutex); + + /// Wake up at least one thread waiting on this condition variable. + void notify(); + + Sendable get asSendable => Sendable.wrap( + Platform.isWindows + ? _WindowsConditionVariable.fromAddress + : _PosixConditionVariable.fromAddress, + _address); + + int get _address; +} diff --git a/pkgs/native_synchronization/lib/sendable.dart b/pkgs/native_synchronization/lib/sendable.dart new file mode 100644 index 00000000..6afba3d8 --- /dev/null +++ b/pkgs/native_synchronization/lib/sendable.dart @@ -0,0 +1,21 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +abstract final class Sendable { + static Sendable wrap(T Function(U) make, U data) { + return _SendableImpl._(make, data); + } + + T materialize(); +} + +final class _SendableImpl implements Sendable { + final U _data; + final T Function(U v) _make; + + _SendableImpl._(this._make, this._data); + + @override + T materialize() => _make(_data); +} diff --git a/pkgs/native_synchronization/lib/src/bindings/pthread.dart b/pkgs/native_synchronization/lib/src/bindings/pthread.dart new file mode 100644 index 00000000..d1776716 --- /dev/null +++ b/pkgs/native_synchronization/lib/src/bindings/pthread.dart @@ -0,0 +1,38 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +// ignore_for_file: non_constant_identifier_names, camel_case_types + +import 'dart:ffi'; + +final class pthread_mutex_t extends Opaque {} + +final class pthread_cond_t extends Opaque {} + +@Native, Pointer)>() +external int pthread_mutex_init( + Pointer mutex, Pointer attrs); + +@Native)>() +external int pthread_mutex_lock(Pointer mutex); + +@Native)>() +external int pthread_mutex_unlock(Pointer mutex); + +@Native)>() +external int pthread_mutex_destroy(Pointer cond); + +@Native, Pointer)>() +external int pthread_cond_init( + Pointer cond, Pointer attrs); + +@Native, Pointer)>() +external int pthread_cond_wait( + Pointer cond, Pointer mutex); + +@Native)>() +external int pthread_cond_destroy(Pointer cond); + +@Native)>() +external int pthread_cond_signal(Pointer cond); diff --git a/pkgs/native_synchronization/lib/src/bindings/winapi.dart b/pkgs/native_synchronization/lib/src/bindings/winapi.dart new file mode 100644 index 00000000..20aec4e4 --- /dev/null +++ b/pkgs/native_synchronization/lib/src/bindings/winapi.dart @@ -0,0 +1,32 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +// ignore_for_file: non_constant_identifier_names, camel_case_types + +import 'dart:ffi'; + +final class SRWLOCK extends Opaque {} + +final class CONDITION_VARIABLE extends Opaque {} + +@Native)>() +external void InitializeSRWLock(Pointer lock); + +@Native)>() +external void AcquireSRWLockExclusive(Pointer lock); + +@Native)>() +external void ReleaseSRWLockExclusive(Pointer mutex); + +@Native)>() +external void InitializeConditionVariable(Pointer condVar); + +@Native< + Int Function( + Pointer, Pointer, Uint32, Uint32)>() +external int SleepConditionVariableSRW(Pointer condVar, + Pointer srwLock, int timeOut, int flags); + +@Native)>() +external void WakeConditionVariable(Pointer condVar); diff --git a/pkgs/native_synchronization/lib/windows.dart b/pkgs/native_synchronization/lib/windows.dart new file mode 100644 index 00000000..b15da7e6 --- /dev/null +++ b/pkgs/native_synchronization/lib/windows.dart @@ -0,0 +1,74 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +part of 'primitives.dart'; + +class _WindowsMutex extends Mutex { + static const _sizeInBytes = 8; // `sizeof(SRWLOCK)` + + final Pointer _impl; + + static final _finalizer = Finalizer>((ptr) { + malloc.free(ptr); + }); + + _WindowsMutex() + : _impl = malloc.allocate(_WindowsMutex._sizeInBytes), + super._() { + InitializeSRWLock(_impl); + _finalizer.attach(this, _impl); + } + + _WindowsMutex.fromAddress(int address) + : _impl = Pointer.fromAddress(address), + super._(); + + @override + void _lock() => AcquireSRWLockExclusive(_impl); + + @override + void _unlock() => ReleaseSRWLockExclusive(_impl); + + @override + int get _address => _impl.address; +} + +class _WindowsConditionVariable extends ConditionVariable { + static const _sizeInBytes = 8; // `sizeof(CONDITION_VARIABLE)` + + final Pointer _impl; + + static final _finalizer = Finalizer>((ptr) { + malloc.free(ptr); + }); + + _WindowsConditionVariable() + : _impl = malloc.allocate(_WindowsConditionVariable._sizeInBytes), + super._() { + InitializeConditionVariable(_impl); + _finalizer.attach(this, _impl); + } + + _WindowsConditionVariable.fromAddress(int address) + : _impl = Pointer.fromAddress(address), + super._(); + + @override + void notify() { + WakeConditionVariable(_impl); + } + + @override + void wait(covariant _WindowsMutex mutex) { + const infinite = 0xFFFFFFFF; + const exclusive = 0; + if (SleepConditionVariableSRW(_impl, mutex._impl, infinite, exclusive) == + 0) { + throw StateError('Failed to wait on a condition variable'); + } + } + + @override + int get _address => _impl.address; +} diff --git a/pkgs/native_synchronization/pubspec.yaml b/pkgs/native_synchronization/pubspec.yaml new file mode 100644 index 00000000..bf93c3f1 --- /dev/null +++ b/pkgs/native_synchronization/pubspec.yaml @@ -0,0 +1,14 @@ +name: native_synchronization +description: Low level synchronization primitives built on dart:ffi. +version: 0.3.0 +repository: https://github.com/dart-lang/native_synchronization + +environment: + sdk: ">=3.0.0 <4.0.0" + +dependencies: + ffi: ^2.1.0 + +dev_dependencies: + dart_flutter_team_lints: ^1.0.0 + test: ^1.16.0 diff --git a/pkgs/native_synchronization/test/mailbox_test.dart b/pkgs/native_synchronization/test/mailbox_test.dart new file mode 100644 index 00000000..9b32bfa3 --- /dev/null +++ b/pkgs/native_synchronization/test/mailbox_test.dart @@ -0,0 +1,53 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:io'; +import 'dart:isolate'; +import 'dart:typed_data'; + +import 'package:native_synchronization/mailbox.dart'; +import 'package:native_synchronization/sendable.dart'; +import 'package:test/test.dart'; + +void main() { + Future startHelperIsolate(Sendable sendableMailbox) { + return Isolate.run(() { + sleep(const Duration(milliseconds: 500)); + final mailbox = sendableMailbox.materialize(); + mailbox.put(Uint8List(42)..[41] = 42); + return 'success'; + }); + } + + test('mailbox', () async { + final mailbox = Mailbox(); + final helperResult = startHelperIsolate(mailbox.asSendable); + final value = mailbox.take(); + expect(value, isA()); + expect(value.length, equals(42)); + expect(value[41], equals(42)); + expect(await helperResult, equals('success')); + }); + + Future startHelperIsolateClose(Sendable sendableMailbox) { + return Isolate.run(() { + sleep(const Duration(milliseconds: 500)); + final mailbox = sendableMailbox.materialize(); + try { + mailbox.take(); + } catch (_) { + return 'success'; + } + return 'failed'; + }); + } + + test('mailbox close', () async { + final mailbox = Mailbox(); + mailbox.put(Uint8List(42)..[41] = 42); + mailbox.close(); + final helperResult = startHelperIsolateClose(mailbox.asSendable); + expect(await helperResult, equals('success')); + }); +} diff --git a/pkgs/native_synchronization/test/primitives_test.dart b/pkgs/native_synchronization/test/primitives_test.dart new file mode 100644 index 00000000..e1d6b1fb --- /dev/null +++ b/pkgs/native_synchronization/test/primitives_test.dart @@ -0,0 +1,119 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:ffi'; +import 'dart:io'; +import 'dart:isolate'; + +import 'package:ffi/ffi.dart'; +import 'package:native_synchronization/primitives.dart'; +import 'package:native_synchronization/sendable.dart'; +import 'package:test/test.dart'; + +void main() { + group('mutex', () { + test('simple', () { + final mutex = Mutex(); + expect(mutex.runLocked(() => 42), equals(42)); + }); + + Future spawnHelperIsolate( + int ptrAddress, Sendable sendableMutex) { + return Isolate.run(() { + final ptr = Pointer.fromAddress(ptrAddress); + final mutex = sendableMutex.materialize(); + + while (true) { + sleep(Duration(milliseconds: 10)); + if (mutex.runLocked(() { + if (ptr.value == 2) { + return true; + } + ptr.value = 0; + sleep(Duration(milliseconds: 500)); + ptr.value = 1; + return false; + })) { + break; + } + } + + return 'success'; + }); + } + + test('isolate', () async { + await using((arena) async { + final ptr = arena.allocate(1); + final mutex = Mutex(); + + final helperResult = spawnHelperIsolate(ptr.address, mutex.asSendable); + + while (true) { + final sw = Stopwatch()..start(); + if (mutex.runLocked(() { + if (sw.elapsedMilliseconds > 300 && ptr.value == 1) { + ptr.value = 2; + return true; + } + return false; + })) { + break; + } + await Future.delayed(const Duration(milliseconds: 10)); + } + expect(await helperResult, equals('success')); + }); + }); + }); + + group('condvar', () { + Future spawnHelperIsolate( + int ptrAddress, + Sendable sendableMutex, + Sendable sendableCondVar) { + return Isolate.run(() { + final ptr = Pointer.fromAddress(ptrAddress); + final mutex = sendableMutex.materialize(); + final condVar = sendableCondVar.materialize(); + + return mutex.runLocked(() { + ptr.value = 1; + while (ptr.value == 1) { + condVar.wait(mutex); + } + return ptr.value == 2 ? 'success' : 'failure'; + }); + }); + } + + test('isolate', () async { + await using((arena) async { + final ptr = arena.allocate(1); + final mutex = Mutex(); + final condVar = ConditionVariable(); + + final helperResult = spawnHelperIsolate( + ptr.address, mutex.asSendable, condVar.asSendable); + + while (true) { + final success = mutex.runLocked(() { + if (ptr.value == 1) { + ptr.value = 2; + condVar.notify(); + return true; + } + return false; + }); + if (success) { + break; + } + await Future.delayed(const Duration(milliseconds: 20)); + } + + expect(await helperResult, equals('success')); + }); + }); + }); +}