284 lines
7.8 KiB
TypeScript
284 lines
7.8 KiB
TypeScript
import {
  Injectable,
  Logger,
  NotFoundException,
  OnModuleInit,
} from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { In, Repository } from 'typeorm';
import {
  StravaActivityEntity,
  StravaAthleteEntity,
  StravaSyncJobEntity,
  StravaSyncJobItemEntity,
} from '../database/entities';
import { mapStravaActivity } from './strava-activity.mapper';
import { StravaClientService } from './strava-client.service';
import { StravaRateLimitError } from './strava-rate-limit.error';
import { StravaStreamImportService } from './strava-stream-import.service';
import { StravaTokenService } from './strava-token.service';
import { StravaActivityPayload } from './strava.types';
|
|
|
|
/**
 * Runs Strava import jobs: pages through the athlete's activities, stores the
 * summary and detail of each one, imports its streams, and records per-activity
 * progress as job items.
 *
 * A job that hits a `StravaRateLimitError` is parked in the `rate_limited`
 * state and re-run by an in-process timer. Timers live only in memory, so they
 * are re-created from the database in `onModuleInit`.
 */
@Injectable()
export class StravaSyncService implements OnModuleInit {
  private readonly logger = new Logger(StravaSyncService.name);

  /** Delay before a rate-limited job is retried (15 minutes). */
  private readonly retryDelayMs = 15 * 60 * 1000;

  /** Pending retry timers, keyed by job id. */
  private readonly retryTimers = new Map<string, NodeJS.Timeout>();

  constructor(
    @InjectRepository(StravaAthleteEntity)
    private readonly athleteRepository: Repository<StravaAthleteEntity>,
    @InjectRepository(StravaActivityEntity)
    private readonly activityRepository: Repository<StravaActivityEntity>,
    @InjectRepository(StravaSyncJobEntity)
    private readonly jobRepository: Repository<StravaSyncJobEntity>,
    @InjectRepository(StravaSyncJobItemEntity)
    private readonly jobItemRepository: Repository<StravaSyncJobItemEntity>,
    private readonly stravaTokenService: StravaTokenService,
    private readonly stravaClientService: StravaClientService,
    private readonly stravaStreamImportService: StravaStreamImportService,
  ) {}

  /**
   * Re-arms the retry timer of every job that is stored as `rate_limited`.
   *
   * NOTE(review): jobs stored as `queued` or `running` are not resumed here.
   * If the process stopped while such a job was in flight, `startSync` will
   * keep returning it without running it — confirm whether that is intended.
   */
  async onModuleInit(): Promise<void> {
    const waitingJobs = await this.jobRepository.find({
      where: { status: 'rate_limited' },
    });

    for (const job of waitingJobs) {
      this.scheduleRetry(job);
    }
  }

  /**
   * Starts a sync for the primary athlete, or returns the job that is already
   * queued, running or rate-limited for that athlete.
   *
   * The new job runs in the background; this method resolves as soon as the
   * job row has been created.
   *
   * @returns The existing active job, or the newly created `queued` job.
   * @throws NotFoundException when no athlete is connected.
   */
  async startSync(): Promise<StravaSyncJobEntity> {
    const athlete = await this.resolveAthlete();
    const activeJob = await this.jobRepository.findOne({
      where: {
        stravaAthleteId: athlete.id,
        status: In(['queued', 'running', 'rate_limited']),
      },
      order: { createdAt: 'DESC' },
    });

    if (activeJob) {
      // No-op unless the job is rate-limited and has no timer yet.
      this.scheduleRetry(activeJob);
      return activeJob;
    }

    const job = await this.jobRepository.save(
      this.jobRepository.create({
        stravaAthleteId: athlete.id,
        status: 'queued',
      }),
    );

    this.runJobInBackground(job.id);

    return job;
  }

  /**
   * Returns the most recently created job of the primary athlete, including
   * its items, or `null` when the athlete has no job yet.
   *
   * @throws NotFoundException when no athlete is connected.
   */
  async getLatestJob(): Promise<StravaSyncJobEntity | null> {
    const athlete = await this.resolveAthlete();

    const job = await this.jobRepository.findOne({
      where: { stravaAthleteId: athlete.id },
      relations: { items: true },
      order: { createdAt: 'DESC' },
    });

    if (job?.status === 'rate_limited') {
      // Makes sure a retry is pending even if the timer was never created.
      this.scheduleRetry(job);
    }

    return job;
  }

