Unverified Commit 69c52a1e authored by Clément Berthou's avatar Clément Berthou Committed by GitHub
Browse files

chore: refactor process fork handling (#1159)

parent 609f2e23
......@@ -85,39 +85,32 @@ A basic new ChildProcess would look like :
```typescript
// child-process.controller.ts
import MyChildProcess from "./child-process.fork.ts";
import { createAsyncWorkerControllerClass, AsyncWorkerEvent } from "../util/async-worker-util";
import { MessageTypes } from "../util/batch-process/batch-process-util-types";
import { createAsyncWorkerForChildProcessControllerFactory } from "util/async-worker/child-process";
import { MessageTypes } from "util/batch-process/batch-process-util-types";
import { cancelableBackgroundWorkerProcess$ } from "util/batch-process/batch-process-util";
export const runMyChildProcess = () => {
// We build a "newable" entity, which is easier to use if you need to spawn multiple processes
const AsyncProcess = createAsyncWorkerControllerClass(MyChildProcess);
const asyncProcess = new AsyncProcess();
// We build a factory that returns the AsyncWorker object. The parameter is the child process file name (without extension)
const asyncProcessFactory = createAsyncWorkerForChildProcessControllerFactory("my-child-process.fork");
asyncProcess.postMessage({ type: MessageTypes.INITIALIZE, data: "hello" });
asyncProcess.addEventListener(AsyncWorkerEvent.MESSAGE, (message) => { console.log("messageReceived", message) });
const result$ = cancelableBackgroundWorkerProcess$({ data }, asyncProcessFactory);
// use the result observable
}
```
```typescript
// child-process.fork.ts
import {
AsyncWorkerEvent,
createAsyncWorkerForChildProcess,
fakeChildProcess
} from "../util/async-worker-util";
// my-child-process.fork.ts
import { MessageTypes } from "../util/batch-process/batch-process-util-types";
import { createAsyncWorkerForChildProcess } from "util/async-worker/child-process";
import { setupChildWorkerListeners } from "util/async-worker/async-worker-util";
const asyncWorker = createAsyncWorkerForChildProcess();
asyncWorker.addEventListener(AsyncWorkerEvent.MESSAGE, ({ data, type }) => {
if (type === MessageTypes.INITIALIZE) {
asyncWorker.postMessage({ type: MessageTypes.RESULT, result: "hello" });
}
});
// This export allows typescript compiler to not throw type errors. It will not really be used
// as it will be replaced by webpack-fork-loader
export default fakeChildProcess;
setupChildWorkerListeners(asyncWorker, {
onInitialize: () => console.log("handleInitializeMessage"),
onData: () => console.log("handleDataMessage"),
})
```
## Translations
......
......@@ -7,8 +7,8 @@ import {
} from "reducers/files-and-folders/files-and-folders-types";
import { TagMap } from "reducers/tags/tags-types";
import { backgroundWorkerProcess$ } from "util/batch-process/batch-process-util";
import CsvExporterFork from "./csv-exporter.fork";
import { HashesMap } from "reducers/hashes/hashes-types";
import { createAsyncWorkerForChildProcessControllerFactory } from "util/async-worker/child-process";
export interface GenerateCsvExportOptions {
aliases: AliasMap;
......@@ -27,5 +27,8 @@ export interface GenerateCsvExportOptions {
*/
export const generateCsvExport$ = (data: GenerateCsvExportOptions) => {
const { language } = translations;
return backgroundWorkerProcess$({ ...data, language }, CsvExporterFork);
return backgroundWorkerProcess$(
{ ...data, language },
createAsyncWorkerForChildProcessControllerFactory("csv-exporter.fork")
);
};
import {
createAsyncWorkerForChildProcess,
fakeChildProcess,
setupChildWorkerListeners,
} from "util/async-worker/async-worker-util";
import { setupChildWorkerListeners } from "util/async-worker/async-worker-util";
import { onInitialize } from "./csv-exporter.impl";
import { createAsyncWorkerForChildProcess } from "util/async-worker/child-process";
const asyncWorker = createAsyncWorkerForChildProcess();
setupChildWorkerListeners(asyncWorker, {
onInitialize,
});
export default fakeChildProcess;
import translations from "translations/translations";
import { FilesAndFoldersMap } from "reducers/files-and-folders/files-and-folders-types";
import { backgroundWorkerProcess$ } from "util/batch-process/batch-process-util";
import TreeCsvExporterFork from "./tree-csv-exporter.fork";
import { createAsyncWorkerForChildProcessControllerFactory } from "util/async-worker/child-process";
/**
* Asynchronously generates a tree csv export
......@@ -14,6 +14,6 @@ export const generateTreeCsvExport$ = (
const { language } = translations;
return backgroundWorkerProcess$(
{ filesAndFoldersMap, language },
TreeCsvExporterFork
createAsyncWorkerForChildProcessControllerFactory("tree-csv-exporter.fork")
);
};
import {
createAsyncWorkerForChildProcess,
fakeChildProcess,
setupChildWorkerListeners,
} from "util/async-worker/async-worker-util";
import { setupChildWorkerListeners } from "util/async-worker/async-worker-util";
import { onInitialize } from "./tree-csv-exporter.impl";
import { createAsyncWorkerForChildProcess } from "util/async-worker/child-process";
const asyncWorker = createAsyncWorkerForChildProcess();
setupChildWorkerListeners(asyncWorker, {
onInitialize,
});
export default fakeChildProcess;
import translations from "translations/translations";
import ExcelExporterWorker from "./excel-exporter.fork";
import { backgroundWorkerProcess$ } from "util/batch-process/batch-process-util";
import { CsvExporterData } from "exporters/csv/csv-exporter.impl";
import { createAsyncWorkerForChildProcessControllerFactory } from "util/async-worker/child-process";
export const generateExcelExport$ = (data: CsvExporterData) => {
const { language } = translations;
return backgroundWorkerProcess$({ ...data, language }, ExcelExporterWorker);
return backgroundWorkerProcess$(
{ ...data, language },
createAsyncWorkerForChildProcessControllerFactory("excel-exporter.fork")
);
};
import {
createAsyncWorkerForChildProcess,
fakeChildProcess,
setupChildWorkerListeners,
} from "util/async-worker/async-worker-util";
import { setupChildWorkerListeners } from "util/async-worker/async-worker-util";
import { exportToExcel } from "exporters/excel/excel-exporter.impl";
import { createAsyncWorkerForChildProcess } from "util/async-worker/child-process";
const asyncWorker = createAsyncWorkerForChildProcess();
setupChildWorkerListeners(asyncWorker, {
onInitialize: exportToExcel,
});
export default fakeChildProcess;
......@@ -11,8 +11,8 @@ import {
backgroundWorkerProcess$,
filterResults,
} from "util/batch-process/batch-process-util";
import ResipExportFork from "./resip-export.fork";
import { map } from "rxjs/operators";
import { createAsyncWorkerForChildProcessControllerFactory } from "util/async-worker/child-process";
interface ResipExportProgress {
count: number;
......@@ -58,7 +58,7 @@ export const generateResipExport$ = ({
language,
tags,
},
ResipExportFork
createAsyncWorkerForChildProcessControllerFactory("resip-export.fork")
)
.pipe(filterResults())
.pipe(map(({ result }) => result));
......
import translations from "translations/translations";
import {
createAsyncWorkerForChildProcess,
fakeChildProcess,
WorkerEventType,
} from "util/async-worker/async-worker-util";
import { WorkerEventType } from "util/async-worker/async-worker-util";
import { MessageTypes } from "util/batch-process/batch-process-util-types";
import { hookCounter } from "util/hook/hook-utils";
import resipExporter from "./resip-exporter";
import { createAsyncWorkerForChildProcess } from "util/async-worker/child-process";
const asyncWorker = createAsyncWorkerForChildProcess();
......@@ -56,5 +53,3 @@ asyncWorker.addEventListener(
}
}
);
export default fakeChildProcess;
import {
createAsyncWorkerForChildProcess,
fakeChildProcess,
setupChildWorkerListeners,
} from "util/async-worker/async-worker-util";
import { setupChildWorkerListeners } from "util/async-worker/async-worker-util";
import { onData, onInitialize } from "./file-hash-computer.impl";
import { createAsyncWorkerForChildProcess } from "util/async-worker/child-process";
const asyncWorker = createAsyncWorkerForChildProcess();
......@@ -11,5 +8,3 @@ setupChildWorkerListeners(asyncWorker, {
onData,
onInitialize,
});
export default fakeChildProcess;
import {
createAsyncWorkerForChildProcess,
fakeChildProcess,
setupChildWorkerListeners,
} from "util/async-worker/async-worker-util";
import { setupChildWorkerListeners } from "util/async-worker/async-worker-util";
import { onInitialize } from "hash-computer/folder-hash-computer-impl";
import { createAsyncWorkerForChildProcess } from "util/async-worker/child-process";
const asyncWorker = createAsyncWorkerForChildProcess();
setupChildWorkerListeners(asyncWorker, {
onInitialize,
});
export default fakeChildProcess;
......@@ -16,11 +16,10 @@ import {
} from "util/observable/observable-util";
import { createBufferedFileWriter } from "util/buffered-file-writer/buffered-file-writer";
import FileHashFork from "./file-hash-computer.fork";
import FolderHashFork from "./folder-hash-computer.fork";
import { FilesAndFoldersMap } from "reducers/files-and-folders/files-and-folders-types";
import { HashesMap } from "reducers/hashes/hashes-types";
import { Observable, OperatorFunction } from "rxjs";
import { createAsyncWorkerForChildProcessControllerFactory } from "util/async-worker/child-process";
const BATCH_SIZE = 500;
const BUFFER_TIME = 1000;
......@@ -41,7 +40,10 @@ export const computeHashes$ = (
paths: string[],
{ initialValues: { basePath } }: ComputeHashesOptions
): DataProcessingStream<HashesMap> => {
const hashes$ = computeBatch$(paths, FileHashFork, {
const workerFactory = createAsyncWorkerForChildProcessControllerFactory(
"file-hash-computer.fork"
);
const hashes$ = computeBatch$(paths, workerFactory, {
batchSize: BATCH_SIZE,
initialValues: { basePath },
});
......@@ -115,7 +117,12 @@ export const computeFolderHashes$ = ({
filesAndFolders,
hashes,
}: ComputeFolderHashesOptions): Observable<HashesMap> => {
return backgroundWorkerProcess$({ filesAndFolders, hashes }, FolderHashFork)
return backgroundWorkerProcess$(
{ filesAndFolders, hashes },
createAsyncWorkerForChildProcessControllerFactory(
"folder-hash-computer.fork"
)
)
.pipe(filterResults())
.pipe(map(({ result }) => result));
};
/**
 * Ambient typing for any module imported through the `worker-loader!` prefix.
 * The default export is declared as a `Worker` subclass that can be constructed
 * without arguments.
 */
declare module "worker-loader!*" {
class WebpackWorker extends Worker {
constructor();
}
export default WebpackWorker;
}
declare module "*.svg" {
const content: string;
export default content;
......@@ -18,3 +10,4 @@ declare const SENTRY_DSN: string;
declare const SENTRY_MINIDUMP_URL: string;
declare const ARCHIFILTRE_SITE_URL: string;
declare const WRITE_DEBUG: string;
declare const WORKER_ROOT_FOLDER: string;
import { ChildProcess } from "child_process";
import {
MessageTypes,
WorkerMessage,
} from "../batch-process/batch-process-util-types";
import translations from "translations/translations";
import { ChildProcess } from "child_process";
export enum WorkerEventType {
MESSAGE = "message",
......@@ -28,62 +28,11 @@ export type ProcessControllerAsyncWorker = AsyncWorker<AsyncWorkerControllerEven
terminate: () => void;
};
type ChildProcessAsyncWorker = AsyncWorker;
/**
* Creates an AsyncWorker bound to the current ChildProcess context
*/
export const createAsyncWorkerForChildProcess = (): ChildProcessAsyncWorker => {
const localProcess = process as NodeJS.Process;
return {
addEventListener: (eventType, listener) => {
localProcess.addListener(eventType, (event) => {
listener(event);
});
},
removeEventListener: (eventType, listener) => {
localProcess.removeListener("loaded", listener);
},
postMessage: (message) => {
if (!localProcess || !localProcess.send) {
throw new Error("This must be called in a forked process");
}
localProcess.send(message);
},
};
export type ChildProcessControllerAsyncWorker = ProcessControllerAsyncWorker & {
childProcess: ChildProcess;
};
/**
 * Creates an AsyncWorker from a ChildProcess.
 *
 * Each listener is attached to the child process through a forwarder that passes
 * on only the first emitted argument, so listeners always receive exactly one
 * argument. The forwarder is remembered so that `removeEventListener` can detach
 * the very function that was attached (passing the raw listener to
 * `removeListener` would silently remove nothing).
 *
 * @param childProcess - The process to wrap.
 * @returns An AsyncWorker that can post messages to, listen to and kill the process.
 */
export const createAsyncWorkerForChildProcessController = (
  childProcess: ChildProcess
): ProcessControllerAsyncWorker => {
  // One entry per addEventListener call, so duplicate registrations are removed one at a time.
  const registrations: Array<{
    eventType: string | symbol;
    listener: unknown;
    // Mirrors Node's own listener signature.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    forwarder: (...args: any[]) => void;
  }> = [];

  return {
    addEventListener: (eventType, listener) => {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const forwarder = (data: any): void => {
        listener(data);
      };
      registrations.push({ eventType, listener, forwarder });
      childProcess.addListener(eventType, forwarder);
    },
    removeEventListener: (eventType, listener) => {
      const index = registrations.findIndex(
        (registration) =>
          registration.eventType === eventType &&
          registration.listener === listener
      );
      if (index === -1) {
        return;
      }
      const [{ forwarder }] = registrations.splice(index, 1);
      childProcess.removeListener(eventType, forwarder);
    },
    postMessage: (message) => childProcess.send(message),
    terminate: () => childProcess.kill(),
  };
};
/**
 * Creates a wrapper class around a child process constructor so it can be
 * instantiated with `new` (intended for use in batch-process-common).
 * @param ChildProcessConstructor - Constructor called without arguments to create the child process
 * @returns A class whose `new` expression evaluates to the AsyncWorker built by
 * createAsyncWorkerForChildProcessController, not to a class instance
 */
export const createAsyncWorkerControllerClass = (ChildProcessConstructor) => {
return class AsyncWorkerController {
constructor() {
const childProcess = new ChildProcessConstructor();
// Returning an object from a constructor makes `new` yield that object instead of `this`
return createAsyncWorkerForChildProcessController(childProcess);
}
};
};
export type ChildProcessAsyncWorker = AsyncWorker;
export type WorkerMessageHandler = (
asyncWorker: AsyncWorker,
......@@ -150,12 +99,3 @@ export const setupChildWorkerListeners = (
makeChildWorkerMessageCallback(asyncWorker, listeners)
);
};
/**
* Fake object used to declare a file as a ChildProcess for the typescript compiler
* @example
* // my-child-process.fork.ts
* import { fakeChildProcess } from "./async-worker-util";
* export default fakeChildProcess;
*/
export const fakeChildProcess = {} as ChildProcess & (new () => ChildProcess);
import { ChildProcess, fork } from "child_process";
import path from "path";
import {
ChildProcessAsyncWorker,
ChildProcessControllerAsyncWorker,
} from "util/async-worker/async-worker-util";
import { EventEmitter } from "events";
/**
 * Creates an AsyncWorker bound to the current ChildProcess context.
 *
 * "message" events received by the current process are relayed through a
 * private EventEmitter. Listeners are attached to that emitter as-is (without a
 * wrapper function), which is what lets `removeEventListener` detach them.
 *
 * @returns An AsyncWorker whose `postMessage` sends to the parent process and
 * throws if the current process has no `send` function.
 */
export const createAsyncWorkerForChildProcess = (): ChildProcessAsyncWorker => {
  const localProcess = process as NodeJS.Process;
  const eventEmitter = new EventEmitter();

  // Relay only the message payload, so listeners always receive a single argument.
  localProcess.addListener("message", (event) => {
    eventEmitter.emit("message", event);
  });

  return {
    addEventListener: (eventType, listener) => {
      eventEmitter.addListener(eventType, listener);
    },
    removeEventListener: (eventType, listener) => {
      eventEmitter.removeListener(eventType, listener);
    },
    postMessage: (message) => {
      if (!localProcess || !localProcess.send) {
        throw new Error("This must be called in a forked process");
      }
      localProcess.send(message);
    },
  };
};
/**
 * Creates an AsyncWorker from a ChildProcess.
 *
 * Each listener is attached to the child process through a forwarder that passes
 * on only the first emitted argument, so listeners always receive exactly one
 * argument. The forwarder is remembered so that `removeEventListener` can detach
 * the very function that was attached (passing the raw listener to
 * `removeListener` would silently remove nothing).
 *
 * @param childProcess - The process to wrap; also exposed as `childProcess` on the result.
 * @returns An AsyncWorker that can post messages to, listen to and kill the process.
 */
export const createAsyncWorkerForChildProcessController = (
  childProcess: ChildProcess
): ChildProcessControllerAsyncWorker => {
  // One entry per addEventListener call, so duplicate registrations are removed one at a time.
  const registrations: Array<{
    eventType: string | symbol;
    listener: unknown;
    // Mirrors Node's own listener signature.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    forwarder: (...args: any[]) => void;
  }> = [];

  return {
    addEventListener: (eventType, listener) => {
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const forwarder = (data: any): void => {
        listener(data);
      };
      registrations.push({ eventType, listener, forwarder });
      childProcess.addListener(eventType, forwarder);
    },
    removeEventListener: (eventType, listener) => {
      const index = registrations.findIndex(
        (registration) =>
          registration.eventType === eventType &&
          registration.listener === listener
      );
      if (index === -1) {
        return;
      }
      const [{ forwarder }] = registrations.splice(index, 1);
      childProcess.removeListener(eventType, forwarder);
    },
    postMessage: (message) => {
      childProcess.send(message);
    },
    terminate: () => childProcess.kill(),
    childProcess,
  };
};
/**
 * Builds a factory that forks a worker script and wraps it as an AsyncWorker.
 *
 * Nothing is forked when this function is called; a new process is forked each
 * time the returned factory is invoked.
 *
 * @param filename - Worker script name without the `.js` extension; it is resolved
 * against WORKER_ROOT_FOLDER.
 * @returns A zero-argument factory producing a ChildProcessControllerAsyncWorker.
 */
export const createAsyncWorkerForChildProcessControllerFactory = <
  StreamParserResponse = any
>(
  filename: string
) => (): ChildProcessControllerAsyncWorker =>
  createAsyncWorkerForChildProcessController(
    fork(path.join(WORKER_ROOT_FOLDER, `${filename}.js`))
  );
import { chunk } from "lodash";
import { cpus } from "os";
import { fromEvent, merge, Observable, Subject } from "rxjs";
import { reportError } from "logging/reporter";
......@@ -12,7 +11,7 @@ import {
WorkerMessage,
} from "util/batch-process/batch-process-util-types";
import workerManager, {
ChildProcessConstructor,
ProcessControllerAsyncWorkerFactory,
} from "util/worker-manager/worker-manager";
import {
ProcessControllerAsyncWorker,
......@@ -41,10 +40,12 @@ type InitWorkersResult = {
};
const spawnWorkers = (
WorkerBuilder: ChildProcessConstructor,
asyncWorkerFactory: ProcessControllerAsyncWorkerFactory,
count = NB_CPUS
): ProcessControllerAsyncWorker[] =>
makeEmptyArray(count, null).map(() => workerManager.spawn(WorkerBuilder));
makeEmptyArray(count, null).map(() =>
workerManager.spawn(asyncWorkerFactory)
);
export const setupWorkers$ = (
workers: ProcessControllerAsyncWorker[],
......@@ -96,10 +97,10 @@ export const setupWorkers$ = (
};
export const initWorkers$ = (
WorkerBuilder: ChildProcessConstructor,
asyncWorkerFactory: ProcessControllerAsyncWorkerFactory,
{ initialValues, workerCount = NB_CPUS }: InitWorkersData
): InitWorkersResult => {
const workers = spawnWorkers(WorkerBuilder, workerCount);
const workers = spawnWorkers(asyncWorkerFactory, workerCount);
return setupWorkers$(workers, initialValues);
};
......@@ -121,17 +122,20 @@ export const processQueueWithWorkers = (
data: any[],
batchSize: number
) => {
const queue = chunk(data, batchSize);
const queueLength = queue.length;
let index = 0;
const subject = new Subject();
const messageCount = Math.ceil(data.length / batchSize);
results$
.pipe(
filter(filterResultsErrorsAndReady),
tap(({ worker, message }) => {
if (queue.length > 0) {
worker.postMessage({ type: MessageTypes.DATA, data: queue.shift() });
if (index < data.length) {
worker.postMessage({
type: MessageTypes.DATA,
data: data.slice(index, index + batchSize),
});
index += batchSize;
}
})
)
......@@ -140,16 +144,16 @@ export const processQueueWithWorkers = (
return subject.pipe(
map(({ message }) => message),
filter(filterResultsAndErrors),
take(queueLength)
take(messageCount)
);
};
export const computeBatch$ = (
data: any,
WorkerBuilder: ChildProcessConstructor,
asyncWorkerFactory: ProcessControllerAsyncWorkerFactory,
{ batchSize, initialValues }: { batchSize: number; initialValues: any }
): Observable<any> => {
const { result$ } = initWorkers$(WorkerBuilder, { initialValues });
const { result$ } = initWorkers$(asyncWorkerFactory, { initialValues });
return processQueueWithWorkers(result$, data, batchSize);
};
......@@ -169,14 +173,14 @@ const onMessageType = <T extends MessageTypes>(
/**
* Delegates work to a single worker. Progress will be piped in the returned Observable
* @param processedData - The data processed. It will be sent to the worker in an "initialize" message.
* @param WorkerBuilder - The Worker constructor.
* @param asyncWorkerFactory - Factory used to spawn the worker that will process the data.
* @returns {Observable} - A rxjs observable piping progress.
*/
export const backgroundWorkerProcess$ = (
processedData: any,
WorkerBuilder: ChildProcessConstructor
asyncWorkerFactory: ProcessControllerAsyncWorkerFactory
): Observable<ResultMessage | ErrorMessage> =>
cancelableBackgroundWorkerProcess$(processedData, WorkerBuilder).result$;
cancelableBackgroundWorkerProcess$(processedData, asyncWorkerFactory).result$;
type CancelableBackgroundWorkerProcessResult = {
result$: Observable<ResultMessage | ErrorMessage>;
......@@ -185,9 +189,9 @@ type CancelableBackgroundWorkerProcessResult = {
export const cancelableBackgroundWorkerProcess$ = (
processedData: any,
WorkerBuilder: ChildProcessConstructor
asyncWorkerFactory: ProcessControllerAsyncWorkerFactory
): CancelableBackgroundWorkerProcessResult => {
const { result$, terminate } = initWorkers$(WorkerBuilder, {
const { result$, terminate } = initWorkers$(asyncWorkerFactory, {
initialValues: processedData,
workerCount: 1,
});
......
......@@ -2,9 +2,9 @@ import { VirtualFileSystem } from "files-and-folders-loader/files-and-folders-lo
import { FileSystemLoadingStep } from "reducers/loading-state/loading-state-types";
import { Observable } from "rxjs";
import { cancelableBackgroundWorkerProcess$ } from "util/batch-process/batch-process-util";
import LoadFromFileSystemWorker from "./load-from-filesystem.fork";
import { FilesAndFoldersMap } from "reducers/files-and-folders/files-and-folders-types";