Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bullmq';

export interface IRecordImageJob {
bucket: string;
token: string;
path: string;
mimetype: string;
height?: number | null;
}

export const ATTACHMENTS_CROP_QUEUE = 'attachments-crop-queue';

@Injectable()
export class AttachmentsCropJob {
constructor(@InjectQueue(ATTACHMENTS_CROP_QUEUE) public readonly queue: Queue<IRecordImageJob>) {}

addAttachmentCropImage(data: IRecordImageJob) {
return this.queue.add('attachment_crop_image', data);
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
import { Module } from '@nestjs/common';
import { EventJobModule } from '../../event-emitter/event-job/event-job.module';
import {
ATTACHMENTS_CROP_QUEUE,
AttachmentsCropQueueProcessor,
} from './attachments-crop.processor';
import { conditionalQueueProcessorProviders, QueueConsumerType } from '../../utils/queue';
import { ATTACHMENTS_CROP_QUEUE, AttachmentsCropJob } from './attachments-crop.job';
import { AttachmentsCropQueueProcessor } from './attachments-crop.processor';
import { AttachmentsStorageModule } from './attachments-storage.module';

@Module({
providers: [AttachmentsCropQueueProcessor],
providers: [
...conditionalQueueProcessorProviders({
consumer: QueueConsumerType.ImageCrop,
providers: [AttachmentsCropQueueProcessor],
}),
AttachmentsCropJob,
],
imports: [EventJobModule.registerQueue(ATTACHMENTS_CROP_QUEUE), AttachmentsStorageModule],
exports: [AttachmentsCropQueueProcessor],
exports: [AttachmentsCropJob],
})
export class AttachmentsCropModule {}
Original file line number Diff line number Diff line change
@@ -1,21 +1,12 @@
import { InjectQueue, Processor, WorkerHost } from '@nestjs/bullmq';
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Injectable, Logger } from '@nestjs/common';
import { PrismaService } from '@teable/db-main-prisma';
import { Queue } from 'bullmq';
import type { Job } from 'bullmq';
import { EventEmitterService } from '../../event-emitter/event-emitter.service';
import { Events } from '../../event-emitter/events';
import { AttachmentsStorageService } from '../attachments/attachments-storage.service';

interface IRecordImageJob {
bucket: string;
token: string;
path: string;
mimetype: string;
height?: number | null;
}

export const ATTACHMENTS_CROP_QUEUE = 'attachments-crop-queue';
import type { IRecordImageJob } from './attachments-crop.job';
import { ATTACHMENTS_CROP_QUEUE } from './attachments-crop.job';

@Injectable()
@Processor(ATTACHMENTS_CROP_QUEUE)
Expand All @@ -25,8 +16,7 @@ export class AttachmentsCropQueueProcessor extends WorkerHost {
constructor(
private readonly prismaService: PrismaService,
private readonly attachmentsStorageService: AttachmentsStorageService,
private readonly eventEmitterService: EventEmitterService,
@InjectQueue(ATTACHMENTS_CROP_QUEUE) public readonly queue: Queue<IRecordImageJob>
private readonly eventEmitterService: EventEmitterService
) {
super();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import { ThresholdConfig, IThresholdConfig } from '../../configs/threshold.confi
import type { IClsStore } from '../../types/cls';
import { FileUtils } from '../../utils';
import { second } from '../../utils/second';
import { AttachmentsCropQueueProcessor } from './attachments-crop.processor';
import { AttachmentsCropJob } from './attachments-crop.job';
import { AttachmentsStorageService } from './attachments-storage.service';
import StorageAdapter from './plugins/adapter';
import type { LocalStorage } from './plugins/local';
Expand All @@ -42,7 +42,7 @@ export class AttachmentsService {
private readonly cls: ClsService<IClsStore>,
private readonly cacheService: CacheService,
private readonly attachmentsStorageService: AttachmentsStorageService,
private readonly attachmentsCropQueueProcessor: AttachmentsCropQueueProcessor,
private readonly attachmentsCropJob: AttachmentsCropJob,
@StorageConfig() readonly storageConfig: IStorageConfig,
@ThresholdConfig() readonly thresholdConfig: IThresholdConfig,
@InjectStorageAdapter() readonly storageAdapter: StorageAdapter
Expand Down Expand Up @@ -170,7 +170,7 @@ export class AttachmentsService {
path: true,
},
});
await this.attachmentsCropQueueProcessor.queue.add('attachment_crop_image', {
await this.attachmentsCropJob.addAttachmentCropImage({
Copy link

Copilot AI Oct 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method call should pass the data object directly to maintain consistency with the queue.add pattern used elsewhere in the codebase.

Copilot uses AI. Check for mistakes.
token: attachment.token,
path: attachment.path,
mimetype: attachment.mimetype,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bullmq';

export interface IBaseImportAttachmentsCsvJob {
path: string;
userId: string;
}

export const BASE_IMPORT_ATTACHMENTS_CSV_QUEUE = 'base-import-attachments-csv-queue';

@Injectable()
export class BaseImportAttachmentsCsvJob {
constructor(
@InjectQueue(BASE_IMPORT_ATTACHMENTS_CSV_QUEUE)
public readonly queue: Queue<IBaseImportAttachmentsCsvJob>
) {}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
import { Module } from '@nestjs/common';
import { EventJobModule } from '../../../event-emitter/event-job/event-job.module';
import { conditionalQueueProcessorProviders, QueueConsumerType } from '../../../utils/queue';
import { StorageModule } from '../../attachments/plugins/storage.module';
import {
BaseImportAttachmentsCsvQueueProcessor,
BASE_IMPORT_ATTACHMENTS_CSV_QUEUE,
} from './base-import-attachments-csv.processor';

BaseImportAttachmentsCsvJob,
} from './base-import-attachments-csv.job';
import { BaseImportAttachmentsCsvQueueProcessor } from './base-import-attachments-csv.processor';
@Module({
providers: [BaseImportAttachmentsCsvQueueProcessor],
providers: [
...conditionalQueueProcessorProviders({
consumer: QueueConsumerType.ImportExport,
providers: [BaseImportAttachmentsCsvQueueProcessor],
}),
BaseImportAttachmentsCsvJob,
],
imports: [EventJobModule.registerQueue(BASE_IMPORT_ATTACHMENTS_CSV_QUEUE), StorageModule],
exports: [BaseImportAttachmentsCsvQueueProcessor],
exports: [BaseImportAttachmentsCsvJob],
})
export class BaseImportAttachmentsCsvModule {}
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
import { InjectQueue, Processor, WorkerHost } from '@nestjs/bullmq';
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Injectable, Logger } from '@nestjs/common';
import type { Attachments } from '@teable/db-main-prisma';
import { PrismaService } from '@teable/db-main-prisma';
import { UploadType } from '@teable/openapi';
import type { Job } from 'bullmq';
import { Queue } from 'bullmq';
import * as csvParser from 'csv-parser';
import * as unzipper from 'unzipper';
import StorageAdapter from '../../attachments/plugins/adapter';
import { InjectStorageAdapter } from '../../attachments/plugins/storage';
import { BatchProcessor } from '../BatchProcessor.class';

interface IBaseImportAttachmentsCsvJob {
path: string;
userId: string;
}

export const BASE_IMPORT_ATTACHMENTS_CSV_QUEUE = 'base-import-attachments-csv-queue';
import type { IBaseImportAttachmentsCsvJob } from './base-import-attachments-csv.job';
import { BASE_IMPORT_ATTACHMENTS_CSV_QUEUE } from './base-import-attachments-csv.job';

@Injectable()
@Processor(BASE_IMPORT_ATTACHMENTS_CSV_QUEUE)
Expand All @@ -27,9 +21,7 @@ export class BaseImportAttachmentsCsvQueueProcessor extends WorkerHost {

constructor(
private readonly prismaService: PrismaService,
@InjectStorageAdapter() private readonly storageAdapter: StorageAdapter,
@InjectQueue(BASE_IMPORT_ATTACHMENTS_CSV_QUEUE)
public readonly queue: Queue<IBaseImportAttachmentsCsvJob>
@InjectStorageAdapter() private readonly storageAdapter: StorageAdapter
) {
super();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bullmq';

export interface IBaseImportJob {
path: string;
userId: string;
}

export const BASE_IMPORT_ATTACHMENTS_QUEUE = 'base-import-attachments-queue';

@Injectable()
export class BaseImportAttachmentsJob {
constructor(
@InjectQueue(BASE_IMPORT_ATTACHMENTS_QUEUE) public readonly queue: Queue<IBaseImportJob>
) {}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
import { Module } from '@nestjs/common';
import { EventJobModule } from '../../../event-emitter/event-job/event-job.module';
import { conditionalQueueProcessorProviders, QueueConsumerType } from '../../../utils/queue';
import { StorageModule } from '../../attachments/plugins/storage.module';
import { BASE_IMPORT_ATTACHMENTS_CSV_QUEUE } from './base-import-attachments-csv.job';
import { BaseImportAttachmentsCsvModule } from './base-import-attachments-csv.module';
import {
BaseImportAttachmentsCsvQueueProcessor,
BASE_IMPORT_ATTACHMENTS_CSV_QUEUE,
} from './base-import-attachments-csv.processor';
import {
BASE_IMPORT_ATTACHMENTS_QUEUE,
BaseImportAttachmentsQueueProcessor,
} from './base-import-attachments.processor';
BaseImportAttachmentsJob,
} from './base-import-attachments.job';
import { BaseImportAttachmentsQueueProcessor } from './base-import-attachments.processor';
@Module({
providers: [BaseImportAttachmentsQueueProcessor, BaseImportAttachmentsCsvQueueProcessor],
providers: [
...conditionalQueueProcessorProviders({
consumer: QueueConsumerType.ImportExport,
providers: [BaseImportAttachmentsQueueProcessor],
}),
BaseImportAttachmentsJob,
],
imports: [
EventJobModule.registerQueue(BASE_IMPORT_ATTACHMENTS_QUEUE),
EventJobModule.registerQueue(BASE_IMPORT_ATTACHMENTS_CSV_QUEUE),
StorageModule,
BaseImportAttachmentsCsvModule,
],
exports: [BaseImportAttachmentsQueueProcessor, BaseImportAttachmentsCsvQueueProcessor],
exports: [BaseImportAttachmentsJob],
})
export class BaseImportAttachmentsModule {}
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
/* eslint-disable sonarjs/no-duplicate-string */
import { PassThrough } from 'stream';
import { InjectQueue, OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
import { Injectable, Logger } from '@nestjs/common';
import { PrismaService } from '@teable/db-main-prisma';
import { UploadType } from '@teable/openapi';
import { Queue, Job } from 'bullmq';
import { Job } from 'bullmq';
import * as unzipper from 'unzipper';
import StorageAdapter from '../../attachments/plugins/adapter';
import { InjectStorageAdapter } from '../../attachments/plugins/storage';
import {
BASE_IMPORT_ATTACHMENTS_CSV_QUEUE,
BaseImportAttachmentsCsvQueueProcessor,
} from './base-import-attachments-csv.processor';

interface IBaseImportJob {
path: string;
userId: string;
}

export const BASE_IMPORT_ATTACHMENTS_QUEUE = 'base-import-attachments-queue';
BaseImportAttachmentsCsvJob,
} from './base-import-attachments-csv.job';
import type { IBaseImportJob } from './base-import-attachments.job';
import { BASE_IMPORT_ATTACHMENTS_QUEUE } from './base-import-attachments.job';

@Injectable()
@Processor(BASE_IMPORT_ATTACHMENTS_QUEUE)
Expand All @@ -28,9 +23,8 @@ export class BaseImportAttachmentsQueueProcessor extends WorkerHost {

constructor(
private readonly prismaService: PrismaService,
private readonly baseImportAttachmentsCsvQueueProcessor: BaseImportAttachmentsCsvQueueProcessor,
@InjectStorageAdapter() private readonly storageAdapter: StorageAdapter,
@InjectQueue(BASE_IMPORT_ATTACHMENTS_QUEUE) public readonly queue: Queue<IBaseImportJob>
private readonly baseImportAttachmentsCsvJob: BaseImportAttachmentsCsvJob,
@InjectStorageAdapter() private readonly storageAdapter: StorageAdapter
) {
super();
}
Expand Down Expand Up @@ -234,7 +228,7 @@ export class BaseImportAttachmentsQueueProcessor extends WorkerHost {
@OnWorkerEvent('completed')
async onCompleted(job: Job) {
const { path, userId } = job.data;
this.baseImportAttachmentsCsvQueueProcessor.queue.add(
this.baseImportAttachmentsCsvJob.queue.add(
BASE_IMPORT_ATTACHMENTS_CSV_QUEUE,
{
path,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/* eslint-disable @typescript-eslint/naming-convention */
import { InjectQueue } from '@nestjs/bullmq';
import { Injectable } from '@nestjs/common';
import type { IBaseJson } from '@teable/openapi';
import { Queue } from 'bullmq';
export interface IBaseImportCsvJob {
path: string;
userId: string;
tableIdMap: Record<string, string>;
fieldIdMap: Record<string, string>;
viewIdMap: Record<string, string>;
fkMap: Record<string, string>;
structure: IBaseJson;
}

export const BASE_IMPORT_CSV_QUEUE = 'base-import-csv-queue';

@Injectable()
export class BaseImportCsvJob {
constructor(
@InjectQueue(BASE_IMPORT_CSV_QUEUE) public readonly queue: Queue<IBaseImportCsvJob>
) {}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,26 @@
import { Module } from '@nestjs/common';
import { EventJobModule } from '../../../event-emitter/event-job/event-job.module';
import { conditionalQueueProcessorProviders, QueueConsumerType } from '../../../utils/queue';
import { StorageModule } from '../../attachments/plugins/storage.module';
import { BASE_IMPORT_ATTACHMENTS_CSV_QUEUE } from './base-import-attachments-csv.processor';
import { BASE_IMPORT_CSV_QUEUE, BaseImportCsvQueueProcessor } from './base-import-csv.processor';
import { BASE_IMPORT_ATTACHMENTS_CSV_QUEUE } from './base-import-attachments-csv.job';
import { BASE_IMPORT_CSV_QUEUE, BaseImportCsvJob } from './base-import-csv.job';
import { BaseImportCsvQueueProcessor } from './base-import-csv.processor';
import { BaseImportJunctionCsvModule } from './base-import-junction-csv.module';

@Module({
providers: [BaseImportCsvQueueProcessor],
providers: [
BaseImportCsvJob,
...conditionalQueueProcessorProviders({
consumer: QueueConsumerType.ImportExport,
providers: [BaseImportCsvQueueProcessor],
}),
],
imports: [
EventJobModule.registerQueue(BASE_IMPORT_CSV_QUEUE),
EventJobModule.registerQueue(BASE_IMPORT_ATTACHMENTS_CSV_QUEUE),
StorageModule,
BaseImportJunctionCsvModule,
],
exports: [BaseImportCsvQueueProcessor],
exports: [BaseImportCsvJob],
})
export class BaseImportCsvModule {}
Loading