  /**
   * Loads one job together with its items.
   *
   * @throws NotFoundException when no job has the given id.
   */
  async getJob(jobId: string): Promise<StravaSyncJobEntity> {
    const job = await this.jobRepository.findOne({
      where: { id: jobId },
      relations: { items: true },
    });
    if (!job) {
      throw new NotFoundException('Sync job not found');
    }

    return job;
  }

  /**
   * Executes (or resumes) a job: marks it `running`, imports every activity
   * returned by the paged activity listing, then marks it `completed`.
   *
   * Errors raised while importing are recorded on the job through `failJob`
   * instead of being rethrown. Errors raised before the job is marked running,
   * or while recording the failure itself, do reject the returned promise.
   *
   * @throws NotFoundException when no job has the given id.
   */
  async runJob(jobId: string): Promise<void> {
    this.clearRetry(jobId);

    // Load the bare job row. The `items` relation is never read here, and the
    // job is saved after every activity; carrying a large, stale item list
    // through each of those saves is wasted work (and would overwrite item
    // rows if the relation cascades — not visible from this file).
    const job = await this.jobRepository.findOne({ where: { id: jobId } });
    if (!job) {
      throw new NotFoundException('Sync job not found');
    }

    job.status = 'running';
    job.startedAt = new Date();
    job.finishedAt = null;
    job.retryAfter = null;
    job.errorMessage = null;
    await this.jobRepository.save(job);

    try {
      const accessToken = await this.stravaTokenService.getValidAccessToken(
        job.stravaAthleteId,
      );

      let page = 1;
      const perPage = 100;
      let summaries: StravaActivityPayload[] = [];

      // A page shorter than `perPage` is treated as the last one.
      do {
        summaries = await this.stravaClientService.listActivities(
          accessToken,
          page,
          perPage,
        );

        for (const summary of summaries) {
          await this.importActivity(job, accessToken, summary);
        }

        page += 1;
      } while (summaries.length === perPage);

      job.status = 'completed';
      job.finishedAt = new Date();
      job.retryAfter = null;
      await this.jobRepository.save(job);
    } catch (error) {
      await this.failJob(job, error);
    }
  }

  /**
   * Imports a single activity: summary, detail, then streams. Progress is
   * tracked in a job item so that a resumed job skips completed activities.
   *
   * A `StravaRateLimitError` is rethrown so the whole job pauses; any other
   * error marks only this item as `failed` and lets the job continue.
   *
   * NOTE(review): an item that ended as `failed` is attempted again when the
   * job is resumed, and the counters it already contributed are not undone,
   * so they can be counted twice in that case.
   */
  private async importActivity(
    job: StravaSyncJobEntity,
    accessToken: string,
    summary: StravaActivityPayload,
  ): Promise<void> {
    const stravaActivityId = String(summary.id);
    const item = await this.getOrCreateJobItem(job.id, stravaActivityId);

    if (item.status === 'completed') {
      return;
    }

    // Snapshot so a rate-limit abort can undo this item's partial progress.
    const countsBefore = {
      activityCount: job.activityCount,
      detailCount: job.detailCount,
      streamPointCount: job.streamPointCount,
    };

    try {
      const existingSummary = await this.activityRepository.findOne({
        where: {
          stravaAthleteId: job.stravaAthleteId,
          stravaActivityId,
        },
      });
      let activity = await this.activityRepository.save(
        mapStravaActivity(job.stravaAthleteId, summary, existingSummary),
      );
      job.activityCount += 1;

      const detail = await this.stravaClientService.getActivity(
        accessToken,
        stravaActivityId,
      );
      activity = await this.activityRepository.save(
        mapStravaActivity(job.stravaAthleteId, detail, activity),
      );
      job.detailCount += 1;

      job.streamPointCount +=
        await this.stravaStreamImportService.importStreamsForActivity(activity);

      item.status = 'completed';
      item.errorMessage = null;
      await this.jobItemRepository.save(item);
      await this.jobRepository.save(job);
    } catch (error) {
      if (error instanceof StravaRateLimitError) {
        // The item stays `pending` and is imported again from the start when
        // the job resumes. Roll the counters back so `failJob` does not
        // persist progress that the retry would then count a second time.
        job.activityCount = countsBefore.activityCount;
        job.detailCount = countsBefore.detailCount;
        job.streamPointCount = countsBefore.streamPointCount;
        throw error;
      }

      item.status = 'failed';
      item.errorMessage = this.errorMessage(error);
      await this.jobItemRepository.save(item);
      await this.jobRepository.save(job);
    }
  }

  /**
   * Returns the job item for an activity, creating it as `pending` if needed.
   * A completed item is returned untouched; any other existing item is reset
   * to `pending` with its error message cleared.
   */
  private async getOrCreateJobItem(
    jobId: string,
    stravaActivityId: string,
  ): Promise<StravaSyncJobItemEntity> {
    const existing = await this.jobItemRepository.findOne({
      where: { jobId, stravaActivityId },
    });
    if (existing) {
      if (existing.status === 'completed') {
        return existing;
      }

      existing.status = 'pending';
      existing.errorMessage = null;
      return this.jobItemRepository.save(existing);
    }

    return this.jobItemRepository.save(
      this.jobItemRepository.create({
        jobId,
        stravaActivityId,
        status: 'pending',
      }),
    );
  }

  /**
   * Looks up the athlete stored under the `primary` account key.
   *
   * @throws NotFoundException when no such athlete exists.
   */
  private async resolveAthlete(): Promise<StravaAthleteEntity> {
    const athlete = await this.athleteRepository.findOne({
      where: { accountKey: 'primary' },
    });

    if (!athlete) {
      throw new NotFoundException('No Strava athlete connected');
    }

    return athlete;
  }

  /**
   * Records why a job stopped. A rate-limit error parks the job as
   * `rate_limited` with a retry time and schedules the retry; any other error
   * marks the job `failed` and sets its finish time.
   */
  private async failJob(
    job: StravaSyncJobEntity,
    error: unknown,
  ): Promise<void> {
    const isRateLimit = error instanceof StravaRateLimitError;

    job.status = isRateLimit ? 'rate_limited' : 'failed';
    job.errorMessage = this.errorMessage(error);
    job.retryAfter = isRateLimit
      ? new Date(Date.now() + this.retryDelayMs)
      : null;
    job.finishedAt = isRateLimit ? null : new Date();
    await this.jobRepository.save(job);

    if (isRateLimit) {
      this.scheduleRetry(job);
    }
  }

  /** Extracts a message from a thrown value, with a fallback for non-Errors. */
  private errorMessage(error: unknown): string {
    return error instanceof Error ? error.message : 'Unknown sync error';
  }

  /**
   * Runs a job without awaiting it. Any rejection from `runJob` is caught and
   * logged here, because nothing else observes the promise and an unhandled
   * rejection would otherwise escape to the process.
   */
  private runJobInBackground(jobId: string): void {
    void this.runJob(jobId).catch((error: unknown) => {
      this.logger.error(
        `Strava sync job ${jobId} failed outside of job error handling: ${this.errorMessage(error)}`,
        error instanceof Error ? error.stack : undefined,
      );
    });
  }

  /**
   * Arms a one-shot timer that re-runs a rate-limited job. Does nothing when
   * the job is not `rate_limited` or already has a timer.
   */
  private scheduleRetry(job: StravaSyncJobEntity): void {
    if (job.status !== 'rate_limited' || this.retryTimers.has(job.id)) {
      return;
    }

    // Fall back to a full delay when no usable retry time is stored.
    const retryAfter =
      job.retryAfter instanceof Date
        ? job.retryAfter
        : new Date(Date.now() + this.retryDelayMs);
    // A retry time in the past means "run as soon as possible".
    const delay = Math.max(retryAfter.getTime() - Date.now(), 0);
    const timer = setTimeout(() => {
      this.retryTimers.delete(job.id);
      this.runJobInBackground(job.id);
    }, delay);

    // Do not let a pending retry keep the process alive on its own.
    timer.unref?.();
    this.retryTimers.set(job.id, timer);
  }

  /** Cancels and forgets the retry timer of a job, if one is pending. */
  private clearRetry(jobId: string): void {
    const timer = this.retryTimers.get(jobId);
    if (!timer) {
      return;
    }

    clearTimeout(timer);
    this.retryTimers.delete(jobId);
  }
}
|